In [1]:
%%sh
# Write the sparkmagic config: the Livy URL on the EMR master node (port 8998) and
# the default Spark session resources. Replace the placeholder IP before running.
# The redirect below overwrites any existing ~/.sparkmagic/config.json.
mkdir -p ~/.sparkmagic
echo '{"kernel_python_credentials" : {"url": "http://<EMR master node private IP>:8998/"}, "session_configs": 
{"executorMemory": "2g","executorCores": 2,"numExecutors":4}}' > ~/.sparkmagic/config.json
# Print the written file (a pager such as `less` is not meaningful in a non-interactive cell).
cat ~/.sparkmagic/config.json



In [3]:
## CHANGE ME ##
# Notebook-wide settings: replace the <bucket> placeholders before running.
input_file_path = "s3://<bucket>/ml/data/data.csv"   # training CSV (read with a header row)
s3_model_bucket = '<bucket>'                          # bucket that receives the saved models
spark_model_location = ('s3://' + s3_model_bucket
                        + '/models/car_price_prediction_model/')  # Spark PipelineModel save path
pipeline_name = 'pipeline1'                           # S3 prefix under models/ for the MLeap bundle
model_name = 'model'                                  # bundle file name, without the .zip suffix
model_name='model'

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor

# Load the raw CSV: the header row supplies column names and inferSchema types them.
data = spark.read.csv(
    input_file_path,
    sep=",",
    quote='"',
    header=True,
    inferSchema=True,
)

In [5]:
# Print the column names and types inferred by spark.read.csv (inferSchema=True).
data.printSchema()

root
 |-- Price: double (nullable = true)
 |-- Mileage: integer (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Trim: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Cylinder: integer (nullable = true)
 |-- Liter: double (nullable = true)
 |-- Doors: integer (nullable = true)
 |-- Cruise: integer (nullable = true)
 |-- Sound: integer (nullable = true)
 |-- Leather: integer (nullable = true)

In [7]:
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.ml.feature import StringIndexer, VectorAssembler

def get_indexer_input(data):
    """Fit one StringIndexer for every string-typed column of ``data``.

    :param data: Spark DataFrame whose string columns should be indexed.
    :return: dict mapping column name -> fitted StringIndexerModel that writes
        its result to ``indexed_<column>``.
    """
    return {
        column: StringIndexer(inputCol=column, outputCol='indexed_' + column).fit(data)
        for column, dtype in data.dtypes
        if dtype == 'string'
    }

In [8]:
# Hold out 30% of the rows for testing (first output) and train on 70%; the fixed
# seed keeps the split repeatable for the same input data.
data_test, data_train = data.randomSplit(weights=[0.3, 0.7], seed=10)

# Fit one StringIndexer per string column on the full dataset (not just data_train).
# NOTE(review): this rebinds the *function* name `get_indexer_input` to the dict it
# returns, so re-running this cell raises "'dict' object is not callable". The
# training cell passes `get_indexer_input` as that dict and the load/test cell
# redefines the function because of it; a rename has to touch those cells as well.
get_indexer_input = get_indexer_input(data)
print (get_indexer_input)

{'Trim': StringIndexer_472bacc9a3f556535587, 'Make': StringIndexer_475394bfff5acb0d2fb4, 'Type': StringIndexer_426ca2ebe79e94d86d3d, 'Model': StringIndexer_4f31b5e8d4519f5d6dce}

In [9]:
def model_training(data_train, indexer_input):
    """Fit the car-price GBT regression pipeline on ``data_train``.

    The fitted StringIndexer stages are placed inside the returned pipeline. The model
    (and anything exported from it) therefore accepts raw rows with the original string
    columns and uses their indexed values as features alongside the numeric columns.

    :param data_train: training DataFrame with a numeric ``Price`` label column and
        the raw (not yet indexed) string columns.
    :param indexer_input: dict mapping string column name -> fitted
        StringIndexerModel writing ``indexed_<column>`` (see ``get_indexer_input``).
    :return: fitted ``PipelineModel`` (indexers, VectorAssembler, GBTRegressor).
    """
    # Sorted so stage and feature order never depend on dict iteration order.
    str_cols = sorted(indexer_input.keys())
    str_ind_cols = ['indexed_' + column for column in str_cols]
    indexers = [indexer_input[column] for column in str_cols]

    # Remaining non-label columns, kept in DataFrame order. A set difference would
    # give a run-dependent order, and `dict.keys() + list` fails on Python 3.
    excluded = set(str_cols)
    excluded.add("Price")
    x_cols = [column for column in data_train.columns if column not in excluded]

    # Features = numeric columns + indexed string columns.
    assembler = VectorAssembler(inputCols=x_cols + str_ind_cols, outputCol="features")
    # NOTE(review): StringIndexer output presumably carries nominal metadata, which
    # makes these categorical features for the GBT; maxBins must then be >= the
    # largest category count (e.g. Trim/Model). Confirm against the data.
    gbt = GBTRegressor(featuresCol="features", labelCol="Price", stepSize=0.008, maxDepth=5, subsamplingRate=0.75,
                       seed=10, maxIter=20, minInstancesPerNode=5, checkpointInterval=100, maxBins=64)
    # The indexers are already fitted, so Pipeline.fit only passes them through. Keeping
    # them in the pipeline means the saved model re-indexes raw rows at prediction time.
    # NOTE(review): presumably the indexers use the default handleInvalid, so rows
    # with categories unseen at fit time would fail to score. Confirm that is acceptable.
    pipeline_training = Pipeline(stages=indexers + [assembler, gbt])
    model = pipeline_training.fit(data_train)
    return model


def model_testing(model, data_test, indexer_input):
    """Print the first 10 predictions of ``model`` on ``data_test``.

    ``model`` is expected to come from ``model_training``, which already contains the
    string indexers, so the raw test rows are scored directly. Pre-indexing them
    would collide with the indexers' ``indexed_<column>`` outputs.

    :param model: fitted PipelineModel.
    :param data_test: raw DataFrame to score.
    :param indexer_input: unused; kept so existing calls keep working.
    """
    predictions = model.transform(data_test)
    predictions.select("prediction").show(10, False)

In [14]:
# Fit the training pipeline on the 70% split. At this point `get_indexer_input` holds
# the dict of fitted StringIndexers (the split cell rebound the name).
model = model_training(data_train, get_indexer_input)
# Persist the fitted PipelineModel to the configured S3 location, replacing any earlier save.
model.write().overwrite().save(spark_model_location)

In [None]:
# Load the saved model and Test the model.
from pyspark.ml import PipelineModel
from pyspark.ml import Pipeline
import json
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler

def get_indexer_input(data):
    """Return {column: fitted StringIndexerModel} for each string column of ``data``.

    Redefined in this cell because the earlier split cell rebinds the name
    ``get_indexer_input`` to the dict it returned. Each indexer writes
    ``indexed_<column>``.
    """
    indexers = {}
    for column, dtype in data.dtypes:
        if dtype != 'string':
            continue
        indexers[column] = StringIndexer(inputCol=column, outputCol='indexed_' + column).fit(data)
    return indexers

def model_testing(model, data_test, indexer_input):
    """Show one indexed row of ``data_test`` and the first 10 predictions of ``model``.

    :param model: fitted PipelineModel to score with.
    :param data_test: raw DataFrame to score.
    :param indexer_input: dict column -> fitted StringIndexerModel; used only to
        display the indexed version of ``data_test``.
    """
    indexed_preview = Pipeline(stages=indexer_input.values()).fit(data_test).transform(data_test)
    indexed_preview.show(1, False)
    # Predictions are made on the raw frame; the indexed copy above is display-only.
    model.transform(data_test).select("prediction").show(10, False)

# Load the pipeline from the configured location, the same path the training cell
# saves to, rather than an account-specific hard-coded bucket.
sameModel = PipelineModel.load(path=spark_model_location)

# A single record to score; its values match the row shown in the output below.
j = {"Price": 9041.9062544231, "Mileage": 26191, "Make": "Chevrolet", "Model": "AVEO",
     "Trim": "SVM Sedan 4D", "Type": "Sedan", "Cylinder": 4, "Liter": 1.6, "Doors": 4,
     "Cruise": 0, "Sound": 0, "Leather": 1}

# Build a one-row DataFrame by round-tripping the record through JSON.
a = [json.dumps(j)]
jsonRDD = sc.parallelize(a)
df = spark.read.json(jsonRDD)

# Store the fitted indexers under their own name so `get_indexer_input` stays a
# callable function after this cell runs.
test_indexers = get_indexer_input(df)
model_testing(sameModel, df, test_indexers)

In [15]:
# Take one row of the test split; it is used to build the sample frame passed to
# serializeToBundle in the MLeap cell.
row_df=data_test.limit(1)
row_df.show()

+---------------+-------+---------+-----+------------+-----+--------+-----+-----+------+-----+-------+
|          Price|Mileage|     Make|Model|        Trim| Type|Cylinder|Liter|Doors|Cruise|Sound|Leather|
+---------------+-------+---------+-----+------------+-----+--------+-----+-----+------+-----+-------+
|9041.9062544231|  26191|Chevrolet| AVEO|SVM Sedan 4D|Sedan|       4|  1.6|    4|     0|    0|      1|
+---------------+-------+---------+-----+------------+-----+--------+-----+-----+------+-----+-------+

In [17]:
#Serialize to MLeap Bundle
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
# Serialize the fitted pipeline to an MLeap bundle stored as a zip under /tmp,
# passing the transformed one-row sample alongside it.
model.serializeToBundle(
    "jar:file:/tmp/" + model_name + ".zip",
    model.transform(row_df),
)

In [18]:
#Save the Bundle to S3
import boto3

s3 = boto3.resource('s3')
# `with` closes the bundle file after the upload. A dedicated name avoids
# clobbering `data`, which holds the training DataFrame.
with open("/tmp/" + model_name + ".zip", 'rb') as bundle_file:
    uploaded = s3.Bucket(s3_model_bucket).put_object(
        Key='models/' + pipeline_name + '/' + model_name + '.zip', Body=bundle_file)
# Show the uploaded S3 object as the cell result.
uploaded

s3.Object(bucket_name='neilawsml1', key='models/pipeline1/model.zip')

In [26]:
%%python

## CHANGE ME ##
# NOTE(review): %%python presumably runs this cell as a separate Python process, so
# variables from the Spark session are unavailable and the settings are repeated here.
s3_model_bucket='<bucket>'
pipeline_name='pipeline1'
model_name='model'
home='/home/ec2-user/models/'

## Tar Zip the model and save back to S3
import boto3,os

zip_name = model_name + '.zip'
tgz_name = model_name + '.tgz'

# Create the local working directory if needed before downloading into it.
if not os.path.isdir(home):
    os.makedirs(home)

s3 = boto3.resource('s3')
s3.Bucket(s3_model_bucket).download_file('models/' + pipeline_name + '/' + zip_name, home + zip_name)

cmd = 'cd ' + home + ' ; tar -czvf ' + tgz_name + ' ' + zip_name + ' '
print (cmd)
status = os.system(cmd)
print (status)
# Stop instead of uploading a missing or stale archive when tar fails.
if status != 0:
    raise RuntimeError('tar exited with status %d: %s' % (status, cmd))

# Upload the .tgz next to the .zip; `with` closes the file once the upload is done.
with open(home + tgz_name, 'rb') as tgz_file:
    s3.Bucket(s3_model_bucket).put_object(Key='models/' + pipeline_name + '/' + tgz_name, Body=tgz_file)

model.zip
cd /home/ec2-user/models/ ; tar -czvf model.tgz model.zip 
0
