In [1]:
import pandas as pd
import os
from typing import Tuple, List
from datetime import datetime

In [2]:
# Move the working directory two levels up so the relative paths built by DataRoot
# resolve from there (presumably the project root -- confirm against the repo layout).
# NOTE: the move is relative, so re-running this cell goes up two more levels.
os.chdir("../..")

In [3]:
class DateColumns:
    """Names of the columns used to date each order."""

    # Timestamp column read from the orders dataset.
    TIMESTAMP_COLUMN = "order_purchase_timestamp"
    # Derived column holding only the calendar date of TIMESTAMP_COLUMN.
    DATE_COLUMN = "purchase_date"
    
class DataRoot:
    """Relative locations of the raw dataset files."""

    ROOT = "data"
    ZONE = "raw"
    ORDERS = "olist_orders_dataset.parquet"

    @classmethod
    def return_root(cls):
        """Return the folder that holds the datasets, i.e. ``ROOT/ZONE``."""
        zone_parts = (cls.ROOT, cls.ZONE)
        return os.path.join(*zone_parts)

    @classmethod
    def return_orders_path(cls):
        """Return the path of the orders parquet file inside ``return_root()``."""
        zone_folder = cls.return_root()
        return os.path.join(zone_folder, cls.ORDERS)

    

In [6]:
def get_orders_with_dates(orders_path: str) -> Tuple[List[datetime], pd.DataFrame]:
    """Load the orders file and derive the purchase date of every order.

    Args:
        orders_path: Path of the orders parquet file.

    Returns:
        A pair ``(dates, orders)``: ``dates`` is the list of distinct purchase
        dates in ascending order (``datetime.date`` values, since they come from
        ``.dt.date``), and ``orders`` is a frame with ``order_id`` and the
        purchase-date column.
    """
    date_col = DateColumns.DATE_COLUMN
    orders = pd.read_parquet(orders_path)

    # Keep only the calendar day of the purchase timestamp.
    purchase_ts = pd.to_datetime(orders[DateColumns.TIMESTAMP_COLUMN])
    orders.loc[:, date_col] = purchase_ts.dt.date

    distinct_dates = orders[date_col].sort_values().unique()
    return distinct_dates.tolist(), orders[["order_id", date_col]]

def add_date_column(df: pd.DataFrame, orders_with_date: pd.DataFrame) -> pd.DataFrame:
    """Attach the purchase date to ``df`` unless it already carries one.

    Args:
        df: Frame with an ``order_id`` column.
        orders_with_date: Lookup frame with ``order_id`` and the purchase-date
            column; ``order_id`` must be unique in it.

    Returns:
        ``df`` itself when the purchase-date column is already present;
        otherwise the inner join of ``df`` with ``orders_with_date`` on
        ``order_id`` (rows whose order is absent from the lookup are dropped).

    Raises:
        pandas.errors.MergeError: If ``order_id`` is duplicated in
            ``orders_with_date``.
    """
    if DateColumns.DATE_COLUMN not in df.columns:
        print("Adding date columns to Dataframe...")
        # many_to_one makes pandas reject a lookup with repeated order ids, which
        # would otherwise silently duplicate rows of `df` in the join.
        df = df.merge(
            orders_with_date,
            how="inner",
            on="order_id",
            validate="many_to_one",
        )
    return df

# validating get_orders_with_dates

In [37]:
# Build one frame per date and check that none of them comes out empty.
dates, orders_with_date = get_orders_with_dates(DataRoot.return_orders_path())
df_list = [
    orders_with_date.loc[orders_with_date[DateColumns.DATE_COLUMN] == d]
    for d in dates
]

# Displayed as (non-empty frames, number of dates); the two should be equal.
sum(1 for frame in df_list if len(frame) > 0), len(dates)

(634, 634)

## Removing dates without payments or items

There are 18 days with orders but no items, and one day with an order but no payment.
I will remove those days (and their orders); otherwise the future DAG will always fail on those dates, since I have no way to recover the missing data.

In [53]:
def filter_days_without_items_and_payments(dates, orders_lookup=None):
    """Keep only the dates that have at least one item and one payment.

    Args:
        dates: Candidate purchase dates.
        orders_lookup: Frame with ``order_id`` and the purchase-date column,
            used to date the items and payments. Defaults to the notebook-level
            ``orders_with_date`` so existing one-argument calls keep working.

    Returns:
        The entries of ``dates`` that appear in both the items and the payments
        datasets, in the same order as ``dates``. (The previous implementation
        went through ``set`` and returned them in arbitrary order.)
    """
    if orders_lookup is None:
        # Backward-compatible fallback to the frame defined in an earlier cell.
        orders_lookup = orders_with_date

    def _dates_present(filename):
        """Return the set of purchase dates that occur in the given dataset."""
        dated = (
            pd.read_parquet(os.path.join(DataRoot.return_root(), filename))
            .pipe(add_date_column, orders_lookup)
        )
        return set(dated[DateColumns.DATE_COLUMN])

    days_with_items = _dates_present("olist_order_items_dataset.parquet")
    days_with_payments = _dates_present("olist_order_payments_dataset.parquet")

    # Filtering the input list (instead of set arithmetic) preserves its order.
    return [d for d in dates if d in days_with_items and d in days_with_payments]

# Report how many dates survive the filter; the call reads the items and payments files.
print("Total days that remain", len(filter_days_without_items_and_payments(dates)))

Adding date columns to Dataframe...
Adding date columns to Dataframe...
Total days that remain 615


## Pipeline

In [56]:
print("Getting dates to chunk data")
dates, orders_with_date = get_orders_with_dates(DataRoot.return_orders_path())
dates = filter_days_without_items_and_payments(dates)


# Datasets that get split into one parquet file per purchase date.
datasets_to_chunks = [
    "olist_order_items_dataset.parquet",
    "olist_order_payments_dataset.parquet",
    "olist_orders_dataset.parquet"
]


print("Start chunking the data...")
for dataset in datasets_to_chunks:
    print(f"Creating chunks for {dataset}")

    complete_path = os.path.join(DataRoot.return_root(), dataset)
    # splitext strips only the trailing extension, so a dot elsewhere in the path
    # (e.g. a "./data" or "../data" root) no longer truncates the names.
    folder = os.path.splitext(complete_path)[0]
    chunk_prefix = os.path.splitext(dataset)[0]

    df = (
        pd.read_parquet(complete_path)
        .pipe(add_date_column, orders_with_date)
    )

    # Lazily yields the rows of each kept date, in the same order as `dates`.
    df_list = (df[df[DateColumns.DATE_COLUMN] == d] for d in dates)

    # The target folder is the same for every chunk, so create it once.
    os.makedirs(folder, exist_ok=True)

    # save each chunk file within dataset folder
    for chunk_date, chunk in zip(dates, df_list):

        if len(chunk) == 0:
            print(chunk_date)
            raise Exception("No rows for date")

        date_str = chunk_date.strftime("%Y-%m-%d")
        filename = chunk_prefix + "_" + date_str + ".parquet"
        chunk.to_parquet(os.path.join(folder, filename))

    # Destructive: the source file is deleted once all its chunks are written.
    # Rows whose date is not in `dates` are not written to any chunk, and this
    # cell cannot be re-run afterwards without restoring the raw datasets.
    print("Removing origin dataset...")
    os.remove(complete_path)


Getting dates to chunk data
Adding date columns to Dataframe...
Adding date columns to Dataframe...
Start chunking the data...
Creating chunks for olist_order_items_dataset.parquet
Adding date columns to Dataframe...
Removing origin dataset...
Creating chunks for olist_order_payments_dataset.parquet
Adding date columns to Dataframe...
Removing origin dataset...
Creating chunks for olist_orders_dataset.parquet
Adding date columns to Dataframe...
Removing origin dataset...
