In [2]:
import json
import time
import pytz
import traceback
import time_uuid
from pytz import timezone
from datetime import datetime
from pyspark.sql import types
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext, Row
from pyspark import SparkContext, SparkConf
from config import *

In [3]:
import warnings
import matplotlib
warnings.filterwarnings('ignore')

%matplotlib inline
import pandas as pd

In [4]:
sc.stop()

### Connecting to Cassandra and loading data into Pyspark's Dataframes

In [5]:
conf = SparkConf()\
    .setAppName(APPNAME)\
    .setMaster(MASTER)\
    .set("spark.cassandra.connection.host", CASSANDRA_HOST)\
    .set("spark.cassandra.connection.port", CASSANDRA_PORT)\
    .set("spark.cassandra.auth.username", CASSANDRA_USERNAME)\
    .set("spark.cassandra.auth.password", CASSANDRA_PASSWORD)

In [6]:
sc = SparkContext(MASTER, APPNAME, conf=conf)
sqlContext = SQLContext(sc)
sqlContext.sql("""CREATE TEMPORARY TABLE %s \
                  USING org.apache.spark.sql.cassandra \
                  OPTIONS ( table "%s", \
                            keyspace "%s", \
                            cluster "Test Cluster", \
                            pushdown "true") \
              """ % (TABLE_QUERYABLE, TABLE_QUERYABLE, KEYSPACE))

DataFrame[]

## Creating a Random Forest Classifier ML Model on the Cassandra Data

In [68]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import random
import functools
from pyspark.ml.feature import OneHotEncoder
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

#### Loading data from Cassandra 

In [69]:
tableData = sqlContext.sql("SELECT * FROM %s" % (TABLE_QUERYABLE))
tableData.dtypes

[('bucket_id', 'string'),
 ('unix_timestamp', 'bigint'),
 ('age', 'int'),
 ('city', 'string'),
 ('created_at', 'timestamp'),
 ('email', 'string'),
 ('event_id', 'string'),
 ('event_name', 'string'),
 ('gender', 'string'),
 ('job', 'string'),
 ('name', 'string'),
 ('zipcode', 'string')]

In [70]:
cols_select = ['age', 'city', 'email', 'gender', 'job', 'zipcode']
df = tableData.select(cols_select).dropDuplicates()
df.show(5, False)

+---+--------------+--------------------------+------+------------------------+-------+
|age|city          |email                     |gender|job                     |zipcode|
+---+--------------+--------------------------+------+------------------------+-------+
|60 |Zacharyborough|msingleton@green-myers.com|M     |Veterinary surgeon      |25009  |
|60 |Torreschester |robert38@gmail.com        |M     |Dancer                  |85770  |
|20 |Kevinport     |janet94@bennett.net       |F     |Risk analyst            |32970  |
|51 |Zacharyview   |powelljames@osborne.info  |M     |Journalist, broadcasting|67384  |
|32 |Lunaside      |cookerika@sandoval.com    |F     |Geologist, engineering  |58001  |
+---+--------------+--------------------------+------+------------------------+-------+
only showing top 5 rows



#### Adding user behavior (POSITIVE, NOT-POSITIVE) randomly as our data is randomly generated

In [71]:
def func(d):
    p = {}
    for x in d:
        p['age'] = d.age
        p['city'] = d.city
        p['email'] = d.email
        p['gender'] = d.gender
        p['job'] = d.job
        p['zipcode'] = d.zipcode
    p['behaviour'] = str(random.choice(['POSITIVE', 'NEGATIVE']))
    return p

features = df.map(lambda x: func(x))

In [72]:
features.toDF().show()

+---+---------+-----------------+--------------------+------+--------------------+-------+
|age|behaviour|             city|               email|gender|                 job|zipcode|
+---+---------+-----------------+--------------------+------+--------------------+-------+
| 60| NEGATIVE|   Zacharyborough|msingleton@green-...|     M|  Veterinary surgeon|  25009|
| 60| NEGATIVE|    Torreschester|  robert38@gmail.com|     M|              Dancer|  85770|
| 20| NEGATIVE|        Kevinport| janet94@bennett.net|     F|        Risk analyst|  32970|
| 51| POSITIVE|      Zacharyview|powelljames@osbor...|     M|Journalist, broad...|  67384|
| 32| NEGATIVE|         Lunaside|cookerika@sandova...|     F|Geologist, engine...|  58001|
| 29| NEGATIVE|         Riceberg| diazterri@yahoo.com|     F|Architectural tec...|  53955|
| 56| POSITIVE|     North Sandra|vjimenez@hotmail.com|     F|Historic building...|  11325|
| 53| POSITIVE|          New Amy|jeffreyarmstrong@...|     F|     Publishing copy|  20437|

#### Performing label encoding on the columns

In [73]:
from pyspark.sql.functions import UserDefinedFunction
def labelForAge(s):
    s = int(s)
    if s <= 20:
        return 0.0
    elif s > 20 and s <= 40:
        return 1.0
    elif s > 40 and s <= 50:
        return 2.0
    elif s > 50 and s <= 60:
        return 3.0
    else:
        return -1.0

def labelForCity(s):
    if len(s) <= 8:
        return 0.0
    elif s > 8 and s <= 10:
        return 1.0
    elif s > 10 and s <= 14:
        return 2.0
    elif s > 14 and s <= 20:
        return 3.0
    else:
        return -1.0

def labelForEmail(s):
    if len(s) <= 5:
        return 0.0
    elif s > 5 and s <= 7:
        return 1.0
    elif s > 7 and s <= 9:
        return 2.0
    elif s > 9 and s <= 12:
        return 3.0
    else:
        return -1.0

def labelForGender(s):
    if s == 'M':
        return 0.0
    elif s == 'F':
        return 1.0
    else:
        return -1.0
    
def labelForJob(s):
    s = s.lower()
    if 'engineer' in s:
        return 0.0
    elif 'architect' in s:
        return 1.0
    elif 'analyst' in s:
        return 2.0
    elif 'designer' in s:
        return 3.0
    elif 'officer' in s:
        return 4.0
    elif 'teacher' in s:
        return 5.0
    elif 'it' in s:
        return 6.0
    else:
        return -1.0   

def labelForZipcode(s):
    s = int(s)
    if s <= 10000:
        return 0.0
    elif s > 10000 and s <= 30000:
        return 1.0
    elif s > 30000 and s <= 50000:
        return 2.0
    elif s > 50000 and s <= 70000:
        return 3.0
    elif s > 70000 and s <= 90000:
        return 4.0
    elif s > 90000:
        return 5.0
    else:
        return -1.0

label_Age = UserDefinedFunction(labelForAge, DoubleType())
label_City = UserDefinedFunction(labelForCity, DoubleType())
label_Email = UserDefinedFunction(labelForEmail, DoubleType())
label_Gender = UserDefinedFunction(labelForGender, DoubleType())
label_Job = UserDefinedFunction(labelForJob, DoubleType())
label_Zipcode = UserDefinedFunction(labelForZipcode, DoubleType())

features_df = features.toDF()
labeledData = features_df.select(label_Age(features_df.age).alias('age_label'),\
                              label_City(features_df.city).alias('city_label'),\
                              label_Email(features_df.email).alias('email_label'),\
                              label_Gender(features_df.gender).alias('gender_label'),\
                              label_Job(features_df.job).alias('job_label'),\
                              label_Zipcode(features_df.zipcode).alias('zipcode_label'),\
                              features_df.behaviour)

In [74]:
labeledData.dtypes

[('age_label', 'double'),
 ('city_label', 'double'),
 ('email_label', 'double'),
 ('gender_label', 'double'),
 ('job_label', 'double'),
 ('zipcode_label', 'double'),
 ('behaviour', 'string')]

#### This is final labeled data, we are ready to train the model now

In [75]:
labeledData.show(5, False)

+---------+----------+-----------+------------+---------+-------------+---------+
|age_label|city_label|email_label|gender_label|job_label|zipcode_label|behaviour|
+---------+----------+-----------+------------+---------+-------------+---------+
|3.0      |-1.0      |-1.0       |0.0         |-1.0     |1.0          |NEGATIVE |
|3.0      |-1.0      |-1.0       |0.0         |-1.0     |4.0          |NEGATIVE |
|0.0      |-1.0      |-1.0       |1.0         |2.0      |2.0          |NEGATIVE |
|3.0      |-1.0      |-1.0       |0.0         |-1.0     |3.0          |POSITIVE |
|1.0      |0.0       |-1.0       |1.0         |0.0      |3.0          |NEGATIVE |
+---------+----------+-----------+------------+---------+-------------+---------+
only showing top 5 rows



#### Splitting our data into training and test data. 80%-20%

In [76]:
allData = pipeline.fit(labeledData).transform(labeledData)
allData.cache()
trainingData, testData = allData.randomSplit([0.8,0.2], seed=0) # need to ensure same split for each time
print("Distribution of Pos and Neg in trainingData is: ", trainingData.groupBy("label").count().take(3))

('Distribution of Pos and Neg in trainingData is: ', [Row(label=1.0, count=40563), Row(label=0.0, count=39615)])


#### Converting the columns into label points as required by the model

In [53]:
td = trainingData.map(lambda row: LabeledPoint(row.label, row.features))

#### Creating the model

In [55]:
model = DecisionTree.trainClassifier(td, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

In [57]:
model.toDebugString()

u'DecisionTreeModel classifier of depth 5 with 61 nodes\n  If (feature 0 <= 0.0)\n   If (feature 1 <= -1.0)\n    If (feature 4 <= -1.0)\n     If (feature 5 <= 0.0)\n      If (feature 3 <= 0.0)\n       Predict: 0.0\n      Else (feature 3 > 0.0)\n       Predict: 1.0\n     Else (feature 5 > 0.0)\n      If (feature 5 <= 2.0)\n       Predict: 1.0\n      Else (feature 5 > 2.0)\n       Predict: 1.0\n    Else (feature 4 > -1.0)\n     If (feature 3 <= 0.0)\n      If (feature 5 <= 4.0)\n       Predict: 1.0\n      Else (feature 5 > 4.0)\n       Predict: 0.0\n     Else (feature 3 > 0.0)\n      If (feature 5 <= 4.0)\n       Predict: 0.0\n      Else (feature 5 > 4.0)\n       Predict: 1.0\n   Else (feature 1 > -1.0)\n    If (feature 5 <= 4.0)\n     If (feature 5 <= 2.0)\n      If (feature 3 <= 0.0)\n       Predict: 0.0\n      Else (feature 3 > 0.0)\n       Predict: 1.0\n     Else (feature 5 > 2.0)\n      If (feature 5 <= 3.0)\n       Predict: 0.0\n      Else (feature 5 > 3.0)\n       Predict: 0.0\n  

#### Now, performing prediction using model on testData

In [78]:
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

#### Calculating test errors

In [79]:
testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
print('Test Error = ' + str(testErr))

Test Error = 0.500655506253


#### Saving the model for future use and online models

In [80]:
model.save(sc, "my_model.model")