In [1]:
import dask.dataframe as dd
import os
import gc

In [2]:
# Carregar os dados de vendas com calendário
sales_calendar = dd.read_parquet('../data/sales_calendar/')
# Carregar os dados de preços
item_prices = dd.read_parquet('../data/prices/')

In [3]:
# Converter yearweek para string nos dois DataFrames
sales_calendar['yearweek'] = sales_calendar['yearweek'].astype(str)
item_prices['yearweek'] = item_prices['yearweek'].astype(str)

In [4]:
# Fazer o merge dos DataFrames usando 'item', 'store_code', e 'yearweek' como chave
merged_data = sales_calendar.merge(item_prices, on=['item', 'store_code', 'yearweek'], how='left')
# Verificar as primeiras linhas do DataFrame resultante
print(merged_data.head())

                          id                 item     category_x  \
0  HOME_&_GARDEN_2_437_NYC_2  HOME_&_GARDEN_2_437  HOME_&_GARDEN   
1  HOME_&_GARDEN_2_437_NYC_2  HOME_&_GARDEN_2_437  HOME_&_GARDEN   
2    SUPERMARKET_1_052_NYC_2    SUPERMARKET_1_052    SUPERMARKET   
3    SUPERMARKET_1_052_NYC_2    SUPERMARKET_1_052    SUPERMARKET   
4    SUPERMARKET_1_053_NYC_2    SUPERMARKET_1_053    SUPERMARKET   

        department   store store_code    region       d  sales yearweek  \
0  HOME_&_GARDEN_2  Harlem      NYC_2  New York  d_1053      0   201351   
1  HOME_&_GARDEN_2  Harlem      NYC_2  New York  d_1053      0   201351   
2    SUPERMARKET_1  Harlem      NYC_2  New York  d_1053      1   201351   
3    SUPERMARKET_1  Harlem      NYC_2  New York  d_1053      1   201351   
4    SUPERMARKET_1  Harlem      NYC_2  New York  d_1053      0   201351   

        date event     category_y  sell_price  
0 2013-12-16  <NA>  HOME_&_GARDEN      9.9625  
1 2013-12-16  <NA>  HOME_&_GARDEN      9.962

In [5]:
merged_data.to_parquet('../data/ds_market/', write_index=False)

In [6]:
merged_data.isnull().sum().compute()

id                   0
item                 0
category_x           0
department           0
store                0
store_code           0
region               0
d                    0
sales                0
yearweek             0
date                 0
event         57561115
category_y    12513532
sell_price    12513532
dtype: int64

In [None]:
# Verificar se há valores ausentes nas colunas principais
missing_values = merged_data.isnull().sum().compute()
print("Valores ausentes em cada coluna:\n", missing_values)

In [None]:
# Verificar duplicatas com base em 'item', 'store_code', e 'yearweek'
duplicates = merged_data.duplicated(subset=['item', 'store_code', 'yearweek']).compute()
print(f"Número de duplicatas: {duplicates.sum()}")

In [None]:
# Descrever os dados para verificar os intervalos de valores
print(merged_data[['sell_price', 'sales']].describe().compute())

In [None]:
# Liberar objetos não utilizados e memória
gc.collect()
# Definir o caminho do diretório
output_dir = '../data/merged_sales_with_calendar_and_prices'
# Criar o diretório se ele não existir
os.makedirs(output_dir, exist_ok=True)
# Aumentar o número de partições se o arquivo ainda for muito grande
merged_data_repartitioned = merged_data.repartition(npartitions=15)
# Salvar as partições uma por vez (removendo 'write_index')
for i, partition in enumerate(merged_data_repartitioned.to_delayed()):
    partition.compute().to_parquet(f'{output_dir}/part_{i}.parquet')

In [None]:
# Verificar o tamanho de cada partição
print(merged_data_repartitioned.map_partitions(len).compute())