In [1]:
# download the sigir17 data from https://sites.google.com/site/limkwanhui/datacode

In [2]:
from os.path import expanduser

SRC_PATH = expanduser("~") + '/SageMaker/mastering-ml-on-aws/chapter6/'


In [3]:
from pyspark.context import SparkContext

sc = SparkContext('local', 'test')


In [4]:
from pyspark.sql import SQLContext

spark = SQLContext(sc)

In [86]:
poi_df = spark.read.csv(SRC_PATH + 'data-sigir17/poiList-sigir17', header=True, inferSchema=True, sep=';')

In [88]:
poi_df.limit(4).toPandas()

Unnamed: 0,poiID,poiName,lat,long,rideDuration,theme,theme2,theme3,theme4
0,1,Gadget's Go Coaster,33.810259,-117.918438,1.0,Kiddie,Roller Coaster,,
1,2,Astro Orbitor,28.418532,-81.579153,1.5,Spinning Ride,,,
2,3,Mad Tea Party,33.813458,-117.918289,1.5,Family,Spinning Ride,,
3,4,Dumbo the Flying Elephant,33.81368,-117.918928,1.67,Family,Spinning Ride,,


In [5]:
visits_df = spark.read.csv(SRC_PATH + 'data-sigir17/userVisits-sigir17', header=True, inferSchema=True, sep=';')


In [7]:
sample_df = visits_df.limit(1000).toPandas()

In [8]:
sample_df.head()

Unnamed: 0,id,nsid,takenUnix,poiID,poiTheme,poiFreq,rideDuration,seqID
0,5858403310,10004778@N07,1308262550,6,Ride,1665,120.0,1
1,5857850631,10004778@N07,1308270702,26,Family,18710,900.0,1
2,5858399220,10004778@N07,1308631356,6,Ride,1665,120.0,2
3,8277294024,10004778@N07,1355568624,26,Family,18710,900.0,3
4,9219062165,10004778@N07,1373030964,29,Water,10427,900.0,4


In [6]:
visits_df.describe().toPandas()

Unnamed: 0,summary,id,nsid,takenUnix,poiID,poiTheme,poiFreq,rideDuration,seqID
0,count,332091.0,332091,332091.0,332091.0,332091,332091.0,332091.0,332091.0
1,mean,8916292302.139416,,1323382407.5555675,15.975127299445033,,6181.338365086678,740.7857015095311,4288.19415762547
2,stddev,6226917245.549271,,74244858.13151878,8.695388902420351,,5199.41535123871,488.5329445328169,3093.323953206581
3,min,102530213.0,10000151@N02,1187918299.0,1.0,Dark,162.0,60.0,1.0
4,max,29475731115.0,99987318@N03,1471870895.0,31.0,Water,18710.0,2700.0,11758.0


In [9]:
sample_df.describe()

Unnamed: 0,id,takenUnix,poiID,poiFreq,rideDuration,seqID
count,1000.0,1000.0,1000.0,1000.0,1000.0,1000.0
mean,8448781000.0,1328379000.0,20.785,7764.625,625.2606,50.349
std,4805738000.0,63463680.0,8.138243,6233.964628,324.947216,20.027838
min,1643421000.0,1191397000.0,1.0,580.0,60.0,1.0
25%,6075781000.0,1308877000.0,15.0,2757.0,270.0,33.0
50%,6222417000.0,1310770000.0,23.0,4082.0,600.0,56.0
75%,11517500000.0,1376198000.0,28.0,16366.0,900.0,69.0
max,27776010000.0,1466605000.0,31.0,18710.0,1500.0,73.0


In [10]:
sample_df.nsid.describe()

count             1000
unique              36
top       10182842@N08
freq               365
Name: nsid, dtype: object

In [11]:
visits_df.createOrReplaceTempView('visits')

In [89]:
poi_df.createOrReplaceTempView('points')

In [12]:
spark.sql('select distinct poiID from visits').count()

31

In [13]:
spark.sql('select nsid,count(distinct poiID) as cnt from visits group by nsid').describe().show()

+-------+------------+-----------------+
|summary|        nsid|              cnt|
+-------+------------+-----------------+
|  count|        8903|             8903|
|   mean|        null| 4.86027181848815|
| stddev|        null|5.965584836576787|
|    min|10000151@N02|                1|
|    max|99987318@N03|               31|
+-------+------------+-----------------+



In [14]:
spark.sql('select nsid,poiID,count(*) from visits group by nsid,poiID').describe().show()

+-------+------------+------------------+-----------------+
|summary|        nsid|             poiID|         count(1)|
+-------+------------+------------------+-----------------+
|  count|       43271|             43271|            43271|
|   mean|        null|14.920061935245315|7.674678190936193|
| stddev|        null| 8.437883931275111|52.93100615991835|
|    min|10000151@N02|                 1|                1|
|    max|99987318@N03|                31|             4128|
+-------+------------+------------------+-----------------+



In [108]:
train_df = spark.sql('select hash(nsid) as user_hash_id,hash(poiID) as poi_hash_id, poiID, count(*) as pictures_taken from visits group by 1,2, 3')

In [109]:
train_df.count()

43271

In [110]:
from pyspark.ml.recommendation import ALS

recommender = ALS(userCol="user_hash_id", itemCol="poi_hash_id", ratingCol="pictures_taken", coldStartStrategy="drop")

model = recommender.fit(train_df)

In [111]:
recommendations = model.recommendForAllUsers(10)

In [137]:
recommendations.limit(3).toPandas()

Unnamed: 0,user_hash_id,recommendations
0,413285690,"[(1249949532, 40.04582595825195), (-20661408, ..."
1,1005782960,"[(-359179259, 5.537337303161621), (1249949532,..."
2,1410121870,"[(1249949532, 11.890027046203613), (-140121007..."


In [112]:
joined_recommenadations = recommendations.join(train_df, 'user_hash_id').join(poi_df, "poiID")

In [113]:
joined_recommenadations.createOrReplaceTempView('recommendations')

In [114]:
joined_recommenadations.limit(5).toPandas()

Unnamed: 0,poiID,user_hash_id,recommendations,poi_hash_id,pictures_taken,poiName,lat,long,rideDuration,theme,theme2,theme3,theme4
0,12,413285690,"[(1249949532, 40.04582595825195), (-20661408, ...",-319098976,6,Disney Junior - Live on Stage!,28.357814,-81.560655,25.0,Family,Show,,Indoor
1,12,413285690,"[(1249949532, 40.04582595825195), (-20661408, ...",-319098976,6,Turtle Talk with Crush,28.375429,-81.551228,17.0,Family,Show,,Indoor
2,12,413285690,"[(1249949532, 40.04582595825195), (-20661408, ...",-319098976,6,Soarin' Around the World,33.808353,-117.920017,4.5,Ride,Family,3D Ride,Indoor
3,12,413285690,"[(1249949532, 40.04582595825195), (-20661408, ...",-319098976,6,It's A Small World,28.420815,-81.581992,14.0,Water,Ride,Dark,Indoor
4,12,413285690,"[(1249949532, 40.04582595825195), (-20661408, ...",-319098976,6,Matterhorn Bobsleds,33.813041,-117.917815,4.0,Roller Coaster,,,


In [115]:
row_list = spark.sql('select distinct p.poiName, hash(v.poiID) as h from visits v join points p on (p.poiID=v.poiID) ').collect()

In [116]:
hash_to_poi_mapping =  dict(map(lambda x: (x.h, x.poiName), row_list))

In [117]:
hash_to_poi_mapping

{1023896466: 'Gran Fiesta Tour Starring The Three Caballeros',
 -319098976: 'Matterhorn Bobsleds',
 -1057351352: 'Haunted Mansion',
 -1823081949: 'Star Tours: The Adventures Continue',
 -331964951: "Snow White's Scary Adventures",
 1546539305: 'Redwood Creek Challenge Trail',
 -1223696181: "Frontierland Shootin' Arcade",
 1796998381: 'Mark Twain Riverboat',
 1765031574: 'The Seas with Nemo & Friends',
 -359179259: 'Pirates of the Caribbean',
 -1041903523: 'British Revolution',
 -132918897: 'The Barnstormer',
 94926449: "Ellen's Energy Adventure",
 -1355542311: 'The Hall of Presidents',
 -397064898: 'Dumbo the Flying Elephant',
 944065163: 'Casey Jr. Circus Train',
 -20661408: "Mickey's Fun Wheel",
 -1401210078: 'O Canada!',
 335551368: 'Radiator Springs Racers',
 -559580957: 'Test Track',
 -441499547: "It's Tough to Be a Bug!",
 1079293707: 'King Arthur Carrousel',
 -768484170: 'Country Bear Jamboree',
 69695535: 'Disney Junior - Live on Stage!',
 972445202: 'Fantasmic!',
 -1731921111:

In [141]:
def poi_names(recommendations, visited_pois):
  return str([(hash_to_poi_mapping[poi], weight) for (poi,weight) in recommendations if hash_to_poi_mapping[poi] not in visited_pois])

spark.udf.register("poi_names", poi_names)

<function __main__.poi_names(recommendations, visited_pois)>

In [142]:
recommendation_sample = spark.sql('select user_hash_id, collect_list(poiName), poi_names(max(recommendations), collect_list(poiName)) from recommendations group by 1').sample(0.1).collect()

In [143]:
recommendation_sample[3]

Row(user_hash_id=753179416, collect_list(poiName)=["Walt Disney: One Man's Dream", 'Spaceship Earth', "California Screamin'", "Pete's Silly Sideshow", 'Casey Jr. Circus Train', "Ellen's Energy Adventure", "The Little Mermaid ~ Ariel's Undersea Adventure", "Mickey's PhilharMagic", 'Buzz Lightyear Astro Blasters'], poi_names(max(recommendations), collect_list(poiName, 0, 0))='[(\'Pirates of the Caribbean\', 1.7577803134918213), (\'Main Street Cinema\', 1.6354202032089233), (\'Haunted Mansion\', 1.5228301286697388), ("Mickey\'s Fun Wheel", 1.4988539218902588), ("Tuck and Roll\'s Drive \'Em Buggies", 1.3196501731872559), ("It\'s A Small World", 1.0924171209335327), (\'Disney Junior - Live on Stage!\', 1.0835790634155273), ("Frontierland Shootin\' Arcade", 0.9953671097755432)]')

In [145]:
recommendation_sample[100]

Row(user_hash_id=-96995627, collect_list(poiName)=['Indiana Jones Epic Stunt Spectacular!', 'Impressions de France', 'Monsters, Inc. Mike & Sulley to the Rescue!', 'Haunted Mansion', "Roger Rabbit's Car Toon Spin", 'British Revolution', 'The Twilight Zone Tower of Terror', "Peter Pan's Flight", 'Autopia', 'The Great Movie Ride', 'Reflections of China', "Heimlich's Chew Chew Train", 'The Barnstormer', "Peter Pan's Flight", 'Turtle Talk with Crush', 'Country Bear Jamboree', "Davy Crockett's Explorer Canoes", 'Rose & Crown Pub Musician', 'Grizzly River Run', 'The Hall of Presidents', 'Hyperspace Mountain'], poi_names(max(recommendations), collect_list(poiName, 0, 0))='[(\'Fantasmic!\', 2.7037513256073), ("Ellen\'s Energy Adventure", 2.526970863342285), (\'Main Street Cinema\', 1.4273393154144287), ("Frontierland Shootin\' Arcade", 1.2841346263885498), (\'Pirates of the Caribbean\', 1.2152353525161743), (\'Disney Junior - Live on Stage!\', 1.1271891593933105), ("It\'s A Small World", 1.051

In [146]:
recommendation_sample[300]

Row(user_hash_id=209960903, collect_list(poiName)=['Fantasmic!', 'Mark Twain Riverboat', "Rock 'n' Roller Coaster", 'Test Track', "Jumpin' Jellyfish", 'Space Mountain', "Gadget's Go Coaster", 'Disney Junior - Live on Stage!', 'Turtle Talk with Crush', "Soarin' Around the World", "It's A Small World", 'Matterhorn Bobsleds', "Ellen's Energy Adventure", "The Little Mermaid ~ Ariel's Undersea Adventure", "Mickey's PhilharMagic", 'Buzz Lightyear Astro Blasters', 'Disney Junior - Live on Stage!', 'Jungle Cruise', 'Finding Nemo Submarine Voyage', 'Red Car Trolley & News Boys', 'Pirates of the Caribbean', 'Storybook Land Canal Boats', 'Pirates of the Caribbean'], poi_names(max(recommendations), collect_list(poiName, 0, 0))='[("Frontierland Shootin\' Arcade", 5.431038856506348), (\'Star Tours\', 4.750153064727783), ("Snow White\'s Scary Adventures", 3.1163182258605957), (\'O Canada!\', 2.7878997325897217), ("It\'s Tough to Be a Bug!", 2.251542568206787), (\'The Barnstormer\', 1.9589238166809082

In [147]:
sagemaker_test_df = spark.sql('select hash(nsid) as user_hash_id,hash(poiID) as poi_hash_id,count(*) as pictures_taken from visits where abs(hash(nsid) % 10) < 4 group by 1,2')
sagemaker_train_df = spark.sql('select hash(nsid) as user_hash_id,hash(poiID) as poi_hash_id,count(*) as pictures_taken from visits where abs(hash(nsid) % 10) >= 4 group by 1,2')

In [149]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.feature import VectorAssembler


pipeline = Pipeline(stages = [
    StringIndexer(inputCol='user_hash_id', outputCol="user_hash_id_index", handleInvalid='keep'),
    OneHotEncoder(inputCol='user_hash_id_index', outputCol='user_hash_id_encoded'),
    StringIndexer(inputCol='poi_hash_id', outputCol='poi_hash_id_indexed', handleInvalid='keep'),
    OneHotEncoder(inputCol='poi_hash_id_indexed', outputCol='poi_hash_id_encoded'),
    QuantileDiscretizer(numBuckets=5, inputCol='pictures_taken', outputCol='interest_level'),
    VectorAssembler(inputCols=['poi_hash_id_encoded', 'user_hash_id_encoded', 'interest_level'],
                    outputCol='features'),
])

model = pipeline.fit(train_df)

In [151]:
sparse_df = model.transform(train_df)

In [152]:
sparse_df.show(5)

+------------+-----------+-----+--------------+------------------+--------------------+-------------------+-------------------+--------------+--------------------+
|user_hash_id|poi_hash_id|poiID|pictures_taken|user_hash_id_index|user_hash_id_encoded|poi_hash_id_indexed|poi_hash_id_encoded|interest_level|            features|
+------------+-----------+-----+--------------+------------------+--------------------+-------------------+-------------------+--------------+--------------------+
|  -858777831|-1057351352|   13|            14|             325.0|  (8903,[325],[1.0])|                6.0|     (31,[6],[1.0])|           3.0|(8935,[6,356,8934...|
|  1816327878|  441353343|   21|            16|            1917.0| (8903,[1917],[1.0])|               26.0|    (31,[26],[1.0])|           3.0|(8935,[26,1948,89...|
|     9814056|-1823081949|    3|             1|            6845.0| (8903,[6845],[1.0])|               24.0|    (31,[24],[1.0])|           1.0|(8935,[24,6876,89...|
|   588196111|-1

In [153]:
sagemaker_train_df, sagemaker_test_df = sparse_df.randomSplit([0.8, 0.2], seed=17)


In [180]:
# once pyspark supports writing to protobuf directly, as one can do in scala we could do:
#
# sagemaker_train_df.write.format("sagemaker").option("labelColumnName", "interest_level").option("featuresColumnName", "features").save("s3://mastering-ml-aws/chapter6/train-data/")

In [234]:
from scipy.sparse import csr_matrix
import numpy as np 
import boto3
import io
import numpy as np
import scipy.sparse as sp
import sagemaker.amazon.common as smac


def spark_vector_to_sparse_matrix(row):
    vect = row['features']
    return csr_matrix((vect.values, vect.indices, np.array([0, vect.values.size])),(1, vect.size), dtype=np.float32)

def upload_matrices_to_s3(dataframe, dataset_name):
    features_matrices = dataframe.select("features").rdd.map(spark_vector_to_sparse_matrix).collect()
    interest_levels = dataframe.select("interest_level").rdd.map(lambda r: r['interest_level']).collect()
    
    interest_level_vector = np.array(interest_levels, dtype=np.float32)
    buffer = io.BytesIO()
    smac.write_spmatrix_to_sparse_tensor(buffer, sp.vstack(features_matrices), interest_level_vector)
    buffer.seek(0)
    bucket = boto3.resource('s3').Bucket('mastering-ml-aws')
    bucket.Object('chapter6/%s-data.protobuf'%dataset_name).upload_fileobj(buffer)

In [235]:
upload_matrices_to_s3(sagemaker_train_df, 'train')
upload_matrices_to_s3(sagemaker_test_df, 'test')

In [239]:
features_matrices[0].shape

(1, 8935)