# PSTAT 135 Final Project: Data Preprocessing

In [None]:
# global imports
import pyspark
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as W
from pyspark.sql.types import StringType, ArrayType
# transformations
from pyspark.ml.feature import StringIndexer
# text transformations
import contractions
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from nltk.stem import WordNetLemmatizer

# get the existing Spark session or create a new one; every cell below uses it
spark = SparkSession.builder.getOrCreate()

# Examine Dataset

In [None]:
# import data: the first line of the file is the header and the column
# types are inferred from the values
tweets = spark.read.csv('Tweets.csv', header=True, inferSchema=True)
# preview the first two rows; limit before converting so that only those
# rows are collected to the driver instead of the whole dataset
tweets.limit(2).toPandas()

In [None]:
# report the shape of the dataset: row count first, then column count
row_total = tweets.count()
column_total = len(tweets.columns)
print("Number of rows: ", row_total)
print("Number of columns: ", column_total)

In [None]:
# output the data type of each column (types were inferred when the CSV was read)
tweets.dtypes

# Clean Data

## Duplicates

- Drop exact duplicate rows
- Drop rows that differ only by `tweet_id` 

In [None]:
# how many rows remain once exact duplicate rows are collapsed
tweets.dropDuplicates().count()

In [None]:
# keep a single copy of every row that is repeated exactly
tweets = tweets.distinct()
# the row count should now equal the distinct count found above
tweets.count()

In [None]:
# number of distinct rows when tweet_id is left out of the comparison
tweets.drop('tweet_id').distinct().count()

In [None]:
# output values that are duplicates for all columns except tweet_id;
# the grouping columns are derived from the schema, like the other
# duplicate checks in this section, instead of repeating a hard-coded
# list of names that has to be kept in sync with the data
non_id_cols = [c for c in tweets.columns if c != 'tweet_id']
tweets.groupBy(*non_id_cols)\
    .count()\
    .where(W.col('count') > 1)\
    .toPandas()

In [None]:
# columns that describe a tweet's content (everything except the id)
content_cols = [name for name in tweets.columns if name != 'tweet_id']
# keep one row for each distinct combination of the content columns
tweets = tweets.dropDuplicates(content_cols)
# confirm duplicates were dropped
tweets.count()

## Missing Values

- Remove all rows with missing values for `text`
- Ignore missing values later for columns used in exploratory data analysis

In [None]:
# one aggregate per column: the sum of a 0/1 flag that marks null entries
null_counts = [W.sum(W.col(name).isNull().cast('int')).alias(name)
               for name in tweets.columns]
# number of missing values for each column, shown as a one-row table
tweets.select(*null_counts).toPandas()

In [None]:
# discard every row whose text is null
tweets = tweets.na.drop(subset=['text'])

## Correct Categorical Data

`airline_sentiment`
- change `neutral` to `positive` to create a binary classification problem

`negativereason`
- change `null` to `Can't Tell`

In [None]:
# categorical variables to inspect
cat_vars = ['airline_sentiment','negativereason','airline','tweet_location','user_timezone']
# show up to 20 distinct values of each variable without truncating them
for cat_var in cat_vars:
    unique_values = tweets.select(cat_var).distinct()
    unique_values.show(20, truncate=False)

In [None]:
# build the label column: neutral tweets are relabelled as positive and
# every other sentiment value is carried over unchanged
sentiment = W.col('airline_sentiment')
is_neutral = sentiment == 'neutral'
tweets = tweets.withColumn('label', W.when(is_neutral, 'positive').otherwise(sentiment))

In [None]:
# replace null negative reasons with the placeholder "Can't Tell"
tweets = tweets.na.fill("Can't Tell", subset=['negativereason'])

## Create Index Column

Used later when combining data frames

In [None]:
# create index column; monotonically_increasing_id() is non-deterministic
# (its values depend on how the rows are partitioned), so cache the result
# to let the later join on 'id' reuse the ids computed here rather than
# regenerate them each time the lineage is re-evaluated
tweets = tweets.withColumn('id', W.monotonically_increasing_id()).cache()
# preview the first rows; limit before converting so that only those rows
# are collected to the driver
tweets.limit(5).toPandas()

## Select Desired Columns

Columns will be used later for exploratory data analysis

In [None]:
# keep the index, the label and the columns used for exploratory analysis
tweets = tweets.select('id', 'label','negativereason','airline',
                       'retweet_count','text','tweet_created')
# preview the first two rows; limit before converting so that only those
# rows are collected to the driver
tweets.limit(2).toPandas()

# Transform Text Data

In [None]:
# the text pipeline only needs the index, the label and the raw text
text_df = tweets.select('id','label','text')
# preview the first two rows; limit before converting so that only those
# rows are collected to the driver
text_df.limit(2).toPandas()

## Expand Contractions

In [None]:
# function for expanding contractions
def fix_contractions(text):
    """Return `text` after passing it through `contractions.fix`."""
    expanded = contractions.fix(text)
    return expanded
# udf for expanding contractions; the function is registered directly
# because wrapping it in a lambda adds nothing
contractions_udf = W.udf(fix_contractions, StringType())
# add column with contractions expanded
text_df = text_df.withColumn('text_clean', contractions_udf('text'))
# preview the first rows; limit before converting so that the udf runs on
# (and the driver receives) only the rows that are displayed
text_df.limit(5).toPandas()

## Tokenize the Text

In [None]:
# tokenize the text: split on runs of whitespace and on any non-word
# character (e.g. @():;,.!?-/") and lowercase the resulting tokens.
# The pattern is a raw string so that \s and \W reach the regex engine
# without Python treating them as (invalid) string escape sequences.
rt = RegexTokenizer().setInputCol('text_clean')\
                     .setOutputCol('text_vec')\
                     .setPattern(r'\s+|[\W]')\
                     .setToLowercase(True)
# transform data
text_df = rt.transform(text_df)
# preview the first rows; limit before converting so that only those rows
# are processed and collected to the driver
text_df.limit(5).toPandas()

## Convert Digits to Words

Convert the single-digit tokens `0` through `9` to the words `zero` through `nine`

In [None]:
# lookup table from each single digit to its English word
num_word_dict = {'0':'zero','1':'one','2':'two','3':'three','4':'four',
                 '5':'five','6':'six','7':'seven','8':'eight','9':'nine'}
# function for converting number to word
def num_to_word(row):
    """Return a new list where single-digit tokens of `row` are spelled out.

    Tokens that are not keys of `num_word_dict` (for example multi-digit
    numbers such as '10') are kept unchanged.
    """
    return [num_word_dict.get(token, token) for token in row]
# udf for converting number to word; the function is registered directly
# because wrapping it in a lambda adds nothing
num_to_word_udf = W.udf(num_to_word, ArrayType(StringType()))
# add column with numbers converted to word
text_df = text_df.withColumn('text_vec_num', num_to_word_udf('text_vec'))
# preview the first rows; limit before converting so that the udf runs on
# (and the driver receives) only the rows that are displayed
text_df.limit(5).toPandas()

## Remove Stopwords

In [None]:
# load the default English stopword list; loadDefaultStopWords is a static
# method, so it is called on the class rather than on a throwaway instance
english = StopWordsRemover.loadDefaultStopWords('english')
# stopwords transformer
stops = StopWordsRemover().setStopWords(english)\
                          .setInputCol('text_vec_num')\
                          .setOutputCol('text_vec_stop')
# transform dataframe
text_df = stops.transform(text_df)
# preview the first rows; limit before converting so that only those rows
# are processed and collected to the driver
text_df.limit(5).toPandas()

## Lemmatize Words

In [None]:
# create word lemmatizer object
wnl = WordNetLemmatizer()
# function for lemmatizing words
def wnl_row(row):
    """Return a list holding `wnl.lemmatize(token)` for each token of `row`."""
    lemmas = []
    for token in row:
        lemmas.append(wnl.lemmatize(token))
    return lemmas
# udf for lemmatizing words; the function is registered directly because
# wrapping it in a lambda adds nothing
lemmatizer_udf = W.udf(wnl_row, ArrayType(StringType()))
# create column of lemmatized words
text_df = text_df.withColumn('text_vec_lem', lemmatizer_udf('text_vec_stop'))
# preview the first rows; limit before converting so that the udf runs on
# (and the driver receives) only the rows that are displayed
text_df.limit(5).toPandas()

## Remove Characters

In [None]:
# function for removing single-character tokens
def not_letter(row):
    """Return the tokens of `row` that are longer than one character."""
    return list(filter(lambda token: len(token) > 1, row))
# udf for removing single-character tokens; the function is registered
# directly because wrapping it in a lambda adds nothing
not_letter_udf = W.udf(not_letter, ArrayType(StringType()))
# create column with single-character tokens removed
text_df = text_df.withColumn('text_vec_clean', not_letter_udf('text_vec_lem'))
# preview the first rows; limit before converting so that the udf runs on
# (and the driver receives) only the rows that are displayed
text_df.limit(5).toPandas()

## Vectorize Words

Create a vocabulary of the words that appear in at least 0.5% of the documents

In [None]:
# create transformer; minDF is given as the fraction 0.005 so that Spark
# always reads it as "at least 0.5% of the documents". Passing
# count()*0.005 gave the same threshold only while that product was >= 1:
# a value below 1 is interpreted as a fraction, not a document count, and
# computing it also cost an extra pass over the data.
cv = CountVectorizer().setInputCol('text_vec_clean')\
                      .setOutputCol('count_vec')\
                      .setMinDF(0.005)
# fit the transformer
fit_cv = cv.fit(text_df)
# transform the data
text_df = fit_cv.transform(text_df)
# output the vectorized column
text_df.select('count_vec').show(5, truncate=False)

In [None]:
# create vocabulary variable from the fitted CountVectorizer model;
# it is stored with the data frames at the end of the notebook
vocab = fit_cv.vocabulary

## Convert Label Column to Float

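`StringIndexer` assigns indices by descending label frequency by default, so `negative` becomes `0` and `positive` becomes `1` as long as negative tweets are the majority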

In [None]:
# create string indexer that maps each distinct label to a numeric index
indxr = StringIndexer().setInputCol('label')\
                       .setOutputCol('label_idx')
# fit on the labels and add the indexed column
text_df = indxr.fit(text_df).transform(text_df)
# preview the first rows; limit before converting so that only those rows
# are processed and collected to the driver
text_df.limit(5).toPandas()

# Train and Test Datasets

70% of the data is used to train the model, 30% of the data is used to test the model

In [None]:
# split weights: 70% for training, 30% for testing
split_weights = [0.7, 0.3]
# create train and test sets with a fixed seed
train, test = text_df.randomSplit(split_weights, seed=116)

In [None]:
# preview the first rows of the train data; limit before converting so
# that only those rows are collected to the driver
train.limit(5).toPandas()

## Tweets Dataset for Exploratory Data Analysis

Make a dataset for the training data with additional features for exploratory data analysis

In [None]:
# keep the rows of tweets whose id appears in train (left semi join), which
# gives a data frame for exploratory data analysis with additional features
tweets_train = tweets.join(train, on=['id'], how='left_semi')
# preview the first rows; limit before converting so that only those rows
# are collected to the driver
tweets_train.limit(5).toPandas()

## Select Columns for Train and Test

In [None]:
# both sets keep only the count vector and the indexed label
model_cols = ['count_vec', 'label_idx']
train = train.select(*model_cols)
test = test.select(*model_cols)

## Store Data Frames for Later Use

In [None]:
# store dataframes for additional programs
dfs = [train.toPandas(),test.toPandas(),tweets_train.toPandas(),vocab]
%store dfs