In [39]:
import json
import os
import mysql.connector
import pandas as pd
from datetime import datetime

# --- Database Connection Configuration ---
# Connection settings are read from environment variables so that credentials
# are not stored in (or leaked through) the notebook file. The non-secret
# settings fall back to the local development defaults. The password has no
# in-source default on purpose: export PHONEPE_DB_PASSWORD before starting the
# kernel, otherwise an empty password is used and connect_db() will report the
# authentication failure.
DB_CONFIG = {
    'host': os.environ.get('PHONEPE_DB_HOST', 'localhost'),
    'user': os.environ.get('PHONEPE_DB_USER', 'root'),
    'password': os.environ.get('PHONEPE_DB_PASSWORD', ''),
    'database': os.environ.get('PHONEPE_DB_NAME', 'phonepe_pulse_db'),
}

# --- Path to the cloned 'pulse' repository data ---
# Points at the 'data' folder inside the cloned 'pulse' repository. Set the
# PULSE_DATA_BASE_PATH environment variable to run the notebook on another
# machine; the fallback is the original author's local checkout.
PULSE_DATA_BASE_PATH = os.environ.get(
    'PULSE_DATA_BASE_PATH',
    r'C:\Users\hanum\OneDrive\Desktop\Labmentix\week6\data',
)

def connect_db():
    """Open a MySQL connection using the module-level ``DB_CONFIG``.

    Returns:
        The connection object returned by ``mysql.connector.connect`` on
        success, or ``None`` when a ``mysql.connector.Error`` is raised (the
        error is printed rather than propagated).
    """
    try:
        db_handle = mysql.connector.connect(**DB_CONFIG)
        is_live = db_handle.is_connected()
    except mysql.connector.Error as db_error:
        print(f"Error connecting to MySQL: {db_error}")
        return None

    # The success message is only printed when the handle reports it is live;
    # the handle itself is returned either way.
    if is_live:
        print("Successfully connected to MySQL database!")
    return db_handle

In [14]:
def load_aggregated_transaction_data():
    """Loads aggregated transaction data into the Aggregated_transaction table.

    Walks ``<PULSE_DATA_BASE_PATH>/aggregated/transaction/country/india/state/
    <state>/<year>/<quarter>.json`` and upserts one row per entry of
    ``data.transactionData``. Count and amount are taken from the first entry
    of each item's ``paymentInstruments`` list.

    Each file is written in its own transaction: its rows are committed
    together, or rolled back together if anything in that file fails. This
    keeps a rollback for one bad file from discarding rows of files that were
    already processed, and keeps the "Loaded" figure equal to the number of
    rows actually committed.

    Returns:
        None. Prints a summary where "Loaded" counts committed rows and
        "Errors" counts files that failed.
    """
    connection = connect_db()
    if not connection:
        return

    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'aggregated', 'transaction', 'country', 'india', 'state')
    insert_query = """
        INSERT INTO Aggregated_transaction
        (state, year, quarter, transaction_type, transaction_count, transaction_amount)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            transaction_count = VALUES(transaction_count),
            transaction_amount = VALUES(transaction_amount);
    """
    success_count = 0
    error_count = 0
    cursor = None
    try:
        if not os.path.isdir(data_path):
            print(f"Error: Data path not found: {data_path}")
            return
        cursor = connection.cursor()

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    # e.g. a stray ".ipynb_checkpoints" directory
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(os.path.splitext(quarter_file)[0])
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8; do not depend on the platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                        # Parse every row before touching the database so a
                        # malformed entry fails the file before any insert.
                        rows = [
                            (state_name_clean, year, quarter, item['name'],
                             item['paymentInstruments'][0]['count'],
                             item['paymentInstruments'][0]['amount'])
                            for item in data['data']['transactionData']
                        ]
                        for row in rows:
                            cursor.execute(insert_query, row)
                        connection.commit()
                        success_count += len(rows)
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        error_count += 1
                        # Only this file's uncommitted rows are discarded.
                        connection.rollback()
    finally:
        # Always release the cursor and connection, even on unexpected errors.
        if cursor is not None:
            cursor.close()
        connection.close()
    print(f"\nFinished loading Aggregated_transaction data. Loaded: {success_count}, Errors: {error_count}")

In [15]:
# Run the Aggregated_transaction load; the function prints a "Loaded / Errors" summary when it finishes.
load_aggregated_transaction_data()

Successfully connected to MySQL database!

Finished loading Aggregated_transaction data. Loaded: 5034, Errors: 0


In [44]:
def load_aggregated_user_data():
    """Loads aggregated user data into the Aggregated_user table.

    Walks ``<PULSE_DATA_BASE_PATH>/aggregated/user/country/india/state/
    <state>/<year>/<quarter>.json`` and upserts one row per file, built from
    ``data.aggregated.registeredUsers``, the optional
    ``data.aggregated.appOpens`` and the top-level ``responseTimestamp``.

    Each file is committed on its own, so a rollback for one failing file
    cannot discard rows of files that were already processed, and the
    "Loaded" figure only counts rows that were actually committed.

    Note: ``ON DUPLICATE KEY UPDATE`` only takes effect if the table has a
    PRIMARY KEY or UNIQUE constraint; the intended key is
    (state, year, quarter).

    Returns:
        None. Prints a summary where "Loaded" counts committed rows and
        "Errors" counts files that failed.
    """
    connection = connect_db()
    if not connection:
        return

    # Base path for aggregated user data
    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'aggregated', 'user', 'country', 'india', 'state')
    insert_query = """
        INSERT INTO Aggregated_user
        (state, year, quarter, registered_users, app_opens, timestamp)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            registered_users = VALUES(registered_users),
            app_opens = VALUES(app_opens),
            timestamp = VALUES(timestamp);
    """
    success_count = 0
    error_count = 0
    cursor = None
    try:
        if not os.path.isdir(data_path):
            print(f"Error: Data path not found: {data_path}")
            return
        cursor = connection.cursor()

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(os.path.splitext(quarter_file)[0])
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8; do not depend on the platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)

                        aggregated = data['data']['aggregated']
                        row = (
                            state_name_clean, year, quarter,
                            aggregated['registeredUsers'],
                            aggregated.get('appOpens'),       # may be absent
                            data.get('responseTimestamp'),    # record timestamp from the JSON
                        )
                        cursor.execute(insert_query, row)
                        connection.commit()
                        success_count += 1
                    except json.JSONDecodeError as e:
                        # Raised while parsing, before any statement was executed.
                        print(f"Error decoding JSON from {quarter_path}: {e}")
                        error_count += 1
                    except KeyError as e:
                        # Raised while building the row, before any statement was executed.
                        print(f"Missing key in JSON from {quarter_path}: {e}")
                        error_count += 1
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        error_count += 1
                        # Only this file's uncommitted work is discarded.
                        connection.rollback()
    finally:
        # Always release the cursor and connection, even on unexpected errors.
        if cursor is not None:
            cursor.close()
        connection.close()
    print(f"\nFinished loading Aggregated_user data. Loaded: {success_count}, Errors: {error_count}")

In [45]:
# Run the Aggregated_user load; the function prints a "Loaded / Errors" summary when it finishes.
load_aggregated_user_data()

Successfully connected to MySQL database!

Finished loading Aggregated_user data. Loaded: 1008, Errors: 0


In [50]:
def load_users_by_device_data():
    """Loads users by device data into the users_by_device table.

    Reads the same files as the aggregated user load
    (``<PULSE_DATA_BASE_PATH>/aggregated/user/country/india/state/
    <state>/<year>/<quarter>.json``) and upserts one row per entry of
    ``data.usersByDevice``. A missing or null ``data`` / ``usersByDevice``
    is treated as an empty list. Entries lacking ``brand``, ``count`` or
    ``percentage`` are reported and skipped.

    Each file is committed on its own, so a rollback for one failing file
    cannot discard rows of files that were already processed, and the
    "Loaded" figure only counts rows that were actually committed.

    Note: ``ON DUPLICATE KEY UPDATE`` only takes effect if the table has a
    PRIMARY KEY or UNIQUE constraint; the intended key is
    (state, year, quarter, brand_name).

    Returns:
        None. Prints a summary where "Loaded" counts committed rows and
        "Errors" counts files that failed.
    """
    connection = connect_db()
    if not connection:
        return

    # Base path for aggregated user data (same files as the Aggregated_user load)
    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'aggregated', 'user', 'country', 'india', 'state')
    insert_query = """
        INSERT INTO users_by_device
        (state, year, quarter, brand_name, count, percentage, timestamp)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            count = VALUES(count),
            percentage = VALUES(percentage),
            timestamp = VALUES(timestamp);
    """
    success_count = 0
    error_count = 0
    cursor = None
    try:
        if not os.path.isdir(data_path):
            print(f"Error: Data path not found: {data_path}")
            return
        cursor = connection.cursor()

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(os.path.splitext(quarter_file)[0])
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8; do not depend on the platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)

                        # 'data' and 'usersByDevice' may each be missing or null.
                        payload = data.get('data') or {}
                        users_by_device_list = payload.get('usersByDevice') or []
                        response_timestamp = data.get('responseTimestamp')

                        rows = []
                        for device_data in users_by_device_list:
                            brand_name = device_data.get('brand')
                            count = device_data.get('count')
                            percentage = device_data.get('percentage')
                            if brand_name is None or count is None or percentage is None:
                                print(f"Skipping incomplete device data in {quarter_path}: {device_data}")
                                continue
                            rows.append((state_name_clean, year, quarter,
                                         brand_name, count, percentage, response_timestamp))

                        for row in rows:
                            cursor.execute(insert_query, row)
                        connection.commit()
                        success_count += len(rows)
                    except json.JSONDecodeError as e:
                        # Raised while parsing, before any statement was executed.
                        print(f"Error decoding JSON from {quarter_path}: {e}")
                        error_count += 1
                    except KeyError as e:
                        print(f"Missing key in JSON from {quarter_path}: {e}")
                        error_count += 1
                        connection.rollback()
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        error_count += 1
                        # Only this file's uncommitted rows are discarded.
                        connection.rollback()
    finally:
        # Always release the cursor and connection, even on unexpected errors.
        if cursor is not None:
            cursor.close()
        connection.close()
    print(f"\nFinished loading users_by_device data. Loaded: {success_count}, Errors: {error_count}")


In [51]:
# Run the users_by_device load; the function prints a "Loaded / Errors" summary when it finishes.
load_users_by_device_data()

Successfully connected to MySQL database!

Finished loading users_by_device data. Loaded: 6732, Errors: 0


In [None]:
def load_aggregated_insurance_data():
    """Loads aggregated insurance data into the Aggregated_insurance table.

    Walks ``<PULSE_DATA_BASE_PATH>/aggregated/insurance/country/india/state/
    <state>/<year>/<quarter>.json`` and upserts one row per entry of
    ``data.transactionData`` (the same layout the aggregated transaction
    files use). Count and amount are taken from the first entry of each
    item's ``paymentInstruments`` list.

    Each file is committed on its own, so a rollback for one failing file
    cannot discard rows of files that were already processed, and the
    "Loaded" figure only counts rows that were actually committed.

    Returns:
        None. Prints a summary where "Loaded" counts committed rows and
        "Errors" counts files that failed.
    """
    connection = connect_db()
    if not connection:
        return

    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'aggregated', 'insurance', 'country', 'india', 'state')
    insert_query = """
        INSERT INTO Aggregated_insurance
        (state, year, quarter, insurance_type, premium_count, premium_amount)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            premium_count = VALUES(premium_count),
            premium_amount = VALUES(premium_amount);
    """
    success_count = 0
    error_count = 0
    cursor = None
    try:
        if not os.path.isdir(data_path):
            print(f"Error: Data path not found: {data_path}")
            return
        cursor = connection.cursor()

        for state_name_folder in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name_folder)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name_folder.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    # e.g. a stray ".ipynb_checkpoints" directory
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    file_full_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(os.path.splitext(quarter_file)[0])
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8; do not depend on the platform default encoding.
                        with open(file_full_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                        # Parse every row before touching the database so a
                        # malformed entry fails the file before any insert.
                        rows = [
                            (state_name_clean, year, quarter, item['name'],
                             item['paymentInstruments'][0]['count'],
                             item['paymentInstruments'][0]['amount'])
                            for item in data['data']['transactionData']
                        ]
                        for row in rows:
                            cursor.execute(insert_query, row)
                        connection.commit()
                        success_count += len(rows)
                    except Exception as e:
                        print(f"Error processing {file_full_path}: {e}")
                        error_count += 1
                        # Only this file's uncommitted rows are discarded.
                        connection.rollback()
    finally:
        # Always release the cursor and connection, even on unexpected errors.
        if cursor is not None:
            cursor.close()
        connection.close()
    print(f"\nFinished loading Aggregated_insurance data. Loaded: {success_count}, Errors: {error_count}")

In [None]:
# Run the Aggregated_insurance load; the function prints a "Loaded / Errors" summary when it finishes.
load_aggregated_insurance_data()

Successfully connected to MySQL database!

Finished loading Aggregated_insurance data. Loaded: 682, Errors: 0


In [None]:
def load_map_transaction_data():
    """Loads map transaction data into the Map_transaction table.

    Walks ``<PULSE_DATA_BASE_PATH>/map/transaction/hover/country/india/state/
    <state>/<year>/<quarter>.json`` and upserts one row per entry of
    ``data.hoverDataList``. The entry's ``name`` is stored as the district,
    and count and amount are taken from the first entry of its ``metric``
    list.

    Each file is committed on its own, so a rollback for one failing file
    cannot discard rows of files that were already processed, and the
    "Loaded" figure only counts rows that were actually committed.

    Returns:
        None. Prints a summary where "Loaded" counts committed rows and
        "Errors" counts files that failed.
    """
    connection = connect_db()
    if not connection:
        return

    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'map', 'transaction', 'hover', 'country', 'india', 'state')
    insert_query = """
        INSERT INTO Map_transaction
        (state, year, quarter, district, transaction_count, transaction_amount)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            transaction_count = VALUES(transaction_count),
            transaction_amount = VALUES(transaction_amount);
    """
    success_count = 0
    error_count = 0
    cursor = None
    try:
        if not os.path.isdir(data_path):
            print(f"Error: Data path not found: {data_path}")
            return
        cursor = connection.cursor()

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    # e.g. a stray ".ipynb_checkpoints" directory
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(os.path.splitext(quarter_file)[0])
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8; do not depend on the platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                        # Parse every row before touching the database so a
                        # malformed entry fails the file before any insert.
                        rows = [
                            (state_name_clean, year, quarter, district_data['name'],
                             district_data['metric'][0]['count'],
                             district_data['metric'][0]['amount'])
                            for district_data in data['data']['hoverDataList']
                        ]
                        for row in rows:
                            cursor.execute(insert_query, row)
                        connection.commit()
                        success_count += len(rows)
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        error_count += 1
                        # Only this file's uncommitted rows are discarded.
                        connection.rollback()
    finally:
        # Always release the cursor and connection, even on unexpected errors.
        if cursor is not None:
            cursor.close()
        connection.close()
    print(f"\nFinished loading Map_transaction data. Loaded: {success_count}, Errors: {error_count}")


In [None]:
# Run the Map_transaction load; the function prints a "Loaded / Errors" summary when it finishes.
load_map_transaction_data()

Successfully connected to MySQL database!

Finished loading Map_transaction data. Loaded: 20604, Errors: 0


In [None]:
def load_map_user_data():
    """Loads map user data into the Map_user table.

    Walks ``<PULSE_DATA_BASE_PATH>/map/user/hover/country/india/state/
    <state>/<year>/<quarter>.json`` and upserts one row per item of the
    ``data.hoverData`` dictionary. The dictionary key is stored as the
    district; ``registeredUsers`` is required and ``appOpens`` is optional.

    Each file is committed on its own, so a rollback for one failing file
    cannot discard rows of files that were already processed, and the
    "Loaded" figure only counts rows that were actually committed.

    Returns:
        None. Prints a summary where "Loaded" counts committed rows and
        "Errors" counts files that failed.
    """
    connection = connect_db()
    if not connection:
        return

    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'map', 'user', 'hover', 'country', 'india', 'state')
    insert_query = """
        INSERT INTO Map_user
        (state, year, quarter, district, registered_users, app_opens)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            registered_users = VALUES(registered_users),
            app_opens = VALUES(app_opens);
    """
    success_count = 0
    error_count = 0
    cursor = None
    try:
        if not os.path.isdir(data_path):
            print(f"Error: Data path not found: {data_path}")
            return
        cursor = connection.cursor()

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    # e.g. a stray ".ipynb_checkpoints" directory
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(os.path.splitext(quarter_file)[0])
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8; do not depend on the platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                        # 'hoverData' is a dict keyed by district name.
                        rows = [
                            (state_name_clean, year, quarter, district_name,
                             user_data['registeredUsers'],
                             user_data.get('appOpens'))  # may be absent
                            for district_name, user_data in data['data']['hoverData'].items()
                        ]
                        for row in rows:
                            cursor.execute(insert_query, row)
                        connection.commit()
                        success_count += len(rows)
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        error_count += 1
                        # Only this file's uncommitted rows are discarded.
                        connection.rollback()
    finally:
        # Always release the cursor and connection, even on unexpected errors.
        if cursor is not None:
            cursor.close()
        connection.close()
    print(f"\nFinished loading Map_user data. Loaded: {success_count}, Errors: {error_count}")

In [None]:
# Run the Map_user load; the function prints a "Loaded / Errors" summary when it finishes.
load_map_user_data()

Successfully connected to MySQL database!

Finished loading Map_user data. Loaded: 20608, Errors: 0


In [None]:
def load_map_insurance_data():
    """Loads map insurance data into the Map_insurance table.

    Walks ``<PULSE_DATA_BASE_PATH>/map/insurance/hover/country/india/state/
    <state>/<year>/<quarter>.json`` and upserts one row per entry of
    ``data.hoverDataList``. The entry's ``name`` is stored as the district,
    and premium count and amount are taken from the first entry of its
    ``metric`` list.

    Each file is committed on its own, so a rollback for one failing file
    cannot discard rows of files that were already processed, and the
    "Loaded" figure only counts rows that were actually committed.

    Returns:
        None. Prints a summary where "Loaded" counts committed rows and
        "Errors" counts files that failed.
    """
    connection = connect_db()
    if not connection:
        return

    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'map', 'insurance', 'hover', 'country', 'india', 'state')
    insert_query = """
        INSERT INTO Map_insurance
        (state, year, quarter, district, premium_count, premium_amount)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            premium_count = VALUES(premium_count),
            premium_amount = VALUES(premium_amount);
    """
    success_count = 0
    error_count = 0
    cursor = None
    try:
        if not os.path.isdir(data_path):
            print(f"Error: Data path not found: {data_path}")
            return
        cursor = connection.cursor()

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    # e.g. a stray ".ipynb_checkpoints" directory
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(os.path.splitext(quarter_file)[0])
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8; do not depend on the platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)
                        # Parse every row before touching the database so a
                        # malformed entry fails the file before any insert.
                        rows = [
                            (state_name_clean, year, quarter, district_data['name'],
                             district_data['metric'][0]['count'],
                             district_data['metric'][0]['amount'])
                            for district_data in data['data']['hoverDataList']
                        ]
                        for row in rows:
                            cursor.execute(insert_query, row)
                        connection.commit()
                        success_count += len(rows)
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        error_count += 1
                        # Only this file's uncommitted rows are discarded.
                        connection.rollback()
    finally:
        # Always release the cursor and connection, even on unexpected errors.
        if cursor is not None:
            cursor.close()
        connection.close()
    print(f"\nFinished loading Map_insurance data. Loaded: {success_count}, Errors: {error_count}")

In [None]:
# Run the Map_insurance load; the function prints a "Loaded / Errors" summary when it finishes.
load_map_insurance_data()

Successfully connected to MySQL database!

Finished loading Map_insurance data. Loaded: 13876, Errors: 0


In [62]:
def load_top_transaction_data():
    """Loads top transaction data (districts and pincodes) into respective tables."""
    connection = connect_db()
    if not connection:
        return
    cursor = connection.cursor()
    districts_success_count = 0
    districts_error_count = 0
    pincodes_success_count = 0
    pincodes_error_count = 0

    # Base path for top transaction data (adjust if your path is different)
    # Assuming path like: PULSE_DATA_BASE_PATH/top/transaction/country/india/state/{state}/{year}/{quarter}.json
    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'top', 'transaction', 'country', 'india', 'state')

    # Ensure the data path exists before proceeding
    if not os.path.exists(data_path):
        print(f"Error: Data path not found: {data_path}")
        connection.close()
        return

    for state_name in os.listdir(data_path):
        state_folder_path = os.path.join(data_path, state_name)
        if os.path.isdir(state_folder_path):
            state_name_clean = state_name.replace('.json', '')
            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if os.path.isdir(year_path):
                    try:
                        year = int(year_folder)
                    except ValueError:
                        print(f"Skipping non-integer year folder: {year_folder}")
                        continue

                    for quarter_file in os.listdir(year_path):
                        if quarter_file.endswith('.json'):
                            quarter_path = os.path.join(year_path, quarter_file)
                            try:
                                quarter = int(quarter_file.replace('.json', ''))
                            except ValueError:
                                print(f"Skipping non-integer quarter file: {quarter_file}")
                                continue

                            try:
                                with open(quarter_path, 'r') as f:
                                    data = json.load(f)

                                response_timestamp = data.get('responseTimestamp')

                                # --- Process Districts Data ---
                                districts_list = data['data'].get('districts')
                                if districts_list is None: # Handle case where 'districts' is null
                                    districts_list = []

                                for district_data in districts_list:
                                    entity_name = district_data.get('entityName')
                                    metric = district_data.get('metric')
                                    if entity_name and metric:
                                        metric_type = metric.get('type')
                                        count = metric.get('count')
                                        amount = metric.get('amount')

                                        if all(v is not None for v in [metric_type, count, amount]):
                                            insert_query_districts = """
                                            INSERT INTO top_transaction_districts_data
                                            (state, year, quarter, district_name, metric_type, count, amount, timestamp)
                                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                                            ON DUPLICATE KEY UPDATE
                                                metric_type = VALUES(metric_type),
                                                count = VALUES(count),
                                                amount = VALUES(amount),
                                                timestamp = VALUES(timestamp);
                                            """
                                            try:
                                                cursor.execute(insert_query_districts, (state_name_clean, year, quarter,
                                                                                        entity_name, metric_type, count, amount,
                                                                                        response_timestamp))
                                                districts_success_count += 1
                                            except Exception as e:
                                                print(f"Error inserting district data from {quarter_path} ({entity_name}): {e}")
                                                districts_error_count += 1
                                                connection.rollback()
                                        else:
                                            print(f"Skipping incomplete district metric data in {quarter_path}: {district_data}")
                                    else:
                                        print(f"Skipping incomplete district data in {quarter_path}: {district_data}")

                                # --- Process Pincodes Data ---
                                pincodes_list = data['data'].get('pincodes')
                                if pincodes_list is None: # Handle case where 'pincodes' is null
                                    pincodes_list = []

                                for pincode_data in pincodes_list:
                                    entity_name = pincode_data.get('entityName')
                                    metric = pincode_data.get('metric')
                                    if entity_name and metric:
                                        metric_type = metric.get('type')
                                        count = metric.get('count')
                                        amount = metric.get('amount')

                                        if all(v is not None for v in [metric_type, count, amount]):
                                            insert_query_pincodes = """
                                            INSERT INTO top_transaction_pincodes_data
                                            (state, year, quarter, pincode, metric_type, count, amount, timestamp)
                                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                                            ON DUPLICATE KEY UPDATE
                                                metric_type = VALUES(metric_type),
                                                count = VALUES(count),
                                                amount = VALUES(amount),
                                                timestamp = VALUES(timestamp);
                                            """
                                            try:
                                                cursor.execute(insert_query_pincodes, (state_name_clean, year, quarter,
                                                                                       entity_name, metric_type, count, amount,
                                                                                       response_timestamp))
                                                pincodes_success_count += 1
                                            except Exception as e:
                                                print(f"Error inserting pincode data from {quarter_path} ({entity_name}): {e}")
                                                pincodes_error_count += 1
                                                connection.rollback()
                                        else:
                                            print(f"Skipping incomplete pincode metric data in {quarter_path}: {pincode_data}")
                                    else:
                                        print(f"Skipping incomplete pincode data in {quarter_path}: {pincode_data}")

                            except json.JSONDecodeError as e:
                                print(f"Error decoding JSON from {quarter_path}: {e}")
                                districts_error_count += 1 # Count as error for file
                                pincodes_error_count += 1 # Count as error for file
                            except KeyError as e:
                                print(f"Missing key in JSON from {quarter_path}: {e}")
                                districts_error_count += 1
                                pincodes_error_count += 1
                            except Exception as e:
                                print(f"Error processing {quarter_path}: {e}")
                                districts_error_count += 1
                                pincodes_error_count += 1
                                connection.rollback()
                                continue
    connection.commit()
    cursor.close()
    connection.close()
    print(f"\nFinished loading Top Insurance data.")
    print(f"  Districts: Loaded: {districts_success_count}, Errors: {districts_error_count}")
    print(f"  Pincodes: Loaded: {pincodes_success_count}, Errors: {pincodes_error_count}")

In [63]:
# Run the top-transaction loader; it prints a notice for every skipped entry
# and finishes with loaded/error counts for districts and pincodes.
load_top_transaction_data()

Successfully connected to MySQL database!
Skipping incomplete pincode data in C:\Users\hanum\OneDrive\Desktop\Labmentix\week6\data\top\transaction\country\india\state\ladakh\2019\4.json: {'entityName': None, 'metric': {'type': 'TOTAL', 'count': 2014, 'amount': 10098656.16799601}}
Skipping incomplete pincode data in C:\Users\hanum\OneDrive\Desktop\Labmentix\week6\data\top\transaction\country\india\state\ladakh\2020\4.json: {'entityName': None, 'metric': {'type': 'TOTAL', 'count': 13717, 'amount': 36711603.92132313}}

Finished loading Top Insurance data.
  Districts: Loaded: 8296, Errors: 0
  Pincodes: Loaded: 9997, Errors: 0


In [70]:
def _upsert_top_user_rows(cursor, rows, insert_query, label, state, year, quarter,
                          response_timestamp, quarter_path):
    """Upsert one file's 'districts' or 'pincodes' user entries.

    Args:
        cursor: Open database cursor used to execute the upserts.
        rows: List of entry dicts read from the JSON file, or None.
        insert_query: Parameterised INSERT ... ON DUPLICATE KEY UPDATE statement
            taking (state, year, quarter, name, registered_users, timestamp).
        label: 'district' or 'pincode'; only used in log messages.
        state, year, quarter: Key values shared by every row of the file.
        response_timestamp: Value of the file's 'responseTimestamp' field.
        quarter_path: Path of the source file; only used in log messages.

    Returns:
        Tuple ``(loaded, errors)``: rows executed successfully and rows that
        were skipped (missing 'name' / 'registeredUsers') or failed to insert.
    """
    loaded = 0
    errors = 0
    if rows is None:  # the JSON may carry an explicit null instead of a list
        rows = []

    for row in rows:
        entity_name = row.get('name')
        registered_users = row.get('registeredUsers')

        if entity_name is None:
            print(f"Skipping {label} data from {quarter_path}: 'name' is missing or None in {row}")
            errors += 1
            continue
        if registered_users is None:
            print(f"Skipping {label} data from {quarter_path}: 'registeredUsers' is missing or None in {row}")
            errors += 1
            continue

        try:
            cursor.execute(insert_query, (state, year, quarter, entity_name,
                                          registered_users, response_timestamp))
            loaded += 1
        except Exception as e:
            # Deliberately no connection.rollback() here: it would discard every
            # earlier insert still pending in the transaction, not just this row,
            # while those rows would remain counted as loaded.
            print(f"Error inserting {label} data from {quarter_path} ({entity_name}): {e}")
            errors += 1

    return loaded, errors


def load_top_user_data():
    """Loads top user data (districts and pincodes) into respective tables.

    Walks ``PULSE_DATA_BASE_PATH/top/user/country/india/state/<state>/<year>/<quarter>.json``
    and upserts each entry into ``top_user_districts_data`` and
    ``top_user_pincodes_data``.

    Each file is committed on its own, so a failure while processing one file
    rolls back only that file's rows. The success counters are updated only
    after the file's commit succeeded, so they reflect rows actually stored.
    A file that cannot be decoded or processed adds one error to both counters.

    Returns:
        None. Progress, skipped entries and the final summary are printed.
    """
    connection = connect_db()
    if not connection:
        return

    districts_success_count = 0
    districts_error_count = 0
    pincodes_success_count = 0
    pincodes_error_count = 0

    # Base path for top user data (adjust if your path is different)
    # Assuming path like: PULSE_DATA_BASE_PATH/top/user/country/india/state/{state}/{year}/{quarter}.json
    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'top', 'user', 'country', 'india', 'state')

    # The statements do not depend on the file being processed, so build them once.
    insert_query_districts = """
    INSERT INTO top_user_districts_data
    (state, year, quarter, district_name, registered_users, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        registered_users = VALUES(registered_users),
        timestamp = VALUES(timestamp);
    """
    insert_query_pincodes = """
    INSERT INTO top_user_pincodes_data
    (state, year, quarter, pincode, registered_users, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        registered_users = VALUES(registered_users),
        timestamp = VALUES(timestamp);
    """

    cursor = None
    try:
        cursor = connection.cursor()

        # Ensure the data path exists before proceeding
        if not os.path.exists(data_path):
            print(f"Error: Data path not found: {data_path}")
            return

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(quarter_file.replace('.json', ''))
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8 by specification; do not rely on the
                        # platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)

                        response_timestamp = data.get('responseTimestamp')
                        payload = data['data']

                        d_loaded, d_errors = _upsert_top_user_rows(
                            cursor, payload.get('districts'), insert_query_districts,
                            'district', state_name_clean, year, quarter,
                            response_timestamp, quarter_path)
                        p_loaded, p_errors = _upsert_top_user_rows(
                            cursor, payload.get('pincodes'), insert_query_pincodes,
                            'pincode', state_name_clean, year, quarter,
                            response_timestamp, quarter_path)

                        # Persist this file before moving on so a later failure
                        # cannot undo it.
                        connection.commit()
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON from {quarter_path}: {e}")
                        districts_error_count += 1  # Count as error for file
                        pincodes_error_count += 1  # Count as error for file
                    except KeyError as e:
                        print(f"Missing expected key in JSON from {quarter_path}: {e}")
                        districts_error_count += 1
                        pincodes_error_count += 1
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        # Previously incremented an undefined `error_count`, which
                        # raised UnboundLocalError inside this handler.
                        districts_error_count += 1
                        pincodes_error_count += 1
                        # Only the current file's uncommitted rows are discarded.
                        connection.rollback()
                    else:
                        districts_success_count += d_loaded
                        districts_error_count += d_errors
                        pincodes_success_count += p_loaded
                        pincodes_error_count += p_errors
    finally:
        # Release database resources on every exit path, including errors.
        if cursor is not None:
            cursor.close()
        connection.close()

    print(f"\nFinished loading Top User data.")
    print(f"  Districts: Loaded: {districts_success_count}, Errors: {districts_error_count}")
    print(f"  Pincodes: Loaded: {pincodes_success_count}, Errors: {pincodes_error_count}")

In [71]:
# Run the top-user loader; it prints a notice for every skipped entry and
# finishes with loaded/error counts for districts and pincodes.
load_top_user_data()

Successfully connected to MySQL database!

Finished loading Top User data.
  Districts: Loaded: 8296, Errors: 0
  Pincodes: Loaded: 10000, Errors: 0


In [64]:
def _upsert_top_insurance_rows(cursor, rows, insert_query, label, state, year, quarter,
                               response_timestamp, quarter_path):
    """Upsert one file's 'districts' or 'pincodes' insurance entries.

    Args:
        cursor: Open database cursor used to execute the upserts.
        rows: List of entry dicts read from the JSON file, or None.
        insert_query: Parameterised INSERT ... ON DUPLICATE KEY UPDATE statement
            taking (state, year, quarter, name, metric_type, count, amount, timestamp).
        label: 'district' or 'pincode'; only used in log messages.
        state, year, quarter: Key values shared by every row of the file.
        response_timestamp: Value of the file's 'responseTimestamp' field.
        quarter_path: Path of the source file; only used in log messages.

    Returns:
        Tuple ``(loaded, errors)``: rows executed successfully and rows whose
        insert failed. Incomplete entries are reported and skipped but are not
        counted as errors.
    """
    loaded = 0
    errors = 0
    if rows is None:  # the JSON may carry an explicit null instead of a list
        rows = []

    for row in rows:
        entity_name = row.get('entityName')
        metric = row.get('metric')
        if not (entity_name and metric):
            print(f"Skipping incomplete {label} data in {quarter_path}: {row}")
            continue

        metric_type = metric.get('type')
        count = metric.get('count')
        amount = metric.get('amount')
        if any(v is None for v in (metric_type, count, amount)):
            print(f"Skipping incomplete {label} metric data in {quarter_path}: {row}")
            continue

        try:
            cursor.execute(insert_query, (state, year, quarter, entity_name,
                                          metric_type, count, amount,
                                          response_timestamp))
            loaded += 1
        except Exception as e:
            # Deliberately no connection.rollback() here: it would discard every
            # earlier insert still pending in the transaction, not just this row,
            # while those rows would remain counted as loaded.
            print(f"Error inserting {label} data from {quarter_path} ({entity_name}): {e}")
            errors += 1

    return loaded, errors


def load_top_insurance_data():
    """Loads top insurance data (districts and pincodes) into respective tables.

    Walks ``PULSE_DATA_BASE_PATH/top/insurance/country/india/state/<state>/<year>/<quarter>.json``
    and upserts each entry into ``top_insurance_districts_data`` and
    ``top_insurance_pincodes_data``.

    Each file is committed on its own, so a failure while processing one file
    rolls back only that file's rows. The success counters are updated only
    after the file's commit succeeded, so they reflect rows actually stored.
    A file that cannot be decoded or processed adds one error to both counters.

    Returns:
        None. Progress, skipped entries and the final summary are printed.
    """
    connection = connect_db()
    if not connection:
        return

    districts_success_count = 0
    districts_error_count = 0
    pincodes_success_count = 0
    pincodes_error_count = 0

    # Base path for top insurance data (adjust if your path is different)
    data_path = os.path.join(PULSE_DATA_BASE_PATH, 'top', 'insurance', 'country', 'india', 'state')

    # The statements do not depend on the file being processed, so build them once.
    insert_query_districts = """
    INSERT INTO top_insurance_districts_data
    (state, year, quarter, district_name, metric_type, count, amount, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        metric_type = VALUES(metric_type),
        count = VALUES(count),
        amount = VALUES(amount),
        timestamp = VALUES(timestamp);
    """
    insert_query_pincodes = """
    INSERT INTO top_insurance_pincodes_data
    (state, year, quarter, pincode, metric_type, count, amount, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        metric_type = VALUES(metric_type),
        count = VALUES(count),
        amount = VALUES(amount),
        timestamp = VALUES(timestamp);
    """

    cursor = None
    try:
        cursor = connection.cursor()

        # Ensure the data path exists before proceeding
        if not os.path.exists(data_path):
            print(f"Error: Data path not found: {data_path}")
            return

        for state_name in os.listdir(data_path):
            state_folder_path = os.path.join(data_path, state_name)
            if not os.path.isdir(state_folder_path):
                continue
            state_name_clean = state_name.replace('.json', '')

            for year_folder in os.listdir(state_folder_path):
                year_path = os.path.join(state_folder_path, year_folder)
                if not os.path.isdir(year_path):
                    continue
                try:
                    year = int(year_folder)
                except ValueError:
                    print(f"Skipping non-integer year folder: {year_folder}")
                    continue

                for quarter_file in os.listdir(year_path):
                    if not quarter_file.endswith('.json'):
                        continue
                    quarter_path = os.path.join(year_path, quarter_file)
                    try:
                        quarter = int(quarter_file.replace('.json', ''))
                    except ValueError:
                        print(f"Skipping non-integer quarter file: {quarter_file}")
                        continue

                    try:
                        # JSON is UTF-8 by specification; do not rely on the
                        # platform default encoding.
                        with open(quarter_path, 'r', encoding='utf-8') as f:
                            data = json.load(f)

                        response_timestamp = data.get('responseTimestamp')
                        payload = data['data']

                        d_loaded, d_errors = _upsert_top_insurance_rows(
                            cursor, payload.get('districts'), insert_query_districts,
                            'district', state_name_clean, year, quarter,
                            response_timestamp, quarter_path)
                        p_loaded, p_errors = _upsert_top_insurance_rows(
                            cursor, payload.get('pincodes'), insert_query_pincodes,
                            'pincode', state_name_clean, year, quarter,
                            response_timestamp, quarter_path)

                        # Persist this file before moving on so a later failure
                        # cannot undo it.
                        connection.commit()
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON from {quarter_path}: {e}")
                        districts_error_count += 1  # Count as error for file
                        pincodes_error_count += 1  # Count as error for file
                    except KeyError as e:
                        print(f"Missing key in JSON from {quarter_path}: {e}")
                        districts_error_count += 1
                        pincodes_error_count += 1
                    except Exception as e:
                        print(f"Error processing {quarter_path}: {e}")
                        districts_error_count += 1
                        pincodes_error_count += 1
                        # Only the current file's uncommitted rows are discarded.
                        connection.rollback()
                    else:
                        districts_success_count += d_loaded
                        districts_error_count += d_errors
                        pincodes_success_count += p_loaded
                        pincodes_error_count += p_errors
    finally:
        # Release database resources on every exit path, including errors.
        if cursor is not None:
            cursor.close()
        connection.close()

    # The summary previously said "Top Transaction", a copy-paste slip.
    print(f"\nFinished loading Top Insurance data.")
    print(f"  Districts: Loaded: {districts_success_count}, Errors: {districts_error_count}")
    print(f"  Pincodes: Loaded: {pincodes_success_count}, Errors: {pincodes_error_count}")


In [65]:
# Run the top-insurance loader; it prints a notice for every skipped entry and
# finishes with loaded/error counts for districts and pincodes.
load_top_insurance_data()

Successfully connected to MySQL database!
Skipping incomplete pincode data in C:\Users\hanum\OneDrive\Desktop\Labmentix\week6\data\top\insurance\country\india\state\ladakh\2020\3.json: {'entityName': None, 'metric': {'type': 'TOTAL', 'count': 1, 'amount': 281.0}}
Skipping incomplete pincode data in C:\Users\hanum\OneDrive\Desktop\Labmentix\week6\data\top\insurance\country\india\state\ladakh\2020\4.json: {'entityName': None, 'metric': {'type': 'TOTAL', 'count': 1, 'amount': 658.0}}
Skipping incomplete pincode data in C:\Users\hanum\OneDrive\Desktop\Labmentix\week6\data\top\insurance\country\india\state\ladakh\2022\4.json: {'entityName': None, 'metric': {'type': 'TOTAL', 'count': 8, 'amount': 16020.0}}

Finished loading Top Transaction data.
  Districts: Loaded: 5608, Errors: 0
  Pincodes: Loaded: 6665, Errors: 0


In [52]:
def export_table_to_xlsx(table_name, output_directory="excel_exports"):
    """Export every row of a MySQL table to ``<output_directory>/<table_name>.xlsx``.

    Opens its own connection from ``DB_CONFIG``, reads the whole table into a
    pandas DataFrame and writes it as a single-sheet workbook. Nothing is
    written when the table is empty. Database and other errors are caught and
    printed rather than raised, so a failed export does not stop the caller.

    Args:
        table_name: Name of the table to export. It is interpolated into the
            SQL text as-is, so it must come from trusted code, never from
            user input.
        output_directory: Directory for the workbook; created if missing.

    Returns:
        None. Progress and errors are reported via ``print``.
    """
    connection = None
    cursor = None  # initialised up front so `finally` needs no locals() lookup
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        if connection.is_connected():
            print(f"Connected to MySQL to export '{table_name}'...")

            # Create the output directory if it doesn't exist
            if not os.path.exists(output_directory):
                # exist_ok avoids a failure if the directory appears between
                # the check above and this call.
                os.makedirs(output_directory, exist_ok=True)
                print(f"Created directory: {output_directory}")

            # NOTE(review): identifiers cannot be bound as query parameters, so
            # the table name is formatted into the statement. Keep callers
            # restricted to hard-coded table names.
            query = f"SELECT * FROM {table_name};"

            cursor = connection.cursor(dictionary=True)  # Fetch as dictionaries for pandas
            cursor.execute(query)
            results = cursor.fetchall()

            if results:
                df = pd.DataFrame(results)
                output_filepath = os.path.join(output_directory, f"{table_name}.xlsx")
                # index=False prevents writing the DataFrame index as a column.
                # Excel limits worksheet names to 31 characters, so longer
                # table names are truncated for the sheet (not the file name).
                df.to_excel(output_filepath, index=False, sheet_name=table_name[:31])
                print(f"Successfully exported data from '{table_name}' to '{output_filepath}'")
            else:
                print(f"No data found in table '{table_name}'. No Excel file created.")

    except mysql.connector.Error as err:
        print(f"Error exporting table '{table_name}': {err}")
    except Exception as e:
        print(f"An unexpected error occurred while exporting '{table_name}': {e}")
    finally:
        if connection and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()


In [73]:
# Names of the tables to export. Each name is passed unchanged to
# export_table_to_xlsx, which uses it in the SELECT statement and as the
# workbook file name and sheet name.
tables_to_export = [
    "Aggregated_transaction",
    "Aggregated_user",
    "Aggregated_insurance",
    "Map_transaction",
    "Map_user",
    "Map_insurance",
    "top_insurance_districts_data",
    "top_insurance_pincodes_data",
    "top_transaction_districts_data",
    "top_transaction_pincodes_data",
    "top_user_districts_data",
    "top_user_pincodes_data",
    "users_by_device"
]

# --- Execute export for each table ---
# export_table_to_xlsx catches and prints its own errors, so the closing
# message is printed even when individual exports failed; check the per-table
# lines above it for the actual outcome.
if __name__ == "__main__":
    for table in tables_to_export:
        export_table_to_xlsx(table)
    print("\nExcel (.xlsx) export process completed for all specified tables.")

Connected to MySQL to export 'Aggregated_transaction'...
Successfully exported data from 'Aggregated_transaction' to 'excel_exports\Aggregated_transaction.xlsx'
Connected to MySQL to export 'Aggregated_user'...
Successfully exported data from 'Aggregated_user' to 'excel_exports\Aggregated_user.xlsx'
Connected to MySQL to export 'Aggregated_insurance'...
Successfully exported data from 'Aggregated_insurance' to 'excel_exports\Aggregated_insurance.xlsx'
Connected to MySQL to export 'Map_transaction'...
Successfully exported data from 'Map_transaction' to 'excel_exports\Map_transaction.xlsx'
Connected to MySQL to export 'Map_user'...
Successfully exported data from 'Map_user' to 'excel_exports\Map_user.xlsx'
Connected to MySQL to export 'Map_insurance'...
Successfully exported data from 'Map_insurance' to 'excel_exports\Map_insurance.xlsx'
Connected to MySQL to export 'top_insurance_districts_data'...
Successfully exported data from 'top_insurance_districts_data' to 'excel_exports\top_ins