In [2]:
import os
import json
import pandas as pd

BASE_DIR = "pulse/data"

# One list of row-dicts per output table, keyed "<level>_<kind>"
# (aggregated_transaction, aggregated_user, ..., top_insurance).
# The parse_* callbacks append rows here; each list becomes one CSV.
dataframes = {
    f"{level}_{kind}": []
    for level in ("aggregated", "map", "top")
    for kind in ("transaction", "user", "insurance")
}

def get_state_year_quarter(filepath):
    """Derive ``(state, year, quarter)`` from a Pulse JSON file path.

    State-level files live at ``.../state/<state>/<year>/<quarter>.json``.
    Any other file is treated as country-level and labelled ``'India'``.
    In both cases year and quarter come from the last two path components.

    The ``state`` marker is checked at a fixed position relative to the file
    (four components from the end) instead of with ``parts.index('state')``.
    A directory named ``state`` elsewhere in the path (e.g. inside BASE_DIR)
    therefore cannot be mistaken for it.

    Parameters
    ----------
    filepath : str
        Path to a ``<quarter>.json`` file.

    Returns
    -------
    tuple[str, int, int]
        State directory name (or ``'India'``), year, quarter.

    Raises
    ------
    ValueError
        If the year or quarter component is not an integer.
    IndexError
        If the path has fewer than two components.
    """
    parts = os.path.normpath(filepath).split(os.sep)
    year = int(parts[-2])
    quarter = int(os.path.splitext(parts[-1])[0])
    if len(parts) >= 4 and parts[-4] == 'state':
        state = parts[-3]
    else:
        state = 'India'
    return state, year, quarter

def process_json_files(base_path, callback):
    """Walk ``base_path`` and pass every decoded ``*.json`` file to ``callback``.

    ``callback`` is invoked as ``callback(file_path, data)``, where ``data`` is
    the result of ``json.load``. Sub-directories and files are visited in
    sorted order, so the exported rows come out identically on every run.
    Anything under a ``.ipynb_checkpoints`` directory is skipped.

    Processing is best-effort. Two kinds of failure are reported on stdout
    and skipped, after which the walk continues:

    - a file that cannot be read or decoded ("Error reading ...");
    - a file whose content makes ``callback`` raise ("Error parsing ...").

    Parameters
    ----------
    base_path : str
        Root directory to walk. A missing directory yields no calls.
    callback : Callable[[str, object], None]
        Receives the file path and the decoded JSON document.
    """
    for root, dirs, files in os.walk(base_path):
        # Pruning in place stops os.walk from descending into checkpoint copies
        # and fixes the order in which sub-directories are visited.
        dirs[:] = sorted(d for d in dirs if d != '.ipynb_checkpoints')
        if '.ipynb_checkpoints' in root:
            continue
        for file in sorted(files):
            if not file.endswith(".json"):
                continue
            file_path = os.path.join(root, file)
            try:
                # Use UTF-8 explicitly instead of the platform default encoding.
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (OSError, ValueError) as e:  # JSONDecodeError/UnicodeDecodeError are ValueErrors
                print(f"Error reading {file_path}: {e}")
                continue
            try:
                callback(file_path, data)
            except Exception as e:
                # Keep the walk going, but flag this as a parsing problem (not
                # I/O) and show the exception type, e.g. KeyError for a
                # missing field.
                print(f"Error parsing {file_path}: {type(e).__name__}: {e}")

def parse_aggregated_transaction(path, data):
    """Append one row per transaction type to ``dataframes["aggregated_transaction"]``.

    Reads ``data["data"]["transactionData"]``. Each item contributes its
    ``name`` as ``Type``, plus the count and amount of its first payment
    instrument. A missing or JSON-null ``data`` / ``transactionData`` yields
    no rows instead of raising.

    Parameters
    ----------
    path : str
        JSON file path; state, year and quarter are derived from it.
    data : dict
        Decoded JSON document.
    """
    state, year, quarter = get_state_year_quarter(path)
    # `or` (rather than a .get default) also covers keys present with a null value.
    payload = data.get("data") or {}
    for item in payload.get("transactionData") or []:
        # NOTE(review): only the first payment instrument is used; presumably the
        # list holds a single total entry - confirm against the Pulse schema.
        instrument = item["paymentInstruments"][0]
        dataframes["aggregated_transaction"].append({
            "State": state,
            "Year": year,
            "Quarter": quarter,
            "Type": item["name"],
            "Count": instrument["count"],
            "Amount": instrument["amount"]
        })

def parse_aggregated_user(path, data):
    """Append one summary row to ``dataframes["aggregated_user"]``.

    Takes ``registeredUsers`` and ``appOpens`` from ``data["data"]["aggregated"]``.
    A missing, null or empty ``data`` / ``aggregated`` adds nothing instead
    of raising.

    Parameters
    ----------
    path : str
        JSON file path; state, year and quarter are derived from it.
    data : dict
        Decoded JSON document.
    """
    state, year, quarter = get_state_year_quarter(path)
    # `or` (rather than a .get default) also covers "data": null.
    agg = (data.get("data") or {}).get("aggregated")
    if agg:
        dataframes["aggregated_user"].append({
            "State": state,
            "Year": year,
            "Quarter": quarter,
            "RegisteredUsers": agg["registeredUsers"],
            "AppOpens": agg["appOpens"]
        })

def parse_aggregated_insurance(path, data):
    """Append one row per ``transactionData`` item to ``dataframes["aggregated_insurance"]``.

    Each row carries the count and amount of the item's first payment
    instrument. The item ``name`` is not recorded. A missing or JSON-null
    ``data`` / ``transactionData`` yields no rows instead of raising.

    Parameters
    ----------
    path : str
        JSON file path; state, year and quarter are derived from it.
    data : dict
        Decoded JSON document.
    """
    state, year, quarter = get_state_year_quarter(path)
    # `or` (rather than a .get default) also covers keys present with a null value.
    payload = data.get("data") or {}
    for item in payload.get("transactionData") or []:
        # NOTE(review): only the first payment instrument is used - confirm that
        # insurance items carry a single instrument entry.
        instrument = item["paymentInstruments"][0]
        dataframes["aggregated_insurance"].append({
            "State": state,
            "Year": year,
            "Quarter": quarter,
            "Count": instrument["count"],
            "Amount": instrument["amount"]
        })

def parse_map_transaction(path, data):
    """Append one row per region in ``hoverDataList`` to ``dataframes["map_transaction"]``.

    Each entry contributes its ``name`` as ``Region``, plus the count and
    amount of its first ``metric`` element. A missing or JSON-null ``data`` /
    ``hoverDataList`` yields no rows instead of raising.

    Parameters
    ----------
    path : str
        JSON file path; state, year and quarter are derived from it.
    data : dict
        Decoded JSON document.
    """
    state, year, quarter = get_state_year_quarter(path)
    # `or` (rather than a .get default) also covers keys present with a null value.
    payload = data.get("data") or {}
    for entry in payload.get("hoverDataList") or []:
        metric = entry["metric"][0]
        dataframes["map_transaction"].append({
            "State": state,
            "Region": entry["name"],
            "Year": year,
            "Quarter": quarter,
            "Count": metric["count"],
            "Amount": metric["amount"]
        })

def parse_map_user(path, data):
    """Append one row per region in ``hoverData`` to ``dataframes["map_user"]``.

    ``hoverData`` maps a region name to a dict holding ``registeredUsers``
    and ``appOpens``. A missing or JSON-null ``data`` / ``hoverData`` yields
    no rows instead of raising (previously ``None.items()``).

    Parameters
    ----------
    path : str
        JSON file path; state, year and quarter are derived from it.
    data : dict
        Decoded JSON document.
    """
    state, year, quarter = get_state_year_quarter(path)
    # `or` (rather than a .get default) also covers keys present with a null value.
    payload = data.get("data") or {}
    hover = payload.get("hoverData") or {}
    for region, info in hover.items():
        dataframes["map_user"].append({
            "State": state,
            "Region": region,
            "Year": year,
            "Quarter": quarter,
            "RegisteredUsers": info["registeredUsers"],
            "AppOpens": info["appOpens"]
        })

def parse_map_insurance(path, data):
    """Append one row per region in ``hoverDataList`` to ``dataframes["map_insurance"]``.

    Each entry contributes its ``name`` as ``Region``, plus the count and
    amount of its first ``metric`` element. Files without a ``hoverDataList``
    (or with a JSON-null ``data`` / ``hoverDataList``) yield no rows instead
    of raising.

    Parameters
    ----------
    path : str
        JSON file path; state, year and quarter are derived from it.
    data : dict
        Decoded JSON document.
    """
    state, year, quarter = get_state_year_quarter(path)
    # `or` (rather than a .get default) also covers keys present with a null value.
    payload = data.get("data") or {}
    for entry in payload.get("hoverDataList") or []:
        metric = entry["metric"][0]
        dataframes["map_insurance"].append({
            "State": state,
            "Region": entry["name"],
            "Year": year,
            "Quarter": quarter,
            "Count": metric["count"],
            "Amount": metric["amount"]
        })

def parse_top_transaction(path, data):
    """Flatten a top/transaction document into ``dataframes["top_transaction"]``.

    For each ranking level (``states``, ``districts``, ``pincodes``) found in
    ``data["data"]``, one row is appended per ranked entry. Each row carries
    the level name as ``Category``, the entry's ``entityName`` as ``Region``,
    and its metric count/amount. Rows are tagged with the state/year/quarter
    parsed from ``path``. An empty payload or empty level is skipped.
    """
    state, year, quarter = get_state_year_quarter(path)
    payload = data.get("data")
    if not payload:
        return
    for level in ("states", "districts", "pincodes"):
        for ranked in payload.get(level) or ():
            metric = ranked["metric"]
            row = {
                "State": state,
                "Category": level,
                "Region": ranked.get("entityName"),
                "Year": year,
                "Quarter": quarter,
                "Count": metric["count"],
                "Amount": metric["amount"],
            }
            dataframes["top_transaction"].append(row)

def parse_top_user(path, data):
    """Flatten a top/user document into ``dataframes["top_user"]``.

    Every entry under the ``states``, ``districts`` and ``pincodes`` lists of
    ``data["data"]`` becomes one row. Each row holds its ranking level
    (``Category``), the entry's ``name`` (``Region``) and its
    ``registeredUsers``, tagged with the state/year/quarter parsed from
    ``path``. Empty payloads and empty levels contribute nothing.
    """
    state, year, quarter = get_state_year_quarter(path)
    payload = data.get("data")
    if not payload:
        return
    for level in ("states", "districts", "pincodes"):
        for ranked in payload.get(level) or ():
            row = {
                "State": state,
                "Category": level,
                "Region": ranked.get("name"),
                "Year": year,
                "Quarter": quarter,
                "RegisteredUsers": ranked["registeredUsers"],
            }
            dataframes["top_user"].append(row)

def parse_top_insurance(path, data):
    """Flatten a top/insurance document into ``dataframes["top_insurance"]``.

    For each ranking level (``states``, ``districts``, ``pincodes``) in
    ``data["data"]``, one row is appended per entry. Each row records the
    level as ``Category``, ``entityName`` as ``Region``, and the metric
    count/amount, tagged with the state/year/quarter parsed from ``path``.
    Empty payloads and empty levels are skipped.
    """
    state, year, quarter = get_state_year_quarter(path)
    payload = data.get("data")
    if not payload:
        return
    for level in ("states", "districts", "pincodes"):
        for ranked in payload.get(level) or ():
            metric = ranked["metric"]
            row = {
                "State": state,
                "Category": level,
                "Region": ranked.get("entityName"),
                "Year": year,
                "Quarter": quarter,
                "Count": metric["count"],
                "Amount": metric["amount"],
            }
            dataframes["top_insurance"].append(row)

# Run all processors.
# os.walk yields nothing for a missing directory, so a wrong BASE_DIR would
# otherwise "succeed" and export nine empty CSVs. Fail loudly instead.
if not os.path.isdir(BASE_DIR):
    raise FileNotFoundError(f"Pulse data directory not found: {BASE_DIR!r}")

# (path components under BASE_DIR, parser). Each parser appends to the
# dataframes entry named after its directory, e.g. map/user -> "map_user".
PARSERS = [
    (("aggregated", "transaction"), parse_aggregated_transaction),
    (("aggregated", "user"), parse_aggregated_user),
    (("aggregated", "insurance"), parse_aggregated_insurance),
    (("map", "transaction"), parse_map_transaction),
    (("map", "user"), parse_map_user),
    (("map", "insurance"), parse_map_insurance),
    (("top", "transaction"), parse_top_transaction),
    (("top", "user"), parse_top_user),
    (("top", "insurance"), parse_top_insurance),
]
for subdir, parser in PARSERS:
    # Join component-wise so the path uses the platform's separator.
    process_json_files(os.path.join(BASE_DIR, *subdir), parser)

# Save to CSVs
for name, records in dataframes.items():
    df = pd.DataFrame(records)
    if df.empty:
        # A frame built from no records has no columns either, so the CSV has
        # no header. Surface it here rather than as a confusing load downstream.
        print(f"⚠️ No rows parsed for {name}; check the source directory.")
    df.to_csv(f"{name}.csv", index=False)
    print(f"✅ Exported: {name}.csv ({len(df):,} rows)")


✅ Exported: aggregated_transaction.csv
✅ Exported: aggregated_user.csv
✅ Exported: aggregated_insurance.csv
✅ Exported: map_transaction.csv
✅ Exported: map_user.csv
✅ Exported: map_insurance.csv
✅ Exported: top_transaction.csv
✅ Exported: top_user.csv
✅ Exported: top_insurance.csv


In [1]:
import sqlite3
import pandas as pd
import os

# Load every CSV in csv_folder into phonepe_pulse.db, one table per file.
# The table is named after the file stem and replaced if it already exists.
csv_folder = '.'  # wherever your CSVs are saved

# Create SQLite DB
conn = sqlite3.connect('phonepe_pulse.db')
try:
    # Sorted so tables are loaded (and logged) in the same order on every run.
    for file in sorted(os.listdir(csv_folder)):
        table_name, ext = os.path.splitext(file)
        if ext != '.csv':
            continue
        # Join with csv_folder: a bare file name is resolved against the CWD,
        # which breaks as soon as csv_folder is not '.'.
        df = pd.read_csv(os.path.join(csv_folder, file))
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        print(f"Inserted {table_name} into phonepe_pulse.db")
finally:
    # Close even if a read/insert fails. sqlite3's `with conn:` would only
    # commit/roll back, not close.
    conn.close()


Inserted aggregated_insurance into phonepe_pulse.db
Inserted aggregated_transaction into phonepe_pulse.db
Inserted aggregated_user into phonepe_pulse.db
Inserted map_insurance into phonepe_pulse.db
Inserted map_transaction into phonepe_pulse.db
Inserted map_user into phonepe_pulse.db
Inserted top_insurance into phonepe_pulse.db
Inserted top_transaction into phonepe_pulse.db
Inserted top_user into phonepe_pulse.db
