In [1]:
!pip install implicit
import implicit
from scipy.sparse import coo_matrix
from scipy.sparse import csr_matrix
import numpy as np
import pandas as pd
import random
from tqdm import tqdm
import math

In [2]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS
from pyspark.sql.types import IntegerType

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
!pip install findspark
import findspark
findspark.init()

In [4]:
# Spark session for this notebook: getOrCreate() reuses a running session if one
# already exists, otherwise it starts a new one with this app name.
spark = (
    SparkSession.builder
    .appName("meetup recommendation")
    .getOrCreate()
)
# Handle to the underlying SparkContext
sc = spark.sparkContext

In [5]:
members_rsvp = pd.read_csv("/dbfs/FileStore/tables/members_rsvp.csv")

In [6]:
## Turning the member_id and group_id to category and giving it an idx
members_rsvp['m_code'] = members_rsvp['id'].astype('category').cat.codes
members_rsvp['g_code'] = members_rsvp['group_id'].astype('category').cat.codes

In [7]:
## Get the final dataframe with just the member_codes, group_codes and the scaled rsvps
members_final = members_rsvp[['m_code','g_code','rsvp_total']]

In [8]:
from pyspark.sql import SQLContext
sql = SQLContext(sc)
df_spark = sql.createDataFrame(members_final)

In [9]:
type(df_spark)

In [10]:
df_spark.show()

In [11]:
ratings_data = df_spark.rdd.map(tuple)

In [12]:
type(ratings_data)

In [13]:
ratings_data.take(3)

In [14]:
## Split the ratings 60/20/20 into train / validation / test; the seed makes it reproducible
split_weights = [6, 2, 2]  # randomSplit normalises weights that do not sum to 1
train, validation, test = ratings_data.randomSplit(split_weights, seed=99)
# keep every split in memory: each one is re-read by several Spark jobs below
train.cache()
validation.cache()
test.cache()

In [15]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """
    Grid-search ALS hyper-parameters and return the model with the lowest
    validation RMSE.

    Parameters
    ----------
    train_data : RDD of (user, product, rating) tuples used to fit each model.
    validation_data : RDD of (user, product, rating) tuples used to score each model.
    num_iters : int, number of ALS iterations per fit.
    reg_param : iterable of regularization (``lambda_``) values to try.
    ranks : iterable of latent-factor counts to try.

    Returns
    -------
    The fitted model with the smallest validation RMSE.

    Raises
    ------
    ValueError
        If no (rank, reg) combination scored any validation pair (including
        the case where ``ranks`` or ``reg_param`` is empty).

    Notes
    -----
    The RMSE is computed only over validation pairs that ``predictAll``
    returned (presumably users/products seen in training — TODO confirm).
    A combination whose join is empty is skipped; an empty RDD's ``mean()``
    would otherwise produce a meaningless RMSE that could win the search.
    """
    # The validation inputs do not depend on (rank, reg): build them once.
    valid_pairs = validation_data.map(lambda p: (p[0], p[1]))
    valid_ratings = validation_data.map(lambda r: ((r[0], r[1]), r[2]))

    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            # train ALS model
            model = ALS.train(
                ratings=train_data,    # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=99)
            # predict the validation pairs, keyed by (user, product)
            predictions = model.predictAll(valid_pairs).map(lambda r: ((r[0], r[1]), r[2]))
            # ((user, product), (rating, prediction)) for every pair predictAll returned
            ratesAndPreds = valid_ratings.join(predictions)
            # squared errors; stats() yields count and mean in one job (RDD.mean() uses it too)
            sq_err = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).stats()
            if sq_err.count() == 0:
                # nothing to score: do not let an empty join masquerade as a perfect fit
                print('{} latent factors and regularization = {}: no validation pairs scored, skipped'.format(rank, reg))
                continue
            error = math.sqrt(sq_err.mean())
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    if best_model is None:
        raise ValueError('no (rank, regularization) combination scored any validation pair')
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [16]:
# Hyper-parameter grid: every rank below is tried with every regularization value
num_iterations = 10
ranks = list(range(8, 21, 2))  # 8, 10, ..., 20 latent factors
reg_params = [0.001, 0.01, 0.05, 0.1, 0.2]

# Grid search on the RSVP splits; the best model is kept as final_model
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)
elapsed = time.time() - start_time

print('Total Runtime: {:.2f} seconds'.format(elapsed))

In [17]:
## ALS Learning curve
def plot_learning_curve(arr_iters, train_data, validation_data, reg, rank):
    """
    Plot the validation RMSE of ALS against the number of training iterations.

    A fresh model is trained for every value in ``arr_iters`` (same rank,
    regularization and seed), scored on ``validation_data``, and the
    resulting RMSE curve is drawn with matplotlib.

    Parameters
    ----------
    arr_iters : sequence of int, iteration counts to try (x-axis).
    train_data : RDD of (user, product, rating) tuples used for fitting.
    validation_data : RDD of (user, product, rating) tuples used for scoring.
    reg : float, ALS regularization (``lambda_``).
    rank : int, number of latent factors.

    If no validation pair could be scored for an iteration count, that point
    is plotted as NaN (a gap) rather than a spurious value.
    """
    # Validation inputs are identical for every iteration count: build them once.
    valid_pairs = validation_data.map(lambda p: (p[0], p[1]))
    valid_ratings = validation_data.map(lambda r: ((r[0], r[1]), r[2]))

    errors = []
    for num_iters in arr_iters:
        # train ALS model
        model = ALS.train(
            ratings=train_data,    # (userID, productID, rating) tuple
            iterations=num_iters,
            rank=rank,
            lambda_=reg,           # regularization param
            seed=99)
        # ((user, product), (rating, prediction)) for every pair predictAll returned
        predictions = model.predictAll(valid_pairs).map(lambda r: ((r[0], r[1]), r[2]))
        ratesAndPreds = valid_ratings.join(predictions)
        # squared errors; stats() yields count and mean in one job
        sq_err = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).stats()
        errors.append(math.sqrt(sq_err.mean()) if sq_err.count() else float('nan'))

    # plot
    _, ax = plt.subplots(figsize=(12, 6))
    ax.plot(arr_iters, errors)
    ax.set_xlabel('number of iterations')
    ax.set_ylabel('RMSE')
    ax.set_title('ALS Learning Curve')
    ax.grid(True)
    # matplotlib has no plt.display(); plt.show() renders the figure
    plt.show()

In [18]:
# Learning curve over 1..10 iterations at a fixed setting (reg=0.1, rank=20);
# these values are hard-coded here, not taken from the grid-search result.
iter_array = [n_iter for n_iter in range(1, 11)]
plot_learning_curve(iter_array, train, validation, reg=0.1, rank=20)

In [19]:
# Score the grid-search winner (final_model) on the held-out test split.
test_data = test.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result; the join keeps only test pairs predictAll returned
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# squared errors; stats() yields count and mean in one Spark job
sq_err = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).stats()
if sq_err.count() == 0:
    # an empty join would otherwise be reported as a meaningless RMSE
    raise ValueError('final_model could not score any test pair; RMSE is undefined')
print('RMSE computed over {} joined rating/prediction pairs ({} test ratings in total)'.format(
    sq_err.count(), test.count()))
MSE = sq_err.mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))

In [20]:
### Time-Delta implicit recommendation
members_delta = pd.read_csv("/dbfs/FileStore/tables/members_delta.csv")

In [21]:
#turning the member_id and group_id to category and giving it an idx
members_delta['m_code'] = members_delta['id'].astype('category').cat.codes
members_delta['g_code'] = members_delta['group_id'].astype('category').cat.codes

In [22]:
members_delta_final = members_delta[['m_code','g_code','delta']]

In [23]:
members_delta_final.head()

Unnamed: 0,m_code,g_code,delta
0,122781,0,3.658363
1,5180,0,3.754448
2,11369,0,3.658363
3,296263,0,3.658363
4,122875,0,3.754448


In [24]:
sql = SQLContext(sc)
df_spark_del = sql.createDataFrame(members_delta_final)

In [25]:
type(df_spark_del)

In [26]:
df_spark_del.show()

In [27]:
delta_data = df_spark_del.rdd.map(tuple)

In [28]:
delta_data.take(3)

In [29]:
## Split the delta ratings 60/20/20 into train / validation / test with a fixed seed
delta_split_weights = [6, 2, 2]  # randomSplit normalises weights that do not sum to 1
train_delta, validation_delta, test_delta = delta_data.randomSplit(delta_split_weights, seed=99)
# keep every split in memory: each one is re-read by several Spark jobs below
train_delta.cache()
validation_delta.cache()
test_delta.cache()

In [30]:
# Same hyper-parameter grid as the RSVP model: every rank x every regularization value
num_iterations = 10
ranks = list(range(8, 21, 2))  # 8, 10, ..., 20 latent factors
reg_params = [0.001, 0.01, 0.05, 0.1, 0.2]

# Grid search on the delta splits; the best model is kept as final_model_delta
start_time = time.time()
final_model_delta = train_ALS(train_delta, validation_delta, num_iterations, reg_params, ranks)
elapsed = time.time() - start_time

print('Total Runtime: {:.2f} seconds'.format(elapsed))

In [31]:
# Score the delta grid-search winner (final_model_delta) on its held-out test split.
test_data = test_delta.map(lambda p: (p[0], p[1]))
predictions = final_model_delta.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result; the join keeps only test pairs predictAll returned
ratesAndPreds = test_delta.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# squared errors; stats() yields count and mean in one Spark job
sq_err = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).stats()
if sq_err.count() == 0:
    # an empty join would otherwise be reported as a meaningless RMSE
    raise ValueError('final_model_delta could not score any test pair; RMSE is undefined')
print('RMSE computed over {} joined rating/prediction pairs ({} test ratings in total)'.format(
    sq_err.count(), test_delta.count()))
MSE = sq_err.mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))