## Alternating Least Squares (ALS) - Implicit Rating Model #2

In [3]:
#import libraries & begin spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('ReadData').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
sqlContext = SQLContext(sc)
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col, translate
from pyspark.sql.types import IntegerType, FloatType, StringType, TimestampType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col, mean, desc, round
from pyspark.sql.types import StructType, StructField

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
#create schema for the projects.csv
projects_schema = StructType([
    StructField("Project ID", StringType(), True),
    StructField("School ID", StringType(), True),
    StructField("Teacher ID", StringType(), True),
    StructField("Teacher Project Posted Sequence", IntegerType(), True),
    StructField("Project Type", StringType(), True),
    StructField("Project Title", StringType(), True),
    StructField("Project Essay", StringType(), True),
    StructField("Project Short Description", StringType(), True),
    StructField("Project Need Statement", StringType(), True),
    StructField("Project Subject Category Tree", StringType(), True),
    StructField("Project Subject Subcategory Tree", StringType(), True),
    StructField("Project Grade Level Category", StringType(), True),
    StructField("Project Resource Category", StringType(), True),
    StructField("Project Cost", DoubleType(), True),
    StructField("Project Posted Date", TimestampType(), True),
    StructField("Project Expiration Date", TimestampType(), True),
    StructField("Project Current Status", StringType(), True),
    StructField("Project Fully Funded Date", TimestampType(), True),
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
#read in all .csv files from s3 bucket
donations_df = spark.read.csv("s3://donorchoose//Donations.csv", header =True, inferSchema = True)
donors_df = spark.read.csv("s3://donorchoose/Donors.csv", header =True, inferSchema = True)
teachers_df = spark.read.csv("s3://donorchoose/Teachers.csv", header =True, inferSchema = True)
projects_df = spark.read.csv("s3://donorchoose/Projects.csv", header =True, multiLine = True, schema = projects_schema, escape='"')
resources_df = spark.read.csv("s3://donorchoose/Resources.csv", header =True, inferSchema = True)
schools_df = spark.read.csv("s3://donorchoose/Schools.csv", header =True, inferSchema = True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
#join all dataframes for future data exploration and feature engineering
donor_donations = donations_df.join(donors_df, on = ['Donor ID'], how = 'inner')
projects_schools = projects_df.join(schools_df, on = ['School ID'], how = 'inner')
donor_projects = donor_donations.join(projects_schools, on = ['Project ID'], how = 'left')
projects_teachers = donor_projects.join(teachers_df, on = ['Teacher ID'], how = 'inner')
df = projects_teachers.join(resources_df, on = ['Project ID'], how = 'inner')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
#missing school cities were all Washington DC
df = df.fillna({'School City':'DC'})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
#rename all df column names to remove spaces
df_renamed = df.withColumnRenamed('Project ID', 'project_id').withColumnRenamed('Teacher ID', 'teacher_id')\
.withColumnRenamed('Donor ID', 'donor_id').withColumnRenamed('Donation ID', 'donation_id')\
.withColumnRenamed('Donation Included Optional Donation', 'optional_donation').withColumnRenamed('Donation Amount', 'donation_amount')\
.withColumnRenamed('Donor Cart Sequence', 'donor_cart_sequence').withColumnRenamed('Donation Received Date','donation_received_date')\
.withColumnRenamed('Donor City', 'donor_city').withColumnRenamed('Donor State', 'donor_state').withColumnRenamed('Donor Is Teacher', 'donor_is_teacher')\
.withColumnRenamed('Donor Zip', 'donor_zip').withColumnRenamed('School ID', 'school_id')\
.withColumnRenamed('Teacher Project Posted Sequence', 'teacher_project_posted_seq')\
.withColumnRenamed('Project ID', 'project_id').withColumnRenamed('Project Type', 'project_type')\
.withColumnRenamed('Project Title', 'project_title').withColumnRenamed('Project Subject Category Tree', 'project_cat')\
.withColumnRenamed('Project Subject Subcategory Tree', 'project_cat2').withColumnRenamed('Project Grade Level Category', 'project_grade_level_cat')\
.withColumnRenamed('Project Resource Category', 'project_resource_cat').withColumnRenamed('Project Cost', 'project_cost')\
.withColumnRenamed('Project Posted Date', 'project_posted_date').withColumnRenamed('Project Expiration Date', 'project_exp_date')\
.withColumnRenamed('Project Current Status', 'project_curr_stat').withColumnRenamed('Project Fully Funded Date', 'project_fully_funded_date')\
.withColumnRenamed('School Name', 'school_name').withColumnRenamed('School Metro Type', 'school_metro_type')\
.withColumnRenamed('School Percentage Free Lunch', 'school_percent_free_lunch').withColumnRenamed('School State', 'school_state')\
.withColumnRenamed('School Zip', 'school_zip').withColumnRenamed('School City', 'school_city').withColumnRenamed('School County', 'school_county')\
.withColumnRenamed('School District', 'school_district').withColumnRenamed('Teacher Prefix', 'teacher_prefix')\
.withColumnRenamed('Teacher First Project Posted Date', 'teacher_first_proj').withColumnRenamed('Resource Item Name', 'resource_item_name')\
.withColumnRenamed('Resource Quantity', 'resource_quantity').withColumnRenamed('Resource Unit Price', 'resource_unit_price')\
.withColumnRenamed('Resource Vendor Name', 'resource_vendor_name')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
#cast timestamps to all time columns
df_renamed = df_renamed.withColumn("donation_received_date",to_date(unix_timestamp(col("donation_received_date"), "yyyy-MM-dd").cast("timestamp")))
df_renamed = df_renamed.withColumn("project_posted_date",to_date(unix_timestamp(col("project_posted_date"), "yyyy-MM-dd").cast("timestamp")))
df_renamed = df_renamed.withColumn("project_exp_date",to_date(unix_timestamp(col("project_exp_date"), "yyyy-MM-dd").cast("timestamp")))
df_renamed = df_renamed.withColumn("project_fully_funded_date",to_date(unix_timestamp(col("project_fully_funded_date"), "yyyy-MM-dd").cast("timestamp")))
df_renamed = df_renamed.withColumn("teacher_first_proj",to_date(unix_timestamp(col("teacher_first_proj"), "yyyy-MM-dd").cast("timestamp")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
#create new datetime columns for future data exploration
df_renamed = df_renamed.withColumn('teacher_date_exp_dif',datediff(df_renamed.project_exp_date.cast('date'),df_renamed.teacher_first_proj.cast('date')))
df_renamed= df_renamed.withColumn('project_days',datediff(df_renamed.project_exp_date.cast('date'),df_renamed.project_posted_date.cast('date')))

df_renamed = df_renamed.withColumn('post_date_day', date_format(col('project_posted_date'), "E"))
df_renamed = df_renamed.withColumn('post_date_month', month(df_renamed['project_posted_date']))
df_renamed = df_renamed.withColumn('post_date_year', year(df_renamed['project_posted_date']))

df_renamed = df_renamed.withColumn('donation_day', date_format(col('donation_received_date'), "E"))
df_renamed = df_renamed.withColumn('donation_month', month(df_renamed['donation_received_date']))
df_renamed = df_renamed.withColumn('donation_year', year(df_renamed['donation_received_date']))

df_renamed = df_renamed.withColumn('t_proj_post_day', date_format(col('teacher_first_proj'), "E"))
df_renamed = df_renamed.withColumn('t_proj_post_month', month(df_renamed['teacher_first_proj']))
df_renamed = df_renamed.withColumn('t_proj_post_year', year(df_renamed['teacher_first_proj']))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
#cast column types for ints and floats that weren't inferred at import

df_renamed = df_renamed.withColumn('school_percent_free_lunch',df_renamed.school_percent_free_lunch.cast('float'))
df_remamed = df_renamed.withColumn('donor_cart_sequence',df_renamed.donor_cart_sequence.cast('int'))
df_renamed = df_renamed.withColumn('donation_amount',df_renamed.donation_amount.cast('float'))
df_renamed = df_renamed.withColumn('teacher_project_posted_seq',df_renamed.teacher_project_posted_seq.cast('int'))
df_renamed = df_renamed.withColumn('project_cost',df_renamed.project_cost.cast('float'))
df_renamed = df_renamed.withColumn('resource_quantity',df_renamed.resource_quantity.cast('float'))
df_renamed = df_renamed.withColumn('resource_unit_price',df_renamed.resource_unit_price.cast('float'))
df_renamed = df_renamed.withColumn('resource_unit_price',df_renamed.resource_unit_price.cast('float'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
#import libraries for modeling
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
#split main categories and sub categories as they allow for two entries per column
split_col = split(df_renamed['project_cat'], ',')
df_renamed = df_renamed.withColumn('main_cat1', split_col.getItem(0))
df_renamed = df_renamed.withColumn('main_cat2', split_col.getItem(1))
split_col = split(df_renamed['project_cat2'], ',')
df_renamed = df_renamed.withColumn('subcat1', split_col.getItem(0))
df_renamed = df_renamed.withColumn('subcat2', split_col.getItem(1))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
#create new table that will bring in donor_id, main_cat1, and count of donation_id's 
df3 = df_renamed.select(df_renamed['donor_id'], df_renamed['main_cat1'], df_renamed['donation_id'])
#sum the donation amount
donor_project_table = df3.groupBy("donor_id", "main_cat1").agg(countDistinct('donation_id'))
#create new user table that will take distinct donor ID's and map them to distinct userIDs
users = df_renamed.select("donor_id").distinct()
users = users.coalesce(1)
users = users.withColumn("userID", monotonically_increasing_id()).persist()
#create new user table that will take distinct categories and map them to distinct catIDs
cats = df_renamed.select("main_cat1").distinct()
cats = cats.coalesce(1)
cats = cats.withColumn("catID", monotonically_increasing_id()).persist()
#join tables
prepped = donor_project_table.join(users, "donor_id", "inner").join(cats, "main_cat1", "inner")
prepped = prepped.select(prepped['userID'], prepped['catID'], prepped['count(DISTINCT donation_id)'])
#renamed count column and create final table for the implicit als model
ml_df = prepped.withColumnRenamed('count(DISTINCT donation_id)', 'count')
ml_df.sort(col('count').desc()).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+-----+
| userID|catID|count|
+-------+-----+-----+
|1712928|    7|10187|
| 981151|    7| 5547|
| 180723|    6| 5303|
|1231374|    6| 4584|
| 981151|    6| 3054|
|1712928|    6| 2876|
|1712928|    3| 2165|
| 981151|    3| 2156|
|1592967|    7| 2035|
|1331509|    6| 1962|
| 570350|    6| 1928|
|1231374|    3| 1928|
|1221365|    6| 1855|
|1702785|    6| 1809|
| 500437|    6| 1794|
| 981151|    2| 1774|
|  40550|    6| 1765|
|1712928|    2| 1677|
| 180723|    3| 1669|
|1091447|    7| 1584|
+-------+-----+-----+
only showing top 20 rows

In [15]:
#create ROEM function which will evaluate the ALS model
#(https://github.com/jamenlong/ALS_expected_percent_rank_cv/blob/master/ROEM_function.py)

def ROEM(predictions, userCol = "userId", itemCol = "catID", ratingCol = "count"):
    #Creates table that can be queried
    predictions.createOrReplaceTempView("predictions")
    #Sum of total number of plays of all songs
    denominator = predictions.groupBy().sum(ratingCol).collect()[0][0]
    #Calculating rankings of songs predictions by user
    spark.sql("SELECT " + userCol + " , " + ratingCol + " , PERCENT_RANK() OVER (PARTITION BY " + userCol + " ORDER BY prediction DESC) AS rank FROM predictions").createOrReplaceTempView("rankings")
    #Multiplies the rank of each song by the number of plays and adds the products together
    numerator = spark.sql('SELECT SUM(' + ratingCol + ' * rank) FROM rankings').collect()[0][0]
    performance = numerator/denominator
    return performance

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
#create als model with default parameters
#split data into training and testing sets, set up base ALS model
training, testing = ml_df.randomSplit([0.6, 0.4])
#create base ALS model
als = ALS(userCol="userID", itemCol="catID", ratingCol="count", coldStartStrategy="drop", nonnegative = True, implicitPrefs = True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
#fit base model to training data set
model = als.fit(training)
#pull predictions based off of base model and testing data set
predictions = model.transform(testing)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
#base performance
performance = ROEM(predictions, userCol = "userId", itemCol = "catID", ratingCol = "count")
print(performance)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.18879302687557417

In [19]:
#create 3 fold for cross validation
test1, test2, test3 = training.randomSplit([0.33, 0.33, 0.33])
train1 = test2.union(test3)
train2 = test3.union(test1)
train3 = test1.union(test2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
#create dummy variables for future use
best_validation_performance = 9999999999999
best_rank = 0
best_maxIter = 0
best_regParam = 0
best_alpha = 0
best_model = 0
best_predictions = 0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
#define hyperparameters
ranks = [10,50]
maxIters = [10, 25]
regParams = [.05, .1]
alphas = [10, 40]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# For loop will create the 3 ALS models
for r in ranks:
    for mi in maxIters:
        for rp in regParams:
            for a in alphas:
                als = ALS(rank = r, maxIter = mi, regParam = rp, alpha = a, userCol='userID', itemCol='catID', ratingCol='count',
                    coldStartStrategy="drop", nonnegative = True, implicitPrefs = True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
#fit model 1 to training data
model1 = als.fit(train1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-18:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 1322

An error occurred while calling o526.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1841)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534)
	at java.io.ObjectOutpu

In [24]:
#fit model 2 to training data
model2 = als.fit(train2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-23:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 2207

An error occurred while calling o526.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject

In [25]:
#fit model 3 to training data
model3 = als.fit(train3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-24:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 3906

An error occurred while calling o526.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
java.lang.StackOverflowError
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject

In [26]:
#create predictions for all 3 models
predictions1 = model1.transform(test1)
predictions2 = model2.transform(test2)
predictions3 = model3.transform(test3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

name 'model1' is not defined
Traceback (most recent call last):
NameError: name 'model1' is not defined



In [27]:
#performance of model #1
performance1 = ROEM(predictions1, userCol = "userId", itemCol = "catID", ratingCol = "count")
print(performance1)

VBox()

Exception in thread cell_monitor-25:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 5404



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

name 'predictions1' is not defined
Traceback (most recent call last):
NameError: name 'predictions1' is not defined



In [28]:
#performance of model #2
performance2 = ROEM(predictions2, userCol = "userId", itemCol = "catID", ratingCol = "count")
print(performance2)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

name 'predictions2' is not defined
Traceback (most recent call last):
NameError: name 'predictions2' is not defined



In [None]:
#performance of model #2
performance3 = ROEM(predictions3, userCol = "userId", itemCol = "catID", ratingCol = "count")
print(performance3)

In [35]:
#get best performance ROEM and best predictions comparing the three models to the original base model
if performance < best_validation_performance:
    best_validation_performance = performance
    best_rank = r
    best_maxIter = mi
    best_regParam = rp
    best_alpha = a
    best_model = validation_model
    best_predictions = validation_predictions
    print ("**Best Model** ")
    print ("  Percent Rank Error: "), best_validation_performance
    print ("  Rank: "), best_rank
    print ("  MaxIter: "), best_maxIter
    print ("  RegParam: "), best_regParam
    print ("  Alpha: "), best_alpha
return best_model, best_predictions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

name 'validation_performance' is not defined
Traceback (most recent call last):
NameError: name 'validation_performance' is not defined

