In [1]:
import time
from pyspark.sql import SparkSession, functions

# Step 1: Load the Data

# Wall-clock timer for this cell.
start = time.time()

# Obtain a Spark session named 'ecomm' (getOrCreate returns an existing one if present).
spark = (
    SparkSession.builder
    .appName('ecomm')
    .getOrCreate()
)

# Source location, kept as bucket + object key so either part can be changed independently.
input_bucket = 's3://pyspark-emr-week4'
input_path = '/eCommerce.csv'

# Read the CSV, taking column names from the first row and letting Spark infer the types.
df_raw = spark.read.csv(
    input_bucket + input_path,
    header=True,
    inferSchema=True,
)

# Seconds elapsed since the timer was started.
print(time.time() - start)

# Show the column names and inferred types.
df_raw.printSchema()

3.0.0


In [4]:
# Step 2: Prepare data for classification.

start = time.time()

# Create a new column holding only the first token of category_code, i.e. the
# text before the first '.'.
# The split pattern is a regular expression, so the dot has to be escaped. A raw
# string is used so the backslash is written explicitly instead of relying on
# Python passing the unrecognised escape '\.' through (which emits a warning on
# current Python versions). The pattern value itself is unchanged.
split_col = functions.split(df_raw['category_code'], r'\.')
df_new = df_raw.withColumn('category', split_col.getItem(0))

# Seconds elapsed for the statements above.
print(time.time() - start)

Wall time: 42.9 ms


In [5]:
# Step 2: Prepare data for classification.

start = time.time()

# Inputs: category (the column derived from category_code), brand and price.
# Target: event_type.
model_columns = ['category', 'brand', 'price', 'event_type']
df = df_new[model_columns]

# Seconds elapsed for the statements above.
print(time.time() - start)

# Preview the selected columns.
df.show()

+-----------+---------+-------+----------+
|   category|    brand|  price|event_type|
+-----------+---------+-------+----------+
|electronics|   huawei| 270.42|      view|
|electronics|    yasin| 359.08|      view|
| appliances|  almacom| 180.16|      view|
|    apparel|  respect|  44.79|      view|
|electronics|  samsung| 150.95|      view|
|electronics|   xiaomi|  98.51|      cart|
|electronics|    apple| 180.13|      view|
|electronics|    casio|   27.0|      view|
|electronics|  samsung| 385.85|      view|
|electronics|  samsung| 396.15|      view|
|  computers|     asus| 357.25|      view|
|  computers|     acer| 437.57|      view|
|electronics|    artel| 231.64|      view|
|       kids|   chicco| 141.55|      view|
|  furniture|      brw| 387.14|      view|
| appliances|    artel|  93.62|      view|
|electronics|  samsung| 130.99|      view|
|electronics|     oris|1891.94|      view|
|electronics|changhong| 359.37|      view|
| appliances|  gorenje| 373.21|      view|
+----------

In [6]:
# Step 2: Prepare data for classification.

start = time.time()

# Change categorical variables to numerical variables by using one-hot encoding.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Spark 3.0 removed OneHotEncoderEstimator; its multi-column
# (inputCols/outputCols) API now lives on OneHotEncoder. Import whichever is
# available under the original name so this cell runs on both 2.x and 3.x.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

categoricalColumns = ['category', 'brand']
stages = []

# For each categorical column: map strings to indices, then one-hot encode the
# index into a '<column>classVec' vector column.
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + 'Index',
    )
    encoder = OneHotEncoderEstimator(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[categoricalCol + "classVec"],
    )
    stages += [stringIndexer, encoder]

# Index the target column event_type into a numeric 'label' column.
label_stringIdx = StringIndexer(inputCol='event_type', outputCol='label')
stages += [label_stringIdx]

# Assemble the encoded categorical vectors and the numeric columns into a
# single 'features' vector.
numericCols = ['price']  # Numerical Features
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# Fit every stage on df, then apply the fitted pipeline to the same frame.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df_pipe = pipelineModel.transform(df)

# Keep label + features first, followed by the original columns for inspection.
selectedCols = ['label', 'features'] + df.columns
df_select = df_pipe.select(selectedCols)

# Seconds elapsed for the statements above.
print(time.time() - start)

df_select.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- category: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- event_type: string (nullable = true)

Wall time: 24.9 s


In [7]:
# Step 3: Split data into train and test.

start = time.time()

# 70/30 random split with a fixed seed.
split_weights = [0.7, 0.3]
train, test = df_select.randomSplit(split_weights, seed=2018)

# NOTE(review): randomSplit is presumably lazy like other Spark transformations,
# so this elapsed time likely excludes the split itself, which would run during
# the count() calls below - confirm whether the timer should cover the counts.
print(time.time() - start)

# Row counts of each split.
print(f"Training Dataset Count: {train.count()}")
print(f"Test Dataset Count: {test.count()}")

Training Dataset Count: 6137234
Test Dataset Count: 2627771
Wall time: 1min 18s


In [8]:
# Step 3: Use Logistic Regression to perform classification on event type (view, cart, purchase).

start = time.time()

from pyspark.ml.classification import LogisticRegression

# Fit a logistic regression (at most 10 iterations) on the training split.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
)
lrModel = lr.fit(train)

# Score the held-out split and preview the first 10 scored rows.
predictions = lrModel.transform(test)
predictions.select(
    'category',
    'brand',
    'price',
    'label',
    'rawPrediction',
    'prediction',
    'probability',
).show(10)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# NOTE(review): the header above lists three event types, while this is a
# binary evaluator - if 'label' really has three classes the reported AUC is
# not a meaningful multiclass score; consider MulticlassClassificationEvaluator.
# TODO confirm the number of distinct labels.
evaluator = BinaryClassificationEvaluator()
lr_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', lr_auc)

# Seconds elapsed for fitting, scoring and evaluation.
print(time.time() - start)

+-----------+-------+-----+-----+--------------------+----------+--------------------+
|   category|  brand|price|label|       rawPrediction|prediction|         probability|
+-----------+-------+-----+-----+--------------------+----------+--------------------+
|electronics|samsung| 5.64|  0.0|[1.81796790662471...|       0.0|[0.87965783234372...|
|electronics|samsung| 5.64|  0.0|[1.81796790662471...|       0.0|[0.87965783234372...|
|electronics|samsung| 6.16|  0.0|[1.81824177641175...|       0.0|[0.87970283194473...|
|electronics|samsung| 6.22|  0.0|[1.81827337677180...|       0.0|[0.87970802325653...|
|electronics|samsung| 6.67|  0.0|[1.81851037947213...|       0.0|[0.87974695183188...|
|electronics|samsung| 6.67|  0.0|[1.81851037947213...|       0.0|[0.87974695183188...|
|electronics|samsung| 6.67|  0.0|[1.81851037947213...|       0.0|[0.87974695183188...|
|electronics|samsung| 6.67|  0.0|[1.81851037947213...|       0.0|[0.87974695183188...|
|electronics|samsung| 6.67|  0.0|[1.8185103

In [9]:
# Step 3: Use decision tree to perform classification on event type (view, cart, purchase).

start = time.time()

from pyspark.ml.classification import DecisionTreeClassifier

# Fit a depth-3 decision tree on the training split.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=3,
)
dtModel = dt.fit(train)

# Score the held-out split and preview the first 10 scored rows.
predictions_dt = dtModel.transform(test)
predictions_dt.select(
    'category',
    'brand',
    'price',
    'label',
    'rawPrediction',
    'prediction',
    'probability',
).show(10)

# NOTE(review): binary evaluator on what the header describes as a three-class
# target - consider MulticlassClassificationEvaluator; confirm label cardinality.
evaluator_dt = BinaryClassificationEvaluator()
dt_eval_params = {evaluator_dt.metricName: "areaUnderROC"}
dt_auc = evaluator_dt.evaluate(predictions_dt, dt_eval_params)
print("Test Area Under ROC: " + str(dt_auc))

# Seconds elapsed for fitting, scoring and evaluation.
print(time.time() - start)

+-----------+-------+-----+-----+--------------------+----------+--------------------+
|   category|  brand|price|label|       rawPrediction|prediction|         probability|
+-----------+-------+-----+-----+--------------------+----------+--------------------+
|electronics|samsung| 5.64|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 5.64|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 6.16|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 6.22|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 6.67|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 6.67|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 6.67|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 6.67|  0.0|[5823074.0,186861...|       0.0|[0.94881081607773...|
|electronics|samsung| 6.67|  0.0|[5823074.0

In [10]:
# Step 3: Use random forest to perform classification on event type (view, cart, purchase).

start = time.time()

from pyspark.ml.classification import RandomForestClassifier

# Fit a random forest with default hyper-parameters on the training split.
rf = RandomForestClassifier(featuresCol='features', labelCol='label')
rfModel = rf.fit(train)

# Score the held-out split and preview the first 10 scored rows.
predictions_rf = rfModel.transform(test)
predictions_rf.select(
    'category',
    'brand',
    'price',
    'label',
    'rawPrediction',
    'prediction',
    'probability',
).show(10)

# Evaluate the random-forest predictions. The earlier version passed
# `predictions` (the logistic-regression output), so the printed score was
# not the random forest's.
# NOTE(review): binary evaluator on what the header describes as a three-class
# target - consider MulticlassClassificationEvaluator; confirm label cardinality.
evaluator_rf = BinaryClassificationEvaluator()
rf_auc = evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: "areaUnderROC"})
print("Test Area Under ROC: " + str(rf_auc))

# Seconds elapsed for fitting, scoring and evaluation.
print(time.time() - start)

+-----------+-------+-----+-----+--------------------+----------+--------------------+
|   category|  brand|price|label|       rawPrediction|prediction|         probability|
+-----------+-------+-----+-----+--------------------+----------+--------------------+
|electronics|samsung| 5.64|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 5.64|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 6.16|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 6.22|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 6.67|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 6.67|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 6.67|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 6.67|  0.0|[18.9758111525724...|       0.0|[0.94879055762862...|
|electronics|samsung| 6.67|  0.0|[18.975811