In [9]:
# import libraries
import time
import os
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from pyspark.sql.functions import explode
import matplotlib.pyplot as plt

# Start up spark cluster

In [10]:
appName="Collaborative Filtering with PySpark"
# initialize the spark session
spark = SparkSession.builder\
    .appName(appName)\
    .config('spark.driver.memory', '15g')\
    .getOrCreate()
# get sparkcontext from the sparksession
sc = spark.sparkContext

# Load in Data

In [11]:
def load_dataset(path):
    #define schema
    schema = StructType([
        StructField("item", StringType(), True),
        StructField("user", StringType(), True),
        StructField("rating", StringType(), True),
        StructField("timestamp", IntegerType(), True)])

    df = spark.read.csv(path ,header=False,schema=schema)
    df = df.withColumn("rating", df["rating"].cast(IntegerType()))
    
    #provide index values for item and user to convert them into integers
    stringIndexer = StringIndexer(inputCols=["item","user"], outputCols=["itemIndex","userIndex"])
    model = stringIndexer.fit(df)
    df = model.transform(df)
    
    #df = df.withColumn("userIndex", df["userIndex"].cast(IntegerType()))
    
    return df

# Define ALS object

In [12]:
als = ALS(maxIter=15, 
          implicitPrefs=False,
          regParam=0.25,
          userCol="userIndex", 
          itemCol="itemIndex", 
          ratingCol="rating",
          coldStartStrategy="drop")

## Load in data and train model

In [8]:
fname = r'Software.csv'
#fname = r'Books.csv'
df = load_dataset(r'./ratings_data/' + fname)
test_fraction = .35
training, test = df.randomSplit([1-test_fraction, test_fraction])

model = als.fit(training)

In [13]:
item_df = df.groupby("itemIndex")\
   .agg(F.min("item"))\
   .withColumnRenamed('min(item)', 'item')


item_df = item_df.withColumn("itemIndex", item_df["itemIndex"].cast(IntegerType()))

In [16]:
meta_df = pd.read_csv(f'./meta_data/meta_{fname}').rename(columns={'asin':'item'})

In [17]:
# display top n recommended artists for a user
def recommendedItems(userIndex, n):
    test = model.recommendForAllUsers(n)\
        .filter(col('userIndex')==userIndex)\
        .select(["recommendations.itemIndex","recommendations.rating"])\
        .withColumn("rec_exp", explode("itemIndex"))\
        .select(col('rec_exp'))\
        .withColumnRenamed('rec_exp', 'itemIndex')
        
    return test

# recs = recommendedItems(9386, n=6)
# recs.show()

In [18]:
def print_recs(recs_df):
    print('Recommendations:')
    print('-'*16, end='\n\n')
    for i, row in recs_df.iterrows():
        print(f' {i+1}. {row.title}')
        print(' '*5, 'https://www.amazon.com/dp/{}'.format(row['item']))


## Print out n recommendations for user

In [19]:
def get_recommendation(userID, n=6):
    recs = recommendedItems(9386, n=6)
    nrecommendations_df = recs.join(item_df, on='itemIndex').toPandas()
    recs_df = nrecommendations_df.merge(meta_df)
    print_recs(recs_df)
    
get_recommendation(9386)

Recommendations:
----------------

 1. AVG Internet Security 2015 + AVG Pc Tuneup&reg; 1year 1pc 1user Download
      https://www.amazon.com/dp/B00T1XOTKE
 2. Dantz Retrospect 7 Small Business Server Standard
      https://www.amazon.com/dp/B0007KI7IK
 3. Ubuntu 8.04 [OLD VERSION]
      https://www.amazon.com/dp/B0019KKM4O
 4. Symantec Norton AntiVirus 2009 CD 1-User
      https://www.amazon.com/dp/B0026NR4NU
 5. Oscar the Balloonist Dives into the Lake
      https://www.amazon.com/dp/B00005U2QU
 6. PhotomatixPro 3.1
      https://www.amazon.com/dp/3772396585


### Checking to see what items user rated

In [20]:
def topLikes(dataframe, userIndex, n):
    df = dataframe.filter(dataframe.userIndex==userIndex)\
        .sort(dataframe.rating.desc())\
        .select(dataframe.userIndex,dataframe.itemIndex,dataframe.rating)\
        .limit(n)
    return df
# display top liked items for a user
topLikes(df, 9386, 10).show(truncate=False)

+---------+---------+------+
|userIndex|itemIndex|rating|
+---------+---------+------+
|9386.0   |1879.0   |5     |
|9386.0   |3685.0   |5     |
|9386.0   |4415.0   |3     |
+---------+---------+------+

