## Setup Notebook Instance Backed by Spark on EMR using Apache Livy 
- Before you begin, Follow the steps here to create an EMR cluster: https://github.com/ruchikaabbi/sagemaker-spark
- Follow the steps here to connect SageMaker Notebook to Spark Livy EMR Cluster: https://aws.amazon.com/blogs/machine-learning/build-amazon-sagemaker-notebooks-backed-by-spark-in-amazon-emr/. The instructions cover the following:
    - Make sure this SageMaker Notebook was launched with a security group and you have added access on port 8998 of EMR Cluster Master node to Sagemaker Notebook Security Group.
    - Get the Private IP of EMR (with Livy) Master Node
    - Open Terminal and make sure you can conenct to EMR using cmd: curl <EMR Master Private IP>:8998/sessions
    - Modify ~/.sparkmagic/config.json from the terminal using steps in the blogpost. 
    - Come back to this notebook and restart Kernel
- Change your s3_model_bucket and spark_model_location 
- Use US-West-2 (Oregon Region for the lab)

In [1]:
## Do not update source input file path ##
input_file_path = "s3://ruchika-wibd-west-2/mleap/ml/data/data.csv"
## Update your bucket name and model location path ##
s3_model_bucket='ruchika-wibd-west-2'
spark_model_location='s3://'+s3_model_bucket+'/mleap/models/car_price_prediction_model/'
pipeline_name='pipeline1'
model_name='model'

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,,pyspark,idle,,,✔


SparkSession available as 'spark'.


In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor

data = spark.read.csv(path=input_file_path, header=True, quote='"', sep=",", inferSchema=True)

In [3]:
data.printSchema()

root
 |-- Price: double (nullable = true)
 |-- Mileage: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Trim: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Cylinder: integer (nullable = true)
 |-- Liter: double (nullable = true)
 |-- Doors: integer (nullable = true)
 |-- Cruise: integer (nullable = true)
 |-- Sound: integer (nullable = true)
 |-- Leather: integer (nullable = true)

In [4]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.ml.feature import StringIndexer, VectorAssembler

def get_indexer_input(data):
    str_cols_value = {}
    for c, t in data[data.columns].dtypes:
        if t == 'string':
            str_cols_value[c] = StringIndexer(inputCol=c, outputCol='indexed_' + c).fit(data)
    return str_cols_value

In [5]:
data_test, data_train = data.randomSplit(weights=[0.3, 0.7], seed=10)

get_indexer_input = get_indexer_input(data)
print (get_indexer_input)

{'Trim': StringIndexer_4141a447f7f316da2b43, 'Make': StringIndexer_4838a136aa1639fbed00, 'Type': StringIndexer_45df90a97b35bfb67189, 'Model': StringIndexer_4ae79950dc676731930c}

In [6]:
def model_training(data_train, indexer_input):
    x_cols = list(set(data_train.columns) - set(indexer_input.keys() + ["Price"]))
    str_ind_cols = ['indexed_' + column for column in indexer_input.keys()]
    indexers = indexer_input.values()
    pipeline_tr = Pipeline(stages=indexers)
    data_tr = pipeline_tr.fit(data_train).transform(data_train)
    assembler = VectorAssembler(inputCols=x_cols, outputCol="features")
    gbt = GBTRegressor(featuresCol="features", labelCol="Price", stepSize=0.008, maxDepth=5, subsamplingRate=0.75,
                       seed=10, maxIter=20, minInstancesPerNode=5, checkpointInterval=100, maxBins=64)
    pipeline_training = Pipeline(stages=[assembler, gbt])
    model = pipeline_training.fit(data_tr)
    return model


def model_testing(model, data_test, indexer_input):
    indexers = indexer_input.values()
    pipeline_te = Pipeline(stages=indexers)
    data_te = pipeline_te.fit(data_test).transform(data_test)
    predictions = model.transform(data_te)
    predictions.select("prediction").show(10,False)

In [7]:
model = model_training(data_train, get_indexer_input)
model.write().overwrite().save(spark_model_location)

In [8]:
# Load the saved model and Test the model.
from pyspark.ml import PipelineModel
from pyspark.ml import Pipeline
import json
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler

def get_indexer_input(data):
    str_cols_value = {}
    for c, t in data[data.columns].dtypes:
        if t == 'string':
            str_cols_value[c] = StringIndexer(inputCol=c, outputCol='indexed_' + c).fit(data)
    return str_cols_value

def model_testing(model, data_test, indexer_input):
    indexers = indexer_input.values()
    pipeline_te = Pipeline(stages=indexers)
    data_te = pipeline_te.fit(data_test).transform(data_test)
    data_te.show(1,False)
    predictions = model.transform(data_test)
    predictions.select("prediction").show(10,False)

sameModel = PipelineModel.load(path="s3://ruchika-wibd-west-2/mleap/models/car_price_prediction_model/")

j={"Price":9041.9062544231,"Mileage":26191,"Make":"Chevrolet","Model":"AVEO","Trim":"SVM Sedan 4D","Type":"Sedan","Cylinder":4,"Liter"
:1.6,"Doors":4,"Cruise":0,"Sound":0,"Leather":1}

a=[json.dumps(j)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)

get_indexer_input = get_indexer_input(df)
model_testing(sameModel, df, get_indexer_input)

+------+--------+-----+-------+-----+---------+-------+-----+---------------+-----+------------+-----+------------+------------+------------+-------------+
|Cruise|Cylinder|Doors|Leather|Liter|Make     |Mileage|Model|Price          |Sound|Trim        |Type |indexed_Trim|indexed_Make|indexed_Type|indexed_Model|
+------+--------+-----+-------+-----+---------+-------+-----+---------------+-----+------------+-----+------------+------------+------------+-------------+
|0     |4       |4    |1      |1.6  |Chevrolet|26191  |AVEO |9041.9062544231|0    |SVM Sedan 4D|Sedan|0.0         |0.0         |0.0         |0.0          |
+------+--------+-----+-------+-----+---------+-------+-----+---------------+-----+------------+-----+------------+------------+------------+-------------+

+------------------+
|prediction        |
+------------------+
|10236.175823272792|
+------------------+

In [9]:
row_df=data_test.limit(1)
row_df.show()

+---------------+-------+---------+-----+------------+-----+--------+-----+-----+------+-----+-------+
|          Price|Mileage|     Make|Model|        Trim| Type|Cylinder|Liter|Doors|Cruise|Sound|Leather|
+---------------+-------+---------+-----+------------+-----+--------+-----+-----+------+-----+-------+
|9041.9062544231|  26191|Chevrolet| AVEO|SVM Sedan 4D|Sedan|       4|  1.6|    4|     0|    0|      1|
+---------------+-------+---------+-----+------------+-----+--------+-----+-----+------+-----+-------+

In [12]:
#Serialize to MLeap Bundle
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
model.serializeToBundle("jar:file:/tmp/"+model_name+".zip", model.transform(row_df))

An error occurred while calling o503.serializeToBundle.
: java.nio.file.NoSuchFileException: /tmp2/model.zip
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
	at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
	at java.nio.file.Files.newOutputStream(Files.java:216)
	at com.sun.nio.zipfs.ZipFileSystem.<init>(ZipFileSystem.java:116)
	at com.sun.nio.zipfs.ZipFileSystemProvider.newFileSystem(ZipFileSystemProvider.java:117)
	at java.nio.file.FileSystems.newFileSystem(FileSystems.java:326)
	at java.nio.file.FileSystems.newFileSystem(FileSystems.java:276)
	at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:43)
	at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:23)
	at ml.combust.mleap.spark.Si

In [13]:
#Save the Bundle to S3
import boto3

s3 = boto3.resource('s3')
data = open("/tmp/"+model_name+".zip", 'rb')
s3.Bucket(s3_model_bucket).put_object(Key='mleap/models/'+pipeline_name+'/'+model_name+'.zip', Body=data)

s3.Object(bucket_name='ruchika-wibd-west-2', key='mleap/models/pipeline1/model.zip')

In [14]:
%%python

## CHANGE ME ##
s3_model_bucket='ruchika-wibd-west-2'
spark_model_location='s3://'+s3_model_bucket+'/mleap/models/car_price_prediction_model/'
pipeline_name='pipeline1'
model_name='model'
home='/home/ec2-user/models/'

## Tar Zip the model and save back to S3
import boto3,os

## Check if local directory exists otherwise create it
os.makedirs(home, exist_ok = True)

s3 = boto3.resource('s3')
s3.Bucket(s3_model_bucket).download_file('mleap/models/'+pipeline_name+'/'+model_name+'.zip', home+model_name+'.zip')
cmd='cd '+home+' ; tar -czvf '+model_name+'.tgz '+model_name+'.zip '
print (cmd)
print (os.system(cmd))
data = open(home+model_name+'.tgz', 'rb')
s3.Bucket(s3_model_bucket).put_object(Key='mleap/models/'+pipeline_name+'/'+model_name+'.tgz', Body=data)

model.zip
cd /home/ec2-user/models/ ; tar -czvf model.tgz model.zip 
0
