# DSMarket - First look at input files

In [None]:
# Instalamos libreria gdown para obtener los archivos origenes de google drive.
!pip install gdown

## 1. Importing libraries

In [None]:
import numpy as np
import pandas as pd
import gdown

# Notebook-wide display settings: floats are shown with a thousands separator
# and two decimals; up to 500 rows / 500 columns are shown before truncation.
pd.set_option("display.float_format", '{:,.2f}'.format)
pd.set_option("display.max_rows", 500)
pd.set_option("display.max_columns", 500)

## 2. Paths and directories

In [None]:
# Each dataset is described by a Google Drive download URL and the local
# file name the download is saved under.

# Item sales
sales_data_path, sales_file = (
    "https://drive.google.com/uc?id=1sbXj2mTiaHJC_1hU_blmDiU4sxy53-kN",
    "item_sales.csv",
)

# Daily calendar with events
calendar_data_path, calendar_file = (
    "https://drive.google.com/uc?id=1NIqmTAKSoN3mN5MUbs-K5WXv-10srHPR",
    "daily_calendar_with_events.csv",
)

# Item prices
prices_data_path, prices_file = (
    "https://drive.google.com/uc?id=1JHVZJ9eeqtGxPWs3vF3beEnuGVY2o8Ed",
    "item_prices.csv",
)


## 3. Import files

### 3A. Sales data

In [None]:
# Download the sales CSV from Google Drive to the local file, then load it.
gdown.download(url=sales_data_path, output=sales_file, quiet=False)
pd_sales = pd.read_csv(filepath_or_buffer=sales_file)

In [None]:
# Report the size of the sales table as returned by DataFrame.shape.
sales_shape = pd_sales.shape
print("The shape of the sales datataframe is :", sales_shape)

In [None]:
# Preview the first five rows of the sales table.
pd_sales.head(n=5)

In [None]:
# Distinct values of the store_code column.
pd_sales.loc[:, 'store_code'].unique()

In [None]:
# Column-by-column summary of the sales table; verbose=True asks pandas to
# list every column instead of a shortened summary.
pd_sales.info(verbose=True)

How many **stores** are there in the data?

In [None]:
# Number of distinct stores, followed by the row count for each store.
n_stores = pd_sales['store'].nunique()
print("There are {} stores in the data, and the number of registers per store are: ".format(n_stores))
pd_sales['store'].value_counts()

How many **departments**?

In [None]:
# Number of distinct departments, followed by the row count for each department.
# Fix: this cell previously counted the 'item' column while reporting the
# result as departments.
n_departments = pd_sales['department'].nunique()
print("There are {} departments in the data, and the number of registers per department are: ".format(n_departments))
pd_sales['department'].value_counts()

And **items**?

In [None]:
# Number of distinct items, followed by the row count for each item.
# Fix: this cell previously counted the 'department' column and labelled the
# result "departments", although the question being answered is about items.
n_items = pd_sales['item'].nunique()
print("There are {} items in the data, and the number of registers per item are: ".format(n_items))
pd_sales['item'].value_counts()

Do we have one register per id? In other words, can two registers have the same id?

In [None]:
# Compare the table size with the number of distinct ids: if the first
# dimension of both shapes is equal, every row has its own id.
unique_ids = pd_sales['id'].drop_duplicates()
print(pd_sales.shape)
print(unique_ids.shape)

In [None]:
# Show the row(s) for one id, transposed so the columns read top to bottom.
pd_sales.loc[pd_sales['id'] == 'ACCESORIES_1_002_NYC_1'].transpose()

### 3B. Calendar data

In [None]:
# Download the calendar CSV from Google Drive to the local file, then load it.
gdown.download(url=calendar_data_path, output=calendar_file, quiet=False)
pd_calendar = pd.read_csv(filepath_or_buffer=calendar_file)

In [None]:
# Report the size of the calendar table as returned by DataFrame.shape.
calendar_shape = pd_calendar.shape
print("The shape of the calendar datataframe is :", calendar_shape)

In [None]:
# Preview the first five rows of the calendar table.
pd_calendar.head(n=5)

In [None]:
# Column-by-column summary of the calendar table; verbose=True asks pandas
# to list every column instead of a shortened summary.
pd_calendar.info(verbose=True)

In [None]:
# Smallest and largest value of the 'date' column.
# NOTE(review): 'date' is only converted with pd.to_datetime in a later cell,
# so here min/max compare the values as read from the CSV (presumably
# ISO-formatted strings, which sort chronologically; confirm).
first_date = pd_calendar['date'].min()
last_date = pd_calendar['date'].max()
print("The range of dates available are: {} - {}".format(first_date, last_date))

What sort of events do we have?

In [None]:
# Row count for each distinct value of the 'event' column.
pd_calendar['event'].value_counts()

We don't have many events, but we could consider adding more at a later stage (it might help!).

### 3C. Prices data

In [None]:
# Download the prices CSV from Google Drive to the local file, then load it.
gdown.download(url=prices_data_path, output=prices_file, quiet=False)
pd_prices = pd.read_csv(filepath_or_buffer=prices_file)

In [None]:
# Report the size of the prices table as returned by DataFrame.shape.
prices_shape = pd_prices.shape
print("The shape of the prices datataframe is :", prices_shape)

In [None]:
# Preview the first five rows of the prices table.
pd_prices.head(n=5)

In [None]:
# Column-by-column summary of the prices table; verbose=True asks pandas to
# list every column instead of a shortened summary.
pd_prices.info(verbose=True)

Same number of items in the prices data?

In [None]:
# Number of distinct items in the prices table, followed by the row count per item.
n_price_items = pd_prices['item'].nunique()
print("There are {} items in the data, and the number of registers per item are: ".format(n_price_items))
pd_prices['item'].value_counts()

The number of items does match, but there are many registers per item. It seems that the price of an item can change over time.

Let's take a look at the distribution of the variables in the prices dataframe.

In [None]:
# Summary statistics with pandas' default column selection.
pd_prices.describe()

In [None]:
# Summary statistics restricted to the object-dtype columns.
pd_prices.describe(include=['object'])

In [None]:
# Look at the calendar again before deriving week keys from its dates.
pd_calendar.head(n=5)

In [None]:
from datetime import datetime, timedelta


def weekyearnum(dt):
    """Return year and week-of-year of ``dt`` as one string, e.g. ``"201104"``.

    The value comes from ``strftime("%Y%W")``: the four-digit year followed by
    the zero-padded week number that treats Monday as the first day of the
    week (days before the year's first Monday fall in week ``00``).
    """
    year_week_format = "%Y%W"
    return dt.strftime(year_week_format)


def myweeyearknum(dt):
    """Return ``weekyearnum`` of the date two days after ``dt``.

    NOTE(review): the shift is two days, so a Saturday is mapped onto the
    following Monday; presumably this aligns weeks that start on Saturday
    with the Monday-based ``%W`` numbering (confirm against the prices data).
    """
    return weekyearnum(dt + timedelta(days=2))


def weeknum(dt):
    """Return the ISO week number of ``dt`` (second field of ``isocalendar()``)."""
    _iso_year, iso_week, _iso_weekday = dt.isocalendar()
    return iso_week


def myweeknum(dt):
    """Return ``weeknum`` of the date two days after ``dt``."""
    return weeknum(dt + timedelta(days=2))

In [None]:
## Merge
# Parse the 'date' column into datetimes (expected layout: YYYY-MM-DD).
pd_calendar['date'] = pd.to_datetime(pd_calendar['date'], format="%Y-%m-%d")

# Derive the year+week key of every calendar day with the shifted-week helper.
pd_calendar['yearweek'] = pd_calendar['date'].apply(myweeyearknum)

In [None]:
# Display the calendar table to inspect the parsed 'date' and the new 'yearweek' column.
pd_calendar

In [None]:
# 'yearweek' was built as a string; cast it to float before joining.
# NOTE(review): presumably pd_prices['yearweek'] is stored as float, so the
# cast makes the join keys comparable; confirm against pd_prices.info().
pd_calendar['yearweek'] = pd_calendar['yearweek'].astype(float)

# Left join: keep every price row and attach the day labels ('d') whose
# calendar 'yearweek' matches.
# NOTE(review): several calendar days can share one yearweek, so a price row
# may be repeated once per matching day; verify this is intended.
pd_merge = pd_prices.merge(
    pd_calendar[['d', 'yearweek']],
    on='yearweek',
    how="left",
)

pd_merge.info()

In [None]:
# Preview the sales table with its descriptive columns moved into the index,
# leaving only the remaining columns as data (pd_sales itself is not modified).
pd_sales.set_index(
    keys=['id', 'item', 'category', 'department', 'store', 'store_code', 'region']
).head(n=5)

In [None]:
# Display the calendar table again; by this point 'yearweek' has been cast to float.
pd_calendar

In [None]:
# Distinct values of the region column.
pd_sales.loc[:, 'region'].unique()

In [None]:
# 1. Day columns of the wide sales table: every column from 'd_1' to the last one.
day_columns = list(pd_sales.loc[:, 'd_1':].columns)

# 2-3. Keep only the calendar rows whose 'd' label is a sales column, then put
#      them in the same order as those columns.
is_sales_day = pd_calendar['d'].isin(day_columns)
filtered_calendar = (
    pd_calendar[is_sales_day]
    .copy()
    .set_index('d')
    .loc[day_columns]
    .reset_index()
)

# 4. Total units per day for each region (sum over all rows of that region).
sales_ny, sales_bo, sales_ph = (
    pd_sales.loc[pd_sales['region'] == region_name, day_columns].sum(axis=0)
    for region_name in ('New York', 'Boston', 'Philadelphia')
)

# 5. Assemble the three series into one frame keyed by the day label 'd'.
state_sales_df = pd.DataFrame({
    'd': day_columns,
    'sales_ny': sales_ny.values,
    'sales_bo': sales_bo.values,
    'sales_ph': sales_ph.values,
})

# 6. Attach the calendar attributes by 'd', then drop the columns that are
#    not needed downstream.
state_sales_df = (
    state_sales_df
    .merge(filtered_calendar, on='d', how='left')
    .drop(columns=['d', 'weekday', 'event'])
)

In [None]:
# Display the per-region daily sales table.
state_sales_df

In [None]:
# NOTE: disabled PySpark draft, kept commented out. It sketches a wide-to-long
# reshape of the sales table, joins with calendar and prices, and a revenue column.
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import expr, col

# spark = SparkSession.builder.appName("SalesProcessing").getOrCreate()

# # Read data
# sales_df = spark.createDataFrame(pd_sales)
# calendar_df = spark.createDataFrame(pd_calendar)
# prices_df = spark.createDataFrame(pd_prices)

# # Convert from wide to long using 'stack'
# num_days = 1913  # or the number of d_* columns
# stack_expr = ", ".join([f"'d_{i}', d_{i}" for i in range(1, num_days + 1)])
# sales_long = sales_df.selectExpr(
#     "id", "item", "category", "department", "store", "store_code", "region",
#     f"stack({num_days}, {stack_expr}) as (d, qty_sell)"
# )

# # Join with the calendar
# sales_long = sales_long.join(calendar_df.select("d", "date", "yearweek"), on="d", how="left")

# # Join with the prices
# sales_long = sales_long.join(
#     prices_df,
#     on=["item", "category", "store_code", "yearweek"],
#     how="left"
# )

# # Compute the total amount sold
# sales_long = sales_long.withColumn("total_sell", col("qty_sell") * col("sell_price"))

# # Save the result
# #sales_long.write.parquet("final_sales.parquet", mode="overwrite")


In [None]:
# # Filter on qty_sell so that only records greater than 0 are kept

# sales_long = sales_long.filter(col("qty_sell") > 0)

In [None]:
# sales_long.show()