<a href="https://colab.research.google.com/github/juliansilvera89/Automated-Data-pipeline---ENG/blob/main/Automated_datapipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyarrow fastparquet faker

Collecting fastparquet
  Downloading fastparquet-2025.12.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting faker
  Downloading faker-40.1.0-py3-none-any.whl.metadata (16 kB)
Downloading fastparquet-2025.12.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m32.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading faker-40.1.0-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m67.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker, fastparquet
Successfully installed faker-40.1.0 fastparquet-2025.12.0


In [3]:
import os

base_path = "/content/data_pipeline"

folders = [
    "data/raw",
    "data/processed",
    "data/warehouse",
    "scripts",
    "logs"
]

for folder in folders:
    os.makedirs(os.path.join(base_path, folder), exist_ok=True)

In [29]:
!pip install faker

import pandas as pd
import random
from faker import Faker

fake = Faker()

n_rows = 20000
data = []

for i in range(1, n_rows + 1):
    order_date = random.choice([
        fake.date_this_decade().strftime("%Y-%m-%d"),
        fake.date_this_decade().strftime("%d/%m/%Y"),
        None,
        "invalid_date"
    ])

    unit_price = random.choice([
        round(random.uniform(5, 500), 2),
        f"${round(random.uniform(5, 500), 2)}",
        None,
        "N/A"
    ])

    quantity = random.choice([1, 2, 3, None, -1])

    data.append({
        "order_id": random.randint(1, 15000),  # duplicados
        "order_date": order_date,
        "customer_id": fake.uuid4(),
        "country": random.choice(["USA", "usa", "United States", None]),
        "product_category": random.choice(["Electronics", "electronics", "Home", "HOME", None]),
        "quantity": quantity,
        "unit_price": unit_price,
        "currency": random.choice(["USD", "usd", None])
    })

df_raw = pd.DataFrame(data)

raw_path = f"{base_path}/data/raw/orders_raw.csv"
df_raw.to_csv(raw_path, index=False)

df_raw.head(), raw_path



(   order_id    order_date                           customer_id  \
 0      2754  invalid_date  06d38f89-577d-484a-b2be-9466a05d316e   
 1      1505    2020-02-07  d34fc710-06ad-4d90-9b01-7de0b761d8fc   
 2       864    11/06/2025  31f44b1c-3db3-4b52-b525-2363f8c1feee   
 3      1799  invalid_date  f017961a-2546-46fd-b798-41e7fdd4d82a   
 4     13860    19/12/2022  b99543c6-c792-4909-b67e-349b2193647e   
 
          country product_category  quantity unit_price currency  
 0            usa      electronics       NaN     197.41      USD  
 1            USA             Home      -1.0     102.76      USD  
 2            USA             Home       2.0     $63.59     None  
 3  United States             Home       NaN       None     None  
 4           None      electronics       NaN    $301.81     None  ,
 '/content/data_pipeline/data/raw/orders_raw.csv')

In [6]:
df_raw.isna().sum()

Unnamed: 0,0
order_id,0
order_date,4980
customer_id,0
country,5031
product_category,3949
quantity,3955
unit_price,4979
currency,6756


In [7]:
import logging
import os
from datetime import datetime

log_path = "/content/data_pipeline/logs/ingestion.log"

logging.basicConfig(
    filename=log_path,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

logging.info("Inicio del proceso de ingesta")
log_path

'/content/data_pipeline/logs/ingestion.log'

In [12]:
import pandas as pd

raw_file = "/content/data_pipeline/data/raw/orders_raw.csv"

expected_columns = {
    "order_id",
    "order_date",
    "customer_id",
    "country",
    "product_category",
    "quantity",
    "unit_price",
    "currency"
}

try:
    if not os.path.exists(raw_file):
        raise FileNotFoundError("Archivo RAW no encontrado")

    df_ingest = pd.read_csv(raw_file)

    if df_ingest.empty:
        raise ValueError("El archivo está vacío")

    if not expected_columns.issubset(df_ingest.columns):
        raise ValueError("Faltan columnas esperadas")

    logging.info(f"Ingesta OK - filas: {len(df_ingest)}")

except Exception as e:
    logging.error(f"Error en ingesta: {e}")
    raise

In [9]:
snapshot_path = f"/content/data_pipeline/data/raw/orders_snapshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df_ingest.to_csv(snapshot_path, index=False)

logging.info(f"Snapshot guardado en {snapshot_path}")

snapshot_path

'/content/data_pipeline/data/raw/orders_snapshot_20260104_230140.csv'

In [15]:
!cat /content/data_pipeline/logs/ingestion.log


2026-01-04 23:03:58,985 - INFO - Inicio del proceso de ingesta
2026-01-04 23:03:59,122 - INFO - Ingesta OK - filas: 20000
2026-01-04 23:03:59,124 - INFO - Snapshot guardado en /content/data_pipeline/data/raw/orders_snapshot_20260104_230359.csv


In [11]:
import os

log_dir = "/content/data_pipeline/logs"
os.makedirs(log_dir, exist_ok=True)

os.listdir("/content/data_pipeline")

['scripts', 'logs', 'data']

In [14]:
import logging
import os
from datetime import datetime
import pandas as pd

# Paths
raw_file = "/content/data_pipeline/data/raw/orders_raw.csv"
log_path = "/content/data_pipeline/logs/ingestion.log"

# Crear logger explícito
logger = logging.getLogger("ingestion_logger")
logger.setLevel(logging.INFO)

# Evitar logs duplicados
if logger.hasHandlers():
    logger.handlers.clear()

file_handler = logging.FileHandler(log_path)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)

logger.addHandler(file_handler)

expected_columns = {
    "order_id",
    "order_date",
    "customer_id",
    "country",
    "product_category",
    "quantity",
    "unit_price",
    "currency"
}

try:
    logger.info("Inicio del proceso de ingesta")

    if not os.path.exists(raw_file):
        raise FileNotFoundError("Archivo RAW no encontrado")

    df_ingest = pd.read_csv(raw_file)

    if df_ingest.empty:
        raise ValueError("El archivo está vacío")

    if not expected_columns.issubset(df_ingest.columns):
        raise ValueError("Faltan columnas esperadas")

    snapshot_path = f"/content/data_pipeline/data/raw/orders_snapshot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    df_ingest.to_csv(snapshot_path, index=False)

    logger.info(f"Ingesta OK - filas: {len(df_ingest)}")
    logger.info(f"Snapshot guardado en {snapshot_path}")

    snapshot_path

except Exception as e:
    logger.error(f"Error en ingesta: {e}")
    raise

INFO:ingestion_logger:Inicio del proceso de ingesta
INFO:ingestion_logger:Ingesta OK - filas: 20000
INFO:ingestion_logger:Snapshot guardado en /content/data_pipeline/data/raw/orders_snapshot_20260104_230359.csv


In [16]:
import pandas as pd
import numpy as np
import os

# Paths
snapshot_file = sorted([
    f"/content/data_pipeline/data/raw/{f}"
    for f in os.listdir("/content/data_pipeline/data/raw")
    if f.startswith("orders_snapshot")
])[-1]

processed_path = "/content/data_pipeline/data/processed/orders_clean.parquet"

# Load
df = pd.read_csv(snapshot_file)

# ---------------------------
# 1. FECHAS
# ---------------------------
df["order_date"] = pd.to_datetime(
    df["order_date"],
    errors="coerce",
    infer_datetime_format=True
)

# ---------------------------
# 2. COUNTRY
# ---------------------------
df["country"] = df["country"].str.upper()
df["country"] = df["country"].replace({
    "UNITED STATES": "USA",
    "USA": "USA"
})

# ---------------------------
# 3. CATEGORY
# ---------------------------
df["product_category"] = df["product_category"].str.upper()

# ---------------------------
# 4. PRICE
# ---------------------------
df["unit_price"] = (
    df["unit_price"]
    .astype(str)
    .str.replace("$", "", regex=False)
)

df["unit_price"] = pd.to_numeric(df["unit_price"], errors="coerce")

# ---------------------------
# 5. QUANTITY
# ---------------------------
df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce")
df.loc[df["quantity"] <= 0, "quantity"] = 1
df["quantity"] = df["quantity"].fillna(1)

# ---------------------------
# 6. DUPLICADOS
# ---------------------------
df = df.drop_duplicates(subset=["order_id"])

# ---------------------------
# 7. TOTAL
# ---------------------------
df["total_amount"] = df["quantity"] * df["unit_price"]

# ---------------------------
# 8. SAVE PARQUET
# ---------------------------
os.makedirs("/content/data_pipeline/data/processed", exist_ok=True)
df.to_parquet(processed_path, index=False)

processed_path, df.head()

  df["order_date"] = pd.to_datetime(


('/content/data_pipeline/data/processed/orders_clean.parquet',
    order_id order_date                           customer_id country  \
 0     11795 2022-02-19  59b25339-105a-455b-a007-4c983d2c0218     USA   
 1      5626        NaT  e34bda4d-c557-451c-b3cf-ee97eee7c58a     NaN   
 2       988        NaT  54ac5803-c640-447a-9ebb-db264f66fb57     USA   
 3      6471 2025-02-02  4e2833a2-44ca-45d9-b93a-9f0d92433daf     USA   
 4      6662 2022-09-10  56205470-b3ea-482e-a983-4e991304af9e     USA   
 
   product_category  quantity  unit_price currency  total_amount  
 0             HOME       2.0      309.49      NaN        618.98  
 1      ELECTRONICS       1.0         NaN      usd           NaN  
 2             HOME       1.0      437.48      USD        437.48  
 3      ELECTRONICS       1.0      369.71      USD        369.71  
 4             HOME       3.0      471.17      USD       1413.51  )

In [18]:
df.dtypes


Unnamed: 0,0
order_id,int64
order_date,datetime64[ns]
customer_id,object
country,object
product_category,object
quantity,float64
unit_price,float64
currency,object
total_amount,float64


In [19]:
df.isna().sum()

Unnamed: 0,0
order_id,0
order_date,8306
customer_id,0
country,2799
product_category,2141
quantity,0
unit_price,5510
currency,3702
total_amount,5510


In [20]:
df.describe()


Unnamed: 0,order_id,order_date,quantity,unit_price,total_amount
count,11032.0,2726,11032.0,5522.0,5522.0
mean,7464.100707,2023-01-06 19:42:44.636830464,1.598441,252.349654,407.821208
min,1.0,2020-01-02 00:00:00,1.0,5.25,5.25
25%,3700.75,2021-07-03 06:00:00,1.0,129.8025,169.82
50%,7453.5,2023-01-11 12:00:00,1.0,254.59,330.635
75%,11201.25,2024-07-09 12:00:00,2.0,374.27,496.6925
max,15000.0,2026-01-02 00:00:00,3.0,499.92,1495.47
std,4330.452053,,0.794842,141.982183,326.827809


In [21]:
import pandas as pd
import sqlite3
import os

# Paths
parquet_file = "/content/data_pipeline/data/processed/orders_clean.parquet"
dw_path = "/content/data_pipeline/data/warehouse/ecommerce_dw.db"

# Carga parquet
df = pd.read_parquet(parquet_file)

# Crea DW
os.makedirs("/content/data_pipeline/data/warehouse", exist_ok=True)
conn = sqlite3.connect(dw_path)

# Carga a la tabla
df.to_sql("fact_orders", conn, if_exists="replace", index=False)

conn.close()

dw_path, df.head()

('/content/data_pipeline/data/warehouse/ecommerce_dw.db',
    order_id order_date                           customer_id country  \
 0     11795 2022-02-19  59b25339-105a-455b-a007-4c983d2c0218     USA   
 1      5626        NaT  e34bda4d-c557-451c-b3cf-ee97eee7c58a    None   
 2       988        NaT  54ac5803-c640-447a-9ebb-db264f66fb57     USA   
 3      6471 2025-02-02  4e2833a2-44ca-45d9-b93a-9f0d92433daf     USA   
 4      6662 2022-09-10  56205470-b3ea-482e-a983-4e991304af9e     USA   
 
   product_category  quantity  unit_price currency  total_amount  
 0             HOME       2.0      309.49     None        618.98  
 1      ELECTRONICS       1.0         NaN      usd           NaN  
 2             HOME       1.0      437.48      USD        437.48  
 3      ELECTRONICS       1.0      369.71      USD        369.71  
 4             HOME       3.0      471.17      USD       1413.51  )

In [22]:
import sqlite3

conn = sqlite3.connect("/content/data_pipeline/data/warehouse/ecommerce_dw.db")

pd.read_sql("SELECT COUNT(*) AS total_rows FROM fact_orders", conn)

Unnamed: 0,total_rows
0,11032


In [23]:
# Ventas totales

pd.read_sql("""
SELECT
    ROUND(SUM(total_amount), 2) AS total_sales
FROM fact_orders
""", conn)

Unnamed: 0,total_sales
0,2251988.71


In [25]:
# Ventas por país

pd.read_sql("""
SELECT
    country,
    ROUND(SUM(total_amount), 2) AS sales
FROM fact_orders
GROUP BY country
ORDER BY sales DESC
""", conn)

Unnamed: 0,country,sales
0,USA,1673005.32
1,,578983.39


In [31]:
#Por categorias

pd.read_sql("""
SELECT
    product_category,
    COUNT(*) AS orders,
    ROUND(SUM(total_amount), 2) AS sales
FROM fact_orders
GROUP BY product_category
ORDER BY sales DESC
""", conn)

Unnamed: 0,product_category,orders,sales
0,HOME,4445,931222.55
1,ELECTRONICS,4446,895161.28
2,,2141,425604.88


In [32]:
#Ticker promedio

pd.read_sql("""
SELECT
    ROUND(AVG(total_amount), 2) AS avg_ticket
FROM fact_orders
""", conn)



Unnamed: 0,avg_ticket
0,407.82


In [33]:
import pandas as pd
import numpy as np
import sqlite3
import logging
import os
from datetime import datetime

# -----------------------
# CONFIG
# -----------------------
BASE_PATH = "/content/data_pipeline"
RAW_PATH = f"{BASE_PATH}/data/raw/orders_raw.csv"
PROCESSED_PATH = f"{BASE_PATH}/data/processed/orders_clean.parquet"
DW_PATH = f"{BASE_PATH}/data/warehouse/ecommerce_dw.db"
LOG_PATH = f"{BASE_PATH}/logs/pipeline.log"

os.makedirs(f"{BASE_PATH}/logs", exist_ok=True)

# -----------------------
# LOGGER
# -----------------------
logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)

if logger.hasHandlers():
    logger.handlers.clear()

handler = logging.FileHandler(LOG_PATH)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

# -----------------------
# INGEST
# -----------------------
def ingest():
    logger.info("INGESTA - inicio")

    if not os.path.exists(RAW_PATH):
        raise FileNotFoundError("RAW no encontrado")

    df = pd.read_csv(RAW_PATH)

    if df.empty:
        raise ValueError("RAW vacío")

    logger.info(f"INGESTA - filas: {len(df)}")
    return df

# -----------------------
# TRANSFORM
# -----------------------
def transform(df):
    logger.info("TRANSFORM - inicio")

    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
    df["country"] = df["country"].str.upper().replace({"UNITED STATES": "USA"})
    df["product_category"] = df["product_category"].str.upper()

    df["unit_price"] = (
        df["unit_price"]
        .astype(str)
        .str.replace("$", "", regex=False)
    )
    df["unit_price"] = pd.to_numeric(df["unit_price"], errors="coerce")

    df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce")
    df.loc[df["quantity"] <= 0, "quantity"] = 1
    df["quantity"] = df["quantity"].fillna(1)

    df = df.drop_duplicates(subset=["order_id"])
    df["total_amount"] = df["quantity"] * df["unit_price"]

    os.makedirs(f"{BASE_PATH}/data/processed", exist_ok=True)
    df.to_parquet(PROCESSED_PATH, index=False)

    logger.info(f"TRANSFORM - guardado parquet ({len(df)})")
    return df

# -----------------------
# LOAD DW
# -----------------------
def load_dw(df):
    logger.info("DW - carga inicio")

    os.makedirs(f"{BASE_PATH}/data/warehouse", exist_ok=True)
    conn = sqlite3.connect(DW_PATH)

    df.to_sql("fact_orders", conn, if_exists="replace", index=False)
    conn.close()

    logger.info("DW - carga finalizada")

# -----------------------
# ORQUESTADOR
# -----------------------
def run_pipeline():
    logger.info("PIPELINE START")
    df_raw = ingest()
    df_clean = transform(df_raw)
    load_dw(df_clean)
    logger.info("PIPELINE SUCCESS")

# -----------------------
# RUN
# -----------------------
try:
    run_pipeline()
    "PIPELINE EJECUTADO OK"
except Exception as e:
    logger.error(f"PIPELINE FAILED: {e}")
    raise

INFO:pipeline:PIPELINE START
INFO:pipeline:INGESTA - inicio
INFO:pipeline:INGESTA - filas: 20000
INFO:pipeline:TRANSFORM - inicio
  df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["total_amount"] = df["quantity"] * df["unit_price"]
INFO:pipeline:TRANSFORM - guardado parquet (11040)
INFO:pipeline:DW - carga inicio
INFO:pipeline:DW - carga finalizada
INFO:pipeline:PIPELINE SUCCESS


In [34]:
!cat /content/data_pipeline/logs/pipeline.log

2026-01-05 00:41:02,525 - INFO - PIPELINE START
2026-01-05 00:41:02,526 - INFO - INGESTA - inicio
2026-01-05 00:41:02,601 - INFO - INGESTA - filas: 20000
2026-01-05 00:41:02,603 - INFO - TRANSFORM - inicio
2026-01-05 00:41:02,730 - INFO - TRANSFORM - guardado parquet (11040)
2026-01-05 00:41:02,731 - INFO - DW - carga inicio
2026-01-05 00:41:02,898 - INFO - DW - carga finalizada
2026-01-05 00:41:02,901 - INFO - PIPELINE SUCCESS


In [35]:
import pandas as pd
import sqlite3
import logging
import os
from datetime import datetime, timedelta

# -----------------------
# CONFIG
# -----------------------
BASE_PATH = "/content/data_pipeline"
RAW_PATH = f"{BASE_PATH}/data/raw/orders_raw.csv"
PROCESSED_BASE = f"{BASE_PATH}/data/processed"
DW_PATH = f"{BASE_PATH}/data/warehouse/ecommerce_dw.db"
LOG_PATH = f"{BASE_PATH}/logs/scheduler.log"

os.makedirs(f"{BASE_PATH}/logs", exist_ok=True)
os.makedirs(PROCESSED_BASE, exist_ok=True)

# -----------------------
# LOGGER
# -----------------------
logger = logging.getLogger("scheduler")
logger.setLevel(logging.INFO)

if logger.hasHandlers():
    logger.handlers.clear()

handler = logging.FileHandler(LOG_PATH)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

# -----------------------
# PIPELINE FUNCTIONS
# -----------------------
def ingest():
    df = pd.read_csv(RAW_PATH)
    return df

def transform(df, run_date):
    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
    df["country"] = df["country"].str.upper().replace({"UNITED STATES": "USA"})
    df["product_category"] = df["product_category"].str.upper()

    df["unit_price"] = (
        df["unit_price"]
        .astype(str)
        .str.replace("$", "", regex=False)
    )
    df["unit_price"] = pd.to_numeric(df["unit_price"], errors="coerce")

    df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce")
    df.loc[df["quantity"] <= 0, "quantity"] = 1
    df["quantity"] = df["quantity"].fillna(1)

    df = df.drop_duplicates(subset=["order_id"])
    df["total_amount"] = df["quantity"] * df["unit_price"]

    parquet_path = f"{PROCESSED_BASE}/orders_{run_date}.parquet"
    df.to_parquet(parquet_path, index=False)

    return df, parquet_path

def load_dw(df):
    conn = sqlite3.connect(DW_PATH)
    df.to_sql("fact_orders", conn, if_exists="replace", index=False)
    conn.close()

# -----------------------
# ORQUESTADOR
# -----------------------
def run_pipeline(run_date):
    logger.info(f"PIPELINE START - run_date={run_date}")

    df_raw = ingest()
    df_clean, parquet_path = transform(df_raw, run_date)
    load_dw(df_clean)

    logger.info(f"PIPELINE SUCCESS - parquet={parquet_path}")

# -----------------------
# SIMULADOR DIARIO
# -----------------------
start_date = datetime(2024, 1, 1)
days = 3  # simulamos 3 días

for i in range(days):
    run_date = (start_date + timedelta(days=i)).strftime("%Y-%m-%d")
    run_pipeline(run_date)

"SIMULACIÓN DIARIA COMPLETADA"

INFO:scheduler:PIPELINE START - run_date=2024-01-01
  df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["total_amount"] = df["quantity"] * df["unit_price"]
INFO:scheduler:PIPELINE SUCCESS - parquet=/content/data_pipeline/data/processed/orders_2024-01-01.parquet
INFO:scheduler:PIPELINE START - run_date=2024-01-02
  df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["total_amount"] = df["quantity"] * df["unit_price"]
INFO:sch

'SIMULACIÓN DIARIA COMPLETADA'

In [36]:
!ls /content/data_pipeline/data/processed


orders_2024-01-01.parquet  orders_2024-01-03.parquet
orders_2024-01-02.parquet  orders_clean.parquet


In [37]:
!cat /content/data_pipeline/logs/scheduler.log


2026-01-05 00:46:33,026 - INFO - PIPELINE START - run_date=2024-01-01
2026-01-05 00:46:33,282 - INFO - PIPELINE SUCCESS - parquet=/content/data_pipeline/data/processed/orders_2024-01-01.parquet
2026-01-05 00:46:33,288 - INFO - PIPELINE START - run_date=2024-01-02
2026-01-05 00:46:33,555 - INFO - PIPELINE SUCCESS - parquet=/content/data_pipeline/data/processed/orders_2024-01-02.parquet
2026-01-05 00:46:33,561 - INFO - PIPELINE START - run_date=2024-01-03
2026-01-05 00:46:33,829 - INFO - PIPELINE SUCCESS - parquet=/content/data_pipeline/data/processed/orders_2024-01-03.parquet
