## Imports and Setup

In [1]:
import pandas as pd
import numpy as np
import os
import sys
# import logging
from dotenv import load_dotenv

In [2]:
from pyathena import connect
from sqlalchemy import create_engine
from pyathena.pandas.cursor import PandasCursor

In [3]:
# Absolute path of the local `webapp` directory; the following cell tests whether it exists.
# NOTE(review): machine-specific Windows path - it will not resolve on another machine.
lib_loc = "D:\\ONDC_Project\\Funnel_Analysis\\webapp\\"

In [5]:
# Make the project root importable when the local checkout is present.
# The guard tests `lib_loc` (the webapp folder), while the entry added to sys.path
# is its parent directory.
lib_root = "D:\\ONDC_Project\\Funnel_Analysis\\"
if os.path.exists(lib_loc):
  # sys.path.append cannot fail, so no try/except is needed. Skipping duplicates
  # keeps sys.path stable when the cell is re-run.
  if lib_root not in sys.path:
    sys.path.append(lib_root)
  print("Library location loaded.")
else:
  # A missing directory used to be skipped without any message.
  print(f"Library location not found: {lib_loc}")
 

Library location loaded.


In [6]:
# Load settings from the local `.env` file and stop if nothing could be loaded.
# load_dotenv() signals a missing or empty file through its boolean return value
# instead of raising, so the result has to be checked explicitly.
try:
  env_loaded = load_dotenv(".env")
except Exception as e:
  # Genuine read/parse errors are still reported and treated as a failed load.
  print(e)
  env_loaded = False

if not env_loaded:
  print("Unable to load the Envitonment Variables. Exiting.")
  sys.exit()
print("Environment Variables loaded. Proceeding.")

Environment Variables loaded. Proceeding.


In [7]:
# Connection settings are read from the process environment.
# A variable that is not set yields None; nothing is validated here.

# --- Source: AWS Athena ---------------------------------------------
aws_access_key = os.environ.get("AWS_ACCESS_KEY")
aws_secret_key = os.environ.get("AWS_SECRET_KEY")
aws_region = os.environ.get("AWS_REGION")
aws_staging_dir = os.environ.get("S3_STAGING_DIR")
aws_db = os.environ.get("DATABASE_NAME")
aws_schema = os.environ.get("SCHEMA_NAME")
aws_tbl = os.environ.get("TABLE_NAME")

# --- Target: Postgres -----------------------------------------------
tgt_host = os.environ.get("POSTGRES_HOST")
tgt_port = os.environ.get("POSTGRES_PORT")
tgt_db = os.environ.get("POSTGRES_DB")
tgt_schema = os.environ.get("POSTGRES_SCHEMA")
tgt_user = os.environ.get("POSTGRES_USER")
tgt_pwd = os.environ.get("POSTGRES_PASSWORD")

In [9]:
def connect_pg_src():
  """
  Build a SQLAlchemy engine for the target Postgres database and check that it is reachable.

  The connection URL is assembled from the module-level tgt_user, tgt_pwd, tgt_host,
  tgt_port and tgt_db values. They are interpolated verbatim, so any character that is
  special in a URL must already be percent-encoded in those values.

  Returns the engine on success. Returns None (after printing the error) when the
  engine cannot be created or a connection cannot be opened.
  """
  try:
    pg_conn = create_engine(f"postgresql+psycopg://{tgt_user}:{tgt_pwd}@{tgt_host}:{tgt_port}/{tgt_db}")
    # create_engine() is lazy and opens no connection by itself. Open one briefly so
    # an unreachable database is reported here rather than at first use.
    with pg_conn.connect():
      pass
  except Exception as e:
    print(e)
    print("Unable to connect to the Target Datbase. Falling back to File System.")
    return None
  print("Connected to the Database. Proceeding.")
  return pg_conn

def connect_src_athena():
  """
  Open a PyAthena connection and return a cursor created with cursor_class=PandasCursor.

  Credentials, region, staging directory and schema come from the module-level
  aws_* values.

  Returns the cursor on success. Returns None (after printing the error) when the
  connection or cursor cannot be created.
  """
  try:
    pandas_ath_cursor = connect(
      aws_access_key_id=aws_access_key,
      aws_secret_access_key=aws_secret_key,
      s3_staging_dir=aws_staging_dir,
      region_name=aws_region,
      schema_name=aws_schema,
      cursor_class=PandasCursor).cursor()
  except Exception as e:
    # Print the exception itself: indexing e.args[0] raises IndexError inside this
    # handler when the exception carries no arguments, hiding the real error.
    print(e)
    print("Falling to file system if available." )
    return None
  print("Connected to Athena Database using pandas' connector.")
  return pandas_ath_cursor

In [12]:
def run_sql_athena(query: str, db_cursor, size: int=0):
  """
  Execute a query through the given cursor and return the result as a pandas DataFrame.

  query     = SQL text to execute.
  db_cursor = Cursor whose execute() result exposes as_pandas().
  size      = Maximum number of rows to return. 0 (the default) or a negative
              value returns every row.

  Returns the DataFrame, or None (after printing the error) if execution fails.
  """
  try:
    print("Executing the query on AWS Athena.")
    df = db_cursor.execute(query).as_pandas()
    if size > 0:
      # fetchmany() returns plain rows that have no as_pandas(), so the previous
      # fetchmany(size).as_pandas() chain failed for every non-zero size.
      # Trim the DataFrame instead.
      df = df.head(size)
  except Exception as e:
    # Print the exception itself: e.args[0] raises IndexError when args is empty.
    print(e)
    return None
  print("Successfully executed the query.")
  return df

def write_to_pg(schema_name: str, table_name: str, db_conn:str, df_tgt: pd.DataFrame,chunk: int = 50000):
  """
  Write a DataFrame to a database table through DataFrame.to_sql.

  schema_name = Schema that holds the table.
  table_name  = Name of the table to write.
  db_conn     = Connection object (or connection string) handed to to_sql as `con`.
  df_tgt      = DataFrame to write.
  chunk       = Number of rows written per batch. Default is 50000.

  The table is written with if_exists="replace", so an existing table of the same
  name is replaced.

  Returns True on success, False (after printing the error) on failure.
  """
  try:
    df_tgt.to_sql(name=table_name,
                  con=db_conn,
                  schema=schema_name,
                  if_exists="replace",
                  chunksize=chunk)
  except Exception as e:
    # Print the exception itself: e.args[0] raises IndexError inside this handler
    # when the exception carries no arguments.
    print(e)
    return False
  print("Write Successful")
  return True

def pair_mismatch(tgt_df: pd.DataFrame, src: str, tgt: str, srch_col:str = "transaction_id") -> list[str]:
  """
  Collect the `srch_col` value of every row whose `src` and `tgt` columns differ.

  tgt_df   = Pandas Dataframe to inspect; expected in crosstab format.
  src      = Name of the first column of the pair.
  tgt      = Name of the second column of the pair.
  srch_col = Column whose values are returned. Default is transaction_id.

  Returns a list, in row order, with one entry per mismatching row.
  """
  differs = tgt_df[src] != tgt_df[tgt]
  mismatched = tgt_df.loc[differs, srch_col]
  return list(mismatched)

def mulitple_calls(tgt_df: pd.DataFrame, col_to_srch: str, srch_col: str = "transaction_id", occur: int = 1) -> list[str]:
  """
  Collect the `srch_col` value of every row whose `col_to_srch` value exceeds `occur`.

  tgt_df      = Pandas Dataframe to inspect.
  col_to_srch = Column holding the counts that are compared against `occur`.
  srch_col    = Column whose values are returned. Default is transaction_id.
  occur       = Threshold. Default is 1. The comparison is strictly greater than.

  Returns a list, in row order, with one entry per matching row.
  """
  above_threshold = tgt_df[col_to_srch] > occur
  matches = tgt_df.loc[above_threshold, srch_col]
  return list(matches)

## Connect to DB

In [11]:
# Cursor used by every query cell below. connect_src_athena() returns None when the
# connection cannot be set up, so check the printed message before continuing.
ath_conn = connect_src_athena()

Connected to Athena Database using pandas' connector.


## Queries

#### Date Range

In [30]:
# One row per calendar date found in `select_timestamp` (parsed with the
# '%Y-%m-%dT%H:%i:%sZ' format), in ascending order.
# DISTINCT is redundant here: the GROUP BY on the same expression already yields
# one row per date.
dt_rng = f"""select distinct date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ')) as date_vals
from {aws_db}.{aws_tbl}
group by date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ'))
order by date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ'));"""

In [31]:
# Run the date-range query; the result is None if the query fails.
df_dt_rng = run_sql_athena(dt_rng,ath_conn)

Executing the query on AWS Athena.
Successfully executed the query.


In [44]:
# Preview the first five dates.
df_dt_rng.head(5)

Unnamed: 0,date_vals
0,2024-04-05
1,2024-05-02
2,2024-05-16
3,2024-05-17
4,2024-05-20


In [None]:
# Takes `date_vals` from the second-to-last row of df_dt_rng (position n-2), not the last row.
# NOTE(review): presumably this skips the most recent, possibly incomplete, day - confirm.
# With a single-row frame the position becomes -1, which selects that only row;
# with an empty frame this raises IndexError.
last_date = df_dt_rng.iloc[df_dt_rng.shape[0]-2]["date_vals"]

#### Transactions per date

In [23]:
# Number of distinct transaction_id values per select date, ascending by date.
# The count column has no alias; it appears as `_col1` in the preview output.
unique_tr_by_day = f"""select date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ')) as select_date,
count(distinct transaction_id)
from {aws_db}.{aws_tbl}
group by date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ'))
order by date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ'));"""

In [36]:
# Number of rows with a non-null transaction_id per select date (repeated ids are
# counted every time), ascending by date. The count column has no alias; it
# appears as `_col1` in the preview output.
tr_by_day = f"""select date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ')) as select_date,
count(transaction_id)
from {aws_db}.{aws_tbl}
group by date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ'))
order by date(date_parse("select_timestamp", '%Y-%m-%dT%H:%i:%sZ'));"""

In [37]:
# Run the per-date row-count query (all rows, not distinct ids).
df_tr_by_dt = run_sql_athena(tr_by_day,ath_conn)

Executing the query on AWS Athena.
Successfully executed the query.


In [25]:
# Run the per-date distinct-transaction-count query.
df_uniq_tr_by_dt = run_sql_athena(unique_tr_by_day,ath_conn)

Executing the query on AWS Athena.
Successfully executed the query.


In [43]:
# Preview the distinct-transaction counts for the first five dates.
df_uniq_tr_by_dt.head(5)

Unnamed: 0,select_date,_col1
0,2024-04-05,2
1,2024-05-02,1
2,2024-05-16,1
3,2024-05-17,2
4,2024-05-20,1


In [42]:
# Preview the row counts for the first five dates.
df_tr_by_dt.head(5)

Unnamed: 0,select_date,_col1
0,2024-04-05,3
1,2024-05-02,6
2,2024-05-16,6
3,2024-05-17,12
4,2024-05-20,6


### Analysis on BAP Ids

In [39]:
# Distinct (provider_id, provider_name, bpp_id, bap_id) combinations, sorted by the
# same four columns. GROUP BY is used only to de-duplicate; nothing is aggregated.
buyer_seller = f"""SELECT 
    provider_id,provider_name, bpp_id, bap_id
FROM 
    {aws_db}.{aws_tbl}
GROUP BY 
    provider_id,provider_name, bpp_id, bap_id
ORDER BY 
	provider_id, provider_name, bpp_id, bap_id"""

In [40]:
# Run the provider / BPP / BAP combination query.
df_buyer_seller = run_sql_athena(buyer_seller, ath_conn)

Executing the query on AWS Athena.
Successfully executed the query.


In [41]:
# Preview the first five provider / BPP / BAP combinations.
df_buyer_seller.head(5)

Unnamed: 0,provider_id,provider_name,bpp_id,bap_id
0,000c63cb00796625e37692675549790a,6TH DIMENSION HOMES AND CARE PRIVATE LIMITED,prod-sellerapp.shiprocket.com,ondc.paytm.com
1,00YIUUMNZnGMMbLyvuBU,Shivshakti Super Market,ondc.addble.com,ondc.paytm.com
2,00YIUUMNZnGMMbLyvuBU,Shivshakti Super Market,ondc.addble.com,prd.mystore.in
3,00b158cc-7150-47db-9e43-94f6d84040e0,Sage Pet Lounge,api.esamudaay.com/ondc/sdk/bpp/retail/uespl,ondc.paytm.com
4,00d08ce803324ba4357f5eb31848ff4f,MILAGROW NUTS PRIVATE LIMITED,prod-sellerapp.shiprocket.com,ondc.paytm.com


In [67]:
# For every transaction that involves more than one distinct provider_id, list each
# (transaction_id, provider_id) pair together with its row count.
#   transaction_provider_counts        - row count per (transaction_id, provider_id)
#   transaction_unique_provider_counts - distinct provider count per transaction_id
tr_prov = f"""WITH transaction_provider_counts AS (
    SELECT
        transaction_id,
        provider_id,
        COUNT(*) AS provider_count
    FROM
        {aws_db}.{aws_tbl}
    GROUP BY
        transaction_id,
        provider_id
),
transaction_unique_provider_counts AS (
    SELECT
        transaction_id,
        COUNT(DISTINCT provider_id) AS unique_provider_count
    FROM
        {aws_db}.{aws_tbl}
    GROUP BY
        transaction_id
)
SELECT
    tpc.transaction_id,
    tpc.provider_id,
    tpc.provider_count
FROM
    transaction_provider_counts tpc
JOIN
    transaction_unique_provider_counts tupc
    ON tpc.transaction_id = tupc.transaction_id
WHERE
    tupc.unique_provider_count > 1
order by tupc.transaction_id, tpc.provider_id"""

In [68]:
# Run the multi-provider transaction query.
df_tr_prov = run_sql_athena(tr_prov, ath_conn)

Executing the query on AWS Athena.
Successfully executed the query.


In [69]:
# Preview the first five (transaction_id, provider_id) rows.
df_tr_prov.head(5)

Unnamed: 0,transaction_id,provider_id,provider_count
0,0001e709-5ed3-436c-8f34-c04c27174ca5,30647898,6
1,0001e709-5ed3-436c-8f34-c04c27174ca5,46315883,6
2,0001ee6c-4660-439f-8d33-e9a223b4dcfc,34144401,2
3,0001ee6c-4660-439f-8d33-e9a223b4dcfc,45013394,2
4,0001ff16-3926-4e7e-9444-cb510083e046,36415,2


In [75]:
# (transaction_id, provider_id, transaction_type) combinations that occur more than
# once, with their row count; highest count first, then by transaction and provider.
multi_calls = f"""WITH transaction_type_counts AS (
    SELECT
        transaction_id,
        provider_id,
        transaction_type,
        COUNT(*) AS type_count
    FROM
        {aws_db}.{aws_tbl}
    GROUP BY
        transaction_id,
        provider_id,
        transaction_type
)
SELECT
    transaction_id,
    provider_id,
    transaction_type,
    type_count
FROM
    transaction_type_counts
WHERE
    type_count > 1
order by type_count desc, transaction_id, provider_id"""

In [76]:
# Run the repeated-call query.
df_multi_calls = run_sql_athena(multi_calls, ath_conn)

Executing the query on AWS Athena.
Successfully executed the query.


In [77]:
# Preview the five combinations with the highest repeat counts.
df_multi_calls.head(5)

Unnamed: 0,transaction_id,provider_id,transaction_type,type_count
0,5e6ff7b3-56af-447a-8d10-87734379151e,0444ff21-fb7c-4579-9459-987ffbb765-3425,on_init,7
1,5e6ff7b3-56af-447a-8d10-87734379151e,0444ff21-fb7c-4579-9459-987ffbb765-3425,select,7
2,5e6ff7b3-56af-447a-8d10-87734379151e,0444ff21-fb7c-4579-9459-987ffbb765-3425,on_select,7
3,5e6ff7b3-56af-447a-8d10-87734379151e,0444ff21-fb7c-4579-9459-987ffbb765-3425,init,7
4,,,init,7
