# **Project Samarth**
**Project Samarth** is an AI-powered data integration and Q&A prototype built over real datasets from data.gov.in. It connects agricultural production and climate (rainfall) data using DuckDB and Parquet pipelines, enabling natural-language queries about India’s agricultural economy. The system interprets each question, generates SQL, executes it in real time, and returns answers with citations to support evidence-based policymaking.

**Architecture** : download → clean → store as Parquet → load into DuckDB → natural-language planner → SQL execution → interactive answers with citations.

### 1. **Environment and Base Directory Setup**

1. **Environment setup** gives the data workflow a clean, reproducible workspace. Installing the required libraries up front makes it possible to reproduce the results later on the same or a different machine, and keeping raw and processed data apart reduces the risk of overwriting source files.
2. **Base directory** is the single root for raw files, intermediate results, and processed outputs, which makes cleanup and maintenance easy.



In [10]:
# Cell 1 — Environment setup
import os, requests
# os: for interacting with operating system
# requests: for making HTTP requests (data downloads)

# Define base directory
base = "/content/data"
os.makedirs(f"{base}/raw", exist_ok=True)
os.makedirs(f"{base}/processed", exist_ok=True)
os.makedirs("/content/db", exist_ok=True)

# Install libraries
!pip install -q duckdb pandas pyarrow requests seaborn plotly
# duckdb (in-process SQL analytics)
# pandas (data manipulation)
# pyarrow (Apache Arrow, often used with Parquet)
# requests (already imported, for downloads)
# seaborn (visualization)
# plotly (interactive plots)

print("Environment ready. Base dir:", base)


Environment ready. Base dir: /content/data


**Optional (For understanding)**

These constants store the **direct CSV links** from the official source for the rainfall and crop datasets.
Leave a value **blank** to upload that file manually instead.


*   They act as **global constants**: every later download step fetches its data from these URLs.



In [11]:
# Dataset URLs
RAINFALL_CSV_URL = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/datafile/Sub_Division_IMD_2017.csv"
CROP_CSV_URL     = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv"
# TEMP_CSV_URL     = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Seasonal_and_Annual_Mean_Temp_Series_India_1901-2017.csv"
# SOIL_CSV_URL     = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/State_UT_wise_Soil_Health_Cards_Issued_to_Farmers_2021.csv"

print("Rainfall:", bool(RAINFALL_CSV_URL.strip()))
print("Crop:", bool(CROP_CSV_URL.strip()))
# print("Temperature:", bool(TEMP_CSV_URL.strip()))
# print("Soil:", bool(SOIL_CSV_URL.strip()))


Rainfall: True
Crop: True


**Static method**: used when the dataset link is known. It does not record dataset details (metadata, checksum, etc.).


*   Downloads the dataset directly, once. Useful for **verification** and as a **fallback**.



In [12]:
# Downloading Agricultural Crop Dataset
agri_url = "https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv"
agri_path = f"{base}/raw/agri_production.csv" # path of downloaded file storage

r = requests.get(agri_url) # sends an HTTP GET request to fetch the CSV content from the URL
with open(agri_path, "wb") as f: # "wb" = write binary; the with-block closes the file afterwards
    f.write(r.content) # no status check here, so an error page would be saved as-is
print("Downloaded Agriculture Production CSV ")


Downloaded Agriculture Production CSV 


2. Robust **downloader** + **catalog** helper to track what has been downloaded, where it is stored, and how to verify its integrity.


*   A central data catalog keeps a record of every dataset used.
*   Automatically downloads any dataset (rainfall, crop, etc.) and generates a checksum (a digital fingerprint) so that later changes to the file can be detected.



In [13]:
# Downloader + catalog append helper
import requests, hashlib, time, csv, os

#defining catalog path
catalog_path = f"{base}/data_catalog.csv"
if not os.path.exists(catalog_path): # if doesn't exist then create it
    with open(catalog_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["title","resource_id","source_url","local_path","checksum","downloaded_at"])

# Downloader helper: fetch a file from the given URL to a local path, with a timeout.
# It also computes an MD5 checksum so later changes or corruption can be detected.
# Note: MD5 catches accidental changes but is not collision-resistant; hashlib.sha256 is a stronger choice.
def download_to(path, url, timeout=180):
    print("Downloading:", url)
    r = requests.get(url, allow_redirects=True, timeout=timeout)
    r.raise_for_status()
    with open(path, "wb") as f: # the with-block closes the file after writing
        f.write(r.content)
    chksum = hashlib.md5(r.content).hexdigest() # fingerprint of the exact bytes that were saved
    print("Saved", path, "size:", len(r.content), "bytes checksum:", chksum)
    return chksum

# creating final catalog with all necessary information about the datasets
# every dataset we will download will appear as a new line with all its metadata.
def append_catalog(title, resource_id, source_url, local_path, checksum):
    with open(catalog_path, "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([title, resource_id, source_url, local_path, checksum, time.strftime("%Y-%m-%d")])
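
The catalog only helps with integrity if its checksums are compared against the files again later. The following is a minimal sketch of such a check, assuming the catalog columns written above (`local_path`, `checksum`); the helper names `file_md5` and `verify_catalog` and the demo file contents are illustrative and not part of the notebook.

```python
import csv
import hashlib
import os
import tempfile


def file_md5(path, chunk_size=1 << 20):
    """Return the MD5 hex digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_catalog(catalog_csv):
    """Map each catalogued local_path to 'ok', 'changed', or 'missing'."""
    status = {}
    with open(catalog_csv, newline="") as f:
        for row in csv.DictReader(f):
            path = row["local_path"]
            if not os.path.exists(path):
                status[path] = "missing"
            elif file_md5(path) == row["checksum"]:
                status[path] = "ok"
            else:
                status[path] = "changed"
    return status


# Demo with a throwaway catalog and data file (made-up content).
workdir = tempfile.mkdtemp()
data_path = os.path.join(workdir, "sample.csv")
with open(data_path, "w") as f:
    f.write("state,year\nA,2001\n")

demo_catalog = os.path.join(workdir, "data_catalog.csv")
with open(demo_catalog, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["title", "resource_id", "source_url", "local_path", "checksum", "downloaded_at"])
    writer.writerow(["Sample", "sample", "manual_upload", data_path, file_md5(data_path), "2024-01-01"])

before = verify_catalog(demo_catalog)  # file untouched since it was catalogued

with open(data_path, "a") as f:        # simulate a later modification
    f.write("B,2002\n")
after = verify_catalog(demo_catalog)
print(before[data_path], "->", after[data_path])
```

If the same path is catalogued more than once (for example a failed download followed by a manual upload), this sketch keeps the status of the last row for that path.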


3. Download the **rainfall CSV** (or upload it manually), then log it in the catalog with its integrity checksum.

In [14]:
# Get rainfall CSV (download or manual upload)

# Defining local path where rainfall CSV will be saved locally
rain_raw = f"{base}/raw/rainfall.csv"

if RAINFALL_CSV_URL.strip():
    try:
        chksum = download_to(rain_raw, RAINFALL_CSV_URL) # download_to is a helper that downloads the file to rain_raw and returns its checksum.
        append_catalog("Rainfall Dataset (downloaded)", "rainfall_resource", RAINFALL_CSV_URL, rain_raw, chksum)
    except Exception as e: # if download fails print error and ask for manual uploading of dataset
        print("Download failed:", e)
        print("Please use manual upload cell below.")
else:
    print("No rainfall URL provided. Please upload the file using the cell below.")


Downloading: https://www.data.gov.in/files/ogdpv2dms/s3fs-public/datafile/Sub_Division_IMD_2017.csv
Download failed: 403 Client Error: Forbidden for url: https://www.data.gov.in/files/ogdpv2dms/s3fs-public/datafile/Sub_Division_IMD_2017.csv
Please use manual upload cell below.


4. Download or upload the **crop CSV**, then log it in the catalog with its integrity checksum.

In [15]:
# Get crop CSV (download or manual upload)
crop_raw = f"{base}/raw/crop_data.csv"

if CROP_CSV_URL.strip():
    try:
        chksum = download_to(crop_raw, CROP_CSV_URL)
        append_catalog("Crop Dataset (downloaded)", "crop_resource", CROP_CSV_URL, crop_raw, chksum)
    except Exception as e:
        print("Download failed:", e)
        print("Please use manual upload cell below.")
else:
    print("No crop URL provided. Please upload the file using the cell below.")


Downloading: https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv
Download failed: 403 Client Error: Forbidden for url: https://www.data.gov.in/files/ogdpv2dms/s3fs-public/Prodution-Under-Different-Crops_during-2019-20.csv
Please use manual upload cell below.


5. Download or upload the **temperature dataset**, then log it in the catalog with its integrity checksum.

In [16]:
# # Download temperature dataset
# temp_raw = f"{base}/raw/temp.csv"
# if TEMP_CSV_URL.strip():
#     try:
#         chksum = download_to(temp_raw, TEMP_CSV_URL)
#         append_catalog("Temperature Dataset", "temp_resource", TEMP_CSV_URL, temp_raw, chksum)
#     except Exception as e:
#         print("Temperature download failed:", e)

6. Download or upload the **soil dataset**, then log it in the catalog with its integrity checksum.

In [17]:
# # Download soil dataset
# soil_raw = f"{base}/raw/soil.csv"
# if SOIL_CSV_URL.strip():
#     try:
#         chksum = download_to(soil_raw, SOIL_CSV_URL)
#         append_catalog("Soil Dataset", "soil_resource", SOIL_CSV_URL, soil_raw, chksum)
#     except Exception as e:
#         print("Soil download failed:", e)

5. **Manual upload fallback (rainfall)**
 Data.gov.in datasets are hosted dynamically: links often expire, change names, or redirect through JavaScript.

So the automated download code may fail if the link is no longer accessible
through direct HTTP requests.

That is why we keep a manual upload cell as a backup option, so the system still works even if the live API or file link stops responding.
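
The download-then-fallback decision can also be written as one small function. This is only a sketch of the idea under stated assumptions: `obtain_file` and `failing_fetch` are hypothetical names, and `fetch` stands for any callable (such as a wrapper around the download helper) that writes the file or raises.

```python
import os
import tempfile


def obtain_file(dst, fetch):
    """Try fetch(dst); if it fails, fall back to a file already present at dst.

    `fetch` is any callable that writes the dataset to `dst` or raises.
    Returns "downloaded" or "manual"; raises FileNotFoundError if neither works.
    """
    try:
        fetch(dst)
        return "downloaded"
    except Exception as exc:
        print("Download failed:", exc)
    if os.path.exists(dst):
        return "manual"
    raise FileNotFoundError(f"{dst} is missing: upload it manually and re-run")


def failing_fetch(dst):
    # Stand-in for a download that hits an expired or blocked link.
    raise ConnectionError("403 Client Error: Forbidden")


workdir = tempfile.mkdtemp()
dst = os.path.join(workdir, "rainfall.csv")

# Simulate a manual upload that already placed the file at dst.
with open(dst, "w") as f:
    f.write("SUBDIVISION,YEAR,ANNUAL\n")

source = obtain_file(dst, failing_fetch)
print("Using", source, "copy of", dst)
```

Returning the source label makes it easy to record in the catalog whether a file came from a download or a manual upload.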

In [18]:
# Cell 5 — Manual upload fallback for rainfall (run only if you did not download)
from google.colab import files
uploaded = files.upload()  # select your rainfall csv file
for name in uploaded:
    src = f"/content/{name}"
    dst = f"{base}/raw/rainfall.csv"
    os.rename(src, dst)
    import hashlib
    with open(dst, 'rb') as f: # the with-block closes the file after hashing
        chksum = hashlib.md5(f.read()).hexdigest()
    append_catalog("Rainfall Dataset (manual)", "rainfall_manual", "manual_upload", dst, chksum)
    print("Uploaded rainfall to", dst)


Saving Sub_Division_IMD_2017 (2).csv to Sub_Division_IMD_2017 (2).csv
Uploaded rainfall to /content/data/raw/rainfall.csv


7. **Manual upload fallback (crop)**

In [19]:
# Cell 7 — Manual upload fallback for crop (if needed)
from google.colab import files
uploaded = files.upload()
for name in uploaded:
    src = f"/content/{name}" # src is the temp colab path
    dst = f"{base}/raw/crop_data.csv" # dst is the target path
    os.rename(src, dst) # moves the file to the desired location
    import hashlib
    with open(dst, 'rb') as f: # the with-block closes the file after hashing
        chksum = hashlib.md5(f.read()).hexdigest()
    # The checksum lets us detect later whether the file was altered or corrupted, i.e. that we are still working with the same file.
    append_catalog("Crop Dataset (manual)", "crop_manual", "manual_upload", dst, chksum)
    print("Uploaded crop file to", dst)


Saving Prodution-Under-Different-Crops_during-2019-20 (1).csv to Prodution-Under-Different-Crops_during-2019-20 (1).csv
Uploaded crop file to /content/data/raw/crop_data.csv


Manual upload fallback (soil)

In [None]:
# # Cell 7 — Manual upload fallback for soil (if needed)
# from google.colab import files
# uploaded = files.upload()
# for name in uploaded:
#     src = f"/content/{name}" # src is the temp colab path
#     dst = f"{base}/raw/soil_data.csv" # dst is the target path
#     os.rename(src, dst) # moves the file to the desired location
#     import hashlib
#     chksum = hashlib.md5(open(dst,'rb').read()).hexdigest()
#     # checksum is used to detect that the file has been altered or corrupted. to ensure that working with the same directory.
#     append_catalog("Soil Dataset (manual)", "soil_manual", "manual_upload", dst, chksum)
#     print("Uploaded soil file to", dst)


Manual Upload fallback (Temp)

In [None]:
# # Cell 7 — Manual upload fallback for temp (if needed)
# from google.colab import files
# uploaded = files.upload()
# for name in uploaded:
#     src = f"/content/{name}" # src is the temp colab path
#     dst = f"{base}/raw/temp_data.csv" # dst is the target path
#     os.rename(src, dst) # moves the file to the desired location
#     import hashlib
#     chksum = hashlib.md5(open(dst,'rb').read()).hexdigest()
#     # checksum is used to detect that the file has been altered or corrupted. to ensure that working with the same directory.
#     append_catalog("Temp Dataset (manual)", "temp_manual", "manual_upload", dst, chksum)
#     print("Uploaded temp file to", dst)


8. **Quick peek**: Confirm that the files downloaded or uploaded in the previous steps are actually CSV files, not HTML error pages, redirects, or corrupted text.

In [21]:
# Cell 8 — Peek the first 12 lines to ensure files are CSV (not HTML)
def peek(path):
    print("---- peek", path) # print header showing which file is being inspected
    if not os.path.exists(path): # if does not exist print not found
        print("File not found:", path); return
    with open(path, 'r', encoding='utf-8', errors='ignore') as f: # open with utf-8 encoding
        for i in range(12):
            line = f.readline()
            if not line:
                break
            print(i+1, ":", line.strip()[:300]) # print each line, truncated to 300 characters
peek(rain_raw) # printing initial content of rainfall and crop dataset for quick parsing
# peek("/content/data/raw/temp_data.csv")
peek(crop_raw)
# peek("/content/data/raw/soil_data.csv")




---- peek /content/data/raw/rainfall.csv
1 : SUBDIVISION,YEAR,JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC,ANNUAL,JF,MAM,JJAS,OND
2 : Andaman & Nicobar Islands,1901,49.2,87.1,29.2,2.3,528.8,517.5,365.1,481.1,332.6,388.5,558.2,33.6,3373.2,136.3,560.3,1696.3,980.3
3 : Andaman & Nicobar Islands,1902,0,159.8,12.2,0,446.1,537.1,228.9,753.7,666.2,197.2,359,160.5,3520.7,159.8,458.3,2185.9,716.7
4 : Andaman & Nicobar Islands,1903,12.7,144,0,1,235.1,479.9,728.4,326.7,339,181.2,284.4,225,2957.4,156.7,236.1,1874,690.6
5 : Andaman & Nicobar Islands,1904,9.4,14.7,0,202.4,304.5,495.1,502,160.1,820.4,222.2,308.7,40.1,3079.6,24.1,506.9,1977.6,571
6 : Andaman & Nicobar Islands,1905,1.3,0,3.3,26.9,279.5,628.7,368.7,330.5,297,260.7,25.4,344.7,2566.7,1.3,309.7,1624.9,630.8
7 : Andaman & Nicobar Islands,1906,36.6,0,0,0,556.1,733.3,247.7,320.5,164.3,267.8,128.9,79.2,2534.4,36.6,556.1,1465.8,475.9
8 : Andaman & Nicobar Islands,1907,110.7,0,113.3,21.6,616.3,305.2,443.9,377.6,200.4,264.4,648.9,245.6,3347.9,
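
The visual check above could be complemented by an automated one. The sketch below is a rough heuristic, not a full CSV validator: the name `looks_like_csv`, the list of markup prefixes, and the sample files are assumptions made for illustration, and delimiters inside quoted fields are not handled.

```python
import os
import tempfile


def looks_like_csv(path, delimiters=",;\t|", sample_lines=5):
    """Heuristic: True if the file starts like delimited text rather than HTML."""
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        lines = [line.strip() for _, line in zip(range(sample_lines), f)]
    lines = [line for line in lines if line]
    if not lines:
        return False  # empty file
    head = lines[0].lower()
    if head.startswith(("<!doctype", "<html", "<?xml")):
        return False  # markup, most likely an error or redirect page
    # Accept if some delimiter occurs the same non-zero number of times on every sampled line.
    for d in delimiters:
        counts = {line.count(d) for line in lines}
        if len(counts) == 1 and counts.pop() > 0:
            return True
    return False


workdir = tempfile.mkdtemp()

csv_path = os.path.join(workdir, "rainfall.csv")
with open(csv_path, "w") as f:
    f.write("SUBDIVISION,YEAR,ANNUAL\nAndaman & Nicobar Islands,1901,3373.2\n")

html_path = os.path.join(workdir, "error.csv")
with open(html_path, "w") as f:
    f.write("<!DOCTYPE html>\n<html><body>403 Forbidden</body></html>\n")

print(looks_like_csv(csv_path), looks_like_csv(html_path))
```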

9. **Robust CSV reader**:


*   A custom Python function (`robust_read_csv`) that tries several parsing strategies so it can read messy or irregular government datasets from data.gov.in.
*   Government datasets are often inconsistent: delimiters vary (commas, semicolons, etc.), and files may contain metadata rows or extra headers, or change format over time.




In [23]:
# Cell 9 — Robust CSV parsing function

# Method 1 : (normal Pandas)
import pandas as pd

def robust_read_csv(path, max_skip=6):
    # Try default
    try:
        df = pd.read_csv(path, low_memory=False)
        print("Default read OK:", df.shape)
        return df
    except Exception as e:
        print("Default read failed:", e)

    # Method 2: Python engine "sniffer". With sep=None, pandas detects the delimiter
    # automatically using Python's built-in csv.Sniffer.
    # (low_memory is a C-engine option, so it is not passed to the Python engine.)
    try:
        df = pd.read_csv(path, engine='python', sep=None)
        print("Sniffer read OK:", df.shape)
        return df
    except Exception as e:
        print("Sniffer failed:", e)

    # Method 3: Manual separator search (brute force)
    # Try common separators combined with skipping 0..max_skip-1 leading rows
    seps = [',',';','\t','|']
    for skip in range(0, max_skip):
        for sep in seps:
            try:
                df = pd.read_csv(path, sep=sep, skiprows=skip, engine='python')
                if df.shape[1] >= 2 and df.shape[0] > 0:
                    print(f"Success read with sep='{sep}', skiprows={skip}", df.shape)
                    return df
            except Exception:
                pass
    raise ValueError("Could not parse CSV. Inspect the file manually with peek().") # if none succeed , gives clear indication

# Read both rainfall and crop CSVs safely using robust function
df_rain = robust_read_csv(rain_raw)
df_crop = robust_read_csv(crop_raw)
# df_soil = robust_read_csv("/content/data/raw/soil_data.csv")
# df_temp = robust_read_csv("/content/data/raw/temp_data.csv")
print("Rain shape:", df_rain.shape, "Crop shape:", df_crop.shape)

# The function provides a single, reusable way to parse CSVs that do not follow one standard.
# It tries the strategies in order and returns the first one that succeeds.


Default read OK: (4188, 19)
Default read OK: (13, 13)
Rain shape: (4188, 19) Crop shape: (13, 13)
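
Both files parsed on the first attempt here, so the sniffer fallback was never exercised. To show what delimiter sniffing does, here is a small standalone sketch using Python's `csv.Sniffer` directly; `detect_delimiter` is a hypothetical helper and the sample strings are made up.

```python
import csv


def detect_delimiter(sample, candidates=",;\t|", default=","):
    """Guess the delimiter of a text sample with csv.Sniffer, else return default."""
    try:
        return csv.Sniffer().sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        return default  # the Sniffer could not decide


semicolon_sample = "state;year;annual\nA;1901;3373.2\nB;1902;3520.7\n"
tab_sample = "state\tyear\tannual\nA\t1901\t3373.2\nB\t1902\t3520.7\n"

print(repr(detect_delimiter(semicolon_sample)))
print(repr(detect_delimiter(tab_sample)))
```

Restricting the candidates to a few plausible separators keeps the sniffer from picking a character that merely happens to repeat consistently in the data.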


10. **Normalize** the columns, to make the column names consistent and predictable in both datasets before cleaning, merging, and querying.


*   Example column names: "State/UT", "Year ", "Annual Rainfall (mm)",
"District ", "Crop Name", "Wheat (in Metric Tonnes)"
*   These are inconsistent and contain spaces, capital letters, and special characters.


*   They cause problems when merging tables (e.g. joining on state) and when querying with SQL in DuckDB.





In [25]:
# Cell 10 — Normalize column names for both dataframes for better consistency, compatibility, predictability
# Make column names consistent and filesystem-friendly by converting them to lowercase, replacing spaces and certain characters with underscores, and trimming whitespace.
def normalize_cols(df):
    df = df.copy()
    df.columns = [str(c).strip().lower().replace(" ", "_").replace(".", "").replace("/","_") for c in df.columns]
    return df

df_rain = normalize_cols(df_rain)
df_crop = normalize_cols(df_crop)
# df_temp = normalize_cols(df_temp)
# df_soil = normalize_cols(df_soil)
print("Rain cols:", df_rain.columns.tolist()[:30]) # print first 30 col names for Rain and Crop to verify the transformation
print("Crop cols:", df_crop.columns.tolist()[:30])
# print("Temp cols:", df_temp.columns.tolist()[:30])
# print("Soil cols:", df_soil.columns.tolist()[:30])


Rain cols: ['subdivision', 'year', 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec', 'annual', 'jf', 'mam', 'jjas', 'ond']
Crop cols: ['state', 'district', 'wheat_(_in_metric_tonnes)', 'maize_(_in_metric_tonnes)', 'rice_(_in_metric_tonnes)', 'barley_(_in_metric_tonnes)', 'ragi_(_in_metric_tonnes)', 'pulses_(_in_metric_tonnes)', 'common_millets_(_in_metric_tonnes)', 'total_food_grains_(_in_metric_tonnes)', 'chillies_(_in_metric_tonnes)', 'ginger_(_in_metric_tonnes)', 'oil__seeds_(_in_metric_tonnes)']
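
The crop names above still contain parentheses and doubled underscores, which need quoting in SQL. One possible stricter variant, shown only as a sketch, collapses every run of non-alphanumeric characters into a single underscore. `to_snake` is a hypothetical helper, and the raw headers in the demo are assumed spellings reconstructed from the normalized output, not copied from the file.

```python
import re


def to_snake(name):
    """Lowercase a header and collapse each run of non-alphanumerics into one underscore."""
    cleaned = re.sub(r"[^0-9a-z]+", "_", str(name).strip().lower())
    return cleaned.strip("_")


# Assumed raw header spellings, for illustration only.
headers = [
    "State/UT",
    "Year ",
    "Annual Rainfall (mm)",
    "Wheat ( in Metric Tonnes)",
    "Oil  Seeds ( in Metric Tonnes)",
]
normalized = [to_snake(h) for h in headers]
print(normalized)
```

Switching to a scheme like this would change the column names that later cells match against, so those cells would need to be checked as well.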


11. **Semantic readiness**: rename common rainfall columns and clean them. Detect the column representing the administrative area (state, subdivision, ut, division) and rename it to `state`. Detect the rainfall column (`rainfall` or `precip`), coerce it to numeric, and rename it to `rainfall_mm`. Extract the 4-digit year from messy year text. Drop rows with no state.

In [26]:
# Cell 11 — Rainfall rename & cleaning (handles missing 'state'):
# This cell normalizes column names, detects which column is the state/subdivision, finds
# the rainfall and year columns, converts them to numeric, renames them to standard names (state, rainfall_mm, year), removes rows without a state, and prints a quick status. These steps make the IMD CSV compatible for joining and analysis.

import pandas as pd

# Normalize column names again for safety
df_rain.columns = [str(c).strip().lower().replace(" ", "_").replace("-", "_").replace("/", "_") for c in df_rain.columns]
# Government CSVs use inconsistent column names like State/UT, Year, Annual Rainfall (mm). Normalization creates predictable field names so later code can match them reliably (e.g., state, annual, rainfall_mm).

# Identify possible column candidates for 'state'
# creating the list of col names that look like they could represent a state or administrative division, based on keywords
possible_state_cols = [c for c in df_rain.columns if any(x in c for x in ["state", "ut", "subdiv", "division"])]
# checking for multiple keywords because the datasets might use subdivision instead of state

# If we found col, then pick the first one and rename it to "state". If not found, print available cols.
if possible_state_cols:
    state_col = possible_state_cols[0]
    print(f"Found '{state_col}' column for state mapping.")
    df_rain = df_rain.rename(columns={state_col: "state"})
else:
    print("No state/subdivision column found. Showing available columns:")
    print(df_rain.columns.tolist())

# Identify rainfall and year columns
# Looks for columns that likely contain rainfall values by searching for keywords rainfall or precip.
possible_rain_cols = [c for c in df_rain.columns if "rainfall" in c or "precip" in c]
rain_col = possible_rain_cols[0] if possible_rain_cols else None
# IMD files may name this column annual_rainfall_(mm), rainfall_mm, or similar; the first match is used.
# A column named just 'annual' (as in this file) matches neither keyword, so rain_col stays None and the column keeps its name.

# If a rainfall-like column exists:
# Convert its values to numeric (strip commas first), coercing invalid entries to NaN.
# Rename the column to rainfall_mm.
if rain_col:
    df_rain[rain_col] = pd.to_numeric(df_rain[rain_col].astype(str).str.replace(",",""), errors='coerce')
    df_rain = df_rain.rename(columns={rain_col: "rainfall_mm"})
else:
    print("No rainfall column found!")


# Sometimes year cells may contain extra text like Year: 2017 or 2017-2018; str.extract(r"(\d{4})") isolates the actual year number. Converting to numeric makes it usable for time-based queries (min, max, BETWEEN).
if "year" in df_rain.columns:
    df_rain["year"] = pd.to_numeric(df_rain["year"].astype(str).str.extract(r"(\d{4})")[0], errors="coerce")
else:
    print("No year column found!")

# Drop missing state rows if state now exists
if "state" in df_rain.columns:
    df_rain = df_rain.dropna(subset=["state"]).reset_index(drop=True)
    # Removes rows where the state is missing, then resets the index.
    # Rows without a state cannot be joined or used for regional analysis, so they’re removed to avoid noise or join failures.

print(" After rainfall cleaning:", df_rain.shape)
df_rain.head(3)


Found 'subdivision' column for state mapping.
No rainfall column found!
 After rainfall cleaning: (4188, 19)


Unnamed: 0,state,year,jan,feb,mar,apr,may,jun,jul,aug,sep,oct,nov,dec,annual,jf,mam,jjas,ond
0,Andaman & Nicobar Islands,1901,49.2,87.1,29.2,2.3,528.8,517.5,365.1,481.1,332.6,388.5,558.2,33.6,3373.2,136.3,560.3,1696.3,980.3
1,Andaman & Nicobar Islands,1902,0.0,159.8,12.2,0.0,446.1,537.1,228.9,753.7,666.2,197.2,359.0,160.5,3520.7,159.8,458.3,2185.9,716.7
2,Andaman & Nicobar Islands,1903,12.7,144.0,0.0,1.0,235.1,479.9,728.4,326.7,339.0,181.2,284.4,225.0,2957.4,156.7,236.1,1874.0,690.6
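
The run above reports "No rainfall column found!" because the yearly total in this file is called `annual`. A possible extension, sketched here in plain Python, is to search keywords in priority order with `annual` as a last resort. `pick_column` and `extract_year` are hypothetical helpers; the column list is a subset of the names shown in the table above.

```python
import re


def pick_column(columns, keywords):
    """Return the first column containing a keyword, trying keywords in priority order."""
    for keyword in keywords:
        for col in columns:
            if keyword in col:
                return col
    return None


def extract_year(value):
    """Return the first 4-digit run in value as an int, or None if there is none."""
    match = re.search(r"\d{4}", str(value))
    return int(match.group()) if match else None


# Subset of the rainfall column names after the state rename.
rain_columns = ["state", "year", "jan", "feb", "annual", "jf", "mam", "jjas", "ond"]

strict = pick_column(rain_columns, ["rainfall", "precip"])
relaxed = pick_column(rain_columns, ["rainfall", "precip", "annual"])
print(strict, relaxed)

years = [extract_year(v) for v in ["Year: 2017", "2017-2018", "n/a"]]
print(years)
```

Renaming `annual` to `rainfall_mm` this way would also change the table schema seen by later cells, so it is a design choice rather than a drop-in fix.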


In [31]:
# Cell 12 (Final) — Convert wide-format crop data into long format for unified Q&A

import pandas as pd

# Normalize columns
# Cleans all column headers:
# trims whitespace, lowercases, and replaces spaces, dashes, and slashes with underscores.
df_crop.columns = [str(c).strip().lower().replace(" ", "_").replace("-", "_").replace("/", "_") for c in df_crop.columns]

# Ensure 'state' column exists
if "state" not in df_crop.columns:
    raise KeyError("No 'state' column found!")
# Ensures your dataset includes a state column.
# If this fails, we can’t group or join the crop data with rainfall later.

# Detect possible crop production columns dynamically
# Automatically picks out all columns containing crop names.
# Each dataset may have slightly different crop spellings or order.
# This line ensures our code adapts automatically to those changes.
crop_columns = [c for c in df_crop.columns if any(x in c for x in ["wheat", "rice", "maize", "barley", "ragi", "pulses", "millets", "chillies", "ginger", "oil"])]
print(f"Detected crop columns: {crop_columns}")

# Converts crop columns into rows, so every crop shares the same (state, district, crop, production_mt) structure.
# Easier to query with SQL in DuckDB, e.g. "Top crops by state".
df_crop_long = df_crop.melt(id_vars=["state", "district"], value_vars=crop_columns,
                            var_name="crop", value_name="production_mt")

# Clean crop names
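# Make crop names readable and consistent: remove the suffix _in_metric_tonnes, replace _ with a space, and title-case the result.
# (The leftover "()" from the original parentheses is removed in the next cell.)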
df_crop_long["crop"] = df_crop_long["crop"].str.replace("_in_metric_tonnes", "", regex=False)
df_crop_long["crop"] = df_crop_long["crop"].str.replace("_", " ").str.strip().str.title()

# Convert production values to numeric
df_crop_long["production_mt"] = pd.to_numeric(df_crop_long["production_mt"], errors="coerce")
# Some CSV cells might contain text such as "NA"; errors="coerce" turns those into NaN.
# Comma-formatted strings such as "1,234" would also become NaN, because commas are not stripped here.
# Numeric values ensure aggregation (SUM, AVG) won’t fail.

# Remove rows without production data
df_crop_long = df_crop_long.dropna(subset=["production_mt"]).reset_index(drop=True)

print("After transformation:", df_crop_long.shape)
df_crop_long.head(10)


Detected crop columns: ['wheat_(_in_metric_tonnes)', 'maize_(_in_metric_tonnes)', 'rice_(_in_metric_tonnes)', 'barley_(_in_metric_tonnes)', 'ragi_(_in_metric_tonnes)', 'pulses_(_in_metric_tonnes)', 'common_millets_(_in_metric_tonnes)', 'chillies_(_in_metric_tonnes)', 'ginger_(_in_metric_tonnes)', 'oil__seeds_(_in_metric_tonnes)']
After transformation: (130, 4)


Unnamed: 0,state,district,crop,production_mt
0,Himachal Pradesh,Bilaspur,Wheat (),48096
1,Himachal Pradesh,Chamba,Wheat (),41545
2,Himachal Pradesh,Hamirpur,Wheat (),58886
3,Himachal Pradesh,Kangra,Wheat (),190519
4,Himachal Pradesh,Kinnaur,Wheat (),196
5,Himachal Pradesh,Kullu,Wheat (),21992
6,Himachal Pradesh,Lahaul-spiti,Wheat (),125
7,Himachal Pradesh,Mandi,Wheat (),117665
8,Himachal Pradesh,Shimla,Wheat (),13819
9,Himachal Pradesh,Sirmaur,Wheat (),40567
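
For readers unfamiliar with the wide-to-long reshape, the sketch below reproduces the idea in plain Python on two rows. `melt_rows` is a hypothetical stand-in for `DataFrame.melt`; the wheat figures come from the output above, while the maize figures are made up.

```python
def melt_rows(rows, id_keys, value_keys, var_name="crop", value_name="production_mt"):
    """Turn one wide row per district into one long row per (district, crop)."""
    long_rows = []
    for key in value_keys:      # column by column, the same order DataFrame.melt produces
        for row in rows:
            record = {k: row[k] for k in id_keys}
            record[var_name] = key
            record[value_name] = row[key]
            long_rows.append(record)
    return long_rows


wide = [
    {"state": "Himachal Pradesh", "district": "Bilaspur", "wheat": 48096, "maize": 100},  # maize is made up
    {"state": "Himachal Pradesh", "district": "Chamba", "wheat": 41545, "maize": 200},    # maize is made up
]

long_rows = melt_rows(wide, id_keys=["state", "district"], value_keys=["wheat", "maize"])
for record in long_rows:
    print(record)
```

Two districts with two crop columns give four long rows, which is the same arithmetic behind the (130, 4) shape reported above: 13 rows times 10 detected crop columns.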


In [32]:
# Step 1 — Final cleanup of crop names and structure

df_crop_long["crop"] = (
    df_crop_long["crop"]
    .str.replace(r"\(\)", "", regex=True)
    .str.replace(r"\s+", " ", regex=True)
    .str.strip()
    .str.title()
)

# Add a placeholder 'year' column so the table has the same key columns as the rainfall data
# (the source file name indicates 2019-20; update this if actual years become available)
df_crop_long["year"] = 2022  # placeholder value, constant for every row

# Save cleaned version
base_dir = "/content/data"
df_crop_long.to_parquet(f"{base_dir}/processed/crop_production.parquet", index=False)
# Parquet is a columnar format, which typically means:
#   - faster analytical queries (especially for large datasets)
#   - less storage space
#   - schema and type information kept intact
# DuckDB can also read it natively (used in later cells).

print("Cleaned crop dataset ready for analysis:", df_crop_long.shape)
df_crop_long.head(10)


Cleaned crop dataset ready for analysis: (130, 5)


Unnamed: 0,state,district,crop,production_mt,year
0,Himachal Pradesh,Bilaspur,Wheat,48096,2022
1,Himachal Pradesh,Chamba,Wheat,41545,2022
2,Himachal Pradesh,Hamirpur,Wheat,58886,2022
3,Himachal Pradesh,Kangra,Wheat,190519,2022
4,Himachal Pradesh,Kinnaur,Wheat,196,2022
5,Himachal Pradesh,Kullu,Wheat,21992,2022
6,Himachal Pradesh,Lahaul-spiti,Wheat,125,2022
7,Himachal Pradesh,Mandi,Wheat,117665,2022
8,Himachal Pradesh,Shimla,Wheat,13819,2022
9,Himachal Pradesh,Sirmaur,Wheat,40567,2022


**Temp Data** Cleaning

In [None]:
# # Cell — Temperature Data Cleaning
# import pandas as pd

# # Normalize column names again
# df_temp.columns = [str(c).strip().lower().replace(" ", "_").replace("-", "_") for c in df_temp.columns]

# # Check the column names
# print("Temperature columns:", df_temp.columns.tolist())

# # Ensure year column exists and clean it
# if "year" in df_temp.columns:
#     df_temp["year"] = pd.to_numeric(df_temp["year"].astype(str).str.extract(r"(\d{4})")[0], errors="coerce")
# else:
#     raise KeyError("No 'year' column found in temperature dataset!")

# # Convert all temperature-related columns to numeric
# temp_cols = [c for c in df_temp.columns if c != "year"]
# for c in temp_cols:
#     df_temp[c] = pd.to_numeric(df_temp[c], errors="coerce")

# # Rename 'annual' to a consistent name
# if "annual" in df_temp.columns:
#     df_temp = df_temp.rename(columns={"annual": "mean_temp_c"})

# # Drop missing or invalid years
# df_temp = df_temp.dropna(subset=["year"]).reset_index(drop=True)

# # Show summary
# print("Temperature cleaning complete:", df_temp.shape)
# df_temp.head(5)


**Soil Dataset** cleaning

In [None]:
# # Cell — Soil Health Data Cleaning
# import pandas as pd

# # Normalize columns
# df_soil.columns = [str(c).strip().lower().replace(" ", "_").replace("-", "_").replace("/", "_") for c in df_soil.columns]
# print("Soil columns:", df_soil.columns.tolist())

# # Identify the 'state' column
# possible_state_cols = [c for c in df_soil.columns if any(x in c for x in ["state", "ut"])]
# if possible_state_cols:
#     state_col = possible_state_cols[0]
#     print(f"Found '{state_col}' as the state column.")
#     df_soil = df_soil.rename(columns={state_col: "state"})
# else:
#     raise KeyError("No state column found in soil dataset!")

# # Convert SHC count columns to numeric
# count_cols = [c for c in df_soil.columns if "shc" in c or "total" in c]
# for c in count_cols:
#     df_soil[c] = pd.to_numeric(df_soil[c].astype(str).str.replace(",", ""), errors="coerce")

# # Drop rows with missing state names
# df_soil = df_soil.dropna(subset=["state"]).reset_index(drop=True)

# # Clean up state names (e.g., remove leading/trailing spaces, standardize case)
# df_soil["state"] = df_soil["state"].str.strip().str.title()

# # Show quick stats
# print("Soil Health cleaning complete:", df_soil.shape)
# df_soil.head(5)


In [None]:
# # ============================================================
# # ✅ Cell — Expand Soil Cycle Values Into Yearly Data
# # ============================================================

# import pandas as pd

# df_soil = df_soil.copy()

# # STEP 1 — Identify cycle columns
# cycle_cols = {
#     "no_of_shcs_issued_to_farmers___cycle_i_(2015_17)": (2015, 2017),
#     "no_of_shcs_issued_to_farmers___cycle_ii_(2017_19)": (2017, 2019),
#     "no_of_shcs_issued_to_farmers___model_village_programme_(2019_20)": (2019, 2020),
#     "no_of_shcs_issued_to_farmers___2020_21": (2020, 2021)
# }

# expanded_rows = []

# # STEP 2 — Expand cycles into per-year values
# for _, row in df_soil.iterrows():
#     state_name = row["state"]

#     for col, (start, end) in cycle_cols.items():
#         total_value = row[col]
#         years = list(range(start, end + 1))  # e.g., 2015–2017 → [2015, 2016, 2017]
#         per_year_value = total_value / len(years)  # distribute evenly

#         for y in years:
#             expanded_rows.append({
#                 "state": state_name,
#                 "year": y,
#                 "soil_shc_issued": per_year_value
#             })

# # STEP 3 — Create full expanded dataframe
# df_soil_yearly = pd.DataFrame(expanded_rows)

# print("✅ Soil yearly expanded dataset shape:", df_soil_yearly.shape)
# df_soil_yearly.head(10)


In [27]:
# Save cleaned IMD rainfall data properly
base_dir = "/content/data"
df_rain.to_parquet(f"{base_dir}/processed/imd_rainfall.parquet", index=False)
print("Rainfall parquet saved successfully at:", f"{base_dir}/processed/imd_rainfall.parquet")


Rainfall parquet saved successfully at: /content/data/processed/imd_rainfall.parquet


In [None]:
# df_temp.to_parquet(f"{base}/processed/temp_data.parquet", index=False)
# df_soil.to_parquet(f"{base}/processed/soil_data.parquet", index=False)


In [None]:
# append_catalog("Temperature Dataset (IMD)", "temp", "data.gov.in", f"{base}/processed/temp_data.parquet", "checksum_here")
# append_catalog("Soil Health Dataset (MoA)", "soil", "data.gov.in", f"{base}/processed/soil_data.parquet", "checksum_here")


In [33]:
df_crop_long.to_parquet("/content/data/processed/crop_production.parquet", index=False)


13. List the processed files before loading them into DuckDB.

In [34]:
# Verify that all processed data files have been saved correctly.

import os

!ls -R /content/data/processed


/content/data/processed:
crop_production.parquet  imd_rainfall.parquet


In [35]:
base_dir = "/content/data"
# base directory where all project data files are stored.
# keeps path organized and reusable instead of hardcoding them.

df_crop_long.to_parquet(f"{base_dir}/processed/crop_production.parquet", index=False)
print("Crop parquet saved successfully at:", f"{base_dir}/processed/crop_production.parquet")
# for verification

Crop parquet saved successfully at: /content/data/processed/crop_production.parquet


In [None]:
# df_soil_yearly.to_parquet("/content/data/processed/soil_data.parquet", index=False)
# print("✅ Soil yearly parquet updated successfully.")


## **Merging Two Datasets**

1. **DuckDB** is an in-process analytical **database engine** with a Python API. It runs SQL queries directly on CSV/Parquet files without needing a separate database server such as MySQL. Here we connect to a local DuckDB file and create the tables `rainfall` and `crop` from Parquet via `read_parquet`.
2. It works seamlessly with Parquet and Pandas, including large datasets from **data.gov.in**.  
3. **Data integration**: loading both datasets into the same DuckDB database lets them be combined into a single unified analytical table.

In [38]:
import duckdb

con = duckdb.connect("/content/db/samarth.duckdb", read_only=False)

con.execute("CREATE OR REPLACE TABLE crop AS SELECT * FROM read_parquet('/content/data/processed/crop_production.parquet');")
con.execute("CREATE OR REPLACE TABLE rainfall AS SELECT * FROM read_parquet('/content/data/processed/imd_rainfall.parquet');")
# con.execute("CREATE OR REPLACE TABLE temp AS SELECT * FROM read_parquet('/content/data/processed/temp_data.parquet');")
# con.execute("CREATE OR REPLACE TABLE soil AS SELECT * FROM read_parquet('/content/data/processed/soil_data.parquet');")

print(con.execute("SHOW TABLES").df())
print("\nRainfall:", con.execute("PRAGMA table_info(rainfall)").df())
# print("\nTemp:", con.execute("PRAGMA table_info(temp)").df())
# print("\nSoil:", con.execute("PRAGMA table_info(soil)").df())
print("\nCrop:", con.execute("PRAGMA table_info(crop)").df())

       name
0      crop
1  rainfall

Rainfall:     cid    name     type  notnull dflt_value     pk
0     0   state  VARCHAR    False       None  False
1     1    year   BIGINT    False       None  False
2     2     jan   DOUBLE    False       None  False
3     3     feb   DOUBLE    False       None  False
4     4     mar   DOUBLE    False       None  False
5     5     apr   DOUBLE    False       None  False
6     6     may   DOUBLE    False       None  False
7     7     jun   DOUBLE    False       None  False
8     8     jul   DOUBLE    False       None  False
9     9     aug   DOUBLE    False       None  False
10   10     sep   DOUBLE    False       None  False
11   11     oct   DOUBLE    False       None  False
12   12     nov   DOUBLE    False       None  False
13   13     dec   DOUBLE    False       None  False
14   14  annual   DOUBLE    False       None  False
15   15      jf   DOUBLE    False       None  False
16   16     mam   DOUBLE    False       None  False
17   17    jjas  

In [39]:
unified_query = """
CREATE OR REPLACE VIEW unified AS
SELECT
    c.state,
    c.crop,
    c.year,
    c.production_mt,
    r.annual AS rainfall_mm
FROM crop c
LEFT JOIN rainfall r
    ON lower(c.state) = lower(r.state)
   AND c.year = r.year;
"""

con.execute(unified_query)

# Preview
df_u = con.execute("SELECT * FROM unified LIMIT 10").df()
df_u


Unnamed: 0,state,crop,year,production_mt,rainfall_mm
0,Himachal Pradesh,Wheat,2022,48096,
1,Himachal Pradesh,Wheat,2022,41545,
2,Himachal Pradesh,Wheat,2022,58886,
3,Himachal Pradesh,Wheat,2022,190519,
4,Himachal Pradesh,Wheat,2022,196,
5,Himachal Pradesh,Wheat,2022,21992,
6,Himachal Pradesh,Wheat,2022,125,
7,Himachal Pradesh,Wheat,2022,117665,
8,Himachal Pradesh,Wheat,2022,13819,
9,Himachal Pradesh,Wheat,2022,40567,
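
In the preview above, rainfall_mm is empty for every row. The LEFT JOIN therefore found no rainfall row with the same lowercased state and the same year. Possible causes include years that do not overlap or state names that differ by more than letter case. The sketch below shows one way to list crop keys that have no rainfall partner. The helper names and the sample keys are hypothetical. In the notebook, the key lists could come from a query such as SELECT DISTINCT state, year on each table.

```python
def normalize_state(name: str) -> str:
    """Lowercase, trim, and collapse internal whitespace in a state name."""
    return " ".join(name.lower().split())


def unmatched_keys(crop_keys, rain_keys):
    """Return crop (state, year) keys that have no rainfall counterpart."""
    rain = {(normalize_state(s), int(y)) for s, y in rain_keys}
    crop = {(normalize_state(s), int(y)) for s, y in crop_keys}
    return sorted(crop - rain)


# Hypothetical keys for illustration only; real values come from the tables.
crop_keys = [("Himachal Pradesh", 2022), ("Himachal  Pradesh ", 2021)]
rain_keys = [("HIMACHAL PRADESH", 2021), ("Punjab", 2021)]

missing = unmatched_keys(crop_keys, rain_keys)
print(missing)
```

Any key that appears in the result is a row for which the unified view would show an empty rainfall_mm.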


14. Building the **Interactive Q&A prototype**:
a traceability and citation system for the Project Samarth prototype.
It ensures that every answer can cite the dataset it came from, by title and source URL.

In [40]:
import pandas as pd, json, time
from IPython.display import display, Markdown
# pandas → for reading and managing the dataset catalog.
# json → for any future structured reading (metadata files).
# time → could be used later for timestamping operations.
# IPython.display → allows you to print clean Markdown output in Colab (e.g., formatted titles).

# Load the dataset catalog (metadata for all data files used).
# The append_catalog helper created this file earlier, when the rainfall and crop datasets were added.
catalog_path = "/content/data/data_catalog.csv"
catalog = pd.read_csv(catalog_path)
display(catalog)

# Finds dataset info when a user’s question or query mentions it
def find_source_by_name(keyword):
    """Find catalog rows that match a keyword (case-insensitive)."""
    mask = catalog.apply(lambda row: keyword.lower() in str(row['title']).lower() or keyword.lower() in str(row['source_url']).lower(), axis=1)
    return catalog[mask]

# Returns a short citation (title + source_url as recorded in the catalog).
# Uses find_source_by_name to search the catalog for a dataset whose title or URL matches the keyword,
# and cites the first match.
# If nothing is found, defaults to "data.gov.in (dataset not found in local catalog)".

def cite_dataset_by_keyword(keyword):
    """Return a short citation (title + source_url) for a dataset matching keyword."""
    df = find_source_by_name(keyword)
    if not df.empty:
        r = df.iloc[0]
        return f"{r['title']} — {r['source_url']}"
    return "data.gov.in (dataset not found in local catalog)"


Unnamed: 0,title,resource_id,source_url,local_path,checksum,downloaded_at
0,Rainfall Dataset (manual),rainfall_manual,manual_upload,/content/data/raw/rainfall.csv,9d4829133f031c90257e5be409c7fd42,2025-11-07
1,Crop Dataset (manual),crop_manual,manual_upload,/content/data/raw/crop_data.csv,9675a5c8205f494ba23474562286a7ba,2025-11-07
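
The catalog's checksum column can support traceability by confirming that a local file is still the one that was cataloged. The values above are 32 hexadecimal characters long, which matches the length of an MD5 digest. The algorithm that append_catalog actually uses is not shown in this section, so MD5 is an assumption here. The helper names file_md5 and matches_catalog are hypothetical.

```python
import hashlib
import os
import tempfile


def file_md5(path: str, chunk_size: int = 1 << 20) -> str:
    """Compute the MD5 hex digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_catalog(path: str, expected_checksum: str) -> bool:
    """True when the file on disk still has the checksum recorded in the catalog."""
    return file_md5(path) == expected_checksum.strip().lower()


# Demo on a throwaway file (assumption: MD5, see the note above).
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.csv")
    with open(demo_path, "wb") as fh:
        fh.write(b"state,year,annual\n")
    recorded = file_md5(demo_path)
    unchanged = matches_catalog(demo_path, recorded)

    # Modify the file: the recorded checksum should no longer match.
    with open(demo_path, "ab") as fh:
        fh.write(b"State A,2001,900.0\n")
    changed = matches_catalog(demo_path, recorded)

print(unchanged, changed)
```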


15. **Expanded Query Planner**: the brain of the Project Samarth prototype, its Natural Language Understanding (NLU) layer. It converts plain-English questions into SQL queries that run through the DuckDB database (rainfall + crop data).



**Execution Steps:**
1. Takes a user question in natural language (like “Top 5 crops in Himachal Pradesh”).

2. Detects the type of question (intent).

3. Extracts keywords (state, crop, years).

4. Builds dynamic SQL queries to fetch the correct answer.

5. Returns a dictionary with everything needed to execute those queries.
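
Steps 2, 3 and 5 can be sketched for a single intent as follows. This is an illustrative variant, not the notebook's exact pattern. The names COMPARE_RE and parse_compare are hypothetical, and max_year is passed in as a made-up value rather than read from DuckDB.

```python
import re

# Lazy groups (+?) stop each state name at the first word that can continue the
# pattern, so trailing words such as "for" do not end up inside the state name.
COMPARE_RE = re.compile(
    r"compare.*rainfall.*\bin\s+(?P<s1>[\w\s]+?)\s+and\s+(?P<s2>[\w\s]+?)"
    r"\s+(?:(?:for|over)\s+)?(?:the\s+)?last\s+(?P<n>\d+)\s+years"
)


def parse_compare(question: str, max_year: int):
    """Return intent, states, and year window, or None when the pattern does not match."""
    m = COMPARE_RE.search(question.strip().lower())
    if not m:
        return None
    n = int(m.group("n"))
    return {
        "intent": "compare_rain_and_crops",
        "states": [m.group("s1").strip(), m.group("s2").strip()],
        "years": (max_year - n + 1, max_year),  # inclusive window of n years
    }


# max_year is a hypothetical value here; the notebook reads it from DuckDB.
plan = parse_compare(
    "Compare rainfall in Himachal Pradesh and Punjab for last 5 years",
    max_year=2020,
)
print(plan)
```

Passing max_year as an argument keeps the parsing step free of database access, which makes it easy to test on its own.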

In [50]:
import re

def samarth_plan(question: str):
    q = question.strip().lower()

    # ---------------------------------------------------
    # 1️⃣ Compare rainfall + crops between two states
    # Example:
    #   "Compare rainfall in Himachal Pradesh and Punjab for last 5 years"
    #   "Compare rainfall and top 3 crops in Kerala and Tamil Nadu for last 10 years"
    # ---------------------------------------------------
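    # Lazy groups keep trailing words such as "for" out of the second state name
    m = re.search(r"compare.*rainfall.*\bin\s+([\w\s]+?)\s+and\s+([\w\s]+?)\s+(?:(?:for|over)\s+)?(?:the\s+)?last\s+(\d+)\s+years", q)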
    if m:
        s1, s2, n = [x.strip() for x in m.groups()]
        n = int(n)

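        # year window based on the latest year in the rainfall table;
        # if the crop table covers different years, the crop-year filters below match nothing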
        max_year = con.execute("SELECT MAX(year) FROM rainfall").fetchone()[0]
        min_year = max_year - n + 1

        # SQL: main climate + production aggregation
        sql_main = f"""
        SELECT
            lower(c.state) AS state,
            SUM(c.production_mt) AS total_production,
            AVG(r.annual) AS avg_rainfall_mm
        FROM crop c
        LEFT JOIN rainfall r
            ON lower(c.state) = lower(r.state)
           AND c.year = r.year
        WHERE lower(c.state) IN ('{s1.lower()}', '{s2.lower()}')
          AND c.year BETWEEN {min_year} AND {max_year}
        GROUP BY c.state;
        """

        # SQL: top M crops (default: 5)
        M = 5
        m2 = re.search(r"top\s+(\d+)", q)
        if m2:
            M = int(m2.group(1))

        sql_top = f"""
        SELECT
            lower(state) AS state,
            crop,
            SUM(production_mt) AS total_prod
        FROM crop
        WHERE lower(state) IN ('{s1.lower()}', '{s2.lower()}')
          AND year BETWEEN {min_year} AND {max_year}
        GROUP BY state, crop
        ORDER BY total_prod DESC;
        """

        return {
            "intent": "compare_rain_and_crops",
            "states": [s1, s2],
            "years": (min_year, max_year),
            "M": M,
            "sql_main": sql_main,
            "sql_top": sql_top
        }

    # ---------------------------------------------------
    # 2️⃣ Trend of crop over last N years in a region
    # Example:
    #   "Trend of wheat last 5 years in Himachal Pradesh"
    # ---------------------------------------------------
    m = re.search(r"trend.*of\s+([\w\s]+).*last\s+(\d+)\s+years.*in\s+([\w\s]+)", q)
    if m:
        crop, n, region = m.groups()
        n = int(n)

        max_year = con.execute("SELECT MAX(year) FROM crop").fetchone()[0]
        min_year = max_year - n + 1

        sql_prod = f"""
        SELECT year, SUM(production_mt) AS total_prod
        FROM crop
        WHERE lower(crop) LIKE '%{crop.strip().lower()}%'
          AND lower(state) LIKE '%{region.strip().lower()}%'
          AND year BETWEEN {min_year} AND {max_year}
        GROUP BY year ORDER BY year;
        """

        sql_rain = f"""
        SELECT year, AVG(annual) AS avg_rain
        FROM rainfall
        WHERE lower(state) LIKE '%{region.strip().lower()}%'
          AND year BETWEEN {min_year} AND {max_year}
        GROUP BY year ORDER BY year;
        """

        return {
            "intent": "trend_and_corr",
            "crop": crop.strip(),
            "region": region.strip(),
            "years": (min_year, max_year),
            "sql_prod": sql_prod,
            "sql_rain": sql_rain
        }

    # ---------------------------------------------------
    # 3️⃣ Top N crops in a state
    # Example:
    #   "Top 5 crops in Himachal Pradesh"
    # ---------------------------------------------------
    m = re.search(r"top\s+(\d+)\s+crops\s+in\s+([\w\s]+)", q)
    if m:
        n, state = m.groups()
        n = int(n)

        sql = f"""
        SELECT crop, SUM(production_mt) AS total_prod
        FROM crop
        WHERE lower(state) LIKE '%{state.strip().lower()}%'
        GROUP BY crop
        ORDER BY total_prod DESC
        LIMIT {n};
        """

        return {"intent": "top_crops_state", "sql": sql, "state": state.strip(), "n": n}

    # ---------------------------------------------------
    # 4️⃣ If nothing matched
    # ---------------------------------------------------
    return {"intent": "unknown"}
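
The planner above builds SQL by interpolating captured text into f-strings. The regexes capture only word characters and whitespace, which limits what can reach the SQL. Even so, passing values as query parameters keeps user-derived text out of the SQL string entirely. The sketch below shows that variant for the third intent. The function name build_top_crops_query is hypothetical. It relies on the DuckDB Python client accepting ? placeholders with a parameter list in con.execute.

```python
def build_top_crops_query(state: str, n: int):
    """Return (sql, params) with the user-derived state kept out of the SQL text."""
    limit = int(n)  # int() rejects anything that is not a plain integer
    sql = f"""
        SELECT crop, SUM(production_mt) AS total_prod
        FROM crop
        WHERE lower(state) LIKE ?
        GROUP BY crop
        ORDER BY total_prod DESC
        LIMIT {limit}
    """
    params = [f"%{state.strip().lower()}%"]
    return sql, params


sql, params = build_top_crops_query("Himachal Pradesh", 5)
print(params)
# With the notebook's connection this would run as: con.execute(sql, params).df()
```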


16. **Executor + Synthesizer (run SQL, format answer, attach citations):** it takes the plan generated by the earlier samarth_plan() function, executes the SQL queries on DuckDB, and produces human-readable answers with dataset citations.

In [51]:
def execute_and_synthesize(plan):
    intent = plan.get("intent")

    # ------------------------------------------
    # 1. Unknown intent
    # ------------------------------------------
    if intent == "unknown":
        return ("⚠️ I couldn't interpret that.\n"
                "Try examples like:\n"
                "- Compare rainfall in X and Y for last N years\n"
                "- Trend of <crop> last N years in <state>\n"
                "- Top N crops in <state>"), None

    # ------------------------------------------
    # 2. Compare rainfall + crops between two states
    # ------------------------------------------
    if intent == "compare_rain_and_crops":
        df_main = con.execute(plan["sql_main"]).df()
        df_top = con.execute(plan["sql_top"]).df()

        if df_main.empty:
            return "No matching data for those states/years.", None

        lines = []
        lines.append(f"### Comparison between **{plan['states'][0].title()}** and **{plan['states'][1].title()}**")
        lines.append(f"Period: **{plan['years'][0]}–{plan['years'][1]}**\n")

        for _, r in df_main.iterrows():
            lines.append(
                f"- **{r['state'].title()}** → "
                f"Avg Rainfall: **{round(r['avg_rainfall_mm'],1)} mm**, "
                f"Total Production: **{int(r['total_production'])} t**"
            )

        # Top M crops per state
        lines.append("\n### Top Crops:\n")
        for st in plan["states"]:
            subset = df_top[df_top["state"].str.lower() == st.strip().lower()]
            best = subset.head(plan["M"])
            formatted = ", ".join(
                f"{row['crop'].title()} ({int(row['total_prod'])} t)"
                for _, row in best.iterrows()
            )
            lines.append(f"- **{st.title()}**: {formatted}")

        # Citations
        lines.append("\n### Sources")
        lines.append("- Rainfall → " + cite_dataset_by_keyword("rain"))
        lines.append("- Crop → " + cite_dataset_by_keyword("crop"))

        return "\n".join(lines), (df_main, df_top)

    # ------------------------------------------
    # 3. Trend + correlation (crop vs rainfall)
    # ------------------------------------------
    if intent == "trend_and_corr":
        df_prod = con.execute(plan["sql_prod"]).df()
        df_rain = con.execute(plan["sql_rain"]).df()

        merged = df_prod.merge(df_rain, on="year", how="inner")

        if merged.empty:
            return "No overlapping data available for trend analysis.", None

        corr = merged["total_prod"].corr(merged["avg_rain"])

        lines = []
        lines.append(f"### Trend for **{plan['crop'].title()}** in **{plan['region'].title()}**")
        lines.append(f"Period: **{plan['years'][0]}–{plan['years'][1]}**\n")
        lines.append(f"- Data points: {len(merged)}")
        lines.append(f"- Correlation between rainfall and production: **{round(corr, 3) if corr==corr else 'N/A'}**")
        lines.append("  (1 strong positive, -1 strong negative, 0 weak)\n")

        lines.append("### Sources")
        lines.append("- Crop → " + cite_dataset_by_keyword("crop"))
        lines.append("- Rainfall → " + cite_dataset_by_keyword("rain"))

        return "\n".join(lines), merged

    # ------------------------------------------
    # 4. Simple 'Top N crops in a state'
    # ------------------------------------------
    if intent == "top_crops_state":
        df = con.execute(plan["sql"]).df()

        if df.empty:
            return f"No crop data found for {plan['state'].title()}.", None

        lines = []
        lines.append(f"### Top {plan['n']} Crops in **{plan['state'].title()}**")
        for _, r in df.iterrows():
            lines.append(f"- {r['crop'].title()}: **{int(r['total_prod'])} t**")

        lines.append("\nSource → " + cite_dataset_by_keyword("crop"))

        return "\n".join(lines), df

    # ------------------------------------------
    # 5. Fallback
    # ------------------------------------------
    return "⚠️ Unhandled intent. Try a different query.", None
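
The trend branch relies on pandas Series.corr, which computes the Pearson coefficient by default. A plain-Python sketch of the same quantity shows why the executor needs its N/A fallback: with fewer than two overlapping years, or with no variation in either series, the coefficient is undefined. The function name pearson and the sample values are hypothetical.

```python
import math


def pearson(xs, ys):
    """Pearson correlation of two equal-length sequences; NaN when undefined."""
    n = len(xs)
    if n != len(ys):
        raise ValueError("sequences must have the same length")
    if n < 2:
        return math.nan
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return math.nan  # a constant series has no defined correlation
    return cov / math.sqrt(var_x * var_y)


# Hypothetical yearly values, for illustration only.
rain = [800.0, 950.0, 1100.0, 1020.0]
prod = [40.0, 47.5, 55.0, 51.0]   # exactly rain / 20, a perfect linear relation
r = pearson(rain, prod)
single = pearson([900.0], [45.0])  # one data point: undefined
print(round(r, 3), math.isnan(single))
```

A dataset that covers a single year, like the crop data previewed in this notebook, falls into the undefined case.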


17. **Interactive loop for Colab (type questions):** the final interactive Q&A loop, the user-facing part of Project Samarth.
This cell ties the earlier pieces into a conversational data assistant over Indian government datasets: it runs the planner and the executor and displays human-readable answers. For each question, the intended flow is:

1. Parse your question (samarth_plan)
2. Generate relevant SQL queries
3. Execute them on your DuckDB datasets
4. Format and display the results with proper citations
5. Log everything for traceability
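
The loop shown in this notebook does not write a log itself, so step 5 could be covered by a small helper like the one sketched below. It appends one JSON line per question. The function name log_interaction, the file name qa_log.jsonl, and the record fields are all assumptions made for illustration.

```python
import json
import os
import tempfile
import time


def log_interaction(log_path: str, question: str, plan: dict, answer: str) -> None:
    """Append one JSON line describing a question, its plan, and the answer."""
    record = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "question": question,
        "intent": plan.get("intent"),
        # Keep only the SQL strings from the plan (keys starting with "sql").
        "sql": {k: v for k, v in plan.items() if k.startswith("sql")},
        "answer": answer,
    }
    with open(log_path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, ensure_ascii=False) + "\n")


# Demo with a throwaway log file and a hypothetical plan.
with tempfile.TemporaryDirectory() as tmp:
    demo_log = os.path.join(tmp, "qa_log.jsonl")
    log_interaction(
        demo_log,
        "Top 5 crops in Himachal Pradesh",
        {"intent": "top_crops_state", "sql": "SELECT 1"},
        "demo answer",
    )
    with open(demo_log, encoding="utf-8") as fh:
        entries = [json.loads(line) for line in fh]

print(entries[0]["intent"])
```

In the interactive loop, a call to such a helper would fit right after execute_and_synthesize returns.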

In [52]:
# ✅ Check unified view structure
cols = con.execute("PRAGMA table_info(unified)").df()["name"].tolist()
print("Unified columns:", cols)

# ✅ Ensure required fields exist
required_cols = ["state", "crop", "year", "production_mt", "rainfall_mm"]
assert set(required_cols).issubset(set(cols)), "Unified schema mismatch — missing required columns"

# ✅ Check total rows available in unified view
print(con.execute("SELECT COUNT(*) AS rows FROM unified").df())

# ✅ Show min and max year (to understand data coverage)
print(con.execute("SELECT MIN(year) AS min_year, MAX(year) AS max_year FROM unified").df())

# ✅ Show top states by record count
print(con.execute("""
SELECT state, COUNT(*) AS count
FROM unified
GROUP BY state
ORDER BY count DESC
LIMIT 10;
""").df())

# ✅ Show top crops by record count
print(con.execute("""
SELECT crop, COUNT(*) AS count
FROM unified
GROUP BY crop
ORDER BY count DESC
LIMIT 10;
""").df())


Unified columns: ['state', 'crop', 'year', 'production_mt', 'rainfall_mm']
   rows
0   130
   min_year  max_year
0      2022      2022
              state  count
0  Himachal Pradesh    130
             crop  count
0           Wheat     13
1          Barley     13
2            Ragi     13
3          Ginger     13
4       Oil Seeds     13
5           Maize     13
6            Rice     13
7          Pulses     13
8  Common Millets     13
9        Chillies     13


In [None]:
from IPython.display import Markdown, display
print("🌾 Project Samarth — Interactive Q&A (type 'exit' to stop)\n")
while True:
    q = input("Ask a question: ").strip()
    if q.lower() in ("exit","quit"): break
    plan = samarth_plan(q)
    ans, data = execute_and_synthesize(plan)
    display(Markdown(ans))
    if isinstance(data, pd.DataFrame) and not data.empty:
        display(data.head(5))
    elif isinstance(data, tuple):
        for d in data:
            if hasattr(d,"head"): display(d.head(5))
    print("-"*70)


🌾 Project Samarth — Interactive Q&A (type 'exit' to stop)

Ask a question: Top 5 crops in Himachal Pradesh


### Top 5 Crops in **Himachal Pradesh**
- Maize: **1489206 t**
- Wheat: **1239378 t**
- Rice: **233758 t**
- Pulses: **110456 t**
- Barley: **70826 t**

Source → Crop Dataset (manual) — manual_upload

Unnamed: 0,crop,total_prod
0,Maize,1489206.0
1,Wheat,1239378.0
2,Rice,233758.0
3,Pulses,110456.0
4,Barley,70826.0


----------------------------------------------------------------------
Ask a question: Compare rainfall in Himachal Pradesh and Punjab for last 5 years and top 3 crops


No matching data for those states/years.

----------------------------------------------------------------------
Ask a question: Show average rainfall in Himachal Pradesh for last 10 years
