## Introduction

This notebook will show how to use the PySpark on a Sagemaker notebook. 

We will manipulate data through Spark using a SparkSession and upload the resulting dataframe to S3.


You can visit SageMaker Spark's GitHub repository at https://github.com/aws/sagemaker-spark to learn more about SageMaker Spark.

This notebook was created and tested on an ml.m4.xlarge notebook instance.


In [118]:
import os
import boto3
import matplotlib.pyplot as plt
import numpy as np

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark
from sagemaker_pyspark import *
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator, PCASageMakerEstimator
from sagemaker_pyspark.transformation.serializers import ProtobufRequestRowSerializer
from sagemaker_pyspark.transformation.deserializers import KMeansProtobufResponseRowDeserializer



We can now create the SparkSession with the SageMaker-Spark dependencies attached.


In [119]:
role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(sagemaker_pyspark.classpath_jars())

# See the SageMaker Spark Github to learn how to connect to EMR from a notebook instance
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath)\
    .master("local[*]").getOrCreate()
    
spark


## Loading the Data

Now, we load the MovieLens dataset into a Spark Dataframe.

Here, we load into a DataFrame in the SparkSession running on the local Notebook Instance, but you can connect your Notebook Instance to a remote Spark cluster for heavier workloads. Starting from EMR 5.11.0, SageMaker Spark is pre-installed on EMR Spark clusters. For more on connecting your SageMaker Notebook Instance to a remote EMR cluster, please see this blog post.


In [129]:
S3_BUCKET = "pedro-spark-sagemaker"
S3_TARGET_PREFIX = "/raw/train.csv"
S3_LOCATION = "s3a://"+S3_BUCKET+S3_TARGET_PREFIX 
print(S3_LOCATION)

s3a://pedro-spark-sagemaker/raw/train.csv


In [122]:
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType, IntegerType, FloatType
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, lit
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from time import mktime, strptime

In [123]:
!wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip

--2019-05-30 16:53:33--  http://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: ‘ml-100k.zip.1’


2019-05-30 16:53:35 (2.87 MB/s) - ‘ml-100k.zip.1’ saved [4924029/4924029]

Archive:  ml-100k.zip
  inflating: ml-100k/allbut.pl       
  inflating: ml-100k/mku.sh          
  inflating: ml-100k/README          
  inflating: ml-100k/u.data          
  inflating: ml-100k/u.genre         
  inflating: ml-100k/u.info          
  inflating: ml-100k/u.item          
  inflating: ml-100k/u.occupation    
  inflating: ml-100k/u.user          
  inflating: ml-100k/u1.base         
  inflating: ml-100k/u1.test         
  inflating: ml-100k/u2.base         
  inflating: ml-100k/u2.test         
  inflating: ml-100k/u3.base         
  inflating:

In [124]:
!aws s3 cp './ml-100k/ua.base' 's3://pedro-spark-sagemaker/raw/train.csv'

upload: ml-100k/ua.base to s3://pedro-spark-sagemaker/raw/train.csv


In [125]:
!aws s3 cp './ml-100k/ua.test' 's3://pedro-spark-sagemaker/raw/test.csv'

Completed 182.3 KiB/182.3 KiB (1.6 MiB/s) with 1 file(s) remainingupload: ml-100k/ua.test to s3://pedro-spark-sagemaker/raw/test.csv


In [126]:
!aws s3 ls s3://pedro-spark-sagemaker/raw/

                           PRE train/
2019-05-30 06:11:21   48039726 RetailData.csv
2019-05-28 01:40:00   23715344 ecommerce-data.csv
2019-05-30 16:53:44     186672 test.csv
2019-05-30 16:53:43    1792501 train.csv


Load the movielens data set to a Spark DF

In [132]:
invoiceSchema =  StructType([StructField('USER_ID', StringType(), False),
                    StructField('ITEM_ID', StringType(), False),
                    StructField('RATING', StringType(), True),
                    StructField('TIMESTAMP', StringType())])

invoicesDf=spark.read.schema(invoiceSchema).option("delimiter", "\t").options(header=True).csv(S3_LOCATION)
invoicesDf.show(3)


+-------+-------+------+---------+
|USER_ID|ITEM_ID|RATING|TIMESTAMP|
+-------+-------+------+---------+
|      1|      2|     3|876893171|
|      1|      3|     4|878542960|
|      1|      4|     3|876893119|
+-------+-------+------+---------+
only showing top 3 rows



### We'll do some data preprocessing using Spark SQL

In [133]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)

In [135]:
invoicesDf.registerTempTable("invoices")
invoicesCleanDf = sqlContext.sql(
    "select * from invoices where RATING is null"
)
invoicesCleanDf.count()

0

Create a new column binarizing the ratings ( >=4 is 1 else 0). We will use both rating systems for creating our models.

In [136]:

invoicesCleanDf = sqlContext.sql(
    "select *, \
    CASE \
        WHEN RATING >3 THEN 1 \
        ELSE 0 \
    END AS RATING_B \
    from invoices \
    where USER_ID is not null and ITEM_ID is not null and RATING > 0"
)
invoicesCleanDf.count()

90569

In [137]:
invoicesCleanDf.show(5)

+-------+-------+------+---------+--------+
|USER_ID|ITEM_ID|RATING|TIMESTAMP|RATING_B|
+-------+-------+------+---------+--------+
|      1|      2|     3|876893171|       0|
|      1|      3|     4|878542960|       1|
|      1|      4|     3|876893119|       0|
|      1|      5|     3|889751712|       0|
|      1|      6|     5|887431973|       1|
+-------+-------+------+---------+--------+
only showing top 5 rows



In [138]:
timeSeriesDF=invoicesCleanDf.withColumn("USER_ID", invoicesCleanDf["USER_ID"].cast(IntegerType())).withColumn("ITEM_ID", invoicesCleanDf["ITEM_ID"].cast(IntegerType())).withColumn("RATING", invoicesCleanDf["RATING"].cast(IntegerType())).withColumn("RATING_B", invoicesCleanDf["RATING_B"].cast(IntegerType()))

### Upload the resulting dataframe to S3 for use for our training job.

In [139]:
S3_TARGET_PREFIX = "/raw/train"
S3_LOCATION = "s3a://"+S3_BUCKET+S3_TARGET_PREFIX

print(S3_LOCATION)

s3a://pedro-spark-sagemaker/raw/train


In [141]:
timeSeriesDF.coalesce(1).write.csv(S3_LOCATION)

### Process the test data set as well (movielens data already has train/test split).

In [150]:
testDf=spark.read.schema(invoiceSchema).option("delimiter", "\t").options(header=True).csv('s3a://pedro-spark-sagemaker/raw/test.csv')
testDf=testDf.withColumn("USER_ID", testDf["USER_ID"].cast(IntegerType())).withColumn("ITEM_ID", testDf["ITEM_ID"].cast(IntegerType())).withColumn("RATING", testDf["RATING"].cast(IntegerType()))
testDf.show(3)

+-------+-------+------+---------+
|USER_ID|ITEM_ID|RATING|TIMESTAMP|
+-------+-------+------+---------+
|      1|     33|     4|878542699|
|      1|     61|     4|878542420|
|      1|    117|     3|874965739|
+-------+-------+------+---------+
only showing top 3 rows



In [155]:
testDf.registerTempTable("test")
testfinalDf = sqlContext.sql(
    "select *, \
    CASE \
        WHEN RATING >3 THEN 1 \
        ELSE 0 \
    END AS RATING_B \
    from test \
    where USER_ID is not null and ITEM_ID is not null and RATING > 0"
)
testfinalDf.count()

9429

In [156]:
testfinalDf.show(4)

+-------+-------+------+---------+--------+
|USER_ID|ITEM_ID|RATING|TIMESTAMP|RATING_B|
+-------+-------+------+---------+--------+
|      1|     33|     4|878542699|       1|
|      1|     61|     4|878542420|       1|
|      1|    117|     3|874965739|       0|
|      1|    155|     2|878542201|       0|
+-------+-------+------+---------+--------+
only showing top 4 rows



### Upload test dataframe to S3.

In [158]:
testfinalDf.coalesce(1).write.csv('s3a://pedro-spark-sagemaker/raw/test')

### Create an ALS collaborative filtering model using MLlib from Spark.

In [142]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [65]:
als = ALS(maxIter=5, regParam=0.01, userCol="USER_ID", itemCol="ITEM_ID", ratingCol="RATING",
          coldStartStrategy="drop")
model = als.fit(timeSeriesDF)


In [151]:
predictions = model.transform(testDf)

In [152]:
predictions.show(10)

+-------+-------+------+---------+----------+
|USER_ID|ITEM_ID|RATING|TIMESTAMP|prediction|
+-------+-------+------+---------+----------+
|    251|    148|     2|886272547| 3.1896665|
|    580|    148|     4|884125773| 3.6820707|
|    602|    148|     4|888638517| 4.4973254|
|    372|    148|     5|876869915| 3.3083372|
|    274|    148|     2|878946133| 2.9454553|
|    923|    148|     4|880387474| 3.0570798|
|    447|    148|     4|878854729| 2.9873352|
|    586|    148|     3|884065745|  3.355267|
|    761|    148|     5|876189829| 3.6957355|
|    677|    148|     4|889399265| 2.7402072|
+-------+-------+------+---------+----------+
only showing top 10 rows



In [153]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="RATING",
                                predictionCol="prediction")

In [154]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.235444431262701
