# In [None]:
''' 
Ricky's Mobile Gaming with Spark
################################
This script uses a demo data set from Mixpanel for a hypothetical mobile game.  Our goal here is to 
build a random forest model to predict which users are most likely to make a purchase with the game.
This script will demonstrate a few key features of Apache Spark as we transform the data, train our
model, and make predictions on the data set.  This is not meant to be a definitive example of 
implementing machine learning, but is meant to demonstrate using Apache Spark for an ML application.

Some notes on what I've done:  I wanted to demonstrate a few key features of Spark:
1. Using mapping a la MapReduce to transform my data
2. Demonstrating the use of Spark Dataframes
3. Using accumulators to act as counters

I trade off in my solution between these three features to show how to use each of them.  Enjoy!!
################################
'''
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
from pyspark.sql import SQLContext

# Initialize constants

# 0-based positions of the fields read from each comma-split input row
EVENT_INDEX = 0
DISTINCT_ID_INDEX = 19

# Event we are trying to predict; makeLabelPoints turns its presence into the label
TARGET_EVENT = 'In-App Purchase'

# Every event name considered by the model. makeEventList removes the target
# from this list, and the remaining order fixes the feature-vector order.
EVENT_LIST = [
    'Level Completed',
    'In-App Purchase',
    'Game Played',
    'Tutorial Exited',
    'App Install',
    'Character Created',
    '$experiment_started',
    'Registration Complete',
    '$campaign_received',
    'App Open',
    'Session End',
    '$campaign_delivery',
]

# Initialize accumulators for aggregate counting
# NOTE(review): `sc` is not created in this file; presumably it is the
# SparkContext provided by the notebook / pyspark shell -- confirm.
# Incremented once per call to makeLabelPoints
count = sc.accumulator(0)
# Incremented by numCorrect when a prediction equals its label
numCorr = sc.accumulator(0)
# Updated by numFalsePos (intended to count false positives)
numFP = sc.accumulator(0)
# Updated by numFalseNeg (intended to count false negatives)
numFN = sc.accumulator(0)

# Key extractor: returns the distinct_id field of one raw input row.
#   row - a single comma-separated line of the data file (string)
# Returns the field at position DISTINCT_ID_INDEX; raises IndexError when the
# row has fewer fields than that.
def getDistinctId(row):
    return row.split(",")[DISTINCT_ID_INDEX]

# Map function: turns one raw comma-separated row into [distinct_id, event_name].
#   x - a single line of the data file (string)
# Both fields are encoded to ASCII, silently dropping characters that cannot
# be represented. Raises IndexError when the row has too few fields.
def idEventMapper(x):
    fields = x.split(",")
    distinct_id = fields[DISTINCT_ID_INDEX].encode('ascii', 'ignore')
    event_name = fields[EVENT_INDEX].encode('ascii', 'ignore')
    return [distinct_id, event_name]

# Builds the per-user event frequency table.
#   x - pair (distinct_id, iterable of event names) as produced by groupByKey
# Returns [distinct_id, freqDict] where freqDict maps each event name to the
# number of times it appears in the iterable.
def freqDictBuilder(x):
    user_id, events = x[0], x[1]
    tally = {}
    for name in events:
        tally[name] = tally.get(name, 0) + 1
    return [user_id, tally]

# Returns a new list holding every entry of EVENT_LIST except `target`,
# in the original order. EVENT_LIST itself is left untouched.
def makeEventList(target):
    return [name for name in EVENT_LIST if name != target]

# Map function: converts one [distinct_id, freqDict] record into a LabeledPoint.
#   row       - [distinct_id, freqDict] as returned by freqDictBuilder
#   eventList - event names used as features; their order is the vector order
#   target    - event name being predicted
# The label is 1 when `target` appears in freqDict, otherwise 0. Each feature
# is the user's count for that event (0 when the user never did it).
# Side effect: bumps the `count` accumulator once per call.
def makeLabelPoints(row, eventList, target):
    freqs = row[1]
    label = 1 if target in freqs else 0
    features = [freqs[name] if name in freqs else 0 for name in eventList]

    count.add(1)
    return LabeledPoint(label, features)

# Helper functions for counting number of correct predictions, false positives, and false negatives

# Row-wise entry point: reads the 'label' and 'prediction' columns of one row
# and feeds them to each counting helper. Returns None; the results are
# available only through the accumulators the helpers update.
def getCorrAndFP(row):
    actual = int(row['label'])
    prediction = int(row['prediction'])
    numCorrect(actual, prediction)
    numFalsePos(actual, prediction)
    # False negatives must be tallied too, otherwise numFN stays at 0
    numFalseNeg(actual, prediction)

# Adds 1 to the numCorr accumulator when the prediction matches the label.
def numCorrect(actual, prediction):
    if actual != prediction:
        return
    numCorr.add(1)

# Adds 1 to the numFP accumulator for a false positive: label 0, prediction 1.
# Uses `and`, not `&`: `&` binds tighter than `==`, so
# `actual == 0 & prediction == 1` is the chained comparison
# `actual == (0 & prediction) == 1`, which can never be true.
def numFalsePos(actual, prediction):
    if actual == 0 and prediction == 1:
        numFP.add(1)

# Adds 1 to the numFN accumulator for a false negative: label 1, prediction 0.
# Uses `and`, not `&`: `&` binds tighter than `==`, so
# `prediction == 0 & actual == 1` is the chained comparison
# `prediction == (0 & actual) == 1`, which can never be true.
def numFalseNeg(actual, prediction):
    if prediction == 0 and actual == 1:
        numFN.add(1)

# Main script - load the data and map every row to [distinct_id, event_name].
# SparkContext.textFile takes (name, minPartitions, use_unicode); it has no
# partitioner argument, so getDistinctId is not passed here (as a third
# argument it would only act as a truthy use_unicode flag, which is the default).
r = sc.textFile("rickys_data.csv", 200)
# Drop rows with an empty distinct_id and the 'property_distinct_id' row
# (presumably the CSV header -- confirm against the data file).
result = (r.map(idEventMapper)
           .filter(lambda x: x[0] != '')
           .filter(lambda x: x[0] != 'property_distinct_id'))

# Group tuples by distinct_id, then build Frequency Dictionary for each distinct_id
res = result.groupByKey()
f = res.map(freqDictBuilder)

# Take list of events and remove our target event (one we are predicting)
eventL = makeEventList(TARGET_EVENT)

# Build RDD of LabeledPoints to train Random Forest: LabeledPoint(classification, feature vector).
# Cached because it is reused by training and by the DataFrame built below;
# without caching every reuse re-runs the whole pipeline (and re-bumps `count`).
labels = f.map(lambda x: makeLabelPoints(x, eventL, TARGET_EVENT)).cache()

# train model on RDD of LabeledPoints - see https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#module-pyspark.mllib.tree
# Arguments: 2 classes, no categorical features, 4 trees, fixed seed for repeatability.
model = RandomForest.trainClassifier(labels, 2, {}, 4, seed=42)

# make predictions on a few hand-written feature vectors (one value per entry of eventL)
print(model.predict([9.0,9.0,1.0,1.0,7.0,1.0,1.0,0.0,2.0,1.0,1.0]))
print(model.predict([1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0]))
print(model.predict([3.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,2.0,1.0]))

# Demonstrating Spark DataFrames here by creating a DataFrame and making predictions over each user and adding new column.
# `.rdd.map` is used instead of DataFrame.map so the code also runs on Spark
# versions where DataFrame no longer exposes map().
# NOTE: the model is evaluated on the same data it was trained on, so the
# metrics below describe fit, not generalization.
labels_df = SQLContext(sc).createDataFrame(labels)
predictions = model.predict(labels_df.rdd.map(lambda row: row.features))
predicted_df = (labels_df.rdd.map(lambda row: row.label)
                .zip(predictions)
                .toDF()
                .withColumnRenamed('_1', 'label')
                .withColumnRenamed('_2', 'prediction'))
# Scanned several times below, so keep it in memory
predicted_df.cache()

# Variables for the number of predicted and actual positives and negatives
predNeg = predicted_df[predicted_df['prediction'] == 0].count()
predPos = predicted_df[predicted_df['prediction'] == 1].count()
actNeg = predicted_df[predicted_df['label'] == 0].count()
actPos = predicted_df[predicted_df['label'] == 1].count()
# True positives: rows that are positive in both columns
truePos = predicted_df[(predicted_df['label'] == 1) & (predicted_df['prediction'] == 1)].count()

# Applying my wrapper function to get number of correct predictions, false positives, and false negatives.
# This is an example of a row-wise operation with Spark. foreach() is an
# action, so the accumulator updates actually run; map() is a lazy
# transformation and would never execute without a following action.
predicted_df.foreach(getCorrAndFP)
# NOTE: these assignments rebind the names of the helper functions, so they
# must stay after the foreach() pass above.
numCorrect = numCorr.value
numFalsePos = numFP.value
numFalseNeg = numFN.value

# Calculation of percent of predictions that are correct, precision, and recall.
#   precision = TP / (TP + FP) = TP / predicted positives
#   recall    = TP / (TP + FN) = TP / actual positives
# Each ratio falls back to 0.0 when its denominator is zero.
ct = predicted_df.count()
pctCorr = numCorrect / float(ct) if ct else 0.0
precision = float(truePos) / predPos if predPos else 0.0
recall = float(truePos) / actPos if actPos else 0.0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) else 0.0

# Output results
print("Total percent correct: %.2f" % pctCorr)
print("Precision: %.2f" % precision)
print("Recall: %.2f" % recall)
print("F1 Score: %.2f" % float(f1))