# Punjab Data Analysis - DuckDB Approach

This notebook uses DuckDB to process large CSV files efficiently without loading everything into memory.

### Step 1: Import Libraries and Setup

In [None]:
# Install DuckDB if not already installed
# !pip install duckdb pandas pytz

In [1]:
import os
import duckdb
import pandas as pd
from datetime import datetime
import json
import pytz
import gc

# Configuration
folder_path_demand = '/home/admin1/Downloads/punjab-data-prod-analysis/8sept/amritsar/egbs_demand_v1/'
folder_path_details = '/home/admin1/Downloads/punjab-data-prod-analysis/8sept/amritsar/egbs_demanddetail_v1/'

print("DuckDB version:", duckdb.__version__)
print("Starting DuckDB processing...")

DuckDB version: 1.3.2
Starting DuckDB processing...


### Step 2: Initialize DuckDB Connection

In [2]:
# Initialize DuckDB with disk-based storage for large datasets
# This prevents OOM errors by using disk when needed
con = duckdb.connect('punjab_analysis_duckdb.duckdb')  # Use disk-based database

# Configure DuckDB for large dataset processing
# Note: temp_directory must be set before any operations that use temp space
try:
    con.execute("SET temp_directory='/tmp'")  # Use temp directory for spilling
except:
    print("Warning: Could not set temp directory (may already be in use)")

con.execute("SET memory_limit='2GB'")  # Limit memory usage
con.execute("SET threads=4")  # Reduce thread count to save memory
con.execute("SET preserve_insertion_order=false")  # Save memory

print("DuckDB connection established with optimized settings")
print("Using disk-based storage: punjab_analysis_duckdb.duckdb")

DuckDB connection established with optimized settings
Using disk-based storage: punjab_analysis_duckdb.duckdb


### Step 3: Load Property and Unit Data

In [12]:
# Load property data
print("Loading property data...")
con.execute("""
    CREATE OR REPLACE TABLE property AS 
    SELECT id, propertyid, tenantid, createdtime, additionaldetails, 
           ownershipcategory, status, usagecategory, propertytype
    FROM read_csv_auto('/home/admin1/Downloads/punjab-data-prod-analysis/8sept/amritsar/eg_pt_property.csv')
    WHERE status = 'ACTIVE'
""")

property_count = con.execute("SELECT COUNT(*) FROM property").fetchone()[0]
print(f"Loaded {property_count:,} active properties")

Loading property data...
Loaded 104,141 active properties


In [5]:
# Load unit data with proper NULL handling
print("Loading unit data with NULL handling...")
con.execute("""
    CREATE OR REPLACE TABLE unit AS 
    SELECT 
        propertyid, 
        occupancytype,
        CASE 
            WHEN builtuparea = 'NULL' THEN NULL
            ELSE TRY_CAST(builtuparea AS DOUBLE)
        END as builtuparea,
        CASE 
            WHEN plintharea = 'NULL' THEN NULL
            ELSE TRY_CAST(plintharea AS DOUBLE)
        END as plintharea
    FROM read_csv_auto(
        '/home/admin1/Downloads/punjab-data-prod-analysis/8sept/amritsar/eg_pt_unit.csv',
        ALL_VARCHAR=true  -- Read all columns as VARCHAR first
    )
""")

unit_count = con.execute("SELECT COUNT(*) FROM unit").fetchone()[0]
print(f"Loaded {unit_count:,} units")

Loading unit data with NULL handling...
Loaded 611,923 units


### Step 4: Load Demand Files (Multiple CSVs)

In [5]:
# Get list of demand files
import time
# con.execute("DROP TABLE demand")
demand_files = [os.path.join(folder_path_demand, f) 
                for f in os.listdir(folder_path_demand) if f.endswith('.csv')]

print(f"Found {len(demand_files)} demand files")
print("Processing demand files one by one to avoid memory issues...")

start = time.perf_counter()

if demand_files:
    # Process files incrementally
    for i, file in enumerate(demand_files, 1):
        print(f"Processing demand file {i}/{len(demand_files)}: {os.path.basename(file)}")
        
        if i == 1:
            # Create table with first file
            con.execute(f"""
                CREATE TABLE demand AS 
                SELECT id, taxperiodfrom, taxperiodto, consumercode, status, businessservice
                FROM read_csv_auto('{file}')
                WHERE status = 'ACTIVE' AND businessservice = 'PT'
            """)
        else:
            # Append subsequent files
            con.execute(f"""
                INSERT INTO demand 
                SELECT id, taxperiodfrom, taxperiodto, consumercode, status, businessservice
                FROM read_csv_auto('{file}')
                WHERE status = 'ACTIVE' AND businessservice = 'PT'
            """)
    
    demand_count = con.execute("SELECT COUNT(*) FROM demand").fetchone()[0]
    print(f"\nTotal active PT demands loaded: {demand_count:,}")
    end = time.perf_counter()

print(f"Join completed in {end - start:.2f} seconds")

Found 148 demand files
Processing demand files one by one to avoid memory issues...
Processing demand file 1/148: output_47.csv
Processing demand file 2/148: output_117.csv
Processing demand file 3/148: output_123.csv
Processing demand file 4/148: output_79.csv
Processing demand file 5/148: output_108.csv
Processing demand file 6/148: output_19.csv
Processing demand file 7/148: output_125.csv
Processing demand file 8/148: output_80.csv
Processing demand file 9/148: output_23.csv
Processing demand file 10/148: output_114.csv
Processing demand file 11/148: output_110.csv
Processing demand file 12/148: output_12.csv
Processing demand file 13/148: output_132.csv
Processing demand file 14/148: output_38.csv
Processing demand file 15/148: output_73.csv
Processing demand file 16/148: output_86.csv
Processing demand file 17/148: output_35.csv
Processing demand file 18/148: output_97.csv
Processing demand file 19/148: output_138.csv
Processing demand file 20/148: output_144.csv
Processing deman

### Step 5: Process Demand Details (The Large Dataset - 9 Crore Rows)
#### This is where DuckDB shines - processing without loading into memory

In [6]:
# Get list of demand detail files
start = time.perf_counter()
detail_files = [os.path.join(folder_path_details, f) 
                for f in os.listdir(folder_path_details) if f.endswith('.csv')]

print(f"Found {len(detail_files)} demand detail files")
print("This is the large dataset (9 crore rows)")
print("Processing with aggregation to reduce memory usage...")

end = time.perf_counter()

print(f"Join completed in {end - start:.2f} seconds")

Found 1883 demand detail files
This is the large dataset (9 crore rows)
Processing with aggregation to reduce memory usage...
Join completed in 0.01 seconds


In [7]:
# Create aggregated demand details table
# Instead of loading all raw rows, we aggregate at read time
# con.execute("DROP TABLE demand_details_agg")
# Get list of demand detail files
start = time.perf_counter()
detail_files = [os.path.join(folder_path_details, f) 
                for f in os.listdir(folder_path_details) if f.endswith('.csv')]

print(f"Found {len(detail_files)} demand detail files")
print("This is the large dataset (9 crore rows)")
print("Processing with aggregation to reduce memory usage...")

# Drop tables if they exist (avoid errors)
con.execute("DROP TABLE IF EXISTS demand_details_agg")
con.execute("DROP TABLE IF EXISTS demand_details")

if detail_files:
    print("\nProcessing demand details with in-query aggregation...")
    
    for i, file in enumerate(detail_files, 1):
        print(f"Processing file {i}/{len(detail_files)}: {os.path.basename(file)}")
        
        if i == 1:
            # Create aggregated table with first file
            con.execute(f"""
                CREATE TABLE demand_details_agg AS
                SELECT 
                    demandid,
                    taxheadcode,
                    SUM(taxamount) AS taxamount,
                    SUM(collectionamount) AS collectionamount
                FROM read_csv_auto('{file}', strict_mode=false)
                WHERE demandid IN (SELECT id FROM demand)  -- Filter early
                GROUP BY demandid, taxheadcode
            """)
        else:
            # Process and merge subsequent files
            con.execute(f"""
                INSERT INTO demand_details_agg
                SELECT 
                    demandid,
                    taxheadcode,
                    SUM(taxamount) AS taxamount,
                    SUM(collectionamount) AS collectionamount
                FROM read_csv_auto('{file}', strict_mode=false)
                WHERE demandid IN (SELECT id FROM demand)  -- Filter early
                GROUP BY demandid, taxheadcode
            """)
        
        # Show progress
        if i % 5 == 0:
            current_count = con.execute("SELECT COUNT(*) FROM demand_details_agg").fetchone()[0]
            print(f"  Current aggregated records: {current_count:,}")
    
    # Final aggregation to combine duplicates from different files
    print("\nPerforming final aggregation...")
    con.execute("""
        CREATE TABLE demand_details AS
        SELECT 
            demandid,
            taxheadcode,
            SUM(taxamount) AS taxamount,
            SUM(collectionamount) AS collectionamount
        FROM demand_details_agg
        GROUP BY demandid, taxheadcode
    """)
    
    # Clean up temporary table
    con.execute("DROP TABLE demand_details_agg")
    
    detail_count = con.execute("SELECT COUNT(*) FROM demand_details").fetchone()[0]
    print(f"\nTotal aggregated demand details: {detail_count:,} records")
    print("✅ Successfully processed 9 crore rows with strict_mode=false!")

end = time.perf_counter()

print(f"Join completed in {end - start:.2f} seconds")

Found 1883 demand detail files
This is the large dataset (9 crore rows)
Processing with aggregation to reduce memory usage...

Processing demand details with in-query aggregation...
Processing file 1/1883: output_1359.csv
Processing file 2/1883: output_680.csv
Processing file 3/1883: output_845.csv
Processing file 4/1883: output_1290.csv
Processing file 5/1883: output_1352.csv
  Current aggregated records: 45,510
Processing file 6/1883: output_47.csv
Processing file 7/1883: output_1118.csv
Processing file 8/1883: output_152.csv
Processing file 9/1883: output_443.csv
Processing file 10/1883: output_1644.csv
  Current aggregated records: 91,109
Processing file 11/1883: output_205.csv
Processing file 12/1883: output_500.csv
Processing file 13/1883: output_1408.csv
Processing file 14/1883: output_720.csv
Processing file 15/1883: output_245.csv
  Current aggregated records: 136,637
Processing file 16/1883: output_1844.csv
Processing file 17/1883: output_1065.csv
Processing file 18/1883: out

In [8]:
# Count total rows in all demand detail files (without filter, no aggregation)
start = time.perf_counter()
print("\nCounting total raw demand detail rows (no filter)...")
total_rows = 0
for i, file in enumerate(detail_files, 1):
    row_count = con.execute(f"SELECT COUNT(*) FROM read_csv_auto('{file}', strict_mode=false)").fetchone()[0]
    total_rows += row_count
    if i % 5 == 0:
        print(f"  Processed {i}/{len(detail_files)} files → running total rows: {total_rows:,}")

print(f"\n📊 Total raw demand detail rows (all files): {total_rows:,}")

end = time.perf_counter()

print(f"Join completed in {end - start:.2f} seconds")

# Now your existing aggregation logic runs (keeps only rows with demandid in demand)



Counting total raw demand detail rows (no filter)...
  Processed 5/1883 files → running total rows: 250,000
  Processed 10/1883 files → running total rows: 500,000
  Processed 15/1883 files → running total rows: 750,000
  Processed 20/1883 files → running total rows: 1,000,000
  Processed 25/1883 files → running total rows: 1,250,000
  Processed 30/1883 files → running total rows: 1,500,000
  Processed 35/1883 files → running total rows: 1,750,000
  Processed 40/1883 files → running total rows: 2,000,000
  Processed 45/1883 files → running total rows: 2,250,000
  Processed 50/1883 files → running total rows: 2,500,000
  Processed 55/1883 files → running total rows: 2,750,000
  Processed 60/1883 files → running total rows: 3,000,000
  Processed 65/1883 files → running total rows: 3,250,000
  Processed 70/1883 files → running total rows: 3,500,000
  Processed 75/1883 files → running total rows: 3,750,000
  Processed 80/1883 files → running total rows: 4,000,000
  Processed 85/1883 files

### Step 6: Create Joined and Aggregated View

In [16]:
# Join demand with aggregated demand details
print("Creating joined demand view...")

con.execute("""
    CREATE OR REPLACE TABLE joined_demand AS
    SELECT 
        d.id,
        d.taxperiodfrom,
        d.taxperiodto,
        d.consumercode,
        dd.taxheadcode,
        dd.taxamount,
        dd.collectionamount
    FROM demand d
    LEFT JOIN demand_details dd ON d.id = dd.demandid
""")

joined_count = con.execute("SELECT COUNT(*) FROM joined_demand").fetchone()[0]
print(f"Created joined demand table with {joined_count:,} rows")

Creating joined demand view...
Created joined demand table with 7,043,791 rows


### Step 7: Process Timestamps and Calculate Financial Years

In [17]:
# Get joined data for FY processing
print("Processing timestamps and calculating financial years...")

joined_demand = con.execute("SELECT * FROM joined_demand").df()

# Convert timestamps
ist = pytz.timezone('Asia/Kolkata')
joined_demand['taxperiodfrom'] = pd.to_datetime(joined_demand['taxperiodfrom'], unit='ms', utc=True)
joined_demand['taxperiodto'] = pd.to_datetime(joined_demand['taxperiodto'], unit='ms', utc=True)
joined_demand['taxperiodfrom'] = joined_demand['taxperiodfrom'].dt.tz_convert(ist)
joined_demand['taxperiodto'] = joined_demand['taxperiodto'].dt.tz_convert(ist)

# Calculate financial year
def get_fy(date):
    if pd.isna(date):
        return None
    if date.month >= 4:
        fy_start = date.year
        fy_end = date.year + 1
    else:
        fy_start = date.year - 1
        fy_end = date.year
    return f"{fy_start}-{str(fy_end)[-2:]}"

joined_demand['fy'] = joined_demand['taxperiodfrom'].apply(get_fy)

# Register the dataframe back to DuckDB
con.register('joined_demand_with_fy', joined_demand)

print("Financial years calculated")

Processing timestamps and calculating financial years...
Financial years calculated


### Step 8: Calculate Metrics Using SQL

In [18]:
# Calculate earliest and latest FY per consumer
print("Calculating financial year metrics...")

result = con.execute("""
    SELECT 
        consumercode,
        MIN(fy) as earliest_fy,
        MAX(fy) as latest_fy
    FROM joined_demand_with_fy
    WHERE fy IS NOT NULL
    GROUP BY consumercode
""").df()

print(f"Processed {len(result)} unique consumer codes")

Calculating financial year metrics...
Processed 104125 unique consumer codes


In [19]:
# Calculate latest FY tax amount
print("Calculating latest FY tax amounts...")

latest_fy_tax = con.execute("""
    WITH latest_fy_demand AS (
        SELECT 
            jd.consumercode,
            jd.taxheadcode,
            jd.taxamount,
            jd.fy
        FROM joined_demand_with_fy jd
        INNER JOIN (
            SELECT consumercode, MAX(fy) as latest_fy
            FROM joined_demand_with_fy
            WHERE fy IS NOT NULL
            GROUP BY consumercode
        ) lf ON jd.consumercode = lf.consumercode AND jd.fy = lf.latest_fy
    ),
    pivoted AS (
        SELECT 
            consumercode,
            SUM(CASE WHEN taxheadcode = 'PT_TAX' THEN taxamount ELSE 0 END) as PT_TAX,
            SUM(CASE WHEN taxheadcode = 'PT_CANCER_CESS' THEN taxamount ELSE 0 END) as PT_CANCER_CESS,
            SUM(CASE WHEN taxheadcode = 'PT_FIRE_CESS' THEN taxamount ELSE 0 END) as PT_FIRE_CESS,
            SUM(CASE WHEN taxheadcode = 'PT_ROUNDOFF' THEN taxamount ELSE 0 END) as PT_ROUNDOFF,
            SUM(CASE WHEN taxheadcode = 'PT_OWNER_EXEMPTION' THEN ABS(taxamount) ELSE 0 END) as PT_OWNER_EXEMPTION,
            SUM(CASE WHEN taxheadcode = 'PT_UNIT_USAGE_EXEMPTION' THEN ABS(taxamount) ELSE 0 END) as PT_UNIT_USAGE_EXEMPTION
        FROM latest_fy_demand
        GROUP BY consumercode
    )
    SELECT 
        consumercode,
        (PT_TAX + PT_CANCER_CESS + PT_FIRE_CESS + PT_ROUNDOFF - 
         (PT_OWNER_EXEMPTION + PT_UNIT_USAGE_EXEMPTION)) as latest_fy_taxamount
    FROM pivoted
""").df()

result = result.merge(latest_fy_tax, on='consumercode', how='left')
print("Latest FY tax amounts calculated")

Calculating latest FY tax amounts...
Latest FY tax amounts calculated


In [20]:
# Calculate current year tax (2025-26)
print("Calculating current year tax amounts...")

current_fy_tax = con.execute("""
    WITH current_fy_demand AS (
        SELECT 
            consumercode,
            taxheadcode,
            taxamount
        FROM joined_demand_with_fy
        WHERE fy = '2025-26'
    ),
    pivoted AS (
        SELECT 
            consumercode,
            SUM(CASE WHEN taxheadcode = 'PT_TAX' THEN taxamount ELSE 0 END) as PT_TAX,
            SUM(CASE WHEN taxheadcode = 'PT_CANCER_CESS' THEN taxamount ELSE 0 END) as PT_CANCER_CESS,
            SUM(CASE WHEN taxheadcode = 'PT_FIRE_CESS' THEN taxamount ELSE 0 END) as PT_FIRE_CESS,
            SUM(CASE WHEN taxheadcode = 'PT_ROUNDOFF' THEN taxamount ELSE 0 END) as PT_ROUNDOFF,
            SUM(CASE WHEN taxheadcode = 'PT_OWNER_EXEMPTION' THEN ABS(taxamount) ELSE 0 END) as PT_OWNER_EXEMPTION,
            SUM(CASE WHEN taxheadcode = 'PT_UNIT_USAGE_EXEMPTION' THEN ABS(taxamount) ELSE 0 END) as PT_UNIT_USAGE_EXEMPTION
        FROM current_fy_demand
        GROUP BY consumercode
    )
    SELECT 
        consumercode,
        (PT_TAX + PT_CANCER_CESS + PT_FIRE_CESS + PT_ROUNDOFF - 
         (PT_OWNER_EXEMPTION + PT_UNIT_USAGE_EXEMPTION)) as current_fy_taxamount
    FROM pivoted
""").df()

# Ensure all consumercodes have current_fy_taxamount
all_consumers = pd.DataFrame(result['consumercode'].unique(), columns=['consumercode'])
current_fy_tax_full = all_consumers.merge(current_fy_tax, on='consumercode', how='left')
current_fy_tax_full['current_fy_taxamount'] = current_fy_tax_full['current_fy_taxamount'].fillna(0)

result = result.merge(current_fy_tax_full, on='consumercode', how='left')
print("Current year tax amounts calculated")

Calculating current year tax amounts...
Current year tax amounts calculated


In [21]:
# Calculate arrear demands
print("Calculating arrear demands...")

arrear = con.execute("""
    SELECT 
        consumercode,
        SUM(taxamount) - SUM(collectionamount) as arrear_years_demand_generated
    FROM joined_demand_with_fy
    WHERE fy < '2025-26'
    GROUP BY consumercode
""").df()

result = result.merge(arrear, on='consumercode', how='left')
result['arrear_years_demand_generated'] = result['arrear_years_demand_generated'].fillna(0)
print("Arrear demands calculated")

Calculating arrear demands...
Arrear demands calculated


In [22]:
# Calculate penalties and interest
print("Calculating penalties and interest...")

penalties = con.execute("""
    SELECT 
        consumercode,
        SUM(CASE WHEN taxheadcode = 'PT_TIME_PENALTY' THEN taxamount ELSE 0 END) as PT_TIME_PENALTY,
        SUM(CASE WHEN taxheadcode = 'PT_TIME_INTEREST' THEN taxamount ELSE 0 END) as PT_TIME_INTEREST
    FROM joined_demand_with_fy
    WHERE taxheadcode IN ('PT_TIME_PENALTY', 'PT_TIME_INTEREST')
    GROUP BY consumercode
""").df()

result = result.merge(penalties, on='consumercode', how='left')
result[['PT_TIME_PENALTY', 'PT_TIME_INTEREST']] = result[['PT_TIME_PENALTY', 'PT_TIME_INTEREST']].fillna(0)
print("Penalties and interest calculated")

Calculating penalties and interest...
Penalties and interest calculated


### Step 9: Process Property Data and Merge

In [23]:
# Join property and unit data
print("Processing property and unit data...")

joined_pt_unit = con.execute("""
    SELECT 
        p.*,
        u.occupancytype,
        u.builtuparea,
        u.plintharea
    FROM property p
    LEFT JOIN unit u ON p.id = u.propertyid
""").df()

print(f"Joined {len(joined_pt_unit)} property-unit records")

Processing property and unit data...
Joined 194547 property-unit records


In [24]:
# Load additional data for ownership classification
print("Loading additional property data for ownership classification...")

# Read the full unit data for ownership classification
unit_all_columns_df = pd.read_csv(
    '/home/admin1/Downloads/punjab-data-prod-analysis/8sept/amritsar/eg_pt_unit.csv'
)

# Read property data
property_df = pd.read_csv(
    '/home/admin1/Downloads/punjab-data-prod-analysis/8sept/amritsar/eg_pt_property.csv',
    usecols=['id', 'propertyid', 'tenantid', 'createdtime', 'additionaldetails', 
             'ownershipcategory', 'status', 'usagecategory', 'propertytype']
)
property_df = property_df[property_df['status'] == 'ACTIVE'].copy()

# Merge and classify ownership
merged = property_df.merge(unit_all_columns_df, left_on='id', right_on='propertyid', suffixes=('_property', '_unit'))

def classify_ownership(occupancies):
    unique_types = set(occupancies)
    if 'RENTED' in unique_types:
        if len(unique_types) > 1:
            return 'Mixed'
        else:
            return 'Tenant'
    if 'SELFOCCUPIED' in unique_types:
        return 'Owner'
    if 'UNOCCUPIED' in unique_types:
        return 'Owner'
    return None

ownership = (
    merged.groupby('propertyid_property')['occupancytype']
    .apply(classify_ownership)
    .reset_index()
    .rename(columns={'occupancytype': 'Owned_Rented'})
)

property_df = property_df.merge(ownership, left_on='propertyid', right_on='propertyid_property', how='left')

print("Ownership classification complete")

Loading additional property data for ownership classification...


  unit_all_columns_df = pd.read_csv(


Ownership classification complete


In [25]:
# Calculate area summaries
def clean_numeric(series):
    return pd.to_numeric(series.replace('NULL', 0), errors='coerce').fillna(0)

merged['builtuparea'] = clean_numeric(merged['builtuparea'])
merged['plintharea'] = clean_numeric(merged['plintharea'])

area_summary = (
    merged.groupby('propertyid_property', as_index=False)
    .agg(
        total_builtup_area=('builtuparea', 'sum'),
        total_plinth_area=('plintharea', 'sum')
    )
)

property_df = property_df.merge(area_summary, left_on='propertyid', right_on='propertyid_property', how='left')
property_df['total_builtup_area'] = property_df['total_builtup_area'].fillna(0)
property_df['total_plinth_area'] = property_df['total_plinth_area'].fillna(0)

print("Area calculations complete")

Area calculations complete


In [26]:
# Merge with demand results
property_result_merged = property_df.merge(
    result,
    left_on='propertyid',
    right_on='consumercode',
    how='left'
)

print(f"Merged {len(property_result_merged)} property records with demand data")

Merged 104141 property records with demand data


### Step 10: Add Exemption Status

In [27]:
# Load owner data for exemption status
owner_df = pd.read_csv(
    '/home/admin1/Downloads/punjab-data-prod-analysis/8sept/amritsar/eg_pt_owner.csv',
    usecols=['propertyid', 'ownertype', 'status']
)

owner_df = owner_df[owner_df['status'] == 'ACTIVE'].copy()

# Determine exemption
owner_df['is_exempted'] = owner_df['ownertype'].isin(['WIDOW', 'FREEDOMFIGHTER'])
exempted_status = owner_df.groupby('propertyid')['is_exempted'].any().reset_index()
exempted_status['Is Property Exempted [Yes/ No]'] = exempted_status['is_exempted'].apply(lambda x: 'Yes' if x else 'No')
exempted_status = exempted_status.drop(columns=['is_exempted'])

# Add exemption to merged result
property_result_merged = property_result_merged.merge(
    exempted_status[['propertyid', 'Is Property Exempted [Yes/ No]']],
    left_on='id',
    right_on='propertyid',
    how='left'
)

property_result_merged['Is Property Exempted [Yes/ No]'] = property_result_merged['Is Property Exempted [Yes/ No]'].fillna('No')

if 'propertyid' in property_result_merged.columns:
    property_result_merged.drop(columns=['propertyid'], inplace=True)

if 'propertyid_x' in property_result_merged.columns:
    property_result_merged['propertyid'] = property_result_merged['propertyid_x']

print("Exemption status added")

Exemption status added


### Step 11: Create Final Report

In [28]:
# Rename columns for final report
report = property_result_merged.rename(columns={
    'tenantid': 'ULB',
    'propertyid': 'Property ID',
    'usagecategory': 'Usage',
    'createdtime': 'Date of Creation of the Property in the System',
    'additionaldetails': 'Date of Construction of the Property',
    'ownershipcategory': 'Ownership Type',
    'Is Property Exempted [Yes/ No]': 'Is Property Exempted [Yes/ No]',
    'Owned_Rented': 'Owned_Rented (Owner/ Rented/ Mixed)',
    'earliest_fy': 'Earliest Financial Year for which Demand was Generated',
    'latest_fy': 'Latest Financial Year for which Demand was Generated',
    'latest_fy_taxamount': 'Latest Demand Generated [in Rs.]',
    'current_fy_taxamount': 'Current Years Demand Generated [in Rs.]',
    'PT_TIME_PENALTY': 'Penalty',
    'PT_TIME_INTEREST': 'Interest',
    'arrear_years_demand_generated': 'Arrear Years Demand Generated [in Rs.]',
    'propertytype': 'Property Type[Building/ Vacant]',
    'total_builtup_area': 'Total Builtup Area [Sum of all units/ floors]',
    'total_plinth_area': 'Total Plinth Area [Sum of all units/ floors]'
}).copy()

# Format ULB and date fields
def epoch_to_custom_date(epoch_ms):
    return datetime.fromtimestamp(epoch_ms / 1000).strftime('%d-%b-%Y') if pd.notna(epoch_ms) else None

def get_year_construction(val):
    if pd.isna(val): return None
    try: return json.loads(val).get('yearConstruction')
    except: return None

report['ULB'] = report['ULB'].str.split('.').str[1].str.capitalize()
report['Date of Creation of the Property in the System'] = report['Date of Creation of the Property in the System'].apply(epoch_to_custom_date)
report['Date of Construction of the Property'] = report['Date of Construction of the Property'].apply(get_year_construction)

print("Report formatting complete")

Report formatting complete


In [29]:
# Select final columns
final_report = report[
    [
        'ULB',
        'Property ID',
        'Usage',
        'Date of Creation of the Property in the System',
        'Date of Construction of the Property',
        'Ownership Type',
        'Is Property Exempted [Yes/ No]',
        'Owned_Rented (Owner/ Rented/ Mixed)',
        'Earliest Financial Year for which Demand was Generated',
        'Latest Financial Year for which Demand was Generated',
        'Latest Demand Generated [in Rs.]',
        'Current Years Demand Generated [in Rs.]',
        'Penalty',
        'Interest',
        'Arrear Years Demand Generated [in Rs.]',
        'Property Type[Building/ Vacant]',
        'Total Builtup Area [Sum of all units/ floors]',
        'Total Plinth Area [Sum of all units/ floors]'
    ]
].copy()

print(f"Final report created with {len(final_report)} properties")

Final report created with 104141 properties


### Step 12: Save Results

In [30]:
# Save the final report
output_filename = 'Punjab_Data_Analysis_DuckDB_Result_amritsar.csv'
print(f"Writing CSV to {output_filename}...")
final_report.to_csv(output_filename, index=False)
print(f"✅ Done! CSV generated with {len(final_report)} properties")

Writing CSV to Punjab_Data_Analysis_DuckDB_Result_amritsar.csv...
✅ Done! CSV generated with 104141 properties


### Step 13: Cleanup and Summary

In [31]:
# Close DuckDB connection
con.close()

# Display summary statistics
print("\n" + "="*50)
print("PROCESSING COMPLETE - SUMMARY")
print("="*50)
print(f"Total properties processed: {len(final_report):,}")
print(f"Properties with demands: {final_report['Latest Demand Generated [in Rs.]'].notna().sum():,}")
print(f"Properties with current year demands: {(final_report['Current Years Demand Generated [in Rs.]'] > 0).sum():,}")
print(f"Properties with arrears: {(final_report['Arrear Years Demand Generated [in Rs.]'] > 0).sum():,}")
print(f"\nOutput saved to: {output_filename}")

# Display first few rows
print("\nFirst 5 rows of the final report:")
final_report.head()


PROCESSING COMPLETE - SUMMARY
Total properties processed: 104,141
Properties with demands: 101,485
Properties with current year demands: 68,489
Properties with arrears: 28,710

Output saved to: Punjab_Data_Analysis_DuckDB_Result_amritsar.csv

First 5 rows of the final report:


Unnamed: 0,ULB,Property ID,Usage,Date of Creation of the Property in the System,Date of Construction of the Property,Ownership Type,Is Property Exempted [Yes/ No],Owned_Rented (Owner/ Rented/ Mixed),Earliest Financial Year for which Demand was Generated,Latest Financial Year for which Demand was Generated,Latest Demand Generated [in Rs.],Current Years Demand Generated [in Rs.],Penalty,Interest,Arrear Years Demand Generated [in Rs.],Property Type[Building/ Vacant],Total Builtup Area [Sum of all units/ floors],Total Plinth Area [Sum of all units/ floors]
0,Amritsar,PT-107-961578,MIXED,29-Aug-2020,,INDIVIDUAL.SINGLEOWNER,Yes,Owner,2019-20,2025-26,0.0,0.0,0.0,0.0,0.0,BUILTUP.INDEPENDENTPROPERTY,250.0,0.0
1,Amritsar,PT-107-953591,NONRESIDENTIAL.INDUSTRIAL,22-Dec-2022,,INDIVIDUAL.SINGLEOWNER,No,Owner,2019-20,2025-26,1727.45,1727.45,1429.45,567.89,0.0,BUILTUP.INDEPENDENTPROPERTY,328.0,0.0
2,Amritsar,PT-107-2031133,RESIDENTIAL,13-Dec-2024,2014-15,INDIVIDUAL.SINGLEOWNER,No,Owner,2015-16,2025-26,297.1,297.1,424.55,1756.2,0.0,BUILTUP.INDEPENDENTPROPERTY,152.0,0.0
3,Amritsar,PT-107-968753,MIXED,21-Sep-2021,,INDIVIDUAL.SINGLEOWNER,No,Owner,2019-20,2024-25,1756.1,0.0,135.0,0.0,0.0,BUILTUP.INDEPENDENTPROPERTY,120.0,0.0
4,Amritsar,PT-107-961358,MIXED,29-Aug-2020,,INDIVIDUAL.SINGLEOWNER,No,Owner,2019-20,2025-26,1106.38,1106.38,622.36,1058.31,7488.75,BUILTUP.INDEPENDENTPROPERTY,60.0,0.0


### Memory Usage Information

In [32]:
# Check memory usage
import psutil
import os

process = psutil.Process(os.getpid())
memory_info = process.memory_info()
print(f"\nMemory Usage:")
print(f"RSS (Resident Set Size): {memory_info.rss / 1024 / 1024:.2f} MB")
print(f"VMS (Virtual Memory Size): {memory_info.vms / 1024 / 1024:.2f} MB")
print(f"\nDuckDB efficiently processed 9 crore+ rows without loading everything into memory!")


Memory Usage:
RSS (Resident Set Size): 3004.29 MB
VMS (Virtual Memory Size): 7133.88 MB

DuckDB efficiently processed 9 crore+ rows without loading everything into memory!
