In [1]:
import warnings
# NOTE(review): this silences every warning for the whole kernel session
# (Python deprecations, regex escape warnings, ...); consider a narrower filter.
warnings.filterwarnings('ignore')

from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import json

# Cluster connection: app name, master URL and 5g of memory for both the
# executors and the driver.
conf = (
    SparkConf()
    .setAppName('ITAPP-Preprocess')
    .setMaster('spark://tf2:7077')
    .set("spark.executor.memory", "5g")
    .set("spark.driver.memory", "5g")
)

sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

# Wrap the context in a session so the DataFrame API (spark.read, ...) is available.
spark = SparkSession(sc)

In [None]:
# Load every exported JSON file into one DataFrame; the later cells expect
# `Text` and `Tag` fields in each record (assumption from their usage).
# NOTE(review): `inferSchema` is documented as a CSV option; the JSON reader
# infers its schema on its own — confirm this option is needed.
df = (
    spark.read
    .option("inferSchema", "true")
    .json('/home/ubuntu/export/*.json')
)

In [None]:
from pyspark.sql.functions import udf
import re
from pyspark.ml.feature import Tokenizer, RegexTokenizer, HashingTF, IDF, CountVectorizer
from pyspark.ml.feature import StringIndexer


def clean_text(t):
    """Normalise one tweet into lower-case ASCII letters and spaces.

    Steps, in order: lower-case; drop a quoted retweet (from `"@` to the end
    of that line) and a text fully wrapped in curly double quotes; drop
    http(s) URLs and pic.twitter.com links; turn newlines and whitespace runs
    into single spaces; expand `&amp;` and common English contractions; drop
    @mentions and #hashtags; drop every character other than a-z, A-Z and
    space; collapse repeated spaces.

    Parameters
    ----------
    t : str or None
        Raw tweet text. ``None`` (how Spark hands a null to a Python UDF)
        yields ``""``.

    Returns
    -------
    str
        The cleaned text. Leading/trailing single spaces are not stripped.
    """
    if t is None:
        return ""
    # to lower
    t = t.lower()
    # remove quotes
    t = re.sub(r'"@.*', '', t)
    t = re.sub(r'^“.*”$', '', t)
    # remove URLs (`https?`: the old `https*` also accepted "httpss://")
    t = re.sub(r'https?://\S*', '', t)
    t = re.sub(r'pic\.twitter\.com/\S*', '', t)
    # remove \n
    t = t.replace('\n', ' ')
    # remove extra whitespaces
    t = re.sub(r'\s+', ' ', t)
    # replace '&amp;' with 'and'
    t = t.replace('&amp;', 'and')
    # normalise the typographic apostrophe so "don’t", "i’ll", ... are expanded
    # below instead of being stripped to "dont", "ill"
    t = t.replace('\u2019', "'")
    # replace abbreviations; "can't" must precede the generic "n't" rule,
    # which would otherwise produce "ca not"
    t = t.replace("'ll", ' will')
    t = t.replace("won't", 'will not')
    t = t.replace("can't", 'can not')
    t = t.replace("n't", ' not')
    # remove @mention
    t = re.sub(r'@[A-Za-z0-9_]+', '', t)
    # remove #tag
    t = re.sub(r'#[A-Za-z0-9_]+', '', t)
    # remove special characters
    t = re.sub(r'[^a-zA-Z ]', '', t)
    # remove multiple spaces (raw string: "\s" in a plain literal is an invalid escape)
    t = re.sub(r'\s\s+', ' ', t)
    return t

# Spark UDF wrappers (default StringType result). A null `Text` reaches a
# Python UDF as None; map it to "" rather than letting str(None).lower()
# inject the literal word "none" into the training text, and rather than
# letting clean_text() fail on None.
cleaner = udf(lambda x: "" if x is None else clean_text(x))
toLower = udf(lambda x: "" if x is None else str(x).lower())

In [None]:
def lookup_vocab(vocab, char):
    """Return the position of the first entry of ``vocab`` equal to ``char``.

    Returns -1 when ``char`` does not occur (linear scan).
    """
    return next((idx for idx, entry in enumerate(vocab) if entry == char), -1)

def inverse_lookup_vocab(vocab, int_val):
    """Map an integer code back to its vocabulary entry via plain indexing.

    Negative codes index from the end and out-of-range codes raise IndexError,
    exactly as sequence indexing does.
    """
    entry = vocab[int_val]
    return entry


def vocab_to_dict(vocab):
    """Build an ``{entry: index}`` lookup table from the sequence ``vocab``.

    If an entry appears more than once, its last index wins.
    """
    return {entry: idx for idx, entry in enumerate(vocab)}

def encode_arr(arr, dict_v):
    """Translate each element of ``arr`` into its code from ``dict_v``.

    Returns a new list in the same order; raises KeyError for any element
    missing from ``dict_v``.
    """
    return [dict_v[item] for item in arr]

In [None]:
def _sliding_windows(codes, seq_length):
    """Pair every run of ``seq_length`` consecutive codes with the code after it.

    Returns ``(dataX, dataY)`` with ``dataX[i] == codes[i:i + seq_length]`` and
    ``dataY[i] == codes[i + seq_length]``; both are empty when
    ``len(codes) <= seq_length``.
    """
    if seq_length < 1:
        raise ValueError("seq_length must be a positive integer, got %r" % (seq_length,))
    starts = range(len(codes) - seq_length)
    dataX = [codes[i:i + seq_length] for i in starts]
    dataY = [codes[i + seq_length] for i in starts]
    return dataX, dataY


def preprocess_candidate(df, candidate="Trump", dict_v=None, seq_length=100):
    """Build character-level (input window, next character) pairs for one candidate.

    Keeps the ``Text`` of rows whose ``Tag`` equals ``candidate``, lower-cases
    and de-duplicates it, applies ``cleaner``, splits every text into single
    characters, encodes them with ``dict_v`` and concatenates all texts into
    one integer stream (row order is whatever ``collect()`` returns). Prints
    the observed characters and summary counts.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Source frame with ``Text`` and ``Tag`` columns.
    candidate : str
        ``Tag`` value to keep.
    dict_v : dict or None
        Character -> integer code mapping. When None, one is built from the
        sorted characters observed for this candidate. A supplied mapping must
        cover every character that survives cleaning, otherwise the encoding
        UDF raises KeyError.
    seq_length : int
        Length of every input window (default 100).

    Returns
    -------
    (dataX, dataY) : (list of list of int, list of int)
        ``dataX[i]`` is ``codes[i:i + seq_length]`` and ``dataY[i]`` is
        ``codes[i + seq_length]``.
    """
    from pyspark.sql.functions import col
    from pyspark.sql.types import ArrayType, IntegerType

    # Column comparison instead of a string-built SQL predicate, so a
    # candidate name containing a quote cannot break (or inject into) the filter.
    texts = df.select("Text").where(col("Tag") == candidate)
    texts = texts.withColumn("Text", toLower(texts.Text))
    # Remove duplicated tweets
    texts = texts.groupBy("Text").count()
    texts = texts.withColumn("Text", cleaner(texts.Text))

    # pattern="" splits each text into individual characters.
    tokenizer = RegexTokenizer(inputCol="Text", outputCol="chars", pattern="")
    tokenized = tokenizer.transform(texts)
    vectorizer = CountVectorizer(inputCol="chars", outputCol="features").fit(tokenized)
    chars = sorted(vectorizer.vocabulary)
    print(chars)
    if dict_v is None:
        dict_v = vocab_to_dict(chars)

    # Declare the array return type: with the default StringType the list was
    # serialised as text like "[12, 3]" and its digits were re-read one by one,
    # turning every code >= 10 into several wrong single-digit codes.
    labelEncoder = udf(lambda x: encode_arr(x, dict_v), ArrayType(IntegerType()))
    encoded = tokenized.withColumn("chars", labelEncoder(tokenized.chars))

    # Concatenate every text's codes into one stream.
    encoded_texts = [code for row in encoded.collect() for code in (row.chars or [])]

    # summarize the loaded data
    n_chars = len(encoded_texts)
    n_vocab = len(dict_v)
    print("Total Characters: ", n_chars)
    print("Total Vocab: ", n_vocab)

    # prepare the dataset of input to output pairs encoded as integers
    dataX, dataY = _sliding_windows(encoded_texts, seq_length)
    print("Total Patterns: ", len(dataX))
    return dataX, dataY


In [None]:
# Build the Trump training pairs and write one element per line:
# str() of each input window to the _x file, str() of each target to the _y file.
X, y = preprocess_candidate(df, candidate="Trump")
# `with` closes each file even if a write fails (open/close alone leaked it).
for out_path, rows in (("/home/ubuntu/train_trump_x.txt", X),
                       ("/home/ubuntu/train_trump_y.txt", y)):
    with open(out_path, "w") as out_file:
        out_file.writelines(str(row) + "\n" for row in rows)

In [None]:
# Build the Biden training pairs and write one element per line:
# str() of each input window to the _x file, str() of each target to the _y file.
# NOTE(review): no dict_v is passed, so this vocabulary is built from Biden's
# characters alone and its codes may not match the Trump files — confirm
# that is intended.
X, y = preprocess_candidate(df, candidate="Biden")
# `with` closes each file even if a write fails (open/close alone leaked it).
for out_path, rows in (("/home/ubuntu/train_biden_x.txt", X),
                       ("/home/ubuntu/train_biden_y.txt", y)):
    with open(out_path, "w") as out_file:
        out_file.writelines(str(row) + "\n" for row in rows)