# Feature Model Teamplate
* Aluno: Paulo Henrique Costa Gontijo
* Matrícula: 15/0143800


In [1]:
# !pip install pyspark fastai fastbook -q

## Importação das Bibliotecas

In [2]:
import os

file_name = 'test.jsonl'
file_path = '/'.join([os.getcwd(), file_name])

## Criando Cluster Spark

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from os.path import abspath

warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
        .builder \
        .appName("otto-reccomender-competition-kaggle") \
        .config("spark.sql.warehouse.dir", abspath('spark-warehouse')) \
        .config("spark.executor.cores", "10")\
        .config("spark.driver.cores", "10")\
        .config("spark.executor.instances", "2")\
        .config("spark.executor.memory","32g")\
        .config("spark.driver.memory", "32g")\
        .config("spark.default.parallelism","80")\
        .config("spark.sql.shuffle.partitions","20")\
        .config("spark.sql.autoBroadcastHashJoin","-1")\
        .config("spark.sql.files.maxPartitionBytes","1g")\
        .enableHiveSupport() \
        .getOrCreate()

# show configured parameters
print(SparkConf().getAll())

# set log level
spark.sparkContext.setLogLevel("FATAL")

23/09/26 15:58:23 WARN Utils: Your hostname, gobellek-B660M-DS3H-DDR4 resolves to a loopback address: 127.0.1.1; using 192.168.0.37 instead (on interface enp4s0)
23/09/26 15:58:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/26 15:58:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[('spark.sql.shuffle.partitions', '20'), ('spark.driver.memory', '32g'), ('spark.driver.cores', '10'), ('spark.default.parallelism', '80'), ('spark.executor.cores', '10'), ('spark.executor.memory', '32g'), ('spark.app.name', 'otto-reccomender-competition-kaggle'), ('spark.sql.autoBroadcastHashJoin', '-1'), ('spark.sql.catalogImplementation', 'hive'), ('spark.executor.instances', '2'), ('spark.sql.warehouse.dir', '/home/gobellek/Documents/UnB/Tcc/tcc/spark-warehouse'), ('spark.master', 'local[*]'), ('spark.submit.pyFiles', ''), ('spark.submit.deployMode', 'client'), ('spark.app.submitTime', '1695754703648'), ('spark.ui.showConsoleProgress', 'true'), ('spark.sql.files.maxPartitionBytes', '1g')]


* Conferindo o instanciamento

In [4]:
spark

* Leitura do arquivo disponibilizado na competição kaggle: [link.](https://www.kaggle.com/competitions/otto-recommender-system/overview/)

In [5]:
df_raw = spark.read \
    .format("json") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .json(file_path).persist()
df_raw.columns

                                                                                

['events', 'session']

* Conferindo número de partições e conteúdo

In [6]:
print(df_raw.rdd.getNumPartitions())

80


In [7]:
df_raw.show(10)

+--------------------+--------+
|              events| session|
+--------------------+--------+
|[{59625, 16617240...|12899779|
|[{1142000, 166172...|12899780|
|[{141736, 1661724...|12899781|
|[{1669402, 166172...|12899782|
|[{255297, 1661724...|12899783|
|[{1036375, 166172...|12899784|
|[{1784451, 166172...|12899785|
|[{955252, 1661724...|12899786|
|[{1682750, 166172...|12899787|
|[{245131, 1661724...|12899788|
+--------------------+--------+
only showing top 10 rows



# Contagem de linhas em arquivo bruto

In [8]:
df_raw.count()

                                                                                

1671803

In [9]:
df_raw.printSchema()

root
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aid: long (nullable = true)
 |    |    |-- ts: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- session: long (nullable = true)



* Processamento de Json para formato tabular

In [10]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [11]:
def explode_df_json_to_tabular(df_raw):
    df_explode = df_raw.withColumn('events_explode', F.explode('events'))\
                    .withColumn('session', F.col('session').cast(T.IntegerType()))\
                    .withColumn('aid', F.col('events_explode.aid').cast(T.IntegerType()))\
                    .withColumn('ts', F.col('events_explode.ts'))\
                    .withColumn('type', F.col('events_explode.type'))\
                    .drop('events', 'events_explode')
    return df_explode

In [12]:
df_explode = explode_df_json_to_tabular(df_raw)
df_explode.show(10)

+--------+-------+-------------+------+
| session|    aid|           ts|  type|
+--------+-------+-------------+------+
|12899779|  59625|1661724000278|clicks|
|12899780|1142000|1661724000378|clicks|
|12899780| 582732|1661724058352|clicks|
|12899780| 973453|1661724109199|clicks|
|12899780| 736515|1661724136868|clicks|
|12899780|1142000|1661724155248|clicks|
|12899781| 141736|1661724000559|clicks|
|12899781| 199008|1661724022851|clicks|
|12899781|  57315|1661724170835|clicks|
|12899781| 194067|1661724246188|clicks|
+--------+-------+-------------+------+
only showing top 10 rows



# Contagem de linhas após o tratamento

In [13]:
df_explode.count()

6928123

* Schema pós tratamento

In [14]:
df_explode.printSchema()

root
 |-- session: integer (nullable = true)
 |-- aid: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- type: string (nullable = true)



* Categorização de coluna alvo string

In [15]:
from pyspark.ml.feature import StringIndexer

def transform_indexer(df_explode):
    indexer = StringIndexer(inputCol='type', outputCol='type_cat')
    indexer_fitted = indexer.fit(df_explode)
    df_indexed = indexer_fitted.transform(df_explode)
    df_indexed_type = df_indexed.withColumn('type_cat', F.col('type_cat').cast(T.IntegerType())).drop('type')
    df_indexed_type_renamed = df_indexed_type.withColumnRenamed('session','userCol')
    df_indexed_droped = df_indexed_type_renamed.dropDuplicates()
    df = df_indexed_droped.drop('ts')
    return df

In [16]:
df_indexed = transform_indexer(df_explode).persist()
df_indexed.show(10)



+--------+-------+--------+
| userCol|    aid|type_cat|
+--------+-------+--------+
|12899780| 582732|       0|
|12899782| 562753|       1|
|12899782| 834354|       2|
|12899783|1216820|       0|
|12899785|1784451|       0|
|12899785| 903397|       0|
|12899785|1308865|       0|
|12899785| 614626|       0|
|12899788|1663048|       0|
|12899791|1787713|       0|
+--------+-------+--------+
only showing top 10 rows



                                                                                

* Conferência de schema pós tratativas

In [17]:
df_indexed.printSchema()

root
 |-- userCol: integer (nullable = true)
 |-- aid: integer (nullable = true)
 |-- type_cat: integer (nullable = true)



## Análise de Clientes

* Quantidade de clientes únicos

In [18]:
df_indexed.select('userCol').distinct().count()

                                                                                

1671803

## Análise de Produtos

* Quantidade de produtos únicos

In [19]:
df_indexed.select('aid').distinct().count()

783486

# Escrita de DataFrame tratado

In [20]:
df_indexed.coalesce(20)\
          .write.format('parquet')\
          .mode("overwrite")\
          .save('/'.join([os.getcwd(), 'refined-dataset']))

                                                                                

In [21]:
df_raw.unpersist()
df_explode.unpersist()
df_indexed.unpersist()

DataFrame[userCol: int, aid: int, type_cat: int]

# Lendo o dataset refinado

In [22]:
df_refined = spark.read.format('parquet').load('refined-dataset').persist()
df_refined.show()

+--------+-------+--------+
| userCol|    aid|type_cat|
+--------+-------+--------+
|12899781| 199008|       0|
|12899782| 975116|       0|
|12899782| 975116|       1|
|12899782| 889671|       1|
|12899785|1154962|       0|
|12899785| 453905|       0|
|12899785| 258458|       0|
|12899785|1553332|       0|
|12899785| 874493|       0|
|12899785|  10851|       0|
|12899785| 453905|       0|
|12899799| 413826|       2|
|12899801| 725164|       0|
|12899803| 795006|       0|
|12899803| 929024|       1|
|12899803|1651971|       0|
|12899803|1684753|       2|
|12899807|1407421|       0|
|12899809| 703890|       0|
|12899810|1309871|       0|
+--------+-------+--------+
only showing top 20 rows



# Filtrando Dataset de Treino para Clientes que Compraram

In [23]:
client_ids = df_refined.select(df_refined.userCol).where(F.col('type_cat') == 2).dropDuplicates().withColumnRenamed('userCol', 'clients_ids')
client_ids.count()

                                                                                

35849

In [24]:
df_pre_moodel = df_refined.join(client_ids, df_refined.userCol == client_ids.clients_ids, how='inner').drop('clients_ids').persist()
df_pre_moodel.show()

+--------+-------+--------+
| userCol|    aid|type_cat|
+--------+-------+--------+
|12899815| 711554|       0|
|12899815| 711554|       0|
|12899815| 745095|       0|
|12899815|1519178|       0|
|12899815| 772784|       0|
|12899815| 708311|       2|
|12899815|1574360|       0|
|12899815|1747391|       0|
|12899815| 526421|       0|
|12899815| 711554|       0|
|12899815|1783601|       0|
|12899815| 290121|       0|
|12899815|1188442|       0|
|12899815| 374736|       0|
|12899815|1577268|       0|
|12899815| 101371|       0|
|12899815| 307161|       0|
|12899815|1153934|       0|
|12899815|  86049|       0|
|12899815|1626876|       0|
+--------+-------+--------+
only showing top 20 rows



In [25]:
df_pre_moodel.select('userCol').count()

628773

In [26]:
df_pre_moodel.select('userCol').distinct().count()

35849

### Adicionar pipeline de hiperparametrização
* http://restanalytics.com/2019-02-27-Hyperparameter-Tuning-Alternating-Least-Squares-Recommender-System/

In [27]:
from pyspark.ml.recommendation import ALS

training = df_pre_moodel

In [None]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(rank=100,
          maxIter=20,
          regParam=0.01,
          userCol="userCol",
          itemCol="aid",
          ratingCol="type_cat",
          coldStartStrategy="drop",
          implicitPrefs=True,
          checkpointInterval=1)
model = als.fit(training)

# Gerando Recomendações
Upload do dataset de treino

In [34]:
file_name = 'test.jsonl'
file_path = '/'.join([os.getcwd(), file_name])

In [35]:
df_test = spark.read \
    .format("json") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .json(file_path).persist()

                                                                                

In [36]:
df_test.show()

+--------------------+--------+
|              events| session|
+--------------------+--------+
|[{59625, 16617240...|12899779|
|[{1142000, 166172...|12899780|
|[{141736, 1661724...|12899781|
|[{1669402, 166172...|12899782|
|[{255297, 1661724...|12899783|
|[{1036375, 166172...|12899784|
|[{1784451, 166172...|12899785|
|[{955252, 1661724...|12899786|
|[{1682750, 166172...|12899787|
|[{245131, 1661724...|12899788|
|[{525156, 1661724...|12899789|
|[{1219653, 166172...|12899790|
|[{915175, 1661724...|12899791|
|[{1537160, 166172...|12899792|
|[{1181781, 166172...|12899793|
|[{1746099, 166172...|12899794|
|[{207754, 1661724...|12899795|
|[{4503, 166172400...|12899796|
|[{1335784, 166172...|12899797|
|[{379440, 1661724...|12899798|
+--------------------+--------+
only showing top 20 rows



In [37]:
df_test.count()

                                                                                

1671803

# Tratamento de subset para recomendações

In [38]:
df_pre_transform = explode_df_json_to_tabular(df_test)
df_pre_transform.count()

6928123

In [39]:
df_subset = transform_indexer(df_pre_transform).select('userCol').distinct()
df_subset.count()

1671803

In [40]:
df_pre_moodel.join(df_subset, df_pre_moodel.userCol == df_subset.userCol, 'inner')\
                    .drop(df_subset.userCol).select('userCol').distinct().count()

35849

In [41]:
df_recommend = model.recommendForUserSubset(df_subset, 20)
df_recommend.printSchema()

root
 |-- userCol: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



# Escrevendo Dataset de Recomendações em JSON

In [44]:
df_recommend.write.format('json')\
                  .mode("overwrite")\
                  .save('/'.join([os.getcwd(), 'raw-orders-predictions.json']))

                                                                                

In [43]:
df_recommend.unpersist()

DataFrame[userCol: int, recommendations: array<struct<aid:int,rating:float>>]