In [1]:
import os
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, functions as f
import matplotlib.pyplot as plt
from pyspark.sql.functions import explode, col, date_format
from pyspark.sql.types import *

In [2]:
# Define a raiz do projeto
os.chdir('/home/jovyan/')

## Session

In [3]:
# Minio
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT_", "minioserver:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY_", "embuuWvqBMTLPRnbYXxu")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY_", "ZApLkWzj71pCNrB0IBGvQ5s5a2x4AJ42XSFZxb39")

# Postgres
POSTGRES_DB = os.getenv("POSTGRES_DB", "z106")
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "z106pass")
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "postgres_z106:5432")

# Remote
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "z106")

In [4]:
%%time

spark = (
    SparkSession.builder.appName("DW_z106")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.4")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

CPU times: user 933 ms, sys: 369 ms, total: 1.3 s
Wall time: 5min 21s


## Carga de dados Dimensões

In [5]:
def query_postgres(spark, query):
    df = (
        spark.read.format("jdbc")
        .option("driver", "org.postgresql.Driver")
        .option("url", f"jdbc:postgresql://{POSTGRES_HOST}/{POSTGRES_DB}")
        .option("user", POSTGRES_USER)
        .option("password", POSTGRES_PASSWORD)
        .option("query", query)
        .load()
    )
    return df

### Clientes

In [6]:
%%time
query = "SELECT * FROM customers"
df_customers = query_postgres(spark, query)

# Criando a dimensão de clientes
df_customers.createOrReplaceTempView("dm_customers")

CPU times: user 11.5 ms, sys: 4.92 ms, total: 16.4 ms
Wall time: 1.79 s


In [11]:
df_customers.show(5)

+--------------------+----------+-----------------+--------------------+--------------+--------------------+--------------------+
|                  id|created_at|             name|               email|        tax_id|             address|              phones|
+--------------------+----------+-----------------+--------------------+--------------+--------------------+--------------------+
|CID_77322b65-fb44...|2024-01-27|    Elisa Pacheco|elisa.pacheco@exe...|382.710.694-03|{'street': 'Aerop...|[{'country': '+55...|
|CID_ea8cd53f-c02f...|2024-06-06|Valentina Costela|valentina.costela...|984.307.615-00|{'street': 'Estaç...|[{'country': '+55...|
|CID_043bd699-82f3...|2022-07-19|    Igor Mendonça|igor.mendonça@exe...|538.092.647-92|{'street': 'Lotea...|[{'country': '+55...|
|CID_d9a65e7f-441d...|2020-03-25|        Léo Silva|léo.silva@exemple...|308.496.715-66|{'street': 'Favel...|[{'country': '+55...|
|CID_4e6ba3cd-8569...|2023-04-23|   Emanuel Farias|emanuel.farias@ex...|842.137.956-91|{'s

### Produtos

In [7]:
%%time
query = "SELECT * FROM products"
df_products = query_postgres(spark, query)

# Criando a dimensão de produtos
df_products.createOrReplaceTempView("dm_products")

CPU times: user 5.62 ms, sys: 6.78 ms, total: 12.4 ms
Wall time: 49.9 ms


In [12]:
df_products.show(5)

+---+--------------------+--------------------+--------------+----------+
| id|                name|           categoria|  reference_id|unit_price|
+---+--------------------+--------------------+--------------+----------+
|  1|         Cropped Mar|             cropped|     GQ6Z22YGT|     78.90|
|  2|Top Z106 Caipirin...|colecao-brasilidades|   TP01-CAIPNH|     62.90|
|  3|Short Moderninho ...|               short|    SH04-BOSSA|    112.90|
|  4|      Top Z106 Brisa|                 top|     69X4JXZRX|     58.90|
|  5|Top Liberdade Bri...|                 top|TP04-BRILHOTRP|     65.90|
+---+--------------------+--------------------+--------------+----------+
only showing top 5 rows



### Data

In [8]:
def generate_date_df(spark, start_date, end_date):
    date_list = [start_date + timedelta(days=x) for x in range(0, (end_date - start_date).days + 1)]
    return spark.createDataFrame(date_list, DateType()).toDF("date")

In [9]:
# Data de início e fim
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)

# Gerando o DataFrame de datas
df_dates = generate_date_df(spark, start_date, end_date)

In [10]:
# Adicionando colunas de tempo
df_time = df_dates.withColumn("year", f.year("date")) \
                  .withColumn("month", f.month("date")) \
                  .withColumn("day", f.dayofmonth("date")) \
                  .withColumn("weekday", f.date_format("date", "E")) \
                  .withColumn("week_of_year", f.weekofyear("date"))

# Criando a dimensão de tempo
df_time.createOrReplaceTempView("dm_time")

In [13]:
df_time.show(5)

+----------+----+-----+---+-------+------------+
|      date|year|month|day|weekday|week_of_year|
+----------+----+-----+---+-------+------------+
|2023-01-01|2023|    1|  1|    Sun|          52|
|2023-01-02|2023|    1|  2|    Mon|           1|
|2023-01-03|2023|    1|  3|    Tue|           1|
|2023-01-04|2023|    1|  4|    Wed|           1|
|2023-01-05|2023|    1|  5|    Thu|           1|
+----------+----+-----+---+-------+------------+
only showing top 5 rows



## Carga de dados Fato

In [14]:
%%time
query = "SELECT * FROM ft_orders"
ft_orders_df = query_postgres(spark, query)

# Criando a dimensão de produtos
ft_orders_df.createOrReplaceTempView("ft_orders")

CPU times: user 9.9 ms, sys: 2.2 ms, total: 12.1 ms
Wall time: 39.4 ms


## Salvando Dimensões como Parquet

In [16]:
# Salvar o DataFrame em formato Parquet no MinIO
df_customers.write.mode("overwrite").parquet(f"s3a://{MINIO_BUCKET}/dw/dm_customers")
df_products.write.mode("overwrite").parquet(f"s3a://{MINIO_BUCKET}/dw/dm_products")
df_time.write.mode("overwrite").parquet(f"s3a://{MINIO_BUCKET}/dw/dm_time")
ft_orders_df.write.mode("overwrite").parquet(f"s3a://{MINIO_BUCKET}/dw/ft_orders")

## ToPandas timestamp fix

In [None]:
pdf = df_orders.limit(10) \
    .withColumn("created_at", date_format("created_at", "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("charges_created_at", date_format("charges_created_at", "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("charges_paid_at", date_format("charges_paid_at", "yyyy-MM-dd HH:mm:ss")) \
    .toPandas()

In [17]:
# Lendo os arquivos Parquet
ft_orders_df = spark.read.parquet(f"s3a://{MINIO_BUCKET}/dw/ft_orders")
customers_df = spark.read.parquet(f"s3a://{MINIO_BUCKET}/dw/dm_customers")
products_df = spark.read.parquet(f"s3a://{MINIO_BUCKET}/dw/dm_products")
time_df = spark.read.parquet(f"s3a://{MINIO_BUCKET}/dw/dm_time")

Py4JJavaError: An error occurred while calling o100.parquet.
: java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2625)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2590)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:562)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	... 31 more


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 51934)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/

#### Receita Total por Cliente (Top 10)

In [None]:
receita_por_cliente_df = ft_orders_df \
    .groupBy('customer') \
    .agg(f.sum('charge_value').alias('total_revenue')) \
    .orderBy('total_revenue', ascending=False) \
    .limit(10)

receita_por_cliente = receita_por_cliente_df.toPandas()
clientes = receita_por_cliente['customer'].fillna('Desconhecido')  # Substituir valores None
receitas = receita_por_cliente['total_revenue']
receita_por_cliente

#### Número de Pedidos por Categoria de Produto

In [None]:
items_df = ft_orders_df.withColumn('item', explode(col('item_references')))
items_df = items_df.select('item', 'charge_value')
pedidos_por_produto_df = items_df.join(products_df, items_df.item == products_df.reference_id, 'left') \
    .groupBy('category') \
    .agg(f.count('item').alias('order_count'))

categorias_df = pedidos_por_produto_df.toPandas()
categorias = categorias_df['category'].fillna('Desconhecida')  # Substituir valores None
quantidade_pedidos = categorias_df['order_count']


In [None]:
# Receita Total por Mês
receita_mensal_df = ft_orders_df \
    .join(time_df, ft_orders_df.created_at.cast('date') == time_df.date, 'left') \
    .groupBy('year', 'month') \
    .agg(f.sum('charge_value').alias('total_revenue')) \
    .orderBy('year', 'month')

receita_mensal = receita_mensal_df.toPandas()

# Garantir que os valores de year e month são inteiros
receita_mensal['month'] = receita_mensal['month'].fillna(0).astype(int)
receita_mensal['year'] = receita_mensal['year'].fillna(0).astype(int)

# Criar o período no formato 'YYYY-MM'
receita_mensal['period'] = receita_mensal.apply(lambda x: f"{int(x['year']):04d}-{int(x['month']):02d}", axis=1)
meses = receita_mensal['period']
receitas_por_mes = receita_mensal['total_revenue']

In [None]:
# Número de Pedidos por Estado de Entrega
shipping_df = ft_orders_df.withColumn(
    'shipping_region_code',
    f.get_json_object(f.col('shipping'), '$.region_code')
)
quantidade_pedidos_por_estado_df = shipping_df \
    .groupBy('shipping_region_code') \
    .agg(f.count('id').alias('order_count'))

estados_df = quantidade_pedidos_por_estado_df.toPandas()
estados = estados_df['shipping_region_code'].fillna('Desconhecido')  # Substituir valores None
quantidade_pedidos_por_estado = estados_df['order_count']

# Criar subplots
fig, axs = plt.subplots(2, 2, figsize=(16, 12))

# Receita Total por Cliente (Top 10)
axs[0, 0].bar(clientes, receitas, color='skyblue')
axs[0, 0].set_title('Receita Total por Cliente (Top 10)')
axs[0, 0].set_xlabel('Cliente')
axs[0, 0].set_ylabel('Receita Total (BRL)')
axs[0, 0].tick_params(axis='x', rotation=45)

# Número de Pedidos por Categoria de Produto
axs[0, 1].bar(categorias, quantidade_pedidos, color='lightgreen')
axs[0, 1].set_title('Número de Pedidos por Categoria de Produto')
axs[0, 1].set_xlabel('Categoria do Produto')
axs[0, 1].set_ylabel('Número de Pedidos')
axs[0, 1].tick_params(axis='x', rotation=45)

# Receita Total por Mês
axs[1, 0].plot(meses, receitas_por_mes, marker='o', linestyle='-', color='coral')
axs[1, 0].set_title('Receita Total por Mês')
axs[1, 0].set_xlabel('Mês')
axs[1, 0].set_ylabel('Receita Total (BRL)')
axs[1, 0].tick_params(axis='x', rotation=45)

# Número de Pedidos por Estado de Entrega
axs[1, 1].bar(estados, quantidade_pedidos_por_estado, color='lightcoral')
axs[1, 1].set_title('Número de Pedidos por Estado de Entrega')
axs[1, 1].set_xlabel('Estado de Entrega')
axs[1, 1].set_ylabel('Número de Pedidos')
axs[1, 1].tick_params(axis='x', rotation=45)

# Ajustar layout e mostrar
plt.tight_layout()
plt.show()