In [1]:
import findspark
# my local spark install
findspark.init('/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/')

In [2]:
import pyspark
from pyspark.sql import SQLContext

# create spark contexts
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp
# Register all the functions in Preproc with Spark Context
check_lang_udf = udf(pp.check_lang, StringType())
remove_stops_udf = udf(pp.remove_stops, StringType())
remove_features_udf = udf(pp.remove_features, StringType())
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())
lemmatize_udf = udf(pp.lemmatize, StringType())
check_blanks_udf = udf(pp.check_blanks, StringType())

In [5]:
# Load a text file and convert each line to a Row.
data_rdd = sc.textFile("data/raw_classified.txt")
parts_rdd = data_rdd.map(lambda l: l.split("\t"))
# Filter bad rows out
garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
# Create DataFrame
data_df = sqlContext.createDataFrame(typed_rdd, ["text", "id", "label"])

In [6]:
# predict language and filter out those with less than 90% chance of being English
lang_df = data_df.withColumn("lang", check_lang_udf(data_df["text"]))
en_df = lang_df.filter(lang_df["lang"] == "en")

In [7]:
# remove stop words to reduce dimensionality
rm_stops_df = en_df.withColumn("stop_text", remove_stops_udf(en_df["text"]))

In [8]:
# remove other non essential words, think of it as my personal stop word list
rm_features_df = rm_stops_df.withColumn("feat_text", remove_features_udf(rm_stops_df["stop_text"]))

In [9]:
# tag the words remaining and keep only Nouns, Verbs and Adjectives
tagged_df = rm_features_df.withColumn("tagged_text", tag_and_remove_udf(rm_features_df["feat_text"]))

In [18]:
# lemmatization of remaining words to reduce dimensionality & boost measures
lemm_df = tagged_df.withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))

In [None]:
# remove all rows containing only blank spaces
check_blanks_df = lemm_df.withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")

In [None]:
# rename columns
no_blanks_df.withColumnRenamed(no_blanks_df["lemm_text"], "text")

In [None]:
# dedupe important since alot of the tweets only differed by url's and RT mentions
dedup_df = no_blanks_df.dropDuplicates(['text', 'label'])

In [None]:
# select only the columns we care about
data_set = dedup_df.select(dedup_df['id'], dedup_df['text'], dedup_df['label'])

In [None]:
# split training & validation sets with 60% to training and use a seed value of 1987
splits = data_set.randomSplit([0.6, 0.4])
training_df = splits[0]
test_df = splits[1]

In [None]:
##################################################################
#
#   Spark ML Section
#   
#   Skip Preprocessing and use cleaned files by running next cell
#
##################################################################

In [11]:
# Load already cleaned data
def reload_checkpoint(data_rdd):
    parts_rdd = data_rdd.map(lambda l: l.split("\t"))
    # Filter bad rows out
    garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
    typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
    # Create DataFrame
    df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"])
    return df


# Load precleaned training set
training_rdd = sc.textFile("data/clean_training.txt")
training_df = reload_checkpoint(training_rdd)
# Load precleaned test set
test_rdd = sc.textFile("data/clean_test.txt")
test_df = reload_checkpoint(test_rdd)

In [4]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
idf = IDF(minDocFreq=3, inputCol="features", outputCol="idf")
nb = NaiveBayes()
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])


paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 1.0]).build()


cv = CrossValidator(estimator=pipeline, 
                    estimatorParamMaps=paramGrid, 
                    evaluator=MulticlassClassificationEvaluator(), 
                    numFolds=4)

cvModel = cv.fit(training_df)

In [None]:
result = cvModel.transform(test_df)
prediction_df = result.select("text", "label", "prediction")

In [None]:
datasci_df = prediction_df.filter(prediction_df['label']==0.0)
datasci_df.show(truncate=False)

In [None]:
ao_df = prediction_df.filter(prediction_df['label']==1.0)
ao_df.show(truncate=False)

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(result, {evaluator.metricName: "precision"})

0.9228856806385485