# HSN Code Classification System: A Production-Ready RAG & Knowledge Graph Solution

## 1. Introduction and Setup

### 1.1. Executive Summary

This notebook presents an end-to-end system for Harmonized System of Nomenclature (HSN) code classification. It combines a Retrieval-Augmented Generation (RAG) architecture with a Knowledge Graph (KG) to produce accurate, context-aware, and explainable classifications for product descriptions.
The system is designed with production deployment in mind: it has a modular architecture, automated tests, performance monitoring, and query processing that asks the user for clarification when a request is ambiguous. It addresses the core business need of accurately and efficiently classifying products for international trade, ensuring compliance and streamlining operations.

### 1.2. System Architecture
The system is built on a layered, modular architecture that separates concerns and promotes maintainability and scalability.

    graph TD
        subgraph User Interaction Layer
            A[Jupyter Notebook / API Endpoint] --> B(HSNQueryProcessor);
        end

        subgraph Intelligence & Orchestration Layer
            B -- Manages Conversation & Ambiguity --> C{RAG System};
            B -- Enriches Context --> D{Knowledge Graph};
        end

        subgraph Core RAG Engine
            C -- Retrieves --> E[Vector Store];
            C -- Generates --> F[LLM Backend];
        end

        subgraph Data Foundation Layer
            G[PDF Data Source] --> H(HSNDataProcessor);
            H --> I[Structured Documents];
            I --> E;
            I --> D;
        end

        style B fill:#bbf,stroke:#333,stroke-width:2px
        style C fill:#bbf,stroke:#333,stroke-width:2px
        style D fill:#bbf,stroke:#333,stroke-width:2px


### Key Components:

1. Data Foundation: A robust ETL pipeline (HSNDataProcessor) ingests raw PDF data, cleans it, and transforms it into structured JSON documents optimized for both semantic search and knowledge graph construction.
2. Knowledge Graph: A KnowledgeGraphBuilder constructs a graph of HSN codes, capturing the explicit hierarchy (Chapter → Heading → Subheading → Code) and semantic relationships (e.g., sibling products). This provides deep, queryable context.
3. RAG Engine: The HSNRAGSystem orchestrates the core RAG loop. It retrieves documents from a vector store (ChromaDB) through a pluggable retrieval strategy with re-ranking (the run in this notebook configures the graph_contextual strategy) and interfaces with a pluggable LLM backend (Gemini/Mock) for response generation.
4. Intelligence Layer: The HSNQueryProcessor acts as the system's "brain." It manages conversational state, detects user intent, and, most importantly, handles ambiguity by initiating an interactive disambiguation flow when retrieval results are too close to call.
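
The ambiguity check described for the Intelligence Layer can be sketched as a margin test on the top two retrieval scores. This is a minimal illustration, not the HSNQueryProcessor implementation: the `Candidate` class, the `is_ambiguous` function, and the `min_margin` threshold are all hypothetical names and values. The two close scores are the ones reported in the log output in Section 5; which HSN codes they belong to is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Candidate:
    hsn_code: str
    score: float  # relevance score from retrieval; higher is better


def is_ambiguous(candidates: list[Candidate], min_margin: float = 0.5) -> bool:
    """Return True when the two best scores are closer than min_margin.

    min_margin is a hypothetical threshold; a real system would tune it.
    """
    if len(candidates) < 2:
        return False
    ranked = sorted(candidates, key=lambda c: c.score, reverse=True)
    return (ranked[0].score - ranked[1].score) < min_margin


# Scores 8.1995 and 8.0733 come from the Section 5 log; the code
# assignments and the 3.2 score are illustrative assumptions.
close = [Candidate("40011010", 8.1995), Candidate("40011020", 8.0733)]
clear = [Candidate("40011010", 8.1995), Candidate("40012100", 3.2)]

print(is_ambiguous(close), is_ambiguous(clear))
```

When the check returns True, the processor would present the top candidates as numbered options instead of answering directly.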

### 1.3. Environment Setup

The following cells will install all necessary dependencies. This project uses pip for package management, and all dependencies are tracked in requirements.txt.

In [23]:
%%capture
!pip install pandas PyYAML pydantic pydantic-settings pandera colorlog networkx pyvis sentence-transformers scikit-learn chromadb openai google-generativeai async-lru pybreaker spacy==3.7.5 pytest pytest-asyncio pytest-benchmark respx nest_asyncio
!python -m spacy download en_core_web_sm

In [24]:
import asyncio
import json
from pathlib import Path
import pandas as pd
from IPython.display import display, HTML, IFrame
from src.config_loader import settings
from src.data_processor import HSNDataProcessor
from src.graph_backends import NetworkXBackend
from src.graph_builder import KnowledgeGraphBuilder
from src.rag_system import HSNRAGSystem
from src.query_processor import HSNQueryProcessor
from src.conversation_manager import ConversationState
from src.rag_backends import ChromaBackend, MockGeneratorBackend, GeminiGeneratorBackend
from src.retrieval_strategies import GraphContextualStrategy
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
import nest_asyncio
nest_asyncio.apply()
print("Environment setup complete.")

Environment setup complete.


### 1.4. Configuration Management

Configuration is managed via YAML files and loaded into a type-safe Pydantic Settings object. This allows for easy switching between environments (e.g., development, production) by setting an environment variable. For this notebook, we will use the default development configuration.
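
As a rough illustration of switching environments through an environment variable, the sketch below merges per-environment overrides over a set of defaults. It uses plain dictionaries as a stand-in for the YAML files and the Pydantic Settings object; the variable name `APP_ENV`, the `DEFAULTS` and `OVERRIDES` values, and the function names are assumptions made for this example, not the contents of `src.config_loader`.

```python
import os
from copy import deepcopy

# Illustrative values only; the project's real defaults live in its YAML files.
DEFAULTS = {
    "generator": {"backend": "mock", "temperature": 0.2},
    "retrieval": {"top_k": 5},
}
OVERRIDES = {
    "development": {},
    "production": {"generator": {"backend": "gemini"}},
}


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of base with override applied recursively."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def load_settings(env_var: str = "APP_ENV") -> dict:
    """Pick the override set named by the environment variable."""
    env = os.environ.get(env_var, "development")
    if env not in OVERRIDES:
        raise ValueError(f"Unknown environment: {env!r}")
    return deep_merge(DEFAULTS, OVERRIDES[env])
```

Keeping the overrides small and merging them over shared defaults means each environment file only states what differs.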

In [25]:
print("--- RAG System Configuration ---")
print(json.dumps(settings.rag_system.model_dump(), indent=2))

--- RAG System Configuration ---
{
  "vector_store": {
    "backend": "chroma",
    "path": "data/vector_store",
    "collection_name": "hsn_codes",
    "embedding_model": "BAAI/bge-small-en-v1.5"
  },
  "retrieval": {
    "strategy": "graph_contextual",
    "top_k": 5
  },
  "generator": {
    "backend": "gemini",
    "model": "gemini-2.0-flash",
    "openai_api_key": null,
    "gemini_api_key": "<redacted>",
    "temperature": 0.2,
    "timeout": 30
  },
  "caching": {
    "query_cache_ttl": 3600
  },
  "circuit_breaker": {
    "fail_max": 3,
    "reset_timeout": 60
  }
}
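
Because the settings dump includes API key fields, it is safer to mask them before printing. The helper below is a minimal sketch: `mask_secrets` and `SENSITIVE_SUFFIXES` are hypothetical names, and the suffix list is an assumption about how secret fields are named. Declaring such fields with Pydantic's `SecretStr` type is another option.

```python
import json

# Assumed naming convention for secret fields; adjust to the real schema.
SENSITIVE_SUFFIXES = ("_api_key", "_token", "_secret", "password")


def mask_secrets(value, mask: str = "***"):
    """Return a copy of a nested dict/list with secret-looking values masked."""
    if isinstance(value, dict):
        masked = {}
        for key, item in value.items():
            is_secret = isinstance(key, str) and key.lower().endswith(SENSITIVE_SUFFIXES)
            if is_secret and item is not None:
                masked[key] = mask
            else:
                masked[key] = mask_secrets(item, mask)
        return masked
    if isinstance(value, list):
        return [mask_secrets(item, mask) for item in value]
    return value


config = {
    "generator": {
        "backend": "gemini",
        "openai_api_key": None,
        "gemini_api_key": "not-a-real-key",
    }
}
print(json.dumps(mask_secrets(config), indent=2))
```

In the cell above, this would be applied as `mask_secrets(settings.rag_system.model_dump())` before calling `json.dumps`.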


## 2. Data Processing and Enhancement

This section demonstrates the ETL pipeline that transforms the raw HSN records derived from the source PDF (loaded here from the path in `settings.data_paths.raw_hsn_data`) into clean, structured documents ready for the RAG system.

In [26]:
print("--- Starting Data Processing Pipeline ---")
try:
    processor = HSNDataProcessor(file_path=str(settings.data_paths.raw_hsn_data))
    processor.load_hsn_dataset()
    print(f"\nSuccessfully loaded and validated {len(processor.df)} records.")
    if processor.validate_data_quality():
        print("All data quality checks passed.")
    processor.enhance_hierarchy()
    print("HSN hierarchy map built successfully.")
    structured_documents = processor.create_structured_documents()
    print(f"Created {len(structured_documents)} structured documents.")
    processor.save_documents(
        documents=structured_documents,
        output_path=str(settings.data_paths.processed_docs)
    )
    print(f"\nProcessed documents saved to: {settings.data_paths.processed_docs}")

    print("\n--- Sample of Processed DataFrame ---")
    display(processor.df.head())

    print("\n--- Sample of a Final Structured Document ---")
    print(json.dumps(structured_documents[0], indent=2))
    
except Exception as e:
    print(f"\nAn error occurred during data processing: {e}")

--- Starting Data Processing Pipeline ---
2025-09-14 19:32:28 - INFO - src.data_processor:load_hsn_dataset:108 - Loading HSN dataset from data\raw\hsn_codes.json...
2025-09-14 19:32:28 - INFO - src.data_processor:_clean_and_impute_data:79 - Cleaning and imputing missing hierarchical descriptions...
2025-09-14 19:32:28 - INFO - src.data_processor:_clean_and_impute_data:93 - Data cleaning complete.
2025-09-14 19:32:28 - INFO - src.performance_monitor:wrapper:28 - Execution of '_clean_and_impute_data' took 10.53 ms.
2025-09-14 19:32:28 - INFO - src.data_processor:load_hsn_dataset:122 - Validating data schema...
2025-09-14 19:32:28 - INFO - src.data_processor:load_hsn_dataset:124 - Data schema validation successful.
2025-09-14 19:32:28 - INFO - src.data_processor:load_hsn_dataset:137 - Successfully loaded, cleaned, and validated 1667 records.
2025-09-14 19:32:28 - INFO - src.performance_monitor:wrapper:28 - Execution of 'load_hsn_dataset' took 89.34 ms.

Successfully loaded and validated 1667 records.

|   | ChapterNumber | HSN Code | Description | FinalHSN | Chapter_Description | Heading_Description | Subheading_Description |
|---|---|---|---|---|---|---|---|
| 0 | 40 | 40011010 | Prevulcanised | Free | Rubber And Articles Thereof. | Natural rubber, balata, gutta-percha, guayule,... | Natural rubber latex, whether or not pre- vulc... |
| 1 | 40 | 40011020 | Other than prevulcanised | Free | Rubber And Articles Thereof. | Natural rubber, balata, gutta-percha, guayule,... | Natural rubber latex, whether or not pre- vulc... |
| 2 | 40 | 40012100 | Natural rubber in other forms : -- Smoked sheets | Free | Rubber And Articles Thereof. | Natural rubber, balata, gutta-percha, guayule,... | Natural rubber latex, whether or not pre- vulc... |
| 3 | 40 | 40012200 | Natural rubber in other forms : -- Technically... | Free | Rubber And Articles Thereof. | Natural rubber, balata, gutta-percha, guayule,... | Natural rubber latex, whether or not pre- vulc... |
| 4 | 40 | 40012910 | Hevea | Free | Rubber And Articles Thereof. | Natural rubber, balata, gutta-percha, guayule,... | Natural rubber in other forms : -- Other |



--- Sample of a Final Structured Document ---
{
  "document_id": "hsn_40011010",
  "text": "Product: Prevulcanised. Category: Natural rubber latex, whether or not pre- vulcanised. Broader Group: Natural rubber, balata, gutta-percha, guayule, chicle and similar natural gums, in primary forms or in plates, sheets or strip.. General Chapter: Rubber And Articles Thereof.. HSN Code is 40011010.",
  "metadata": {
    "hsn_code": "40011010",
    "chapter": "40",
    "heading": "4001",
    "subheading": "400110",
    "item_description": "Prevulcanised",
    "chapter_description": "Rubber And Articles Thereof.",
    "heading_description": "Natural rubber, balata, gutta-percha, guayule, chicle and similar natural gums, in primary forms or in plates, sheets or strip.",
    "subheading_description": "Natural rubber latex, whether or not pre- vulcanised",
    "source": "data\\raw\\hsn_codes.json"
  }
}
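
The sample document above suggests how a record maps to a structured document: the text field follows a fixed template, and the chapter, heading, and subheading are prefixes of the HSN code. The function below is a sketch reconstructed from that sample output, not the actual `create_structured_documents` method. The function name, the input field names, and the assumption that every code has exactly 8 digits are hypothetical.

```python
def build_structured_document(record: dict, source: str) -> dict:
    """Build one RAG document from a cleaned HSN record (illustrative only)."""
    code = str(record["hsn_code"])
    # Assumption: codes are 8-digit strings, as in the sample output.
    if len(code) != 8 or not code.isdigit():
        raise ValueError(f"Expected an 8-digit HSN code, got {code!r}")

    # Template inferred from the "text" field of the sample document.
    text = (
        f"Product: {record['item_description']}. "
        f"Category: {record['subheading_description']}. "
        f"Broader Group: {record['heading_description']}. "
        f"General Chapter: {record['chapter_description']}. "
        f"HSN Code is {code}."
    )
    return {
        "document_id": f"hsn_{code}",
        "text": text,
        "metadata": {
            "hsn_code": code,
            "chapter": code[:2],      # first 2 digits
            "heading": code[:4],      # first 4 digits
            "subheading": code[:6],   # first 6 digits
            "item_description": record["item_description"],
            "chapter_description": record["chapter_description"],
            "heading_description": record["heading_description"],
            "subheading_description": record["subheading_description"],
            "source": source,
        },
    }


sample_record = {
    "hsn_code": "40011010",
    "item_description": "Prevulcanised",
    "chapter_description": "Rubber And Articles Thereof.",
    "heading_description": "Natural rubber, balata, gutta-percha, guayule, chicle and similar natural gums, in primary forms or in plates, sheets or strip.",
    "subheading_description": "Natural rubber latex, whether or not pre- vulcanised",
}
doc = build_structured_document(sample_record, source="data/raw/hsn_codes.json")
```

Putting the full hierarchy into the text field gives the embedding model context that a short item description such as "Prevulcanised" lacks on its own.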


## 3. Knowledge Graph Construction

The structured documents are now used to build a Knowledge Graph. This graph models the explicit hierarchical relationships between HSN codes, enabling advanced contextual reasoning.
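
To make the hierarchy concrete, the sketch below builds parent and child maps from code prefixes and walks them upward, using plain dictionaries in place of the NetworkX backend. The function names are hypothetical, and deriving each level from the first 2, 4, and 6 digits of an 8-digit code is an assumption based on the sample document's metadata.

```python
from collections import defaultdict


def build_hierarchy(hsn_codes):
    """Return (parent, children) maps: code -> subheading -> heading -> chapter."""
    parent = {}
    children = defaultdict(set)
    for code in hsn_codes:
        chain = [code, code[:6], code[:4], code[:2]]
        for child, par in zip(chain, chain[1:]):
            parent[child] = par
            children[par].add(child)
    return parent, children


def ancestors(node, parent):
    """Walk upward from node and return its ancestors, nearest first."""
    path = []
    while node in parent:
        node = parent[node]
        path.append(node)
    return path


def siblings(code, parent, children):
    """Return the other nodes that share this node's parent."""
    return sorted(children[parent[code]] - {code})


codes = ["40011010", "40011020", "40012100"]
parent, children = build_hierarchy(codes)
print(ancestors("40011010", parent))
print(siblings("40011010", parent, children))
```

Sibling lookups like this are one way a retrieval strategy could surface closely related codes when building a disambiguation prompt.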

In [27]:
print("--- Starting Knowledge Graph Construction ---")
try:
    kg_backend = NetworkXBackend()
    kg_builder = KnowledgeGraphBuilder(kg_backend)
    kg_builder.load_documents(Path(settings.data_paths.processed_docs))
    kg_builder.build_hsn_knowledge_graph()
    kg_builder.enrich_with_semantic_relationships(settings.knowledge_graph.llm_enrichment.model_dump())
    kg_builder.validate_graph_integrity()
    stats = kg_builder.generate_graph_statistics()
    print(f"\nKnowledge Graph Statistics: {stats}")
    print("\n--- Demonstrating Graph Traversal for HSN 40011010 (Ancestors) ---")
    ancestors = kg_builder.traverse_hierarchy(hsn_code="40011010", direction='up')
    print(json.dumps(ancestors, indent=2))
    print("\n--- Generating Interactive Subgraph Visualization ---")
    subgraph_viz_path = Path("data/processed/subgraph_visualization.html")
    subgraph_nx = kg_builder.get_context_subgraph(hsn_code="40011010", depth=2)
    temp_backend = NetworkXBackend()
    temp_backend.graph = subgraph_nx
    temp_builder = KnowledgeGraphBuilder(temp_backend)
    temp_builder.visualize_graph_structure(subgraph_viz_path)
    
    print(f"Interactive visualization saved to {subgraph_viz_path}")
    
except Exception as e:
    print(f"An error occurred during Knowledge Graph construction: {e}")

--- Starting Knowledge Graph Construction ---
2025-09-14 19:32:29 - INFO - src.graph_backends:__init__:50 - Initializing NetworkX backend.
2025-09-14 19:32:29 - INFO - src.graph_builder:load_documents:31 - Loading structured documents from data\processed\structured_hsn_documents.json...
2025-09-14 19:32:29 - INFO - src.graph_builder:load_documents:34 - Loaded 1667 documents.
2025-09-14 19:32:29 - INFO - src.graph_builder:build_hsn_knowledge_graph:45 - Building the hierarchical HSN knowledge graph...
2025-09-14 19:32:29 - INFO - src.graph_builder:build_hsn_knowledge_graph:50 - Hierarchical graph construction complete.
2025-09-14 19:32:29 - INFO - src.performance_monitor:wrapper:28 - Execution of 'build_hsn_knowledge_graph' took 13.26 ms.
2025-09-14 19:32:29 - INFO - src.graph_builder:enrich_with_semantic_relationships:87 - Enriching graph with semantic relationships...


### Interactive Subgraph for HSN 40011010
The cell below embeds the interactive visualization of the local neighborhood around the HSN code for "Prevulcanised" rubber. You can pan, zoom, and click on nodes.

In [28]:
IFrame(src=subgraph_viz_path.as_posix(), width=800, height=600)

## 4. RAG System Implementation
With the data processed and the knowledge graph built, we now initialize the core RAG system. This involves setting up the vector store, the retrieval strategy, and the generator.
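
The retrieve-then-generate loop can be illustrated with a toy example. Everything here is a stand-in: `embed` uses bag-of-words counts instead of the configured embedding model, an in-memory list replaces ChromaDB, and `build_prompt` only assembles the text that would be sent to the generator. The function names and the prompt wording are assumptions, and graph-based context enrichment is left out for brevity.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy stand-in for an embedding model: lowercase bag-of-words counts."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(query: str, documents: list, top_k: int = 5) -> list:
    """Return the top_k (score, document) pairs, best first."""
    q = embed(query)
    scored = [(cosine(q, embed(d["text"])), d) for d in documents]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:top_k]


def build_prompt(query: str, hits: list) -> str:
    """Assemble retrieved context and the question into one prompt string."""
    context = "\n".join(f"- [{d['metadata']['hsn_code']}] {d['text']}" for _, d in hits)
    return f"Context:\n{context}\n\nQuestion: {query}\nAnswer with the best matching HSN code."


# Shortened texts based on the sample DataFrame rows in Section 2.
documents = [
    {"text": "Prevulcanised natural rubber latex", "metadata": {"hsn_code": "40011010"}},
    {"text": "Natural rubber in other forms smoked sheets", "metadata": {"hsn_code": "40012100"}},
    {"text": "Hevea", "metadata": {"hsn_code": "40012910"}},
]
hits = retrieve("natural rubber latex", documents, top_k=2)
prompt = build_prompt("natural rubber latex", hits)
```

In the real system, the retrieval strategy would also pull hierarchy context from the knowledge graph before the prompt is built.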

In [29]:
print("--- Initializing HSN RAG System ---")
print(f"Using Generator Backend: {settings.rag_system.generator.backend}")
print(f"Using Retrieval Strategy: {settings.rag_system.retrieval.strategy}")

async def initialize_rag():
    try:
        global rag_system # Make rag_system available to other cells
        vector_store = ChromaBackend()
        
        if settings.rag_system.generator.backend == "gemini":
            generator = GeminiGeneratorBackend()
        else:
            generator = MockGeneratorBackend()
            
        retrieval_strategy = GraphContextualStrategy(kg_builder)
        rag_system = HSNRAGSystem(vector_store, generator, retrieval_strategy)
        await rag_system.initialize_vector_store(structured_documents)
        
        print("\n--- RAG System Initialized Successfully ---")
        return rag_system
    except Exception as e:
        print(f"An error occurred during RAG system initialization: {e}")
        return None

rag_system = asyncio.run(initialize_rag())

--- Initializing HSN RAG System ---
Using Generator Backend: gemini
Using Retrieval Strategy: graph_contextual
2025-09-14 19:32:30 - INFO - src.rag_backends:__init__:89 - Initialized GeminiGeneratorBackend with model: gemini-2.0-flash
2025-09-14 19:32:33 - INFO - src.rag_backends:initialize:39 - Initializing ChromaDB with 1667 documents...
2025-09-14 19:33:12 - INFO - src.rag_backends:initialize:48 - ChromaDB initialization complete.

--- RAG System Initialized Successfully ---


## 5. Query Processing and Testing
This section demonstrates the HSNQueryProcessor in action. We will run our required test cases to showcase its ability to handle different query types, manage ambiguity, and provide intelligent responses.
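
The scenarios below mix three kinds of input: free-text product descriptions, direct HSN code lookups, and a numeric reply to a disambiguation prompt. A simple rule-based router for these cases might look like the sketch below. It is not the `_parse_natural_language` implementation; the function name and the `lookup` and `selection` labels are hypothetical, while `classification` is the intent label that appears in the log output.

```python
import re

# Assumption: a direct lookup mentions a standalone 8-digit code.
HSN_CODE_PATTERN = re.compile(r"\b(\d{8})\b")


def detect_intent(query: str) -> dict:
    """Classify a user query into a coarse intent (illustrative rules only)."""
    stripped = query.strip()
    match = HSN_CODE_PATTERN.search(stripped)
    if match:
        return {"intent": "lookup", "hsn_code": match.group(1)}
    if stripped.isdigit():
        # A bare number is treated as the choice of a disambiguation option.
        return {"intent": "selection", "choice": int(stripped)}
    return {"intent": "classification"}


for q in ["natural rubber latex", "Tell me about HSN 40011010", "1"]:
    print(q, "->", detect_intent(q))
```

A production system would likely combine rules like these with conversation state, so that a bare "1" is only read as a selection while a disambiguation prompt is pending.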

In [30]:
print("--- Running Assignment Test Case Scenarios ---")

async def run_assignment_scenarios():
    if not rag_system:
        print("RAG system not initialized. Aborting scenarios.")
        return

    query_processor = HSNQueryProcessor(rag_system)
    conversation = ConversationState()

    # Test cases directly from the assignment PDF
    scenarios = [
        {"id": "TC1 & TC4", "query": "natural rubber latex", "description": "Direct Product & Similar Products Disambiguation"},
        {"id": "TC2", "query": "HSN code for prevulcanised rubber", "description": "Specific Product Type"},
        {"id": "TC3", "query": "Rubber products classification", "description": "Broad Category Query"},
        {"id": "TC5", "query": "Tell me about HSN 40011010", "description": "Direct HSN Lookup"}
    ]

    for scenario in scenarios:
        print("\n" + "=" * 70)
        print(f"EXECUTING: {scenario['id']} ({scenario['description']})")
        print(f"USER QUERY: \"{scenario['query']}\"")
        print("="*70)
        
        response = await query_processor.process_query(scenario['query'], conversation)
        
        print(f"\nSYSTEM RESPONSE:\n{response['summary']}")
        
        if response.get("type") == "disambiguation":
            user_choice = "1"
            print("\n" + "-" * 70)
            print("SIMULATING USER FOLLOW-UP (Choosing Option 1)")
            print(f"USER QUERY: \"{user_choice}\"")
            print("-"*70)
            
            follow_up_response = await query_processor.process_query(user_choice, conversation)
            print(f"\nSYSTEM RESPONSE:\n{follow_up_response['summary']}")

asyncio.run(run_assignment_scenarios())

--- Running Assignment Test Case Scenarios ---

EXECUTING: TC1 & TC4 (Direct Product & Similar Products Disambiguation)
USER QUERY: "natural rubber latex"
2025-09-14 19:33:12 - INFO - src.performance_monitor:wrapper:28 - Execution of 'process_query' took 0.15 ms.
2025-09-14 19:33:12 - DEBUG - src.query_processor:_parse_natural_language:102 - Query 'natural rubber latex' parsed with intent: classification
2025-09-14 19:33:13 - DEBUG - src.utils:record:27 - Metric recorded: retrieval_time_ms = 582.4240999936592
2025-09-14 19:33:13 - INFO - src.query_processor:_identify_ambiguous_cases:83 - Ambiguity detected. Scores: 8.1995 vs 8.0733

SYSTEM RESPONSE:
I found a few possible matches. To give you the most accurate HSN code, please help me clarify:

**Option 1: HSN Code 40011010**
- Description: Prevulcanised
- Context: This code is for products under the category of 'Chapter: Rubber And Articles Thereof.. Heading: Natural ru

## 6. System Evaluation and Metrics
A production system requires automated, rigorous testing. This final section executes our pytest framework from within the notebook to provide a comprehensive report on the system's quality, correctness, and performance.
### 6.1. Functional & Integration Testing
This command runs our unit and integration tests. It validates the core logic and the end-to-end flow of the system against our pre-defined test scenarios.

In [31]:
!pytest -v

platform win32 -- Python 3.13.2, pytest-8.3.5, pluggy-1.5.0 -- C:\Users\AAYUSH\AppData\Local\Programs\Python\Python313\python.exe
cachedir: .pytest_cache
benchmark: 5.1.0 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: d:\SHREYAS\WORKABLEAI
configfile: pytest.ini
plugins: anyio-4.9.0, hydra-core-1.3.2, langsmith-0.3.45, asyncio-0.26.0, asyncio-cooperative-0.40.0, benchmark-5.1.0, socket-0.7.0, syrupy-4.9.1, typeguard-4.4.4
asyncio: mode=Mode.STRICT, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function
collecting ... collected 2 items

tests/test_01_unit/test_query_processor_unit.py::test_parse_natural_language_intent_detection PASSED [ 50%]
tests/test_02_integration/test_rag_system_integration.py::test_full_system_scenarios PASSED [100%]



The event loop scope for asynchronous fixtures will default to the fixture caching scope. Future versions of pytest-asyncio will default the loop scope for asynchronous fixtures to function scope. Set the default fixture loop scope explicitly in order to avoid unexpected behavior in the future. Valid fixture loop scopes are: "function", "class", "module", "package", "session"



### Interpretation of Functional Test Results

The run above collects two tests: one unit test and one integration test. A **"PASSED"** result for both indicates that:
1.  The core logic of the `HSNQueryProcessor` (intent detection, relevance checks) is working correctly in isolation.
2.  The end-to-end system correctly handles the scenarios covered by `test_full_system_scenarios`, including direct answers, disambiguation, and graceful failure for out-of-scope queries.
3.  The data pipelines, knowledge graph, and RAG system are correctly integrated and functioning as a cohesive unit.

A "green" test suite provides confidence in the tested paths. Because this run collects only two tests, coverage should be extended before treating it as evidence of deployment readiness.

### 6.2. Performance Benchmarking
This command runs the isolated performance tests. It measures the execution time of key operations, providing a baseline for performance monitoring and future optimization.

In [32]:
!pytest -v tests/test_03_performance/

platform win32 -- Python 3.13.2, pytest-8.3.5, pluggy-1.5.0 -- C:\Users\AAYUSH\AppData\Local\Programs\Python\Python313\python.exe
cachedir: .pytest_cache
benchmark: 5.1.0 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: d:\SHREYAS\WORKABLEAI
configfile: pytest.ini
plugins: anyio-4.9.0, hydra-core-1.3.2, langsmith-0.3.45, asyncio-0.26.0, asyncio-cooperative-0.40.0, benchmark-5.1.0, socket-0.7.0, syrupy-4.9.1, typeguard-4.4.4
asyncio: mode=Mode.STRICT, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function
collecting ... collected 2 items

tests/test_03_performance/test_performance_benchmarks.py::test_benchmark_simple_query PASSED [ 50%]
tests/test_03_performance/test_performance_benchmarks.py::test_benchmark_ambiguous_query PASSED [100%]





### Interpretation of Performance Test Results

The benchmark tests measure the latency (in milliseconds) for two key scenarios: a simple, direct query and a more complex, ambiguous query. The results provide a quantitative baseline for system performance under ideal conditions (using a mock LLM).

In a production environment, these benchmarks would be run regularly to:
-   Detect performance regressions after code changes.
-   Evaluate the impact of new models or architectural changes.
-   Establish Service Level Objectives (SLOs) for API response times.
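
A latency baseline and an SLO check of this kind can be sketched with the standard library alone. The function names and the budget values below are hypothetical, and this is separate from the pytest-benchmark tests run above. The 95th percentile uses the nearest-rank method.

```python
import statistics
import time


def measure_latencies_ms(func, rounds: int = 20) -> list:
    """Call func repeatedly and return each call's duration in milliseconds."""
    samples = []
    for _ in range(rounds):
        start = time.perf_counter()
        func()
        samples.append((time.perf_counter() - start) * 1000.0)
    return samples


def summarize(samples: list) -> dict:
    """Return the mean, nearest-rank 95th percentile, and max of the samples."""
    ordered = sorted(samples)
    # Nearest-rank p95 in integer arithmetic: ceil(0.95 * n) - 1 as an index.
    p95_index = max(0, (95 * len(ordered) + 99) // 100 - 1)
    return {
        "mean_ms": statistics.mean(ordered),
        "p95_ms": ordered[p95_index],
        "max_ms": ordered[-1],
    }


def meets_slo(summary: dict, p95_budget_ms: float) -> bool:
    """Check the p95 latency against a budget (the budget is an assumed value)."""
    return summary["p95_ms"] <= p95_budget_ms


# Deterministic example: latencies of 1..100 ms.
summary = summarize(list(range(1, 101)))
print(summary, meets_slo(summary, p95_budget_ms=100))
```

Comparing the summary against a stored baseline in CI is one way to turn these numbers into a regression gate.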

### 6.3. Recommendations for Production Deployment
The system has been successfully built and validated. To move to a full production environment, the following steps are recommended:
1. Switch to Production Backends:
   - LLM: Use the gemini backend and load the GEMINI_API_KEY from a secure secrets manager (e.g., AWS Secrets Manager, HashiCorp Vault) so that it never appears in configuration files or notebook output.
   - Vector Store: For high availability and scalability, migrate from the local ChromaDB instance to a managed vector database like Pinecone, Weaviate, or Chroma's own managed service.
   - Knowledge Graph: For larger datasets and complex graph queries, migrate from the in-memory NetworkX backend to a persistent graph database like Neo4j.
2. Containerization & Deployment:
   - Package the entire application (including all src modules) into a Docker container.
   - Deploy the container as a scalable service using Kubernetes or a serverless platform like AWS Fargate or Google Cloud Run.
   - Expose the HSNQueryProcessor's functionality via a REST API (e.g., using FastAPI) for integration with frontend applications.
3. CI/CD Pipeline:
   - Implement a CI/CD pipeline (e.g., using GitHub Actions) that automatically runs the pytest suite on every code change to catch regressions before they are merged.
4. Monitoring & Observability:
   - Integrate a monitoring stack like Prometheus and Grafana to track performance metrics (query latency, throughput) and system health.
   - Use a tool like LangSmith or Arize AI to monitor the quality of LLM inputs and outputs, track token usage, and detect data drift or performance degradation over time.