# **Project Samarth**
**Project Samarth** is an AI-powered data integration and Q&A prototype built over real datasets from data.gov.in. It connects agricultural production and climate (rainfall) data using DuckDB and Parquet pipelines, enabling natural-language queries about India’s agricultural economy. The system interprets each question, generates SQL, executes it in real time, and returns accurate, cited insights for evidence-based policymaking.

### 1. **Environment and Base Directory Setup**

1. **Environment setup** gives a clean, reproducible workspace for the data workflow. Installing specific libraries makes the results reproducible later on the same or a different machine, and keeping raw and processed data apart reduces the risk of overwriting.
2. **Base directory** is a single root for raw files, intermediate results, and processed outputs, which makes cleanup and maintenance easy.



In [None]:
# Cell 1 — Environment setup
import os, requests
# os: for interacting with operating system
# requests: for making HTTP requests (data downloads)

# Define base directory
base = "/content/data"
os.makedirs(f"{base}/raw", exist_ok=True)
os.makedirs(f"{base}/processed", exist_ok=True)
os.makedirs("/content/db", exist_ok=True)

# Install libraries
!pip install -q duckdb pandas pyarrow requests seaborn plotly
# duckdb (in-process SQL analytics)
# pandas (data manipulation)
# pyarrow (Apache Arrow, often used with Parquet)
# requests (already imported, for downloads)
# seaborn (visualization)
# plotly (interactive plots)

print("Environment ready. Base dir:", base)


Environment ready. Base dir: /content/data
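
The install line above does not pin versions, so a later run may pick up newer releases. One way to record what was actually installed is sketched below; `installed_versions` and `as_requirements` are hypothetical helper names, not part of the notebook.

```python
from importlib import metadata

def installed_versions(packages):
    """Map each package name to its installed version, or None if it is absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions

def as_requirements(versions):
    """Render 'name==version' lines for the packages that are installed."""
    return [f"{name}=={ver}" for name, ver in sorted(versions.items()) if ver is not None]

found = installed_versions(["duckdb", "pandas", "pyarrow", "requests", "seaborn", "plotly"])
print("\n".join(as_requirements(found)))
```

Saving these lines to a requirements file next to the data catalog would let another machine install the same versions.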


2. **Download Crop Dataset** (`Prodution-Under-Different-Crops_during-2019-20.csv`; the filename is misspelled at the source).

In [None]:
# Set the dataset URLs
RAINFALL_CSV_URL = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/datafile/Sub_Division_IMD_2017.csv"
CROP_CSV_URL     = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv"

print("Rainfall URL set:", bool(RAINFALL_CSV_URL.strip()))
print("Crop URL set:", bool(CROP_CSV_URL.strip()))


Rainfall URL set: True
Crop URL set: True


**Static method:** used when the dataset link is already known. It does not record dataset details (metadata, checksum, etc.).

In [None]:
# Downloading Agricultural Crop Dataset
agri_url = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv"
agri_path = f"{base}/raw/agri_production.csv" # local path where the downloaded file is stored

r = requests.get(agri_url, timeout=60) # Sends an HTTP GET request to fetch the CSV content from the URL.
r.raise_for_status() # stop on an HTTP error instead of saving an error page
with open(agri_path, "wb") as f: # write raw bytes (binary mode) to the target path
    f.write(r.content)
print("Downloaded Agriculture Production CSV ✅")


Downloaded Agriculture Production CSV ✅


3. Robust downloader + catalog helper to track what has been downloaded, where it is stored, and how to verify its integrity.

In [None]:
# Cell 3 — Downloader + catalog append helper
import requests, hashlib, time, csv, os

#defining catalog path
catalog_path = f"{base}/data_catalog.csv"
if not os.path.exists(catalog_path): # if doesn't exist then create it
    with open(catalog_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["title","resource_id","source_url","local_path","checksum","downloaded_at"])

# Define the downloader helper:
# downloads a file from the given URL to the local path, with a timeout
def download_to(path, url, timeout=60):
    print("Downloading:", url)
    r = requests.get(url, allow_redirects=True, timeout=timeout)
    r.raise_for_status()
    with open(path, "wb") as f: # the context manager closes the file even if the write fails
        f.write(r.content)
    # MD5 fingerprint: detects accidental changes or corruption in the source data
    # (it is not a defense against deliberate tampering)
    chksum = hashlib.md5(r.content).hexdigest()
    print("Saved", path, "size:", len(r.content), "bytes checksum:", chksum)
    return chksum

# creating final catalog with all necessary information about the datasets
def append_catalog(title, resource_id, source_url, local_path, checksum):
    with open(catalog_path, "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([title, resource_id, source_url, local_path, checksum, time.strftime("%Y-%m-%d")])


4. Download the rainfall CSV (or upload it manually), then log it in the catalog with its integrity checksum.

In [None]:
# Cell 4 — Get rainfall CSV (download or manual upload)

# Defining local path where rainfall CSV will be saved locally
rain_raw = f"{base}/raw/rainfall.csv"

if RAINFALL_CSV_URL.strip():
    try:
        chksum = download_to(rain_raw, RAINFALL_CSV_URL) # download_to is a helper that downloads the file to rain_raw and returns its checksum.
        append_catalog("Rainfall Dataset (downloaded)", "rainfall_resource", RAINFALL_CSV_URL, rain_raw, chksum)
    except Exception as e: # if download fails print error and ask for manual uploading of dataset
        print("Download failed:", e)
        print("Please use manual upload cell below.")
else:
    print("No rainfall URL provided. Please upload the file using the cell below.")


Downloading: https://www.data.gov.in/files/ogdpv2dms/s3fs-public/datafile/Sub_Division_IMD_2017.csv
Saved /content/data/raw/rainfall.csv size: 445369 bytes checksum: 9d4829133f031c90257e5be409c7fd42
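
The catalog stores a checksum for each file, but nothing in the notebook re-checks it later. A minimal sketch of that verification step follows, assuming the catalog columns written by `append_catalog` above; `file_md5` and `verify_catalog` are hypothetical helper names.

```python
import csv
import hashlib

def file_md5(path, chunk_size=1 << 20):
    """MD5 of a file, read in chunks so large files need not fit in memory."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def verify_catalog(catalog_csv):
    """Return {local_path: bool}: True when the file on disk still matches
    the checksum recorded in the catalog. A missing file counts as False.
    If a path appears in several rows, the last row decides."""
    results = {}
    with open(catalog_csv, newline="") as f:
        for row in csv.DictReader(f):
            path = row["local_path"]
            try:
                results[path] = file_md5(path) == row["checksum"]
            except FileNotFoundError:
                results[path] = False
    return results
```

Calling `verify_catalog(catalog_path)` before the cleaning cells would flag a raw file that was replaced or truncated since it was logged.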


5. Manual upload fallback (rainfall)

Data.gov.in datasets are hosted dynamically: links often expire, change names, or redirect through JavaScript, so the automated download may fail when the link is no longer reachable through a direct HTTP request.

A manual upload cell is therefore kept as a backup, so the system still works even if the live API or file link stops responding.

In [None]:
# Cell 5 — Manual upload fallback for rainfall (run only if you did not download)
import hashlib
from google.colab import files
uploaded = files.upload()  # select your rainfall csv file
for name in uploaded:
    src = f"/content/{name}"
    dst = f"{base}/raw/rainfall.csv"
    os.rename(src, dst)
    with open(dst, "rb") as f:
        chksum = hashlib.md5(f.read()).hexdigest()
    append_catalog("Rainfall Dataset (manual)", "rainfall_manual", "manual_upload", dst, chksum)
    print("Uploaded rainfall to", dst)


Saving Sub_Division_IMD_2017.csv to Sub_Division_IMD_2017.csv
Uploaded rainfall to /content/data/raw/rainfall.csv


6. Download or upload crop CSV

In [None]:
# Cell 6 — Get crop CSV (download or manual upload)
crop_raw = f"{base}/raw/crop_data.csv"

if CROP_CSV_URL.strip():
    try:
        chksum = download_to(crop_raw, CROP_CSV_URL)
        append_catalog("Crop Dataset (downloaded)", "crop_resource", CROP_CSV_URL, crop_raw, chksum)
    except Exception as e:
        print("Download failed:", e)
        print("Please use manual upload cell below.")
else:
    print("No crop URL provided. Please upload the file using the cell below.")


Downloading: https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv
Saved /content/data/raw/crop_data.csv size: 1296 bytes checksum: 9675a5c8205f494ba23474562286a7ba


7. Manual upload fallback (crop)

In [None]:
# Cell 7 — Manual upload fallback for crop (if needed)
import hashlib
from google.colab import files
uploaded = files.upload()
for name in uploaded:
    src = f"/content/{name}" # src is the temp colab path
    dst = f"{base}/raw/crop_data.csv" # dst is the target path
    os.rename(src, dst) # moves the file to the desired location
    with open(dst, "rb") as f:
        chksum = hashlib.md5(f.read()).hexdigest()
    # The checksum detects whether the file has been altered or corrupted, so later runs can confirm they work with the same data.
    append_catalog("Crop Dataset (manual)", "crop_manual", "manual_upload", dst, chksum)
    print("Uploaded crop file to", dst)


Saving Prodution-Under-Different-Crops_during-2019-20.csv to Prodution-Under-Different-Crops_during-2019-20.csv
Uploaded crop file to /content/data/raw/crop_data.csv


8. Quick peek (ensure files are not HTML)

In [None]:
# Cell 8 — Peek the first 12 lines to ensure files are CSV (not HTML)
def peek(path):
    print("---- peek", path) # print header showing which file is being inspected
    if not os.path.exists(path): # if does not exist print not found
        print("File not found:", path); return
    with open(path, 'r', encoding='utf-8', errors='ignore') as f: # open with utf-8 encoding
        for i in range(12):
            line = f.readline()
            if not line:
                break
            print(i+1, ":", line.strip()[:300]) # print each line, truncated to 300 characters
peek(rain_raw) # printing initial content of rainfall and crop dataset for quick parsing
peek(crop_raw)


---- peek /content/data/raw/rainfall.csv
1 : SUBDIVISION,YEAR,JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC,ANNUAL,JF,MAM,JJAS,OND
2 : Andaman & Nicobar Islands,1901,49.2,87.1,29.2,2.3,528.8,517.5,365.1,481.1,332.6,388.5,558.2,33.6,3373.2,136.3,560.3,1696.3,980.3
3 : Andaman & Nicobar Islands,1902,0,159.8,12.2,0,446.1,537.1,228.9,753.7,666.2,197.2,359,160.5,3520.7,159.8,458.3,2185.9,716.7
4 : Andaman & Nicobar Islands,1903,12.7,144,0,1,235.1,479.9,728.4,326.7,339,181.2,284.4,225,2957.4,156.7,236.1,1874,690.6
5 : Andaman & Nicobar Islands,1904,9.4,14.7,0,202.4,304.5,495.1,502,160.1,820.4,222.2,308.7,40.1,3079.6,24.1,506.9,1977.6,571
6 : Andaman & Nicobar Islands,1905,1.3,0,3.3,26.9,279.5,628.7,368.7,330.5,297,260.7,25.4,344.7,2566.7,1.3,309.7,1624.9,630.8
7 : Andaman & Nicobar Islands,1906,36.6,0,0,0,556.1,733.3,247.7,320.5,164.3,267.8,128.9,79.2,2534.4,36.6,556.1,1465.8,475.9
8 : Andaman & Nicobar Islands,1907,110.7,0,113.3,21.6,616.3,305.2,443.9,377.6,200.4,264.4,648.9,245.6,3347.9,
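
The visual check above can also be automated, so that a failed download that saved an HTML error page is caught before parsing. This is only a heuristic sketch; `looks_like_html` is a hypothetical helper, and a CSV whose first bytes happen to contain `<body` would be misclassified.

```python
def looks_like_html(path, probe_bytes=2048):
    """Heuristic: True when the start of the file looks like an HTML page."""
    with open(path, "rb") as f:
        head = f.read(probe_bytes)
    if head.startswith(b"\xef\xbb\xbf"):  # drop a UTF-8 byte-order mark if present
        head = head[3:]
    head = head.lstrip().lower()
    return head.startswith((b"<!doctype html", b"<html")) or b"<body" in head
```

A guard such as `if looks_like_html(rain_raw): raise ValueError(...)` could then route the user to the manual upload cell.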

9. Robust CSV reader (handles delimiters, skiprows)

In [None]:
# Cell 9 — Robust CSV parsing function
import pandas as pd

def robust_read_csv(path, max_skip=6):
    # Try default
    try:
        df = pd.read_csv(path, low_memory=False)
        print("Default read OK:", df.shape)
        return df
    except Exception as e:
        print("Default read failed:", e)

    # Try the python engine with separator sniffing
    # (low_memory is a C-engine option; passing it with engine='python' raises ValueError)
    try:
        df = pd.read_csv(path, engine='python', sep=None)
        print("Sniffer read OK:", df.shape)
        return df
    except Exception as e:
        print("Sniffer failed:", e)

    # Try common separators & skiprows
    seps = [',',';','\t','|']
    for skip in range(0, max_skip):
        for sep in seps:
            try:
                df = pd.read_csv(path, sep=sep, skiprows=skip, engine='python')
                if df.shape[1] >= 2 and df.shape[0] > 0:
                    print(f"Success read with sep='{sep}', skiprows={skip}", df.shape)
                    return df
            except Exception:
                pass
    raise ValueError("Could not parse CSV. Inspect the file manually with peek().")

# Use it
df_rain = robust_read_csv(rain_raw)
df_crop = robust_read_csv(crop_raw)
print("Rain shape:", df_rain.shape, "Crop shape:", df_crop.shape)

# The function provides a single, reusable way to parse CSVs that do not follow one standard.
# It tries progressively more permissive strategies and returns the first one that parses successfully.


Default read OK: (4188, 19)
Default read OK: (13, 13)
Rain shape: (4188, 19) Crop shape: (13, 13)
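
With `sep=None`, pandas relies on Python's built-in `csv.Sniffer` to guess the separator. The sketch below shows that step on its own, which can help when deciding why a file failed to parse; `sniff_delimiter` is a hypothetical helper and the candidate list simply mirrors the separators tried in the cell above.

```python
import csv

def sniff_delimiter(path, candidates=",;\t|", sample_bytes=4096):
    """Guess the delimiter from the start of the file; None if no guess is possible."""
    with open(path, "r", encoding="utf-8", errors="ignore", newline="") as f:
        sample = f.read(sample_bytes)
    try:
        return csv.Sniffer().sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        return None
```

The result can be passed straight to `pd.read_csv(path, sep=...)` once it looks plausible.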


10. Normalize the columns

In [None]:
# Cell 10 — Normalize column names for both dataframes for better consistency, compatibility, predictability
# Make column names consistent and filesystem-friendly by converting them to lowercase, replacing spaces and certain characters with underscores, and trimming whitespace.
def normalize_cols(df):
    df = df.copy()
    df.columns = [str(c).strip().lower().replace(" ", "_").replace(".", "").replace("/","_") for c in df.columns]
    return df

df_rain = normalize_cols(df_rain)
df_crop = normalize_cols(df_crop)
print("Rain cols:", df_rain.columns.tolist()[:30]) # print first 30 col names for Rain and Crop to verify the transformation
print("Crop cols:", df_crop.columns.tolist()[:30])


Rain cols: ['subdivision', 'year', 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec', 'annual', 'jf', 'mam', 'jjas', 'ond']
Crop cols: ['state', 'district', 'wheat_(_in_metric_tonnes)', 'maize_(_in_metric_tonnes)', 'rice_(_in_metric_tonnes)', 'barley_(_in_metric_tonnes)', 'ragi_(_in_metric_tonnes)', 'pulses_(_in_metric_tonnes)', 'common_millets_(_in_metric_tonnes)', 'total_food_grains_(_in_metric_tonnes)', 'chillies_(_in_metric_tonnes)', 'ginger_(_in_metric_tonnes)', 'oil__seeds_(_in_metric_tonnes)']
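
The chained `replace` calls leave parentheses and doubled underscores in names such as `wheat_(_in_metric_tonnes)`. A regex-based variant is sketched below as an alternative; `to_snake` is a hypothetical helper, and the sample headers in the usage lines are assumed spellings rather than values read from the file.

```python
import re

def to_snake(name):
    """Lowercase the name and collapse every run of non-alphanumeric characters to one underscore."""
    return re.sub(r"[^0-9a-z]+", "_", str(name).strip().lower()).strip("_")

# Assumed raw headers, for illustration only
print(to_snake("Wheat ( In Metric Tonnes)"))
print(to_snake("Oil  Seeds ( In Metric Tonnes)"))
```

Later cells match on the current column names, so adopting this variant would mean re-checking those matches.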


11. Map/rename common rainfall columns and cleaning

In [None]:
# Cell 11 — Rainfall rename & cleaning (handles missing 'state'):
# This cell normalizes column names, detects which column is the state/subdivision, finds
# the rainfall and year columns, converts them to numeric, renames them to standard names (state, rainfall_mm, year), removes rows without a state, and prints a quick status. These steps make the IMD CSV compatible for joining and analysis.

import pandas as pd

# Normalize column names again for safety
df_rain.columns = [str(c).strip().lower().replace(" ", "_").replace("-", "_").replace("/", "_") for c in df_rain.columns]
# Government CSVs use inconsistent column names like State/UT, Year, Annual Rainfall (mm). Normalization creates predictable field names so later code can match them reliably (e.g., state, annual, rainfall_mm).

# Identify possible column candidates for 'state'
# creating the list of col names that look like they could represent a state or administrative division, based on keywords
possible_state_cols = [c for c in df_rain.columns if any(x in c for x in ["state", "ut", "subdiv", "division"])]
# checking for multiple keywords because the datasets might use subdivision instead of state

# If we found col, then pick the first one and rename it to "state". If not found, print available cols.
if possible_state_cols:
    state_col = possible_state_cols[0]
    print(f"Found '{state_col}' column for state mapping.")
    df_rain = df_rain.rename(columns={state_col: "state"})
else:
    print("No state/subdivision column found. Showing available columns:")
    print(df_rain.columns.tolist())

# Identify rainfall and year columns
# Looks for columns that likely contain rainfall values by searching for the keywords "rainfall" or "precip".
possible_rain_cols = [c for c in df_rain.columns if "rainfall" in c or "precip" in c]
rain_col = possible_rain_cols[0] if possible_rain_cols else None
# This picks the first matching column, if any. The IMD file used here stores the yearly total in a column named
# `annual`, which matches neither keyword, so it keeps its name and the later SQL queries refer to it as `annual`.

# If a rainfall-like column exists:
# Convert its values to numeric (strip commas first), coercing invalid entries to NaN.
# Rename the column to rainfall_mm.
if rain_col:
    df_rain[rain_col] = pd.to_numeric(df_rain[rain_col].astype(str).str.replace(",",""), errors='coerce')
    df_rain = df_rain.rename(columns={rain_col: "rainfall_mm"})
else:
    print("No rainfall column found!")


# Sometimes year cells may contain extra text like Year: 2017 or 2017-2018; str.extract(r"(\d{4})") isolates the actual year number. Converting to numeric makes it usable for time-based queries (min, max, BETWEEN).
if "year" in df_rain.columns:
    df_rain["year"] = pd.to_numeric(df_rain["year"].astype(str).str.extract(r"(\d{4})")[0], errors="coerce")
else:
    print("No year column found!")

# Drop missing state rows if state now exists
if "state" in df_rain.columns:
    df_rain = df_rain.dropna(subset=["state"]).reset_index(drop=True)
    # Removes rows where the state is missing, then resets the index.
    # Rows without a state cannot be joined or used for regional analysis, so they’re removed to avoid noise or join failures.

print(" After rainfall cleaning:", df_rain.shape)
df_rain.head(3)


✅ Found 'subdivision' column for state mapping.
⚠️ No rainfall column found!
✅ After rainfall cleaning: (4188, 19)


Unnamed: 0,state,year,jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec,annual,jf,mam,jjas,ond
0,Andaman & Nicobar Islands,1901,49.2,87.1,29.2,2.3,528.8,517.5,365.1,481.1,332.6,388.5,558.2,33.6,3373.2,136.3,560.3,1696.3,980.3
1,Andaman & Nicobar Islands,1902,0.0,159.8,12.2,0.0,446.1,537.1,228.9,753.7,666.2,197.2,359.0,160.5,3520.7,159.8,458.3,2185.9,716.7
2,Andaman & Nicobar Islands,1903,12.7,144.0,0.0,1.0,235.1,479.9,728.4,326.7,339.0,181.2,284.4,225.0,2957.4,156.7,236.1,1874.0,690.6
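
The year-cleaning rule in the cell above (keep the first four-digit run) can be illustrated with plain Python. `extract_year` is a hypothetical stand-in for the pandas `str.extract` call, and the sample strings are the ones mentioned in the cell's comment.

```python
import re

def extract_year(value):
    """Return the first four-digit run in the value as an int, or None if there is none."""
    m = re.search(r"\d{4}", str(value))
    return int(m.group()) if m else None

for raw in ["Year: 2017", "2017-2018", 1901, "n/a"]:
    print(repr(raw), "->", extract_year(raw))
```

For a range such as `2017-2018`, only the first year is kept, which matters if the data uses agricultural years that span two calendar years.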


In [None]:
# Save cleaned IMD rainfall data properly
base_dir = "/content/data"
df_rain.to_parquet(f"{base_dir}/processed/imd_rainfall.parquet", index=False)
print("Rainfall parquet saved successfully at:", f"{base_dir}/processed/imd_rainfall.parquet")


✅ Rainfall parquet saved successfully at: /content/data/processed/imd_rainfall.parquet


12. Map/rename common crop columns and cleaning

In [None]:
# Cell 12 (Final) — Convert wide-format crop data into long format for unified Q&A

import pandas as pd

# Normalize columns
# Cleans all column headers:
# replaces spaces, dashes, and slashes with underscores, and converts them to lowercase.
df_crop.columns = [str(c).strip().lower().replace(" ", "_").replace("-", "_").replace("/", "_") for c in df_crop.columns]

# Ensure 'state' column exists
if "state" not in df_crop.columns:
    raise KeyError("No 'state' column found!")
# Ensures your dataset includes a state column.
# If this fails, we can’t group or join the crop data with rainfall later.

# Detect crop production columns by keyword
# Picks out every column whose name contains one of the crop keywords below,
# so small differences in spelling or column order do not matter.
# A crop that is not in this list is skipped; extend the list when a new dataset adds crops.
crop_columns = [c for c in df_crop.columns if any(x in c for x in ["wheat", "rice", "maize", "barley", "ragi", "pulses", "millets", "chillies", "ginger", "oil"])]
print(f"Detected crop columns: {crop_columns}")

# Converts crop columns into rows, so every crop shares the same structure.
# The long format is easier to query with SQL in DuckDB, e.g. "Top crops by state".
df_crop_long = df_crop.melt(id_vars=["state", "district"], value_vars=crop_columns,
                            var_name="crop", value_name="production_mt")

# Clean crop names
# Make crop names readable and consistent: remove the suffix _in_metric_tonnes, replace _ with a space, and capitalize each word.
df_crop_long["crop"] = df_crop_long["crop"].str.replace("_in_metric_tonnes", "", regex=False)
df_crop_long["crop"] = df_crop_long["crop"].str.replace("_", " ").str.strip().str.title()

# Convert production values to numeric
# Some CSV cells might contain thousands separators or text (e.g., "1,234" or "NA").
# Commas are stripped first; anything still non-numeric becomes NaN,
# so aggregation (SUM, AVG) won’t fail.
df_crop_long["production_mt"] = pd.to_numeric(
    df_crop_long["production_mt"].astype(str).str.replace(",", "", regex=False),
    errors="coerce")

# Remove rows without production data
df_crop_long = df_crop_long.dropna(subset=["production_mt"]).reset_index(drop=True)

print("After transformation:", df_crop_long.shape)
df_crop_long.head(10)


Detected crop columns: ['wheat_(_in_metric_tonnes)', 'maize_(_in_metric_tonnes)', 'rice_(_in_metric_tonnes)', 'barley_(_in_metric_tonnes)', 'ragi_(_in_metric_tonnes)', 'pulses_(_in_metric_tonnes)', 'common_millets_(_in_metric_tonnes)', 'chillies_(_in_metric_tonnes)', 'ginger_(_in_metric_tonnes)', 'oil__seeds_(_in_metric_tonnes)']
✅ After transformation: (130, 4)


Unnamed: 0,state,district,crop,production_mt
0,Himachal Pradesh,Bilaspur,Wheat (),48096
1,Himachal Pradesh,Chamba,Wheat (),41545
2,Himachal Pradesh,Hamirpur,Wheat (),58886
3,Himachal Pradesh,Kangra,Wheat (),190519
4,Himachal Pradesh,Kinnaur,Wheat (),196
5,Himachal Pradesh,Kullu,Wheat (),21992
6,Himachal Pradesh,Lahaul-spiti,Wheat (),125
7,Himachal Pradesh,Mandi,Wheat (),117665
8,Himachal Pradesh,Shimla,Wheat (),13819
9,Himachal Pradesh,Sirmaur,Wheat (),40567
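
The leftover `()` in the crop names comes from the parentheses around the unit suffix. A single-pass cleanup is sketched below as one possible alternative; `clean_crop_name` is a hypothetical helper, and the inputs are the normalized column names shown in the output above.

```python
import re

def clean_crop_name(column):
    """Turn a normalized column name such as 'wheat_(_in_metric_tonnes)' into 'Wheat'."""
    # Drop the unit suffix together with its optional parentheses and joining underscores
    name = re.sub(r"_?\(?_?in_metric_tonnes\)?", "", column)
    # Collapse underscores and whitespace into single spaces
    name = re.sub(r"[_\s]+", " ", name).strip()
    return name.title()

for col in ["wheat_(_in_metric_tonnes)", "oil__seeds_(_in_metric_tonnes)"]:
    print(col, "->", clean_crop_name(col))
```

Applied with `df_crop_long["crop"].map(clean_crop_name)` right after the melt, this would likely make a separate cleanup pass unnecessary.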


In [None]:
# Step 1 — Final cleanup of crop names and structure

df_crop_long["crop"] = (
    df_crop_long["crop"]
    .str.replace(r"\(\)", "", regex=True)
    .str.replace(r"\s+", " ", regex=True)
    .str.strip()
    .str.title()
)

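# Add a placeholder 'year' column so the crop table can be joined with rainfall on year
# (replace it with the actual year when available; the source file covers 2019-20).
# The year-join in the merge step only returns rows if this value also occurs in the rainfall data.
df_crop_long["year"] = 2022  # placeholder value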

# Save cleaned version
base_dir = "/content/data"
df_crop_long.to_parquet(f"{base_dir}/processed/crop_production.parquet", index=False)
# Parquet is columnar, meaning:
# Faster queries (especially for large datasets).
# Uses less storage space.
# Keeps schema and type information intact.
# It’s also natively compatible with DuckDB (used in your later cells).

print("Cleaned crop dataset ready for analysis:", df_crop_long.shape)
df_crop_long.head(10)


✅ Cleaned crop dataset ready for analysis: (130, 5)


Unnamed: 0,state,district,crop,production_mt,year
0,Himachal Pradesh,Bilaspur,Wheat,48096,2022
1,Himachal Pradesh,Chamba,Wheat,41545,2022
2,Himachal Pradesh,Hamirpur,Wheat,58886,2022
3,Himachal Pradesh,Kangra,Wheat,190519,2022
4,Himachal Pradesh,Kinnaur,Wheat,196,2022
5,Himachal Pradesh,Kullu,Wheat,21992,2022
6,Himachal Pradesh,Lahaul-spiti,Wheat,125,2022
7,Himachal Pradesh,Mandi,Wheat,117665,2022
8,Himachal Pradesh,Shimla,Wheat,13819,2022
9,Himachal Pradesh,Sirmaur,Wheat,40567,2022


13. Merge both the datasets

In [None]:
import os

!ls -R /content/data/processed


/content/data/processed:
crop_production.parquet  imd_rainfall.parquet


In [None]:
base_dir = "/content/data"
df_crop_long.to_parquet(f"{base_dir}/processed/crop_production.parquet", index=False)
print("Crop parquet saved successfully at:", f"{base_dir}/processed/crop_production.parquet")
# for verification

✅ Crop parquet saved successfully at: /content/data/processed/crop_production.parquet


Merging Two Datasets

1. **DuckDB** is an in-process analytical **database engine** with a Python API. It runs SQL queries directly on CSV/Parquet files without needing a separate database server like MySQL.
2. It works seamlessly with Parquet and Pandas, including large datasets from **data.gov.in**.

In [None]:
# Step 2 — Merge Rainfall + Crop Data
import duckdb

# Load both datasets into DuckDB
con = duckdb.connect(database='/content/db/samarth.duckdb', read_only=False)
# Local database file:
# stores tables for reuse (no need to reload the files each time)
# and lets us query multiple Parquet datasets together efficiently.
con.execute("CREATE OR REPLACE TABLE rainfall AS SELECT * FROM parquet_scan('/content/data/processed/imd_rainfall.parquet');")
con.execute("CREATE OR REPLACE TABLE crop AS SELECT * FROM parquet_scan('/content/data/processed/crop_production.parquet');")
# Reads both Parquet files directly (no need to load them into Pandas first). DuckDB is optimized for columnar access, so SQL aggregations (like AVG) are fast.
# Creates the DuckDB tables rainfall and crop.

# Example Query: Compare crop production vs rainfall
query = """
SELECT
    c.state,
    c.crop,
    AVG(c.production_mt) AS avg_production_mt,
    AVG(r.annual) AS avg_rainfall_mm
FROM crop c
JOIN rainfall r
ON LOWER(c.state) = LOWER(r.state)
   AND c.year = r.year
GROUP BY c.state, c.crop
ORDER BY avg_production_mt DESC
LIMIT 10;
"""

merged_results = con.execute(query).df()
print("Combined Rainfall + Crop Data:")
merged_results


✅ Combined Rainfall + Crop Data:


Unnamed: 0,state,crop,avg_production_mt,avg_rainfall_mm
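
The merged result above has no rows. When an inner join comes back empty, it helps to check each join key separately. The sketch below does that on made-up stand-in tables and uses the standard-library `sqlite3` module in place of DuckDB so that it runs anywhere; `join_diagnostics` is a hypothetical helper, and the SQL is plain enough that it should also work on the notebook's DuckDB connection.

```python
import sqlite3

# Stand-in tables with made-up rows; only the column layout mirrors the notebook's tables.
demo = sqlite3.connect(":memory:")
demo.executescript("""
CREATE TABLE rainfall (state TEXT, year INTEGER, annual REAL);
CREATE TABLE crop (state TEXT, crop TEXT, production_mt REAL, year INTEGER);
INSERT INTO rainfall VALUES ('Himachal Pradesh', 2016, 1100.0),
                            ('Himachal Pradesh', 2017, 1250.0);
INSERT INTO crop VALUES ('Himachal Pradesh', 'Wheat', 48096, 2022);
""")

def join_diagnostics(con):
    """Report how far the two join keys (state, year) overlap between crop and rainfall."""
    rain_years = con.execute("SELECT MIN(year), MAX(year) FROM rainfall").fetchone()
    crop_years = con.execute("SELECT MIN(year), MAX(year) FROM crop").fetchone()
    shared_states = con.execute(
        "SELECT COUNT(DISTINCT LOWER(c.state)) FROM crop c "
        "JOIN rainfall r ON LOWER(c.state) = LOWER(r.state)"
    ).fetchone()[0]
    joined_rows = con.execute(
        "SELECT COUNT(*) FROM crop c "
        "JOIN rainfall r ON LOWER(c.state) = LOWER(r.state) AND c.year = r.year"
    ).fetchone()[0]
    return {"rain_years": rain_years, "crop_years": crop_years,
            "shared_states": shared_states, "joined_rows": joined_rows}

report = join_diagnostics(demo)
print(report)
```

In this stand-in data the states match but the year ranges do not overlap, so the full join yields zero rows. Running the same checks on the real tables would show whether the placeholder crop year or the state names are responsible.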


14. Building the **Interactive Q&A prototype**: a traceability and citation system for the Project Samarth prototype.
It ensures that every answer the model gives can cite the exact dataset source, fulfilling one of the core evaluation criteria.

In [None]:
import pandas as pd, json, time
from IPython.display import display, Markdown
# pandas → for reading and managing the dataset catalog.
# json → for any future structured reading (metadata files).
# time → could be used later for timestamping operations.
# IPython.display → allows you to print clean Markdown output in Colab (e.g., formatted titles).

# Loads your dataset catalog (metadata for all data files used)
catalog_path = "/content/data/data_catalog.csv"
catalog = pd.read_csv(catalog_path)
display(catalog)

# Finds dataset info when a user’s question or query mentions it
def find_source_by_name(keyword):
    """Find catalog rows that match a keyword (case-insensitive)."""
    mask = catalog.apply(lambda row: keyword.lower() in str(row['title']).lower() or keyword.lower() in str(row['source_url']).lower(), axis=1)
    return catalog[mask]

# Returns a proper citation (title + official data.gov.in link)
# Calls the previous function to find the dataset, returns a clean, short citation (title + link).
# If nothing is found, defaults to “data.gov.in (dataset not found…)”.
def cite_dataset_by_keyword(keyword):
    """Return a short citation (title + source_url) for a dataset matching keyword."""
    df = find_source_by_name(keyword)
    if not df.empty:
        r = df.iloc[0]
        return f"{r['title']} — {r['source_url']}"
    return "data.gov.in (dataset not found in local catalog)"


Unnamed: 0,title,resource_id,source_url,local_path,checksum,downloaded_at
0,Rainfall Dataset (downloaded),rainfall_resource,https://www.data.gov.in/files/ogdpv2dms/s3fs-p...,/content/data/raw/rainfall.csv,9d4829133f031c90257e5be409c7fd42,2025-10-22
1,Rainfall Dataset (manual),rainfall_manual,manual_upload,/content/data/raw/rainfall.csv,9d4829133f031c90257e5be409c7fd42,2025-10-22
2,Crop Dataset (downloaded),crop_resource,https://www.data.gov.in/files/ogdpv2dms/s3fs-p...,/content/data/raw/crop_data.csv,9675a5c8205f494ba23474562286a7ba,2025-10-22
3,Crop Dataset (manual),crop_manual,manual_upload,/content/data/raw/crop_data.csv,9675a5c8205f494ba23474562286a7ba,2025-10-22


15. **Expanded Query Planner**: the brain of the Project Samarth prototype, its Natural Language Understanding (NLU) layer. It converts plain-English questions into SQL queries that run against the DuckDB database (rainfall + crop data).



**Execution Steps:**
1. Takes a user question in natural language (like “Top 5 crops in Himachal Pradesh”).

2. Detects the type of question (intent).

3. Extracts keywords (state, crop, years).

4. Builds dynamic SQL queries to fetch the correct answer.

5. Returns a dictionary with everything needed to execute those queries.
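
The steps above can be sketched for a single intent as follows. This is an illustration under stated assumptions, not the notebook's planner: `plan_top_crops` is a hypothetical function, `sqlite3` stands in for DuckDB so the example is self-contained, the Maize row is made up, and the user-supplied values are passed as `?` parameters instead of being formatted into the SQL string.

```python
import re
import sqlite3

TOP_CROPS = re.compile(r"top\s+(\d+)\s+crops\s+in\s+([\w\s]+)")

def plan_top_crops(question):
    """Steps 1-5 for one intent: match the question, extract n and state, build the plan."""
    m = TOP_CROPS.search(question.strip().lower())
    if not m:
        return {"intent": "unknown"}
    n, state = int(m.group(1)), m.group(2).strip()
    sql = """
        SELECT crop, SUM(production_mt) AS total_prod
        FROM crop
        WHERE lower(state) LIKE ?
        GROUP BY crop
        ORDER BY total_prod DESC
        LIMIT ?
    """
    return {"intent": "top_crops_state", "sql": sql,
            "params": [f"%{state}%", n], "state": state, "n": n}

# Tiny stand-in table: the two Wheat rows come from the output above, the Maize row is made up.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE crop (state TEXT, district TEXT, crop TEXT, production_mt REAL)")
demo.executemany("INSERT INTO crop VALUES (?, ?, ?, ?)", [
    ("Himachal Pradesh", "Bilaspur", "Wheat", 48096),
    ("Himachal Pradesh", "Chamba", "Wheat", 41545),
    ("Himachal Pradesh", "Bilaspur", "Maize", 60000),
])

plan = plan_top_crops("Top 1 crops in Himachal Pradesh")
rows = demo.execute(plan["sql"], plan["params"]).fetchall()
print(plan["intent"], rows)
```

DuckDB's Python client also accepts `?` placeholders through `con.execute(sql, params)`, so the same plan shape should carry over, although that is not exercised here.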

In [None]:
import re

def samarth_plan(question: str):
    q = question.strip().lower() # lowercasing for easier regex matching

    # 1) Compare avg annual rainfall in two states for last N years
    m = re.search(r"compare.*rainfall.*in\s+([\w\s]+)\s+and\s+([\w\s]+)\s+for\s+last\s+(\d+)\s+years", q)
    if m:
        s1, s2, n = m.groups() # extracts state1, state2 and N(number of years)
        n = int(n)
        # compute year range from available data: use MAX(year) from rainfall
        max_year = con.execute("SELECT MAX(year) FROM rainfall").fetchone()[0]
        min_year = max_year - n + 1
        sql_rain = f"""
        SELECT lower(state) as state, AVG(annual) AS avg_rain, MIN(year) as from_year, MAX(year) as to_year
        FROM rainfall
        WHERE lower(state) IN ('{s1.strip()}', '{s2.strip()}') AND year BETWEEN {min_year} AND {max_year}
        GROUP BY lower(state);
        """

        # top M crops of Crop_Type_C (by volume) in each state: try to find crop type in question
        M = None  # stays None when the question does not ask for top crops
        m2 = re.search(r"top\s+(\d+)\s+most produced crops of\s+([\w\s]+)", q)
        if m2:
            M, crop_type = m2.groups()
            M = int(M)
            # restrict to the two compared states, as the branch without a crop type does
            sql_crop = f"""
            SELECT lower(state) as state, crop, SUM(production_mt) as total_prod
            FROM crop
            WHERE lower(crop) LIKE '%{crop_type.strip()}%' AND lower(state) IN ('{s1.strip()}', '{s2.strip()}') AND year BETWEEN {min_year} AND {max_year}
            GROUP BY lower(state), crop
            ORDER BY lower(state), total_prod DESC;
            """
        else:
            # if no crop_type specified, top M crops overall
            m3 = re.search(r"top\s+(\d+)\s+most produced crops", q)
            if m3:
                M = int(m3.group(1))
                sql_crop = f"""
                SELECT lower(state) as state, crop, SUM(production_mt) as total_prod
                FROM crop
                WHERE lower(state) IN ('{s1.strip()}', '{s2.strip()}') AND year BETWEEN {min_year} AND {max_year}
                GROUP BY lower(state), crop
                ORDER BY lower(state), total_prod DESC;
                """
            else:
                sql_crop = None
        return {"intent":"compare_rain_and_crops","sql_rain": sql_rain, "sql_crop": sql_crop, "states":[s1.strip(),s2.strip()], "years":(min_year,max_year), "M": M}

    # 2) District with highest production of Crop_Z in State_X (most recent year)
    m = re.search(r"district.*highest.*production.*of\s+([\w\s]+).*in\s+([\w\s]+)", q)
    if m:
        crop_z, state_x = m.groups()
        # get latest year from crop table for that crop & state
        latest = con.execute(f"SELECT MAX(year) FROM crop WHERE lower(crop) LIKE '%{crop_z.strip()}%' AND lower(state) LIKE '%{state_x.strip()}%'").fetchone()[0]
        sql = f"""
        SELECT district, SUM(production_mt) as prod
        FROM crop
        WHERE lower(crop) LIKE '%{crop_z.strip()}%' AND lower(state) LIKE '%{state_x.strip()}%' AND year = {latest}
        GROUP BY district
        ORDER BY prod DESC
        LIMIT 1;
        """
        # similarly min
        sql_min = f"""
        SELECT district, SUM(production_mt) as prod
        FROM crop
        WHERE lower(crop) LIKE '%{crop_z.strip()}%' AND lower(state) LIKE '%{state_x.strip()}%' AND year = {latest}
        GROUP BY district
        ORDER BY prod ASC
        LIMIT 1;
        """
        return {"intent":"district_diff","crop":crop_z.strip(),"state":state_x.strip(),"year":latest,"sql_max":sql, "sql_min": sql_min}

    # 3) Trend of crop over last N years in region
    # sql_prod: yearly crop production totals.
    # sql_rain: yearly average rainfall for the same region and years.
    m = re.search(r"trend.*of\s+([\w\s]+).*last\s+(\d+)\s+years.*in\s+([\w\s]+)", q)
    if m:
        crop, n, region = m.groups()
        n = int(n)
        max_year = con.execute("SELECT MAX(year) FROM crop").fetchone()[0]
        min_year = max_year - n + 1
        sql = f"""
        SELECT year, SUM(production_mt) as total_prod
        FROM crop
        WHERE lower(crop) LIKE '%{crop.strip()}%' AND lower(state) LIKE '%{region.strip()}%' AND year BETWEEN {min_year} AND {max_year}
        GROUP BY year
        ORDER BY year;
        """
        # also get rainfall for same years
        sql_rain = f"""
        SELECT year, AVG(annual) as avg_rain
        FROM rainfall
        WHERE lower(state) LIKE '%{region.strip()}%' AND year BETWEEN {min_year} AND {max_year}
        GROUP BY year
        ORDER BY year;
        """
        return {"intent":"trend_and_corr","sql_prod":sql,"sql_rain":sql_rain,"crop":crop.strip(),"region":region.strip(),"years":(min_year,max_year)}

    # 4) Simple template: top N crops in a state
    m = re.search(r"top\s+(\d+)\s+crops\s+in\s+([\w\s]+)", q)
    if m:
        n, state = m.groups(); n=int(n)
        sql = f"""
        SELECT crop, SUM(production_mt) as total_prod
        FROM crop
        WHERE lower(state) LIKE '%{state.strip()}%'
        GROUP BY crop
        ORDER BY total_prod DESC
        LIMIT {n};
        """
        return {"intent":"top_crops_state","sql":sql,"state":state.strip(),"n":n}

    return {"intent":"unknown"} # if no regex matches - the system couldn't interpret the question


16. **Cell C - Executor + Synthesizer (run SQL, format answer, attach citations):** it takes the plan generated by the earlier samarth_plan() function, executes the SQL queries on DuckDB, and produces human-readable answers with proper dataset citations.

In [None]:
def execute_and_synthesize(plan):
    intent = plan.get("intent") # detected question type
    if intent == "unknown":
        return "Sorry — I couldn't interpret the question. Try a simpler phrasing.", None

    # Executes both rainfall and crop SQL queries
    if intent == "compare_rain_and_crops":
        # run rainfall SQL
        df_rain = con.execute(plan['sql_rain']).df()
        # run crops SQL (if present)
        df_crop = con.execute(plan['sql_crop']).df() if plan.get('sql_crop') else None

        # build answer text
        lines = []
        lines.append(f"Comparison for years {plan['years'][0]} to {plan['years'][1]}:\n")
        for _, row in df_rain.iterrows():
            lines.append(f"- {row['state'].title()}: avg annual rainfall = {round(row['avg_rain'],1)} mm (years {int(row['from_year'])}-{int(row['to_year'])}).")
        # include crops summary if present
        if df_crop is not None and not df_crop.empty:
            lines.append("\nTop crops (per state):")
            # group top M by state
            grouped = df_crop.groupby('state')
            for st, g in grouped:
                top = g.sort_values('total_prod', ascending=False).head(plan.get('M') or 5)
                items = "; ".join([f"{r['crop'].title()} ({int(r['total_prod'])} t)" for _, r in top.iterrows()])
                lines.append(f"  • {st.title()}: {items}")

        # citations
        # These automatically fetch dataset metadata from your catalog — ensuring that every output cites its official source,
        # fulfilling core values: accuracy and traceability.
        cit_rain = cite_dataset_by_keyword("rain")
        cit_crop = cite_dataset_by_keyword("crop")
        lines.append("\nSources:")
        lines.append(f"- Rainfall: {cit_rain}")
        lines.append(f"- Crop: {cit_crop}")
        return "\n".join(lines), (df_rain, df_crop)

    # Runs two queries: one for the highest-producing district, one for the lowest.
    # Returns both results in a concise summary.
    if intent == "district_diff":
        df_max = con.execute(plan['sql_max']).df()
        df_min = con.execute(plan['sql_min']).df()
        cit_crop = cite_dataset_by_keyword("crop")
        text = f"In {plan['state'].title()} for crop '{plan['crop'].title()}' in {plan['year']}: \n"
        if not df_max.empty:
            text += f"Highest: {df_max.iloc[0]['district']} — {int(df_max.iloc[0]['prod'])} t.\n"
        if not df_min.empty:
            text += f"Lowest: {df_min.iloc[0]['district']} — {int(df_min.iloc[0]['prod'])} t.\n"
        text += f"\nSource: {cit_crop}"
        return text, (df_max, df_min)

    # Runs two queries: crop production trend and rainfall trend for a region.
    # Merges both on year, calculates the correlation between production and rainfall,
    # and returns a summary that includes the correlation value.
    if intent == "trend_and_corr":
        df_prod = con.execute(plan['sql_prod']).df()
        df_rain = con.execute(plan['sql_rain']).df()
        # simple correlation
        merged = df_prod.merge(df_rain, on='year', how='inner')
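        corr = None
        if len(merged) >= 2:  # correlation is undefined for fewer than two overlapping years
            c = merged['total_prod'].corr(merged['avg_rain'])
            corr = c if c == c else None  # NaN (e.g. a constant series) is reported as N/A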
        text = f"Trend for {plan['crop'].title()} in {plan['region'].title()} from {plan['years'][0]} to {plan['years'][1]}:\n"
        text += f"- {len(df_prod)} yearly points returned. Correlation between production and rainfall: {round(corr,2) if corr is not None else 'N/A'}.\n"
        text += "\nSource:\n- " + cite_dataset_by_keyword("crop") + "\n- " + cite_dataset_by_keyword("rain")
        return text, (df_prod, df_rain, merged)

    # Runs the top-crops SQL query and returns a ranked list.
    # This is the simplest intent: a plain aggregation and ranking of production by crop.
    if intent == "top_crops_state":
        df = con.execute(plan['sql']).df()
        lines = [f"Top {plan['n']} crops in {plan['state'].title()}:"]
        for _, r in df.iterrows():
            lines.append(f"- {r['crop'].title()}: {int(r['total_prod'])} t")
        lines.append("\nSource: " + cite_dataset_by_keyword("crop"))
        return "\n".join(lines), df

    return "Unhandled intent", None
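
To make the `trend_and_corr` branch easier to follow, here is a minimal sketch of its two steps, an inner join on year followed by a Pearson correlation (the default method of pandas `Series.corr`). It is written without pandas so each step is visible. The yearly values are hypothetical, not taken from the datasets, and the helper name `pearson` is an assumption for illustration.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences; None when undefined."""
    n = len(xs)
    if n < 2:
        return None
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return None  # a constant series has no defined correlation
    return cov / sqrt(var_x * var_y)

# Hypothetical yearly values keyed by year (illustrative only).
prod_by_year = {2015: 10.0, 2016: 12.0, 2017: 14.0, 2018: 16.0}
rain_by_year = {2016: 900.0, 2017: 1000.0, 2018: 1100.0, 2019: 950.0}

# Inner join on year: keep only the years present in both series.
common_years = sorted(prod_by_year.keys() & rain_by_year.keys())
prod = [prod_by_year[y] for y in common_years]
rain = [rain_by_year[y] for y in common_years]

print(common_years)
print(pearson(prod, rain))
```

With only a handful of overlapping years the coefficient is very sensitive to single values, so it is best read as a rough indicator rather than evidence of a relationship.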


17. **Cell D — Interactive loop for Colab (type questions):** This is the final, user-facing part of Project Samarth: an interactive Q&A loop that ties the earlier cells together into a working conversational data assistant.
For each question you type, the loop performs the following steps:

1. Parse your question (samarth_plan)
2. Generate relevant SQL queries
3. Execute them on your DuckDB datasets
4. Format and display the results with proper citations
5. Log everything for traceability

In [None]:
import json, time  # used for the query log below
from IPython.display import display, Markdown  # notebook rendering helpers

print("Project Samarth — interactive Q&A (type 'exit' to stop)\n")
while True:
    q = input("Ask a question: ").strip() # user question
    if q.lower() in ("exit","quit"): # Runs infinitely until you type "exit"
        print("Exiting interactive session.")
        break
    plan = samarth_plan(q) # Understands your question (intent detection + SQL generation)
    ans, data = execute_and_synthesize(plan) # Executes the SQL, merges data, and formats the final readable answer.

    # Display the answer as Markdown so bold text and bullets render cleanly in Google Colab
    display(Markdown("**Answer:**"))
    display(Markdown(ans if isinstance(ans, str) else str(ans)))
    # Show the first 5 rows of each returned DataFrame.
    # data is either a single DataFrame or a tuple of DataFrames (e.g. rainfall + crop).
    if data is not None:
        frames = data if isinstance(data, tuple) else (data,)
        for d in frames:
            if d is not None and hasattr(d, "head"):
                display(d.head(5))

    # Audit trail: log every question, its plan, and an answer snippet with a timestamp for provenance
    log = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "question": q,
        "plan": plan,
        "answer_snippet": str(ans)[:300]  # truncate the answer to its first 300 characters
    }
    # Append one JSON object per line (JSON Lines); default=str covers values json cannot serialize natively
    with open("/content/data/query_logs.jsonl","a") as f:
        f.write(json.dumps(log, default=str) + "\n")
    print("-" * 60) # separator line between answers for easier reading


Project Samarth — interactive Q&A (type 'exit' to stop)

Ask a question: Show trend of rice over last 5 years in Maharashtra


**Answer:**

Trend for Rice Over in Maharashtra from 2018 to 2022:
- 0 yearly points returned. Correlation between production and rainfall: N/A.

Source:
- Crop Dataset (downloaded) — https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv
- Rainfall Dataset (downloaded) — https://www.data.gov.in/files/ogdpv2dms/s3fs-public/datafile/Sub_Division_IMD_2017.csv

| | year | total_prod |
|---|---|---|


| | year | avg_rain |
|---|---|---|


| | year | total_prod | avg_rain |
|---|---|---|---|


------------------------------------------------------------
Ask a question: Top 5 crops in Maharashtra


**Answer:**

Top 5 crops in Maharashtra:

Source: Crop Dataset (downloaded) — https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv

| | crop | total_prod |
|---|---|---|


------------------------------------------------------------
Ask a question: Top 5 crops in Himachal Pradesh


**Answer:**

Top 5 crops in Himachal Pradesh:
- Maize: 1489206 t
- Wheat: 1239378 t
- Rice: 233758 t
- Pulses: 110456 t
- Barley: 70826 t

Source: Crop Dataset (downloaded) — https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv

| | crop | total_prod |
|---|---|---|
| 0 | Maize | 1489206.0 |
| 1 | Wheat | 1239378.0 |
| 2 | Rice | 233758.0 |
| 3 | Pulses | 110456.0 |
| 4 | Barley | 70826.0 |


------------------------------------------------------------


KeyboardInterrupt: Interrupted by user
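
The interactive loop needs `input()` and ends with a keyboard interrupt, which makes it awkward to re-run or test. The following is a minimal sketch of the same plan, execute, and log steps wrapped in a single function that can be called directly. The function `answer_once` and the two stubs are hypothetical stand-ins for samarth_plan and execute_and_synthesize, and the log is written to a temporary directory rather than `/content/data`.

```python
import json
import os
import tempfile
import time

def answer_once(question, planner, executor, log_path):
    """Run one question through plan -> execute -> log and return the answer text."""
    plan = planner(question)
    answer, _data = executor(plan)
    record = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "question": question,
        "plan": plan,
        "answer_snippet": str(answer)[:300],  # keep the log entry short
    }
    # Append one JSON object per line (JSON Lines format).
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, default=str) + "\n")
    return answer

# Hypothetical stubs standing in for the real planner and executor.
def stub_planner(question):
    if not question.strip():
        return {"intent": "unknown"}
    return {"intent": "echo", "text": question}

def stub_executor(plan):
    if plan["intent"] == "unknown":
        return "Sorry, I couldn't interpret the question.", None
    return f"You asked: {plan['text']}", None

log_path = os.path.join(tempfile.mkdtemp(), "query_logs.jsonl")
for q in ["Top 5 crops in Himachal Pradesh", ""]:
    print(answer_once(q, stub_planner, stub_executor, log_path))

# Each log line is an independent JSON object, so it can be read back record by record.
with open(log_path, encoding="utf-8") as f:
    records = [json.loads(line) for line in f]
print(len(records), [r["plan"]["intent"] for r in records])
```

Passing the planner and executor as arguments keeps the function independent of the notebook's global state, so the real functions can be swapped in once the DuckDB connection is available.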