In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DoubleType
from pyspark.sql.functions import lit, isnan, size, col

In [43]:
# Raw STS CSV files; a later cell reads them with `sts_raw_schema` to recover the labels.
STS_RAW_TRAIN_FILE = '../data/sts/training.1600000.processed.noemoticon.csv'
STS_RAW_TEST_FILE  = '../data/sts/testdata.manual.2009.06.14.csv'

# Directories of pre-processed STS tweets; every `*.csv` inside is loaded as (id, text).
# NOTE: "PROCESED" is a typo for "PROCESSED"; it is kept because later cells reference these names.
STS_PROCESED_TRAIN_PATH = '../data/processed/sts/sts_train'
STS_PROCESED_TEST_PATH  = '../data/processed/sts/sts_test'

# Directory of pre-processed COVID tweets, also loaded through a `*.csv` glob as (id, text).
COVID_PROCESSED_PATH = '../data/processed/full-tweets-sanitized/tweets-sanitized'

In [4]:
# Create (or reuse) the notebook's Spark session.
# NOTE(review): 'local' presumably runs Spark with a single worker thread;
# confirm whether 'local[*]' was intended for the full-size datasets.
spark = (
    SparkSession.builder
    .master('local')
    .appName('local')
    .getOrCreate()
)

# Load Data

In [68]:
# Column layout of the raw STS CSV files (read without a header row); all columns nullable.
sts_raw_schema = (
    StructType()
    .add('label', IntegerType(), True)
    .add('id', LongType(), True)
    .add('date', StringType(), True)
    .add('query', StringType(), True)
    .add('user', StringType(), True)
    .add('text', StringType(), True)
)

# Pre-processed STS files carry only the tweet id and its cleaned text.
sts_processed_schema = (
    StructType()
    .add('id', LongType(), True)
    .add('text', StringType(), True)
)

# Pre-processed COVID files use the same two-column (id, text) layout.
covid_processed_schema = (
    StructType()
    .add('id', LongType(), True)
    .add('text', StringType(), True)
)

In [105]:
# Provenance of the pre-processed inputs:
#
#   Google Drive file name     - md5sum
#   Final_preprocessed_sts.csv - ec4e0de0560e2ce9a3c11055b6f41894
#   Test_data_processed.csv    - ee4e572acdbb6dc129ca397f7d3f37bc
#
# The pre-processed files hold only (id, text), so the labels needed for
# training and testing are recovered from the raw files by joining on `id`.

def _load_sts_split(processed_dir, raw_file, split_name):
    """Load one STS split and attach its labels.

    Args:
        processed_dir: directory whose `*.csv` files hold the pre-processed (id, text) rows.
        raw_file: raw STS CSV that carries the `label` column for the same ids.
        split_name: value written to the `type` column ('train' or 'test').

    Returns:
        A tuple (processed, raw, labeled):
          processed - pre-processed rows plus the `type` column,
          raw       - raw rows reduced to id / raw_text / label,
          labeled   - inner join of the two on `id`, columns id, text, label, type.
    """
    processed = (
        spark.read.csv(processed_dir + '/' + '*.csv', header=False, schema=sts_processed_schema)
        .withColumn('type', lit(split_name))
    )
    raw = (
        spark.read.csv(raw_file, header=False, schema=sts_raw_schema)
        .select('id', 'text', 'label')
        # Renamed so it cannot clash with the pre-processed `text` after the join.
        .withColumnRenamed('text', 'raw_text')
    )
    labeled = processed.join(raw, on=['id']).select('id', 'text', 'label', 'type')
    return processed, raw, labeled


(
    df_sts_processed_train,
    df_sts_raw_train,
    df_sts_processed_train_with_labels,
) = _load_sts_split(STS_PROCESED_TRAIN_PATH, STS_RAW_TRAIN_FILE, 'train')

(
    df_sts_processed_test,
    df_sts_raw_test,
    df_sts_processed_test_with_labels,
) = _load_sts_split(STS_PROCESED_TEST_PATH, STS_RAW_TEST_FILE, 'test')

In [106]:
# COVID tweets have no ground-truth sentiment, so `label` is a NULL placeholder.
# Cast it to the integer type used by the STS label column: a bare lit(None)
# yields an untyped NullType ("void") column, and a later union with the STS
# frames would then only line up through implicit type widening.
df_covid_processed = (
    spark.read.csv(COVID_PROCESSED_PATH + '/' + '*.csv', header=False, schema=covid_processed_schema)
    .select('id', 'text', lit(None).cast(IntegerType()).alias('label'))
    .withColumn('type', lit('covid'))
)

In [107]:
# Stack the three datasets into one frame. unionByName matches columns by name,
# whereas union() matches them by position and would silently mix columns up if
# the column order of any input ever changed. All three inputs expose the
# columns id, text, label and type.
df_all = (
    df_sts_processed_train_with_labels
    .unionByName(df_sts_processed_test_with_labels)
    .unionByName(df_covid_processed)
)

In [108]:
# Keep only the rows that actually have text; rows with a null `text` are dropped.
df_all = df_all.filter(col("text").isNotNull())

# Build Pipeline
1. Tokenize Words
2. Remove Stop Words
3. Build Feature Vector

In [109]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

In [110]:
# Hyper Parameters
VOCAB_SIZE = 10000
MIN_DF     = 5

# Default Stop Words
default_stop_words = StopWordsRemover.loadDefaultStopWords("english")

In [111]:
# Stage 1: tokenize `text`, using the regex as the delimiter between tokens.
tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="tokenized_text",
    pattern="\\W",
)

# Stage 2: remove the default English stop words from the token list.
stop_words_remover = StopWordsRemover(
    inputCol="tokenized_text",
    outputCol="filtered_text",
    stopWords=default_stop_words,
)

# Stage 3: turn the remaining tokens into a term-count vector in `features`.
vectorizer = CountVectorizer(
    inputCol="filtered_text",
    outputCol="features",
    vocabSize=VOCAB_SIZE,
    minDF=MIN_DF,
)

# The stages run in this order: tokenize -> drop stop words -> vectorize.
pipeline = Pipeline(stages=[
    tokenizer,
    stop_words_remover,
    vectorizer,
])

In [None]:
# Fit the pipeline on every row of df_all (train, test and covid together), so the
# CountVectorizer vocabulary is learned from all three datasets; then apply the
# fitted pipeline to that same frame to add the token and feature columns.
pipeline_fit = pipeline.fit(df_all)
df_all_fit   = pipeline_fit.transform(df_all)

In [None]:
# Preview the transformed rows to sanity-check the tokenized / filtered / features columns.
df_all_fit.show()
# Row count per dataset (`type` is 'train', 'test' or 'covid') after the null-text filter.
df_all.groupBy('type').count().show()

# Train Naive Bayes
Train a Naive Bayes classifier on the STS training split, then score the STS test split.

In [97]:
from pyspark.ml.classification import NaiveBayes

In [98]:
# Split the featurized frame back into its three datasets using the `type` tag
# that was attached when each dataset was loaded.
df_train = df_all_fit.where(col('type') == 'train')
df_test  = df_all_fit.where(col('type') == 'test')
df_covid = df_all_fit.where(col('type') == 'covid')

In [99]:
# Naive Bayes with the estimator's default settings; the input columns are spelled
# out (they equal the defaults) to make the dependency on the pipeline output visible.
nb = NaiveBayes(featuresCol="features", labelCol="label")

# Train on the labeled STS training rows only.
model = nb.fit(df_train)

In [100]:
# Score the STS test rows; transform() adds the rawPrediction, probability and prediction columns.
preds = model.transform(df_test)

In [101]:
# NOTE(review): the output saved with this cell shows only the header row, i.e. `preds`
# was empty in that run — presumably no test rows survived the id join or the
# `type == 'test'` filter; confirm before relying on any test-set evaluation.
preds.show()

+---+----+-----+----+--------------+-------------+--------+-------------+-----------+----------+
| id|text|label|type|tokenized_text|filtered_text|features|rawPrediction|probability|prediction|
+---+----+-----+----+--------------+-------------+--------+-------------+-----------+----------+
+---+----+-----+----+--------------+-------------+--------+-------------+-----------+----------+



# Labeling COVID Dataset

In [44]:
# Output directory for the COVID tweets labeled by the trained model (written by the final cell).
COVID_LABELED_PATH = '../data/processed/full-tweets-labeled'

In [45]:
# Apply the Naive Bayes model trained on the STS rows to the unlabeled COVID tweets.
covid_preds = model.transform(df_covid)

In [46]:
# Inspect a sample of the predictions; `label` is null here because the COVID rows were loaded without labels.
covid_preds.show()

+-------------------+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                 id|                text|label| type|      tokenized_text|       filtered_text|            features|       rawPrediction|         probability|prediction|
+-------------------+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|1253972715574157314|americans gonna h...| null|covid|[americans, gonna...|[americans, gonna...|(10000,[113,163,2...|[-31.665983410608...|[0.89051145501911...|       0.0|
|1253972715599519745|rt the independen...| null|covid|[rt, the, indepen...|[rt, independence...|(10000,[0,444,479...|[-101.52088510706...|[0.01678199294975...|       1.0|
|1253972715645497344|rt coronavirus sh...| null|covid|[rt, coronavirus,...|[rt, coronavirus,...|(10000,[0,2,630],...|[-35.581744008635...|[0.2260

In [47]:
from pyspark.sql.functions import udf

def _class_one_probability(probability_vector):
    """Return element 1 of the model's probability vector as a plain Python float."""
    return float(probability_vector[1])


# Wrap the helper as a Spark UDF that returns a double.
weighted_prob = udf(_class_one_probability, DoubleType())

# Keep the tweet id, the hard prediction, and the class-1 probability
# (stored under the name `weighted_label`).
covid_preds_final = (
    covid_preds
    .withColumn("weighted_label", weighted_prob("probability"))
    .select("id", "prediction", "weighted_label")
)

In [48]:
# Write the labeled tweets as CSV; repartition(1) collapses the data into one
# partition so a single part file is produced. No header row is written.
# mode='overwrite' keeps this cell re-runnable: with the default save mode the
# write raises an error when COVID_LABELED_PATH already exists, e.g. on a
# second "Restart & Run All".
covid_preds_final.repartition(1).write.csv(COVID_LABELED_PATH, mode='overwrite')