Data source: Instacart Market Basket Analysis (Kaggle) — <https://www.kaggle.com/c/instacart-market-basket-analysis/data>

## 1. Read data

In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook; the hadoop-aws artifact
# is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Pyspark course")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3")
    .getOrCreate()
)

In [2]:
import pyspark.sql.functions as F

In [3]:
import os
import configparser
aws_profile = "myaws"

# Parse the local AWS credentials file and keep only the key pair that belongs
# to the `aws_profile` section.
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_id, access_key = (
    config.get(aws_profile, option_name)
    for option_name in ("aws_access_key_id", "aws_secret_access_key")
)

In [4]:
# Register the s3n filesystem implementation and the AWS key pair on the
# session's Hadoop configuration.
hadoop_conf = spark._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem"),
    ("fs.s3n.awsAccessKeyId", access_id),
    ("fs.s3n.awsSecretAccessKey", access_key),
):
    hadoop_conf.set(conf_key, conf_value)

In [5]:
# Load the prior-order line items from the local CSV (header row supplies the
# column names; no schema is given, so columns are read as strings).
# S3 alternative:
# prior_sdf = spark.read.option("header", "true").csv("s3n://bartek-ml-course/order_products__prior.csv").cache()
prior_sdf = (
    spark.read
    .option("header", "true")
    .csv("data/order_products__prior.csv")
    .cache()
)

In [13]:
# Inspect the schema of the prior-order line items (`prior_sdf` is the frame
# actually defined in this notebook; `sdf` only exists in a commented-out line).
prior_sdf.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- add_to_cart_order: string (nullable = true)
 |-- reordered: string (nullable = true)



In [14]:
# Preview the first rows of the prior-order line items.
prior_sdf.show()

+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|       2|     33120|                1|        1|
|       2|     28985|                2|        1|
|       2|      9327|                3|        0|
|       2|     45918|                4|        1|
|       2|     30035|                5|        0|
|       2|     17794|                6|        1|
|       2|     40141|                7|        1|
|       2|      1819|                8|        1|
|       2|     43668|                9|        0|
|       3|     33754|                1|        1|
|       3|     24838|                2|        1|
|       3|     17704|                3|        1|
|       3|     21903|                4|        1|
|       3|     17668|                5|        1|
|       3|     46667|                6|        1|
|       3|     17461|                7|        1|
|       3|     32665|                8|        1|


In [15]:
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
# Explicit integer schema for the order-products CSV files. Field names match
# the CSV header columns (order_id, product_id, add_to_cart_order, reordered).
schema_order_product = StructType([
    StructField("order_id", IntegerType()),
    StructField("product_id", IntegerType()),
    StructField("add_to_cart_order", IntegerType()),
    StructField("reordered", IntegerType()),
])

In [16]:
# Load the orders table from the local CSV.
# S3 alternative:
# orders_sdf = spark.read.option("header", "true").csv("s3n://bartek-ml-course/orders.csv").cache()
orders_sdf = (
    spark.read
    .option("header", "true")
    .csv("data/orders.csv")
    .cache()
)

In [17]:
# Show the column names and types Spark assigned to the orders table.
orders_sdf.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_number: string (nullable = true)
 |-- order_dow: string (nullable = true)
 |-- order_hour_of_day: string (nullable = true)
 |-- days_since_prior_order: string (nullable = true)



## 2. Exploratory analysis

In [18]:
# Count how often each product appears within the same order, most frequent
# first. Uses `prior_sdf`, the frame this notebook actually loads.
(
    prior_sdf
    .groupBy("order_id", "product_id")
    .agg(F.count("product_id").alias("times"))
    .orderBy(F.desc("times"))
    .show()
)

+--------+----------+-----+
|order_id|product_id|times|
+--------+----------+-----+
|      18|     17568|    1|
|      30|      1158|    1|
|      54|     24852|    1|
|      71|     39389|    1|
|      97|     34565|    1|
|     117|     20119|    1|
|     147|      1360|    1|
|     183|      4305|    1|
|     234|     19019|    1|
|     254|     47521|    1|
|     265|     20431|    1|
|     289|     12341|    1|
|     289|     48775|    1|
|     308|     18027|    1|
|     315|     42327|    1|
|     315|     21511|    1|
|     328|      5818|    1|
|     329|     42736|    1|
|     336|     25146|    1|
|     338|     38905|    1|
+--------+----------+-----+
only showing top 20 rows



In [32]:
# How many times each user bought each product across the prior orders.
product_count_df = (
    prior_sdf
    .select("product_id", "order_id")
    .join(orders_sdf.select("order_id", "user_id"), on="order_id", how="left")
    .select("user_id", "product_id")
    .groupBy("user_id", "product_id")
    .agg(F.count("product_id").alias("product_count"))
)

# Highest per-product purchase count reached by each user.
user_most_popular_item = (
    product_count_df
    .groupBy("user_id")
    .agg(F.max("product_count").alias("product_count"))
    .orderBy("user_id")
    .cache()
)

# Joining back on (user_id, product_count) recovers every product tied for the
# user's maximum, so one user can appear on several rows.
(
    user_most_popular_item
    .join(product_count_df, on=["user_id", "product_count"], how="left")
    .orderBy("user_id")
    .show()
)

+-------+-------------+----------+
|user_id|product_count|product_id|
+-------+-------------+----------+
|      1|           10|       196|
|      1|           10|     12427|
|     10|            4|     28535|
|     10|            4|     47526|
|     10|            4|     16797|
|     10|            4|     30489|
|     10|            4|     46979|
|    100|            3|     27344|
|    100|            3|     21616|
|   1000|            7|     26165|
|   1000|            7|     30492|
|   1000|            7|     14870|
|   1000|            7|     28465|
|   1000|            7|     49683|
|  10000|           44|     21137|
| 100000|            5|     16797|
| 100000|            5|     10151|
| 100000|            5|     19348|
| 100000|            5|      3318|
| 100001|           41|     21137|
+-------+-------------+----------+
only showing top 20 rows



## Second solution

In [29]:
from pyspark.sql import Window
from pyspark.sql.functions import rank, col

# Per-user ordering of products by purchase count, highest first.
window = Window.partitionBy("user_id").orderBy(F.desc("count_product"))

# Rank every (user, product) pair inside its user's window and keep rank 1;
# rank() gives tied products the same position, so ties all survive the filter.
(
    prior_sdf
    .select("order_id", "product_id")
    .join(orders_sdf, on="order_id", how="left")
    .select("order_id", "product_id", "user_id")
    .groupBy("user_id", "product_id")
    .agg(F.count(F.col("product_id")).alias("count_product"))
    .withColumn("user_top_item", rank().over(window))
    .where(col("user_top_item") == 1)
    .orderBy("user_id")
    .show()
)

+-------+----------+-------------+-------------+
|user_id|product_id|count_product|user_top_item|
+-------+----------+-------------+-------------+
|      1|       196|           10|            1|
|      1|     12427|           10|            1|
|     10|     16797|            4|            1|
|     10|     47526|            4|            1|
|     10|     30489|            4|            1|
|     10|     46979|            4|            1|
|     10|     28535|            4|            1|
|    100|     21616|            3|            1|
|    100|     27344|            3|            1|
|   1000|     26165|            7|            1|
|   1000|     14870|            7|            1|
|   1000|     49683|            7|            1|
|   1000|     30492|            7|            1|
|   1000|     28465|            7|            1|
|  10000|     21137|           44|            1|
| 100000|     19348|            5|            1|
| 100000|     16797|            5|            1|
| 100000|      3318|

## Third solution

In [36]:
# Single-aggregation variant: pack (product_count, product_id) into a struct and
# take the per-user maximum of that struct, which yields exactly one row per user.
# NOTE(review): struct comparison goes field by field, so ties on product_count
# appear to be broken by product_id (a string column here) — confirm if the
# tie-break matters.
# NOTE: this rebinds `product_count_df`, which an earlier cell used for the
# per-(user, product) counts.
product_count_df = (
    prior_sdf
    .select("product_id", "order_id")
    .join(orders_sdf.select("order_id", "user_id"), on="order_id", how="left")
    .select("user_id", "product_id")
    .groupBy("user_id", "product_id")
    .agg(F.count("product_id").alias("product_count"))
    .select(
        F.col("user_id"),
        F.struct("product_count", "product_id").alias("count_product"),
    )
    .groupBy("user_id")
    .agg(F.max("count_product"))
    .orderBy("user_id")
)


In [37]:
# Preview the per-user maximum (product_count, product_id) struct.
product_count_df.show()

+-------+------------------+
|user_id|max(count_product)|
+-------+------------------+
|      1|         [10, 196]|
|     10|        [4, 47526]|
|    100|        [3, 27344]|
|   1000|        [7, 49683]|
|  10000|       [44, 21137]|
| 100000|         [5, 3318]|
| 100001|       [41, 21137]|
| 100002|       [10, 26172]|
| 100003|         [2, 9214]|
| 100004|        [7, 19660]|
| 100005|       [10, 42413]|
| 100006|        [4, 41290]|
| 100007|        [6, 33452]|
| 100008|       [11, 42972]|
| 100009|        [8, 24852]|
|  10001|       [11, 16398]|
| 100010|        [7, 41771]|
| 100011|        [7, 47766]|
| 100012|       [24, 41148]|
| 100013|       [15, 47792]|
+-------+------------------+
only showing top 20 rows



## 3. Create user-item list

In [41]:
from pyspark.sql import Window
def id2idx(id_sdf, id_col, idx_col):
    """Append an integer position column derived from the ordering of ``id_col``.

    Args:
        id_sdf: Spark DataFrame that contains ``id_col``.
        id_col: Name of the column whose ordering defines the index.
        idx_col: Name of the column to add.

    Returns:
        ``id_sdf`` with an extra ``idx_col`` holding ``rank()`` over a window
        ordered by ``id_col``. The window has no partitioning, and rows with
        equal ``id_col`` values receive the same rank, so pass distinct ids
        when a one-to-one mapping is needed.
    """
    position_by_id = F.rank().over(Window.orderBy(id_col))
    return id_sdf.withColumn(idx_col, position_by_id)

In [58]:
class Labelizer:
    """Builds, stores and reloads an id -> integer index mapping.

    Attributes:
        all_data: Spark DataFrame holding the ids to index, or None when the
            mapping is going to be loaded from disk instead.
        id_col: Name of the id column in ``all_data``.
        idx_col: Name of the index column to create.
        id_idx: The mapping DataFrame; None until ``get_id_idx`` or ``load``
            has been called.
    """

    def __init__(self, all_data=None, id_col=None, idx_col=None):
        self.all_data = all_data
        self.id_col = id_col
        self.idx_col = idx_col
        self.id_idx = None

    def get_id_idx(self):
        """Compute the mapping from ``all_data``, remember it and return it.

        Raises:
            ValueError: If the labelizer was created without ``all_data``.
        """
        if self.all_data is None:
            raise ValueError(
                "Labelizer has no all_data to index; pass all_data or call load()."
            )
        self.id_idx = id2idx(self.all_data, self.id_col, self.idx_col)
        return self.id_idx

    def save(self, file_name):
        """Write the mapping to ``file_name`` as parquet, replacing existing data.

        Raises:
            ValueError: If no mapping has been computed or loaded yet.
        """
        if self.id_idx is None:
            raise ValueError(
                "Nothing to save; call get_id_idx() or load() first."
            )
        self.id_idx.write.mode("overwrite").parquet(file_name)

    def load(self, file_name):
        """Read a previously saved mapping from the parquet at ``file_name``."""
        # Resolve the session here instead of relying on a notebook-global
        # `spark`, so the class also works when imported from a module.
        from pyspark.sql import SparkSession

        session = SparkSession.builder.getOrCreate()
        self.id_idx = session.read.parquet(file_name)

    @classmethod
    def create_from_saved(cls, file_name, id_col=None, idx_col=None):
        """Create a labelizer whose mapping is loaded from ``file_name``."""
        labelizer = cls(all_data=None, id_col=id_col, idx_col=idx_col)
        labelizer.load(file_name)
        return labelizer

In [65]:
from als_labelizer import Labelizer

ImportError: cannot import name 'Labelizer'

In [59]:
# Draw a roughly 1.5% sample of the distinct users (fixed seed) and collect every
# distinct product id from the prior-order line items.
all_users = orders_sdf.select("user_id").distinct()
users_sample, _ = all_users.randomSplit([0.015, 0.985], seed=666)
all_product = prior_sdf.select("product_id").distinct()

In [60]:
# Build the user id -> index mapping for the sampled users and persist it, so
# the later load from "data/user_id_idx.parquet" also works on a fresh checkout
# (no other cell in this notebook writes that file).
user_labelizer = Labelizer(users_sample, "user_id", "user_idx")
user_id_idx = user_labelizer.get_id_idx()
user_labelizer.save("data/user_id_idx.parquet")

In [61]:
# Recreate a labelizer from the mapping stored on disk instead of recomputing it.
user_labelizer_loaded = Labelizer(all_data=None, id_col="user_id", idx_col="user_idx")
user_labelizer_loaded.load(file_name="data/user_id_idx.parquet")

In [62]:
# Preview the mapping that was read back from parquet.
user_labelizer_loaded.id_idx.show()

+-------+--------+
|user_id|user_idx|
+-------+--------+
| 100032|       1|
|  10012|       2|
| 100136|       3|
| 100281|       4|
| 100378|       5|
| 100383|       6|
| 100473|       7|
| 100543|       8|
| 100649|       9|
| 100716|      10|
| 100734|      11|
| 100736|      12|
| 100833|      13|
| 100898|      14|
| 100934|      15|
| 100950|      16|
| 100951|      17|
| 101001|      18|
| 101024|      19|
|  10104|      20|
+-------+--------+
only showing top 20 rows



In [None]:
# One-step alternative to constructing a Labelizer and then calling load():
# build it straight from the saved mapping.
new_user_labelizer = Labelizer.create_from_saved(
    "data/user_id_idx.parquet", id_col="user_id", idx_col="user_idx"
)

In [43]:
# Same sampling as the earlier user-sample cell, repeated here: roughly 1.5% of
# the distinct users (fixed seed) plus every distinct product id.
all_users = orders_sdf.select("user_id").distinct()
users_sample, _ = all_users.randomSplit([0.015, 0.985], seed=666)
all_product = prior_sdf.select("product_id").distinct()

In [40]:
# Number of users that ended up in the sample.
users_sample.count()

3149

In [18]:
# Integer index columns for the sampled users and for all products.
user_id_idx = id2idx(id_sdf=users_sample, id_col="user_id", idx_col="user_idx")
product_id_idx = id2idx(id_sdf=all_product, id_col="product_id", idx_col="product_idx")

In [19]:
# Build the training set for ALS: one row with rating 1 per purchased line item.
# The inner join with user_id_idx keeps only the sampled users; the join with
# product_id_idx attaches the integer product index.
user_item_sdf = (
    prior_sdf
    .join(orders_sdf, on="order_id")
    .select(F.col("user_id"), F.col("product_id"), F.lit(1).alias("rating"))
    .join(user_id_idx, on="user_id")
    .join(product_id_idx, on="product_id")
    .cache()
)

## 4. Create ALS model

In [70]:
from pyspark.ml.recommendation import ALS
# Implicit-feedback ALS over the integer index columns; factor rank, iteration
# count and seed are fixed here.
als = ALS(
    rank=5,
    maxIter=20,
    userCol="user_idx",
    itemCol="product_idx",
    implicitPrefs=True,
    seed=666,
)

In [71]:
# Fit the ALS estimator on the user-item interaction frame.
als_model = als.fit(user_item_sdf)

## 5. Validate results

In [88]:
# Request 10 recommendations per user (note: the variable is named rec5
# although 10 items are requested).
rec5 = als_model.recommendForAllUsers(10)

In [89]:
# Inspect the nested structure of the recommendation output.
rec5.printSchema()

root
 |-- user_idx: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [91]:
# Translate the index-based recommendations back to raw ids: attach user_id,
# explode the recommendation array to one row per product, then swap
# product_idx for product_id.
rec5ids = (
    rec5
    .join(user_id_idx, on="user_idx")
    .withColumn("recommendation", F.explode("recommendations"))
    .select("user_id", "recommendation.product_idx")
    .join(product_id_idx, on="product_idx")
    .drop("product_idx")
    .cache()
)

In [92]:
# Load the "train" order line items and flag each (user, product) pair found
# there with last_buy = 1.
order_product_train_sdf = (
    spark.read
    .option("header", "true")
    .csv("data/order_products__train.csv")
    .cache()
)
# Uses order_product_train_sdf (the name defined just above); the previous
# reference to `order_product_train` pointed at a name that is never assigned.
user_item_train_sdf = (
    order_product_train_sdf
    .join(orders_sdf, on="order_id")
    .select(F.col("user_id"), F.col("product_id"), F.lit(1).alias("last_buy"))
    .cache()
)

In [93]:
# Left join keeps every recommendation; last_buy is 1 where the same
# (user_id, product_id) pair occurs in the train orders and null otherwise.
rec5ids_val = rec5ids.join(user_item_train_sdf, on=["user_id", "product_id"], how="left")

In [94]:
# Count recommendations per last_buy value and bring the small summary to pandas.
precision_df = rec5ids_val.groupBy("last_buy").count().toPandas()

In [95]:
# Share of recommendations in each last_buy group. The result is indexed by row
# position in precision_df, not by the last_buy value itself.
precision_df['count'] / precision_df['count'].sum()

0    0.948555
1    0.051445
Name: count, dtype: float64

## 6. Non-collaborative algorithm

In [96]:
# Show the learned item factor vectors (one `features` array per item id).
als_model.itemFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.010944766, -0...|
| 20|[-0.0060177636, 7...|
| 40|[-0.2809162, 0.41...|
| 60|[-0.0019884966, 0...|
| 70|[-0.09843452, 0.0...|
| 80|[-0.015406999, 0....|
|100|[5.365436E-4, 0.0...|
|110|[0.055715587, -2....|
|170|[-0.0025643965, -...|
|180|[-0.016432649, 0....|
|190|[-0.011314751, 0....|
|200|[0.0105908085, 0....|
|240|[-0.032621145, 0....|
|250|[-0.04655511, 0.0...|
|290|[0.084733084, -0....|
|380|[-0.16577977, 0.4...|
|420|[-0.0034182696, 0...|
|430|[0.0034633707, -0...|
|440|[7.157677E-4, -8....|
|450|[0.024876615, 0.0...|
+---+--------------------+
only showing top 20 rows

