In [None]:
# Instalar pySpark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=5c468ae028922e92d67ed0b7f8e9bf50cd33b76beab2408811bbd436615bb96d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [220]:
# Bibliotecas
from pyspark.sql import SparkSession
import requests
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [226]:
# Iniciar sessão Spark
spark = SparkSession.builder \
    .appName("API fakeStore") \
    .getOrCreate()

In [227]:
# Requisição dos usuários na API
url = "https://fakestoreapi.com/users/"
response = requests.get(url)
data = response.json()
df_users = spark.createDataFrame(data)

In [233]:
# Requisição dos objetos que estão no carrinho
carts = "https://fakestoreapi.com/carts"
request_carts = requests.get(carts)
response_carts = request_carts.json()
df_carts = spark.createDataFrame(response_carts)

# Função explode para quebrar a lista de dicionários da coluna 'products'
df_carts = df_carts.withColumn("product", explode(col("products")))

new_df_carts = df_carts.select(
    col("userId").alias("id_usuario"),
    col("product.productId").alias("id_produto"),
    col("product.quantity").alias("quantidade"),
    col("date")
)

# Conversão data
new_df_carts = new_df_carts.withColumn("data", date_format(col("date"), "yyyy-MM-dd"))

In [235]:
# Requisição de todos os produtos
products = "https://fakestoreapi.com/products"
request_products = requests.get(products)
response_products = request_products.json()

# Função abaixo utilizada pois os dados estavam inconsistentes, no entanto, foi inferido o esquema mais robusto
df_products = spark.read.json(spark.sparkContext.parallelize([response_products]))
df_products = df_products.withColumnRenamed("category", "categoria")

In [236]:
# Join entre o dataframe "usuário" e "carrinho"
join_df = df_users.join(new_df_carts, df_users.id == new_df_carts.id_usuario, "inner").select('id_usuario','id_produto','quantidade', 'data')

In [237]:
# Join entre o 'join_df' com o dataframe "produtos"
join_df_2 = join_df.join(df_products, join_df.id_produto == df_products.id, "inner").select('id_usuario', 'id_produto', 'quantidade', 'categoria', 'data')

In [240]:
# Somatório da quantidade de produtos e trazendo a data mais recente em cada usuário adicionou ao carrinho
group_df = join_df_2.groupBy("id_usuario", "id_produto", "categoria").agg(
    sum("quantidade").alias("total_quantidade"),
    max("data").alias("ultima_data")
)

+----------+----------+----------------+----------------+-----------+
|id_usuario|id_produto|       categoria|total_quantidade|ultima_data|
+----------+----------+----------------+----------------+-----------+
|         1|         1|  men's clothing|              14| 2020-03-02|
|         1|         2|  men's clothing|               5| 2020-03-02|
|         1|         3|  men's clothing|               6| 2020-03-02|
|         1|         5|        jewelery|               2| 2020-01-02|
|         2|         1|  men's clothing|               2| 2020-03-01|
|         2|         9|     electronics|               1| 2020-03-01|
|         3|         1|  men's clothing|               4| 2020-01-01|
|         3|         7|        jewelery|               1| 2020-03-01|
|         3|         8|        jewelery|               1| 2020-03-01|
|         4|        10|     electronics|               2| 2020-03-01|
|         4|        12|     electronics|               3| 2020-03-01|
|         8|        

In [250]:
# Extração quantidade máxima de cada produto
max_qtd_df = group_df.groupBy("id_usuario").agg(
    max("total_quantidade").alias("max_qtd")
)
max_qtd_df = max_qtd_df.withColumnRenamed("id_usuario", "id")
join_max_df = group_df.join(max_qtd_df, group_df.id_usuario == max_qtd_df.id, "inner")

# Filtrando apenas a maior quantidade dos produtos de cada usuário
final_df = join_max_df.filter(col("total_quantidade") == col("max_qtd")).select("id_usuario", "categoria", "ultima_data").orderBy(asc("id_usuario"))
final_df.show()


+----------+----------------+-----------+
|id_usuario|       categoria|ultima_data|
+----------+----------------+-----------+
|         1|  men's clothing| 2020-03-02|
|         2|  men's clothing| 2020-03-01|
|         3|  men's clothing| 2020-01-01|
|         4|     electronics| 2020-03-01|
|         8|women's clothing| 2020-03-01|
+----------+----------------+-----------+



In [253]:
# Salvar em arquivo CSV
output_path = "./fakeStoreSpark.csv"
final_df.write.option("header", "true").csv(output_path)