<a href="https://colab.research.google.com/github/wardayX/cyhack/blob/main/productname_to_gst_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup & Imports  
Install and import all required libraries.


In [None]:
!pip install pandas pdfplumber fuzzywuzzy python-Levenshtein sentence-transformers

import pandas as pd
import pdfplumber
from fuzzywuzzy import process as fuzzy_process
from sentence_transformers import SentenceTransformer, util
import torch
import re
from google.colab import files
import io

print("Libraries installed and imported.")

Collecting pdfplumber
  Downloading pdfplumber-0.11.6-py3-none-any.whl.metadata (42 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/42.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting fuzzywuzzy
  Downloading fuzzywuzzy-0.18.0-py2.py3-none-any.whl.metadata (4.9 kB)
Collecting python-Levenshtein
  Downloading python_levenshtein-0.27.1-py3-none-any.whl.metadata (3.7 kB)
Collecting pdfminer.six==20250327 (from pdfplumber)
  Downloading pdfminer_six-20250327-py3-none-any.whl.metadata (4.1 kB)
Collecting pypdfium2>=4.18.0 (from pdfplumber)
  Downloading pypdfium2-4.30.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (48 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.2/48.2 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
Collecting Levenshtein==0.27.1 (from python-Levenshtein)
  Downloading levenshte

## File Upload Functions  
Define helper functions to upload the HSN PDF and GST CSV.

In [None]:
def upload_hsn_pdf():
    """Ask the user to upload the HSN code PDF through the Colab file widget.

    Returns:
        A ``(file_name, content)`` pair for the first uploaded file, where
        ``content`` is whatever ``files.upload()`` stored under that name, or
        ``(None, None)`` when nothing was uploaded.
    """
    print("Please upload your HSN Code PDF (SL NO, HS CODE, DESCRIPTION columns).")
    received = files.upload()
    if received:
        # Only the first uploaded file is used; any additional ones are ignored.
        first_name = next(iter(received))
        print(f"Uploaded '{first_name}'")
        return first_name, received[first_name]
    print("No file uploaded.")
    return None, None

def upload_gst_csv():
    """Ask the user to upload the GST rates CSV through the Colab file widget.

    Returns:
        A ``(file_name, content)`` pair for the first uploaded file, where
        ``content`` is whatever ``files.upload()`` stored under that name, or
        ``(None, None)`` when nothing was uploaded.
    """
    print("\nPlease upload your GST Rates CSV.")
    received = files.upload()
    if received:
        # Only the first uploaded file is used; any additional ones are ignored.
        first_name = next(iter(received))
        print(f"Uploaded '{first_name}'")
        return first_name, received[first_name]
    print("No file uploaded.")
    return None, None

print("File upload functions defined.")

## Parse HSN PDF with Aggregate Hierarchical Descriptions
Extract HS codes and descriptions from the uploaded PDF, then enrich each parent HSN code by appending the descriptions of its child codes.


In [None]:
def _hsn_records_from_table(table):
    """Collect HS code / description records from one extracted table (a list of rows)."""
    if not table:
        # An empty table has no header row to inspect and nothing to extract.
        return []

    header = table[0]
    header_text = str(header).upper()
    has_header = bool(header) and 'HS CODE' in header_text and 'DESCRIPTION' in header_text

    records = []
    for row in (table[1:] if has_header else table):
        # Expected layout is SL NO, HS CODE, DESCRIPTION; shorter rows are skipped.
        if row is None or len(row) < 3:
            continue
        hs_code, description = row[1], row[2]
        hs_code = str(hs_code).replace('\n', ' ').strip() if hs_code else None
        description = str(description).replace('\n', ' ').strip() if description else None
        if hs_code and description:
            records.append({'HS_Code_PDF': hs_code, 'Description_PDF': description})
    return records


def _hsn_records_from_text(text):
    """Collect HS code / description records from plain page text, one line at a time."""
    records = []
    for line in text.split('\n'):
        # A usable line starts with a 4-8 digit code followed by free text.
        match_hs = re.search(r'^\s*(\d{4,8})\s+(.+)', line)
        if not match_hs:
            print(f"Could not parse line: {line}")
            continue
        hs_code = match_hs.group(1).strip()
        description = re.sub(r'\s{2,}', ' ', match_hs.group(2).strip())
        if hs_code and description:
            records.append({'HS_Code_PDF': hs_code, 'Description_PDF': description})
    return records


def parse_hsn_pdf(pdf_content):
    """Extract HS codes and descriptions from the bytes of an HSN PDF.

    Each page is first searched for tables; when a page has none, its plain
    text is scanned line by line instead. HS codes are normalised by removing
    non-word characters and lower-casing, rows whose code normalises to an
    empty string are dropped, and only the first row per code is kept.

    Args:
        pdf_content: Raw PDF bytes, wrapped in ``io.BytesIO`` for pdfplumber.

    Returns:
        A DataFrame with columns ``HS_Code_PDF`` and ``Description_PDF``.
        It is empty (with those columns) when nothing could be extracted or
        when any error occurs while reading the PDF.
    """
    data = []
    try:
        with pdfplumber.open(io.BytesIO(pdf_content)) as pdf:
            page_count = len(pdf.pages)
            for i, page in enumerate(pdf.pages):
                print(f"Processing PDF page {i+1}/{page_count}...")
                tables = page.extract_tables()
                if tables:
                    for table in tables:
                        data.extend(_hsn_records_from_table(table))
                    # Pages with tables do not use the plain-text fallback.
                    continue

                text = page.extract_text()
                if text:
                    data.extend(_hsn_records_from_text(text))

        if not data:
            print("Warning: No data extracted from PDF. PDF parsing might need custom logic for your file format.")
            print("Consider using page.extract_text() and custom regex if tables are not well-structured.")
            return pd.DataFrame(columns=['HS_Code_PDF', 'Description_PDF'])

        df_hsn = pd.DataFrame(data)
        df_hsn['HS_Code_PDF'] = df_hsn['HS_Code_PDF'].astype(str).str.replace(r'\W+', '', regex=True).str.strip().str.lower()
        # After astype(str) there are no NaN values left, so filter on the
        # empty string to discard codes that consisted only of punctuation.
        df_hsn = df_hsn[df_hsn['HS_Code_PDF'] != ''].copy()
        df_hsn.drop_duplicates(subset=['HS_Code_PDF'], keep='first', inplace=True)
        print(f"Extracted {len(df_hsn)} unique HSN entries from PDF.")
        return df_hsn
    except Exception as e:
        # Best-effort parser: any failure is reported and yields an empty frame.
        print(f"Error parsing PDF: {e}")
        print("Please ensure the PDF is not scanned (image-based) and has extractable text.")
        return pd.DataFrame(columns=['HS_Code_PDF', 'Description_PDF'])

print("PDF parsing function defined.")

def aggregate_hsn_descriptions(df_hsn_input):
    """Enrich each HS code's description with those of its more specific codes.

    A code is treated as a child of another when it starts with the other
    code and is longer. For every code whose own description is present, the
    aggregated text is its description followed by the descriptions of all
    its children (in sorted code order), de-duplicated and joined by ". ".

    Args:
        df_hsn_input: DataFrame with ``HS_Code_PDF`` and ``Description_PDF``
            columns. It is not modified.

    Returns:
        A copy sorted by ``HS_Code_PDF`` with a fresh index and an added
        ``Aggregated_Description_PDF`` column, which falls back to
        ``Description_PDF`` for codes that were not aggregated. An empty
        input is returned as an unmodified copy.
    """
    if df_hsn_input.empty:
        print("HSN PDF data is empty. Skipping aggregation.")
        return df_hsn_input.copy()

    print("\nAggregating hierarchical HSN descriptions from PDF data...")
    df_hsn = df_hsn_input.copy()
    df_hsn['HS_Code_PDF'] = df_hsn['HS_Code_PDF'].astype(str)
    df_hsn = df_hsn.sort_values(by='HS_Code_PDF').reset_index(drop=True)

    # First description per code, in sorted row order; replaces repeated
    # DataFrame filtering with a constant-time lookup.
    first_description = {}
    for code, description in zip(df_hsn['HS_Code_PDF'], df_hsn['Description_PDF']):
        first_description.setdefault(code, description)

    codes = sorted(first_description)
    aggregated_descriptions = {}
    for position, parent_hs in enumerate(codes):
        parent_description = first_description[parent_hs]
        if pd.isna(parent_description):
            continue

        current_descriptions = [parent_description]
        # In lexicographic order, all codes sharing the parent's prefix sit
        # directly after the parent, so scanning stops at the first mismatch.
        cursor = position + 1
        while cursor < len(codes) and codes[cursor].startswith(parent_hs):
            child_description = first_description[codes[cursor]]
            if pd.notna(child_description):
                current_descriptions.append(child_description)
            cursor += 1

        # dict.fromkeys removes repeated descriptions while keeping order.
        aggregated_descriptions[parent_hs] = ". ".join(dict.fromkeys(current_descriptions))

    # Assign the filled result instead of calling fillna(inplace=True) on a
    # column selection, which does not reliably update the frame.
    df_hsn['Aggregated_Description_PDF'] = (
        df_hsn['HS_Code_PDF'].map(aggregated_descriptions).fillna(df_hsn['Description_PDF'])
    )

    print("HSN description aggregation complete.")
    return df_hsn

## Parse GST Rates CSV  
Read the GST CSV with fallback encodings, clean and normalize columns.


In [None]:
# Raw cess values that mean "no compensation cess".
_GST_NO_CESS_TOKENS = frozenset(
    ['no', 'false', 'nil', 'exempt', 'exempted', '0', '0%', '0.0', '0.0%', '-']
)


def _normalize_gst_header(name):
    """Return a header lower-cased with all whitespace removed, for matching."""
    return "".join(str(name).split()).lower()


def _read_gst_csv(csv_content):
    """Decode the CSV bytes with the first encoding that works; None on failure."""
    # latin1 accepts every byte sequence, so it must come last: anything
    # listed after it could never be reached.
    for encoding in ('utf-8', 'cp1252', 'latin1'):
        try:
            # dtype=str keeps HS codes such as "0101" intact; default type
            # inference would read them as integers and drop leading zeros.
            frame = pd.read_csv(io.BytesIO(csv_content), encoding=encoding, dtype=str)
        except UnicodeDecodeError:
            print(f"Failed to decode CSV with encoding: {encoding}. Trying next...")
        except Exception as e:
            print(f"Error reading CSV with encoding {encoding}: {e}")
        else:
            print(f"GST CSV loaded successfully with encoding: {encoding}")
            return frame
    return None


def _parse_cess(value):
    """Turn one raw compensation-cess cell into ``(rate, is_applicable)``."""
    if pd.isna(value):
        return 0.0, False
    value_str = str(value).lower().strip()
    if not value_str or value_str in _GST_NO_CESS_TOKENS:
        return 0.0, False
    match_rate = re.search(r'(\d+\.?\d*)', value_str)
    if match_rate:
        rate = float(match_rate.group(1))
        # A numeric zero (for example "0.00%") means no cess is charged.
        return rate, rate > 0
    # Non-numeric text that is not a known "no" token: cess applies, but the
    # rate cannot be read from the cell.
    return 0.0, True


def parse_gst_csv(csv_content):
    """Parse the GST rates CSV into a cleaned DataFrame.

    Headers are matched against the expected GST schedule headers ignoring
    case and whitespace. HS codes are read as text (leading zeros are
    preserved), normalised by removing non-word characters and lower-casing,
    and rows without a usable code are dropped. Rate columns are converted to
    numbers, with unparseable values becoming 0.

    Args:
        csv_content: Raw CSV bytes.

    Returns:
        A DataFrame with columns ``HS_Code_GST``, ``Description_GST``,
        ``CGST_Rate``, ``SGST_Rate``, ``IGST_Rate``,
        ``Compensation_Cess_Rate``, ``Is_Compensation_Cess`` and
        ``Is_Exempted``, one row per HS code (first occurrence wins).
        An empty DataFrame is returned when the file cannot be read or a
        required column is missing.
    """
    df_gst = _read_gst_csv(csv_content)
    if df_gst is None:
        print("Error: Could not read or decode the GST CSV file with common encodings.")
        print("Please ensure your CSV file is saved in a compatible format (e.g., UTF-8, Latin-1, CP1252) or specify the correct encoding.")
        return pd.DataFrame()
    print("GST CSV loaded. Original columns found:", df_gst.columns.tolist())

    column_mapping = {
        'Chapter/Heading/Sub-heading/Tariffitem': 'HS_Code_GST',
        'DescriptionofGoods': 'Description_GST',
        'CGST(%)': 'CGST_Rate',
        'SGST/UTGST(%)': 'SGST_Rate',
        'IGST(%)': 'IGST_Rate',
        'CompensationCess': 'Compensation_Cess_Raw'
    }

    # Map each expected header to the first CSV column that matches it.
    actual_mapping_to_rename = {}
    for csv_header_pattern, internal_name in column_mapping.items():
        wanted = _normalize_gst_header(csv_header_pattern)
        for actual_csv_column_name in df_gst.columns:
            if _normalize_gst_header(actual_csv_column_name) == wanted:
                actual_mapping_to_rename[actual_csv_column_name] = internal_name
                break

    if actual_mapping_to_rename:
        df_gst.rename(columns=actual_mapping_to_rename, inplace=True)
        print("Columns after initial renaming attempt:", df_gst.columns.tolist())
    else:
        print("No column renaming mappings were applied based on column_mapping.")

    required_internal_cols = ['HS_Code_GST', 'Description_GST', 'CGST_Rate', 'SGST_Rate', 'IGST_Rate']

    if 'Compensation_Cess_Raw' not in df_gst.columns:
        # Fallback: accept any not-yet-mapped column mentioning both words.
        found_cess_col = None
        internal_names = set(actual_mapping_to_rename.values())
        for col_name in df_gst.columns:
            lowered = str(col_name).lower()
            if "cess" in lowered and "compensation" in lowered and col_name not in internal_names:
                found_cess_col = col_name
                break
        if found_cess_col is not None:
            df_gst.rename(columns={found_cess_col: 'Compensation_Cess_Raw'}, inplace=True)
            print(f"Renamed '{found_cess_col}' to 'Compensation_Cess_Raw'.")
        else:
            print("Warning: 'Compensation_Cess_Raw' (or a mappable equivalent) not found. Assuming no compensation cess for now.")
            df_gst['Compensation_Cess_Raw'] = "Nil"

    required_internal_cols.append('Compensation_Cess_Raw')

    missing_cols = [col for col in required_internal_cols if col not in df_gst.columns]
    if missing_cols:
        print(f"Error: Missing expected internal columns after all renaming attempts: {missing_cols}")
        print(f"Current available columns in DataFrame: {df_gst.columns.tolist()}")
        print("Please ensure your `column_mapping` in Cell 4 correctly maps your CSV headers to the expected internal names.")
        return pd.DataFrame()

    df_gst = df_gst[required_internal_cols].copy()

    # Drop rows without a code before converting to str; otherwise a missing
    # value would turn into the literal code "nan".
    df_gst = df_gst[df_gst['HS_Code_GST'].notna()].copy()
    df_gst['HS_Code_GST'] = df_gst['HS_Code_GST'].astype(str).str.replace(r'\W+', '', regex=True).str.strip().str.lower()
    df_gst = df_gst[df_gst['HS_Code_GST'] != ''].copy()

    for col in ('CGST_Rate', 'SGST_Rate', 'IGST_Rate'):
        cleaned = df_gst[col].astype(str).str.replace('%', '', regex=False).str.strip()
        df_gst[col] = pd.to_numeric(cleaned, errors='coerce').fillna(0)

    cess_parsed = [_parse_cess(raw) for raw in df_gst['Compensation_Cess_Raw']]
    # Explicit dtypes keep the boolean logic below valid for an empty frame.
    df_gst['Compensation_Cess_Rate'] = pd.Series(
        [rate for rate, _ in cess_parsed], index=df_gst.index, dtype=float
    )
    df_gst['Is_Compensation_Cess'] = pd.Series(
        [flag for _, flag in cess_parsed], index=df_gst.index, dtype=bool
    )
    df_gst.drop(columns=['Compensation_Cess_Raw'], inplace=True)

    df_gst['Is_Exempted'] = (
        (df_gst['CGST_Rate'] == 0) &
        (df_gst['SGST_Rate'] == 0) &
        (df_gst['IGST_Rate'] == 0) &
        ((~df_gst['Is_Compensation_Cess']) | (df_gst['Compensation_Cess_Rate'] == 0))
    )

    df_gst.drop_duplicates(subset=['HS_Code_GST'], keep='first', inplace=True)
    print(f"Processed {len(df_gst)} unique HSN entries from GST CSV.")
    print("Final columns in GST DataFrame after processing:", df_gst.columns.tolist())
    return df_gst

print("GST CSV parsing function (CORRECTED RENAMING LOGIC + ENCODING HANDLING) defined.")

## Load & Parse Inputs  
1. Upload HSN PDF → parse → aggregate  
2. Upload GST CSV → parse  


In [None]:
# Step 1: upload the HSN PDF, parse it, and aggregate parent/child descriptions.
hsn_pdf_name, hsn_pdf_content = upload_hsn_pdf()
df_hsn_pdf = pd.DataFrame()
df_hsn_pdf_aggregated = pd.DataFrame()
if hsn_pdf_content:
    df_hsn_pdf = parse_hsn_pdf(hsn_pdf_content)
    if not df_hsn_pdf.empty:
        print("\n--- Parsed HSN PDF Data (Sample BEFORE Aggregation) ---")
        print(df_hsn_pdf.head())
        df_hsn_pdf_aggregated = aggregate_hsn_descriptions(df_hsn_pdf)
else:
    print("\nHSN PDF content not available. Skipping HSN PDF processing and aggregation.")

# Step 2: upload and parse the GST rates CSV. This runs exactly once; a
# repeated upload would prompt the user twice and discard the first file.
gst_csv_name, gst_csv_content = upload_gst_csv()
df_gst_rates = pd.DataFrame()
if gst_csv_content:
    df_gst_rates = parse_gst_csv(gst_csv_content)
    if not df_gst_rates.empty:
        print("\n--- Parsed GST Rates CSV Data (Sample) ---")
        print(df_gst_rates.head())
        print("\nGST Data Columns:", df_gst_rates.columns)

# The GST rates are mandatory; the HSN PDF only enriches descriptions.
if df_hsn_pdf.empty and df_gst_rates.empty:
    print("\nERROR: Neither HSN PDF nor GST CSV data could be loaded. Cannot proceed.")
elif df_gst_rates.empty:
    print("\nERROR: GST CSV data could not be loaded. Tax rates are essential. Cannot proceed.")

## Merge HSN & GST Data  
Left-join on HS codes, prioritize aggregated descriptions.


In [None]:
# Build df_merged: one row per GST HS code, with the best available
# description (aggregated PDF text first, then the GST CSV text).
df_merged = pd.DataFrame()
# Defined up front so the fuzzy search helper always finds this name, even
# when the GST data is missing and no merge happens.
all_descriptions_for_fuzzy = []

hsn_data_to_merge = (
    df_hsn_pdf_aggregated
    if 'df_hsn_pdf_aggregated' in locals() and not df_hsn_pdf_aggregated.empty
    else df_hsn_pdf
)

if not df_gst_rates.empty:
    if not hsn_data_to_merge.empty:
        # Left join keeps every GST row, including codes absent from the PDF.
        df_merged = pd.merge(
            df_gst_rates,
            hsn_data_to_merge,
            left_on='HS_Code_GST',
            right_on='HS_Code_PDF',
            how='left'
        )

        if 'Aggregated_Description_PDF' in df_merged.columns:
            df_merged['Description_From_PDF_Source'] = df_merged['Aggregated_Description_PDF']
        else:
            df_merged['Description_From_PDF_Source'] = df_merged['Description_PDF']

        # Assign the filled column back; fillna(inplace=True) on a column
        # selection does not reliably update the frame.
        df_merged['Combined_Description'] = (
            df_merged['Description_From_PDF_Source']
            .fillna(df_merged['Description_GST'])
            .fillna('')
        )

        df_merged.rename(columns={'HS_Code_GST': 'HS_Code'}, inplace=True)

        final_columns = [
            'HS_Code',
            'Combined_Description',
            'Description_GST',
            'Description_PDF',
            'Aggregated_Description_PDF',
            'CGST_Rate',
            'SGST_Rate',
            'IGST_Rate',
            'Is_Compensation_Cess',
            'Compensation_Cess_Rate',
            'Is_Exempted'
        ]
        final_columns = [col for col in final_columns if col in df_merged.columns]
        df_merged = df_merged[final_columns]

        print(df_merged.head())

    else:
        # No PDF data: the GST description is the only description available.
        df_merged = df_gst_rates.copy()
        df_merged.rename(
            columns={
                'HS_Code_GST': 'HS_Code',
                'Description_GST': 'Combined_Description'
            },
            inplace=True
        )
        if 'Description_PDF' not in df_merged.columns:
            df_merged['Description_PDF'] = None
        if 'Aggregated_Description_PDF' not in df_merged.columns:
            df_merged['Aggregated_Description_PDF'] = None
        if 'Is_Exempted' not in df_merged.columns:
            df_merged['Is_Exempted'] = (
                (df_merged['CGST_Rate'] == 0) &
                (df_merged['SGST_Rate'] == 0) &
                (df_merged['IGST_Rate'] == 0) &
                (~df_merged['Is_Compensation_Cess'] | (df_merged['Compensation_Cess_Rate'] == 0))
            )

    if 'Combined_Description' in df_merged.columns:
        # Fill missing values before astype(str); otherwise NaN would become
        # the searchable text "nan".
        df_merged['Combined_Description'] = (
            df_merged['Combined_Description']
            .fillna('')
            .astype(str)
            .str.lower()
            .str.strip()
        )
    else:
        df_merged['Combined_Description'] = ""

    if not df_merged.empty:
        print(f"Final merged dataset has {len(df_merged)} entries.")
        print("Columns in final merged data:", df_merged.columns.tolist())
        all_descriptions_for_fuzzy = (
            df_merged['Combined_Description']
            .fillna('')
            .astype(str)
            .unique()
            .tolist()
        )
        print(f"Created 'all_descriptions_for_fuzzy' with {len(all_descriptions_for_fuzzy)} items.")
    else:
        print("ERROR: Merged data is empty.")
else:
    print("Essential GST rate data is missing. Cannot create merged dataset.")


## Semantic Search Initialization  
Load Sentence-Transformer model and compute embeddings.


In [None]:
# Semantic search state. Both names stay None unless the model loads and the
# embeddings are computed successfully.
model = None
corpus_embeddings = None

if (
    'df_merged' in locals()
    and not df_merged.empty
    and 'Combined_Description' in df_merged.columns
    and not df_merged['Combined_Description'].dropna().empty
):
    try:
        print("\nLoading sentence transformer model for semantic search...")
        model_name = 'all-mpnet-base-v2'

        model = SentenceTransformer(model_name)
        print(f"Model '{model_name}' loaded.")

        print("Computing embeddings for product descriptions...")
        # One text per df_merged row, in row order, so an embedding's position
        # can be used to look the row up again.
        descriptions_to_embed = [
            str(text) for text in df_merged['Combined_Description'].fillna('')
        ]

        if not descriptions_to_embed:
            print("No descriptions to embed.")
            model, corpus_embeddings = None, None
        else:
            corpus_embeddings = model.encode(
                descriptions_to_embed,
                convert_to_tensor=True,
                show_progress_bar=True,
            )
            print("Embeddings computed.")

    except Exception as e:
        # Any failure disables semantic search rather than stopping the notebook.
        print(f"Error initializing model or computing embeddings: {e}")
        model, corpus_embeddings = None, None
else:
    print("\nSkipping semantic search setup.")


### TF-IDF Setup & QA Pipeline Setup  


In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# TF-IDF search state; left as None when setup is skipped or fails.
tfidf_vectorizer = None
tfidf_matrix = None
tfidf_feature_names = None

if (
    'df_merged' in locals()
    and not df_merged.empty
    and 'Combined_Description' in df_merged.columns
    and not df_merged['Combined_Description'].dropna().empty
):
    try:
        print("\nSetting up TF-IDF Vectorizer...")
        # One document per df_merged row, in row order.
        corpus_for_tfidf = [
            str(text) for text in df_merged['Combined_Description'].fillna('')
        ]

        if not corpus_for_tfidf:
            print("No descriptions available for TF-IDF setup.")
            tfidf_vectorizer, tfidf_matrix = None, None
        else:
            # Unigrams and bigrams, with English stop words removed.
            tfidf_vectorizer = TfidfVectorizer(stop_words='english', ngram_range=(1, 2))
            tfidf_matrix = tfidf_vectorizer.fit_transform(corpus_for_tfidf)
            tfidf_feature_names = tfidf_vectorizer.get_feature_names_out()
            print(f"TF-IDF matrix computed. Shape: {tfidf_matrix.shape}")
            print(f"Number of TF-IDF features: {len(tfidf_feature_names)}")

    except Exception as e:
        # Any failure disables TF-IDF search rather than stopping the notebook.
        print(f"Error during TF-IDF setup: {e}")
        tfidf_vectorizer, tfidf_matrix = None, None
else:
    print("\nSkipping TF-IDF setup.")

from transformers import pipeline

# Extractive question-answering pipeline; stays None when loading fails.
qa_model_name = 'deepset/roberta-base-squad2'
qa_pipeline = None

print(f"\nLoading QA pipeline with model: {qa_model_name}...")
try:
    qa_pipeline = pipeline(
        'question-answering',
        model=qa_model_name,
        tokenizer=qa_model_name,
    )
except Exception as e:
    print(f"Error loading QA pipeline: {e}")
else:
    print("QA pipeline loaded successfully.")


## Search & Format Helpers  
Functions for fuzzy, TF-IDF, semantic search, and result formatting.


In [None]:
def find_gst_by_description_tfidf(description_query, top_k=5, similarity_threshold=0.2):
    """Rank merged rows by TF-IDF cosine similarity to a description query.

    Args:
        description_query: Free-text description; lower-cased and stripped
            before vectorising.
        top_k: Maximum number of results.
        similarity_threshold: Minimum cosine similarity for a row to qualify.

    Returns:
        A list of dicts with keys ``row`` (a ``df_merged`` row), ``score``
        (cosine similarity times 100) and ``match_type`` (``'TF-IDF'``),
        best match first. Empty when TF-IDF is not set up or nothing reaches
        the threshold.
    """
    tfidf_unavailable = (
        df_merged.empty
        or tfidf_vectorizer is None
        or tfidf_matrix is None
        or tfidf_matrix.shape[0] == 0
    )
    if tfidf_unavailable:
        print("TF-IDF not available or data empty.")
        return []

    cleaned_query = str(description_query).lower().strip()
    query_vec = tfidf_vectorizer.transform([cleaned_query])
    similarities = cosine_similarity(query_vec, tfidf_matrix).ravel()

    candidate_idx = np.flatnonzero(similarities >= similarity_threshold)
    if candidate_idx.size == 0:
        return []

    # Order the qualifying rows from most to least similar, then cut to top_k.
    descending = np.argsort(similarities[candidate_idx])[::-1]
    ranked_idx = candidate_idx[descending][:top_k]

    results = []
    codes_seen = set()
    for position in ranked_idx:
        record = df_merged.iloc[position]
        hs_code = record['HS_Code']
        if hs_code in codes_seen:
            continue
        codes_seen.add(hs_code)
        results.append({
            'row': record,
            'score': similarities[position] * 100,
            'match_type': 'TF-IDF',
        })
        if len(results) == top_k:
            break
    return results

def format_gst_info(row):
    """Render one merged GST row as a human-readable multi-line string.

    Args:
        row: A mapping-like record supporting ``.get`` and ``len`` (a pandas
            Series row of ``df_merged`` or a plain dict), or ``None``.

    Returns:
        The formatted details block, or ``"No product information found."``
        when ``row`` is ``None`` or has no entries.
    """
    # len() works for both Series and dicts; `.empty` exists only on pandas objects.
    if row is None or len(row) == 0:
        return "No product information found."

    hs = row.get('HS_Code', 'N/A')
    raw_desc = row.get('Combined_Description', 'N/A')
    if isinstance(raw_desc, str):
        # Keep the placeholder as-is; capitalize() would turn it into "N/a".
        desc = raw_desc if raw_desc == 'N/A' else raw_desc.capitalize()
    elif raw_desc is None or pd.isna(raw_desc):
        # A missing description (None / NaN) has no .capitalize() method.
        desc = 'N/A'
    else:
        desc = str(raw_desc).capitalize()

    cg = row.get('CGST_Rate', 0)
    sg = row.get('SGST_Rate', 0)
    ig = row.get('IGST_Rate', 0)
    cess = row.get('Is_Compensation_Cess', False)
    cr = row.get('Compensation_Cess_Rate', 0)
    ex = row.get('Is_Exempted', False)

    parts = [
        "\n--- Product GST Details ---\n",
        f"HS Code: {hs}\nDescription: {desc}\nCGST: {cg}%\nSGST/UTGST: {sg}%\nIGST: {ig}%\n",
        f"Compensation Cess Applicable: {'Yes' if cess else 'No'}\n",
    ]
    if cess:
        # The rate line is shown only when cess applies.
        parts.append(f"Compensation Cess Rate: {cr}%\n")
    parts.append(f"Exempted from Tax: {'Yes' if ex else 'No'}\n")
    parts.append("--------------------------\n")
    return "".join(parts)

def find_gst_by_hs_code(hs_code_query):
    """Return every merged row whose HS code equals the query.

    The query is normalised the same way the stored codes are (non-word
    characters removed, lower-cased), so inputs such as ``"0101.21"`` or
    ``"0101 21"`` match the stored ``"010121"``.

    Args:
        hs_code_query: HS code in any common notation; converted with ``str``.

    Returns:
        A list of matching ``df_merged`` rows; empty when there is no data,
        the query has no usable characters, or nothing matches.
    """
    if df_merged.empty:
        return []
    query = re.sub(r'\W+', '', str(hs_code_query)).strip().lower()
    if not query:
        return []
    hits = df_merged[df_merged['HS_Code'] == query]
    return [row for _, row in hits.iterrows()]

def find_gst_by_description_fuzzy(description_query, threshold=80, limit=5):
    """Find merged rows whose description fuzzily matches the query.

    Args:
        description_query: Free-text description; lower-cased and stripped.
        threshold: Minimum fuzzy score for a candidate description.
        limit: Maximum number of results; twice this many candidates are
            requested from the fuzzy matcher.

    Returns:
        A list of dicts with keys ``row``, ``score`` and ``match_type``
        (``'fuzzy'``), highest score first, with one entry per HS code.
    """
    if df_merged.empty or not all_descriptions_for_fuzzy:
        return []

    needle = str(description_query).lower().strip()
    candidates = fuzzy_process.extract(
        needle, all_descriptions_for_fuzzy, limit=limit * 2
    )

    hits = []
    codes_seen = set()
    for candidate_text, candidate_score in candidates:
        if candidate_score >= threshold:
            # Several rows can share one description; consider each of them.
            same_text = df_merged[df_merged['Combined_Description'] == candidate_text]
            for _, record in same_text.iterrows():
                hs_code = record['HS_Code']
                if hs_code in codes_seen:
                    continue
                hits.append({'row': record, 'score': candidate_score, 'match_type': 'fuzzy'})
                codes_seen.add(hs_code)
                if len(hits) == limit:
                    break
        if len(hits) == limit:
            break

    hits.sort(key=lambda hit: hit['score'], reverse=True)
    return hits

def find_gst_by_description_semantic(description_query, top_k=5, similarity_threshold=0.55):
    """Find merged rows whose description is semantically close to the query.

    Args:
        description_query: Free-text description; lower-cased and stripped
            before encoding.
        top_k: Maximum number of results; twice this many candidates are
            taken from the similarity ranking.
        similarity_threshold: Minimum cosine similarity for a candidate.

    Returns:
        A list of dicts with keys ``row``, ``score`` and ``match_type``
        (``'semantic'``), highest score first, with one entry per HS code.
        Empty when the model or embeddings are unavailable.
    """
    components_missing = (
        df_merged.empty
        or model is None
        or corpus_embeddings is None
        or corpus_embeddings.nelement() == 0
    )
    if components_missing:
        return []

    cleaned_query = str(description_query).lower().strip()
    query_embedding = model.encode(cleaned_query, convert_to_tensor=True)
    similarities = util.cos_sim(query_embedding, corpus_embeddings)[0]
    best = torch.topk(similarities, k=min(top_k * 2, len(similarities)))

    hits = []
    codes_seen = set()
    for similarity, position in zip(best.values.tolist(), best.indices.tolist()):
        if similarity < similarity_threshold:
            # Candidates come in descending order, so nothing later qualifies.
            break
        record = df_merged.iloc[position]
        hs_code = record['HS_Code']
        if hs_code in codes_seen:
            continue
        hits.append({'row': record, 'score': similarity, 'match_type': 'semantic'})
        codes_seen.add(hs_code)
        if len(hits) == top_k:
            break

    hits.sort(key=lambda hit: hit['score'], reverse=True)
    return hits

print("Search functions defined.")


## Transformer QA System  
Define the interactive QA loop.


In [None]:
def format_extracted_gst_details(answer_text, item_context_row=None):
    """Format a QA answer, optionally followed by the GST facts of its source row.

    Args:
        answer_text: The answer text to quote.
        item_context_row: Optional mapping-like row (anything with ``.get``)
            holding the GST fields; omitted details default to ``'N/A'`` or
            ``False``.

    Returns:
        A newline-terminated string: the answer line, plus the HS code,
        rates, cess and exemption lines when a row is given.
    """
    pieces = [f"QA Model Answer: '{answer_text}'\n"]
    if item_context_row is None:
        return pieces[0]

    field = item_context_row.get
    cess_applies = field('Is_Compensation_Cess', False)
    is_exempt = field('Is_Exempted', False)

    pieces.append(
        f"Based on context for HS Code {field('HS_Code', 'N/A')}: "
        f"{field('Combined_Description', 'N/A')}\n"
    )
    pieces.append(
        f"  CGST: {field('CGST_Rate', 'N/A')}%, "
        f"SGST: {field('SGST_Rate', 'N/A')}%, "
        f"IGST: {field('IGST_Rate', 'N/A')}%\n"
    )
    pieces.append("  Cess Applicable: Yes\n" if cess_applies else "  Cess Applicable: No\n")
    if cess_applies:
        # The rate line appears only when cess applies.
        pieces.append(f"  Cess Rate: {field('Compensation_Cess_Rate', 'N/A')}%\n")
    pieces.append("  Exempted: Yes\n" if is_exempt else "  Exempted: No\n")
    return "".join(pieces)


def get_gst_with_transformer_qa():
    """Run the interactive GST question-answering loop.

    For each query typed by the user, the closest rows of ``df_merged`` are
    retrieved by cosine similarity over ``corpus_embeddings``; a short text
    context is built from the best row, and ``qa_pipeline`` is asked a fixed
    set of questions about it. Answers are printed with their confidence.

    The loop ends when the user types ``exit`` or input is closed or
    interrupted. Nothing is returned. If ``df_merged``, ``qa_pipeline`` or
    the semantic search components are missing, an error is printed and the
    function returns immediately.
    """
    if 'df_merged' not in globals() or df_merged.empty:
        print("ERROR: df_merged is not available or empty.")
        return
    if 'qa_pipeline' not in globals() or qa_pipeline is None:
        print("ERROR: qa_pipeline is not available.")
        return
    if 'model' not in globals() or model is None or \
       'corpus_embeddings' not in globals() or corpus_embeddings is None or corpus_embeddings.nelement() == 0:
        print("ERROR: Semantic search components not available.")
        return

    print("\n--- GST QA with Transformer ---")
    print("Type 'exit' to quit.")

    questions = {
        "CGST": "What is the CGST rate?",
        "SGST": "What is the SGST or UTGST rate?",
        "IGST": "What is the IGST rate?",
        "Cess_Applicable": "Is compensation cess applicable?",
        "Cess_Rate": "What is the compensation cess rate?",
        "Is_Exempted": "Is the product exempted from tax?"
    }

    while True:
        try:
            user_query = input("\nEnter your product description or HS code: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed or interrupted input ends the session cleanly.
            print("\nExiting QA system.")
            break
        if user_query.lower() == 'exit':
            print("Exiting QA system.")
            break
        if not user_query:
            continue

        print(f"Original query: '{user_query}'")
        # The corpus descriptions were lower-cased before embedding, and the
        # semantic search helper lower-cases its query too; do the same here.
        embedding = model.encode(user_query.lower(), convert_to_tensor=True)
        cos_scores = util.cos_sim(embedding, corpus_embeddings)[0]
        # Size k from the score vector itself, which is what topk indexes.
        top_results = torch.topk(cos_scores, k=min(3, len(cos_scores)))

        retrieved = [
            {'row': df_merged.iloc[idx], 'score': score}
            for score, idx in zip(top_results.values.tolist(), top_results.indices.tolist())
            if score > 0.3
        ]

        if not retrieved:
            print("No relevant items found.")
            continue

        print(f"Found {len(retrieved)} relevant items.")

        # Only the best-scoring item is analysed.
        for item in retrieved[:1]:
            row = item['row']
            desc = row.get('Combined_Description', 'N/A')
            hs = row.get('HS_Code', 'N/A')
            context = (
                f"The product is '{desc}' with HS Code {hs}. "
                f"CGST: {row.get('CGST_Rate', 'unknown')}%. "
                f"SGST/UTGST: {row.get('SGST_Rate', 'unknown')}%. "
                f"IGST: {row.get('IGST_Rate', 'unknown')}%. "
                f"Compensation cess is {'applicable' if row.get('Is_Compensation_Cess') else 'not applicable'}. "
                f"Cess rate: {row.get('Compensation_Cess_Rate', '0')}%. "
                f"Exempted: {'yes' if row.get('Is_Exempted') else 'no'}."
            )

            print(f"\n--- Analyzing: {desc} (HS: {hs}) ---")
            print(f"Retrieval Score: {item['score']:.2f}")

            details = {}
            for key, q in questions.items():
                try:
                    res = qa_pipeline({'question': q, 'context': context})
                    details[key] = {'answer': res['answer'], 'score': res['score']}
                except Exception as e:
                    # Report the failure and keep a placeholder so the
                    # remaining questions are still answered and printed.
                    print(f"  QA failed for '{key}': {e}")
                    details[key] = {'answer': 'Error', 'score': 0.0}

            print(f"GST Details for HS {hs}:")
            print(f"  CGST: {details['CGST']['answer']} (Conf: {details['CGST']['score']:.2f})")
            print(f"  SGST/UTGST: {details['SGST']['answer']} (Conf: {details['SGST']['score']:.2f})")
            print(f"  IGST: {details['IGST']['answer']} (Conf: {details['IGST']['score']:.2f})")
            print(f"  Cess Applicable: {details['Cess_Applicable']['answer']} (Conf: {details['Cess_Applicable']['score']:.2f})")
            # "not applicable" also contains "applicable"; exclude it
            # explicitly so the rate is shown only for a positive answer.
            cess_answer = str(details['Cess_Applicable']['answer']).lower()
            if 'applicable' in cess_answer and 'not applicable' not in cess_answer:
                print(f"  Cess Rate: {details['Cess_Rate']['answer']} (Conf: {details['Cess_Rate']['score']:.2f})")
            print(f"  Exempted: {details['Is_Exempted']['answer']} (Conf: {details['Is_Exempted']['score']:.2f})")
            print("---")


## Pre-QA Sanity Check & Launch  
Verify all components are ready and start the QA loop.


In [None]:
# Verify that every component the QA loop needs exists before launching it.
print("--- Pre-QA Sanity Check ---")
all_systems_go = True

# 1) Merged data.
if 'df_merged' not in globals() or df_merged.empty:
    print("CRITICAL: df_merged is empty or not defined.")
    if 'df_merged' in globals():
        print(f"df_merged.empty check result: {df_merged.empty}")
    all_systems_go = False
else:
    print(f"df_merged is READY for QA. Shape: {df_merged.shape}")

# 2) Semantic retrieval: the model and a non-empty embedding tensor.
if (
    globals().get('model') is None
    or globals().get('corpus_embeddings') is None
    or corpus_embeddings.nelement() == 0
):
    print("CRITICAL: Semantic model or corpus_embeddings are NOT ready.")
    if globals().get('model') is None:
        print("model is not ready.")
    if globals().get('corpus_embeddings') is None:
        print("corpus_embeddings is not defined.")
    elif corpus_embeddings.nelement() == 0:
        print("corpus_embeddings is empty.")
    all_systems_go = False
else:
    print("Semantic model and corpus_embeddings are READY for retrieval.")

# 3) Question-answering pipeline.
if globals().get('qa_pipeline') is None:
    print("CRITICAL: QA pipeline is NOT ready.")
    all_systems_go = False
else:
    print("QA pipeline is READY.")

print("--- End of Pre-QA Sanity Check ---")

if not all_systems_go:
    print("\nOne or more critical components are not ready. Cannot start QA system.")
    print("Please review the CRITICAL messages above.")
else:
    print("\nAll checks passed. Starting QA system...")
    get_gst_with_transformer_qa()
