In [15]:
#PROGRAM DEPENDENCIES
import sys
import os  
from datetime import datetime, timedelta, date
import datetime as dt

import requests
import pandas as pd
import json
import yaml
import importlib


from snowflake.snowpark.functions import date_trunc, current_date, to_date, month
from snowflake.snowpark.functions import col, lit, when
from snowflake.snowpark.types import DateType
import snowflake.snowpark.functions as f


sys.path.append(os.path.abspath('scripts'))


import scripts.utils
importlib.reload(scripts.utils)
from scripts.utils import get_orders_for_date, backfill_date_generator, get_min_date_from_table, get_max_date_from_table, get_orders_by_ids





In [3]:
#GET SESSION
import sys
if sys.platform == "darwin":
    import scripts.SnowflakeConnector  # brings module into scope
    importlib.reload(scripts.SnowflakeConnector)
    from scripts.SnowflakeConnector import create_active_session
    session = create_active_session()
else:
    from snowflake.snowpark.context import get_active_session
    session = get_active_session()

In [18]:
#Access Token
TOKEN_PATH = "configs/shopify_auth.yaml"
QUERY_PATH = "graphql/queries/get_sellingplan.graphql"
QUERY_PATH_BY_ID = "graphql/queries/get_sellingplan_by_id.graphql"
SELLINGPLAN_TABLE = "snowpark_db.shopify.orders_with_selling_plan"
SELLINGPLAN_STAGE_TABLE = "snowpark_db.public.orders_with_selling_plan_stage"
DATE_COLUMN = "PROCESSED_AT"


with open(TOKEN_PATH, "r") as file:
    config = yaml.safe_load(file)
ACCESS_TOKEN = config.get("ACCESS_TOKEN", {})


with open(QUERY_PATH, "r") as file:
    QUERY_STRING = file.read()

with open(QUERY_PATH_BY_ID, "r") as file:
    QUERY_STRING_BY_ID = file.read()


SHOP_ENDPOINT = "https://shopmbg.myshopify.com/admin/api/2024-04/graphql.json"

In [20]:
#get orders by id

order_numbers = [732530, 732894]
all_orders = get_orders_by_ids(order_numbers, ACCESS_TOKEN, SHOP_ENDPOINT, QUERY_STRING_BY_ID)
now = datetime.utcnow()
paginated_df = pd.DataFrame([
    {
        "PROCESSED_AT" :o['node']['processedAt'] ,
        "ORDER_ID": o['node']['id'].split('/')[-1],
        "ORDER_NAME": o['node']['name'].split('#')[-1],
        "LINEITEM_ID":json.dumps(o['node']['lineItems']),
        "TAGS":json.dumps(o['node']['tags']),
        "LOAD_TIMESTAMP": now
    }
    for o in all_orders
])




paginated_df.tail(10)

Fetched 2 orders for 2 requested IDs


Unnamed: 0,PROCESSED_AT,ORDER_ID,ORDER_NAME,LINEITEM_ID,TAGS,LOAD_TIMESTAMP
0,2025-02-04T05:55:13Z,6433192280169,732530,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""Klickly""]",2025-07-03 19:16:15.480859
1,2025-02-04T21:54:15Z,6433984610409,732894,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""marketing""]",2025-07-03 19:16:15.480859


In [6]:
##See  SELLINGPLAN_TABLE daily summary

SELLINGPLAN = session.table(SELLINGPLAN_TABLE)\
    .select(to_date(col(DATE_COLUMN)).alias("PROCESSED_DATE"), 
            col("ORDER_ID"), 
            col("ORDER_NAME"), 
            col("LINEITEM_ID"), 
            col("TAGS"), 
            col("LOAD_TIMESTAMP")).group_by(col("PROCESSED_DATE"))\
    .agg(f.count(col("ORDER_ID")).alias("ORDER_COUNT")).sort(col("PROCESSED_DATE").desc())

SELLINGPLAN.show()

------------------------------------
|"PROCESSED_DATE"  |"ORDER_COUNT"  |
------------------------------------
|2025-07-02        |60             |
|2025-07-01        |1308           |
|2025-06-30        |1067           |
|2025-06-29        |1174           |
|2025-06-28        |1002           |
|2025-06-27        |1024           |
|2025-06-26        |980            |
|2025-06-25        |1057           |
|2025-06-24        |1019           |
|2025-06-23        |934            |
------------------------------------



In [7]:
##See  SELLINGPLAN_TABLE monthly summary
SELLINGPLAN = session.table(SELLINGPLAN_TABLE)\
    .select(month(to_date(col(DATE_COLUMN))).alias("PROCESSED_MONTH"), 
            col("ORDER_ID"), 
            col("ORDER_NAME"), 
            col("LINEITEM_ID"), 
            col("TAGS"), 
            col("LOAD_TIMESTAMP")).group_by(col("PROCESSED_MONTH"))\
    .agg(f.count(col("ORDER_ID")).alias("ORDER_COUNT")).sort(col("PROCESSED_MONTH").desc())

SELLINGPLAN.show()

-------------------------------------
|"PROCESSED_MONTH"  |"ORDER_COUNT"  |
-------------------------------------
|12                 |64878          |
|11                 |62372          |
|10                 |60798          |
|9                  |58893          |
|8                  |55637          |
|7                  |55840          |
|6                  |79164          |
|5                  |83460          |
|4                  |78129          |
|3                  |76369          |
-------------------------------------



In [8]:
orders = get_orders_for_date("2025-06-11", ACCESS_TOKEN, SHOP_ENDPOINT, QUERY_STRING)


Fetched 999 orders for 2025-06-11


In [9]:
#Function usage example
get_date = backfill_date_generator(start_date=date(2025, 6, 20), stop_date=date(2025, 6, 15))
for _ in range(5):
    print(next(get_date))

2025-06-19
2025-06-18
2025-06-17
2025-06-16
2025-06-15


In [10]:
#maxdate in SELLINGPLAN_TABLE
max_date_in_table = get_max_date_from_table(SELLINGPLAN_TABLE, DATE_COLUMN, session)
max_date_in_table.strftime("%Y-%m-%d")

'2025-07-02'

In [91]:
### Test Extraction by date
load_date = '2025-06-17'
all_orders = get_orders_for_date(load_date, ACCESS_TOKEN, SHOP_ENDPOINT, QUERY_STRING)
now = datetime.utcnow()
paginated_df = pd.DataFrame([
    {
        "PROCESSED_AT" :o['node']['processedAt'] ,
        "ORDER_ID": o['node']['id'].split('/')[-1],
        "ORDER_NAME": o['node']['name'].split('#')[-1],
        "LINEITEM_ID":json.dumps(o['node']['lineItems']),
        "TAGS":json.dumps(o['node']['tags']),
        "LOAD_TIMESTAMP": now
    }
    for o in all_orders
])

#
end_date = datetime.strptime(load_date, "%Y-%m-%d") + timedelta(days=1)
end_date = end_date.strftime('%Y-%m-%dT00:00:00Z')
paginated_df = paginated_df[paginated_df["PROCESSED_AT"]<end_date]


#orders_snow_df = session.create_dataframe(paginated_df)


print(f"end_date = {end_date}")




paginated_df.tail(10)

#1769

Fetched 1006 orders for 2025-06-17
end_date = 2025-06-18T00:00:00Z


Unnamed: 0,PROCESSED_AT,ORDER_ID,ORDER_NAME,LINEITEM_ID,TAGS,LOAD_TIMESTAMP
930,2025-06-17T23:14:34Z,6687473565801,865466,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass""]",2025-06-27 17:06:17.661623
931,2025-06-17T23:21:11Z,6687480381545,865467,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass"", ""Subscription First Order""]",2025-06-27 17:06:17.661623
932,2025-06-17T23:42:50Z,6687503253609,865468,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""CashBack:25%"", ""Klickly"", ""nofraud_pass"", ""S...",2025-06-27 17:06:17.661623
933,2025-06-17T23:44:20Z,6687504924777,865469,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass"", ""Subscription First Order""]",2025-06-27 17:06:17.661623
934,2025-06-17T23:46:24Z,6687507120233,865470,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass"", ""Subscription First Order""]",2025-06-27 17:06:17.661623
935,2025-06-17T23:51:16Z,6687512035433,865471,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass""]",2025-06-27 17:06:17.661623
936,2025-06-17T23:53:43Z,6687514525801,865472,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass"", ""Subscription First Order""]",2025-06-27 17:06:17.661623
937,2025-06-17T23:53:59Z,6687514755177,865473,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass"", ""Subscription First Order""]",2025-06-27 17:06:17.661623
938,2025-06-17T23:58:07Z,6687519113321,865474,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""CashBack:25%"", ""nofraud_pass"", ""Subscription...",2025-06-27 17:06:17.661623
939,2025-06-17T23:58:22Z,6687519342697,865475,"{""edges"": [{""node"": {""id"": ""gid://shopify/Line...","[""nofraud_pass"", ""Subscription First Order""]",2025-06-27 17:06:17.661623


In [95]:
#final extraction 
#date_list = date_generator(stop_date=date(2025, 1, 1))


date_list = ["2025-06-17", "2025-06-18"]

for load_date in date_list:
    all_orders = get_orders_for_date(load_date, ACCESS_TOKEN, SHOP_ENDPOINT, QUERY_STRING)
    now = datetime.utcnow()
    paginated_df = pd.DataFrame([
        {
            "PROCESSED_AT" :o['node']['processedAt'] ,
            "ORDER_ID": o['node']['id'].split('/')[-1],
            "ORDER_NAME": o['node']['name'].split('#')[-1],
            "LINEITEM_ID":json.dumps(o['node']['lineItems']),
            "TAGS":json.dumps(o['node']['tags']),
            "LOAD_TIMESTAMP": now
        }
        for o in all_orders
    ])

    #
    end_date = datetime.strptime(load_date, "%Y-%m-%d") + timedelta(days=1)
    end_date = end_date.strftime('%Y-%m-%dT00:00:00Z')
    paginated_df = paginated_df[paginated_df["PROCESSED_AT"]<end_date]
    

    #transactions = session.create_dataframe(paginated_df)
    #transactions.write.save_as_table(SELLINGPLAN_TABLE, mode="append")
 

    print(f"Updated transactions for {load_date}")

Fetched 1006 orders for 2025-06-17
Updated transactions for 2025-06-17
Fetched 971 orders for 2025-06-18
Updated transactions for 2025-06-18
