### Libraries Required

In [1]:
import os

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text
import mysql.connector
import pymysql
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

### Initiate Spark Session and Database Connection

In [2]:
appName = "Meal Recipe Collaborative Filtering"
master = "local[*]"

spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

# Establish a connection.
# Credentials must never be committed in the notebook: supply the full SQLAlchemy
# URL (e.g. mysql+pymysql://<user>:<password>@<host>:3306/food_intol) via the
# environment instead.
# NOTE(review): a root password was previously hard-coded here and remains in
# version history -- rotate it.
DB_URL_ENV_VAR = "FOOD_INTOL_DB_URL"
db_url = os.environ.get(DB_URL_ENV_VAR)
if not db_url:
    raise RuntimeError(
        f"Set the {DB_URL_ENV_VAR} environment variable to the database connection URL"
    )
sqlEngine = create_engine(db_url)

### Obtaining user ratings from Database

In [3]:
# Load every stored user rating into a pandas DataFrame.
# The context manager closes the connection even if the query raises.
with sqlEngine.connect() as dbConnection:
    panda_df = pd.read_sql('SELECT * FROM User_Ratings', dbConnection)

### Format the pandas DataFrame and transform it into a Spark DataFrame

In [4]:
# Turn the pandas ratings into a Spark DataFrame for training.
# NOTE(review): despite its name, this frame holds *all* ratings, not a test set.
test_user_df = spark.createDataFrame(panda_df)

# ALS needs numeric user ids, so cast user_id to an integer column
# (non-numeric values would become null -- confirm the DB column is numeric text).
test_user_df = test_user_df.withColumn(
    'user_id', test_user_df['user_id'].cast(IntegerType())
)

In [5]:
# Preview the first rows of the training data (user_id, recipe_index, rating).
test_user_df.show()

+-------+------------+------+
|user_id|recipe_index|rating|
+-------+------------+------+
|      0|          10|   4.0|
|      0|         113|   3.0|
|      0|         442|   4.0|
|      1|           1|   4.0|
|      1|          12|   3.0|
|      1|          13|   5.0|
|      1|          14|   4.0|
|      1|          55|   5.0|
|      1|          88|   4.0|
|      2|           1|   4.0|
|      2|          33|   3.0|
|      2|          76|   3.0|
|      2|          78|   2.0|
|      2|         120|   3.0|
|      2|         421|   4.0|
|      2|         501|   5.0|
|      3|           1|   2.0|
|      3|           2|   4.0|
|      3|          17|   2.0|
|      3|         121|   3.0|
+-------+------------+------+
only showing top 20 rows



### Model Training

In [6]:
# Fixed seed so the train/test partition -- and therefore the RMSE reported
# below -- is reproducible across Restart & Run All.
SPLIT_SEED = 42

# Train / test split: 80% of ratings for fitting, 20% held out for evaluation.
train, test = test_user_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# ALS hyperparameters. coldStartStrategy="drop" discards NaN predictions for
# users/recipes that never appear in `train`, keeping evaluation metrics defined.
als = ALS(maxIter=4, regParam=0.1, userCol="user_id", itemCol="recipe_index", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(train)

#### Model Performance

In [7]:
# Apply the model to the held-out ratings and report the RMSE.
predictions = model.transform(test)

# Column lookups in Spark ML evaluators are case-sensitive: labelCol must match
# the "rating" column exactly ("Rating" raises an IllegalArgumentException).
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

The RMSE is quite poor, suggesting that more rating data is required for reliable predictions<br>
Ideally the RMSE should be below 1.0

### Extracting Features (Pandas)

In [8]:
# `model` was fit only on the 80% training split, so users/recipes whose ratings
# all landed in `test` would receive no recommendations. Refit the same ALS
# estimator on every rating before producing recommendations for the database;
# `model` itself is kept unchanged for the evaluation above.
full_model = als.fit(test_user_df)

# 50 candidates per user; extractRecommendations later drops already-rated recipes.
userRecs = full_model.recommendForAllUsers(50)

In [9]:
# Render full recommendation lists instead of truncating long cells.
pd.set_option('display.max_colwidth', None)

# Bring the per-user recommendations back into pandas, turning user_id
# (cast to IntegerType before training) back into a str column.
user_predictions = userRecs.toPandas().astype({'user_id': str})

### Extract Recommendations

Call this function to get the top N recommendations to show a given user; recipes the user has already rated are excluded.

extractRecommendations(ratings_df, predictions_df, user_id, num_of_recommendations)<br>

Where:<br><br>
<b>ratings_df</b><br> is the user ratings table from the database, with columns user_id, recipe_index and rating<br>

<b>predictions_df</b><br> holds the recommendations for each user produced by the trained model<br>

<b>user_id</b><br> is the unique identifier of the user<br>

<b>num_of_recommendations</b><br> is the maximum number of recommendations to return<br>

In [11]:
def _user_mask(df, user_id):
    """Return a boolean mask of the rows of ``df`` whose ``user_id`` matches.

    Rows match on plain equality OR on string form, so a str id (as produced by
    ``astype(str)``) still matches a numeric id and vice versa.
    """
    ids = df['user_id']
    return (ids == user_id) | (ids.astype(str) == str(user_id))


def extractRecommendations(ratings_df, predictions_df, user_id, num_of_recommendations):
    """Return up to ``num_of_recommendations`` recipe ids the user has not rated yet.

    Parameters
    ----------
    ratings_df : pandas.DataFrame
        Existing ratings; must contain ``user_id`` and ``recipe_index`` columns.
    predictions_df : pandas.DataFrame
        Model output with a ``user_id`` column and a ``recommendations`` column
        whose entries are sequences of ``(recipe_index, predicted_rating)`` pairs.
    user_id
        Id of the user to recommend for (str or numeric).
    num_of_recommendations : int
        Maximum number of recipe ids to return.

    Returns
    -------
    list
        Recipe ids in the model's ranking order, excluding recipes the user has
        already rated. Empty if the user has no row in ``predictions_df``.
    """
    # A set makes the "already rated" check O(1) per candidate.
    rated = set(ratings_df.loc[_user_mask(ratings_df, user_id), 'recipe_index'].tolist())

    user_recs = predictions_df.loc[_user_mask(predictions_df, user_id), 'recommendations'].tolist()
    if not user_recs:
        # User received no model recommendations (e.g. never seen in training).
        return []

    # item[0] is the recipe id; keep the model's ranking order.
    unseen = [item[0] for item in user_recs[0] if item[0] not in rated]
    return unseen[:num_of_recommendations]
    

In [12]:
# Number of recipes to store per user.
NUM_RECOMMENDATIONS = 10

# One entry per user that received model recommendations. Read the column by
# name rather than `row[0]`: positional Series indexing is deprecated in pandas
# and silently depends on column order.
user_list = user_predictions['user_id'].tolist()

# Top unseen recipes per user, in the model's ranking order.
recommended_recipes = [
    extractRecommendations(panda_df, user_predictions, user, NUM_RECOMMENDATIONS)
    for user in user_list
]

output_df = pd.DataFrame({'user_id': user_list, 'recommendations': recommended_recipes})

# Store each list as a comma-separated string for the database column.
output_df['recommendations'] = output_df['recommendations'].apply(lambda x: ', '.join(map(str, x)))


### Pushing new recommendations to Database

In [13]:
# Stage the new recommendations in temp_table, then upsert them into
# User_Recommendations. Engine.begin() commits on success, rolls back on error,
# and always closes the connection (a plain connect() never commits under
# SQLAlchemy 2.x, and raw SQL strings must be wrapped in text()).
with sqlEngine.begin() as dbConnection:
    print("Database connection established")

    output_df.to_sql('temp_table', dbConnection, if_exists='replace', index=False)

    # NOTE(review): SELECT * relies on temp_table's column order
    # (user_id, recommendations) matching User_Recommendations' columns.
    sql = """INSERT  User_Recommendations
            SELECT  *
            FROM    temp_table t
            ON DUPLICATE KEY UPDATE
                    recommendations = t.recommendations"""

    dbConnection.execute(text(sql))

# Reached only after the transaction committed and the connection was closed.
print("Database has been updated successfully")
print("Database connection closed")

Database connection established
Database has been updated successfully
Database connection closed
