In [1]:
from connections import sf_engine
from sqlalchemy import text
from custom_types import OrderNumber, Carton, Sku

# Create the engine
engine = sf_engine()

# Test the connection
try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM DAGSTER_IO.DS_DEV.WISMO_CARTONS LIMIT 1"))
        row = result.fetchone()
        print("Connection successful!", row)
except Exception as e:
    print("Connection failed:", e)

uk27690.east-us-2.azure GT_DAGSTER DAGSTER_ROLE




Connection successful! (None, '202041', datetime.date(2025, 3, 26), '0', 1.0, 'On-Time', datetime.date(2025, 3, 27), datetime.date(2025, 3, 27), 'GENFT', 'GENERAL FREIGHT     ', 'https://portal.shiptrackapp.com/view.aspx?tracking=2020410003262025')


In [2]:
from typing import Tuple
from connections import DB, SCHEMA, sf_engine
import pandas as pd
import importlib
import custom_types
importlib.reload(custom_types)
from custom_types import OrderNumber, Carton, Sku

def check_in_original(order_number: str, conn) -> bool:
    """
    Check if the row exists in DAGSTER_IO.DS_DEV.CHURN_READ_SQL
    where ORDERNUMBER equals the given order_number.
    """
    # Compose the full table reference safely
    full_table = f"{DB}.{SCHEMA}.{'wismo_orders'}"
    sql = f"""
        SELECT 1
        FROM {full_table}
        WHERE ORIGINALORDERNUMBER = {order_number}
    """
    df = pd.read_sql(sql, conn)

    return not df.empty

def get_orders(order_number:int, conn) -> list:

    # Compose the full table reference safely
    full_table = f"{DB}.{SCHEMA}.{'wismo_orders'}"
    sql = f"""
        SELECT *
        FROM {full_table}
        WHERE postsplitordernumber = {order_number}
    """

    df = pd.read_sql(sql, conn)

    if df.empty:
        return []
    
    rows = [
        {k: v for k, v in row.items()}
        for _, row in df.iterrows()
    ]

    return rows

def original_order_query(order_number, conn):
    full_table = f"{DB}.{SCHEMA}.{'wismo_orders'}"
    sql = f"""
    SELECT *
    FROM {full_table}
    WHERE ORIGINALORDERNUMBER = {order_number}
    """
    df = pd.read_sql(sql, conn)

    rows = [
        {k: v for k, v in row.items()}
        for _, row in df.iterrows()
    ]

    return rows

def get_skus(order_number, conn):
    # Compose the full table reference safely
    full_table = f"{DB}.{SCHEMA}.{'wismo_skus'}"
    sql = f"""
        SELECT *
        FROM {full_table}
        WHERE postsplitordernumber = {order_number}
    """
    df = pd.read_sql(sql, conn)

    rows = [
        {k: v for k, v in row.items()}
        for _, row in df.iterrows()
    ]

    return rows

def get_cartons(order_number: str, conn) -> list:
    """
    Get the cartons from DAGSTER_IO.DS_DEV.CHURN_READ_SQL
    where ORDERNUMBER equals the given order_number.
    """

    # Compose the full table reference safely
    full_table = f"{DB}.{SCHEMA}.{'wismo_cartons'}"
    sql = f"""
        SELECT *
        FROM {full_table}
        WHERE postsplitordernumber = {order_number}
    """

    df = pd.read_sql(sql, conn)

    if df.empty:
        return []
    
    rows = [
        {k: v for k, v in row.items()}
        for _, row in df.iterrows()
    ]

    return rows


def process_order_number(order_number: int, conn):

    # create an empty list to hold OrderNumber objects
    order_list = []

    # all order numbers might have multiple orders due to back order levels
    # we will treat all backorder levels as separate orders

    orders = get_orders(order_number, conn)

    exists_in_original = check_in_original(order_number, conn)

    # snowflake query to get all the skus under
    skus = get_skus(order_number, conn)

    # query snowflake to get all the cartons under the postsplitordernumber 
    cartons = get_cartons(order_number, conn)
    
    # iterate through each order that appears under the postsplitordernumber
    for order in orders:

        sku_list = []
        # for each order get all the skus that are for it
        for sku in skus:
            if sku['postsplitordernumber'] == order['postsplitordernumber'] and sku['ordersuffix'] == order['ordersuffix']:
                pick_qty = None if sku["pickqty"] is None else int(float(sku["pickqty"]))
                sku_list.append(
                    Sku(
                        orderNumber=int(sku["postsplitordernumber"]),
                        orderSuffix=sku["ordersuffix"],
                        sku=sku["sku"],
                        pickQty=pick_qty
                    )
                )

        # for each order
        carton_list = []

        # for each carton we add to list of cartons but only if it matches the 
        for carton in cartons:
            carton_sku_list = []
            for sku in skus:
                if sku['postsplitordernumber'] == carton["postsplitordernumber"] and sku["ordersuffix"] == carton["ordersuffix"]:
                    pick_qty = None if sku["pickqty"] is None else int(float(sku["pickqty"]))
                    carton_sku_list.append(
                        Sku(
                            orderNumber=sku["postsplitordernumber"],
                            orderSuffix=sku["ordersuffix"],
                            sku=sku["sku"],
                            pickQty=pick_qty
                        )
                    )

            if carton['postsplitordernumber'] == order['postsplitordernumber'] and carton['ordersuffix'] == order['ordersuffix']:
                carton_list.append(
                    Carton(
                        orderNumber=carton["postsplitordernumber"],
                        orderSuffix=carton['ordersuffix'],
                        cartonId=carton["cartonid"],
                        deliveryStatusDescription=carton["deliverystatusdescription"],
                        actualDeliveryDate=carton["actualdeliverydate"],
                        expectedDeliveryDate=carton["expecteddeliverydate"],
                        carrierCode=carton["carriercode"],
                        carrierDescription=carton["carrierdescription"],
                        traceAndTraceLink=carton["trace_and_trace_link"],
                        skus=carton_sku_list
                    )
                )

        if exists_in_original:
            # make snowflake query to query the original order column
            split_orders = original_order_query(order_number, conn)
            split_order_list = []
            for order in split_orders:
                split_order_list.extend(process_order_number(order['postsplitordernumber'], conn))
        else:
            split_order_list=None
        
        print(sku_list)
        print(carton_list)
        order_list.append(
            OrderNumber(
                orderNumber=order_number,
                orderBookedDate=order['orderbookeddate'],
                orderSuffix=order['ordersuffix'],
                orderStatus=order['orderstatus'],  # Pydantic will validate this against OrderStatusType
                orderContactFullName=order['ordercontactfullname'],
                contactEmailAddress=order['contactemailaddress'],
                contactPhone=order['contactphone'],
                splitOrders=split_order_list,
                skus=sku_list,
                cartons=carton_list
                ))
        
    return order_list

with sf_engine().connect() as conn:
    object = process_order_number(533212, conn)

object

[Sku(orderNumber=533214, orderSuffix=0, sku='85391', pickQty=1), Sku(orderNumber=533214, orderSuffix=0, sku='OD825190', pickQty=1)]
[Carton(orderNumber=533214, orderSuffix=0, cartonId=1, deliveryStatusDescription='Late Delivery', expectedDeliveryDate=datetime.datetime(2025, 6, 23, 0, 0), actualDeliveryDate=datetime.datetime(2025, 6, 25, 0, 0), carrierCode='PUROL', carrierDescription='PUROLATOR NSP       ', traceAndTraceLink='https://www.purolator.com/en/shipping/tracker?pin=NTD000060509', skus=[Sku(orderNumber=533214, orderSuffix=0, sku='85391', pickQty=1), Sku(orderNumber=533214, orderSuffix=0, sku='OD825190', pickQty=1)])]


ValidationError: 3 validation errors for OrderNumber
skus.0
  Input should be a valid dictionary or instance of Sku [type=model_type, input_value=Sku(orderNumber=533214, o... sku='85391', pickQty=1), input_type=Sku]
    For further information visit https://errors.pydantic.dev/2.12/v/model_type
skus.1
  Input should be a valid dictionary or instance of Sku [type=model_type, input_value=Sku(orderNumber=533214, o...u='OD825190', pickQty=1), input_type=Sku]
    For further information visit https://errors.pydantic.dev/2.12/v/model_type
cartons.0
  Input should be a valid dictionary or instance of Carton [type=model_type, input_value=Carton(orderNumber=533214...'OD825190', pickQty=1)]), input_type=Carton]
    For further information visit https://errors.pydantic.dev/2.12/v/model_type