# Here I build a model to classify reddit comments as positive or negative based on the content of each comment.

In [6]:
%pylab inline
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql import Row, functions as F
from pyspark.ml.feature import Tokenizer, HashingTF, Binarizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

## Read in the Reddit data
#### Reading the data takes approximately 17 minutes.

In [7]:
# Column layout of a Reddit comment record, written as (column name, Spark SQL type)
# pairs; every column is declared nullable.
# NOTE(review): `fields` is not handed to the parquet reader below, so the reader
# relies on the schema stored in the parquet files themselves -- confirm whether this
# list is still needed.
fields = list(
    StructField(col_name, col_type(), True)
    for col_name, col_type in (
        ("archived", BooleanType),
        ("author", StringType),
        ("author_flair_css_class", StringType),
        ("body", StringType),
        ("controversiality", LongType),
        ("created_utc", StringType),
        ("day", LongType),
        ("distinguished", StringType),
        ("downs", LongType),
        ("edited", StringType),
        ("gilded", LongType),
        ("id", StringType),
        ("link_id", StringType),
        ("month", LongType),
        ("name", StringType),
        ("parent_id", StringType),
        ("retrieved_on", LongType),
        ("score", LongType),
        ("score_hidden", BooleanType),
        ("subreddit", StringType),
        ("subreddit_id", StringType),
        ("ups", LongType),
        ("year", LongType),
    )
)

# Read the year=2009 directory of the comment archive and mark it persisted with the
# MEMORY_AND_DISK_SER storage level, since several later cells derive data from it.
rawDF = sqlContext.read.parquet("s3a://reddit-comments-parquet/year=2009")
rawDF.persist(StorageLevel.MEMORY_AND_DISK_SER)

In [14]:
# rawDF.printSchema()

## Select the comments from the reddit.com subreddit

In [15]:
# Select columns that are needed for the training and testing

# Cast columns to the correct datatype for Transformers
def cast_col(df, col, cast_type):
    '''
    Return a new DataFrame in which column `col` is cast to `cast_type`.

    The cast column replaces the original one in place: it keeps its name and its
    position, and no intermediate helper column is created, so a pre-existing
    column that happens to be called "temp_col" is left untouched.

    df        -- DataFrame that contains the column to convert
    col       -- name of the column to convert
    cast_type -- target data type (e.g. DoubleType()) handed to Column.cast
    '''
    # withColumn with an already-present name swaps that column for the new expression.
    return df.withColumn(col, df[col].cast(cast_type))

# Keep only the columns used downstream, then drop deleted comments, comments whose
# score is hidden, and every comment outside the reddit.com subreddit.
filteredDF = (
    rawDF.select("id", "body", "score", "score_hidden", "subreddit")
         .where((rawDF.body != "[deleted]")
                & (rawDF.score_hidden == False)
                & (rawDF.subreddit == "reddit.com"))
)

# The Binarizer used later reads a double column, so convert the score here.
castedDF = cast_col(filteredDF, "score", DoubleType())

print("Sample size: {}".format(castedDF.count()))

# Expose the filtered comments to Spark SQL under the table name "rc".
castedDF.registerTempTable("rc")

# Number of comments per score value, most frequent score first.
query = sqlContext.sql("""
    SELECT score, COUNT(*) as cnt FROM rc
    GROUP BY score
    ORDER BY cnt DESC
    """)

# Bring the (small) aggregated result to the driver as a pandas DataFrame for plotting.
result = query.toPandas()


Plot the distribution of comment scores to see what the data looks like:

In [11]:
# Scatter of "number of comments" against "score". Work on the Axes returned by
# pandas instead of the implicit pyplot state, and label the figure so it can be
# read on its own.
ax = result.plot(x="score", y="cnt", kind="scatter")
# Zoom in on the region around the peak; the tails are very sparse.
ax.set_xlim([-20, 20])
ax.set_ylim([0, 800000])
ax.set_xlabel("Comment score")
ax.set_ylabel("Number of comments")
ax.set_title("Distribution of comment scores");

Most comments have a score of 1, and the number of comments drops very rapidly on either side of this.

## Creating a labeled dataset

Since the majority of comments have a score between 0 and 3, I treat a comment as negative when its score is below 0 and as positive when its score is above 10 (about 10% of the comments fall into one of these two classes). Comments with scores from 0 to 10 are left out of the labeled dataset.

In [16]:
# Negative class: score strictly below 0. Positive class: score strictly above 10.
# Everything in between is left out of the labeled data.
negativeDF = castedDF.where(castedDF.score < 0)
positiveDF = castedDF.where(castedDF.score > 10)

# Sizes of the negative class, the positive class and the whole filtered set.
print("{} {} {}".format(negativeDF.count(), positiveDF.count(), castedDF.count()))

## Split into training and testing data

Combine the positive comments and negative comments, and randomly split them into training and testing datasets (80% in the training set, 20% in the testing set). 

I put the testing dataset aside for now and use the training dataset to train the model. Once the model is trained, I use the testing dataset to validate the model.



In [17]:
# Split dataset into training and testing
mergedDF = negativeDF.unionAll(positiveDF)

# A fixed seed makes the 80/20 split -- and every metric computed from it --
# reproducible from one run of the notebook to the next.
splitDF = mergedDF.randomSplit([0.8, 0.2], seed=42)
trainingDF, testingDF = splitDF

# Both splits are reused by several actions below (counts, fitting, evaluation).
trainingDF.persist(StorageLevel.MEMORY_AND_DISK)
testingDF.persist(StorageLevel.MEMORY_AND_DISK)

print("training size: {}".format(trainingDF.count()))
# The merged set only holds scores < 0 or > 10, so the sign of the score is enough
# to tell the two classes apart here.
print("negative sentiment: {}".format(trainingDF.filter(trainingDF.score < 0).count()))
print("positive sentiment: {}".format(trainingDF.filter(trainingDF.score > 0).count()))
print("testing size: {}".format(testingDF.count()))


## Training a model

I use the frequency of each word in a comment as my features. Possible refinements include re-weighting the counts with the `pyspark.ml.feature.IDF` transformer or removing stop words.

Build an ML pipeline by chaining the Binarizer, Tokenizer, HashingTF and LogisticRegression stages. Finally, I fit the pipeline to the training dataset and make predictions on the testing dataset.


In [18]:
# Stage 1 -- derive the 0/1 'label' column from 'score' using a threshold of 0.0.
binarizer = Binarizer(inputCol="score", outputCol="label", threshold=0.0)

# Stage 2 -- split the comment text in 'body' into a 'words' column.
tokenizer = Tokenizer(outputCol="words", inputCol="body")

# Stage 3 -- hash the tokens into a term-frequency vector stored in 'features'.
hashingTF = HashingTF(outputCol="features", inputCol=tokenizer.getOutputCol())

# Stage 4 -- logistic regression; regParam is the regularization strength and
# maxIter caps the number of optimizer iterations during fit.
lr = LogisticRegression(regParam=0.01, maxIter=10)

# Chain the stages, learn the model from the training split only, and score the
# held-out testing split.
pipeline = Pipeline(stages=[binarizer, tokenizer, hashingTF, lr])
model = pipeline.fit(trainingDF)
prediction = model.transform(testingDF)

## Model validation

In [19]:
# Score the fitted pipeline with the area under the ROC curve (the evaluator's
# default metric, spelled out here), first on the training and then on the testing split.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
train_RoC, test_RoC = (
    evaluator.evaluate(model.transform(split))
    for split in (trainingDF, testingDF)
)

print("The area under the RoC curve for the training set is {} and for the test set is {}".format(train_RoC, test_RoC))

### Check out the errors

In [20]:
selected = prediction.select("id", "body", "prediction", "label")

# Share of positive labels in the labeled (negative + positive) set; a baseline for
# reading the rates below. Go through `.rdd` explicitly: it behaves the same as
# DataFrame.map on old Spark releases and still works where DataFrame.map is gone.
positive_score_rate = binarizer.transform(mergedDF).rdd.map(lambda r: r.label).mean()

def typeI_II(row):
    '''
    Map one scored row to its confusion-matrix cell.

    row -- a Row with numeric `prediction` and `label` fields.

    Returns a Row(error_type=..., cnt=1) where error_type is one of
    "true_neg", "false_neg", "false_pos" or "true_pos". Any combination other
    than the first three is reported as "true_pos".
    '''
    if row.prediction == 0 and row.label == 0:
        return Row(error_type="true_neg", cnt=1)
    elif row.prediction == 0 and row.label == 1:
        return Row(error_type="false_neg", cnt=1)
    elif row.prediction == 1 and row.label == 0:
        return Row(error_type="false_pos", cnt=1)
    else:
        return Row(error_type="true_pos", cnt=1)

typeI_II_DF = selected.rdd.map(typeI_II).toDF()

# Name the aggregate explicitly instead of renaming the auto-generated "SUM(cnt)"
# column, whose spelling is chosen by Spark.
type_error_pd = typeI_II_DF.groupBy("error_type")\
                           .agg(F.sum("cnt").alias("cnt"))\
                           .toPandas()

# Add the overall number of test rows and each cell's share of it.
type_error_pd["tot"] = type_error_pd["cnt"].sum(axis=0)
type_error_pd["perc"] = type_error_pd["cnt"]/type_error_pd["tot"]
print(type_error_pd)
# The value is a fraction (0-1) computed over the merged labeled set, not a
# percentage over every comment, so the message says so.
print("fraction of comments with a positive label in the labeled set: {0:.2f}".format(positive_score_rate))



### Tune the hyperparameters using grid search cross-validation:

In [24]:
# Same feature stages as the earlier pipeline, but with a fresh, unconfigured logistic
# regression whose hyperparameters are supplied by the grid.
lr_grid = LogisticRegression()
pipeline = Pipeline(stages=[binarizer, tokenizer, hashingTF, lr_grid])

# 2 x 2 x 2 = 8 parameter combinations; labelCol is pinned to 'label' for all of them.
# NOTE(review): maxIter values of 1 and 2 are far below the 10 used for the first
# model -- presumably chosen to keep the search fast; confirm this is intentional.
grid = (
    ParamGridBuilder()
    .baseOn({lr_grid.labelCol: "label"})
    .addGrid(lr_grid.regParam, [0.01, 0.1])
    .addGrid(lr_grid.elasticNetParam, [0.5, 0.75])
    .addGrid(lr_grid.maxIter, [1, 2])
    .build()
)

# Candidates are ranked by area under the ROC curve (the evaluator's default metric).
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

# 4-fold cross-validation over the training split only.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=4,
)
cvModel = cv.fit(trainingDF)


In [25]:
# Evaluator metric for the cross-validated model: training split first, then testing split.
print(evaluator.evaluate(cvModel.transform(trainingDF)))
print(evaluator.evaluate(cvModel.transform(testingDF)))