### Data Cleaning

In [1]:
from operator import add
from pyspark.ml import Pipeline
from pyspark.sql import (Row, functions as F)
from sparknlp.base import DocumentAssembler, EmbeddingsFinisher
from sparknlp.annotator import Tokenizer, Normalizer, Word2VecApproach

import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import sparknlp
import torch 
import torch.nn as nn
import warnings
warnings.filterwarnings('ignore')

spark = sparknlp.start(gpu=True, memory='28G')
spark.sparkContext.setLogLevel('ERROR')

PATH = r"C:\Users\samue\Documents\Applied Data Science\INFO-H518 Deep Learning\Assignments\A3\Input"


In [2]:
IN_PATH = os.path.join(os.getcwd(), 'Assignments', 'A3', 'Input')
train = spark.createDataFrame(
    pd.read_csv(PATH + r'\Original\train.csv', header="infer", index_col=0).dropna()
)
test = spark.createDataFrame(
    pd.read_csv(PATH + r'\Original\test.csv', header="infer", index_col=0).dropna()
)
train.toPandas()

Unnamed: 0,Party,Handle,Tweet
0,Democrat,RepDarrenSoto,"Today, Senate Dems vote to #SaveTheInternet. P..."
1,Democrat,RepDarrenSoto,RT @WinterHavenSun: Winter Haven resident / Al...
2,Democrat,RepDarrenSoto,RT @NBCLatino: .@RepDarrenSoto noted that Hurr...
3,Democrat,RepDarrenSoto,RT @NALCABPolicy: Meeting with @RepDarrenSoto ...
4,Democrat,RepDarrenSoto,RT @Vegalteno: Hurricane season starts on June...
...,...,...,...
72729,Republican,RepTomPrice,Check out my op-ed on need for End Executive O...
72730,Republican,RepTomPrice,"Yesterday, Betty &amp; I had a great time lear..."
72731,Republican,RepTomPrice,We are forever grateful for the service and sa...
72732,Republican,RepTomPrice,Happy first day of school @CobbSchools! #CobbB...


In [3]:
doc_assembler = DocumentAssembler() \
    .setInputCol('Tweet') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

normalizer = Normalizer() \
    .setInputCols(['token']) \
    .setOutputCol('normal_token') \
    .setCleanupPatterns(["[^#A-Za-z]", "^https(.*)", "^#$"])

token_pipeline = Pipeline().setStages([
    doc_assembler,
    tokenizer,
    normalizer
])

In [4]:
train = token_pipeline \
    .fit(train) \
    .transform(train) \
    .selectExpr(['Party', 'Handle', 'Tweet', 'normal_token.result as Tokens']) \
    .withColumnRenamed('result', 'Tokens')

train_vocab = train.select('Tokens') \
    .select(F.explode('Tokens').alias('Terms')) \
    .distinct() \
    .sort('Terms') \
    .toPandas()

train_vocab_frequency = train.select('Tokens') \
    .select(F.explode('Tokens').alias('Terms')) \
    .groupBy('Terms').count() \
    .toDF('Terms', 'Count') \
    .sort('Count') \
    .toPandas()



test = token_pipeline \
    .fit(test) \
    .transform(test) \
    .selectExpr(['Party', 'Handle', 'Tweet', 'normal_token.result as Tokens']) \
    .withColumnRenamed('result', 'Tokens')

test_vocab = test.select('Tokens') \
    .select(F.explode('Tokens').alias('Terms')) \
    .distinct() \
    .sort('Terms') \
    .toPandas()

test_vocab_frequency = test.select('Tokens') \
    .select(F.explode('Tokens').alias('Terms')) \
    .groupBy('Terms').count() \
    .toDF('Terms', 'Count') \
    .sort('Count') \
    .toPandas()

In [5]:
train.toPandas().to_pickle(PATH + r'\train_tokenized.pickle')
train_vocab.to_csv(PATH + r'\train_vocab.csv')
train_vocab_frequency.to_csv(PATH + r'\train_vocab_frequency.csv')

In [6]:
test.toPandas().to_pickle(PATH + r'\test_tokenized.pickle')
test_vocab.to_csv(PATH + r'\test_vocab.csv')
test_vocab_frequency.to_csv(PATH + r'\test_vocab_frequency.csv')