In [18]:
!git clone https://github.com/PhonePe/pulse.git

fatal: destination path 'pulse' already exists and is not an empty directory.


In [19]:
import os, json
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from pathlib import Path
from dotenv import load_dotenv, dotenv_values 

# PostgreSQL connection settings
PG_USER = os.getenv("PG_USER")
PG_PASS = os.getenv("PG_PASS")
PG_HOST = os.getenv("PG_HOST")
PG_PORT = os.getenv("PG_PORT")
PG_DB = os.getenv("PG_DB")

BASE_DIR = r"E:\phonepe\pulse\data" 

AGG_DIR = os.path.join(BASE_DIR, "aggregated")
MAP_DIR = os.path.join(BASE_DIR, "map")
TOP_DIR = os.path.join(BASE_DIR, "top")


In [20]:
def clean_state(s: str) -> str:
    s = s.replace("-", " ").title()
    s = s.replace("Andaman & Nicobar Islands", "Andaman And Nicobar")
    s = s.replace("Dadra & Nagar Haveli & Daman & Diu",
                  "Dadra And Nagar Haveli And Daman Diu")
    return s


In [21]:
#Aggregated: transaction, user, insurance

def process_aggregated(category: str) -> pd.DataFrame:
    recs = []
    cat_path = os.path.join(AGG_DIR, category, "country", "india", "state")
    for state in os.listdir(cat_path):
        sp = os.path.join(cat_path, state)
        for year in os.listdir(sp):
            yp = os.path.join(sp, year)
            for file in os.listdir(yp):
                q = int(file.strip(".json"))
                with open(os.path.join(yp, file), "r") as f:
                    data = json.load(f)

                if category in ["transaction", "insurance"]:
                    for t in (data["data"].get("transactionData") or []):
                        recs.append({
                            "State": clean_state(state),
                            "Year": int(year),
                            "Quarter": q,
                            "Type": t["name"],
                            "Count": t["paymentInstruments"][0]["count"],
                            "Amount": t["paymentInstruments"][0].get("amount", 0.0)
                        })
                elif category == "user":
                    for u in (data["data"].get("usersByDevice") or []):
                        recs.append({
                            "State": clean_state(state),
                            "Year": int(year),
                            "Quarter": q,
                            "Brand": u["brand"],
                            "Count": u["count"],
                            "Percentage": u["percentage"]
                        })
    return pd.DataFrame(recs)

df_aggregated_transaction = process_aggregated("transaction").rename(
    columns={"Type":"Transaction_Type","Count":"Transaction_Count","Amount":"Transaction_Amount"}
)
df_aggregated_user = process_aggregated("user")
df_aggregated_insurance = process_aggregated("insurance").rename(
    columns={"Type":"Insurance_Type","Count":"Insurance_Count","Amount":"Insurance_Amount"}
)

print("agg_txn:", len(df_aggregated_transaction), "agg_user:", len(df_aggregated_user), "agg_ins:", len(df_aggregated_insurance))
display(df_aggregated_transaction.head()), 
display(df_aggregated_user.head()), 
display(df_aggregated_insurance.head())




agg_txn: 5034 agg_user: 6732 agg_ins: 682


Unnamed: 0,State,Year,Quarter,Transaction_Type,Transaction_Count,Transaction_Amount
0,Andaman And Nicobar,2018,1,Recharge & bill payments,4200,1845307.0
1,Andaman And Nicobar,2018,1,Peer-to-peer payments,1871,12138660.0
2,Andaman And Nicobar,2018,1,Merchant payments,298,452507.2
3,Andaman And Nicobar,2018,1,Financial Services,33,10601.42
4,Andaman And Nicobar,2018,1,Others,256,184689.9


Unnamed: 0,State,Year,Quarter,Brand,Count,Percentage
0,Andaman And Nicobar,2018,1,Xiaomi,1665,0.247033
1,Andaman And Nicobar,2018,1,Samsung,1445,0.214392
2,Andaman And Nicobar,2018,1,Vivo,982,0.145697
3,Andaman And Nicobar,2018,1,Oppo,501,0.074332
4,Andaman And Nicobar,2018,1,OnePlus,332,0.049258


Unnamed: 0,State,Year,Quarter,Insurance_Type,Insurance_Count,Insurance_Amount
0,Andaman And Nicobar,2020,2,Insurance,6,1360.0
1,Andaman And Nicobar,2020,3,Insurance,41,15380.0
2,Andaman And Nicobar,2020,4,Insurance,124,157975.0
3,Andaman And Nicobar,2021,1,Insurance,225,244266.0
4,Andaman And Nicobar,2021,2,Insurance,137,181504.0


In [22]:
#Map: transaction, user, insurance

def process_map_transaction() -> pd.DataFrame:
    recs = []
    p = os.path.join(MAP_DIR, "transaction", "hover", "country", "india", "state")
    for state in os.listdir(p):
        sp = os.path.join(p, state)
        for year in os.listdir(sp):
            yp = os.path.join(sp, year)
            for file in os.listdir(yp):
                q = int(file.strip(".json"))
                with open(os.path.join(yp, file), "r") as f:
                    data = json.load(f)
                for e in (data["data"].get("hoverDataList") or []):
                    recs.append({
                        "State": clean_state(state), "Year": int(year), "Quarter": q,
                        "Name": e["name"],
                        "Transaction_Count": e["metric"][0]["count"],
                        "Transaction_Amount": e["metric"][0].get("amount")
                    })
    return pd.DataFrame(recs)

def process_map_user() -> pd.DataFrame:
    recs = []
    p = os.path.join(MAP_DIR, "user", "hover", "country", "india", "state")
    for state in os.listdir(p):
        sp = os.path.join(p, state)
        for year in os.listdir(sp):
            yp = os.path.join(sp, year)
            for file in os.listdir(yp):
                q = int(file.strip(".json"))
                with open(os.path.join(yp, file), "r") as f:
                    data = json.load(f)
                hd = data["data"].get("hoverData") or {}
                for name, item in hd.items():
                    recs.append({
                        "State": clean_state(state), "Year": int(year), "Quarter": q,
                        "Name": name,
                        "Registered_Users": item["registeredUsers"],
                        "App_Opens": item.get("appOpens")
                    })
    return pd.DataFrame(recs)

def process_map_insurance() -> pd.DataFrame:
    recs = []
    p = os.path.join(MAP_DIR, "insurance", "hover", "country", "india", "state")
    for state in os.listdir(p):
        sp = os.path.join(p, state)
        for year in os.listdir(sp):
            yp = os.path.join(sp, year)
            for file in os.listdir(yp):
                q = int(file.strip(".json"))
                with open(os.path.join(yp, file), "r") as f:
                    data = json.load(f)
                for e in (data["data"].get("hoverDataList") or []):
                    recs.append({
                        "State": clean_state(state), "Year": int(year), "Quarter": q,
                        "Name": e["name"],
                        "Insurance_Count": e["metric"][0]["count"],
                        "Insurance_Amount": e["metric"][0].get("amount")
                    })
    return pd.DataFrame(recs)

df_map_transaction = process_map_transaction()
df_map_user        = process_map_user()
df_map_insurance   = process_map_insurance()

print("map_txn:", len(df_map_transaction), "map_user:", len(df_map_user), "map_ins:", len(df_map_insurance))
display(df_map_transaction.head()), 
display(df_map_user.head()), 
display(df_map_insurance.head())


map_txn: 20604 map_user: 20608 map_ins: 13876


Unnamed: 0,State,Year,Quarter,Name,Transaction_Count,Transaction_Amount
0,Andaman And Nicobar,2018,1,north and middle andaman district,442,931663.1
1,Andaman And Nicobar,2018,1,south andaman district,5688,12560250.0
2,Andaman And Nicobar,2018,1,nicobars district,528,1139849.0
3,Andaman And Nicobar,2018,2,north and middle andaman district,825,1317863.0
4,Andaman And Nicobar,2018,2,south andaman district,9395,23948240.0


Unnamed: 0,State,Year,Quarter,Name,Registered_Users,App_Opens
0,Andaman And Nicobar,2018,1,north and middle andaman district,632,0
1,Andaman And Nicobar,2018,1,south andaman district,5846,0
2,Andaman And Nicobar,2018,1,nicobars district,262,0
3,Andaman And Nicobar,2018,2,north and middle andaman district,911,0
4,Andaman And Nicobar,2018,2,south andaman district,8143,0


Unnamed: 0,State,Year,Quarter,Name,Insurance_Count,Insurance_Amount
0,Andaman And Nicobar,2020,2,south andaman district,3,795.0
1,Andaman And Nicobar,2020,2,nicobars district,3,565.0
2,Andaman And Nicobar,2020,3,north and middle andaman district,1,281.0
3,Andaman And Nicobar,2020,3,south andaman district,35,13651.0
4,Andaman And Nicobar,2020,3,nicobars district,5,1448.0


In [23]:
def _nonnull(s):
    """Return True if s is a non-empty, non-'none'/'nan' string."""
    if s is None:
        return False
    s = str(s).strip()
    return s != "" and s.lower() not in {"none", "nan"}

def process_top_transaction() -> pd.DataFrame:
    rows = []
    p = os.path.join(TOP_DIR, "transaction", "country", "india", "state")
    for state in os.listdir(p):
        sp = os.path.join(p, state)
        for year in os.listdir(sp):
            yp = os.path.join(sp, year)
            for file in os.listdir(yp):
                q = int(file.strip(".json"))
                with open(os.path.join(yp, file), "r") as f:
                    data = json.load(f)
                for key, etype in (("states","State"), ("districts","District"), ("pincodes","Pincode")):
                    for e in (data["data"].get(key) or []):
                        name = e.get("entityName")
                        if not _nonnull(name):
                            continue   
                        rows.append({
                            "State": clean_state(state),
                            "Year": int(year),
                            "Quarter": q,
                            "Entity_Type": etype,
                            "Entity_Name": name.strip(),
                            "Count": e["metric"]["count"],
                            "Amount": e["metric"].get("amount")
                        })
    return pd.DataFrame(rows)

def process_top_user() -> pd.DataFrame:
    rows = []
    p = os.path.join(TOP_DIR, "user", "country", "india", "state")
    for state in os.listdir(p):
        sp = os.path.join(p, state)
        for year in os.listdir(sp):
            yp = os.path.join(sp, year)
            for file in os.listdir(yp):
                q = int(file.strip(".json"))
                with open(os.path.join(yp, file), "r") as f:
                    data = json.load(f)
                for key, etype in (("districts","District"), ("pincodes","Pincode")):
                    for e in (data["data"].get(key) or []):
                        name = e.get("name")
                        if not _nonnull(name):
                            continue
                        rows.append({
                            "State": clean_state(state),
                            "Year": int(year),
                            "Quarter": q,
                            "Entity_Type": etype,
                            "Entity_Name": name.strip(),
                            "Registered_Users": e["registeredUsers"]
                        })
    return pd.DataFrame(rows)

def process_top_insurance() -> pd.DataFrame:
    rows = []
    p = os.path.join(TOP_DIR, "insurance", "country", "india", "state")
    for state in os.listdir(p):
        sp = os.path.join(p, state)
        for year in os.listdir(sp):
            yp = os.path.join(sp, year)
            for file in os.listdir(yp):
                q = int(file.strip(".json"))
                with open(os.path.join(yp, file), "r") as f:
                    data = json.load(f)
                for key, etype in (("states","State"), ("districts","District"), ("pincodes","Pincode")):
                    for e in (data["data"].get(key) or []):
                        name = e.get("entityName")
                        if not _nonnull(name):
                            continue
                        rows.append({
                            "State": clean_state(state),
                            "Year": int(year),
                            "Quarter": q,
                            "Entity_Type": etype,
                            "Entity_Name": name.strip(),
                            "Insurance_Count": e["metric"]["count"],
                            "Insurance_Amount": e["metric"].get("amount")
                        })
    return pd.DataFrame(rows)

df_top_transaction = process_top_transaction()
df_top_user        = process_top_user()
df_top_insurance   = process_top_insurance()

print("Top -> txn:", len(df_top_transaction),
      " user:", len(df_top_user),
      " ins:", len(df_top_insurance))
display(df_top_transaction.head())
display(df_top_user.head())
display(df_top_insurance.head())


Top -> txn: 18293  user: 18296  ins: 12273


Unnamed: 0,State,Year,Quarter,Entity_Type,Entity_Name,Count,Amount
0,Andaman And Nicobar,2018,1,District,south andaman,5688,12560250.0
1,Andaman And Nicobar,2018,1,District,nicobars,528,1139849.0
2,Andaman And Nicobar,2018,1,District,north and middle andaman,442,931663.1
3,Andaman And Nicobar,2018,1,Pincode,744101,1622,2769298.0
4,Andaman And Nicobar,2018,1,Pincode,744103,1223,2238042.0


Unnamed: 0,State,Year,Quarter,Entity_Type,Entity_Name,Registered_Users
0,Andaman And Nicobar,2018,1,District,south andaman,5846
1,Andaman And Nicobar,2018,1,District,north and middle andaman,632
2,Andaman And Nicobar,2018,1,District,nicobars,262
3,Andaman And Nicobar,2018,1,Pincode,744103,1608
4,Andaman And Nicobar,2018,1,Pincode,744101,1108


Unnamed: 0,State,Year,Quarter,Entity_Type,Entity_Name,Insurance_Count,Insurance_Amount
0,Andaman And Nicobar,2020,2,District,nicobars,3,565.0
1,Andaman And Nicobar,2020,2,District,south andaman,3,795.0
2,Andaman And Nicobar,2020,2,Pincode,744301,3,565.0
3,Andaman And Nicobar,2020,2,Pincode,744104,2,513.0
4,Andaman And Nicobar,2020,2,Pincode,744101,1,282.0


In [24]:
schema_statements = [
# Aggregated
"""
CREATE TABLE IF NOT EXISTS aggregated_user (
  state       TEXT NOT NULL,
  year        INT  NOT NULL,
  quarter     INT  NOT NULL,
  brand       TEXT NOT NULL,
  count       BIGINT NOT NULL,
  percentage  NUMERIC(10,6) NOT NULL,
  CONSTRAINT uq_aggregated_user UNIQUE (state, year, quarter, brand)
);
""",
"""
CREATE TABLE IF NOT EXISTS aggregated_transaction (
  state              TEXT NOT NULL,
  year               INT  NOT NULL,
  quarter            INT  NOT NULL,
  transaction_type   TEXT NOT NULL,
  transaction_count  BIGINT NOT NULL,
  transaction_amount NUMERIC(20,2),
  CONSTRAINT uq_aggregated_transaction UNIQUE (state, year, quarter, transaction_type)
);
""",
"""
CREATE TABLE IF NOT EXISTS aggregated_insurance (
  state            TEXT NOT NULL,
  year             INT  NOT NULL,
  quarter          INT  NOT NULL,
  insurance_type   TEXT NOT NULL,
  insurance_count  BIGINT NOT NULL,
  insurance_amount NUMERIC(20,2),
  CONSTRAINT uq_aggregated_insurance UNIQUE (state, year, quarter, insurance_type)
);
""",
# Map
"""
CREATE TABLE IF NOT EXISTS map_user (
  state             TEXT NOT NULL,
  year              INT  NOT NULL,
  quarter           INT  NOT NULL,
  name              TEXT NOT NULL,
  registered_users  BIGINT,
  app_opens         BIGINT,
  CONSTRAINT uq_map_user UNIQUE (state, year, quarter, name)
);
""",
"""
CREATE TABLE IF NOT EXISTS map_map (
  state               TEXT NOT NULL,
  year                INT  NOT NULL,
  quarter             INT  NOT NULL,
  name                TEXT NOT NULL,
  transaction_count   BIGINT,
  transaction_amount  NUMERIC(20,2),
  CONSTRAINT uq_map_map UNIQUE (state, year, quarter, name)
);
""",
"""
CREATE TABLE IF NOT EXISTS map_insurance (
  state            TEXT NOT NULL,
  year             INT  NOT NULL,
  quarter          INT  NOT NULL,
  name             TEXT NOT NULL,
  insurance_count  BIGINT,
  insurance_amount NUMERIC(20,2),
  CONSTRAINT uq_map_insurance UNIQUE (state, year, quarter, name)
);
""",
# Top
"""
CREATE TABLE IF NOT EXISTS top_user (
  state             TEXT NOT NULL,
  year              INT  NOT NULL,
  quarter           INT  NOT NULL,
  entity_type       TEXT NOT NULL,
  entity_name       TEXT NOT NULL,
  registered_users  BIGINT,
  CONSTRAINT uq_top_user UNIQUE (state, year, quarter, entity_type, entity_name)
);
""",
"""
CREATE TABLE IF NOT EXISTS top_map (
  state       TEXT NOT NULL,
  year        INT  NOT NULL,
  quarter     INT  NOT NULL,
  entity_type TEXT NOT NULL,
  entity_name TEXT NOT NULL,
  count       BIGINT,
  amount      NUMERIC(20,2),
  CONSTRAINT uq_top_map UNIQUE (state, year, quarter, entity_type, entity_name)
);
""",
"""
CREATE TABLE IF NOT EXISTS top_insurance (
  state             TEXT NOT NULL,
  year              INT  NOT NULL,
  quarter           INT  NOT NULL,
  entity_type       TEXT NOT NULL,
  entity_name       TEXT NOT NULL,
  insurance_count   BIGINT,
  insurance_amount  NUMERIC(20,2),
  CONSTRAINT uq_top_insurance UNIQUE (state, year, quarter, entity_type, entity_name)
);
"""
]

index_statements = [
"CREATE INDEX IF NOT EXISTS idx_agg_user_syq ON aggregated_user (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_agg_txn_syq  ON aggregated_transaction (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_agg_ins_syq  ON aggregated_insurance (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_map_user_syq ON map_user (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_map_map_syq  ON map_map  (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_map_ins_syq  ON map_insurance (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_top_user_syq ON top_user (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_top_map_syq  ON top_map  (state, year, quarter);",
"CREATE INDEX IF NOT EXISTS idx_top_ins_syq  ON top_insurance (state, year, quarter);",
]

conn = psycopg2.connect(host=PG_HOST, dbname=PG_DB, user=PG_USER, password=PG_PASS, port=PG_PORT)
with conn:
    with conn.cursor() as cur:
        for s in schema_statements:
            cur.execute(s)
        for s in index_statements:
            cur.execute(s)
print("Schema created with unique keys and indexes.")


Schema created with unique keys and indexes.


In [25]:
with psycopg2.connect(host=PG_HOST, dbname=PG_DB, user=PG_USER, password=PG_PASS, port=PG_PORT) as conn:
    with conn.cursor() as cur:
        # aggregated_transaction
        sql = """
        INSERT INTO aggregated_transaction
        (state, year, quarter, transaction_type, transaction_count, transaction_amount)
        VALUES %s
        ON CONFLICT (state, year, quarter, transaction_type)
        DO UPDATE SET
          transaction_count = EXCLUDED.transaction_count,
          transaction_amount = EXCLUDED.transaction_amount;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]),
             r["Transaction_Type"], int(r["Transaction_Count"]),
             (float(r["Transaction_Amount"]) if pd.notna(r["Transaction_Amount"]) else None))
            for _, r in df_aggregated_transaction.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # aggregated_user
        sql = """
        INSERT INTO aggregated_user
        (state, year, quarter, brand, count, percentage)
        VALUES %s
        ON CONFLICT (state, year, quarter, brand)
        DO UPDATE SET
          count = EXCLUDED.count,
          percentage = EXCLUDED.percentage;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]),
             r["Brand"], int(r["Count"]),
             (float(r["Percentage"]) if pd.notna(r["Percentage"]) else None))
            for _, r in df_aggregated_user.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # aggregated_insurance
        sql = """
        INSERT INTO aggregated_insurance
        (state, year, quarter, insurance_type, insurance_count, insurance_amount)
        VALUES %s
        ON CONFLICT (state, year, quarter, insurance_type)
        DO UPDATE SET
          insurance_count = EXCLUDED.insurance_count,
          insurance_amount = EXCLUDED.insurance_amount;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]),
             r["Insurance_Type"], int(r["Insurance_Count"]),
             (float(r["Insurance_Amount"]) if pd.notna(r["Insurance_Amount"]) else None))
            for _, r in df_aggregated_insurance.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # map_map (transaction map)
        sql = """
        INSERT INTO map_map
        (state, year, quarter, name, transaction_count, transaction_amount)
        VALUES %s
        ON CONFLICT (state, year, quarter, name)
        DO UPDATE SET
          transaction_count = EXCLUDED.transaction_count,
          transaction_amount = EXCLUDED.transaction_amount;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]), r["Name"],
             int(r["Transaction_Count"]),
             (float(r["Transaction_Amount"]) if pd.notna(r["Transaction_Amount"]) else None))
            for _, r in df_map_transaction.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # map_user
        sql = """
        INSERT INTO map_user
        (state, year, quarter, name, registered_users, app_opens)
        VALUES %s
        ON CONFLICT (state, year, quarter, name)
        DO UPDATE SET
          registered_users = EXCLUDED.registered_users,
          app_opens = EXCLUDED.app_opens;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]), r["Name"],
             int(r["Registered_Users"]),
             (int(r["App_Opens"]) if pd.notna(r["App_Opens"]) else None))
            for _, r in df_map_user.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # map_insurance
        sql = """
        INSERT INTO map_insurance
        (state, year, quarter, name, insurance_count, insurance_amount)
        VALUES %s
        ON CONFLICT (state, year, quarter, name)
        DO UPDATE SET
          insurance_count = EXCLUDED.insurance_count,
          insurance_amount = EXCLUDED.insurance_amount;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]), r["Name"],
             int(r["Insurance_Count"]),
             (float(r["Insurance_Amount"]) if pd.notna(r["Insurance_Amount"]) else None))
            for _, r in df_map_insurance.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # top_map (transaction top)
        sql = """
        INSERT INTO top_map
        (state, year, quarter, entity_type, entity_name, count, amount)
        VALUES %s
        ON CONFLICT (state, year, quarter, entity_type, entity_name)
        DO UPDATE SET
          count = EXCLUDED.count,
          amount = EXCLUDED.amount;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]), r["Entity_Type"], r["Entity_Name"],
             int(r["Count"]), (float(r["Amount"]) if pd.notna(r["Amount"]) else None))
            for _, r in df_top_transaction.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # top_user
        sql = """
        INSERT INTO top_user
        (state, year, quarter, entity_type, entity_name, registered_users)
        VALUES %s
        ON CONFLICT (state, year, quarter, entity_type, entity_name)
        DO UPDATE SET
          registered_users = EXCLUDED.registered_users;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]), r["Entity_Type"], r["Entity_Name"],
             int(r["Registered_Users"]))
            for _, r in df_top_user.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

        # top_insurance
        sql = """
        INSERT INTO top_insurance
        (state, year, quarter, entity_type, entity_name, insurance_count, insurance_amount)
        VALUES %s
        ON CONFLICT (state, year, quarter, entity_type, entity_name)
        DO UPDATE SET
          insurance_count = EXCLUDED.insurance_count,
          insurance_amount = EXCLUDED.insurance_amount;
        """
        rows = [
            (r["State"], int(r["Year"]), int(r["Quarter"]), r["Entity_Type"], r["Entity_Name"],
             int(r["Insurance_Count"]), (float(r["Insurance_Amount"]) if pd.notna(r["Insurance_Amount"]) else None))
            for _, r in df_top_insurance.iterrows()
        ]
        if rows:
            execute_values(cur, sql, rows, page_size=2000)

print("UPSERT complete for all tables.")


UPSERT complete for all tables.


In [26]:
from sqlalchemy import create_engine

engine = create_engine(f"postgresql+psycopg2://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}")

q = """
SELECT 'aggregated_transaction' AS table, COUNT(*) AS rows FROM aggregated_transaction
UNION ALL SELECT 'aggregated_user', COUNT(*) FROM aggregated_user
UNION ALL SELECT 'aggregated_insurance', COUNT(*) FROM aggregated_insurance
UNION ALL SELECT 'map_map', COUNT(*) FROM map_map
UNION ALL SELECT 'map_user', COUNT(*) FROM map_user
UNION ALL SELECT 'map_insurance', COUNT(*) FROM map_insurance
UNION ALL SELECT 'top_map', COUNT(*) FROM top_map
UNION ALL SELECT 'top_user', COUNT(*) FROM top_user
UNION ALL SELECT 'top_insurance', COUNT(*) FROM top_insurance;
"""
pd.read_sql(q, engine)


Unnamed: 0,table,rows
0,aggregated_insurance,682
1,aggregated_transaction,5034
2,aggregated_user,6732
3,top_insurance,12273
4,map_insurance,13876
5,top_map,18293
6,top_user,18296
7,map_map,20604
8,map_user,20608


In [27]:
pd.read_sql("SELECT * FROM aggregated_transaction LIMIT 5;", engine)
pd.read_sql("SELECT * FROM aggregated_user LIMIT 5;", engine)
pd.read_sql("SELECT * FROM top_map ORDER BY amount DESC NULLS LAST LIMIT 5;", engine)


Unnamed: 0,state,year,quarter,entity_type,entity_name,count,amount
0,Karnataka,2024,4,District,bengaluru urban,1674347983,1790144000000.0
1,Telangana,2022,4,District,hyderabad,1133829812,1753799000000.0
2,Karnataka,2024,3,District,bengaluru urban,1631451937,1750492000000.0
3,Karnataka,2024,2,District,bengaluru urban,1497173080,1626461000000.0
4,Telangana,2022,3,District,hyderabad,1018658174,1586047000000.0


Another cell


In [28]:
# Lock DB settings and show row counts for all 9 tables
from sqlalchemy import create_engine


engine = create_engine(f"postgresql+psycopg2://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}")

counts = pd.read_sql("""
SELECT 'aggregated_transaction' AS table, COUNT(*) AS rows FROM aggregated_transaction
UNION ALL SELECT 'aggregated_user', COUNT(*) FROM aggregated_user
UNION ALL SELECT 'aggregated_insurance', COUNT(*) FROM aggregated_insurance
UNION ALL SELECT 'map_map', COUNT(*) FROM map_map
UNION ALL SELECT 'map_user', COUNT(*) FROM map_user
UNION ALL SELECT 'map_insurance', COUNT(*) FROM map_insurance
UNION ALL SELECT 'top_map', COUNT(*) FROM top_map
UNION ALL SELECT 'top_user', COUNT(*) FROM top_user
UNION ALL SELECT 'top_insurance', COUNT(*) FROM top_insurance;
""", engine)

display(counts)


Unnamed: 0,table,rows
0,aggregated_insurance,682
1,aggregated_transaction,5034
2,aggregated_user,6732
3,top_insurance,12273
4,map_insurance,13876
5,top_map,18293
6,top_user,18296
7,map_map,20604
8,map_user,20608


In [29]:
engine = create_engine(f"postgresql+psycopg2://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}")

# Latest available Y/Q in your data
latest = pd.read_sql("SELECT MAX(year) AS y, MAX(quarter) AS q FROM aggregated_transaction;", engine).iloc[0]
YEAR = int(latest["y"]); QUARTER = int(latest["q"])
print("Using:", YEAR, f"Q{QUARTER}")

Using: 2024 Q4


In [30]:


# --- CS1A: Top states by transaction amount (latest quarter)
q1 = """
WITH latest AS (SELECT MAX(year) y, MAX(quarter) q FROM aggregated_transaction)
SELECT state, year, quarter,
       SUM(transaction_amount)::numeric(20,2) AS amount,
       SUM(transaction_count) AS txns
FROM aggregated_transaction, latest
WHERE year = latest.y AND quarter = latest.q
GROUP BY state, year, quarter
ORDER BY amount DESC NULLS LAST
LIMIT 15;
"""
df1 = pd.read_sql(q1, engine)
print("\nCS1A — Top states by amount (latest):"); display(df1)

# --- CS1B: YoY growth by state (latest quarter vs same quarter prev year)
q2 = """
WITH latest AS (SELECT MAX(year) y, MAX(quarter) q FROM aggregated_transaction),
base AS (
  SELECT state, year, quarter, SUM(transaction_amount) AS amt
  FROM aggregated_transaction
  GROUP BY state, year, quarter
)
SELECT cur.state, cur.year, cur.quarter,
       cur.amt AS cur_amount, prev.amt AS prev_amount,
       ROUND(((cur.amt - prev.amt) / NULLIF(prev.amt,0)) * 100, 2) AS yoy_pct
FROM latest
JOIN base cur ON cur.year = latest.y AND cur.quarter = latest.q
LEFT JOIN base prev ON prev.state = cur.state
                   AND prev.year  = cur.year - 1
                   AND prev.quarter = cur.quarter
ORDER BY yoy_pct DESC NULLS LAST
LIMIT 15;
"""
df2 = pd.read_sql(q2, engine)
print("\nCS1B — YoY growth by state (latest vs LY same Q):"); display(df2)

# --- CS2A: Brand dominance (latest quarter)
q3 = """
WITH latest AS (SELECT MAX(year) y, MAX(quarter) q FROM aggregated_user)
SELECT brand, SUM(count) AS users, ROUND(AVG(percentage)*100, 2) AS avg_share_pct
FROM aggregated_user, latest
WHERE year = latest.y AND quarter = latest.q
GROUP BY brand
ORDER BY users DESC;
"""
df3 = pd.read_sql(q3, engine)
print("\nCS2A — Brand dominance (latest):"); display(df3)

# --- CS2B: Engagement — app opens per user by state (chosen Y/Q)
q4 = f"""
SELECT state,
       SUM(registered_users) AS reg_users,
       SUM(app_opens) AS app_opens,
       ROUND(SUM(app_opens)::numeric / NULLIF(SUM(registered_users),0), 2) AS opens_per_user
FROM map_user
WHERE year = {YEAR} AND quarter = {QUARTER}
GROUP BY state
ORDER BY opens_per_user DESC NULLS LAST
LIMIT 20;
"""
df4 = pd.read_sql(q4, engine)
print(f"\nCS2B — Engagement (opens per user) — Q{QUARTER} {YEAR}:"); display(df4)

# --- CS3: Insurance penetration (latest quarter)
q5 = """
WITH latest AS (SELECT MAX(year) y, MAX(quarter) q FROM aggregated_insurance)
SELECT state,
       SUM(insurance_amount)::numeric(20,2) AS amount,
       SUM(insurance_count) AS cnt
FROM aggregated_insurance, latest
WHERE year = latest.y AND quarter = latest.q
GROUP BY state
ORDER BY amount DESC NULLS LAST
LIMIT 15;
"""
df5 = pd.read_sql(q5, engine)
print("\nCS3 — Insurance: top states by amount (latest):"); display(df5)

# --- CS4: Top geographies (transactions) for chosen Y/Q
q6 = f"""
SELECT entity_type, entity_name, state,
       SUM(count) AS txns, SUM(amount)::numeric(20,2) AS amount
FROM top_map
WHERE year = {YEAR} AND quarter = {QUARTER}
GROUP BY entity_type, entity_name, state
ORDER BY amount DESC NULLS LAST
LIMIT 25;
"""
df6 = pd.read_sql(q6, engine)
print(f"\nCS4 — Top geographies by amount — Q{QUARTER} {YEAR}:"); display(df6)

# --- CS5: Top user registrations for chosen Y/Q
q7 = f"""
SELECT entity_type, entity_name, state,
       SUM(registered_users) AS users
FROM top_user
WHERE year = {YEAR} AND quarter = {QUARTER}
GROUP BY entity_type, entity_name, state
ORDER BY users DESC
LIMIT 25;
"""
df7 = pd.read_sql(q7, engine)
print(f"\nCS5 — Top user registrations — Q{QUARTER} {YEAR}:"); display(df7)



CS1A — Top states by amount (latest):


Unnamed: 0,state,year,quarter,amount,txns
0,Karnataka,2024,4,4151820000000.0,3455056000.0
1,Maharashtra,2024,4,4151028000000.0,3634232000.0
2,Telangana,2024,4,3859051000000.0,2822452000.0
3,Andhra Pradesh,2024,4,3452016000000.0,2296446000.0
4,Uttar Pradesh,2024,4,3214107000000.0,2544890000.0
5,Rajasthan,2024,4,2793049000000.0,2043470000.0
6,Bihar,2024,4,2098927000000.0,1559668000.0
7,Madhya Pradesh,2024,4,2003305000000.0,1685402000.0
8,West Bengal,2024,4,1810100000000.0,1227136000.0
9,Odisha,2024,4,1305458000000.0,1143098000.0



CS1B — YoY growth by state (latest vs LY same Q):


Unnamed: 0,state,year,quarter,cur_amount,prev_amount,yoy_pct
0,Lakshadweep,2024,4,247756700.0,119831400.0,106.75
1,Manipur,2024,4,12061710000.0,6800325000.0,77.37
2,Jammu & Kashmir,2024,4,160951000000.0,93070140000.0,72.94
3,Ladakh,2024,4,10854420000.0,6942987000.0,56.34
4,Sikkim,2024,4,12641100000.0,8590608000.0,47.15
5,Bihar,2024,4,2098927000000.0,1434736000000.0,46.29
6,Arunachal Pradesh,2024,4,35079890000.0,24128800000.0,45.39
7,West Bengal,2024,4,1810100000000.0,1249395000000.0,44.88
8,Nagaland,2024,4,13959100000.0,9674575000.0,44.29
9,Chhattisgarh,2024,4,576828400000.0,403985000000.0,42.78



CS2A — Brand dominance (latest):


Unnamed: 0,brand,users,avg_share_pct



CS2B — Engagement (opens per user) — Q4 2024:


Unnamed: 0,state,reg_users,app_opens,opens_per_user
0,Lakshadweep,11769.0,1797824.0,152.76
1,Andaman And Nicobar,178788.0,18691680.0,104.55
2,Jharkhand,12212421.0,1018665000.0,83.41
3,Chhattisgarh,10651135.0,869239300.0,81.61
4,Gujarat,29861641.0,2406424000.0,80.59
5,Meghalaya,571272.0,44407770.0,77.73
6,Bihar,33345285.0,2527843000.0,75.81
7,Madhya Pradesh,32876660.0,2492104000.0,75.8
8,Rajasthan,36828375.0,2764744000.0,75.07
9,Assam,10480927.0,775169500.0,73.96



CS3 — Insurance: top states by amount (latest):


Unnamed: 0,state,amount,cnt
0,Karnataka,305861806.0,183532.0
1,Maharashtra,266942711.0,168078.0
2,Uttar Pradesh,199179867.0,116589.0
3,Tamil Nadu,173784251.0,127002.0
4,Kerala,144083113.0,89533.0
5,Rajasthan,127930986.0,73530.0
6,Telangana,124755619.0,78498.0
7,West Bengal,120602777.0,91719.0
8,Madhya Pradesh,101096769.0,59688.0
9,Delhi,94390728.0,67962.0



CS4 — Top geographies by amount — Q4 2024:


Unnamed: 0,entity_type,entity_name,state,txns,amount
0,District,bengaluru urban,Karnataka,1674348000.0,1790144000000.0
1,District,pune,Maharashtra,900035100.0,817012200000.0
2,District,rangareddy,Telangana,616593600.0,775467300000.0
3,District,medchal malkajgiri,Telangana,534184900.0,651550000000.0
4,District,hyderabad,Telangana,497079400.0,601741700000.0
5,District,jaipur,Rajasthan,419541100.0,484830500000.0
6,District,visakhapatnam,Andhra Pradesh,229565000.0,302371400000.0
7,District,nashik,Maharashtra,228252500.0,293636200000.0
8,District,indore,Madhya Pradesh,264106000.0,277133000000.0
9,District,gurugram,Haryana,301187500.0,265842900000.0



CS5 — Top user registrations — Q4 2024:


Unnamed: 0,entity_type,entity_name,state,users
0,District,bengaluru urban,Karnataka,18101416.0
1,District,pune,Maharashtra,12734314.0
2,District,thane,Maharashtra,7300941.0
3,District,mumbai suburban,Maharashtra,6875379.0
4,District,rangareddy,Telangana,5988320.0
5,District,hyderabad,Telangana,5916910.0
6,District,ahmedabad,Gujarat,5702922.0
7,District,chennai,Tamil Nadu,5541760.0
8,District,jaipur,Rajasthan,5371681.0
9,District,north twenty four parganas,West Bengal,5085095.0
