# In [None]:
import requests
import pyodbc
import datetime
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

# CONFIG
# Etsy credentials and target shop; the literal values here are redacted.
ETSY_CLIENT_ID = "redacted"
ETSY_CLIENT_SECRET = "redacted"
ETSY_REFRESH_TOKEN = "redacted"
ETSY_SHOP_ID = "redacted"

# Gelato API key (redacted as well).
GELATO_API_KEY = "redacted"

# ODBC connection string handed to pyodbc.connect(); each setting is
# terminated by a semicolon.
SQL_CONN_STR = "".join(
    f"{setting};"
    for setting in (
        "DRIVER={ODBC Driver 17 for SQL Server}",
        "SERVER=localhost\\SQLEXPRESS",
        "DATABASE=EtsyGelatoDB",
        "Trusted_Connection=yes",
    )
)

# -------------------------------
# TASKS
# -------------------------------

@task(retries=3, retry_delay_seconds=60)
def get_etsy_access_token():
    """Exchange the configured Etsy refresh token for an access token.

    Posts a ``refresh_token`` grant to the Etsy OAuth token endpoint using
    the module-level client id, client secret and refresh token.

    Returns:
        The ``access_token`` value from the endpoint's JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the endpoint does not respond in time.
        KeyError: If the response JSON carries no ``access_token`` field.
    """
    url = "https://api.etsy.com/v3/public/oauth/token"
    data = {
        "grant_type": "refresh_token",
        "client_id": ETSY_CLIENT_ID,
        "refresh_token": ETSY_REFRESH_TOKEN
    }
    # requests waits indefinitely by default; an explicit timeout turns a
    # stalled connection into an exception so the task's retries can run.
    r = requests.post(
        url,
        auth=(ETSY_CLIENT_ID, ETSY_CLIENT_SECRET),
        data=data,
        timeout=30,
    )
    r.raise_for_status()
    return r.json()["access_token"]


@task(retries=3, retry_delay_seconds=60, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=24))
def get_etsy_receipts(access_token: str):
    """Fetch receipts for the configured Etsy shop.

    Args:
        access_token: OAuth bearer token sent in the Authorization header.

    Returns:
        The list under ``results`` in the response JSON, or ``[]`` when the
        key is absent.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not respond in time.
    """
    # NOTE(review): the task cache is keyed on the task inputs (the access
    # token) for 24 hours, so a rerun with the same token presumably returns
    # the cached receipts instead of fresh ones -- confirm this is intended.
    # NOTE(review): only a single response is read; confirm whether this
    # endpoint paginates and whether further pages need to be pulled.
    url = f"https://openapi.etsy.com/v3/application/shops/{ETSY_SHOP_ID}/receipts"
    headers = {
        # Etsy's v3 request standards call for the app keystring in
        # x-api-key in addition to the OAuth bearer token.
        "x-api-key": ETSY_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
    }
    # Explicit timeout: requests otherwise waits indefinitely and the
    # task's retry policy would never get a chance to run.
    r = requests.get(url, headers=headers, timeout=30)
    r.raise_for_status()
    return r.json().get("results", [])


@task(retries=3, retry_delay_seconds=60)
def get_etsy_transactions(access_token: str):
    """Fetch transactions for the configured Etsy shop.

    Args:
        access_token: OAuth bearer token sent in the Authorization header.

    Returns:
        The list under ``results`` in the response JSON, or ``[]`` when the
        key is absent.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the API does not respond in time.
    """
    # NOTE(review): only a single response is read; confirm whether this
    # endpoint paginates and whether further pages need to be pulled.
    url = f"https://openapi.etsy.com/v3/application/shops/{ETSY_SHOP_ID}/transactions"
    headers = {
        # Etsy's v3 request standards call for the app keystring in
        # x-api-key in addition to the OAuth bearer token.
        "x-api-key": ETSY_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
    }
    # Explicit timeout: requests otherwise waits indefinitely and the
    # task's retry policy would never get a chance to run.
    r = requests.get(url, headers=headers, timeout=30)
    r.raise_for_status()
    return r.json().get("results", [])


@task(retries=3, retry_delay_seconds=60)
def get_gelato_fulfillments():
    """Fetch Gelato orders and enrich each one with receipt totals.

    Requests the order list, then requests every order individually and
    copies three amounts from the first entry of its ``receipts`` field onto
    a copy of the list entry as ``subtotal``, ``shipping`` and ``tax``.

    Returns:
        A list of dicts, one per listed order: the fields of the list entry
        plus ``subtotal``, ``shipping`` and ``tax`` (each 0 when the order
        detail has no receipt or the receipt lacks the field).

    Raises:
        requests.HTTPError: If any request answers with an error status.
        requests.Timeout: If any request does not respond in time.
    """
    # NOTE(review): confirm that GET on this URL is the supported way to
    # list orders; verify against the Gelato order API reference.
    url = "https://order.gelatoapis.com/v4/orders"
    headers = {'Content-Type': 'application/json','X-API-KEY': GELATO_API_KEY}

    # Explicit timeouts keep a stalled connection from hanging the task.
    r = requests.get(url, headers=headers, timeout=30)
    r.raise_for_status()
    data = r.json()

    orders = []

    for summary in data.get("orders", []):
        order_row = dict(summary)

        # Build the URL outside the f-string expression: reusing the same
        # quote character inside an f-string only parses on Python 3.12+.
        order_id = summary.get("id")
        url_order = f"https://order.gelatoapis.com/v4/orders/{order_id}"
        detail_response = requests.get(url_order, headers=headers, timeout=30)
        detail_response.raise_for_status()
        detail = detail_response.json()

        # The amounts are read from the first receipt; fall back to an
        # empty dict when the field is missing, null or an empty list so a
        # receipt-less order yields zeros instead of crashing the run.
        receipts = detail.get("receipts") or []
        first_receipt = receipts[0] if receipts else {}
        order_row["subtotal"] = first_receipt.get("productsPriceInitial", 0)
        order_row["shipping"] = first_receipt.get("shippingPrice", 0)
        order_row["tax"] = first_receipt.get("totalVat", 0)

        orders.append(order_row)

    # Return the enriched rows. Previously the last detail response was
    # re-parsed for an "orders" key, which discarded everything built above.
    return orders


@task
def save_etsy_receipts(receipts: list):
    """Insert Etsy receipts into the ``etsy_receipts`` table.

    All rows are written in one transaction; it is committed only after
    every insert succeeded. The cursor and connection are closed even when
    an insert fails, in which case nothing has been committed.

    Args:
        receipts: Receipt dicts; missing keys are stored as NULL.

    Returns:
        The number of receipts passed in.

    Raises:
        pyodbc.Error: If connecting, inserting or committing fails.
    """
    # NOTE(review): the keys read below (name, email, creation_tsz, ...) are
    # taken as-is; confirm they match the receipt payload actually returned
    # by the API version in use.
    sql = """
        INSERT INTO etsy_receipts (
            receipt_id, buyer_user_id, buyer_name, buyer_email, payment_status,
            total_price, currency_code, shipping_cost, created_at, updated_at, pulled_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    # One naive-UTC timestamp for the whole batch (utcnow() is deprecated).
    pulled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    conn = pyodbc.connect(SQL_CONN_STR)
    try:
        cursor = conn.cursor()
        try:
            for r in receipts:
                # "or {}" also covers a key that is present but null, which
                # the dict.get default alone does not.
                grandtotal = r.get("grandtotal") or {}
                shipping_cost = r.get("shipping_cost") or {}
                cursor.execute(
                    sql,
                    r.get("receipt_id"),
                    r.get("buyer_user_id"),
                    r.get("name"),
                    r.get("email"),
                    r.get("payment_status"),
                    grandtotal.get("amount"),
                    grandtotal.get("currency_code"),
                    shipping_cost.get("amount"),
                    r.get("creation_tsz"),
                    r.get("last_modified_tsz"),
                    pulled_at,
                )
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    return len(receipts)


@task
def save_etsy_transactions(transactions: list):
    """Insert Etsy transactions into the ``etsy_transactions`` table.

    All rows are written in one transaction; it is committed only after
    every insert succeeded. The cursor and connection are closed even when
    an insert fails, in which case nothing has been committed.

    Args:
        transactions: Transaction dicts; missing keys are stored as NULL.

    Returns:
        The number of transactions passed in.

    Raises:
        pyodbc.Error: If connecting, inserting or committing fails.
    """
    sql = """
        INSERT INTO etsy_transactions (
            transaction_id, receipt_id, item_id, title, quantity, price,
            sku, variation_details, pulled_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    # One naive-UTC timestamp for the whole batch (utcnow() is deprecated).
    pulled_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    conn = pyodbc.connect(SQL_CONN_STR)
    try:
        cursor = conn.cursor()
        try:
            for t in transactions:
                cursor.execute(
                    sql,
                    t.get("transaction_id"),
                    t.get("receipt_id"),
                    t.get("listing_id"),  # stored in the item_id column
                    t.get("title"),
                    t.get("quantity"),
                    t.get("price"),
                    t.get("sku"),
                    # Stored as the Python string form of the value; a
                    # missing key is written as the text "None".
                    str(t.get("variations")),
                    pulled_at,
                )
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    return len(transactions)


@task
def save_gelato_fulfillments(orders: list):
    """Insert Gelato orders into the ``gelato_fulfillments`` table.

    One row is written per order. Every column is filled from the order
    dict key of the same name; missing keys are stored as NULL and list or
    dict values are stored as JSON text. The previous version supplied only
    8 values for the statement's 24 parameter markers, so no insert could
    succeed.

    All rows are written in one transaction; it is committed only after
    every insert succeeded, and the cursor and connection are always closed.

    Args:
        orders: Order dicts keyed by the column names listed below.

    Returns:
        The number of orders passed in.

    Raises:
        pyodbc.Error: If connecting, inserting or committing fails.
    """
    import json  # local import: only needed for nested values

    # NOTE(review): rows are now per order rather than per order item, since
    # none of the target columns is item-specific -- confirm against the
    # table definition.
    columns = (
        "id", "clientId", "orderReferenceId", "fulfillmentStatus",
        "financialStatus", "currency", "totalInclVat", "orderType", "channel",
        "storeId", "connectedOrderIds", "firstName", "lastName", "country",
        "itemsCount", "createdAt", "updatedAt", "orderedAt",
        "refusalReasonCode", "appServices", "customerReferenceId",
        "sourceClientId", "delegationClientIds", "delayDays",
    )
    # Derive the markers from the column list so the two cannot drift apart.
    markers = ", ".join("?" for _ in columns)
    sql = (
        f"INSERT INTO gelato_fulfillments ({', '.join(columns)}) "
        f"VALUES ({markers})"
    )

    conn = pyodbc.connect(SQL_CONN_STR)
    try:
        cursor = conn.cursor()
        try:
            for o in orders:
                params = []
                for column in columns:
                    value = o.get(column)
                    # Lists and dicts cannot be bound as SQL parameters.
                    if isinstance(value, (list, dict)):
                        value = json.dumps(value)
                    params.append(value)
                cursor.execute(sql, *params)
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()
    return len(orders)



# -------------------------------
# FLOW
# -------------------------------

@flow(name="Etsy v2 + Gelato Pipeline")
def etsy_gelato_pipeline():
    """Pull Etsy receipts, Etsy transactions and Gelato orders, then store them.

    Obtains an Etsy access token first, because both Etsy fetch tasks
    require it as their argument, then runs the three fetch tasks followed
    by the three save tasks and prints the counts the save tasks return.
    """
    print("Starting Prefect Flow (Etsy v2 + Gelato)")

    # The Etsy fetch tasks take the token as a required parameter; calling
    # them without it raised a TypeError before any data was pulled.
    access_token = get_etsy_access_token()

    receipts = get_etsy_receipts(access_token)
    transactions = get_etsy_transactions(access_token)
    gelato_orders = get_gelato_fulfillments()

    receipt_count = save_etsy_receipts(receipts)
    transaction_count = save_etsy_transactions(transactions)
    fulfillment_count = save_gelato_fulfillments(gelato_orders)

    print(f"Saved {receipt_count} receipts, {transaction_count} transactions, {fulfillment_count} fulfillments.")

# Run the pipeline flow once when this file is executed directly as a script.
if __name__ == "__main__":
    etsy_gelato_pipeline()
