# ALS

## Create Spark Session

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .master('local[*]')\
        .appName('Homework9Part1')\
        .config('spark.driver.maxResultSize', '10g')\
        .config('spark.executor.memory' ,'10g')\
        .config('spark.driver.memory', '10g')\
        .getOrCreate()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

## Import the train and test dataset
* Train: `trainIdx2_matrix.txt`
* Test: `testTrack_hierarchy.txt`

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

train_schema = StructType([
    StructField('userId', IntegerType()),
    StructField('itemId', IntegerType()),
    StructField('rating', DoubleType())
])

train = spark.read.csv('RawData/trainIdx2_matrix.txt',
                       sep='|',
                       header=False,
                       schema=train_schema
                      )

train.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- itemId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [3]:
train.show(5)

+------+------+------+
|userId|itemId|rating|
+------+------+------+
|199808|248969|  90.0|
|199808|  2663|  90.0|
|199808| 28341|  90.0|
|199808| 42563|  90.0|
|199808| 59092|  90.0|
+------+------+------+
only showing top 5 rows



In [4]:
test_schema = StructType([
    StructField('userId', IntegerType()),
    StructField('trackId', IntegerType()),
    StructField('albumId', IntegerType()),
    StructField('artistId', IntegerType()),
    StructField('genreId_1', IntegerType()),
    StructField('genreId_2', IntegerType()),
    StructField('genreId_3', IntegerType()),
    StructField('genreId_4', IntegerType()),
    StructField('genreId_5', IntegerType()),
    StructField('genreId_6', IntegerType()),
    StructField('genreId_7', IntegerType()),
    StructField('genreId_8', IntegerType()),
    StructField('genreId_9', IntegerType()),
    StructField('genreId_10', IntegerType()),
    StructField('genreId_11', IntegerType()),
    StructField('genreId_12', IntegerType()),
    StructField('genreId_13', IntegerType()),
    StructField('genreId_14', IntegerType()),
    StructField('genreId_15', IntegerType()),
    StructField('genreId_16', IntegerType()),
    StructField('genreId_17', IntegerType()),
    StructField('genreId_18', IntegerType()),
    StructField('genreId_19', IntegerType()),
    StructField('genreId_20', IntegerType()),
    StructField('genreId_21', IntegerType()),
])

test = spark.read.csv('RawData/testTrack_hierarchy.txt',
                     sep='|',
                     nullValue='None',
                     header=False,
                     schema=test_schema)

test.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- trackId: integer (nullable = true)
 |-- albumId: integer (nullable = true)
 |-- artistId: integer (nullable = true)
 |-- genreId_1: integer (nullable = true)
 |-- genreId_2: integer (nullable = true)
 |-- genreId_3: integer (nullable = true)
 |-- genreId_4: integer (nullable = true)
 |-- genreId_5: integer (nullable = true)
 |-- genreId_6: integer (nullable = true)
 |-- genreId_7: integer (nullable = true)
 |-- genreId_8: integer (nullable = true)
 |-- genreId_9: integer (nullable = true)
 |-- genreId_10: integer (nullable = true)
 |-- genreId_11: integer (nullable = true)
 |-- genreId_12: integer (nullable = true)
 |-- genreId_13: integer (nullable = true)
 |-- genreId_14: integer (nullable = true)
 |-- genreId_15: integer (nullable = true)
 |-- genreId_16: integer (nullable = true)
 |-- genreId_17: integer (nullable = true)
 |-- genreId_18: integer (nullable = true)
 |-- genreId_19: integer (nullable = true)
 |-- genreId_20: integer (n

In [5]:
test.select('userId', 'trackId', 'artistId', 'genreId_1', 'genreId_2', 'genreId_3', 'genreId_4', 'genreId_5').show(5)

+------+-------+--------+---------+---------+---------+---------+---------+
|userId|trackId|artistId|genreId_1|genreId_2|genreId_3|genreId_4|genreId_5|
+------+-------+--------+---------+---------+---------+---------+---------+
|199810| 208019|    null|     null|     null|     null|     null|     null|
|199810|  74139|  271146|   113360|   173467|   173655|   192976|   146792|
|199810|   9903|    null|    33722|   123396|    79926|    73523|     null|
|199810| 242681|  244574|    61215|    17453|   274088|     null|     null|
|199810|  18515|   33168|    19913|    48505|   154024|     null|     null|
+------+-------+--------+---------+---------+---------+---------+---------+
only showing top 5 rows



## Only include the userIds that exist in the test dataset within the train dataset for the ALS model

### Get unique users

In [6]:
test_unique_users = test.select('userId').distinct().coalesce(1)

test_unique_users.show(5)
print('The number of unique users: ', test_unique_users.count())

+------+
|userId|
+------+
|199855|
|199976|
|200166|
|200625|
|200878|
+------+
only showing top 5 rows

The number of unique users:  20000


### Filter the trainset to only include userIds that are included within the test dataset

In [7]:
from pyspark.sql.functions import col

train = train.toPandas()

train = train[train.userId.isin(test_unique_users.toPandas().userId)]

train = spark.createDataFrame(train).repartition('userId')

In [8]:
train.printSchema()

root
 |-- userId: long (nullable = true)
 |-- itemId: long (nullable = true)
 |-- rating: double (nullable = true)



In [9]:
train.show(5)

+------+------+------+
|userId|itemId|rating|
+------+------+------+
|200309| 88934| 100.0|
|200309|293425| 100.0|
|200309| 69014|  10.0|
|200309| 28342|  80.0|
|200309| 28964|  80.0|
+------+------+------+
only showing top 5 rows



## Train data summary statistics

In [10]:
train.describe().show()

+-------+------------------+-----------------+------------------+
|summary|            userId|           itemId|            rating|
+-------+------------------+-----------------+------------------+
|  count|          10643437|         10643437|          10643437|
|   mean|224380.43626321084|149126.9231043506|47.600189769526516|
| stddev|14393.139199046373| 85467.7984951211|37.996779529988316|
|    min|            199810|                0|               0.0|
|    max|            249010|           296110|             100.0|
+-------+------------------+-----------------+------------------+



## Build ALS Model

In [11]:
from pyspark.ml.recommendation import ALS

als = ALS(userCol='userId', 
          itemCol='itemId',
          ratingCol='rating', 
          rank=5,
          maxIter= 5,
          regParam=0.01,
          nonnegative = True, 
          implicitPrefs = False)

### Train the best model

In [12]:
model = als.fit(train)

In [13]:
train_results = model.transform(train)

In [14]:
train_results.show(5)

+------+------+------+----------+
|userId|itemId|rating|prediction|
+------+------+------+----------+
|205890|   148|  90.0| 61.070606|
|216277|   148|  90.0| 71.845634|
|241707|   148|  60.0|    70.007|
|226963|   148|  90.0|   76.1273|
|206707|   148|  70.0|  62.11915|
+------+------+------+----------+
only showing top 5 rows



### Training summary

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

print('RMSE: ', evaluator.evaluate(train_results))

RMSE:  28.15705746813551


In [16]:
train_results.select('rating').describe().show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          10643437|
|   mean|47.600189769526516|
| stddev|  37.9967795299883|
|    min|               0.0|
|    max|             100.0|
+-------+------------------+



## Make predictions for track, album, and artist on the test data

### Check for null values within `trackId`, `albumId`, and `artistId` columns

In [17]:
print('Number of null trackId: ', test.filter('trackId IS NULL').count())
print('Number of null albumId: ', test.filter('albumId IS NULL').count())
print('Number of null artistId: ', test.filter('artistId IS NULL').count())

Number of null trackId:  0
Number of null albumId:  8572
Number of null artistId:  10891


### Make predictions on track ratings

In [18]:
prediction_track = model.setItemCol('trackId').transform(test)

### Make predictions on album ratings

In [19]:
prediction_album = model.setItemCol('albumId').transform(test.filter('albumId IS NOT NULL'))

### Make predictions on artist ratings 

In [20]:
prediction_artist = model.setItemCol('artistId').transform(test.filter('artistId IS NOT NULL'))

## Add all ratings to the same dataframe, `predictions`

In [21]:
from pyspark.sql.types import IntegerType

predictions = test.select('userId', 'trackId')

predictions = predictions.join(prediction_track.select('userId', 'trackId', 'prediction'), ['userId', 'trackId'], 'left')
predictions = predictions.withColumn('prediction', predictions['prediction'].cast(IntegerType()))\
                         .withColumnRenamed('prediction', 'track_rating')

In [22]:
predictions = predictions.join(prediction_album.select('userId', 'trackId', 'prediction'), ['userId', 'trackId'], 'left')

predictions = predictions.withColumn('prediction', predictions['prediction'].cast(IntegerType()))\
                         .withColumnRenamed('prediction', 'album_rating')

In [23]:
predictions = predictions.join(prediction_artist.select('userId', 'trackId', 'prediction'), ['userId', 'trackId'], 'left')

predictions = predictions.withColumn('prediction', predictions['prediction'].cast(IntegerType()))\
                         .withColumnRenamed('prediction', 'artist_rating')

In [24]:
predictions.show(5)

+------+-------+------------+------------+-------------+
|userId|trackId|track_rating|album_rating|artist_rating|
+------+-------+------------+------------+-------------+
|200072|  29894|          75|          73|           76|
|200124| 162126|           9|          10|           11|
|200174| 137908|          25|          40|           35|
|200400| 263168|          71|         100|           67|
|200427|  82634|          10|          85|          116|
+------+-------+------------+------------+-------------+
only showing top 5 rows



### Check for null values within the `track_rating`, `album_rating`, and `artist_rating`

In [25]:
print('Number of null track_rating: ', predictions.filter('track_rating IS NULL').count())
print('Number of null album_rating: ', predictions.filter('album_rating IS NULL').count())
print('Number of null artist_rating: ', predictions.filter('artist_rating IS NULL').count())

Number of null track_rating:  0
Number of null album_rating:  8572
Number of null artist_rating:  10891


### Replace null values within `album_rating` and `artist_rating` with `0`

In [26]:
predictions = predictions.na.fill(0)

## Count the number of genres per userId-trackId pair

In [27]:
test_genres = test.select('userId', 
                          'trackId', 
                          'genreId_1', 
                          'genreId_2', 
                          'genreId_3', 
                          'genreId_4', 
                          'genreId_5', 
                          'genreId_6', 
                          'genreId_7', 
                          'genreId_8', 
                          'genreId_9', 
                          'genreId_10', 
                          'genreId_11', 
                          'genreId_12', 
                          'genreId_13', 
                          'genreId_14',
                          'genreId_15',
                          'genreId_16',
                          'genreId_17',
                          'genreId_18',
                          'genreId_19',
                          'genreId_20',
                          'genreId_21')

In [28]:
from pyspark.sql.functions import isnull

num_genres = test_genres.select('userId', 'trackId', sum([isnull(test_genres[col]).cast(IntegerType()) for col in test_genres.columns]).alias('num_genres'))

In [29]:
num_genres.show(5)

+------+-------+----------+
|userId|trackId|num_genres|
+------+-------+----------+
|199810| 208019|        21|
|199810|  74139|        14|
|199810|   9903|        17|
|199810| 242681|        18|
|199810|  18515|        18|
+------+-------+----------+
only showing top 5 rows



### Add the number of genres into `predictions` dataframe, 

In [30]:
predictions = predictions.join(num_genres, ['userId', 'trackId'], 'left')

In [31]:
predictions.show(5)

+------+-------+------------+------------+-------------+----------+
|userId|trackId|track_rating|album_rating|artist_rating|num_genres|
+------+-------+------------+------------+-------------+----------+
|200072|  29894|          75|          73|           76|        16|
|200124| 162126|           9|          10|           11|        18|
|200174| 137908|          25|          40|           35|        16|
|200400| 263168|          71|         100|           67|        20|
|200427|  82634|          10|          85|          116|        18|
+------+-------+------------+------------+-------------+----------+
only showing top 5 rows



## Write `predictions` to csv

In [32]:
predictions.coalesce(1).write.csv('Data/ratings.csv', header=True)