# Import Required Packages

In [14]:
# Imports: stdlib first, then third-party (numerics/plotting, NLP, Spark).
from datetime import datetime

import matplotlib.pyplot as plt  # was `import matplotlib as plt`, which lacks plot/subplots/figure
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
from nltk.stem import WordNetLemmatizer
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import (
    col,
    from_unixtime,
    to_date,
    to_timestamp,
    udf,
    unix_timestamp,
)
# Wildcard kept deliberately: cells below use StructType, StructField, StringType
# and ArrayType from this module (it also provides DateType).
# TODO: replace with an explicit name list once every cell's usage is confirmed.
from pyspark.sql.types import *

# Default seaborn colour palette, available for any plots.
color_pal = sns.color_palette()

In [15]:
# One-time setup (left commented out): fetch NLTK data for the WordNetLemmatizer used below.
# NOTE(review): 'all' downloads every NLTK package; presumably only the WordNet
# corpus is needed here, i.e. nltk.download('wordnet') — confirm before enabling.
#nltk.download('all')

In [16]:
# Display the active SparkContext.
# NOTE(review): neither `sc` nor `spark` (used below) is created in this notebook;
# presumably the pyspark kernel/launcher provides them — confirm.
sc

# Load Original Data File

In [17]:
# The raw tweets CSV has no header row, so this schema supplies the column names.
# Every column is read as a nullable string.
custom_schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("index", "id", "date", "flag", "user", "text")
    ]
)

df = (
    spark.read
    .schema(custom_schema)
    .csv("hdfs://localhost:9000/sharesprk1/ProjectTweets.csv", header=False)
)
df.show()

+-----+----------+--------------------+--------+---------------+--------------------+
|index|        id|                date|    flag|           user|                text|
+-----+----------+--------------------+--------+---------------+--------------------+
|    0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|    1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|    2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|    3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|    4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|    5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|    6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|    7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|    8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4

# Prepare Tweet Text for Sentiment Analysis (Tokenize, Remove Stop Words, Lemmatize)

In [18]:
# Lemmatizer used by the lemmatization UDF below.
lemmatizer = WordNetLemmatizer()


def _lemmatize_tokens(tokens):
    """Return the WordNet lemma of every token in ``tokens``.

    A null array (``None``) is returned as ``None`` instead of raising
    ``TypeError`` on iteration, so a missing value stays null in Spark.
    """
    if tokens is None:
        return None
    return [lemmatizer.lemmatize(token) for token in tokens]


# UDF for lemmatization: array<string> in, array<string> out.
lemmatize_udf = udf(_lemmatize_tokens, ArrayType(StringType()))

# Tokenize the text column into a lowercased `words` array.
# NOTE(review): Tokenizer splits on single whitespace characters, so runs of
# spaces may yield empty-string tokens; RegexTokenizer(pattern="\\s+") would
# avoid that — verify against the data.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)

# Remove stop words (StopWordsRemover's default English list) into `filtered`.
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df = remover.transform(df)

# Lemmatize the remaining words into `lemmatized`.
# NOTE(review): `lemmatized` is dropped later in this notebook, so the
# un-lemmatized `filtered` column is what gets exported — confirm this is intended.
df = df.withColumn("lemmatized", lemmatize_udf("filtered"))

In [19]:
# Side-by-side view of each raw tweet and its stop-word-filtered tokens.
df.select(col("text"), col("filtered")).show()

+--------------------+--------------------+
|                text|            filtered|
+--------------------+--------------------+
|@switchfoot http:...|[@switchfoot, htt...|
|is upset that he ...|[upset, update, f...|
|@Kenichan I dived...|[@kenichan, dived...|
|my whole body fee...|[whole, body, fee...|
|@nationwideclass ...|[@nationwideclass...|
|@Kwesidei not the...|[@kwesidei, whole...|
|         Need a hug |         [need, hug]|
|@LOLTrish hey  lo...|[@loltrish, hey, ...|
|@Tatiana_K nope t...|  [@tatiana_k, nope]|
|@twittera que me ...|[@twittera, que, ...|
|spring break in p...|[spring, break, p...|
|I just re-pierced...|  [re-pierced, ears]|
|@caregiving I cou...|[@caregiving, bea...|
|@octolinz16 It it...|[@octolinz16, cou...|
|@smarrison i woul...|[@smarrison, woul...|
|@iamjazzyfizzle I...|[@iamjazzyfizzle,...|
|Hollis' death sce...|[hollis', death, ...|
|about to file taxes |       [file, taxes]|
|@LettyA ahh ive a...|[@lettya, ahh, iv...|
|@FakerPattyPattz ...|[@fakerpat

In [20]:
# Columns after preprocessing; as the last expression it is rendered by Jupyter.
df.columns

['index', 'id', 'date', 'flag', 'user', 'text', 'words', 'filtered', 'lemmatized']


In [21]:
# Drop the raw text, the intermediate token array and the lemmas. Only the
# stop-word-filtered tokens are carried forward alongside the metadata columns.
# NOTE(review): discarding `lemmatized` means the exported tokens are not
# lemmatized — confirm that is intended.
columns_to_drop = "text words lemmatized".split()
df = df.drop(*columns_to_drop)
df.show()

+-----+----------+--------------------+--------+---------------+--------------------+
|index|        id|                date|    flag|           user|            filtered|
+-----+----------+--------------------+--------+---------------+--------------------+
|    0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|[@switchfoot, htt...|
|    1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|[upset, update, f...|
|    2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|[@kenichan, dived...|
|    3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|[whole, body, fee...|
|    4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|[@nationwideclass...|
|    5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|[@kwesidei, whole...|
|    6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         [need, hug]|
|    7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|[@loltrish, hey, ...|
|    8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4

In [22]:
# Check column types before export: `filtered` is still array<string> here and
# must be converted before it can be written to CSV.
df.dtypes

[('index', 'string'),
 ('id', 'string'),
 ('date', 'string'),
 ('flag', 'string'),
 ('user', 'string'),
 ('filtered', 'array<string>')]

In [23]:
# CSV output cannot hold an array column, so `filtered` is converted to its
# string form, e.g. "[need, hug]".
df = df.withColumn("filtered", col("filtered").cast(StringType()))

In [24]:
# Confirm the cast succeeded: `filtered` should now be reported as 'string'.
df.dtypes

[('index', 'string'),
 ('id', 'string'),
 ('date', 'string'),
 ('flag', 'string'),
 ('user', 'string'),
 ('filtered', 'string')]

In [25]:
# Collect the full Spark DataFrame into pandas on the driver so it can be written
# to the local filesystem with pandas.
# NOTE(review): this materialises every row in driver memory.
pandas_df = df.toPandas()

In [29]:
# Write the pandas copy to the local filesystem as a header-less CSV. The pandas
# row index is omitted so the columns line up with the six-column schema used
# when the file is read back.
pandas_df.to_csv(
    path_or_buf='/home/master/Semester2/CA2/Final/ProjectTweets_filtered.csv',
    index=False,
    header=False,
)

In [12]:
# Write the same data to HDFS with Spark, replacing any previous output.
# Spark writes a directory of part files at this path, not a single file.
df.write.csv(
    'hdfs://localhost:9000/sharesprk1/ProjectTweets_filtered.csv',
    mode='overwrite',
    header=False,
)

In [11]:
# Read the locally written CSV back with Spark to confirm that it parses and that
# no rows were lost in the Spark -> pandas -> CSV round trip.
custom_schema = StructType([
    StructField("index", StringType(), True),
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("flag", StringType(), True),
    StructField("user", StringType(), True),
    StructField("filtered", StringType(), True)])

# The file was written by pandas on the local filesystem. The `file://` scheme is
# explicit so Spark cannot resolve the path against a non-local default filesystem.
# pandas.to_csv escapes embedded quotes by doubling them (""), whereas Spark's CSV
# reader defaults to a backslash escape, so escape='"' is needed to parse such
# fields correctly.
df_filtered = spark.read.csv(
    "file:///home/master/Semester2/CA2/Final/ProjectTweets_filtered.csv",
    header=False,
    schema=custom_schema,
    escape='"',
)
df_filtered.show()

# Actually verify that every original observation survived, rather than only eyeballing show().
assert df_filtered.count() == df.count(), "row count changed in the CSV round trip"

+-----+----------+--------------------+--------+---------------+--------------------+
|index|        id|                date|    flag|           user|            filtered|
+-----+----------+--------------------+--------+---------------+--------------------+
|    0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|[@switchfoot, htt...|
|    1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|[upset, update, f...|
|    2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|[@kenichan, dived...|
|    3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|[whole, body, fee...|
|    4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|[@nationwideclass...|
|    5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|[@kwesidei, whole...|
|    6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         [need, hug]|
|    7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|[@loltrish, hey, ...|
|    8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4