In [1]:
# Instala Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Baixa e extrai Spark
!curl -L -o spark.tgz https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar -xzf spark.tgz

# Instala findspark
!pip install -q findspark

# Configura variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init()


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  285M  100  285M    0     0  1130k      0  0:04:18  0:04:18 --:--:-- 1295k


In [2]:
from pyspark.sql import SparkSession

# Start the Spark session used by every cell below (getOrCreate reuses an
# existing session in this kernel instead of creating a second one).
spark = (
    SparkSession.builder
    .appName("iFood Case - Data Processing")
    .getOrCreate()
)


In [3]:
# Load the three raw JSON inputs; Spark infers each schema from the data.
offers_df, profile_df, transactions_df = (
    spark.read.json(source)
    for source in ("offers.json", "profile.json", "transactions.json")
)

In [4]:
# Inspect the inferred schema of each raw input (offers, profiles, events).
for frame in (offers_df, profile_df, transactions_df):
    frame.printSchema()


root
 |-- channels: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- discount_value: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- id: string (nullable = true)
 |-- min_value: long (nullable = true)
 |-- offer_type: string (nullable = true)

root
 |-- age: long (nullable = true)
 |-- credit_card_limit: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: string (nullable = true)
 |-- registered_on: string (nullable = true)

root
 |-- account_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- time_since_test_start: double (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- amount: double (nullable = true)
 |    |-- offer id: string (nullable = true)
 |    |-- offer_id: string (nullable = true)
 |    |-- reward: double (nullable = true)



In [5]:
# Raw-data preview, disabled because `transactions_df.show()` raised an error at
# this point (see the note below). Kept as real comments rather than a bare string
# literal, which Jupyter was echoing back as the cell's output.
# offers_df.show(5, truncate=False)
# profile_df.show(5, truncate=False)
# transactions_df.show(5, truncate=False)

A chamada `transactions_df.show()` gerou um erro, o que sugere que o arquivo `.json` tem linhas corrompidas ou malformadas.
No modo `PERMISSIVE` (o padrão do Spark), essas linhas não interrompem a leitura: o conteúdo bruto delas é colocado na coluna `_corrupt_record`.
A seguir, leio o arquivo com esse modo explícito para localizar onde estão os registros com erro.


In [6]:
# Re-read the events explicitly in PERMISSIVE mode, with malformed lines routed
# to the `_corrupt_record` column instead of aborting the read.
# NOTE(review): PERMISSIVE / `_corrupt_record` are Spark's documented defaults, so
# this likely yields the same DataFrame as the plain read in cell 3 — confirm.
transactions_df = (
    spark.read
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .json("transactions.json")
)

transactions_df.show(5, truncate=False)


+--------------------------------+--------------+---------------------+----------------------------------------------------+
|account_id                      |event         |time_since_test_start|value                                               |
+--------------------------------+--------------+---------------------+----------------------------------------------------+
|78afa995795e4d85b5d9ceeca43f5fef|offer received|0.0                  |{null, 9b98b8c7a33c4b65b9aebfe6a799e6d9, null, null}|
|a03223e636434f42ac4c3df47e8bac43|offer received|0.0                  |{null, 0b1e1539f2cc45b7b9fa7c272da2e1d7, null, null}|
|e2127556f4f64592b11af22de27a7932|offer received|0.0                  |{null, 2906b810c7d4411798c6938adc9daaa5, null, null}|
|8ec6ce2a7e7949b1bf142def7d0e0586|offer received|0.0                  |{null, fafdcd668e3743c1bb461111dcafc2a4, null, null}|
|68617ca6246f4fbc85e91a2a49552598|offer received|0.0                  |{null, 4d5c57ea9a6940dd891ad53e9dbe8da0, null, null}|


In [7]:
# Count malformed JSON lines. The `_corrupt_record` column is only present when
# the reader produced it (the schema printed above does not contain it), so check
# for it first instead of failing with an unresolved-column error.
# Since Spark 2.3, a query on raw JSON that references only the corrupt-record
# column is rejected; caching the parsed DataFrame first is the documented workaround.
if "_corrupt_record" in transactions_df.columns:
    transactions_df.cache()
    n_corrupt_records = transactions_df.filter(
        transactions_df["_corrupt_record"].isNotNull()
    ).count()
else:
    n_corrupt_records = 0
n_corrupt_records

In [8]:
# List the distinct event types present in the events log.
transactions_df.select(transactions_df["event"]).distinct().show()

+---------------+
|          event|
+---------------+
|    transaction|
| offer received|
|offer completed|
|   offer viewed|
+---------------+



In [9]:
# Purchase events only; the amount lives in value.amount.
is_purchase = transactions_df["event"] == "transaction"
trans_df = transactions_df.where(is_purchase)
trans_df.show(5, truncate=False)

+--------------------------------+-----------+---------------------+-------------------------+
|account_id                      |event      |time_since_test_start|value                    |
+--------------------------------+-----------+---------------------+-------------------------+
|02c083884c7d45b39cc68e1314fec56c|transaction|0.0                  |{0.83, null, null, null} |
|9fa9ae8f57894cc9a3b8a9bbe0fc1b2f|transaction|0.0                  |{34.56, null, null, null}|
|54890f68699049c2a04d415abc25e717|transaction|0.0                  |{13.23, null, null, null}|
|b2f1cd155b864803ad8334cdf13c4bd2|transaction|0.0                  |{19.51, null, null, null}|
|fe97aa22dd3e48c8b143116a8403dd52|transaction|0.0                  |{18.97, null, null, null}|
+--------------------------------+-----------+---------------------+-------------------------+
only showing top 5 rows



In [10]:
# Offer-reception events: the starting point of each offer's funnel.
is_received = transactions_df["event"] == "offer received"
received_df = transactions_df.where(is_received)
received_df.show(5, truncate=False)

+--------------------------------+--------------+---------------------+----------------------------------------------------+
|account_id                      |event         |time_since_test_start|value                                               |
+--------------------------------+--------------+---------------------+----------------------------------------------------+
|78afa995795e4d85b5d9ceeca43f5fef|offer received|0.0                  |{null, 9b98b8c7a33c4b65b9aebfe6a799e6d9, null, null}|
|a03223e636434f42ac4c3df47e8bac43|offer received|0.0                  |{null, 0b1e1539f2cc45b7b9fa7c272da2e1d7, null, null}|
|e2127556f4f64592b11af22de27a7932|offer received|0.0                  |{null, 2906b810c7d4411798c6938adc9daaa5, null, null}|
|8ec6ce2a7e7949b1bf142def7d0e0586|offer received|0.0                  |{null, fafdcd668e3743c1bb461111dcafc2a4, null, null}|
|68617ca6246f4fbc85e91a2a49552598|offer received|0.0                  |{null, 4d5c57ea9a6940dd891ad53e9dbe8da0, null, null}|


In [11]:
# Offer-view events.
is_viewed = transactions_df["event"] == "offer viewed"
viewed_df = transactions_df.where(is_viewed)
viewed_df.show(5, truncate=False)

+--------------------------------+------------+---------------------+----------------------------------------------------+
|account_id                      |event       |time_since_test_start|value                                               |
+--------------------------------+------------+---------------------+----------------------------------------------------+
|389bc3fa690240e798340f5a15918d5c|offer viewed|0.0                  |{null, f19421c1d4aa40978ebb69ca19b0e20d, null, null}|
|d1ede868e29245ea91818a903fec04c6|offer viewed|0.0                  |{null, 5a8bc65990b245e5a138643cd4eb9837, null, null}|
|102e9454054946fda62242d2e176fdce|offer viewed|0.0                  |{null, 4d5c57ea9a6940dd891ad53e9dbe8da0, null, null}|
|02c083884c7d45b39cc68e1314fec56c|offer viewed|0.0                  |{null, ae264e3637204a6fb9bb56bc8210ddfd, null, null}|
|be8a5d1981a2458d90b255ddc7e0d174|offer viewed|0.0                  |{null, 5a8bc65990b245e5a138643cd4eb9837, null, null}|
+---------------

In [12]:
# Offer-completion events; the preview shows these also carry a reward.
is_completed = transactions_df["event"] == "offer completed"
completed_df = transactions_df.where(is_completed)
completed_df.show(5, truncate=False)

+--------------------------------+---------------+---------------------+----------------------------------------------------+
|account_id                      |event          |time_since_test_start|value                                               |
+--------------------------------+---------------+---------------------+----------------------------------------------------+
|9fa9ae8f57894cc9a3b8a9bbe0fc1b2f|offer completed|0.0                  |{null, null, 2906b810c7d4411798c6938adc9daaa5, 2.0} |
|fe97aa22dd3e48c8b143116a8403dd52|offer completed|0.0                  |{null, null, fafdcd668e3743c1bb461111dcafc2a4, 2.0} |
|629fc02d56414d91bca360decdfa9288|offer completed|0.0                  |{null, null, 9b98b8c7a33c4b65b9aebfe6a799e6d9, 5.0} |
|676506bad68e4161b9bbaffeb039626b|offer completed|0.0                  |{null, null, ae264e3637204a6fb9bb56bc8210ddfd, 10.0}|
|8f7dd3b2afe14c078eb4f6e6fe4ba97d|offer completed|0.0                  |{null, null, 4d5c57ea9a6940dd891ad53e9dbe8da0,

In [13]:
from pyspark.sql.functions import coalesce

# The `value` struct has two offer-id keys, "offer id" and "offer_id" (see the
# schema above). "offer received" rows populate "offer id" in the preview; fall
# back to "offer_id" so no row loses its id if the spelling varies.
received_df = received_df.withColumn(
    "offer_id",
    coalesce(received_df["value"]["offer id"], received_df["value"]["offer_id"]),
)

In [14]:
from pyspark.sql.functions import coalesce

# The `value` struct has two offer-id keys, "offer id" and "offer_id" (see the
# schema above). "offer viewed" rows populate "offer id" in the preview; fall
# back to "offer_id" so no row loses its id if the spelling varies.
viewed_df = viewed_df.withColumn(
    "offer_id",
    coalesce(viewed_df["value"]["offer id"], viewed_df["value"]["offer_id"]),
)

In [15]:
from pyspark.sql.functions import coalesce

# "offer completed" rows store the id under "offer_id" in the preview (unlike
# received/viewed rows, which use "offer id"); fall back to the other spelling so
# no row loses its id. The reward is only present on completion events.
completed_df = (
    completed_df
    .withColumn(
        "offer_id",
        coalesce(completed_df["value"]["offer_id"], completed_df["value"]["offer id"]),
    )
    .withColumn("reward", completed_df["value"]["reward"])
)

In [16]:
# Pull the purchase amount out of the nested `value` struct.
trans_df = trans_df.withColumn("amount", trans_df["value"].getField("amount"))

In [17]:
# Analysis base: one row per "offer received" event, with its reception time.
base_df = received_df.select(
    "account_id",
    "offer_id",
    received_df["time_since_test_start"].alias("received_time"),
)

In [18]:
from pyspark.sql import Window, functions as F

viewed_df = viewed_df.select(
    "account_id",
    "offer_id",
    "time_since_test_start"
).withColumnRenamed("time_since_test_start", "viewed_time")

# Joining on (account_id, offer_id) alone pairs every reception of an offer with
# every view of it: a customer who received or viewed the same offer more than
# once would get duplicated rows, and views could be attributed to a reception
# that happened after them. Instead, each distinct reception owns the interval
# [received_time, next reception of the same offer) and gets the earliest view
# inside it (null when there is none).
reception_order = Window.partitionBy("account_id", "offer_id").orderBy("received_time")
receptions = (
    base_df.select("account_id", "offer_id", "received_time")
    .distinct()
    .withColumn("next_received_time", F.lead("received_time").over(reception_order))
)
view_in_window = (
    (F.col("r.account_id") == F.col("v.account_id"))
    & (F.col("r.offer_id") == F.col("v.offer_id"))
    & (F.col("v.viewed_time") >= F.col("r.received_time"))
    & (
        F.col("r.next_received_time").isNull()
        | (F.col("v.viewed_time") < F.col("r.next_received_time"))
    )
)
first_view = (
    receptions.alias("r")
    .join(viewed_df.alias("v"), view_in_window, how="left")
    .groupBy(
        F.col("r.account_id").alias("account_id"),
        F.col("r.offer_id").alias("offer_id"),
        F.col("r.received_time").alias("received_time"),
    )
    .agg(F.min("v.viewed_time").alias("viewed_time"))
)

# first_view has one row per distinct reception, so this join adds no rows.
base_df = base_df.join(first_view, ["account_id", "offer_id", "received_time"], how="left")


In [19]:
from pyspark.sql import Window, functions as F

completed_df = completed_df.select(
    "account_id",
    "offer_id",
    "time_since_test_start",
    "reward"
).withColumnRenamed("time_since_test_start", "completed_time")

# Joining on (account_id, offer_id) alone pairs every reception of an offer with
# every completion of it: a customer who received or completed the same offer more
# than once would get duplicated rows, and completions could be attributed to a
# reception that happened after them. Instead, each distinct reception owns the
# interval [received_time, next reception of the same offer) and gets the earliest
# completion inside it, together with that completion's reward (both null if none).
reception_order = Window.partitionBy("account_id", "offer_id").orderBy("received_time")
receptions = (
    base_df.select("account_id", "offer_id", "received_time")
    .distinct()
    .withColumn("next_received_time", F.lead("received_time").over(reception_order))
)
completion_in_window = (
    (F.col("r.account_id") == F.col("c.account_id"))
    & (F.col("r.offer_id") == F.col("c.offer_id"))
    & (F.col("c.completed_time") >= F.col("r.received_time"))
    & (
        F.col("r.next_received_time").isNull()
        | (F.col("c.completed_time") < F.col("r.next_received_time"))
    )
)
first_completion = (
    receptions.alias("r")
    .join(completed_df.alias("c"), completion_in_window, how="left")
    .groupBy(
        F.col("r.account_id").alias("account_id"),
        F.col("r.offer_id").alias("offer_id"),
        F.col("r.received_time").alias("received_time"),
    )
    .agg(
        F.min("c.completed_time").alias("completed_time"),
        # reward of the earliest completion in the window
        F.min_by("c.reward", "c.completed_time").alias("reward"),
    )
)

# first_completion has one row per distinct reception, so this join adds no rows.
base_df = base_df.join(first_completion, ["account_id", "offer_id", "received_time"], how="left")

In [20]:
from pyspark.sql.functions import when

# 0/1 flags: 1 when a view / completion time is attached to the row, else 0.
base_df = base_df.withColumns({
    "foi_visualizada": when(base_df["viewed_time"].isNotNull(), 1).otherwise(0),
    "foi_completada": when(base_df["completed_time"].isNotNull(), 1).otherwise(0),
})


In [21]:
from pyspark.sql import functions as F

# Time from reception to view / completion, rounded to 2 decimals (null when
# there was no view / completion, since null arithmetic yields null).
# F.round is used through the module alias on purpose: importing `round` from
# pyspark.sql.functions would shadow Python's built-in round() for the rest of
# the kernel session.
base_df = base_df.withColumn(
    "tempo_ate_visualizacao",
    F.round(base_df["viewed_time"] - base_df["received_time"], 2),
)
base_df = base_df.withColumn(
    "tempo_ate_completamento",
    F.round(base_df["completed_time"] - base_df["received_time"], 2),
)


In [22]:
# Spot-check the reception -> view -> completion fields on a few rows.
funnel_columns = [
    "account_id", "offer_id",
    "received_time", "viewed_time", "completed_time",
    "foi_visualizada", "foi_completada",
    "tempo_ate_visualizacao", "tempo_ate_completamento",
]
base_df.select(*funnel_columns).show(10, truncate=False)


+--------------------------------+--------------------------------+-------------+-----------+--------------+---------------+--------------+----------------------+-----------------------+
|account_id                      |offer_id                        |received_time|viewed_time|completed_time|foi_visualizada|foi_completada|tempo_ate_visualizacao|tempo_ate_completamento|
+--------------------------------+--------------------------------+-------------+-----------+--------------+---------------+--------------+----------------------+-----------------------+
|2eeac8d8feae4a8cad5a6af0499a211d|3f207df678b143eea3cee63160fa8bed|0.0          |null       |null          |0              |0             |null                  |null                   |
|31dda685af34476cad5bc968bdb01c53|0b1e1539f2cc45b7b9fa7c272da2e1d7|0.0          |6.25       |null          |1              |0             |6.25                  |null                   |
|389bc3fa690240e798340f5a15918d5c|f19421c1d4aa40978ebb69ca19b0e20

In [23]:
# Renomeia id para evitar conflito com offer_id
profile_df = profile_df.withColumnRenamed("id", "account_id")

base_df = base_df.join(profile_df, on="account_id", how="left")

In [24]:
# Renomeia id para evitar conflito com account_id
offers_df = offers_df.withColumnRenamed("id", "offer_id")

base_df = base_df.join(offers_df, on="offer_id", how="left")

In [25]:
# Tratamento de nulos e inconsistências
base_df = base_df.filter(
    base_df["age"].isNotNull() &
    base_df["gender"].isin("F", "M") &
    base_df["credit_card_limit"].isNotNull() &
    base_df["offer_type"].isNotNull() &
    (base_df["age"] > 10) & (base_df["age"] < 100)
)

# Normalização do limite
from pyspark.sql.functions import min as spark_min, max as spark_max, col

min_lim = base_df.agg(spark_min("credit_card_limit")).first()[0]
max_lim = base_df.agg(spark_max("credit_card_limit")).first()[0]

base_df = base_df.withColumn(
    "credit_card_limit_norm",
    (col("credit_card_limit") - min_lim) / (max_lim - min_lim)
)


base_df.toPandas().to_csv("base_df.csv", index=False)
