In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
#import os
#import boto3
#from utils.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, S3_BUCKET

# Obtain the Spark session used by every cell below (application name: "martech_test").
spark = (
    SparkSession.builder
    .appName("martech_test")
    .getOrCreate()
)

In [2]:
# Disabled S3 download step: the code below lives inside a string literal, so it is
# never executed. It refers to boto3, os and utils.config, whose imports are
# commented out in the first cell; the notebook reads the CSVs from the local
# "data/" folder instead.
'''
# S3 client
s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

# List of S3 object keys to download
s3_object_keys = [
    "data/abi_bus_case1_beverage_channel_group_20210726.csv",
    "data/abi_bus_case1_beverage_sales_20210726.csv",
]

# Download each file from S3
for s3_object_key in s3_object_keys:
    local_path = os.path.join("s3_data", os.path.basename(s3_object_key))
    s3.download_file(S3_BUCKET, s3_object_key, local_path)
'''

1 - Build a data pipeline to merge the beverage channel features available in the interface #2 with the transactional sales from interface #1.

<i>Expected result: Data from both .CSV are merged correctly and can be grouped or transformed.</i>

In [3]:
# Load the sales extract: tab-delimited and not UTF-8 encoded, so it is decoded as UTF-16.
# Decoding as UTF-16 produced empty rows, recognisable by the "�" character in DATE;
# those rows are dropped. Every PERIOD value also ends with a stray "�", which is stripped.
beverage_sales = (
    spark.read
    .options(delimiter="\t", encoding="utf-16")
    .csv("data/abi_bus_case1_beverage_sales_20210726.csv", header=True)
    .filter(~col("DATE").contains("�"))
    .withColumn("PERIOD", regexp_replace(col("PERIOD").cast("string"), "�", ""))
)


# Load the channel feature lookup (CSV with a header row, default reader settings).
beverage_features = spark.read.csv(
    "data/abi_bus_case1_beverage_channel_group_20210726.csv",
    header=True,
)

In [4]:
# Preview the cleaned sales rows (first 20 rows, long values truncated).
beverage_sales.show(n=20, truncate=True)

+--------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+---------------+--------+----+-----+------+
|    DATE|CE_BRAND_FLVR|   BRAND_NM|Btlr_Org_LVL_C_Desc|        CHNL_GROUP|     TRADE_CHNL_DESC|PKG_CAT|Pkg_Cat_Desc|    TSR_PCKG_NM|$ Volume|YEAR|MONTH|PERIOD|
+--------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+---------------+--------+----+-----+------+
|1/1/2006|         3440|      LEMON|             CANADA|           LEISURE|         SPORT VENUE|   N20O|   20Z/600ML|  .591L NRP 24L|   22.48|2006|    1|     1|
|1/1/2006|         3440|      LEMON|          NORTHEAST|            SUPERS|           SUPERETTE|   N20O|   20Z/600ML|    20Z NRP 24L|     100|2006|    1|     1|
|1/1/2006|         3554| STRAWBERRY|          SOUTHEAST|         WORKPLACE|      PLANT / OFFICE|   N20O|   20Z/600ML|    20Z NRP 24L|   66.14|2006|    1|     1|
|1/1/2006|         3441|  RASPBERR

In [5]:
# Preview the channel feature lookup (first 20 rows, long values truncated).
beverage_features.show(n=20, truncate=True)

+--------------------+----------------+---------------+
|     TRADE_CHNL_DESC|TRADE_GROUP_DESC|TRADE_TYPE_DESC|
+--------------------+----------------+---------------+
|         SPORT VENUE|   ENTERTAINMENT|      ALCOHOLIC|
|           SUPERETTE|        SERVICES|            MIX|
|      PLANT / OFFICE|        SERVICES|            MIX|
|   MASS MERCHANDISER|         GROCERY|            MIX|
|LIQUOR/BEER/WINE/...|         GROCERY|      ALCOHOLIC|
|   CONVENIENCE STORE|        SERVICES|            MIX|
|QUICK SERVICE RES...|   ENTERTAINMENT|      ALCOHOLIC|
|         SUPERMARKET|         GROCERY|            MIX|
|           ALL OTHER|           OTHER|            MIX|
|OTHER EATING + DR...|        SERVICES|            MIX|
|          RESTAURANT|   ENTERTAINMENT|      ALCOHOLIC|
|  HYPER-MERCHANDISER|         GROCERY|            MIX|
|          DRUG STORE|         GROCERY|            MIX|
|      TRANSPORTATION|        SERVICES|  NON ALCOHOLIC|
|  MILITARY-COMMISARY|  GOV & MILITARY|         

In [6]:
# Inner-join sales to the channel features on the shared TRADE_CHNL_DESC key.
# Joining by column name keeps a single TRADE_CHNL_DESC column in the result.
beverage_df = beverage_sales.join(
    beverage_features,
    on=["TRADE_CHNL_DESC"],
    how="inner",
)
beverage_df.show()

+--------------------+--------+-------------+-----------+-------------------+------------------+-------+------------+---------------+--------+----+-----+------+----------------+---------------+
|     TRADE_CHNL_DESC|    DATE|CE_BRAND_FLVR|   BRAND_NM|Btlr_Org_LVL_C_Desc|        CHNL_GROUP|PKG_CAT|Pkg_Cat_Desc|    TSR_PCKG_NM|$ Volume|YEAR|MONTH|PERIOD|TRADE_GROUP_DESC|TRADE_TYPE_DESC|
+--------------------+--------+-------------+-----------+-------------------+------------------+-------+------------+---------------+--------+----+-----+------+----------------+---------------+
|         SPORT VENUE|1/1/2006|         3440|      LEMON|             CANADA|           LEISURE|   N20O|   20Z/600ML|  .591L NRP 24L|   22.48|2006|    1|     1|   ENTERTAINMENT|      ALCOHOLIC|
|           SUPERETTE|1/1/2006|         3440|      LEMON|          NORTHEAST|            SUPERS|   N20O|   20Z/600ML|    20Z NRP 24L|     100|2006|    1|     1|        SERVICES|            MIX|
|      PLANT / OFFICE|1/1/2006

In [7]:
# Run the merge with an explicit equality condition, then drop the duplicated
# join column that comes from the features side.
beverage_df = (
    beverage_sales
    .join(
        beverage_features,
        on=beverage_sales["TRADE_CHNL_DESC"] == beverage_features["TRADE_CHNL_DESC"],
        how="inner",
    )
    .drop(beverage_features["TRADE_CHNL_DESC"])
)

beverage_df.show()

+--------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+---------------+--------+----+-----+------+----------------+---------------+
|    DATE|CE_BRAND_FLVR|   BRAND_NM|Btlr_Org_LVL_C_Desc|        CHNL_GROUP|     TRADE_CHNL_DESC|PKG_CAT|Pkg_Cat_Desc|    TSR_PCKG_NM|$ Volume|YEAR|MONTH|PERIOD|TRADE_GROUP_DESC|TRADE_TYPE_DESC|
+--------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+---------------+--------+----+-----+------+----------------+---------------+
|1/1/2006|         3440|      LEMON|             CANADA|           LEISURE|         SPORT VENUE|   N20O|   20Z/600ML|  .591L NRP 24L|   22.48|2006|    1|     1|   ENTERTAINMENT|      ALCOHOLIC|
|1/1/2006|         3440|      LEMON|          NORTHEAST|            SUPERS|           SUPERETTE|   N20O|   20Z/600ML|    20Z NRP 24L|     100|2006|    1|     1|        SERVICES|            MIX|
|1/1/2006|         3554| STRAW

2 - Implement a data ingestion pipeline for the beverage sales data using dimensional data modeling. The solution must contain 
dimensions (at least two), fact (at least one) and especially summary tables based on the data provided. 

Your approach should be able to answer most of the business questions based on the KPIs provided. 

Implement and perform all the transformations and aggregations you find important. 

Make sure you can understand and explain your design decisions for the fictional team members represented by the recruiters.
    

<i>Expected result: Physical data model implemented with 100% of data provided ingested</i>

In [8]:
# Data cleaning and transformation.
# Assumption: "$ Volume" must never be negative, so records with a negative volume are removed.
# Both CSVs are read without a schema or inferSchema, so "$ Volume" is presumably still a
# string here (confirm with beverage_df.printSchema()). Comparing a string with the integer
# literal 0 leaves the conversion to Spark's implicit coercion rules, which depend on the
# Spark version / ANSI settings and may truncate decimals. Casting to double makes the
# comparison explicitly numeric. Rows whose volume is null or not a number are dropped as well.
beverage_df = beverage_df.filter(col("$ Volume").cast("double") >= 0)
beverage_df.show()

+--------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+---------------+--------+----+-----+------+----------------+---------------+
|    DATE|CE_BRAND_FLVR|   BRAND_NM|Btlr_Org_LVL_C_Desc|        CHNL_GROUP|     TRADE_CHNL_DESC|PKG_CAT|Pkg_Cat_Desc|    TSR_PCKG_NM|$ Volume|YEAR|MONTH|PERIOD|TRADE_GROUP_DESC|TRADE_TYPE_DESC|
+--------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+---------------+--------+----+-----+------+----------------+---------------+
|1/1/2006|         3440|      LEMON|             CANADA|           LEISURE|         SPORT VENUE|   N20O|   20Z/600ML|  .591L NRP 24L|   22.48|2006|    1|     1|   ENTERTAINMENT|      ALCOHOLIC|
|1/1/2006|         3440|      LEMON|          NORTHEAST|            SUPERS|           SUPERETTE|   N20O|   20Z/600ML|    20Z NRP 24L|     100|2006|    1|     1|        SERVICES|            MIX|
|1/1/2006|         3554| STRAW

In [9]:
# Time dimension: one row per distinct (DATE, YEAR, MONTH, PERIOD) combination.
time_dimension = beverage_df.select(["DATE", "YEAR", "MONTH", "PERIOD"]).dropDuplicates()
time_dimension.show()

+---------+----+-----+------+
|     DATE|YEAR|MONTH|PERIOD|
+---------+----+-----+------+
| 2/8/2006|2006|    2|     2|
|2/22/2006|2006|    2|     4|
| 1/8/2006|2006|    1|     2|
|1/15/2006|2006|    1|     3|
| 3/8/2006|2006|    3|     2|
|3/15/2006|2006|    3|     3|
| 2/1/2006|2006|    2|     1|
|1/22/2006|2006|    1|     4|
| 1/1/2006|2006|    1|     1|
|3/22/2006|2006|    3|     4|
|2/15/2006|2006|    2|     3|
| 3/1/2006|2006|    3|     1|
|3/29/2006|2006|    3|     5|
+---------+----+-----+------+



In [10]:
# Location dimension: each distinct pairing of region (Btlr_Org_LVL_C_Desc) and channel group.
location_dimension = (
    beverage_df
    .select(col("Btlr_Org_LVL_C_Desc"), col("CHNL_GROUP"))
    .distinct()
)
location_dimension.show()

+-------------------+------------------+
|Btlr_Org_LVL_C_Desc|        CHNL_GROUP|
+-------------------+------------------+
|        GREAT LAKES|         WORKPLACE|
|            MIDWEST|       DRUG STORES|
|          SOUTHEAST|         WORKPLACE|
|          SOUTHEAST|       DRUG STORES|
|          SOUTHWEST|         EDUCATION|
|          NORTHEAST|      FOOD SERVICE|
|             CANADA|    TRANSPORTATION|
|          NORTHEAST|         WORKPLACE|
|        GREAT LAKES|    TRANSPORTATION|
|               WEST|CONVENIENCE RETAIL|
|            MIDWEST|            SUPERS|
|               WEST|            SUPERS|
|          NORTHEAST|     MILITARY/GOVT|
|          SOUTHWEST|       RETAIL COLD|
|        GREAT LAKES|        HEALTHCARE|
|          SOUTHEAST|            SUPERS|
|          NORTHEAST|             CLUBS|
|               WEST|         EDUCATION|
|          SOUTHEAST|CONVENIENCE RETAIL|
|        GREAT LAKES|CONVENIENCE RETAIL|
+-------------------+------------------+
only showing top

In [11]:
# Product dimension: distinct brand / package attribute combinations.
product_dimension = (
    beverage_df
    .select(
        "CE_BRAND_FLVR",
        "BRAND_NM",
        "PKG_CAT",
        "Pkg_Cat_Desc",
        "TSR_PCKG_NM",
    )
    .dropDuplicates()
)
product_dimension.show()

+-------------+-----------+-------+------------+---------------+
|CE_BRAND_FLVR|   BRAND_NM|PKG_CAT|Pkg_Cat_Desc|    TSR_PCKG_NM|
+-------------+-----------+-------+------------+---------------+
|         3441|  RASPBERRY|   N56P|    500ML 6P|    .5L NRP 6P*|
|         3697|      GRAPE|   N20O|   20Z/600ML|  20z NRP 24L S|
|         3440|      LEMON|   N20O|   20Z/600ML|    20Z NRP 24L|
|         3440|      LEMON|   N56P|    500ML 6P|    .5L NRP 6P*|
|         3441|  RASPBERRY|   N20O|   20Z/600ML|.591L NRP 24L *|
|         3441|  RASPBERRY|   N56P|    500ML 6P|   .5L NRP 6P S|
|         3440|      LEMON|   N128|12Z/355M 8NR|   12Z NRP 8P F|
|         3554| STRAWBERRY|   N56P|    500ML 6P|   .5L NRP 6P S|
|         3554| STRAWBERRY|   N20O|   20Z/600ML|  20z NRP 24L S|
|         3554| STRAWBERRY|   N20O|   20Z/600ML|.591L NRP 24L *|
|         3554| STRAWBERRY|   N56P|    500ML 6P|     .5L NRP 6P|
|         3440|      LEMON|   N56P|    500ML 6P|   .5L NRP 6P S|
|         3440|      LEMO

In [12]:
# Sales fact table: the "$ Volume" measure plus the date, brand, region,
# channel-group and trade-group columns used by the queries below.
fact_table = beverage_df.select(
    [
        "DATE",
        "CE_BRAND_FLVR",
        "BRAND_NM",
        "Btlr_Org_LVL_C_Desc",
        "CHNL_GROUP",
        "TRADE_GROUP_DESC",
        "$ Volume",
    ]
)
fact_table.show()

+--------+-------------+-----------+-------------------+------------------+----------------+--------+
|    DATE|CE_BRAND_FLVR|   BRAND_NM|Btlr_Org_LVL_C_Desc|        CHNL_GROUP|TRADE_GROUP_DESC|$ Volume|
+--------+-------------+-----------+-------------------+------------------+----------------+--------+
|1/1/2006|         3440|      LEMON|             CANADA|           LEISURE|   ENTERTAINMENT|   22.48|
|1/1/2006|         3440|      LEMON|          NORTHEAST|            SUPERS|        SERVICES|     100|
|1/1/2006|         3554| STRAWBERRY|          SOUTHEAST|         WORKPLACE|        SERVICES|   66.14|
|1/1/2006|         3441|  RASPBERRY|            MIDWEST| MASS MERCHANDISER|         GROCERY|   222.5|
|1/1/2006|         3440|      LEMON|               WEST| MASS MERCHANDISER|         GROCERY|   302.5|
|1/1/2006|         3440|      LEMON|            MIDWEST|OTHER SMALL STORES|         GROCERY|      10|
|1/1/2006|         3554| STRAWBERRY|             CANADA|      FOOD SERVICE|   ENTE

In [13]:
# Summary table: total "$ Volume" per region and trade group, rounded to 2 decimals,
# listed by region with the largest totals first.
# Note: round/sum here are the pyspark.sql.functions versions brought in by the
# star import, not the Python builtins.
summary_table = (
    fact_table
    .groupBy("Btlr_Org_LVL_C_Desc", "TRADE_GROUP_DESC")
    .agg(round(sum("$ Volume"), 2).alias("TotalVolume"))
    .orderBy("Btlr_Org_LVL_C_Desc", desc("TotalVolume"))
)
summary_table.show()

+-------------------+----------------+-----------+
|Btlr_Org_LVL_C_Desc|TRADE_GROUP_DESC|TotalVolume|
+-------------------+----------------+-----------+
|             CANADA|         GROCERY|  168703.45|
|             CANADA|        SERVICES|   83758.62|
|             CANADA|        ACADEMIC|    53126.0|
|             CANADA|   ENTERTAINMENT|   38697.67|
|             CANADA|  GOV & MILITARY|    1249.55|
|             CANADA|           OTHER|     972.83|
|        GREAT LAKES|         GROCERY|  380326.41|
|        GREAT LAKES|        SERVICES|  157807.08|
|        GREAT LAKES|        ACADEMIC|  152047.23|
|        GREAT LAKES|   ENTERTAINMENT|   46100.78|
|        GREAT LAKES|           OTHER|    3446.45|
|        GREAT LAKES|  GOV & MILITARY|    3102.27|
|            MIDWEST|         GROCERY|  326444.71|
|            MIDWEST|        SERVICES|  129269.54|
|            MIDWEST|        ACADEMIC|   88605.37|
|            MIDWEST|   ENTERTAINMENT|   54127.19|
|            MIDWEST|          

3 - What are the next steps, enhancements and future features that could be added to your solution after the MVP?
<i>Expected result: List of items with enhancements and features expected in the future to improve the proposed MVP.</i>


1. **Melhoria na Qualidade dos Dados:**
   - Tratar de forma mais robusta valores ausentes ou inválidos, e tratamento de possíveis outliers.

2. **Visualização de Dados:**
   - Desenvolver um painel de controle usando ferramentas de visualização como Tableau ou Power BI.
   - Criar gráficos interativos para facilitar a exploração dos dados.

3. **Integração com ML:**
   - Explorar a incorporação de modelos de aprendizado de máquina para análises preditivas.
   - Treinar modelos para prever tendências futuras com base nos dados históricos.

4. **Integração com cloud:**
   - Integrar com a Azure para fazer esse ETL em tempo real, ou em intervalos determinados, de maneira automatizada.

4 - Create queries/scripts that can answer the following business questions using the data structure created:
Expected result: Data processes and results.

&nbsp;&rarr;4.1 What are the Top 3 Trade Groups (TRADE_GROUP_DESC) for each Region (Btlr_Org_LVL_C_Desc) in sales ($ Volume)?

&nbsp;&rarr;4.2 How much sales ($ Volume) each brand (BRAND_NM) achieved per month?

&nbsp;&rarr;4.3 Which is the lowest brand (BRAND_NM) in sales ($ Volume) for each region (Btlr_Org_LVL_C_Desc)?

<i>Important: The queries/scripts need to be based on your MVP created in item #2, not the source files.</i>

In [17]:
#4.1
# Window over each region, largest TotalVolume first.
# TRADE_GROUP_DESC is a secondary sort key: row_number() assigns positions arbitrarily
# among rows with equal TotalVolume, so without a tie-breaker the selected "top 3"
# could differ between runs.
window_spec = Window.partitionBy("Btlr_Org_LVL_C_Desc").orderBy(
    col("TotalVolume").desc(),
    col("TRADE_GROUP_DESC").asc(),
)

# Position of each trade group inside its region.
ranked_summary = summary_table.withColumn("rank", row_number().over(window_spec))

# Keep the 3 best trade groups per region. The result is sorted explicitly before the
# helper column is dropped, because row order after a window operation is not guaranteed.
top3_trade_groups = (
    ranked_summary
    .filter(col("rank") <= 3)
    .orderBy("Btlr_Org_LVL_C_Desc", "rank")
    .drop("rank")
)

# Show the result
top3_trade_groups.show(30)

+-------------------+----------------+-----------+
|Btlr_Org_LVL_C_Desc|TRADE_GROUP_DESC|TotalVolume|
+-------------------+----------------+-----------+
|             CANADA|         GROCERY|  168703.45|
|             CANADA|        SERVICES|   83758.62|
|             CANADA|        ACADEMIC|    53126.0|
|        GREAT LAKES|         GROCERY|  380326.41|
|        GREAT LAKES|        SERVICES|  157807.08|
|        GREAT LAKES|        ACADEMIC|  152047.23|
|            MIDWEST|         GROCERY|  326444.71|
|            MIDWEST|        SERVICES|  129269.54|
|            MIDWEST|        ACADEMIC|   88605.37|
|          NORTHEAST|         GROCERY|  403748.11|
|          NORTHEAST|        SERVICES|  156027.64|
|          NORTHEAST|        ACADEMIC|  149591.89|
|          SOUTHEAST|         GROCERY|  416674.35|
|          SOUTHEAST|        SERVICES|  225958.15|
|          SOUTHEAST|        ACADEMIC|  126047.84|
|          SOUTHWEST|         GROCERY|  271838.33|
|          SOUTHWEST|        SE

In [15]:
#4.2
# Attach YEAR / MONTH to every fact row by joining the time dimension on DATE.
joined_table = fact_table.join(time_dimension, "DATE")

# Total "$ Volume" per (YEAR, MONTH, BRAND_NM), rounded to 2 decimals.
# YEAR and MONTH come from a CSV read without a schema, so they are presumably strings
# (confirm with printSchema()). Sorting them as text would place month "10" before "2",
# so the ordering uses integer casts; the output columns themselves are left unchanged.
brand_monthly_sales = (
    joined_table
    .groupBy("YEAR", "MONTH", "BRAND_NM")
    .agg(round(sum("$ Volume"), 2).alias("TotalVolume"))
    .orderBy(
        col("YEAR").cast("int"),
        col("MONTH").cast("int"),
        col("TotalVolume").desc(),
    )
)

# Show the result
brand_monthly_sales.show()

+----+-----+-----------+-----------+
|YEAR|MONTH|   BRAND_NM|TotalVolume|
+----+-----+-----------+-----------+
|2006|    1|      LEMON|   506459.5|
|2006|    1|  RASPBERRY|  393934.66|
|2006|    1| STRAWBERRY|  315945.67|
|2006|    2|      LEMON|   551533.2|
|2006|    2|  RASPBERRY|  410000.11|
|2006|    2| STRAWBERRY|  345737.67|
|2006|    3|      LEMON|  766466.77|
|2006|    3|  RASPBERRY|  587952.64|
|2006|    3| STRAWBERRY|  475427.74|
|2006|    3|      GRAPE|        7.5|
+----+-----+-----------+-----------+



In [16]:
#4.3
# Total "$ Volume" per (region, brand), rounded to 2 decimals.
region_brand_sales = fact_table.groupBy("Btlr_Org_LVL_C_Desc", "BRAND_NM").agg(
    round(sum("$ Volume"), 2).alias("TotalVolume")
)

# Window over each region, smallest TotalVolume first.
# BRAND_NM is a secondary sort key so that, if two brands tie for the lowest total,
# the same one is picked on every run instead of an arbitrary one.
window_spec = Window.partitionBy("Btlr_Org_LVL_C_Desc").orderBy(
    col("TotalVolume").asc(),
    col("BRAND_NM").asc(),
)

# Position of each brand inside its region.
ranked_brands = region_brand_sales.withColumn("rank", row_number().over(window_spec))

# Keep only the lowest-selling brand per region. There is exactly one row per region
# at this point, so ordering by region alone is sufficient.
lowest_brand_by_region = (
    ranked_brands
    .filter(col("rank") == 1)
    .drop("rank")
    .orderBy("Btlr_Org_LVL_C_Desc")
)

# Show the result
lowest_brand_by_region.show()

+-------------------+-----------+-----------+
|Btlr_Org_LVL_C_Desc|   BRAND_NM|TotalVolume|
+-------------------+-----------+-----------+
|             CANADA| STRAWBERRY|   74109.49|
|        GREAT LAKES| STRAWBERRY|  208833.03|
|            MIDWEST| STRAWBERRY|  149226.55|
|          NORTHEAST| STRAWBERRY|  195638.73|
|          SOUTHEAST|  RASPBERRY|  224007.46|
|          SOUTHWEST|      GRAPE|        7.5|
|               WEST| STRAWBERRY|   134794.2|
+-------------------+-----------+-----------+

