In [1]:
import matplotlib
# Force matplotlib to not use any Xwindows backend.
matplotlib.use('Agg')

In [2]:
import matplotlib.pyplot as plt
import json
import os
import sys
import re
import numpy as np
import time
import datetime
 
# Path for spark source folder
os.environ['SPARK_HOME'] = "/usr/local/spark"

# Append pyspark to Python Path
sys.path.append("/usr/local/spark/python")

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
# Load in the testing code and check to see if your answer is correct
# If incorrect it will report back '1 test failed' for each failed test
# Make sure to rerun any cell you change before trying the test again
from test_helper import Test
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Append afinn to Python Path and import afinn.  Used for pulling data from percentiles.
sys.path.append("/usr/local/lib/python2.7/dist-packages/afinn")
from afinn import Afinn

# Stuff for logistic regression
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.linalg import SparseVector

In [3]:
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from tdigest import TDigest
from numpy.random import random
from operator import add



# 0.0 Read data

Read in data.  For now, one file on S3.  Reading one 85 MB file on S3 doesn't seem to take too long. 

<B>ADD CODE TO READ multiple files FROM S3.  

df = sqlContext.read.json("/home/ubuntu/RC_2007-10")

print datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

print datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

ts1 = time.time()

ts2 = time.time()

print ts2-ts1

df = sqlContext.read.json("s3n://reddit-comments/2007/RC_2007-10")

In [4]:
ts1 = time.time()
df = sqlContext.read.json("s3n://reddit-comments/2012/RC_2012-04")
ts2 = time.time()
print ts2 - ts1
# df.printSchema()
# df.take(5)

493.751390934


# 1.0 Find minimum comment timestamp for each post

For each post link_id, find minimum created_utc timestamp [can't trust data is ordered by time] and store in key-value pair (pair RDD) {link_id: min_created_utc},  (Plan B:  set up API to get timestamp for all posts in Reddit)

<B>I'M KEEPING TOO MUCH DATA.  AT THIS STEP, AND SUBSEQUENT STEPS, THROW OUT DATA I'M NOT USING.

In [5]:
rRDD = df.map(lambda r: (r.id, (r.body, int(r.created_utc), r.link_id, r.parent_id, int(r.score), r.subreddit, r.subreddit_id)))

rRDD.take(1)

Sort to find minimum timestamp created_utc for each post link_id

rRDD.count()

In [6]:
minTimeRDD = (rRDD.map(lambda (k, v): (v[2],v[1])) # maps to (link_id, created_utc)
                  .reduceByKey(lambda a, b:  a if a < b else b)
              )

minTimeRDD.take(1)

minTimeRDD.count()

# 2.0 Filter to include only extreme up and down votes (top 3% of subreddit)

Filter to retain only records that are top or bottom 3% in comment score (upvotes-downvotes) of their subreddit.  Reduces dataset for all subsequent processing.

Find 3% and 97% percentiles for each subreddit.  Use T-digest data structure for highly accurate approximate percentiles: 
http://dataorigami.net/blogs/napkin-folding/19055451-percentile-and-quantile-estimation-of-big-data-the-t-digest

def digest1(value):
    digest = TDigest()
    digest.update(value)
    return digest

In [7]:
def digest_batch(values):
    digest = TDigest()
    digest.batch_update(values)
    return digest

testd = digest_batch([1,1,4,5,6,7,5,4,3,2,1])

Need to get at scores for each subreddit, THEN map using digest.

tigest WAS a bottleneck, taking ~20 minutes for 85 MB dataset when ingesting line by line.  I refactored to use reduceByKey() to create a big list, then feed the list into tdigest in batch, which goes about 20x faster.  

<b>NEED TO SPLIT INTO TRAINING, VAL AND TEST BEFORE CREATING SUBREDDIT DIGEST ON TRAINING DATA ONLY.  OTHERWISE I'M CHEATING.</b>

subredditDigestRDD = (rRDD.map(lambda (k, v):  (v[5], v[4]))
                        .map(lambda (k, v): (k, digest1(v)))  
                        .reduceByKey(lambda a, b:  a + b)
                   )

In [8]:
subredditDigestRDD = (rRDD.map(lambda (k, v):  (v[5], [v[4]]))
                          .reduceByKey(lambda a, b:  a + b)
                          .map(lambda (k, v): (k, digest_batch(v)))  
                        
                   )

type(subredditDigestRDD)

REFACTOR options to increase speed.  I went with 2.  
1.  RUN TDIGEST ON PARTITION INSTEAD OF EACH VALUE WHEN CREATING subredditDigestRDD.
2.  reduceByKey() on subreddits and create huge lists for each subreddit, then create tdigests from huge lists.
3.  reduceByKey() on year-month and create huger lists for each month, then create tdigests from huger lists.
4.  Some combination of 2 and 3.

NOTE:  Ronak says that reduceByKey() won't generate a list but groupByKey() will. He was wrong- I can add lists. 

In [9]:
ts1 = time.time()
subredditDigest = subredditDigestRDD.collectAsMap()
ts2 = time.time()
print ts2-ts1

2058.2965951


print ts2-ts1

type(subredditDigest['politics'])

subredditDigest['politics'].percentile(3)

subredditDigest['politics'].percentile(97)

Alyssa's values for politics were (-6.0, 24.0) rounded.  

Set limits for what data to include by percentile.  Use only data less than lowPercentile or greater than highPercentile.

In [10]:
lowPercentile, highPercentile = 3, 97

In [11]:
srDigestR = {key : (round(subredditDigest[key].percentile(lowPercentile)), 
                    round(subredditDigest[key].percentile(highPercentile))) 
             for key in subredditDigest.keys()}

srDigestR['politics']

len(srDigestR)

print srDigestR

print len(srDigestR)

print df.filter(df['subreddit'] == 'politics').count()

for key in srDigestR.keys():
    print key, df.filter(df['subreddit'] == key).count()

Filter records to retain only top and bottom 3% of comment scores.

<b>NOTE:  NEED TO BROADCAST srDigestR WHEN I MOVE TO CLUSTER.

In [12]:
rRDDExtreme = rRDD.filter(lambda (k,v): v[4] < srDigestR[v[5]][0] or v[4] > srDigestR[v[5]][1])

rRDDExtreme.count()

# 3.0  Calculate timeSince

Calculate time since post was created based on created_utc and min_created_utc from pair RDD.  In Alyssa's IPython notebook this is called timeSince.  In her R code it's called recency.  

For this I need a left outer join.  For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k.  I need to know if there are records in my comment dataset for which there is no "minimum time", as this would indicate a processing error.

Map RDD to get post link_id as key, then join with minTimeRDD.

<b>CAN I USE DATAFRAMES FOR ALL THIS? GETTING TIRED OF REMEMBERING WHAT v[4] IS.  </b>

The only data I need for regression are:  C(subreddit) + timeSince + commentLength + posNegDiff.  Need to keep only comment id (key), score, subreddit, timeSince, comment text from this step.

Format of output RDD is (id,(body,timeSince,score,subreddit))

In [13]:
rRDDXts = (rRDDExtreme.map(lambda (k,v):  (v[2],(k,v[0],v[1],v[2],v[3],v[4],v[5],v[6])))  # pull link_id as key
                      .leftOuterJoin(minTimeRDD) # join on link_id (post)
                      .map(lambda (link_id,(x,min_utc)):  (x[0], (x[1],x[2]-min_utc,x[5],x[6])))
                      .cache()
          )

rRDDXts.take(10)

# 4.0  Calculate commentLength

Clean comment body and calculate commentLength.

R gsub:
gsub(pattern, replacement, x, ignore.case = FALSE, perl = FALSE, fixed = FALSE, useBytes = FALSE)

Python re:
re.sub(pattern, repl, string, count=0, flags=0).  Return the string obtained by replacing the leftmost non-overlapping occurrences of pattern in string by the replacement repl.

<B>NOTE:  ALYSSA REMOVED QUOTED COMMENTS.  I REMOVING THEM ALSO BUT IN MY FIRST EXAMPLE I FOUND A "MADE UP" QUOTE THAT ISN'T REALLY QUOTING SOMEONE ELSE'S POST.  

In [14]:
def cleanup(body):

	# Recode HTML codes
	body = re.sub("&gt;", ">", body)
	body = re.sub("&lt;", "<", body)
	body = re.sub("&amp;", "&", body)
	body = re.sub("&nbsp;", " ", body)

	# Remove deleted
	body = re.sub("^[deleted]$", "", body)

	# Remove URL
	body = re.sub("http[[:alnum:][:punct:]]*", " ", body) # url

	# Remove /r/subreddit, /u/user
	body = re.sub("/r/[[:alnum:]]+|/u/[[:alnum:]]+", " ", body)

	# Remove quoted comments
	body = re.sub("(>.*?\\n\\n)+", " ", body)

	# Remove control characters (\n, \b)
	body = re.sub("[[:cntrl:]]", " ", body)

	# Remove single quotation marks (contractions)
	body = re.sub("'", "", body)

	# Remove punctuation
	body = re.sub("[[:punct:]]", " ", body)

	# Replace multiple spaces with single space
	body = re.sub("\\s+", " ", body) # Multiple spaces
	# body = re.sub("^\\s+", "", body) # Space at the start of the string
	# body = re.sub("+\\s$", "", body) # Space at the end of the string
	body = body.strip()

	# Lower case
	body = body.lower()

	# Return comment length (number of words) and body (cleaned up text)
	return body

clbody = cleanup(u"Basically, the hospital's position amounts to:\n\n&gt; If she can't hold her roofies she deserves to be a**f****d and denied medical care and collection of evidence!\n\nNot the *most* progressive attitude...")

print len(clbody.split())

print clbody

Current format of RDD:  (id,(body,timeSince,score,subreddit))
Format of rRDDXtscl:  (id,(commentLength,body,timeSince,score,subreddit))

In [15]:
rRDDXtscl = (rRDDXts.map(lambda (id,(body,timeSince,score,subreddit)): (id,(cleanup(body),timeSince,score,subreddit)))
                    .map(lambda (id,(body,timeSince,score,subreddit)): (id,(len(body.split()),body,timeSince,score,subreddit)))
             )

# 5.0 (Filter out exclusions if necessary; skip for now)

Filter out exclusions.  Further reduces dataset.

# 6.0 Run sentiment analysis and calculate posNegDiff

Use AFINN model to do sentiment analysis.

Finn Årup Nielsen, "A new ANEW: evaluation of a word list for sentiment analysis in microblogs" , Proceedings of the ESWC2011 Workshop on 'Making Sense of Microposts': Big things come in small packages 718 in CEUR Workshop Proceedings: 93-98. 2011 May. Matthew Rowe, Milan Stankovic, Aba-Sah Dadzie, Mariann Hardey (editors)

<B>I'M WONDERING IF CREATING AN AFINN OBJECT EACH TIME IS TAKING TOO LONG.  MIGHT WANT TO REFACTOR THIS TO A PYTHON FUNCTION WITH A DICT LOOKUP AND USE A BROADCAST VARIABLE.  

In [16]:
def sentiment(body):
    afinn = Afinn()
    return afinn.score(body)

sentiment("This is utterly excellent!")

In [17]:
rRDDtscls = (rRDDXtscl.map(lambda (id,(commentLength,body,timeSince,score,subreddit)):  
                        (id,(commentLength,sentiment(body),timeSince,score,subreddit)))
                      .cache()
             )

rRDDtscls.take(5)

rRDDtscls.count()

# 7.0 Set up logistic regression inputs with OHE features for categorical variable subredddit

Calculate label from score using srDigestR and create rawData RDD in proper format:  (label, non-categorical variables, categorical variable)

In [18]:
def label(score, subreddit, percentMap):
    if score <= percentMap[subreddit][0]: return 0
    else: return 1

Format of rRDDtscls:  (id,(commentLength,posNegDiff,timeSince,score,subreddit)))

Format of rawData is a tuple:  (label, (0,commentLength), (1,posNegDiff), (2,timeSince), subreddit))

def tupToString(tup):
    return ','.join([str(item) for item in tup])

In [19]:
rawData = (rRDDtscls.map(lambda (id,(commentLength,posNegDiff,timeSince,score,subreddit)):  
                    (label(score,subreddit,srDigestR), (0,commentLength), (1,posNegDiff), (2,timeSince), subreddit))
                    .cache()
          )

type(rawData)

rawData.take(5)

rawData.count()

Split data into training, validation and test. Takes about 1-2 minutes for ~5 MB data set, greatly reduced from 85 MB.

In [20]:
weights = [.8, .1, .1]
seed = 42
# Use randomSplit with weights and seed
rawTrainData, rawValData, rawTestData = rawData.randomSplit(weights, seed)

# Cache the data
rawTrainData.cache()
rawValData.cache()
rawTestData.cache()

# These counts are expensive:  ~1 hour for 10 GB input data.
# nAll = rawData.count()
# nTrain = rawTrainData.count()
# nVal = rawValData.count()
# nTest = rawTestData.count()
# print nTrain, nVal, nTest, nTrain + nVal + nTest, nAll

# print rawData.take(1)

PythonRDD[34] at RDD at PythonRDD.scala:43

Create one hot encoding dictionary.

I DON'T NEED ML LAB 4 createOneHotDict BECAUSE I ALREADY HAVE A LIST OF SUBREDDITS IN srDigestR.  Just need to pull keys out of this dict to create my OHEdict. NOTE that my OHEdict is already shifted by 3 to avoid collision with non-categorical variables.

<B> NEED TO FIX OHEdict:  should only pull categories out of TRAINING set, NOT entire set.  Otherwise it's cheating.

OHEdict = {(0, v): k+3 for (k, v) in enumerate(srDigestR)}

print OHEdict

I am getting all 1's as predicted values when I run using all subreddits.   I suspect that this is because I just don't have enough data.  The only subreddits that have more than 10,000 comments are programming, politics and reddit.com.  I am hard-coding this to NOT use OHEdict for now.  Instead, I am manually creating my dictionary so that the encoding will be

programming [1 0 0]
politics [0 1 0]
reddit.com [0 0 1]
other [0 0 0]

Later when I scale up, I will re-set values in the variable "subreddit" to have only values that are above a certain threshold in number of comments.  I won't know the right threshold until I read in all the data and look at it.  There will probably be thousands of subreddits with too few comments to do regression on.  

In [21]:
def createOHEMap(sr):
    if sr == u'programming': return (3,1)
    elif sr == u'politics' : return (4,1)
    elif sr == u'reddit.com' : return (5,1)
    else: return (5,0)

OHEdict[(0,'politics')]

Create OHETrainData.

In [22]:
def createLabeledPoint(point,numFeats):
    label = point[0]
    feats = point[1:]
    sv = SparseVector(numFeats, feats)
    # print sv
    return LabeledPoint(label, sv)

len(OHEdict)+3

createLabeledPoint((1, (0, 36), (1, -11.0), (2, 811), (25, 1)),   len(OHEdict+3  )

In [23]:
# numFeats = len(OHEdict)+3
numFeats = 6
OHETrainData = (rawTrainData.map(lambda (label, t1, t2, t3, sr):
                                       (label, t1, t2, t3, createOHEMap(sr) ))
                            .map(lambda point:  createLabeledPoint(point, numFeats))
                )


In [None]:
OHETrainData.cache()

OHETrainData.take(10)

(Create OHEValData and OHETestData; skip for now)

OHEValData = (rawValData.map(lambda (label, t1, t2, t3, sr):
                                       (label, t1, t2, t3, (OHEdict[(0,sr)], 1) ))
                            .map(lambda point:  createLabeledPoint(point, numFeats))
                )

OHETestData = (rawTestData.map(lambda (label, t1, t2, t3, sr):
                                       (label, t1, t2, t3, (OHEdict[(0,sr)], 1) ))
                            .map(lambda point:  createLabeledPoint(point, numFeats))
                )

# 8.0 Run logistic regression

Set up hyperparameters

In [24]:
# fixed hyperparameters
numIters = 50
stepSize = 10.
regParam = 1e-6
regType = 'l2'
includeIntercept = True

Run logistic regression.

<b> LATER, ADD VALIDATION STEP:  ITERATE OVER HYPERPARAMETERS TO FIND BEST COMBINATION.

In [25]:
model0 = LogisticRegressionWithSGD.train(OHETrainData, iterations = numIters, step = stepSize,
                                        regParam = regParam, regType = regType,
                                        intercept = includeIntercept)
sortedWeights = sorted(model0.weights)
# print sortedWeights[:5], model0.intercept

print sortedWeights, model0.intercept

In [26]:
print model0.weights, model0.intercept

[799.436724878,14.2447331487,-5895.16997467,0.0258696469419,0.292561270905,0.0] 17.250306938


# 9.0 Evaluate results using test set

Calculate accuracy = number of correctly classified examples / total number of examples.

In [27]:
model0TotalCorrect = OHETrainData.map(lambda point:  1 if model0.predict(point.features) == point.label else 0).sum()

In [28]:
print model0TotalCorrect

382101


In [29]:
OHETrainDataCount = OHETrainData.count()

In [30]:
print OHETrainDataCount

796082


In [31]:
model0Accuracy = float(model0TotalCorrect) / float(OHETrainData.count())

In [32]:
print model0Accuracy

0.479976937049


<b> TO DO:  Calculate ROC AUC (receiver operating characteristic - area under curve).  

<b>TO DO:  calculate confusion matrix.

In [33]:
trainingPredictionsAuto=(OHETrainData
                     .map(lambda lpoint: model0.predict(lpoint.features))
                     )

In [34]:
print trainingPredictionsAuto.take(150)

[1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]


In [52]:
numNotNinesAuto = trainingPredictionsAuto.filter(lambda P: 1-P > 0.001).count() # how many predictions are not ~= 1

In [53]:
print numNotNinesAuto

0


In [54]:
print trainingPredictionsAuto.count()

7027


Train model with default values.  Any better?

In [105]:
modelDefault = LogisticRegressionWithSGD.train(OHETrainData, iterations = numIters,
                                        intercept = includeIntercept)
sortedWeights = sorted(modelDefault.weights)
# print sortedWeights[:5], model0.intercept

In [106]:
print sortedWeights

[0.079614324903586983, 0.14585882859422947, 0.57116771102127339, 1.1500497467017805, 74.303512866796581, 615.93297828345442]


In [107]:
print modelDefault.weights

[74.3035128668,1.1500497467,615.932978283,0.0796143249036,0.145858828594,0.571167711021]


In [108]:
trainingPredDefault=(OHETrainData
                     .map(lambda lpoint: modelDefault.predict(lpoint.features))
                     )

In [109]:
print trainingPredDefault.take(150)

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


In [110]:
print trainingPredDefault.filter(lambda P: 1-P > 0.001).count()

0


ROC plot

def bucketFeatByCount(featCount):
    """Bucket the counts by powers of two."""
    for i in range(11):
        size = 2 ** i
        if featCount <= size:
            return size
    return -1

featCounts = (OHETrainData
              .flatMap(lambda lp: lp.features.indices)
              .map(lambda x: (x, 1))
              .reduceByKey(lambda x, y: x + y))
featCountsBuckets = (featCounts
                     .map(lambda x: (bucketFeatByCount(x[1]), 1))
                     .filter(lambda (k, v): k != -1)
                     .reduceByKey(lambda x, y: x + y)
                     .collect())

x, y = zip(*featCountsBuckets)
x, y = np.log(x), np.log(y)

def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                gridWidth=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
    return fig, ax

labelsAndScores = OHEValData.map(lambda lp:
                                            (lp.label, getP(lp.features, model0.weights, model0.intercept)))
labelsAndWeights = labelsAndScores.collect()
labelsAndWeights.sort(key=lambda (k, v): v, reverse=True)
labelsByWeight = np.array([k for (k, v) in labelsAndWeights])

length = labelsByWeight.size
truePositives = labelsByWeight.cumsum()
numPositive = truePositives[-1]
falsePositives = np.arange(1.0, length + 1, 1.) - truePositives

truePositiveRate = truePositives / numPositive
falsePositiveRate = falsePositives / (length - numPositive)

# Generate layout and plot data
fig, ax = preparePlot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(falsePositiveRate, truePositiveRate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
pass