In [2]:
import json, os, glob
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

# ---------------------- Load DB connection ----------------------
load_dotenv()

host = os.getenv("MYSQL_HOST", "localhost")
port = os.getenv("MYSQL_PORT", "3306")
db   = os.getenv("MYSQL_DB", "phonepe_pulse")
user = os.getenv("MYSQL_USER", "root")
pwd  = os.getenv("MYSQL_PWD", "1234qwert")

# ✅ Create SQLAlchemy engine
try:
    engine = create_engine(
        f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}",
        pool_recycle=3600
    )
    with engine.connect() as conn:
        version = conn.execute(text("SELECT VERSION()")).fetchone()
        print(f"✅ Connected to MySQL {version[0]} as {user}")
except Exception as e:
    print("❌ Could not connect to MySQL. Please check .env and credentials.")
    raise e

# ---------------------- Helpers ----------------------
PULSE_ROOT = os.path.join("data", "raw", "pulse", "data")

def upsert_dim_time(conn, year, quarter):
    res = conn.execute(text(
        "SELECT time_id FROM dim_time WHERE year=:y AND quarter=:q"
    ), {"y":year, "q":quarter}).fetchone()
    if res: return res[0]
    conn.execute(text("INSERT INTO dim_time(year,quarter) VALUES(:y,:q)"),
                 {"y":year,"q":quarter})
    return conn.execute(text("SELECT LAST_INSERT_ID()")).scalar()

def upsert_dim_state(conn, state):
    res = conn.execute(text(
        "SELECT state_id FROM dim_geo_state WHERE state_name=:s"
    ), {"s":state}).fetchone()
    if res: return res[0]
    conn.execute(text("INSERT INTO dim_geo_state(state_name) VALUES(:s)"),
                 {"s":state})
    return conn.execute(text("SELECT LAST_INSERT_ID()")).scalar()

def upsert_dim_district(conn, state_id, district):
    res = conn.execute(text("""
      SELECT district_id FROM dim_geo_district 
      WHERE state_id=:sid AND district_name=:d
    """), {"sid":state_id, "d":district}).fetchone()
    if res: return res[0]
    conn.execute(text("""
      INSERT INTO dim_geo_district(state_id,district_name) VALUES(:sid,:d)
    """), {"sid":state_id,"d":district})
    return conn.execute(text("SELECT LAST_INSERT_ID()")).scalar()

def safe_get(d, key, default=None):
    return d[key] if (isinstance(d, dict) and key in d) else default

# ---------------------- Loaders ----------------------
def load_aggregated_transactions(conn):
    print("📥 Loading aggregated transactions...")
    pattern = os.path.join(PULSE_ROOT, "aggregated", "transaction",
                           "country", "india", "state", "*", "*", "*.json")
    for fp in glob.glob(pattern):
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid = upsert_dim_state(conn, state)
        tid = upsert_dim_time(conn, year, quarter)

        tx_list = safe_get(js, "data", {}).get("transactionData", [])
        for t in tx_list:
            txn_type = safe_get(t, "name", "Unknown")
            instruments = safe_get(t, "paymentInstruments", [])
            total = next((i for i in instruments if i.get("type")=="TOTAL"), None)
            count  = int(total.get("count",0)) if total else 0
            amount = float(total.get("amount",0)) if total else 0.0
            conn.execute(text("""
                INSERT INTO f_agg_txn (state_id,time_id,txn_type,txn_count,txn_amount)
                VALUES (:sid,:tid,:tp,:ct,:am)
            """), {"sid":sid,"tid":tid,"tp":txn_type,"ct":count,"am":amount})

def load_map_transactions(conn):
    print("📥 Loading map transactions...")
    pattern = os.path.join(PULSE_ROOT, "map", "transaction", "hover",
                           "country", "india", "state", "*", "*", "*.json")
    for fp in glob.glob(pattern):
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid = upsert_dim_state(conn, state)
        tid = upsert_dim_time(conn, year, quarter)

        hd_list = safe_get(js, "data", {}).get("hoverDataList", None)
        if hd_list is None:
            hd = safe_get(js, "data", {}).get("hoverData", {})
            hd_list = [{"name":k, "metric":[v.get("metric", [0,0])[0],
                                            v.get("metric",[0,0])[1]]}
                       for k,v in hd.items()]
        for item in hd_list or []:
            district = safe_get(item, "name", "Unknown")
            met = safe_get(item, "metric", [0,0])
            count = int(met[0]) if len(met)>0 else 0
            amount = float(met[1]) if len(met)>1 else 0.0
            did = upsert_dim_district(conn, sid, district)
            conn.execute(text("""
                INSERT INTO f_map_txn_district (state_id,district_id,time_id,txn_count,txn_amount)
                VALUES (:sid,:did,:tid,:ct,:am)
            """), {"sid":sid,"did":did,"tid":tid,"ct":count,"am":amount})

def load_top_transactions(conn):
    print("📥 Loading top transactions...")
    pattern = os.path.join(PULSE_ROOT, "top", "transaction",
                           "country", "india", "state", "*", "*", "*.json")
    for fp in glob.glob(pattern):
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid = upsert_dim_state(conn, state)
        tid = upsert_dim_time(conn, year, quarter)

        districts = safe_get(js, "data", {}).get("districts", [])
        for d in districts:
            name = safe_get(d,"entityName","Unknown")
            metric = safe_get(d,"metric", {"count":0,"amount":0})
            conn.execute(text("""
                INSERT INTO f_top_txn(state_id,time_id,entity_type,entity_name,txn_count,txn_amount)
                VALUES(:sid,:tid,'district',:name,:ct,:am)
            """), {"sid":sid,"tid":tid,"name":name,
                   "ct":int(metric.get("count",0)),"am":float(metric.get("amount",0.0))})

        pincodes = safe_get(js, "data", {}).get("pincodes", [])
        for p in pincodes:
            name = str(safe_get(p,"entityName","000000"))
            metric = safe_get(p,"metric", {"count":0,"amount":0})
            conn.execute(text("""
                INSERT INTO f_top_txn(state_id,time_id,entity_type,entity_name,txn_count,txn_amount)
                VALUES(:sid,:tid,'pincode',:name,:ct,:am)
            """), {"sid":sid,"tid":tid,"name":name,
                   "ct":int(metric.get("count",0)),"am":float(metric.get("amount",0.0))})

def load_aggregated_users(conn):
    print("📥 Loading aggregated users...")
    pattern = os.path.join(PULSE_ROOT, "aggregated", "user",
                           "country", "india", "state", "*", "*", "*.json")
    for fp in glob.glob(pattern):
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid = upsert_dim_state(conn, state)
        tid = upsert_dim_time(conn, year, quarter)

        devices = safe_get(js, "data", {}).get("usersByDevice", [])
        for d in devices:
            brand = safe_get(d, "brand", "Unknown")
            count = int(safe_get(d, "count", 0) or 0)
            pct   = safe_get(d, "percentage", None)
            conn.execute(text("""
              INSERT INTO f_agg_user_device(state_id,time_id,brand,user_count,pct)
              VALUES(:sid,:tid,:br,:ct,:pc)
            """), {"sid":sid,"tid":tid,"br":brand,"ct":count,"pc":pct})

def load_map_users(conn):
    print("📥 Loading map users...")
    pattern = os.path.join(PULSE_ROOT, "map", "user", "hover",
                           "country", "india", "state", "*", "*", "*.json")
    for fp in glob.glob(pattern):
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid = upsert_dim_state(conn, state)
        tid = upsert_dim_time(conn, year, quarter)

        hd = safe_get(js, "data", {}).get("hoverData", {})
        for district, metrics in hd.items():
            reg = int(metrics.get("registeredUsers", 0) or 0)
            app = int(metrics.get("appOpens", 0) or 0)
            did = upsert_dim_district(conn, sid, district)
            conn.execute(text("""
              INSERT INTO f_map_user_district(state_id,district_id,time_id,registered_users,app_opens)
              VALUES(:sid,:did,:tid,:ru,:ao)
            """), {"sid":sid,"did":did,"tid":tid,"ru":reg,"ao":app})

def load_top_users(conn):
    print("📥 Loading top users...")
    pattern = os.path.join(PULSE_ROOT, "top", "user",
                           "country", "india", "state", "*", "*", "*.json")
    for fp in glob.glob(pattern):
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid = upsert_dim_state(conn, state)
        tid = upsert_dim_time(conn, year, quarter)

        pincodes = safe_get(js, "data", {}).get("pincodes", [])
        for p in pincodes:
            name = str(safe_get(p,"entityName","000000"))
            reg  = int(safe_get(p,"registeredUsers",0) or 0)
            conn.execute(text("""
              INSERT INTO f_top_user(state_id,time_id,pincode,registered_users)
              VALUES(:sid,:tid,:pc,:ru)
            """), {"sid":sid,"tid":tid,"pc":name,"ru":reg})

def load_insurance(conn):
    print("📥 Loading insurance data...")
    pattern = os.path.join(PULSE_ROOT, "aggregated", "insurance",
                           "country", "india", "state", "*", "*", "*.json")
    for fp in glob.glob(pattern):
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid = upsert_dim_state(conn, state)
        tid = upsert_dim_time(conn, year, quarter)

        ins_list = safe_get(js, "data", {}).get("insuranceData", [])
        for it in ins_list:
            ins_type = safe_get(it, "name", "Unknown")
            total = next((i for i in safe_get(it,"metrics",[]) if i.get("type")=="TOTAL"), None)
            count = int(total.get("count",0)) if total else 0
            amount = float(total.get("amount",0)) if total else 0.0
            conn.execute(text("""
              INSERT INTO f_agg_insurance(state_id,time_id,ins_type,ins_count,ins_amount)
              VALUES(:sid,:tid,:tp,:ct,:am)
            """), {"sid":sid,"tid":tid,"tp":ins_type,"ct":count,"am":amount})

# ---------------------- Main ----------------------
if __name__ == "__main__":
    try:
        with engine.begin() as conn:
            load_aggregated_transactions(conn)
            load_map_transactions(conn)
            load_top_transactions(conn)
            load_aggregated_users(conn)
            load_map_users(conn)
            load_top_users(conn)
            load_insurance(conn)
        print("🎉 PhonePe Pulse data successfully loaded into MySQL!")
    except Exception as e:
        print("❌ Error during ETL load")
        raise e


✅ Connected to MySQL 8.0.41 as root
📥 Loading aggregated transactions...
📥 Loading map transactions...
📥 Loading top transactions...
📥 Loading aggregated users...
📥 Loading map users...
📥 Loading top users...
📥 Loading insurance data...
🎉 PhonePe Pulse data successfully loaded into MySQL!


In [6]:
import json, os, glob
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

# ------------------ Setup ------------------
load_dotenv()

host = os.getenv("MYSQL_HOST", "localhost")
port = os.getenv("MYSQL_PORT", "3306")
db   = os.getenv("MYSQL_DB", "phonepe_pulse")
user = os.getenv("MYSQL_USER", "root")
pwd  = os.getenv("MYSQL_PWD", "1234qwert")

# ✅ Correct root
PULSE_ROOT = "data"

# ✅ Create engine
engine = create_engine(
    f"mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}",
    pool_recycle=3600
)

# ------------------ Helpers ------------------
def safe_get(d, key, default=None):
    return d[key] if (isinstance(d, dict) and key in d) else default

def parse_metric(met):
    """Handle dict, list, and list-of-dict metric formats."""
    if isinstance(met, dict):  # {"count":123,"amount":456.78}
        count = int(met.get("count", 0) or 0)
        amount = float(met.get("amount", 0.0) or 0.0)

    elif isinstance(met, list):
        if len(met) == 0:
            return 0, 0.0
        if isinstance(met[0], dict):  # [{"count":..,"amount":..}]
            first = met[0]
            count = int(first.get("count", 0) or 0)
            amount = float(first.get("amount", 0.0) or 0.0)
        else:  # [123, 456.78]
            count = int(met[0]) if len(met) > 0 else 0
            amount = float(met[1]) if len(met) > 1 else 0.0
    else:
        count, amount = 0, 0.0
    return count, amount

def upsert_dim_time(conn, year, quarter):
    res = conn.execute(text(
        "SELECT time_id FROM dim_time WHERE year=:y AND quarter=:q"
    ), {"y":year, "q":quarter}).fetchone()
    if res: return res[0]
    conn.execute(text(
        "INSERT INTO dim_time(year,quarter) VALUES(:y,:q)"
    ), {"y":year,"q":quarter})
    return conn.execute(text("SELECT LAST_INSERT_ID()")).scalar()

def upsert_dim_state(conn, state):
    res = conn.execute(text(
        "SELECT state_id FROM dim_geo_state WHERE state_name=:s"
    ), {"s":state}).fetchone()
    if res: return res[0]
    conn.execute(text(
        "INSERT INTO dim_geo_state(state_name) VALUES(:s)"
    ), {"s":state})
    return conn.execute(text("SELECT LAST_INSERT_ID()")).scalar()

def upsert_dim_district(conn, state_id, district):
    res = conn.execute(text("""
      SELECT district_id FROM dim_geo_district 
      WHERE state_id=:sid AND district_name=:d
    """), {"sid":state_id, "d":district}).fetchone()
    if res: return res[0]
    conn.execute(text("""
      INSERT INTO dim_geo_district(state_id,district_name) VALUES(:sid,:d)
    """), {"sid":state_id,"d":district})
    return conn.execute(text("SELECT LAST_INSERT_ID()")).scalar()

# ------------------ Loaders ------------------
def load_aggregated_transactions(conn):
    pattern = os.path.join(PULSE_ROOT, "aggregated", "transaction",
                           "country", "india", "state", "*", "*", "*.json")
    files = glob.glob(pattern)
    print(f"📂 Aggregated Transactions: {len(files)} files")

    for fp in files:
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid, tid = upsert_dim_state(conn, state), upsert_dim_time(conn, year, quarter)

        tx_list = safe_get(js, "data", {}).get("transactionData") or []
        for t in tx_list:
            txn_type = safe_get(t, "name", "Unknown")
            instruments = safe_get(t, "paymentInstruments") or []
            total = next((i for i in instruments if i.get("type")=="TOTAL"), None)
            count, amount = (int(total.get("count",0)), float(total.get("amount",0))) if total else (0,0.0)
            conn.execute(text("""
                INSERT INTO f_agg_txn (state_id,time_id,txn_type,txn_count,txn_amount)
                VALUES (:sid,:tid,:tp,:ct,:am)
            """), {"sid":sid,"tid":tid,"tp":txn_type,"ct":count,"am":amount})
        print(f"   ✅ {state} {year} Q{quarter} → {len(tx_list)} records")

def load_map_transactions(conn):
    pattern = os.path.join(PULSE_ROOT, "map", "transaction", "hover",
                           "country", "india", "state", "*", "*", "*.json")
    files = glob.glob(pattern)
    print(f"📂 Map Transactions: {len(files)} files")

    for fp in files:
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid, tid = upsert_dim_state(conn, state), upsert_dim_time(conn, year, quarter)

        hd_list = safe_get(js, "data", {}).get("hoverDataList")
        if not hd_list:
            hd = safe_get(js, "data", {}).get("hoverData") or {}
            hd_list = [{"name":k, "metric":v.get("metric",{})} for k,v in hd.items()]

        for item in hd_list or []:
            district = safe_get(item, "name", "Unknown")
            count, amount = parse_metric(safe_get(item, "metric", {}))
            did = upsert_dim_district(conn, sid, district)
            conn.execute(text("""
                INSERT INTO f_map_txn_district (state_id,district_id,time_id,txn_count,txn_amount)
                VALUES (:sid,:did,:tid,:ct,:am)
            """), {"sid":sid,"did":did,"tid":tid,"ct":count,"am":amount})
        print(f"   ✅ {state} {year} Q{quarter} → {len(hd_list or [])} records")

def load_top_transactions(conn):
    pattern = os.path.join(PULSE_ROOT, "top", "transaction", "country", "india", "state", "*", "*", "*.json")
    files = glob.glob(pattern)
    print(f"📂 Top Transactions: {len(files)} files")

    for fp in files:
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid, tid = upsert_dim_state(conn, state), upsert_dim_time(conn, year, quarter)

        districts = safe_get(js, "data", {}).get("districts") or []
        for d in districts:
            name = safe_get(d,"entityName","Unknown")
            count, amount = parse_metric(safe_get(d,"metric", {}))
            conn.execute(text("""
                INSERT INTO f_top_txn(state_id,time_id,entity_type,entity_name,txn_count,txn_amount)
                VALUES(:sid,:tid,'district',:name,:ct,:am)
            """), {"sid":sid,"tid":tid,"name":name,"ct":count,"am":amount})

        pincodes = safe_get(js, "data", {}).get("pincodes") or []
        for p in pincodes:
            name = str(safe_get(p,"entityName","000000"))
            count, amount = parse_metric(safe_get(p,"metric", {}))
            conn.execute(text("""
                INSERT INTO f_top_txn(state_id,time_id,entity_type,entity_name,txn_count,txn_amount)
                VALUES(:sid,:tid,'pincode',:name,:ct,:am)
            """), {"sid":sid,"tid":tid,"name":name,"ct":count,"am":amount})
        print(f"   ✅ {state} {year} Q{quarter} → {len(districts)+len(pincodes)} records")

def load_aggregated_users(conn):
    pattern = os.path.join(PULSE_ROOT, "aggregated", "user", "country", "india", "state", "*", "*", "*.json")
    files = glob.glob(pattern)
    print(f"📂 Aggregated Users: {len(files)} files")

    for fp in files:
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid, tid = upsert_dim_state(conn, state), upsert_dim_time(conn, year, quarter)

        devices = safe_get(js, "data", {}).get("usersByDevice") or []
        for d in devices:
            brand = safe_get(d, "brand", "Unknown")
            count = int(safe_get(d, "count", 0) or 0)
            pct   = safe_get(d, "percentage", None)
            conn.execute(text("""
              INSERT INTO f_agg_user_device(state_id,time_id,brand,user_count,pct)
              VALUES(:sid,:tid,:br,:ct,:pc)
            """), {"sid":sid,"tid":tid,"br":brand,"ct":count,"pc":pct})
        print(f"   ✅ {state} {year} Q{quarter} → {len(devices)} devices")

def load_map_users(conn):
    pattern = os.path.join(PULSE_ROOT, "map", "user", "hover", "country", "india", "state", "*", "*", "*.json")
    files = glob.glob(pattern)
    print(f"📂 Map Users: {len(files)} files")

    for fp in files:
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid, tid = upsert_dim_state(conn, state), upsert_dim_time(conn, year, quarter)

        hd = safe_get(js, "data", {}).get("hoverData") or {}
        for district, metrics in hd.items():
            reg = int(metrics.get("registeredUsers", 0) or 0)
            app = int(metrics.get("appOpens", 0) or 0)
            did = upsert_dim_district(conn, sid, district)
            conn.execute(text("""
              INSERT INTO f_map_user_district(state_id,district_id,time_id,registered_users,app_opens)
              VALUES(:sid,:did,:tid,:ru,:ao)
            """), {"sid":sid,"did":did,"tid":tid,"ru":reg,"ao":app})
        print(f"   ✅ {state} {year} Q{quarter} → {len(hd)} districts")

def load_top_users(conn):
    pattern = os.path.join(PULSE_ROOT, "top", "user", "country", "india", "state", "*", "*", "*.json")
    files = glob.glob(pattern)
    print(f"📂 Top Users: {len(files)} files")

    for fp in files:
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid, tid = upsert_dim_state(conn, state), upsert_dim_time(conn, year, quarter)

        pincodes = safe_get(js, "data", {}).get("pincodes") or []
        for p in pincodes:
            name = str(safe_get(p,"entityName","000000"))
            reg  = int(safe_get(p,"registeredUsers",0) or 0)
            conn.execute(text("""
              INSERT INTO f_top_user(state_id,time_id,pincode,registered_users)
              VALUES(:sid,:tid,:pc,:ru)
            """), {"sid":sid,"tid":tid,"pc":name,"ru":reg})
        print(f"   ✅ {state} {year} Q{quarter} → {len(pincodes)} pincodes")

def load_insurance(conn):
    pattern = os.path.join(PULSE_ROOT, "aggregated", "insurance", "country", "india", "state", "*", "*", "*.json")
    files = glob.glob(pattern)
    print(f"📂 Insurance: {len(files)} files")

    for fp in files:
        with open(fp, "r") as f: js = json.load(f)
        parts = fp.replace("\\","/").split("/")
        state, year, quarter = parts[-4], int(parts[-2]), int(parts[-1].split(".")[0])
        sid, tid = upsert_dim_state(conn, state), upsert_dim_time(conn, year, quarter)

        ins_list = safe_get(js, "data", {}).get("insuranceData") or []
        for it in ins_list:
            ins_type = safe_get(it, "name", "Unknown")
            metrics = safe_get(it, "metrics") or []
            total = next((i for i in metrics if i.get("type")=="TOTAL"), None)
            count, amount = parse_metric(total) if total else (0,0.0)
            conn.execute(text("""
              INSERT INTO f_agg_insurance(state_id,time_id,ins_type,ins_count,ins_amount)
              VALUES(:sid,:tid,:tp,:ct,:am)
            """), {"sid":sid,"tid":tid,"tp":ins_type,"ct":count,"am":amount})
        print(f"   ✅ {state} {year} Q{quarter} → {len(ins_list)} records")

# ------------------ Main ------------------
if __name__ == "__main__":
    try:
        with engine.begin() as conn:
            load_aggregated_transactions(conn)
            load_map_transactions(conn)
            load_top_transactions(conn)
            load_aggregated_users(conn)
            load_map_users(conn)
            load_top_users(conn)
            load_insurance(conn)
        print("🎉 PhonePe Pulse data successfully loaded into MySQL!")
    except Exception as e:
        print("❌ Error during ETL load")
        raise e


📂 Aggregated Transactions: 1008 files
   ✅ state 2018 Q1 → 5 records
   ✅ state 2018 Q2 → 5 records
   ✅ state 2018 Q3 → 5 records
   ✅ state 2018 Q4 → 5 records
   ✅ state 2019 Q1 → 5 records
   ✅ state 2019 Q2 → 5 records
   ✅ state 2019 Q3 → 5 records
   ✅ state 2019 Q4 → 5 records
   ✅ state 2020 Q1 → 5 records
   ✅ state 2020 Q2 → 5 records
   ✅ state 2020 Q3 → 5 records
   ✅ state 2020 Q4 → 5 records
   ✅ state 2021 Q1 → 5 records
   ✅ state 2021 Q2 → 5 records
   ✅ state 2021 Q3 → 5 records
   ✅ state 2021 Q4 → 5 records
   ✅ state 2022 Q1 → 5 records
   ✅ state 2022 Q2 → 5 records
   ✅ state 2022 Q3 → 5 records
   ✅ state 2022 Q4 → 5 records
   ✅ state 2023 Q1 → 5 records
   ✅ state 2023 Q2 → 5 records
   ✅ state 2023 Q3 → 5 records
   ✅ state 2023 Q4 → 5 records
   ✅ state 2024 Q1 → 5 records
   ✅ state 2024 Q2 → 5 records
   ✅ state 2024 Q3 → 5 records
   ✅ state 2024 Q4 → 5 records
   ✅ state 2018 Q1 → 5 records
   ✅ state 2018 Q2 → 5 records
   ✅ state 2018 Q3 → 5 records
 