<a href="https://www.kaggle.com/code/michaeltkuo/h-m-kaggle-competition-initial-practice?scriptVersionId=94271913" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# H&M RECOMMENDATION SYSTEM

In [1]:
# Install Pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 33 kB/s              
[?25h  Preparing metadata (setup.py) ... [?25l- \ done
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 29.9 MB/s            
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - done
[?25h  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=8765ccd82567d3a587e3aeca7a7a035c45ef52642718eeb80ebcf9166b933a34
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pysp

In [2]:
# import packages

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
from pyspark.sql import SQLContext 
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf,col,when
from pyspark.sql.functions import to_timestamp,date_format
from pyspark.sql.functions import weekofyear
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

In [3]:
sc = SparkSession.builder \
    .appName("Recommendations") \
    .config("spark.sql.files.maxPartitionBytes", 5000000) \
    .getOrCreate()

spark = SparkSession(sc)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/28 20:42:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
from pyspark.sql import Row
from pyspark.sql.types import *

schema = StructType([
    StructField("t_dat", DateType()),
    StructField("customer_id", StringType()),
    StructField("article_id", IntegerType()),
    StructField("price", DoubleType()),
    StructField("sales_channel_id", IntegerType())
])

dataset = spark.read.option("header", True) \
    .csv("../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv",
        schema = schema)

In [5]:
dataset.show(5)

                                                                                

+----------+--------------------+----------+--------------------+----------------+
|     t_dat|         customer_id|article_id|               price|sales_channel_id|
+----------+--------------------+----------+--------------------+----------------+
|2018-09-20|000058a12d5b43e67...| 663713001|0.050830508474576264|               2|
|2018-09-20|000058a12d5b43e67...| 541518023| 0.03049152542372881|               2|
|2018-09-20|00007d2de826758b6...| 505221004| 0.01523728813559322|               2|
|2018-09-20|00007d2de826758b6...| 685687003|0.016932203389830508|               2|
|2018-09-20|00007d2de826758b6...| 685687004|0.016932203389830508|               2|
+----------+--------------------+----------+--------------------+----------------+
only showing top 5 rows



In [6]:
dataset.printSchema()

root
 |-- t_dat: date (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- sales_channel_id: integer (nullable = true)



In [7]:
from pyspark.sql.functions import min, max
from pyspark.sql.functions import unix_timestamp, lit
min_date, max_date = dataset.select(min("t_dat"), max("t_dat")).first()
min_date, max_date

                                                                                

(datetime.date(2018, 9, 20), datetime.date(2020, 9, 22))

Just something to note here... There seems to be at least two years worth of data. Maybe we could do TopPop on the same week in 2018, 2019 and 2020?

Someone mentioned that we should weigh towards 2020?

In [8]:
# Create Calendar Weeks
dataset = dataset.withColumn('week_of_year',weekofyear(dataset.t_dat))
dataset.show()

+----------+--------------------+----------+--------------------+----------------+------------+
|     t_dat|         customer_id|article_id|               price|sales_channel_id|week_of_year|
+----------+--------------------+----------+--------------------+----------------+------------+
|2018-09-20|000058a12d5b43e67...| 663713001|0.050830508474576264|               2|          38|
|2018-09-20|000058a12d5b43e67...| 541518023| 0.03049152542372881|               2|          38|
|2018-09-20|00007d2de826758b6...| 505221004| 0.01523728813559322|               2|          38|
|2018-09-20|00007d2de826758b6...| 685687003|0.016932203389830508|               2|          38|
|2018-09-20|00007d2de826758b6...| 685687004|0.016932203389830508|               2|          38|
|2018-09-20|00007d2de826758b6...| 685687001|0.016932203389830508|               2|          38|
|2018-09-20|00007d2de826758b6...| 505221001|0.020322033898305086|               2|          38|
|2018-09-20|00083cda041544b2f...| 688873

In [9]:
# Select CW 38 (Arbitrary at this point)
from pyspark.sql.functions import col
from pyspark.sql import functions as F

recommend = dataset \
    .filter((F.col('week_of_year') == F.lit('38'))) \
    .groupby('article_id').count()

recommend = recommend \
    .withColumn('count', col('count')/3) \
    .sort("count", ascending=False)

In [10]:
recommend.show(12)



+----------+------------------+
|article_id|             count|
+----------+------------------+
| 685687004|             364.0|
| 706016001| 346.3333333333333|
| 673677002| 344.6666666666667|
| 924243001| 329.6666666666667|
| 685687001|             328.0|
| 685687003| 302.6666666666667|
| 751471001| 259.3333333333333|
| 779863001|             249.0|
| 448509014|243.66666666666666|
| 796248001|240.33333333333334|
| 706016002|236.66666666666666|
| 923758001|230.33333333333334|
+----------+------------------+
only showing top 12 rows



                                                                                

In [11]:
recommend_df = recommend.drop(col('count')) \
    .limit(12) \
    .toPandas()

                                                                                

In [12]:
recommend_df

Unnamed: 0,article_id
0,685687004
1,706016001
2,673677002
3,924243001
4,685687001
5,685687003
6,751471001
7,779863001
8,448509014
9,796248001


In [13]:
recommend_array = recommend_df['article_id'] \
    .astype(str) \
    .astype(int) \
    .to_numpy()

In [14]:
customers = pd.read_csv('../input/h-and-m-personalized-fashion-recommendations/customers.csv')

In [15]:
customers

Unnamed: 0,customer_id,FN,Active,club_member_status,fashion_news_frequency,age,postal_code
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,,,ACTIVE,NONE,49.0,52043ee2162cf5aa7ee79974281641c6f11a68d276429a...
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,,,ACTIVE,NONE,25.0,2973abc54daa8a5f8ccfe9362140c63247c5eee03f1d93...
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,,,ACTIVE,NONE,24.0,64f17e6a330a85798e4998f62d0930d14db8db1c054af6...
3,00005ca1c9ed5f5146b52ac8639a40ca9d57aeff4d1bd2...,,,ACTIVE,NONE,54.0,5d36574f52495e81f019b680c843c443bd343d5ca5b1c2...
4,00006413d8573cd20ed7128e53b7b13819fe5cfc2d801f...,1.0,1.0,ACTIVE,Regularly,52.0,25fa5ddee9aac01b35208d01736e57942317d756b32ddd...
...,...,...,...,...,...,...,...
1371975,ffffbbf78b6eaac697a8a5dfbfd2bfa8113ee5b403e474...,,,ACTIVE,NONE,24.0,7aa399f7e669990daba2d92c577b52237380662f36480b...
1371976,ffffcd5046a6143d29a04fb8c424ce494a76e5cdf4fab5...,,,ACTIVE,NONE,21.0,3f47f1279beb72215f4de557d950e0bfa73789d24acb5e...
1371977,ffffcf35913a0bee60e8741cb2b4e78b8a98ee5ff2e6a1...,1.0,1.0,ACTIVE,Regularly,21.0,4563fc79215672cd6a863f2b4bf56b8f898f2d96ed590e...
1371978,ffffd7744cebcf3aca44ae7049d2a94b87074c3d4ffe38...,1.0,1.0,ACTIVE,Regularly,18.0,8892c18e9bc3dca6aa4000cb8094fc4b51ee8db2ed14d7...


In [16]:
submission = customers[['customer_id']].copy()
submission['y_score'] = submission.apply(lambda x: recommend_array, axis=1)

In [17]:
submission

Unnamed: 0,customer_id,y_score
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,"[685687004, 706016001, 673677002, 924243001, 6..."
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,"[685687004, 706016001, 673677002, 924243001, 6..."
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,"[685687004, 706016001, 673677002, 924243001, 6..."
3,00005ca1c9ed5f5146b52ac8639a40ca9d57aeff4d1bd2...,"[685687004, 706016001, 673677002, 924243001, 6..."
4,00006413d8573cd20ed7128e53b7b13819fe5cfc2d801f...,"[685687004, 706016001, 673677002, 924243001, 6..."
...,...,...
1371975,ffffbbf78b6eaac697a8a5dfbfd2bfa8113ee5b403e474...,"[685687004, 706016001, 673677002, 924243001, 6..."
1371976,ffffcd5046a6143d29a04fb8c424ce494a76e5cdf4fab5...,"[685687004, 706016001, 673677002, 924243001, 6..."
1371977,ffffcf35913a0bee60e8741cb2b4e78b8a98ee5ff2e6a1...,"[685687004, 706016001, 673677002, 924243001, 6..."
1371978,ffffd7744cebcf3aca44ae7049d2a94b87074c3d4ffe38...,"[685687004, 706016001, 673677002, 924243001, 6..."


In [18]:
submission = submission.rename(columns={'customer_id': 'customer_id', 'y_score': 'prediction'})
submission['prediction'] = submission.prediction.apply(lambda x: ' '.join([f'{e:010d}' for e in x]))
submission.head()

Unnamed: 0,customer_id,prediction
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,0685687004 0706016001 0673677002 0924243001 06...
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,0685687004 0706016001 0673677002 0924243001 06...
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,0685687004 0706016001 0673677002 0924243001 06...
3,00005ca1c9ed5f5146b52ac8639a40ca9d57aeff4d1bd2...,0685687004 0706016001 0673677002 0924243001 06...
4,00006413d8573cd20ed7128e53b7b13819fe5cfc2d801f...,0685687004 0706016001 0673677002 0924243001 06...


In [19]:
submission.to_csv('submission.csv', index=False)

In [20]:
customers = spark.read.option("header", True) \
    .csv('../input/h-and-m-personalized-fashion-recommendations/customers.csv')

dataset.createOrReplaceTempView("transaction") # Create temp view
customers.createOrReplaceTempView("customer")

df_bracketed_customers = spark.sql("""
                                   with age_bracketed_customers as(
                                   select customer_id,
                                    CASE
                                    WHEN age < 20 then 'Under 20'
                                    WHEN age between 20 and 30 then '20-30'
                                    WHEN age between 31 and 40 then '30-40'
                                    WHEN age between 41 and 50 then '40-50'
                                    WHEN age between 51 and 60 then '50-60'
                                    ELSE '60+'
                                    END AS `age bracket`
                                    from customer
                                    ),
                                    recs as (
                                    select
                                    article_id
                                    , `age bracket` as `purchaser_age_bracket`
                                    , row_number() over (partition by `age bracket` order by count(*) desc) as rank_within_age_bracket
                                    , count(*) as `purchase count`
                                    from transaction t
                                    join age_bracketed_customers a on a.customer_id = t.customer_id
                                    group by article_id, `age bracket`
                                    )
                                    select * from recs
                                    where rank_within_age_bracket <= 12
                                    order by purchaser_age_bracket, rank_within_age_bracket asc
                                   """)

In [21]:
pd_df_bracket_cust = df_bracketed_customers.toPandas()

22/04/28 20:46:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/28 20:46:50 WARN RowBasedKeyValueBatch: Calling spill() on

In [22]:
pd_df_bracket_cust

Unnamed: 0,article_id,purchaser_age_bracket,rank_within_age_bracket,purchase count
0,706016001,20-30,1,29181
1,706016002,20-30,2,18974
2,372860001,20-30,3,16901
3,759871002,20-30,4,16205
4,156231001,20-30,5,14790
...,...,...,...,...
67,720125001,Under 20,8,960
68,733749001,Under 20,9,951
69,759465001,Under 20,10,935
70,448509014,Under 20,11,919


In [23]:
groups = pd_df_bracket_cust['purchaser_age_bracket'].unique().tolist()
groups

['20-30', '30-40', '40-50', '50-60', '60+', 'Under 20']

In [24]:
group_variables = ['twenty_to_thirty', 
                   'thirty_to_forty', 
                   'forty_to_fifty', 
                   'fifty_to_sixty', 
                   'over_sixty', 
                   'under_twenty']

In [25]:
def array_maker(source_df, targeted_filter):
    filtered_df = source_df[source_df['purchaser_age_bracket'] == targeted_filter]
    
    df_array = filtered_df['article_id'] \
    .astype(str) \
    .astype(int) \
    .to_numpy()
    
    return df_array

In [26]:
d = {}
for variable in group_variables:
    for g in groups:
        variable = array_maker(pd_df_bracket_cust, g)
        d.update({g : variable})

In [27]:
d

{'20-30': array([706016001, 706016002, 372860001, 759871002, 156231001, 464297007,
        610776002, 448509014, 399223001, 706016003, 372860002, 720125001]),
 '30-40': array([610776002, 706016001, 464297007, 562245001, 372860001, 608776002,
        610776001, 160442007, 158340001, 562245046, 565379001, 351484002]),
 '40-50': array([706016001, 706016002, 372860001, 372860002, 610776002, 568601006,
        399256001, 351484002, 673677002, 572797001, 562245001, 562245046]),
 '50-60': array([706016001, 706016002, 372860001, 568601006, 610776002, 751471001,
        372860002, 673677002, 678942001, 579541001, 399256001, 573716012]),
 '60+': array([579541001, 399256005, 610776001, 399256001, 610776002, 678942001,
        673677002, 796210010, 573716012, 706016001, 783346001, 572797002]),
 'Under 20': array([706016001, 759871002, 706016002, 464297007, 759871001, 372860002,
        673396002, 720125001, 733749001, 759465001, 448509014, 372860001])}