In [1]:
from __future__ import annotations
import pandas as pd

from pandas import DataFrame
import json
import numpy as np
from typing import Any, Dict, List
import pandasql as ps
import requests


In [2]:
from jinja2 import Template
def render_json_template(template_str: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Render a Jinja2 template with ``context`` and parse the result as JSON.

    Args:
        template_str: Jinja2 template source expected to render to JSON text.
            A falsy value (``""`` or ``None``) short-circuits to ``{}`` without
            rendering anything.
        context: Variables exposed to the template as keyword arguments.

    Returns:
        The value produced by ``json.loads`` on the rendered text, or ``{}``
        when either the template or its rendered output is empty. Despite the
        annotation, a template that renders a JSON array yields a list.

    Raises:
        json.JSONDecodeError: If the rendered text is non-empty but not valid
            JSON (whitespace-only output is not treated as empty).
    """
    rendered_text = Template(template_str).render(**context) if template_str else ""
    return json.loads(rendered_text) if rendered_text else {}

In [14]:
def fetch_orders_set(
    api_url: str = "http://localhost:8000/orders",
    order_goal: int = 10,
    timeout_seconds: float = 30,
) -> DataFrame:
    """Fetch orders from the orders HTTP API as a de-duplicated DataFrame.

    Args:
        api_url: Orders endpoint to call; defaults to a localhost service.
        order_goal: Value sent as the ``order_goal`` query parameter.
        timeout_seconds: Timeout passed to ``requests.get``.

    Returns:
        A frame restricted to ``order_id, customer_id, created_at,
        processed_at, currency, total_price, is_b2b`` (in that order), with
        exact duplicate rows removed. Columns the API omitted are added as
        all-``None``. ``created_at`` is converted to ``datetime.date`` values.

    Raises:
        RuntimeError: If the request fails or the API answers with an HTTP
            status >= 400.
        ValueError: If the body is not valid JSON, is not a JSON object, its
            ``data`` entry is not a list, or it contains no order rows.
    """
    headers = {
        "Accept": "application/json",
        "User-Agent": "unybarnd-airflow-ingestion",
    }

    try:
        response = requests.get(
            api_url,
            params={"order_goal": order_goal},
            headers=headers,
            timeout=timeout_seconds,
        )
    except requests.RequestException as exc:
        raise RuntimeError("Failed to call Amazon orders API") from exc

    if response.status_code >= 400:
        raise RuntimeError(f"Amazon API responded with status {response.status_code}")

    try:
        payload = response.json()
    except ValueError as exc:
        # Catch ValueError rather than json.JSONDecodeError: depending on the
        # requests version and whether simplejson is installed, response.json()
        # can raise a decode error that does not subclass json.JSONDecodeError,
        # but every variant subclasses ValueError.
        raise ValueError("Amazon API returned invalid JSON") from exc

    # A top-level JSON array/string would otherwise crash on .get() below.
    if not isinstance(payload, dict):
        raise ValueError("Amazon API payload should be a JSON object")

    data_rows = payload.get("data") or []
    if not isinstance(data_rows, list):
        raise ValueError("Amazon API payload 'data' should be a list")
    if not data_rows:
        raise ValueError("Amazon API returned no order rows")

    required_columns = [
        "order_id",
        "customer_id",
        "created_at",
        "processed_at",
        "currency",
        "total_price",
        "is_b2b",
    ]
    raw_df = pd.DataFrame(data_rows)
    # Add absent columns *before* selecting them: selecting first raises
    # KeyError for any column the API left out.
    for column in required_columns:
        if column not in raw_df.columns:
            raw_df[column] = None

    # NOTE(review): this de-duplicates on all selected columns, not on order_id,
    # so several rows per order_id can survive (the sample output shows the same
    # order with different total_price values). Confirm whether these are line
    # items that should be aggregated per order.
    orders_df = raw_df[required_columns].drop_duplicates()
    if orders_df.empty:
        raise ValueError("Amazon API returned no usable order rows")

    # assign() builds a new frame instead of writing into a slice of raw_df.
    return orders_df.assign(
        created_at=pd.to_datetime(orders_df["created_at"]).dt.date
    )



In [4]:
from pathlib import Path
base = Path.cwd()

seeds_root = base / "airflow" / "dbt" / "hitex-case-study" / "seeds"


def load_csv_seed(file_name: str, seeds_dir: Path | str | None = None) -> DataFrame:
    """Load a seed CSV into a DataFrame, refusing missing or empty files.

    Args:
        file_name: File name relative to the seeds directory.
        seeds_dir: Directory to read from (Path or str). Defaults to the
            module-level ``seeds_root``, which is resolved from the kernel's
            current working directory.

    Returns:
        The parsed CSV with at least one data row.

    Raises:
        FileNotFoundError: If the target is not an existing regular file.
        ValueError: If the file is zero bytes or contains only a header row.
    """
    root = seeds_root if seeds_dir is None else Path(seeds_dir)
    file_path = root / file_name
    # is_file() rather than exists(): a directory with a matching name would
    # otherwise pass the check and fail later inside read_csv.
    if not file_path.is_file():
        raise FileNotFoundError(f"Seed file not found: {file_path}")
    try:
        df = pd.read_csv(file_path)
    except pd.errors.EmptyDataError as exc:
        # A zero-byte file is reported the same way as a header-only file.
        raise ValueError(f"Seed file is empty: {file_path}") from exc
    if df.empty:
        raise ValueError(f"Seed file is empty: {file_path}")
    return df

In [15]:
# Seed dimensions trimmed to the columns used by the mapping, followed by the
# order rows pulled from the orders API.
customers_df = load_csv_seed("customer.csv").loc[
    :, ["customer_id", "first_name", "last_name", "email", "address_json"]
]
accounts_df = load_csv_seed("accounts.csv").loc[
    :, ["account_id", "company_name", "company_email", "company_address"]
]
orders_df = fetch_orders_set()


In [16]:
# First five fetched order rows (`display` is an IPython builtin here; it is
# only imported explicitly in a later cell).
display(orders_df.iloc[:5])

Unnamed: 0,order_id,customer_id,created_at,processed_at,currency,total_price,is_b2b
0,24c8b0e9-4ae6-4acc-9158-3d2ab3f38ca9,df8929ff-18aa-4aab-bb93-5db6c5152107,2025-09-21,2025-09-21T08:06:53Z,EUR,8.32,False
1,24c8b0e9-4ae6-4acc-9158-3d2ab3f38ca9,df8929ff-18aa-4aab-bb93-5db6c5152107,2025-09-21,2025-09-21T08:06:53Z,EUR,19.74,False
2,24c8b0e9-4ae6-4acc-9158-3d2ab3f38ca9,df8929ff-18aa-4aab-bb93-5db6c5152107,2025-09-21,2025-09-21T08:06:53Z,EUR,26.22,False
3,24c8b0e9-4ae6-4acc-9158-3d2ab3f38ca9,df8929ff-18aa-4aab-bb93-5db6c5152107,2025-09-21,2025-09-21T08:06:53Z,EUR,26.18,False
4,7c955946-e88a-488f-aec0-37a15c681166,76dc04f0-fd3f-4749-84d1-4055d1fd3e0a,2025-09-21,2025-09-21T08:06:53Z,EUR,18.52,False


In [None]:
# One wall-clock reading in Berlin time, shared by the ingestion timestamp
# (now_berlin) and the load-audit execution time (execution_time).
execution_time = now_berlin = pd.Timestamp.now(tz="Europe/Berlin")

In [None]:
# Enrich each order with customer attributes and, via customer_id == account_id,
# company attributes. validate="many_to_one" asserts the seed frames hold one
# row per key: a duplicated customer_id/account_id would otherwise silently
# multiply order rows (and therefore order totals) in these left joins.
merged = (
    orders_df
    .merge(customers_df, on="customer_id", how="left", validate="many_to_one")
    .merge(
        accounts_df,
        left_on="customer_id",
        right_on="account_id",
        how="left",
        validate="many_to_one",
    )
)

In [None]:
from IPython.display import display
# First five enriched rows after both joins.
display(merged.iloc[:5])

In [None]:
# Load-audit values attached in the preview cell below; currently hard-coded
# (file_path and file_upload_timestamp are empty strings).
filename = "sdbflab"
file_path = ""
file_upload_timestamp = ""

import uuid

# Map the enriched orders onto the target orders schema. Scalar entries (the
# ingestion timestamp and the fixed status/channel literals) broadcast to
# every row.
result_df = pd.DataFrame(
    dict(
        ingested_at=now_berlin,  # one timestamp shared by every row of this batch
        amazon_order_id=merged["order_id"],
        purchase_date=merged["created_at"],
        last_update_date=merged["processed_at"],
        order_status="Dispatched",
        fulfillment_channel="AFN",
        sales_channel="Amazon.de",
        # B2B rows take the company name. Other rows use "first last", with
        # missing parts blanked and surrounding spaces stripped.
        buyer_name=np.where(
            merged["is_b2b"].fillna(False),
            merged["company_name"].fillna(""),
            (
                merged["first_name"].fillna("")
                + " "
                + merged["last_name"].fillna("")
            ).str.strip(),
        ),
        # NOTE(review): combine_first prefers the customer-level value, so a
        # personal email/address wins over the company one whenever both are
        # present, even on B2B rows. Confirm that precedence is intended.
        buyer_email=merged["email"].combine_first(merged["company_email"]),
        shipping_address_json=merged["address_json"].combine_first(merged["company_address"]),
        currency=merged["currency"],
        # Mapping says "order_total : order_total" - assuming total_price from
        # orders_df is the intended source.
        order_total=merged["total_price"],
        is_b2b=merged["is_b2b"],
        ingestion_uuid=[str(uuid.uuid4()) for _ in range(merged.shape[0])],
    )
)



In [None]:
# Preview result_df with the load-audit columns appended. assign() returns a
# new frame, so result_df itself is left unchanged.
audit_preview_columns = {
    "load_at": execution_time,
    "load_id": filename,
    "source_file": file_path,
    "source_ts": file_upload_timestamp,
}
display(result_df.assign(**audit_preview_columns))

In [27]:
# Collapse the Shopify export (presumably one row per order line; confirm) to
# one row per order. dropna=False keeps orders whose grouping keys contain
# missing values (e.g. an order without a customer_id). With the default
# dropna=True, those orders would silently vanish from the aggregate.
df = pd.read_csv("shopify_order_20250921_09.csv")
order_df = (
    df
    .groupby(
        ["order_id", "created_at", "currency", "customer_id", "is_b2b"],
        as_index=False,
        dropna=False,
    )
    .agg(
        product_count=("product_id", "nunique"),
        total_quantity=("quantity", "sum"),
        # NOTE(review): assumes total_price is a per-line amount. If the export
        # repeats the order total on every line, this sum over-counts.
        total_price=("total_price", "sum"),
    )
)

In [28]:
# Show the size plus a bounded preview rather than dumping the whole aggregate,
# so the output stays readable as the export grows.
display(order_df.shape, order_df.head(10))

Unnamed: 0,order_id,created_at,currency,customer_id,is_b2b,product_count,total_quantity,total_price
0,2b42b51c-f15b-4a0c-b8c0-172884f90b5c,2025-09-21T05:23:02+00:00,EUR,faccccc9-d167-4d5c-9af3-239073b8b0fa,True,9,29,286.04
1,3b8ba939-a1f4-4bc2-b7f5-6a6e575094b8,2025-09-21T00:16:31+00:00,EUR,15dcb417-d393-4cc0-8083-368e58490771,True,9,20,198.05
2,57e6a24a-ba36-439c-8a31-94689d554416,2025-09-21T05:31:39+00:00,EUR,35cddfe7-7ce1-471c-a107-c97de7c6eba6,False,4,11,117.16
3,6aa147d6-b470-49f5-a134-fa7dfd819e55,2025-09-21T07:45:14+00:00,EUR,e9b92c15-0f19-471e-a0c5-4ab4479737f7,False,7,19,182.19
4,9ce3529e-e843-4814-9358-c30976c0a791,2025-09-21T04:45:20+00:00,EUR,5b423267-0050-4cbd-8f5e-5310f3407942,False,3,15,102.45
5,ad3fb42f-cf14-463f-8da2-5b6e2d287573,2025-09-21T03:25:50+00:00,EUR,34edb8f6-965b-4cb6-a1a2-a17f3f9ca569,True,7,24,225.84
6,be12abf8-5285-481f-b64f-929475c77fe9,2025-09-21T00:40:30+00:00,EUR,8fb4214b-ac04-426a-baf5-791f46134ec6,False,5,18,174.51
7,dbabacc0-b176-42ee-b8d3-c48242e6fb8c,2025-09-21T02:09:10+00:00,EUR,41196d41-e4af-4d02-a6dd-1395e3358677,True,10,31,330.31
8,df12cb5c-e9ed-4afb-b30c-e12b05f46b9e,2025-09-21T02:45:34+00:00,EUR,be36e4cc-450d-4763-aecb-99857f64c369,False,10,33,269.65
9,faa8d9f5-5c18-4d3b-923e-df7e17c49309,2025-09-21T06:17:05+00:00,EUR,b6ee6aa5-4443-44ca-82f7-41e316a548d7,True,5,19,197.18
