In [None]:
!pip install findspark


In [None]:
import findspark
# Locate the local Spark installation and make pyspark importable;
# this has to run before the pyspark imports in the next cell.
findspark.init()
 
#import pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

import pyspark

# Resources for the local Spark session.
number_cores = 8
memory_gb = 24

# With a local[N] master the executors run inside the driver JVM, so the heap
# is sized by spark.driver.memory; spark.executor.memory has no effect here.
# The setting only applies when this call actually starts the JVM, i.e. when
# no Spark session already exists in this kernel.
spark = (
    SparkSession.builder
    .appName("ModelTraining")
    .master('local[{}]'.format(number_cores))
    .config("spark.driver.memory", '{}g'.format(memory_gb))
    .getOrCreate()
)


In [None]:
# Relative paths of the training and test CSV files.
TRAININGDATA = "training.1600000.processed.noemoticon.csv"
TESTINGDATA = "testdata.manual.2009.06.14.csv"

In [None]:
# !wget http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip

In [None]:
# !unzip trainingandtestdata.zip

In [None]:
# Column layout of the header-less training CSV.
# id is BIGINT because tweet ids can exceed the 32-bit INT maximum
# (2,147,483,647); values that do not fit an INT column fail to parse.
schema = "polarity FLOAT, id BIGINT, date_time STRING, query STRING, user STRING, tweets STRING"
# Cached because several later cells run actions on this DataFrame.
raw_training_data = spark.read.csv(
    TRAININGDATA, schema=schema
).cache()

In [None]:
#raw_train_df = raw_training_data.toPandas()

# Natural Language Processing

We clean the tweets with stop-word removal, special-character replacement and, optionally, lemmatization or stemming.

In [None]:
import re
import string
from stop_words import ENGLISH_STOP_WORDS
# Spelled-out replacement for each ASCII digit.
_DIGIT_WORDS = (
    ('0', ' zero '), ('1', ' one '), ('2', ' two '), ('3', ' three '),
    ('4', ' four '), ('5', ' five '), ('6', ' six '), ('7', ' seven '),
    ('8', ' eight '), ('9', ' nine '),
)


def cleaner(x, stemming):
    """Normalise one raw tweet into a space-separated string of word tokens.

    The text is lower-cased; IP addresses and @mentions are replaced by
    placeholders, URLs are removed and ``&`` / HTML entities become "and";
    digits are spelled out; common English contractions are expanded;
    punctuation is stripped; and words found in ``ENGLISH_STOP_WORDS`` are
    dropped.

    Args:
        x: Raw tweet. Any object is accepted and passed through ``str``;
            ``None`` (a null column value) is treated as an empty tweet.
        stemming: ``'lemmatize'`` to lemmatize each kept word, ``'stemming'``
            to stem it; any other value keeps the words unchanged.

    Returns:
        str: The kept tokens joined by single spaces ('' if nothing is left).
    """
    text = '' if x is None else str(x).lower()

    # Token-level patterns run first: they need '@', '&', ':', '/' and '.'
    # intact, and the character-level rewriting below destroys all of them.
    s = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', ' _ip_ ', text)
    s = re.sub(r'(https?[^\s]*)', ' ', s)
    s = re.sub(r'\@[^\s]+', ' U ', s)
    s = re.sub(r'\&amp;|\&gt;|&lt;|\&', ' and ', s)

    # Isolate punctuation
    s = re.sub(r'([.\(\)\!\?\-\\\/\,])', r' \1 ', s)
    # Remove some special characters
    s = re.sub(r'([\;\:\|•"«\n])', ' ', s)
    # Replace the remaining symbols and digits with words
    s = s.replace('@', ' at ')
    for digit, word in _DIGIT_WORDS:
        s = s.replace(digit, word)

    # Expand contractions. The word boundaries keep "can't"/"cant" from
    # matching inside longer words such as "significant".
    tweet = re.sub(r"\bcan'?t\b", ' can not', s)
    tweet = re.sub(r"n't", ' not', tweet)
    tweet = re.sub(r"'s", ' is', tweet)
    tweet = re.sub(r"i'm", ' i am ', tweet)
    tweet = re.sub(r"'ll", ' will', tweet)
    tweet = re.sub(r"'ve", ' have', tweet)
    tweet = re.sub(r"'d", ' would', tweet)
    tweet = re.sub(r'\/', ' ', tweet)

    # NOTE(review): by this point ':' and ';' have been removed, '8' has been
    # spelled out and parentheses are padded with spaces, so these emoticon
    # patterns can only fire on '='-eyed forms such as '=d' or '=p'. Running
    # them earlier would need word-boundary guards (e.g. to keep "8pm"
    # intact), so their position is left unchanged pending a decision.
    tweet = re.sub(r'[8:=;][\'`\-]?[\)d]+|[)d]+[\'`\-][8:=;]', ' H ', tweet)
    tweet = re.sub(r'[8:=;][\'`\-]?p+', ' H ', tweet)
    tweet = re.sub(r'[8:=;][\'`\-]?\(+|\)+[\'`\-][8:=;]', ' S ', tweet)

    # Collapse repeated sentence punctuation, then a run of three or more
    # identical characters at the end of a word (e.g. "sooo" -> "so").
    tweet = re.sub(r'([\!\?\.]){2,}', r'\g<1>', tweet)
    tweet = re.sub(r'\b(\S*?)([^\s])\2{2,}\b', r'\g<1>\g<2>', tweet)
    tweet = re.sub(r'\#', ' #', tweet)
    tweet = re.sub(r'[^\w\#\s\?\<\>]+', ' ', tweet)
    tweet = re.sub(r'\s+', ' ', tweet)

    # Final generic clean-up: bracketed spans, tag-like spans, remaining
    # punctuation, newlines and any token that still contains a digit.
    text = re.sub(r'\[.*?\]', '', tweet)
    text = re.sub(r'<.*?>+', '', text)
    text = re.sub('[%s]' % re.escape(string.punctuation), '', text)
    text = re.sub('\n', '', text)
    text = re.sub(r'\w*\d\w*', '', text)

    # split() without an argument drops the empty strings that split(' ')
    # would turn into empty tokens and doubled spaces in the output.
    words = [word for word in text.split() if word not in ENGLISH_STOP_WORDS]

    # NOTE(review): `Word` is not imported in the visible cells (presumably
    # textblob.Word — confirm), so these two modes raise NameError until it is.
    if stemming == 'lemmatize':
        words = [Word(word).lemmatize() for word in words]
    elif stemming == 'stemming':
        words = [Word(word).stem() for word in words]
    return ' '.join(words)

In [None]:
# Wrap the cleaner as a Spark UDF ('nature' selects neither lemmatization nor
# stemming) and overwrite the tweets column with its cleaned version.
clean_tweet_udf = f.udf(lambda tweet: cleaner(tweet, 'nature'))
clean_train_data = raw_training_data.withColumn('tweets', clean_tweet_udf("tweets"))

# Plot our data

In [None]:
import seaborn as sb

We check if classes are balanced.

In [None]:
# Count per class inside Spark so that only one row per polarity value is
# collected, instead of pulling every label to the driver first.
clean_train_data.groupBy("polarity").count().orderBy("polarity").toPandas()

In [None]:
# distplot is deprecated in seaborn, and polarity is a discrete label, so the
# class sizes are aggregated in Spark and drawn as bars.
polarity_counts = clean_train_data.groupBy("polarity").count().orderBy("polarity").toPandas()
sb.barplot(x="polarity", y="count", data=polarity_counts);

# Make Train and Test Samples

In [None]:
# Preview the cleaned training data (show() prints the first 20 rows by default).
clean_train_data.show()

In [None]:
import numpy as np
from sklearn.model_selection import StratifiedShuffleSplit

In [None]:
# Collect both columns in ONE Spark job: every tweet stays paired with its own
# label, and the cleaning UDF runs once instead of once per toPandas() call.
labelled_pdf = clean_train_data.select("tweets", "polarity").toPandas()
X = labelled_pdf[["tweets"]].to_numpy()    # shape (n, 1), as before
y = labelled_pdf[["polarity"]].to_numpy()  # shape (n, 1), as before

# Single stratified 70/30 split with a fixed seed for reproducibility.
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.3, random_state=42)
train_index, test_index = next(sss.split(X, y))

X_train, X_test = X[train_index], X[test_index]
y_train, y_test = y[train_index], y[test_index]

In [None]:
import pandas as pd

In [None]:
# Put features and labels of each split side by side in one pandas frame;
# both parts share the same default 0..n-1 index, so rows line up by position.
train = pd.concat(
    [
        pd.DataFrame(X_train, columns=['tweets']),
        pd.DataFrame(y_train, columns=['polarity']),
    ],
    axis=1,
)
test = pd.concat(
    [
        pd.DataFrame(X_test, columns=['tweets']),
        pd.DataFrame(y_test, columns=['polarity']),
    ],
    axis=1,
)

In [None]:
def pandas_to_spark_df(data):
    """Convert ``data`` into a Spark DataFrame using the notebook's ``spark`` session.

    Args:
        data: Object handed unchanged to ``spark.createDataFrame``
            (a pandas DataFrame at the call sites in this notebook).

    Returns:
        The DataFrame produced by ``spark.createDataFrame(data)``.
    """
    spark_df = spark.createDataFrame(data)
    return spark_df

In [None]:
# Rebind both splits from pandas frames to Spark DataFrames.
train, test = pandas_to_spark_df(train), pandas_to_spark_df(test)

In [None]:
# Check that both splits kept the class balance. The counts are computed in
# Spark; converting the whole splits back to pandas just to count is wasteful.
(
    train.groupBy('polarity').count().toPandas(),
    test.groupBy('polarity').count().toPandas(),
)

In [None]:
#spark.stop()

In [None]:
# Preview the training split (show() prints the first 20 rows by default).
train.show()

In [None]:
# Print the column names and types Spark inferred from the pandas frame.
train.printSchema()

In [None]:
from pyspark.ml.feature import (
    Tokenizer,
    HashingTF,
    IDF,
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# Stage 1: split each cleaned tweet into tokens.
tokenizer = Tokenizer(
    inputCol="tweets",
    outputCol="tokens",
)

# Stage 2: hash the tokens into a term-frequency vector.
hashing_tf = HashingTF(
    inputCol="tokens",
    outputCol="term_frequency",
)

# Stage 3: rescale term frequencies by inverse document frequency.
# minDocFreq=5 removes sparse terms.
idf = IDF(
    minDocFreq=5,
    inputCol="term_frequency",
    outputCol="features",
)

# Stage 4: classifier trained on the "features" column against "polarity".
lr = LogisticRegression(labelCol="polarity")


In [None]:
# Chain the feature stages and the classifier, then fit them on the training split.
training_stages = [tokenizer, hashing_tf, idf, lr]
pipeline = Pipeline(stages=training_stages)

model = pipeline.fit(train)

In [None]:
%%time
# Score the held-out split with the fitted pipeline (`model`, from pipeline.fit).
test_prediction = model.transform(test)
test_prediction.show()

In [None]:
%%time
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fraction of test rows whose prediction equals the "polarity" label.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="polarity",
)
accuracy = evaluator.evaluate(test_prediction)

print(f"Model Accuracy: {accuracy*100:.5f}%")

In [None]:
# Shut down the Spark session; the Spark DataFrames created above cannot be used afterwards.
spark.stop()