# Project Update
#### Will Jarrard (wej5ar), Abhi Dommalapati (ad4bu), Sebastian Ranasinghe (sar2jf)

## Set Up Environment and Clean Data

In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook: local mode on all
# available cores ('local[*]'), with the driver memory set to 150g.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "150g")
    .appName('my-cool-app')
    .getOrCreate()
)

# Clear anything cached earlier in this session before starting.
spark.catalog.clearCache()

### Read in Data

In [2]:
# Load the raw transactions CSV; the first row is a header and column types
# are inferred by Spark rather than declared up front.
transactions_train = spark.read.csv(
    '/project/ds5559/h_and_m/transactions_train.csv',
    header=True,
    inferSchema=True,
)

# Show the inferred column types.
transactions_train.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- sales_channel_id: integer (nullable = true)



## Clean Data

#### Separate out only 2020 because we get a memory error if we use more data

In [3]:
from pyspark.sql.functions import from_unixtime, unix_timestamp, year, month, col, date_format, to_date

# Keep only the 2020 transactions, then count purchases per
# (customer_id, article_id) pair so the count can be used as the ALS rating.
#
# `t_dat` is already a string column in 'yyyy-MM-dd' form (see the schema
# printed above), so it is parsed directly with `to_date` and the year is taken
# from that. This avoids the string -> unix timestamp -> timestamp string
# round trip and the redundant cast to string.
#
# NOTE: this cell rebinds `transactions_train` from the raw transactions to the
# aggregated counts, so `t_dat` no longer exists afterwards. Re-run the read
# cell before re-running this one.
transactions_train = (
    transactions_train
    .filter(year(to_date(col('t_dat'), 'yyyy-MM-dd')) == 2020)
    .groupby('customer_id', 'article_id')
    .count()
)
transactions_train.show(5)

+--------------------+----------+-----+
|         customer_id|article_id|count|
+--------------------+----------+-----+
|00f5ce6142a289516...| 796210010|    2|
|01da48c6794598377...| 621073001|    1|
|0871a5a2f27641068...| 784727001|    1|
|109defd99fce9bfa0...| 803986002|    1|
|1596f86f7ae4b8977...| 654590002|    1|
+--------------------+----------+-----+
only showing top 5 rows



#  Model

### Need to fit string indexer to all data first

In [4]:
from pyspark.ml.feature import StringIndexer
# ALS needs numeric user/item ids, so map customer_id and article_id to
# numeric index columns (`customer_id_index`, `article_id_index`).
cols = ['customer_id', 'article_id']
indexer = StringIndexer(inputCols=cols, outputCols=[x + "_index" for x in cols])

# The indexer is fitted on the full data set *before* splitting, so both
# splits are encoded with the same id -> index mapping.
transactions_train = indexer.fit(transactions_train).transform(transactions_train)

# 80/20 train/test split. The seed is fixed so that the split (and therefore
# every metric computed below) is reproducible across kernel restarts.
(training, test) = transactions_train.randomSplit([0.8, 0.2], seed=42)

### Train Implicit Model 
[see difference between implicit and explicit here](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html#explicit-vs-implicit-feedback)

In [5]:
# Model
from pyspark.ml.recommendation import ALS

# See https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.recommendation.ALS.html
# regParam is not set here, so ALS uses its default, which is 0.1.
#
# implicitPrefs=True: the purchase `count` is treated as implicit feedback
#   rather than as an explicit rating.
# coldStartStrategy="drop": rows whose user or item was not seen during
#   fitting are dropped from the output of `transform` instead of producing
#   NaN predictions.
# seed: fixed so that the fitted model is reproducible between runs.
als = ALS(
    userCol="customer_id_index",
    itemCol="article_id_index",
    ratingCol="count",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=True,
    seed=42,
)

model = als.fit(training)

### Make predictions

In [6]:
# Score the held-out split with the fitted ALS model; this adds a
# `prediction` column to the rows of `test`.
predictions = model.transform(dataset=test)

# Preview the first 10 scored rows.
predictions.show(n=10)

+--------------------+----------+-----+-----------------+----------------+------------+
|         customer_id|article_id|count|customer_id_index|article_id_index|  prediction|
+--------------------+----------+-----+-----------------+----------------+------------+
|69fbf08f0866cbc4a...| 817353008|    1|          15447.0|           148.0| 0.018701833|
|ac982e99e61de5442...| 817353008|    1|          48510.0|           148.0| 0.037698228|
|11e370068454ac91a...| 817353008|    1|         344726.0|           148.0| 0.013734466|
|d170614cb34c05e11...| 817353008|    1|          48942.0|           148.0| 0.030476555|
|23d9bd604e0c7d750...| 817353008|    2|           7066.0|           148.0| 0.043810364|
|5c2eb4acf551af4ec...| 817353008|    1|         190874.0|           148.0|  0.01097815|
|6e7605011fee0ea4a...| 817353008|    1|         192005.0|           148.0|0.0109603545|
|a5ae79e492de1b85d...| 817353008|    1|          14874.0|           148.0|  0.06541093|
|a6092d5bda31e7eb6...| 817353008

### Evaluate using RMSE

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-squared error between the observed purchase `count` and the
# model's `prediction` on the held-out rows.
# NOTE(review): the model was fitted with implicitPrefs=True, and the
# predictions shown above are small fractions while the counts are >= 1, so
# this RMSE compares values on different scales -- confirm that it is the
# intended metric.
evaluator = RegressionEvaluator(
    labelCol="count",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(rmse)

1.2453698424117083


### Make recommendations based off of items

In [8]:
# For every article index, fetch the 10 highest-scoring customer indices.
item_recs = model.recommendForAllItems(numUsers=10)

# Preview the first 5 rows.
item_recs.show(n=5)

+----------------+--------------------+
|article_id_index|     recommendations|
+----------------+--------------------+
|             148|[{168, 0.25538644...|
|             463|[{239446, 0.23014...|
|             471|[{297, 0.10508741...|
|             496|[{7231, 0.0722737...|
|             833|[{168, 0.06621577...|
+----------------+--------------------+
only showing top 5 rows



### Make recommendations based off of users

In [9]:
# For every customer index, fetch the 12 highest-scoring article indices.
user_recs = model.recommendForAllUsers(numItems=12)

# Preview the first 5 rows without truncating the recommendation arrays.
user_recs.show(n=5, truncate=False)

+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|customer_id_index|recommendations                                                                                                                                                                                                        |
+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|148              |[{20, 0.20783934}, {25, 0.19548015}, {18, 0.185775}, {30, 0.1771148}, {3, 0.1765406}, {31, 0.15613519}, {11, 0.15046923}, {70, 0.1366841}, {14, 0.13656622}, {7, 0.1363846}, {74, 0.12996837}, {110, 0.12833914}]      |
|463              |[{20, 0.45331547}, {25, 0.4249837}, {

### Reformat to Calculate Mean Average Precision 

In [12]:
# Preview the indexed purchase counts that the ranking labels are built from.
transactions_train.show(n=5)

+--------------------+----------+-----+-----------------+----------------+
|         customer_id|article_id|count|customer_id_index|article_id_index|
+--------------------+----------+-----+-----------------+----------------+
|00f5ce6142a289516...| 796210010|    2|          67509.0|           175.0|
|01da48c6794598377...| 621073001|    1|         284500.0|          4769.0|
|0871a5a2f27641068...| 784727001|    1|         343407.0|         11369.0|
|109defd99fce9bfa0...| 803986002|    1|         671375.0|          1945.0|
|1596f86f7ae4b8977...| 654590002|    1|          26536.0|         12448.0|
+--------------------+----------+-----+-----------------+----------------+
only showing top 5 rows



In [13]:
import pyspark.sql.functions as f

# One row per customer: the customer's numeric index plus a comma-separated
# string of every article index that customer bought (the ranking "labels").
# NOTE(review): labels are built from `transactions_train`, i.e. the full
# indexed data set including the rows used to fit the model, not from the
# held-out `test` split -- confirm this is intended for the MAP evaluation.
grouped_trans = (
    transactions_train
    .groupBy("customer_id")
    .agg(
        f.first("customer_id_index").alias("customer_id_index"),
        f.concat_ws(",", f.collect_list("article_id_index")).alias("labels"),
    )
)

In [14]:
# Attach each customer's recommendations to their label string by matching on
# the customer index, then drop the duplicate index column that came from
# `grouped_trans` (the one from `user_recs` is kept).
joined = (
    grouped_trans
    .join(
        user_recs,
        grouped_trans["customer_id_index"] == user_recs["customer_id_index"],
    )
    .drop(grouped_trans["customer_id_index"])
)

In [15]:
# Inspect the joined schema: `labels` is a string and `recommendations` is an
# array of structs, which determines how the next cells unpack them.
joined.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- labels: string (nullable = false)
 |-- customer_id_index: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- article_id_index: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [16]:
# Spot-check a single joined row.
joined.show(n=1)

+--------------------+--------------------+-----------------+--------------------+
|         customer_id|              labels|customer_id_index|     recommendations|
+--------------------+--------------------+-----------------+--------------------+
|27a246bb259207e7b...|5103.0,153.0,2730...|              299|[{20, 0.6097708},...|
+--------------------+--------------------+-----------------+--------------------+
only showing top 1 row



In [17]:
def _to_prediction_label_pair(row):
    """Turn one joined row into a (recommended indices, purchased indices) pair.

    row[0] is the list of recommended article indices; row[1] is the
    comma-separated label string, whose entries look like "5103.0" and are
    therefore converted through float before int.
    """
    recommended, label_csv = row[0], row[1]
    purchased = [int(float(token)) for token in label_csv.split(",")]
    return (recommended, purchased)


# RDD of (predicted ranking, ground-truth labels) pairs, the input shape that
# RankingMetrics is constructed from.
pred_labels = (
    joined
    .select("recommendations.article_id_index", "labels")
    .rdd
    .map(_to_prediction_label_pair)
)

In [18]:
from pyspark.mllib.evaluation import RankingMetrics
# Wrap the (predictions, labels) pairs in the ranking evaluator.
metrics = RankingMetrics(predictionAndLabels=pred_labels)

In [19]:
# Mean average precision of the per-customer recommendations against the
# label lists built above; left as the last expression so the notebook
# displays the value.
metrics.meanAveragePrecision

0.022193131397492634