# Final Project - AWS

Originally, the plan was to run the code in Jupyter Notebook on a single AWS EC2 instance.  The specs available in the free tier were not conducive to this plan, as they were worse than the specs available in Google Colab.  Spark on Google Colab was already an improvement over local Jupyter Notebook and Pandas: the Spark dataframes allowed me to create a global baseline recommender with the full user-ratings dataset (although the code above only uses a fraction, as the ultimate goal was to move to AWS resources).  Google Colab was also limited in its runtime, as it would disconnect after around half a day, and anything running longer than this would be lost.  

To solve this issue, I ended up creating a Spark cluster composed of various EC2 instances.  The first step was to upload my documents into an S3 bucket.  

I created an S3 bucket called data-612-kkoon, where I placed the anime_summaries.csv, anime.csv, and rating.csv objects.  This process was fairly straightforward.  Bucket versioning was disabled, as I would only be reading from this bucket.  I did accidentally create this bucket in the wrong region at first, so I deleted the original bucket and created a new one in the correct region.  Another option would have been to replicate the bucket's contents across regions with S3 Cross-Region Replication.  Since I only had three files uploaded, I chose the simpler route, which was to recreate the bucket.  

The next step was to create the master EC2 instance.  I used the Amazon Linux 2023 kernel-6.1 AMI given its compatibility with PySpark - I had issues running PySpark locally on my Windows computer as it crashed with any matrix operation.  Google Colab is also Linux based.  

There were four instance types available on free tier:

1.  t3.micro, with 2 vCPUs and 1 GiB memory
2.  t3.small, with 2 vCPUs and 2 GiB memory
3.  c7i-flex.large, with 2 vCPUs and 4 GiB memory
4.  m7i-flex.large, with 2 vCPUs and 8 GiB memory

At the time, I did not know that this instance would be the master node of my Spark cluster.  As such, I chose the m7i-flex.large instance type for its 8 GiB of memory, given the large dataset and the memory issues it caused in previous projects.  However, since computation is done on the worker nodes, the m7i-flex.large is potentially not a necessary choice for the master node.  If I were to consider cost savings later on, I could try to downsize this EC2 instance.  Currently it is still using the m7i-flex.large instance type, like all of the workers in my Spark cluster.  

I created a VPC and subnet for this project to ensure private access to my data.  All EC2 instances used this VPC and subnet.  For my master node, I created inbound rules to allow custom TCP traffic from this VPC through specified ports.  These rules ended up being unnecessary, as I set up an SSH tunnel to access Jupyter Notebook, the Spark Master UI, and the Spark UI from my local computer.  I also added an inbound rule to allow SSH access from my IP.  

I created a second security group for the worker nodes.  It included inbound rules for SSH access from my IP, custom TCP from the VPC for ports 8081 (Spark worker UI) and 7337 (Spark shuffle service, which I had to start manually on each worker due to my Spark installation), and a custom TCP rule pointing to the master node security group to allow communication from the Spark Master (port 7077).  For the master security group, I added three inbound rules to allow the worker nodes to communicate with the master node (7077 for the Spark Master port, 7078 for the Spark driver port, 7079 for the Spark block manager port).  As a future step, I would reassess the need for the inbound rule for port 8081 in the worker security group and for port 7077 in the master security group.  I had added these rules incrementally to fix an issue that was preventing the master node from communicating with the worker nodes, so some of the configured inbound rules may be unnecessary.  
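
To keep track of which rules might be extraneous, the inbound rules described here can be written down as plain data.  The sketch below is only a bookkeeping aid, not AWS configuration: the source labels (`my-ip`, `vpc`, `master-sg`, `worker-sg`) and the helper `ports_open_to` are hypothetical names, and the master node's VPC-wide rules are left out because their ports are not listed.

```python
# Hypothetical source labels; real rules would reference a CIDR block
# or a security group ID instead of these placeholder strings.
MY_IP = "my-ip"
VPC = "vpc"
MASTER_SG = "master-sg"
WORKER_SG = "worker-sg"

# Inbound rules as described in the text, grouped by security group.
INBOUND_RULES = {
    MASTER_SG: [
        {"port": 22, "source": MY_IP, "purpose": "SSH"},
        {"port": 7077, "source": WORKER_SG, "purpose": "Spark Master"},
        {"port": 7078, "source": WORKER_SG, "purpose": "Spark driver"},
        {"port": 7079, "source": WORKER_SG, "purpose": "Spark block manager"},
    ],
    WORKER_SG: [
        {"port": 22, "source": MY_IP, "purpose": "SSH"},
        {"port": 8081, "source": VPC, "purpose": "Spark worker UI"},
        {"port": 7337, "source": VPC, "purpose": "Spark shuffle service"},
        {"port": 7077, "source": MASTER_SG, "purpose": "Spark Master"},
    ],
}


def ports_open_to(group, source):
    """Return the sorted ports that `group` accepts from `source`."""
    return sorted(
        rule["port"] for rule in INBOUND_RULES[group] if rule["source"] == source
    )


print(ports_open_to(MASTER_SG, WORKER_SG))
print(ports_open_to(WORKER_SG, VPC))
```

Listing the rules this way makes it easier to remove one rule at a time and check whether the workers still register with the master.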

To access the EC2 instances, I used PuTTY to SSH into these instances with private key authentication credentials.  I used the same private key for all nodes.  For the master node, I created an SSH tunnel to Jupyter Notebook, the master Spark UI, and the PySpark UI (ports 8888, 8080, and 4040, respectively).  All nodes used the same default user login, ec2-user.  
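
For readers who are not using PuTTY, roughly the same tunnel can be opened with the OpenSSH command-line client.  The sketch below only builds the command; it assumes OpenSSH is installed, and the host name `MASTER_PUBLIC_DNS`, the key file `my-key.pem`, and the helper `build_tunnel_command` are placeholders rather than values from this project.

```python
import shlex


def build_tunnel_command(host, key_path, ports, user="ec2-user"):
    """Build an OpenSSH command that forwards each local port to the same
    port on the remote host (-L) without running a remote command (-N)."""
    command = ["ssh", "-i", key_path, "-N"]
    for port in ports:
        command += ["-L", f"{port}:localhost:{port}"]
    command.append(f"{user}@{host}")
    return command


# Ports from the text: Jupyter Notebook, Spark Master UI, PySpark UI.
command = build_tunnel_command("MASTER_PUBLIC_DNS", "my-key.pem", [8888, 8080, 4040])
print(shlex.join(command))
```

While the tunnel is open, each forwarded service is reachable on the local machine at the same port number, for example `http://localhost:8888` for Jupyter Notebook.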

To set up the master node, I installed Jupyter Notebook.  When launching Jupyter Notebook, I can use the specified token to access it on my local computer via the SSH tunnel set up in the previous step.  For all nodes, I installed Java, Spark, and scikit-learn.  I set the JAVA_HOME and SPARK_HOME environment variables globally.  

In AWS, I created an IAM role with S3 admin access.  I assigned this IAM role to all the nodes to allow them to access the S3 bucket.  

To create and connect the Spark cluster, I started the master node and then all worker nodes.  I checked the Master UI on my local computer via the SSH tunnel and confirmed that the workers had connected.  I also manually started the shuffle service on each worker node; it is enabled in the Spark configuration in my Jupyter Notebook.

I began with 3 workers, but after attempting to run my recommender system, I was receiving errors related to disk space.  I created a custom AMI from one of my workers and launched 7 new workers with 32 GiB of EBS storage each, using the AMI so that all the installs were already available on the new workers.  The AWS free tier has a limit of 16 total cores, which meant my Spark cluster was maxed out at 7 workers and 1 master.  
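
The cluster size follows directly from the core limit.  A short sketch of the arithmetic, assuming every node is an m7i-flex.large with 2 vCPUs and 8 GiB of memory as listed earlier (the variable names are illustrative only):

```python
CORE_LIMIT = 16           # total cores allowed, as stated in the text
VCPUS_PER_INSTANCE = 2    # m7i-flex.large
MEMORY_PER_INSTANCE = 8   # GiB, m7i-flex.large

max_instances = CORE_LIMIT // VCPUS_PER_INSTANCE   # 8 instances in total
max_workers = max_instances - 1                    # one instance is the master
total_worker_memory = max_workers * MEMORY_PER_INSTANCE

print(max_instances, max_workers, total_worker_memory)  # 8 7 56
```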

After some testing, I realized that my problem was not computational power, but memory.  Additionally, the large number of workers resulted in too much data shuffling, which caused the Spark cluster to run my code even slower than Google Colab.  After making adjustments to how I cached and unpersisted dataframes, I used only one worker with 2 vCPUs, 8 GiB RAM, and 108 GiB of EBS storage to allow persisted dataframes to spill to disk if required.  The Spark cluster setup (i.e. master node and worker node) was not necessary, but since my code was set up to use it and my master node had only 8 GiB of EBS storage, I continued with this configuration.  

Additionally, after examining my code, I realized that the cross join for cosine similarity created a computational burden disproportionate to the value it added.  I counted the user ratings for each anime and sorted the anime by that count.  I found that the 100 most-rated anime alone account for over 1,000,000 user-anime ratings.  Since the number of pairs in the cross join grows quadratically with the number of anime, I chose to run the code for only the top 100 anime, which still meets the project requirement of 1,000,000 user ratings.

With everything set up, I can now run my code with the following PySpark configuration:


In [1]:
# Set environment variables
import os
os.environ["SPARK_HOME"] = "/opt/spark"
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-amazon-corretto'
os.environ["PYSPARK_PYTHON"] = "/home/ec2-user/sklearn-env/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/ec2-user/sklearn-env/bin/python"

# Initialize findspark
import findspark
findspark.init()

# Create Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("spark://10.0.0.218:7077") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "6g")\
    .config("spark.executor.instances", "1") \
    .config("spark.cores.max", "4") \
    .config("spark.driver.host", "10.0.0.218") \
    .config("spark.driver.port", "7078") \
    .config("spark.blockManager.port", "7079") \
    .config("spark.driver.bindAddress", "10.0.0.218") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.executorEnv.PYSPARK_PYTHON", "/home/ec2-user/sklearn-env/bin/python") \
    .config("spark.default.parallelism", "8")\
    .config("spark.sql.shuffle.partitions", "8")\
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .getOrCreate()


spark

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1a219bbc-b684-42ff-a958-521c122332fa;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 264ms :: artifacts dl 5ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	:: evicted modules:
	com.amazonaws#aws-java-sdk-bundle;1.11.1026 by [com.amazonaws#aws-java-sdk-bundle;1.12.262] in [default]
	-----------------------------------------------------------

In [2]:
import findspark
from pyspark import SparkContext
import pyspark
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import IntegerType,StringType,StructField,StructType, BooleanType, FloatType
from pyspark.sql.functions import count, concat_ws, col, lit, mean, count_distinct, when, split, udf, coalesce, sum, abs, first
from pyspark.sql import Window
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neighbors import NearestNeighbors as KNN
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, Normalizer
from pyspark.ml.linalg import DenseVector, Vectors
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
anime_desc_schema = StructType([
    StructField('anime_id', IntegerType()),
    StructField('description', StringType())
])
anime_descriptions = spark.read.option("sep", ";").csv('s3a://data-612-kkoon/anime_summaries.csv', schema = anime_desc_schema)
anime_descriptions = anime_descriptions.filter(col("anime_id").isNotNull())
schema = StructType([
    StructField('anime_id', IntegerType()),
    StructField('name', StringType()),
    StructField('genre', StringType()),
    StructField('type', StringType()),
    StructField('episodes', FloatType()),
    StructField('rating', FloatType()),
    StructField('members', IntegerType())
])
anime = spark.read.csv('s3a://data-612-kkoon/anime.csv', schema = schema, header = True)
anime_full = anime.join(anime_descriptions, on = "anime_id", how = "inner")

rating_schema = StructType([
    StructField('user_id', IntegerType()),
    StructField('anime_id', IntegerType()),
    StructField('rating', FloatType())
])
rating = spark.read.csv('s3a://data-612-kkoon/rating.csv', schema = rating_schema, header = True)
rating = rating.filter(col('rating') != -1)
rating = rating.dropna()

rating = rating.cache()
#new_rating = rating.select("user_id").distinct().sample(False, 0.15, seed=63) # meet project requirements - at least 10k users
#new_rating = new_rating.join(rating, on="user_id", how = "inner")
train_df, test_df = rating.randomSplit([0.8,0.2], seed = 63)
train_df_cache = train_df.cache()

train_means = train_df.select(mean('rating')).collect()[0][0]
print(train_means)

user_bias = train_df.groupBy("user_id").mean("rating")
user_bias = user_bias.withColumnRenamed("avg(rating)","user_bias")
user_bias = user_bias.withColumn("user_bias", user_bias["user_bias"] - train_means)

user_bias_cached = user_bias.cache()
user_bias_cached.show()

anime_bias = train_df.groupBy("anime_id").mean("rating")
anime_bias = anime_bias.withColumnRenamed("avg(rating)","anime_bias")
anime_bias = anime_bias.withColumn("anime_bias", anime_bias["anime_bias"] - train_means)

anime_bias_cached = anime_bias.cache()
anime_bias_cached.show()

25/07/21 01:48:53 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

7.808683581523678
+-------+--------------------+
|user_id|           user_bias|
+-------+--------------------+
|     12|   1.080205307365211|
|     14| -0.6328594056995023|
|     18|   1.401842734265796|
|     38| -1.1453172448900153|
|     46|  0.7481791635743607|
|     67|  1.1913164184763216|
|     70|  2.1913164184763216|
|     93| -0.3136340765731829|
|    107| 0.19131641847632164|
|    148| -0.3570706782978723|
|    161|  1.1913164184763216|
|    171|  1.4720181728622874|
|    186|  0.5327798331104683|
|    190| -0.6486835815236782|
|    198|  2.0484592756191793|
|    202|-0.39489047807540256|
|    203|  1.1913164184763216|
|    218|  1.1913164184763216|
|    225|  0.3264515536114567|
|    232|-0.39489047807540256|
+-------+--------------------+
only showing top 20 rows

+--------+--------------------+
|anime_id|          anime_bias|
+--------+--------------------+
|   11771|  0.8060012787752173|
|    1943|  0.4000044778520033|
|   16782|  0.6166037747981603|
|   17265|  0.480734

# Global Baseline Recommender

In [4]:
global_baseline_train = train_df_cache.join(user_bias_cached, on = "user_id", how = "left")
global_baseline_train = global_baseline_train.join(anime_bias_cached, on = "anime_id", how = "left")

global_baseline_train = global_baseline_train.withColumn("prediction",
                                                         coalesce(global_baseline_train["user_bias"],lit(0)) +
                                                         coalesce(global_baseline_train["anime_bias"],lit(0)) +
                                                         train_means)

global_baseline_train = global_baseline_train.withColumn("prediction", when(global_baseline_train["prediction"] < 1, 1)
    .when(global_baseline_train["prediction"] > 10, 10)
    .otherwise(global_baseline_train["prediction"]))

global_baseline_train_cache = global_baseline_train.cache()
global_baseline_train_cache.show()

+--------+-------+------+--------------------+------------------+-----------------+
|anime_id|user_id|rating|           user_bias|        anime_bias|       prediction|
+--------+-------+------+--------------------+------------------+-----------------+
|      18|    202|   8.0|-0.39489047807540256|0.5148950137940469|7.928688117242323|
|      18|   2115|   7.0| -0.4407906049350494|0.5148950137940469|7.882787990382676|
|      18|   2501|   9.0|  0.7839090110689151|0.5148950137940469|9.107487606386641|
|      18|   3196|  10.0|  0.9034376305975345|0.5148950137940469| 9.22701622591526|
|      18|   3781|   6.0| -0.4299460399954391|0.5148950137940469|7.893632555322286|
|      18|   4662|   9.0| 0.10481122816490274|0.5148950137940469|8.428389823482629|
|      18|   5427|   9.0|  1.2079830851429891|0.5148950137940469|9.531561680460715|
|      18|   5767|   8.0| -1.0434661902193305|0.5148950137940469|7.280112405098395|
|      18|   6118|   9.0|  1.1622707753227948|0.5148950137940469| 9.48584937

                                                                                

In [5]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
rmse_gb_train = evaluator.evaluate(global_baseline_train_cache)
print(rmse_gb_train)



1.2117542539704813
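
For reference, the `rmse` metric reported by `RegressionEvaluator` is the square root of the mean squared difference between the prediction and the label.  A minimal pure-Python sketch of that calculation, using the first three rows of the prediction table above as input (the helper name `rmse` is illustrative):

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and equal in length")
    squared_errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# First three rows of the training prediction table (rating, prediction).
ratings = [8.0, 7.0, 9.0]
predictions = [7.928688117242323, 7.882787990382676, 9.107487606386641]
print(rmse(ratings, predictions))
```

Three rows are far too few to compare with the reported RMSE; the example only shows how the metric is computed.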


                                                                                

In [6]:
global_baseline_test = test_df.join(user_bias_cached, on = "user_id", how = "left")
global_baseline_test = global_baseline_test.join(anime_bias_cached, on = "anime_id", how = "left")

global_baseline_test = global_baseline_test.withColumn("prediction",
                                                       coalesce(global_baseline_test["user_bias"],lit(0)) +
                                                       coalesce(global_baseline_test["anime_bias"],lit(0)) +
                                                       train_means)
global_baseline_test = global_baseline_test.withColumn("prediction", when(global_baseline_test["prediction"] < 1, 1)
    .when(global_baseline_test["prediction"] > 10, 10)
    .otherwise(global_baseline_test["prediction"]))

global_baseline_test_cache = global_baseline_test.cache()
global_baseline_test_cache.show()

+--------+-------+------+--------------------+--------------------+------------------+
|anime_id|user_id|rating|           user_bias|          anime_bias|        prediction|
+--------+-------+------+--------------------+--------------------+------------------+
|   11617|      1|  10.0|  2.1913164184763216|-0.03175168038695908| 9.968248319613041|
|     170|      3|   9.0|-0.33571060855070556|  0.8107806065572403| 8.283753579530213|
|     225|      3|   9.0|-0.33571060855070556| -0.9396735581816493|6.5332994147913235|
|     813|      3|  10.0|-0.33571060855070556|  0.5133861122461214| 7.986359085219094|
|    1119|      3|   7.0|-0.33571060855070556| -0.6946402261985138| 6.778332746774459|
|    1292|      3|   6.0|-0.33571060855070556|-0.31398312069418566| 7.158989852278787|
|    1564|      3|   7.0|-0.33571060855070556| -0.5502909060404635| 6.922682066932509|
|    2201|      3|   7.0|-0.33571060855070556| -0.7775417130115674| 6.695431259961405|
|    7695|      3|   7.0|-0.335710608550705

In [7]:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
rmse_gb_test = evaluator.evaluate(global_baseline_test_cache)
print(rmse_gb_test)



1.2297122835232495
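
The global baseline computed in the cells above is the overall mean rating plus a user bias and an anime bias, with missing biases treated as 0 and the result clipped to the 1 to 10 rating scale.  A minimal sketch of the same calculation for a single user-anime pair, using values printed in the outputs above (the function name `baseline_prediction` is illustrative):

```python
def baseline_prediction(global_mean, user_bias=None, anime_bias=None,
                        low=1.0, high=10.0):
    """Global mean plus user and anime biases, clipped to the rating scale.
    A missing bias (None) contributes 0, mirroring coalesce(..., lit(0))."""
    raw = global_mean + (user_bias or 0.0) + (anime_bias or 0.0)
    return min(max(raw, low), high)


train_mean = 7.808683581523678

# User 202 and anime 18, taken from the training output table.
print(baseline_prediction(train_mean, -0.39489047807540256, 0.5148950137940469))

# A user and anime unseen in training fall back to the global mean.
print(baseline_prediction(train_mean))
```

The first call reproduces the prediction of about 7.93 shown for user 202 and anime 18 in the training table.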


                                                                                

# Content Based Recommender

When calculating cosine similarity, computation time grows quadratically with the number of anime, because the cross join pairs every anime with every other anime.  Cosine similarity was identified as a bottleneck, as this step alone would take hours.  An anime with only one rating could affect computation time disproportionately to the value it adds.  Given that 1 million ratings is the target for this project, I will compute the cosine similarity for only the 100 most-rated anime.  
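
To make the cost of the cross join concrete: for n anime it produces n(n-1) ordered pairs once self-pairs are filtered out, so the number of similarity computations grows with the square of the catalogue size.  In the sketch below, the 10,000-anime catalogue is a hypothetical figure chosen for illustration, not the size of this dataset.

```python
def ordered_pairs(n):
    """Number of (anime, other anime) pairs left after removing self-pairs."""
    return n * (n - 1)


print(ordered_pairs(100))     # 9900
print(ordered_pairs(10_000))  # 99990000 (hypothetical catalogue size)
```

Each of those pairs is then joined to every rating of the first anime, which is why trimming the catalogue reduces the work so sharply.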

In [8]:
rating.groupBy("anime_id").agg(count("user_id").alias("user_count")).orderBy(col("user_count").desc()).limit(100).agg(sum("user_count")).show()
anime_pared = rating.groupBy("anime_id").agg(count("user_id").alias("user_count")).orderBy(col("user_count").desc()).limit(100).select("anime_id")
rating.unpersist()

+---------------+
|sum(user_count)|
+---------------+
|        1345994|
+---------------+



DataFrame[user_id: int, anime_id: int, rating: float]

In [9]:
new_anime_full = anime_full.join(anime_pared, on = "anime_id", how = "inner")

new_anime_full = new_anime_full.withColumn("description", F.regexp_replace("description", '\"', '')) # remove quotes
tokenizer = Tokenizer(inputCol="description", outputCol="words")
anime_df_processed = tokenizer.transform(new_anime_full)

stop_words = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol="words", outputCol="words_nosw", stopWords=stop_words)
anime_df_processed_nosw = remover.transform(anime_df_processed)

hashingTF = HashingTF(inputCol="words_nosw", outputCol="features", numFeatures=5000) #numFeatures=500
anime_features_df = hashingTF.transform(anime_df_processed_nosw)

anime_features_df.show()

+--------+--------------------+--------------------+----+--------+------+-------+--------------------+--------------------+--------------------+--------------------+
|anime_id|                name|               genre|type|episodes|rating|members|         description|               words|          words_nosw|            features|
+--------+--------------------+--------------------+----+--------+------+-------+--------------------+--------------------+--------------------+--------------------+
|   16498|  Shingeki no Kyojin|Action, Drama, Fa...|  TV|    25.0|  8.54| 896229|Centuries ago, ma...|[centuries, ago,,...|[centuries, ago,,...|(5000,[78,122,175...|
|    5114|Fullmetal Alchemi...|Action, Adventure...|  TV|    64.0|  9.26| 793665|After a horrific ...|[after, a, horrif...|[horrific, alchem...|(5000,[49,152,342...|
|   30276|       One Punch Man|Action, Comedy, P...|  TV|    12.0|  8.82| 552458|The seemingly uni...|[the, seemingly, ...|[seemingly, unimp...|(5000,[1,20,83,14...|
|   

                                                                                

In [10]:
idf = IDF(inputCol="features", outputCol="IDF_features")
idf_model = idf.fit(anime_features_df)
tfidf_df = idf_model.transform(anime_features_df)
tfidf_df = tfidf_df.select("anime_id","IDF_features")
tfidf_df.show()

                                                                                

+--------+--------------------+
|anime_id|        IDF_features|
+--------+--------------------+
|   16498|(5000,[78,122,175...|
|    5114|(5000,[49,152,342...|
|   30276|(5000,[1,20,83,14...|
|   11757|(5000,[105,108,11...|
|      20|(5000,[89,133,188...|
|   22319|(5000,[78,87,189,...|
|    1575|(5000,[43,242,378...|
|   20507|(5000,[2,46,116,1...|
|   22199|(5000,[63,157,171...|
|     269|(5000,[58,92,96,1...|
|   10620|(5000,[133,233,23...|
|   21881|(5000,[15,117,285...|
|    9919|(5000,[40,78,102,...|
|       1|(5000,[13,161,231...|
|   22535|(5000,[12,102,150...|
|      30|(5000,[11,20,49,1...|
|    2904|(5000,[49,116,302...|
|   27899|(5000,[19,67,92,1...|
|    6702|(5000,[15,49,188,...|
|   18679|(5000,[99,103,211...|
+--------+--------------------+
only showing top 20 rows
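
The `HashingTF` and `IDF` stages above can be illustrated in pure Python.  This is a simplified sketch, not Spark's implementation: it uses `zlib.crc32` as a stand-in for Spark's hash function, so the bucket indices will not match the Spark output, and the two token lists are made up.  The IDF weight follows the formula given in the Spark ML documentation, log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term.

```python
import math
import zlib
from collections import Counter


def hashed_term_counts(tokens, num_features):
    """Map each token to a bucket index and count occurrences per bucket."""
    counts = Counter()
    for token in tokens:
        index = zlib.crc32(token.encode("utf-8")) % num_features  # stand-in hash
        counts[index] += 1
    return dict(counts)


def idf_weights(documents):
    """Compute log((m + 1) / (df + 1)) for every bucket seen in the corpus."""
    m = len(documents)
    doc_freq = Counter()
    for counts in documents:
        doc_freq.update(counts.keys())  # each document counts once per bucket
    return {index: math.log((m + 1) / (df + 1)) for index, df in doc_freq.items()}


def tfidf(counts, idf):
    """Scale the term counts of one document by the corpus IDF weights."""
    return {index: tf * idf[index] for index, tf in counts.items()}


# Two made-up, already tokenized descriptions.
docs = [["titans", "attack", "wall"], ["alchemy", "brothers", "wall"]]
counts = [hashed_term_counts(doc, 5000) for doc in docs]
idf = idf_weights(counts)
print(tfidf(counts[0], idf))
```

A term that appears in every document, such as "wall" here, gets a weight of 0, so it does not contribute to the similarity between descriptions.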



In [11]:
normalizer = Normalizer(inputCol="IDF_features", outputCol="norm")
normalized_tfidf_df = normalizer.transform(tfidf_df)
normalized_tfidf_df_2 = normalized_tfidf_df.withColumnRenamed("anime_id", "anime_id_2").withColumnRenamed("norm","norm_2")
#normalized_tfidf_df = normalized_tfidf_df.repartition("anime_id")
#normalized_tfidf_df_2 = normalized_tfidf_df_2.repartition("anime_id_2")

sim_cos = udf(lambda x,y : float(x.dot(y)), FloatType())

cosine_sim = (
    normalized_tfidf_df
    .crossJoin(normalized_tfidf_df_2)
    .filter(normalized_tfidf_df["anime_id"] != normalized_tfidf_df_2["anime_id_2"])
    .withColumn("similarity", sim_cos(normalized_tfidf_df["norm"], normalized_tfidf_df_2["norm_2"]))
    .select(
        "anime_id",
        "anime_id_2",
        "similarity"
    )
)

cosine_sim_cache = cosine_sim.cache()
cosine_sim_cache.show()

+--------+----------+-----------+
|anime_id|anime_id_2| similarity|
+--------+----------+-----------+
|   16498|      5114|0.046655748|
|   16498|     30276| 0.04346148|
|   16498|     11757|0.048580788|
|   16498|        20|0.041770328|
|   16498|     22319| 0.03133955|
|   16498|      1575|0.018288309|
|   16498|     20507| 0.04277198|
|   16498|     22199|0.023875764|
|   16498|       269|0.011576882|
|   16498|     10620|0.025270337|
|   16498|     21881|0.052557766|
|   16498|      9919| 0.02926118|
|   16498|         1|0.016164998|
|   16498|     22535|0.015055798|
|   16498|        30| 0.07857197|
|   16498|      2904|0.022364216|
|   16498|     27899|0.012461833|
|   16498|      6702| 0.00723235|
|   16498|     18679|0.044972453|
|   16498|      3588|0.022739379|
+--------+----------+-----------+
only showing top 20 rows
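
The cell above relies on the fact that, once two vectors are scaled to unit length (which `Normalizer` does with its default L2 norm), their dot product equals their cosine similarity.  A small pure-Python check of that identity on two made-up vectors (all names are illustrative):

```python
import math


def dot(a, b):
    """Dot product of two equally long vectors."""
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(vector):
    """Scale a vector to unit L2 norm; a zero vector is returned unchanged."""
    norm = math.sqrt(dot(vector, vector))
    return [x / norm for x in vector] if norm else list(vector)


def cosine_similarity(a, b):
    """Cosine of the angle between two non-zero vectors."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a = [3.0, 4.0, 0.0]
b = [4.0, 3.0, 0.0]
print(cosine_similarity(a, b))
print(dot(l2_normalize(a), l2_normalize(b)))
```

Both lines print approximately 0.96, which is why the UDF only needs a dot product on the normalized columns.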



                                                                                

In [12]:
train_df_pared = global_baseline_train_cache.join(cosine_sim_cache, on="anime_id", how = "inner") 
global_baseline_train.unpersist()

test_df_pared = global_baseline_test_cache.join(cosine_sim_cache, on="anime_id", how = "inner")
global_baseline_test.unpersist()

train_df_pared = train_df_pared.withColumn("rating", train_df_pared["rating"] - train_df_pared["prediction"])
train_df_pared = train_df_pared.select(col("user_id"),col("anime_id"),col("anime_id_2"),col("rating"),col("similarity"),col("prediction").alias("global_baseline"))
train_df_pared_cache = train_df_pared.cache()

test_df_pared = test_df_pared.withColumn("rating", test_df_pared["rating"] - test_df_pared["prediction"])
test_df_pared = test_df_pared.select(col("user_id"),col("anime_id"),col("anime_id_2"),col("rating"),col("similarity"),col("prediction").alias("global_baseline"))
test_df_pared_cache = test_df_pared.cache()

In [13]:
predictions = train_df_pared_cache.withColumn( # prediction is then retrieved by taking the cosine similarity * rating for similar items
    "cos_simXratings", col("similarity") * col("rating")
).groupBy("user_id", "anime_id_2").agg(
    sum("cos_simXratings").alias("sumCos_simXratings"), # sum up the cosine similarity * rating
    sum("similarity").alias("sum_similarity"),
).withColumn(
    "prediction",
    when(col("sum_similarity") != 0, (col("sumCos_simXratings") / col("sum_similarity")))
    .otherwise(None)  # prediction is sum of cosine similarity * rating for all similar items, divided by sum of all cosine similarities for those items
).select("user_id","anime_id_2","prediction").distinct()
predictions_cache = predictions.cache()

train_df_renamed = train_df_pared_cache.select("user_id","anime_id","rating","global_baseline").withColumnRenamed("anime_id", "anime_id_2").distinct() # we want to compare the predictions for "anime_2", but in the user-rating, this refers to the rating in anime_1
predictions_train = predictions_cache.join(train_df_renamed, on = ["user_id","anime_id_2"], how = "inner")\
    .withColumn("prediction", coalesce("prediction",lit(0)) + col("global_baseline"))\
    .withColumn("rating",col("rating") + col("global_baseline"))

predictions_train = predictions_train.withColumn("prediction", when(predictions_train["prediction"] < 1, 1)
    .when(predictions_train["prediction"] > 10, 10)
    .otherwise(predictions_train["prediction"]))

predictions_train_cache = predictions_train.cache()
predictions_train_cache.show()

+-------+----------+------------------+------+------------------+
|user_id|anime_id_2|        prediction|rating|   global_baseline|
+-------+----------+------------------+------+------------------+
|      1|     11757|              10.0|  10.0|              10.0|
|      3|        20| 7.879086456162366|   8.0| 7.549534364544735|
|      3|      6702| 8.495952787755837|   8.0| 8.159327057358658|
|      3|     28223|  8.39854623447299|   6.0| 8.121219806713043|
|      5|       225|3.4105293016905227|   1.0|3.4034899806923455|
|      5|      9989| 4.688538711508015|   7.0|5.3123621093571485|
|      5|     17265|  4.23228569166715|   8.0| 4.823897946768306|
|      5|     22319| 3.849632131798173|   2.0| 4.667059180505935|
|      7|       270| 7.486343462901233|   9.0|  7.30386699491074|
|      7|      4654| 7.612370026240543|   9.0| 7.446378287410231|
|      7|     10719| 7.479884012916218|   8.0|7.2816193621249905|
|     10|     11757| 9.517858659055848|   9.0| 9.662729425913202|
|     11| 

                                                                                

In [16]:
predictions_train_cache.count() # showing number of predictions in the training set

                                                                                

1071364

In [17]:
rmse_content_train = evaluator.evaluate(predictions_train_cache)
print(rmse_content_train) 

1.1873520186514084
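
The content-based prediction built in the cells above adjusts the global baseline by a similarity-weighted average of the user's residuals (rating minus baseline) on other anime, then clips the result to the 1 to 10 scale.  A minimal sketch for one user and one target anime, with made-up similarities and residuals (the function name `content_prediction` is illustrative):

```python
def content_prediction(baseline, neighbours, low=1.0, high=10.0):
    """Baseline plus the similarity-weighted mean residual of other anime.

    `neighbours` is a list of (similarity, residual) pairs, one for each
    other anime the user rated.  If the similarities sum to 0, the
    adjustment is 0, mirroring coalesce(prediction, lit(0))."""
    similarity_sum = sum(sim for sim, _ in neighbours)
    if similarity_sum != 0:
        adjustment = sum(sim * res for sim, res in neighbours) / similarity_sum
    else:
        adjustment = 0.0
    return min(max(baseline + adjustment, low), high)


# Hypothetical user: rated one similar anime 1.0 above its baseline
# and a less similar anime 0.5 below its baseline.
print(content_prediction(7.5, [(0.2, 1.0), (0.1, -0.5)]))
```

With these inputs the adjustment is (0.2 × 1.0 + 0.1 × -0.5) / 0.3 = 0.5, so the prediction is about 8.0.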


In [18]:
# could consider dropDuplicates for the following df also 
test_df_renamed = test_df_pared_cache.select("user_id","anime_id","rating","global_baseline").withColumnRenamed("anime_id", "anime_id_2").distinct()  # but test global baseline is pulled back in, which was developed also from the training data
predictions_test = predictions_cache.join(test_df_renamed, on = ["user_id","anime_id_2"], how = "inner")\
    .withColumn("prediction", coalesce("prediction",lit(0)) + col("global_baseline"))\
    .withColumn("rating",col("rating") + col("global_baseline"))

predictions_test = predictions_test.withColumn("prediction", when(predictions_test["prediction"] < 1, 1)
    .when(predictions_test["prediction"] > 10, 10)
    .otherwise(predictions_test["prediction"]))

predictions_test_cache = predictions_test.cache()
predictions_test_cache.show()



+-------+----------+------------------+------+------------------+
|user_id|anime_id_2|        prediction|rating|   global_baseline|
+-------+----------+------------------+------+------------------+
|      5|      2993|3.0839422671610452|   2.0| 3.915270670306181|
|      5|     11759| 3.908415574923569|   3.0| 4.373025211517557|
|      5|     22535| 4.531450756594869|   7.0| 5.222082636485916|
|      7|     11757| 7.817322762801001|   8.0| 7.693708360237862|
|      8|     11757| 8.980118622729023|   9.0| 8.602123365307143|
|     14|      8525| 7.108758054802281|   8.0|  7.43917935497881|
|     14|      9253|  8.66653292086751|   9.0| 8.633943783066679|
|     17|      9756| 7.754851295875365|   9.0|7.5668500619069325|
|     38|     11757|  7.07335638915253|   9.0| 6.992762429213532|
|     39|      2001| 9.792573864380389|  10.0|              10.0|
|     44|       853| 8.178670463518923|   7.0| 8.333820291000606|
|     46|      7054|  8.96847677214752|  10.0| 9.130095625490824|
|     47| 

                                                                                

In [19]:
predictions_test_cache.count()

                                                                                

268387

In [20]:
rmse_content_test = evaluator.evaluate(predictions_test_cache)
print(rmse_content_test)

1.1863549184105326


# Conclusion
At the end of this project, the content-based recommender provided a marginal improvement over the global baseline.  For the global baseline recommender, the RMSE was 1.21 on the full training data and 1.23 on the testing data.  The previous assignment had a content-based recommender with a training RMSE of 1.18 and a testing RMSE of 1.34.  In comparison, the subsetted data in Google Colab resulted in a content-based recommender with a training RMSE of 1.21 and a testing RMSE of 1.24.  The testing RMSE showed a 6% improvement over the previous project’s content-based recommender.  With the full user-ratings dataset and the top 100 anime, the training and testing RMSE were both 1.19.  The testing RMSE showed an 11% improvement over the previous project’s content-based recommender.  Overall, the content-based recommender for the top 100 anime did have the best testing RMSE, but only by hundredths of a rating point.  The lack of diversity from using only 100 anime is likely not worth such a marginal improvement.  

The next steps for improving past the global baseline would be to tune the parameters of the content-based recommender.  The top 100 anime recommender used 5000 features; increasing this further might help refine the cosine similarity between anime.  The full anime dataset could also be used.  Both of these might necessitate more than 8 GiB of RAM.  Regarding security, I would also like to figure out how to remove the master node IP from the Spark configuration.  I had set up an SSH tunnel to prevent exposure of the IP, but that effort was not really successful if the IP is still exposed in the Spark configuration in the Jupyter Notebook.  

From this project, I found that PySpark (in both Google Colab and AWS) shows incredible utility for scaling data operations as long as it is used properly.  Using too many workers while failing to properly partition data, failing to cache or persist dataframes, and failing to unpersist dataframes can result in poor performance and long runtimes.  As I learned more about PySpark, it did allow me to perform data transformations on a scale that would not be feasible with Pandas.  My next steps for furthering my experience with PySpark would be to develop a better understanding of how to partition data evenly, how to prevent mass shuffling operations between executors/partitions, and how to debug issues by using the tasks in the Spark UI.  
