# Using ML

In [70]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Location of the raw online-retail CSV export.
RETAIL_CSV_PATH = 'file:///home/devuser1/online-retail-dataset.csv'

# Reuse the running SparkContext if there is one, otherwise start a new one.
sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

# The first CSV line holds the column names; let Spark infer column types.
csv_reader = sqlCtx.read.options(inferSchema="true", header="true")
df = csv_reader.csv(RETAIL_CSV_PATH)

In [6]:
# Show the column names of the loaded DataFrame.
df.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [7]:
# Show the column types that schema inference produced.
df.schema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

In [84]:
# Bring only the first five rows back to the driver for a quick look.
df.limit(5).collect()

[Row(InvoiceNo=u'536365', StockCode=u'85123A', Description=u'WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=2.55, CustomerID=17850, Country=u'United Kingdom'),
 Row(InvoiceNo=u'536365', StockCode=u'71053', Description=u'WHITE METAL LANTERN', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country=u'United Kingdom'),
 Row(InvoiceNo=u'536365', StockCode=u'84406B', Description=u'CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate=u'12/1/2010 8:26', UnitPrice=2.75, CustomerID=17850, Country=u'United Kingdom'),
 Row(InvoiceNo=u'536365', StockCode=u'84029G', Description=u'KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country=u'United Kingdom'),
 Row(InvoiceNo=u'536365', StockCode=u'84029E', Description=u'RED WOOLLY HOTTIE WHITE HEART.', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country=u'United Kingdom')]

## handle missing values and extract day of week

In [73]:
from pyspark.sql.functions import date_format, col, unix_timestamp

# InvoiceDate values look like '12/1/2010 8:26': the day and the hour are not
# zero-padded. Single-letter pattern fields (M, d, H) accept both one- and
# two-digit values, whereas a zero-padded pattern (MM/dd/yyyy HH:mm) only
# matches such strings under the lenient legacy parser and fails or yields
# nulls with the stricter datetime parser used by Spark 3.
invoice_ts = unix_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm").cast("timestamp")

# Replace missing numeric values with 0, add the weekday name ("EEEE", e.g.
# 'Wednesday') derived from the invoice timestamp, and reduce the partition
# count to at most 5.
prepped_df = (
    df
    .na.fill(0)
    .withColumn("day_of_week", date_format(invoice_ts, "EEEE"))
    .coalesce(5)
)

In [75]:
# Spot-check the first five prepared rows, including the new day_of_week column.
prepped_df.limit(5).collect()

[Row(InvoiceNo=u'536365', StockCode=u'85123A', Description=u'WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=2.55, CustomerID=17850, Country=u'United Kingdom', day_of_week=u'Wednesday'),
 Row(InvoiceNo=u'536365', StockCode=u'71053', Description=u'WHITE METAL LANTERN', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country=u'United Kingdom', day_of_week=u'Wednesday'),
 Row(InvoiceNo=u'536365', StockCode=u'84406B', Description=u'CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate=u'12/1/2010 8:26', UnitPrice=2.75, CustomerID=17850, Country=u'United Kingdom', day_of_week=u'Wednesday'),
 Row(InvoiceNo=u'536365', StockCode=u'84029G', Description=u'KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country=u'United Kingdom', day_of_week=u'Wednesday'),
 Row(InvoiceNo=u'536365', StockCode=u'84029E', Description=u'RED WOOLLY HOTTIE WHITE HEART.', Quantity=

In [76]:
# Confirm the prepared schema now carries the day_of_week column.
prepped_df.schema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,false),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true),StructField(day_of_week,StringType,true)))

## split data into train and test data sets

In [77]:
# 80/20 train/test split; the seed is fixed so the split can be repeated.
train_df, test_df = prepped_df.randomSplit(weights=[0.8, 0.2], seed=12345)

## convert string column values into numeric indices

In [78]:
from pyspark.ml.feature import StringIndexer

# Map each weekday name to a numeric index column.
indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_index",
)

## use one hot encoding

In [79]:
from pyspark.ml.feature import OneHotEncoder

# Turn the weekday index into a one-hot (sparse) vector column.
encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_encoded",
)

## leverage vector assembler

In [80]:
from pyspark.ml.feature import VectorAssembler

# Concatenate the numeric columns and the encoded weekday into one "features" vector.
FEATURE_INPUT_COLS = ["UnitPrice", "Quantity", "day_of_week_encoded"]

vectorAssembler = VectorAssembler(
    inputCols=FEATURE_INPUT_COLS,
    outputCol="features",
)

## set up transformation pipeline

In [81]:
from pyspark.ml import Pipeline

# Chain the stages in dependency order: index -> one-hot encode -> assemble.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

## transform training data

In [82]:
# Fit the feature pipeline on the training split only, then apply the fitted
# stages to that same split to add the index, encoded and features columns.
fittedPipeline = transformationPipeline.fit(train_df)
transformed_train_df = fittedPipeline.transform(train_df)

In [83]:
# Inspect five transformed rows to verify the columns added by the pipeline.
transformed_train_df.limit(5).collect()

[Row(InvoiceNo=u'536365', StockCode=u'21730', Description=u'GLASS STAR FROSTED T-LIGHT HOLDER', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=4.25, CustomerID=17850, Country=u'United Kingdom', day_of_week=u'Wednesday', day_of_week_index=3.0, day_of_week_encoded=SparseVector(5, {3: 1.0}), features=SparseVector(7, {0: 4.25, 1: 6.0, 5: 1.0})),
 Row(InvoiceNo=u'536365', StockCode=u'22752', Description=u'SET 7 BABUSHKA NESTING BOXES', Quantity=2, InvoiceDate=u'12/1/2010 8:26', UnitPrice=7.65, CustomerID=17850, Country=u'United Kingdom', day_of_week=u'Wednesday', day_of_week_index=3.0, day_of_week_encoded=SparseVector(5, {3: 1.0}), features=SparseVector(7, {0: 7.65, 1: 2.0, 5: 1.0})),
 Row(InvoiceNo=u'536365', StockCode=u'71053', Description=u'WHITE METAL LANTERN', Quantity=6, InvoiceDate=u'12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country=u'United Kingdom', day_of_week=u'Wednesday', day_of_week_index=3.0, day_of_week_encoded=SparseVector(5, {3: 1.0}), features=SparseVector(7

## use logistic regression

In [None]:
from pyspark.ml.classification import LogisticRegression
# Logistic-regression estimator over the assembled "features" vector.
# NOTE(review): neither "label_col" nor "class_weight" is created by any cell
# shown in this notebook (the transformed frame only gains day_of_week_index,
# day_of_week_encoded and features) -- confirm where these columns come from
# before fitting.
e = LogisticRegression(
    featuresCol="features",
    labelCol="label_col",
    weightCol="class_weight",
    regParam=0.01,
)

### train model over training data

In [None]:
# Fit the logistic-regression estimator on the transformed training split.
model = e.fit(transformed_train_df)

### make predictions over training and test data

In [None]:
# transformed_test_df is not defined by any earlier cell, so build it here by
# applying the pipeline that was fitted on the training split to the held-out
# split. Reusing the fitted pipeline (rather than refitting) keeps the weekday
# indexing identical between the two frames.
transformed_test_df = fittedPipeline.transform(test_df)

# Score both splits with the trained model.
train_predictions = model.transform(transformed_train_df)
test_predictions = model.transform(transformed_test_df)

### use multi class classification evaluator to evaluate the trained model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Evaluator that compares predictions against the "label_col" column; the
# metric to compute is supplied per call when evaluating.
evaluator = MulticlassClassificationEvaluator(labelCol="label_col")

### compute various evaluation measures over training and test data

In [None]:
# Metric names passed to the evaluator, in the order the result variables
# below are unpacked.
_METRIC_NAMES = ("accuracy", "f1", "weightedPrecision", "weightedRecall")


def _score_predictions(metric_evaluator, predictions):
    """Evaluate ``predictions`` once for every metric in ``_METRIC_NAMES``.

    Args:
        metric_evaluator: evaluator exposing ``evaluate`` and ``metricName``.
        predictions: DataFrame of model predictions to score.

    Returns:
        A list of scores, ordered like ``_METRIC_NAMES``.
    """
    return [
        # The param map overrides the evaluator's metric for this call only.
        metric_evaluator.evaluate(predictions, {metric_evaluator.metricName: metric_name})
        for metric_name in _METRIC_NAMES
    ]


train_acc, train_f1, train_precision, train_recall = _score_predictions(
    evaluator, train_predictions)

test_acc, test_f1, test_precision, test_recall = _score_predictions(
    evaluator, test_predictions)