# PhonePe Pulse Data Loader

This notebook demonstrates how to load PhonePe Pulse JSON data into a MariaDB database.
It clones (or updates) the PhonePe Pulse GitHub repository, reads the various JSON file formats,
flattens them into tabular form with Pandas, and inserts the rows into the matching
MariaDB tables, skipping rows that already exist. The cells can be run from top to bottom
once the database schema has been created from `DBTable.SQL`. Adjust the file paths, or
uncomment the dataset calls in the final cell, to choose which datasets are processed.

### Database Connectivity

In [None]:
from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker

import os

# Connection settings for the MariaDB/MySQL server.  Each value can be
# overridden with an environment variable so credentials do not have to be
# edited into (or committed with) the notebook; the literals are fallbacks.
# NOTE(review): consider removing the password fallback and relying solely on
# PHONEPE_DB_PASSWORD.
# db_password is interpolated verbatim into the SQLAlchemy URL, so reserved
# URL characters must already be percent-encoded (e.g. "@" -> "%40").
db_user = os.environ.get('PHONEPE_DB_USER', 'root')
db_password = os.environ.get('PHONEPE_DB_PASSWORD', 'RajaSri%4007')    # URL-encoded @
db_host = os.environ.get('PHONEPE_DB_HOST', '192.168.1.5')
db_port = os.environ.get('PHONEPE_DB_PORT', '3306')
db_name = os.environ.get('PHONEPE_DB_NAME', 'PhonePeV2')

class DBConnect:
    """Small wrapper around a SQLAlchemy engine for the PhonePe database.

    The connection URL is assembled from the module-level ``db_*`` settings.
    Table definitions are reflected once when the object is built and are
    re-reflected on demand when an unknown table name is requested.
    """

    def __init__(self):
        url = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
        self.engine = create_engine(url, echo=False)
        # Reflect the existing schema a single time up front.
        self.metadata = MetaData()
        self.metadata.reflect(bind=self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def refresh_metadata(self):
        """Forget cached table definitions and reflect the database again.

        Useful when tables were created by another session after this object
        was constructed.
        """
        meta = self.metadata
        meta.clear()
        meta.reflect(bind=self.engine)

    def insert(self, table_name: str, data: dict):
        """Insert a single row into *table_name* and commit it.

        Args:
            table_name: name of an existing table in the database.
            data: mapping of column name -> value for the new row.

        Raises:
            RuntimeError: the table is unknown even after re-reflecting.
        """
        known = self.metadata.tables
        if table_name not in known:
            # The table may have been created after construction; look again.
            self.refresh_metadata()
            known = self.metadata.tables
        table = known.get(table_name)
        if table is None:
            raise RuntimeError(f"Table {table_name} not found in database.")
        # Column values are passed as keyword arguments, one per column.
        statement = table.insert().values(**data)
        with self.Session() as session:
            session.execute(statement)
            session.commit()


### ⚠️ Import `DBTable.SQL` into your current database before running the cells below

In [None]:
import os
import json
import pandas as pd
from git import Repo
from sqlalchemy import select, and_

# Repository configuration
REPO_URL = 'https://github.com/PhonePe/pulse.git'
REPO_PATH = 'GitFolder'


def _india_path(*segments):
    """Return ``REPO_PATH/data/<segments...>/country/india``."""
    return os.path.join(REPO_PATH, 'data', *segments, 'country', 'india')


# Aggregated datasets
agg_user_PATH = _india_path('aggregated', 'user')
agg_trans_PATH = _india_path('aggregated', 'transaction')
agg_ins_PATH = _india_path('aggregated', 'insurance')

# Map datasets (the "hover" variants live one directory deeper)
map_user_PATH = _india_path('map', 'user', 'hover')
map_trans_PATH = _india_path('map', 'transaction', 'hover')
map_insHover_PATH = _india_path('map', 'insurance', 'hover')
map_insCntry_PATH = _india_path('map', 'insurance')

# Top datasets
top_user_PATH = _india_path('top', 'user')
top_trans_PATH = _india_path('top', 'transaction')
top_ins_PATH = _india_path('top', 'insurance')


In [None]:
# Bring the local PhonePe Pulse checkout up to date: pull 'master' when the
# folder already exists, otherwise clone it.  A failed update is reported and
# skipped rather than stopping the notebook.
if os.path.exists(REPO_PATH):
    try:
        repo = Repo(REPO_PATH)
        origin = repo.remotes.origin
        origin.fetch()
        repo.git.checkout('master')
        origin.pull('master')
        print("Repository updated.")
    except Exception as exc:
        print("Repository update skipped due to error:", exc)
else:
    print("Cloning GitHub repository...")
    Repo.clone_from(REPO_URL, REPO_PATH)


### Helper Functions

The following functions assist with loading JSON files, normalizing values, checking for existing
rows in the database to avoid duplicates, and inserting rows into the database.

In [None]:

def load_json(file_path: str) -> dict:
    """Read *file_path* as UTF-8 text and return the decoded JSON document."""
    with open(file_path, encoding='utf-8') as handle:
        payload = json.load(handle)
    return payload

def to_db_scalar(value):
    """Turn *value* into something a DB driver can bind as a single parameter.

    ``None`` and float NaN become ``None``; dicts and lists are serialised to a
    JSON string (non-ASCII characters kept as-is); anything else is returned
    unchanged.
    """
    is_missing = value is None or (isinstance(value, float) and pd.isna(value))
    if is_missing:
        return None
    needs_json = isinstance(value, (dict, list))
    return json.dumps(value, ensure_ascii=False) if needs_json else value

# Normalize year and quarter fields
def normalize_year_quarter(row: dict) -> dict:
    """Coerce ``row['year']`` and ``row['quarter']`` to ``int`` in place.

    Quarters may be given as ``1``, ``"1"`` or ``"Q1"`` (case-insensitive,
    surrounding whitespace ignored).  Missing or ``None`` fields are left
    alone, and so are values that cannot be parsed.  The same dict is returned
    for convenience.
    """
    if row.get('year') is not None:
        try:
            row['year'] = int(row['year'])
        except (TypeError, ValueError, OverflowError):
            pass  # unparseable year: keep the original value
    if row.get('quarter') is not None:
        q_str = str(row['quarter']).strip().upper().lstrip('Q')
        try:
            row['quarter'] = int(q_str)
        except ValueError:
            pass  # unparseable quarter: keep the original value
    return row

# Natural keys used to identify duplicates for each table.
# Every row produced by JsonProcess carries a 'state' value (None for the
# country-level files, the state folder name for state-level files), so
# 'state' is part of every key: without it, a state-level row could be
# mistaken for a row from another state or from the country-level file that
# happens to share the same coordinates / district / pincode and be skipped.
KEY_COLUMNS = {
    'agg_user':     ['state', 'year', 'quarter'],
    'agg_trans':    ['state', 'year', 'quarter', 'transaction_name', 'payment_type'],
    'agg_ins':      ['state', 'year', 'quarter', 'insurance_name', 'payment_type'],
    'map_user':     ['state', 'year', 'quarter', 'hover_state'],
    'map_trans':    ['state', 'year', 'quarter', 'location_name', 'metric_type'],
    'map_insHover': ['state', 'year', 'quarter', 'location_name', 'metric_type'],
    'map_insCntry': ['state', 'year', 'quarter', 'latitude', 'longitude'],
    'top_user':     ['state', 'year', 'quarter', 'state_name', 'district_name', 'pincode'],
    'top_trans':    ['state', 'year', 'quarter', 'state_entity', 'district_entity', 'pincode_entity'],
    'top_ins':      ['state', 'year', 'quarter', 'state_entity', 'district_entity', 'pincode_entity'],
}

# Mapping from JSON type to actual MariaDB table name.  Every type maps to a
# table of the same name except the two insurance map types, whose tables use
# snake_case names.
_SNAKE_CASE_TABLES = {
    'map_insHover': 'map_ins_hover',
    'map_insCntry': 'map_ins_cntry',
}
TABLE_NAME_MAP = {
    json_type: _SNAKE_CASE_TABLES.get(json_type, json_type)
    for json_type in (
        'agg_user', 'agg_trans', 'agg_ins',
        'map_user', 'map_trans', 'map_insHover', 'map_insCntry',
        'top_user', 'top_trans', 'top_ins',
    )
}

# Check if a row already exists in the database (to avoid duplicates)
def row_exists(db: DBConnect, type_name: str, record: dict) -> bool:
    """Return True if a row with the same natural key already exists.

    The key columns for *type_name* come from ``KEY_COLUMNS``.  The answer is
    False (i.e. "go ahead and insert") when the target table cannot be found,
    when the type has no key columns, or when *record* lacks a key column.
    """
    tname = TABLE_NAME_MAP.get(type_name, type_name)
    tables = db.metadata.tables
    if tname not in tables:
        # The table may have been created after the metadata was reflected.
        db.refresh_metadata()
        tables = db.metadata.tables
    table = tables.get(tname)
    if table is None:
        return False

    key_names = KEY_COLUMNS.get(type_name, [])
    if not key_names:
        return False  # no key → treat as not existing

    predicates = []
    for name in key_names:
        if name not in record:
            # Without every key value the row cannot be matched.
            return False
        predicates.append(table.c[name] == record[name])

    # Selecting just the first column is enough to test for existence.
    probe = select(next(iter(table.c))).where(and_(*predicates)).limit(1)
    with db.engine.connect() as conn:
        hit = conn.execute(probe).first()
        return hit is not None

# Insert DataFrame rows into the database with duplicate skipping
def insert_to_db(type_name: str, df: pd.DataFrame, db: "DBConnect | None" = None):
    """Insert every row of *df* into the table mapped from *type_name*.

    Rows whose natural key already exists are skipped.  Rows with no columns
    at all (the ``{}`` placeholder JsonProcess returns for files with no
    entries) are skipped too; inserting them would issue an INSERT with no
    column values.  Per-row insert failures are printed and the loop goes on.

    Args:
        type_name: JSON dataset type, e.g. ``'agg_trans'``.
        df: rows to insert; column names are used as table column names.
        db: optional open connection to reuse across calls.  When omitted a
            new ``DBConnect`` is created for this call and its engine is
            disposed before returning, so connection pools do not pile up.
    """
    table_name = TABLE_NAME_MAP.get(type_name, type_name)
    owns_db = db is None
    if owns_db:
        db = DBConnect()
    try:
        for _, row in df.iterrows():
            record = row.where(pd.notnull(row), None).to_dict()
            record = normalize_year_quarter(record)
            record = {k: to_db_scalar(v) for k, v in record.items()}

            if not record:
                continue  # empty placeholder row: nothing to store
            if row_exists(db, type_name, record):
                continue  # skip duplicates by natural key
            try:
                db.insert(table_name, record)
            except Exception as e:
                # Best effort: report the failing row and keep loading.
                print(f"❌ DB Insert Error into {table_name}: {e}")
                print(f"Row data -> {record}")
    finally:
        if owns_db:
            db.engine.dispose()


### JSON Processing

The `JsonProcess` function below handles all ten JSON structures used in the PhonePe Pulse data.
It converts nested structures into flat Pandas DataFrames with consistent column names. When a JSON
file contains a list of entries (e.g. multiple transactions or multiple state metrics),
the function returns multiple rows.

In [None]:

def _nested_get(items: list, idx: int, *path):
    """Return ``items[idx][path[0]][path[1]]...``, or None if any step is missing.

    An out-of-range *idx* or a non-dict value along the path yields None.
    """
    if idx >= len(items):
        return None
    node = items[idx]
    for key in path:
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            break
    return node


def _aggregated_rows(d: dict, base: dict, response_ts, name_column: str) -> list:
    """Flatten ``transactionData[*].paymentInstruments[*]`` (agg_trans / agg_ins).

    One row is produced per payment instrument; an entry without instruments
    still yields one row whose payment fields are None.  *name_column* is the
    output column that receives the entry's ``name``.
    """
    rows = []
    for td in (d.get('transactionData') or []):
        for pi in (td.get('paymentInstruments') or [{}]):
            pi = pi or {}
            rows.append({
                **base,
                'from_ts': d.get('from'), 'to_ts': d.get('to'),
                name_column: td.get('name'),
                'payment_type': pi.get('type'),
                'payment_count': pi.get('count'),
                'payment_amount': pi.get('amount'),
                'response_ts': response_ts,
            })
    return rows


def _hover_metric_rows(d: dict, base: dict, response_ts) -> list:
    """Flatten ``hoverDataList[*].metric[*]`` (map_trans / map_insHover), one row per metric."""
    rows = []
    for item in (d.get('hoverDataList') or []):
        for m in (item.get('metric') or []):
            rows.append({
                **base,
                'location_name': item.get('name'),
                'metric_type': m.get('type'),
                'metric_count': m.get('count'),
                'metric_amount': m.get('amount'),
                'response_ts': response_ts,
            })
    return rows


def _ins_country_grid_rows(d: dict, base: dict, response_ts) -> list:
    """Flatten the column/row grid under ``data.data`` (map_insCntry), one row per grid row."""
    meta = d.get('meta') or {}
    grid = d.get('data') or {}
    cols = grid.get('columns') or []
    rows = []
    for values in (grid.get('data') or []):
        cell = dict(zip(cols, values))
        rows.append({
            **base,
            'data_level': meta.get('dataLevel'),
            'grid_level': meta.get('gridLevel'),
            'percentiles': meta.get('percentiles'),
            'latitude': cell.get('lat'),
            'longitude': cell.get('lng'),
            'metric_value': cell.get('metric'),
            'label': cell.get('label'),
            'response_ts': response_ts,
        })
    return rows


def _top_user_rows(d: dict, base: dict, response_ts) -> list:
    """Zip the states/districts/pincodes lists (top_user) index-by-index.

    The row count is the length of the longest list (at least one); shorter
    lists contribute None past their end.
    """
    states_list = d.get('states') or []
    districts = d.get('districts') or []
    pincodes = d.get('pincodes') or []
    n_rows = max(len(states_list), len(districts), len(pincodes), 1)
    return [{
        **base,
        'state_name': _nested_get(states_list, i, 'name'),
        'state_registered_users': _nested_get(states_list, i, 'registeredUsers'),
        'district_name': _nested_get(districts, i, 'name'),
        'district_registered_users': _nested_get(districts, i, 'registeredUsers'),
        'pincode': _nested_get(pincodes, i, 'name'),
        'pincode_registered_users': _nested_get(pincodes, i, 'registeredUsers'),
        'response_ts': response_ts,
    } for i in range(n_rows)]


def _top_metric_rows(d: dict, base: dict, response_ts) -> list:
    """Zip the states/districts/pincodes lists (top_trans / top_ins) index-by-index.

    Each level contributes ``<level>_entity`` and ``<level>_metric_type/count/amount``
    columns; the row count is the longest list's length (at least one).
    """
    levels = (
        ('state', d.get('states') or []),
        ('district', d.get('districts') or []),
        ('pincode', d.get('pincodes') or []),
    )
    n_rows = max([1] + [len(lst) for _, lst in levels])
    rows = []
    for i in range(n_rows):
        row = dict(base)
        for prefix, lst in levels:
            row[f'{prefix}_entity'] = _nested_get(lst, i, 'entityName')
            row[f'{prefix}_metric_type'] = _nested_get(lst, i, 'metric', 'type')
            row[f'{prefix}_metric_count'] = _nested_get(lst, i, 'metric', 'count')
            row[f'{prefix}_metric_amount'] = _nested_get(lst, i, 'metric', 'amount')
        row['response_ts'] = response_ts
        rows.append(row)
    return rows


def JsonProcess(LoadedJson: dict, Type: str, state=None, year=None, quarter=None) -> pd.DataFrame:
    """Flatten one PhonePe Pulse JSON document into a DataFrame.

    Args:
        LoadedJson: the decoded JSON document (top-level ``success``, ``code``,
            ``responseTimestamp`` and ``data`` keys are read).
        Type: dataset type, one of ``agg_user``, ``agg_trans``, ``agg_ins``,
            ``map_user``, ``map_trans``, ``map_insHover``, ``map_insCntry``,
            ``top_user``, ``top_trans``, ``top_ins``.
        state, year, quarter: copied onto every output row.

    Returns:
        One row per flattened entry.  Types that yield no entries return a
        single empty placeholder row.  An unrecognised *Type* returns one row
        with a ``notice`` column.
    """
    d = LoadedJson.get('data', {}) or {}
    response_ts = LoadedJson.get('responseTimestamp')
    base = {
        'state': state, 'year': year, 'quarter': quarter,
        'success': LoadedJson.get('success'), 'code': LoadedJson.get('code'),
    }

    if Type == 'agg_user':
        aggregated = d.get('aggregated') or {}
        rows = [{
            **base,
            'registered_users': aggregated.get('registeredUsers'),
            'app_opens': aggregated.get('appOpens'),
            'users_by_device': d.get('usersByDevice'),
            'response_ts': response_ts,
        }]
    elif Type == 'agg_trans':
        rows = _aggregated_rows(d, base, response_ts, 'transaction_name')
    elif Type == 'agg_ins':
        rows = _aggregated_rows(d, base, response_ts, 'insurance_name')
    elif Type == 'map_user':
        rows = [{
            **base,
            'hover_state': st_name,
            'registered_users': (info or {}).get('registeredUsers'),
            'app_opens': (info or {}).get('appOpens'),
            'response_ts': response_ts,
        } for st_name, info in (d.get('hoverData') or {}).items()]
    elif Type in ('map_trans', 'map_insHover'):
        rows = _hover_metric_rows(d, base, response_ts)
    elif Type == 'map_insCntry':
        rows = _ins_country_grid_rows(d, base, response_ts)
    elif Type == 'top_user':
        rows = _top_user_rows(d, base, response_ts)
    elif Type in ('top_trans', 'top_ins'):
        rows = _top_metric_rows(d, base, response_ts)
    else:
        return pd.DataFrame([{'notice': f'Unknown Type: {Type}'}])

    # A file with no entries still yields one (empty) placeholder row.
    return pd.DataFrame(rows or [{}])


### Processing Folders

The `ProcessFolder` function walks through the dataset directories, handling both the year-level layout
(`<year>/<quarter>.json`) and the state-level layout (`state/<state>/<year>/<quarter>.json`). It converts each
JSON file into a DataFrame and inserts the rows into the corresponding MariaDB table, skipping duplicates automatically.

In [None]:

def _process_quarter_dir(folder: str, Type: str, year: str, state=None):
    """Load every ``<quarter>.json`` in *folder* (sorted by name) and insert its rows.

    A file that cannot be read or decoded is reported and skipped so one bad
    file does not abort the whole run.
    """
    for file in sorted(os.listdir(folder)):
        if not file.endswith('.json'):
            continue
        quarter = os.path.splitext(file)[0]
        file_path = os.path.join(folder, file)
        try:
            data = load_json(file_path)
        except (OSError, ValueError) as e:  # ValueError covers JSON/UTF-8 decode errors
            print(f"⚠️ Skipping unreadable file {file_path}: {e}")
            continue
        df = JsonProcess(data, Type, state=state, year=year, quarter=quarter)
        insert_to_db(Type, df)


def ProcessFolder(PATH: str, Type: str):
    """Load all JSON files under a dataset root *PATH* as dataset type *Type*.

    *PATH* holds one directory per year (``<year>/<quarter>.json``) plus an
    optional ``state`` directory laid out as
    ``state/<state>/<year>/<quarter>.json``.  Year-level rows get
    ``state=None``; state-level rows get the lower-cased state folder name.
    Directories are visited in sorted order so runs are reproducible.
    """
    for year in sorted(os.listdir(PATH)):
        year_path = os.path.join(PATH, year)
        if not os.path.isdir(year_path):
            continue
        if year.lower() != 'state':
            # Year-level directory
            _process_quarter_dir(year_path, Type, year=year)
            continue
        # State-level directory
        for state in sorted(os.listdir(year_path)):
            state_path = os.path.join(year_path, state)
            if not os.path.isdir(state_path):
                continue
            print(f'Processing state: {state}')
            for styear in sorted(os.listdir(state_path)):
                year_folder = os.path.join(state_path, styear)
                if not os.path.isdir(year_folder):
                    continue
                _process_quarter_dir(year_folder, Type, year=styear, state=state.lower())


## Executing the Loader

Uncomment any of the calls below to process the corresponding dataset. Each call reads all
available JSON files under the specified path and loads them into MariaDB, skipping any duplicate rows.

In [None]:

# Example usage:
# ProcessFolder(agg_user_PATH,  'agg_user')
# ProcessFolder(agg_trans_PATH, 'agg_trans')
# ProcessFolder(agg_ins_PATH,   'agg_ins')

# ProcessFolder(map_user_PATH,     'map_user')
# ProcessFolder(map_trans_PATH,    'map_trans')
# ProcessFolder(map_insHover_PATH, 'map_insHover')
# ProcessFolder(map_insCntry_PATH, 'map_insCntry')

# ProcessFolder(top_user_PATH,  'top_user')
# ProcessFolder(top_trans_PATH, 'top_trans')
# ProcessFolder(top_ins_PATH,   'top_ins')
