**Please select the T4-GPU runtime before proceeding, as the embedding generation model requires a GPU for fast inference.**

# Project Overview

---

This project focuses on developing a Retrieval-Augmented Generation (RAG) based AI chatbot built on top of the Python Stack Overflow Question Answering Dataset. The chatbot is designed to answer any query related to the Python programming language. It matches user input queries with questions available in the dataset, retrieves relevant questions and metadata, and then passes the retrieved answers as context to a language model (LLM) to generate accurate responses.

### Core Functionality

The bot's main functionality involves generating embeddings of cleaned questions, which are stored in a vector database. When a user asks a question related to Python, the bot generates an embedding of the input query, matches it with questions stored in the database, and retrieves the answers to the top three matched questions. These answers are then passed to the LLM as context to generate an accurate response.
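
To make the matching step concrete, here is a minimal brute-force sketch of top-three retrieval by cosine similarity. The vectors, question IDs, and function names are hypothetical, and a vector database such as ChromaDB uses its own index and distance metric rather than this exhaustive scan.

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k_matches(query_vec, stored, k=3):
    """Return the k question IDs whose vectors are most similar to query_vec."""
    ranked = sorted(
        stored.items(),
        key=lambda item: cosine_similarity(query_vec, item[1]),
        reverse=True,
    )
    return [question_id for question_id, _ in ranked[:k]]

# Hypothetical 3-dimensional embeddings keyed by question ID.
stored_vectors = {
    101: [1.0, 0.0, 0.0],
    102: [0.9, 0.1, 0.0],
    103: [0.0, 1.0, 0.0],
    104: [0.0, 0.0, 1.0],
}
matches = top_k_matches([1.0, 0.05, 0.0], stored_vectors, k=3)
print(matches)
```

The returned IDs are what the inference step would use to look up the stored answers.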

### Pipelines

The project consists of three main pipelines:

1. **Data Processing Pipeline:**
   - **Cleaning and Transformation:** This pipeline is responsible for cleaning and transforming the data into the required format. It removes HTML tags to improve readability and context, handles null values, standardizes date formats, and performs text cleaning.
   - **Storage:** The cleaned data is stored in an SQLite database for persistent storage. SQLite was chosen for its simplicity, given the scope of this project. For questions with multiple answers, the answers are grouped by question and stored as a JSON array in that question's row, which reduces redundancy and dataset size.
   - **Optimization:** Parallel processing is used to clean and store data quickly, making the solution scalable for larger datasets. SQLite is used in WAL mode to handle data efficiently and in parallel. Indexes are created for efficient querying from the database.

2. **Vector Database Pipeline:**
   - **Embedding Generation:** This pipeline generates embeddings with either a small open-source model (450M parameters) or a proprietary VoyageAI model. The open-source model is good for testing, while the VoyageAI model provides better results for production use.
   - **Storage:** ChromaDB is used as the vector database. Batch processing is employed for efficient and fast embedding generation and upserting vectors into the database.

3. **Inference Pipeline:**
   - **Query Processing:** This pipeline receives user queries, generates embeddings using the same model used for dataset embedding generation, and queries the vector database for the top relevant questions.
   - **Answer Retrieval:** It retrieves the question IDs from the metadata of the matched vectors and fetches the corresponding answers from the database. These answers are then passed as context to the LLM to generate the final response.

By utilizing these pipelines, the chatbot can provide accurate and relevant answers to Python programming queries, enhancing the user's experience and ensuring reliable information retrieval.
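
The hand-off from retrieval to generation can be sketched as follows. This is a simplified illustration, not the notebook's implementation: the `posts` table, its rows, and the prompt wording are assumptions made for the example, and the question IDs stand in for the ones returned by the vector search.

```python
import json
import sqlite3

# Hypothetical, simplified table: one row per question, answers stored as JSON.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (question_id INTEGER, title TEXT, answers TEXT)")
conn.executemany(
    "INSERT INTO posts VALUES (?, ?, ?)",
    [
        (1, "How do I reverse a list?", json.dumps([{"answer_body": "Use my_list[::-1]."}])),
        (2, "How do I read a file?", json.dumps([{"answer_body": "Use open() in a with block."}])),
    ],
)

def build_context(conn, question_ids):
    """Fetch the stored answers for the matched question IDs and join them into one context string."""
    placeholders = ",".join("?" for _ in question_ids)
    rows = conn.execute(
        f"SELECT title, answers FROM posts WHERE question_id IN ({placeholders}) ORDER BY question_id",
        list(question_ids),
    ).fetchall()
    parts = []
    for title, answers_json in rows:
        for answer in json.loads(answers_json):
            parts.append(f"Q: {title}\nA: {answer['answer_body']}")
    return "\n\n".join(parts)

# Pretend the vector search matched question 2.
context = build_context(conn, [2])
prompt = f"Answer using only this context:\n{context}\n\nQuestion: How can I open a file in Python?"
print(prompt)
```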

Please install the required dependencies. The kernel will automatically prompt you for a restart; kindly restart the kernel when prompted.

In [2]:
# !pip install pinecone voyageai protoc_gen_openapiv2 numpy==1.24.4 langchain_chroma langchain_voyageai langchain_huggingface
# exit(0)

In [None]:
import os
VoyageAI_API_KEY = os.getenv("VOYAGEAI_API_KEY") 
hf_client_API_KEY = os.getenv("hf_client_API_KEY")


In [None]:
import pandas as pd
import sqlite3
from datetime import datetime
import re
from bs4 import BeautifulSoup
import ast
from typing import Dict, List, Any, Union, Tuple, Iterator
import json
import numpy as np
from urllib.parse import unquote
import logging
from dataclasses import dataclass
from enum import Enum
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing
from functools import partial
import os
from tqdm import tqdm
import concurrent.futures
import time
import csv
import gdown
import warnings


# Ignore all warnings
warnings.filterwarnings('ignore')

# **Stack Overflow Python Questions Dataset**
A collection of Python-related questions and answers from Stack Overflow. The dataset contains technical discussions covering Python programming topics and includes a mix of structured and unstructured data.
Each entry includes:

* Question title
* Question Body
* Question_Id
* Answer content with code examples
* Answer_id
* Timestamps for posts
* Community voting scores
* Technical tags
* Unique identifiers

The data represents authentic programming challenges and solutions from the Python community during Stack Overflow's initial launch period, offering insights into early Python development practices and knowledge sharing.

The original dataset consists of more than 1 million samples. To keep the problem simple, I reduced it to 25,000 samples and stored them in a CSV file.

The next cell downloads the CSV file from Google Drive and saves it locally.

In [4]:
dataset_file_path = "/content/python_stackover_flow_dataset.csv"
url = 'https://drive.google.com/uc?id=1VuXXeiMlGn0utMZWrEXpDSkoqx8hC630'
gdown.download(url, dataset_file_path, quiet=False)

Downloading...
From: https://drive.google.com/uc?id=1VuXXeiMlGn0utMZWrEXpDSkoqx8hC630
To: /content/python_stackover_flow_dataset.csv
100%|██████████| 41.2M/41.2M [00:00<00:00, 116MB/s]


'/content/python_stackover_flow_dataset.csv'

The next cell reads the CSV file into a DataFrame.

In [5]:
df = pd.read_csv(dataset_file_path)
print(f"Shape of the dataset: {df.shape}")
df.head()

Shape of the dataset: (25000, 10)


Unnamed: 0,title,question_id,question_body,question_score,question_date,answer_id,answer_body,answer_score,answer_date,tags
0,How can I find the full path to a font from it...,469,<p>I am using the Photoshop's javascript API t...,21,2008-08-02T15:11:16Z,497,<p>open up a terminal (Applications-&gt;Utilit...,4,2008-08-02T16:56:53Z,"['python', 'osx', 'fonts', 'photoshop']"
1,How can I find the full path to a font from it...,469,<p>I am using the Photoshop's javascript API t...,21,2008-08-02T15:11:16Z,518,<p>I haven't been able to find anything that d...,2,2008-08-02T17:42:28Z,"['python', 'osx', 'fonts', 'photoshop']"
2,How can I find the full path to a font from it...,469,<p>I am using the Photoshop's javascript API t...,21,2008-08-02T15:11:16Z,3040,<p>Unfortunately the only API that isn't depre...,12,2008-08-06T03:01:23Z,"['python', 'osx', 'fonts', 'photoshop']"
3,How can I find the full path to a font from it...,469,<p>I am using the Photoshop's javascript API t...,21,2008-08-02T15:11:16Z,195170,<p>There must be a method in Cocoa to get a li...,1,2008-10-12T07:02:40Z,"['python', 'osx', 'fonts', 'photoshop']"
4,Get a preview JPEG of a PDF on Windows?,502,<p>I have a cross-platform (Python) applicatio...,27,2008-08-02T17:01:58Z,536,<p>You can use ImageMagick's convert utility f...,9,2008-08-02T18:49:07Z,"['python', 'windows', 'image', 'pdf']"


# DataPreprocessor functions

A data cleaning and transformation pipeline for StackOverflow question/answer data.


### handle_missing_values(df)
Fills missing values based on data type:
- Numbers → 0
- Text → empty string
- Dates → minimum timestamp

### clean_html(html_text)
Returns `(cleaned_text, code_blocks)`:
- Extracts `<code>` blocks into a separate list. The aim is to separate code from the question text so that the embeddings capture the natural-language context and retrieval improves.
- Removes scripts/styles
- Replaces URLs with "[URL]", because raw URLs may reduce retrieval accuracy
- Cleans whitespace
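
The steps above can be approximated with the standard library alone. The sketch below is a simplified stand-in, not the notebook's BeautifulSoup-based method: it uses regular expressions (fragile on malformed HTML), a simpler URL pattern, and it drops the extracted code from the cleaned text. The function and pattern names are hypothetical.

```python
import html
import re

CODE_RE = re.compile(r"<code>(.*?)</code>", re.DOTALL)
TAG_RE = re.compile(r"<[^>]+>")
URL_RE = re.compile(r"https?://\S+")

def clean_html_sketch(html_text):
    """Return (cleaned_text, code_blocks) for a small HTML fragment."""
    # Pull out the code blocks first so they can be stored separately.
    code_blocks = [html.unescape(code) for code in CODE_RE.findall(html_text)]
    without_code = CODE_RE.sub(" ", html_text)
    # Strip the remaining tags and decode entities such as &gt;.
    text = html.unescape(TAG_RE.sub(" ", without_code))
    # Replace URLs with a placeholder, then collapse whitespace.
    text = URL_RE.sub("[URL]", text)
    text = " ".join(text.split())
    return text, code_blocks

sample = "<p>See https://example.com/docs for details.</p><pre><code>x = 1 &gt; 0</code></pre>"
cleaned_text, code_blocks = clean_html_sketch(sample)
print(cleaned_text)
print(code_blocks)
```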

### transform_stackoverflow_data(df)
Transforms data to question-centric format:
- Groups by question_id
- Keeps the top 2 answers per question. On average, 4-5 answers are available for each question; for simplicity, only the 2 highest-scoring answers are kept.
- Returns one row per question
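
As an illustration of the grouping logic, the following sketch does the same thing with plain dictionaries instead of a DataFrame. The function name is hypothetical; the IDs and scores are taken from the sample rows shown earlier.

```python
from collections import defaultdict

def top_answers_per_question(rows, k=2):
    """Group answer rows by question_id and keep the k highest-scoring answers."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["question_id"]].append(row)

    result = []
    for question_id in sorted(grouped):
        best = sorted(grouped[question_id], key=lambda r: r["answer_score"], reverse=True)[:k]
        result.append({
            "question_id": question_id,
            "answer_ids": [r["answer_id"] for r in best],
        })
    return result

# IDs and scores from the first five rows of the dataset preview.
rows = [
    {"question_id": 469, "answer_id": 497, "answer_score": 4},
    {"question_id": 469, "answer_id": 518, "answer_score": 2},
    {"question_id": 469, "answer_id": 3040, "answer_score": 12},
    {"question_id": 469, "answer_id": 195170, "answer_score": 1},
    {"question_id": 502, "answer_id": 536, "answer_score": 9},
]
questions = top_answers_per_question(rows)
print(questions)
```

Deriving the answer IDs from the sorted list keeps them aligned with the answers that are actually retained.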


The preprocessor also provides:

- Parallel processing with `ThreadPoolExecutor`. The maximum number of workers can be configured to match the available cores, which helps process larger files in parallel and improves the scalability of the pipeline.
- Comprehensive error logging, so that any issue raised in the pipeline can be detected through the logs.
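
A minimal sketch of these two ideas together, assuming a hypothetical `normalize_whitespace` cleaning function: `executor.map` applies the function across worker threads and returns results in input order, and a failing item is logged instead of aborting the whole batch.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("preprocessing_sketch")

def normalize_whitespace(text):
    """Collapse whitespace; log and return an empty string if the input is not a string."""
    try:
        return " ".join(text.split())
    except AttributeError as exc:
        logger.error("Cleaning error: %s", exc)
        return ""

texts = ["  hello   world ", None, "a\n b"]

# max_workers is the knob to tune to the available cores.
with ThreadPoolExecutor(max_workers=4) as executor:
    cleaned = list(executor.map(normalize_whitespace, texts))
print(cleaned)
```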

In [6]:
# Define possible data quality issues
class DataQualityIssue(Enum):
    MISSING_VALUE = "missing_value"
    INVALID_FORMAT = "invalid_format"
    INVALID_DATE = "invalid_date"
    HTML_PARSING_ERROR = "html_parsing_error"
    JSON_PARSING_ERROR = "json_parsing_error"


# Data structure for quality reports
@dataclass
class DataQualityReport:
    total_rows: int
    missing_values: Dict[str, int]
    invalid_formats: Dict[str, int]
    cleaning_actions: List[str]



class DataPreprocessor:
    def __init__(self):

        """Initialize the preprocessor with common regex patterns and configs."""
        self.url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
        self.code_block_pattern = re.compile(r'<code>(.*?)</code>', re.DOTALL)
        self.special_chars_pattern = re.compile(r'[^\w\s]')

        # Setup logging configuration
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Handle missing values based on column type and business rules.
        Returns DataFrame with handled missing values.
        """
        # Create copy to avoid modifying original
        df = df.copy()

        # Fill missing values based on data type
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        text_cols = df.select_dtypes(include=['object']).columns
        date_cols = [col for col in df.columns if 'date' in col.lower()]

        # Handle numeric columns
        df[numeric_cols] = df[numeric_cols].fillna(0)

        # Handle text columns
        df[text_cols] = df[text_cols].fillna("")

        # Handle date columns
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors='coerce')
            df[col] = df[col].fillna(pd.Timestamp.min)

        return df

    def clean_html(self, html_text: str) -> Tuple[str, List[str]]:
        """
        HTML cleaning with code block preservation and metadata extraction.
        Returns cleaned text and list of extracted code blocks.
        """
        if pd.isna(html_text):
            return "", []

        try:
            # Extract code blocks before cleaning
            code_blocks = self.code_block_pattern.findall(html_text)

            # Use BeautifulSoup for HTML parsing
            soup = BeautifulSoup(html_text, 'html.parser')

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Handle URLs - replace with placeholder
            text = soup.get_text()
            text = self.url_pattern.sub("[URL]", text)

            # Clean whitespace
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = ' '.join(chunk for chunk in chunks if chunk)

            return text, code_blocks

        except Exception as e:
            self.logger.error(f"HTML cleaning error: {str(e)}")
            return "", []

    def validate_dates(self, date_str: str) -> bool:
        """Validate date string format and range."""
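        try:
            # The dataset timestamps end in "Z" (UTC), so compare timezone-aware values;
            # mixing aware and naive timestamps raises a TypeError
            date = pd.to_datetime(date_str, utc=True)
            min_date = pd.Timestamp('2008-01-01', tz='UTC')
            max_date = pd.Timestamp.now(tz='UTC')
            return bool(min_date <= date <= max_date)
        except Exception:
            return False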


    def transform_stackoverflow_data(self, df: pd.DataFrame) -> pd.DataFrame:
      """
      Transform StackOverflow data to have answers as JSON for each unique question.

      Args:
          df (pandas.DataFrame): Input DataFrame with question and answer data

      Returns:
          pandas.DataFrame: Transformed DataFrame with one row per question and answers as JSON
      """
      # Create a list to store the transformed data
      transformed_data = []

      # Group by question_id
      grouped = df.groupby('question_id')

      for question_id, group in grouped:
          # Get question details (will be same for all rows in group)
          question_data = {
              'question_id': int(question_id),
              'title': group['title'].iloc[0],
              'question_body': group['question_body'].iloc[0],
              'question_score': group['question_score'].iloc[0],
              'question_date': group['question_date'].iloc[0],
              'tags': group['tags'].iloc[0]
          }

          # Collect every answer for this question
          answers = []
          for _, row in group.iterrows():
              answers.append({
                  'answer_score': row['answer_score'],
                  'answer_id': int(row['answer_id']),
                  'answer_body': row['answer_body'],
                  'answer_date': row['answer_date']
              })

          # Keep the two highest-scoring answers and derive the IDs from the
          # same sorted list so that answers and answer_ids stay aligned
          top_answers = sorted(answers, key=lambda x: x['answer_score'], reverse=True)[:2]
          question_data['answers'] = top_answers
          question_data['answer_ids'] = [str(answer['answer_id']) for answer in top_answers]

          transformed_data.append(question_data)

      # Create new DataFrame from transformed data
      result_df = pd.DataFrame(transformed_data)

      # Sort by question_id for consistency
      result_df = result_df.sort_values('question_id').reset_index(drop=True)

      return result_df

    def clean_chunk(self, df_chunk: pd.DataFrame) -> pd.DataFrame:
        """Process a single chunk of data."""
        try:
            # Handle missing values
            df_chunk = self.handle_missing_values(df_chunk)

            # Process text columns in parallel using ThreadPoolExecutor
            with ThreadPoolExecutor() as executor:
                for col in ['question_body', 'answer_body']:
                    # Clean HTML and extract code blocks

                    cleaned_results = list(executor.map(self.clean_html, df_chunk[col]))
                    df_chunk[f'{col}_cleaned'] = [result[0] for result in cleaned_results]
                    df_chunk[f'{col}_code_blocks'] = [result[1] for result in cleaned_results]

                    # Calculate metrics
                    df_chunk[f'{col}_url_count'] = df_chunk[col].str.count(self.url_pattern)
                    df_chunk[f'{col}_length'] = df_chunk[f'{col}_cleaned'].str.len()

            # Process dates
            date_cols = ['question_date', 'answer_date']
            for col in date_cols:
                df_chunk[col] = pd.to_datetime(df_chunk[col], errors='coerce')

            # Create preprocessing metadata
            df_chunk['preprocessing_metadata'] = df_chunk.apply(
                lambda row: json.dumps({
                    'question_urls': row['question_body_url_count'],
                    'answer_urls': row['answer_body_url_count'],
                    'question_length': row['question_body_length'],
                    'answer_length': row['answer_body_length'],
                    'has_code': bool(row['question_body_code_blocks'] or row['answer_body_code_blocks'])
                }), axis=1
            )

            return df_chunk

        except Exception as e:
            self.logger.error(f"Error processing chunk: {str(e)}")
            raise

# DataIngestionPipeline

## Initialization

- **`__init__(self, db_path: str = 'stackoverflow.db', chunk_size: int = 10000, log_file: str = 'pipeline_logs.csv')`**
  - Initializes the pipeline with a database path, chunk size, and log file.
  - Sets up logging to monitor operations. Log records go to the console and are also persisted to a CSV file, so any problem in the pipeline can be traced afterwards.
  - Configures the number of workers based on CPU cores.
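
Persistent CSV logging can be sketched with a small custom handler. This is an illustrative variant with a reduced field set, not the notebook's classes; the `CsvLogHandler` name, the field list, and the temporary file location are assumptions for the example.

```python
import csv
import logging
import os
import tempfile

FIELDS = ["level", "function", "message"]

class CsvLogHandler(logging.Handler):
    """Append each log record to a CSV file as one row."""

    def __init__(self, path):
        super().__init__()
        self.path = path
        # Write the header once, when the file is new or empty.
        if not os.path.exists(path) or os.path.getsize(path) == 0:
            with open(path, "w", newline="") as f:
                csv.DictWriter(f, fieldnames=FIELDS).writeheader()

    def emit(self, record):
        try:
            with open(self.path, "a", newline="") as f:
                csv.DictWriter(f, fieldnames=FIELDS).writerow({
                    "level": record.levelname,
                    "function": record.funcName,
                    "message": record.getMessage(),
                })
        except Exception:
            self.handleError(record)

log_path = os.path.join(tempfile.mkdtemp(), "pipeline_logs.csv")
logger = logging.getLogger("csv_logging_sketch")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the example's output out of the root logger
logger.addHandler(CsvLogHandler(log_path))
logger.info("Initialized pipeline with %d workers", 2)

with open(log_path, newline="") as f:
    rows = list(csv.DictReader(f))
print(rows)
```

Because each record becomes one CSV row, the log can later be loaded and filtered like any other tabular file.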

## Database Schema

- **`create_database_schema(self)`**
  - Schema Creation: The method defines the structure of the database, including tables, columns, and data types. It ensures that the database is set up to store questions, answers, and metadata in a way that minimizes redundancy and optimizes access speed.

  - Optimized Settings: The schema is created with various SQLite settings that enhance performance:

    - WAL Mode (Write-Ahead Logging): In WAL mode, changes are appended to a separate log file before they are merged into the main database file. Readers can therefore keep reading while a writer is active, which increases throughput and reduces lock contention under heavy access. SQLite still allows only one writer at a time.

    - Synchronous Mode (Normal): `synchronous=NORMAL` balances performance and data integrity. It reduces write latency while still maintaining a reasonable level of data safety.

    - Cache Configuration: The method sets `cache_size` to 100000 pages so that frequently accessed data stays in memory, and `temp_store=MEMORY` so that temporary tables and indices are kept in memory.
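
The effect of these settings can be checked by reading the pragmas back. The sketch below uses a throwaway database file in a temporary directory (an assumption for the example), because WAL mode applies to file-backed databases rather than in-memory ones.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "sketch.db")
conn = sqlite3.connect(db_path)

# Setting journal_mode returns the mode that is now in effect.
journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA cache_size=100000")

# Read the per-connection settings back (NORMAL is reported as 1).
synchronous = conn.execute("PRAGMA synchronous").fetchone()[0]
cache_size = conn.execute("PRAGMA cache_size").fetchone()[0]
print(journal_mode, synchronous, cache_size)
conn.close()
```

The journal mode is stored in the database file, whereas `synchronous` and `cache_size` apply only to the connection that sets them, so each new connection needs to set them again.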


## Indices

- **`create_indices(self, conn)`**
  - Creates the database indices in a separate step, after the data is loaded. Building an index once at the end is cheaper than updating it on every insert, which matters when a large dataset is pushed into the database.
  - Faster Lookups: The indices on `question_id` and `question_date` in the `stackoverflow_posts` table speed up queries that search or filter by those columns.
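
One way to confirm that a lookup uses the index is `EXPLAIN QUERY PLAN`. The sketch below runs against a reduced, in-memory version of the table with placeholder titles; only the table, column, and index names come from the notebook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE stackoverflow_posts ("
    "id INTEGER PRIMARY KEY, question_id INTEGER NOT NULL, title TEXT)"
)
conn.executemany(
    "INSERT INTO stackoverflow_posts (question_id, title) VALUES (?, ?)",
    [(469, "first question"), (502, "second question")],  # placeholder titles
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_question_id ON stackoverflow_posts(question_id)")

# The last column of each plan row is a human-readable description of the step.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT title FROM stackoverflow_posts WHERE question_id = ?",
    (469,),
).fetchall()
plan_detail = " ".join(row[-1] for row in plan)
print(plan_detail)
```

If the plan text mentions `idx_question_id`, SQLite is searching the index instead of scanning the whole table.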

## Data Transformation and Cleaning

- **`transform_and_clean_chunk(self, chunk: pd.DataFrame) -> pd.DataFrame`**
  - Transforms and cleans a chunk of data in one pass.
  - Uses parallel processing to clean HTML content.
  - Calculates metrics like URL count and body length.
  - Converts dates and stores answers and answer IDs as JSON strings.
  - Creates preprocessing metadata.

## Data Insertion

- **`bulk_insert_chunk(self, conn: sqlite3.Connection, chunk: pd.DataFrame)`**
  - Efficiently inserts a processed chunk into the database using bulk insertion.
  - Prepares data for insertion and uses `executemany` for better performance in batch insertion.
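
A minimal sketch of the bulk-insert pattern, using a reduced hypothetical `posts` table: the rows are first converted to tuples of plain Python types (lists are serialized to JSON), then written with a single `executemany` call inside one transaction.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (question_id INTEGER NOT NULL, title TEXT, answer_ids TEXT)")

# Placeholder rows standing in for a processed chunk.
processed_rows = [
    {"question_id": 469, "title": "first question", "answer_ids": ["3040", "497"]},
    {"question_id": 502, "title": "second question", "answer_ids": ["536"]},
]

# Prepare one tuple per row; JSON-encode values SQLite cannot store directly.
data = [
    (int(r["question_id"]), str(r["title"]), json.dumps(r["answer_ids"]))
    for r in processed_rows
]

# "with conn" commits on success and rolls back if an exception is raised.
with conn:
    conn.executemany(
        "INSERT INTO posts (question_id, title, answer_ids) VALUES (?, ?, ?)",
        data,
    )

row_count = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
stored_ids = json.loads(
    conn.execute("SELECT answer_ids FROM posts WHERE question_id = 469").fetchone()[0]
)
print(row_count, stored_ids)
```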

## Data Ingestion

- **`ingest_data(self, csv_path: str)`**
  - Handles the entire data ingestion process using parallel processing.
  - Creates the database schema and connection pool.
  - Processes chunks of data in parallel and inserts them into the database.
  - Creates indices after all data is loaded.
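
The overall flow can be sketched as follows. To stay self-contained, this example uses threads and an in-memory list in place of the notebook's `ProcessPoolExecutor` and chunked CSV reader; `iter_chunks`, `process_chunk`, and the `posts` table are hypothetical names.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import islice

def iter_chunks(rows, chunk_size):
    """Yield successive lists of at most chunk_size rows."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk

def process_chunk(chunk):
    """Stand-in for the cleaning step: normalize each title."""
    return [(question_id, title.strip().lower()) for question_id, title in chunk]

rows = [(i, f"  Title {i} ") for i in range(25)]
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (question_id INTEGER, title TEXT)")

with ThreadPoolExecutor(max_workers=2) as executor:
    # Map each future back to its chunk index for error reporting.
    future_to_index = {
        executor.submit(process_chunk, chunk): i
        for i, chunk in enumerate(iter_chunks(rows, 10))
    }
    # Insert each chunk from the main thread as soon as it is ready.
    for future in as_completed(future_to_index):
        conn.executemany("INSERT INTO posts VALUES (?, ?)", future.result())
        conn.commit()

# Build the index once, after all rows are loaded.
conn.execute("CREATE INDEX idx_question_id ON posts(question_id)")
total = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
print(len(future_to_index), total)
```

Workers only transform data; all writes happen in the main thread, which suits SQLite's single-writer model.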

## Query Execution

- **`query_data(self, query: str) -> pd.DataFrame`**
  - Executes a SQL query and returns the results as a DataFrame.
  - Handles errors and logs execution details.


In [7]:
class LogsCSVFormatter(logging.Formatter):
    """Custom formatter to output logs in CSV format"""
    def __init__(self):
        super().__init__()
        self.output_fields = ['timestamp', 'level', 'module', 'function', 'message']

    def format(self, record):
        timestamp = datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S')
        return {
            'timestamp': timestamp,
            'level': record.levelname,
            'module': record.module,
            'function': record.funcName,
            'message': record.getMessage()
        }

class CSVFileHandler(logging.FileHandler):
    """Custom file handler to write logs to CSV file"""
    def __init__(self, filename, mode='a'):
        super().__init__(filename, mode)
        self.formatter = LogsCSVFormatter()

        # Create CSV file with headers if it doesn't exist
        if not os.path.exists(filename) or os.path.getsize(filename) == 0:
            with open(filename, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=self.formatter.output_fields)
                writer.writeheader()

    def emit(self, record):
        try:
            with open(self.baseFilename, 'a', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=self.formatter.output_fields)
                writer.writerow(self.formatter.format(record))
        except Exception:
            self.handleError(record)

In [8]:
# Data Ingestion pipeline.
class DataIngestionPipeline:
    def __init__(self, db_path: str = 'stackoverflow.db', chunk_size: int = 10000, log_file: str = 'pipeline_logs.csv'):
        """Initialize the pipeline with configurable chunk size and logging."""
        self.db_path = db_path
        self.chunk_size = chunk_size
        self.preprocessor = DataPreprocessor()

        # Setup logging
        self.logger = logging.getLogger(__name__)
        if not self.logger.handlers:
            # Console handler with standard formatting
            console_handler = logging.StreamHandler()
            console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            console_handler.setFormatter(console_formatter)

            # CSV file handler
            csv_handler = CSVFileHandler(log_file)

            # Add both handlers
            self.logger.addHandler(console_handler)
            self.logger.addHandler(csv_handler)
            self.logger.setLevel(logging.INFO)

        # Configure the number of workers based on CPU cores
        self.max_workers = max(1, multiprocessing.cpu_count())
        self.logger.info(f"Initialized pipeline with {self.max_workers} workers")

    def create_database_schema(self):
        """Create database schema with optimized settings."""
        self.logger.info("Creating database schema")
        with sqlite3.connect(self.db_path) as conn:
            # Enable WAL mode for better concurrent access
            conn.execute('PRAGMA journal_mode=WAL')
            conn.execute('PRAGMA synchronous=NORMAL')
            conn.execute('PRAGMA cache_size=100000')
            conn.execute('PRAGMA temp_store=MEMORY')

            conn.execute("""
                CREATE TABLE IF NOT EXISTS stackoverflow_posts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    question_id INTEGER NOT NULL,
                    title TEXT,
                    question_body TEXT,
                    question_score INTEGER,
                    question_date TEXT,
                    tags TEXT,
                    answers TEXT,  -- Store answers as JSON array
                    answer_ids TEXT,  -- Store answer IDs as JSON array
                    question_body_cleaned TEXT,
                    question_body_code_blocks TEXT,
                    question_body_url_count INTEGER,
                    question_body_length INTEGER,
                    preprocessing_metadata TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
        self.logger.info("Database schema created successfully")

    def create_indices(self, conn):
        """Create indices separately for better insertion performance."""
        self.logger.info("Creating database indices")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_question_id ON stackoverflow_posts(question_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_question_date ON stackoverflow_posts(question_date)")
        self.logger.info("Database indices created successfully")

    def transform_and_clean_chunk(self, chunk: pd.DataFrame) -> pd.DataFrame:
        """Transform and clean a chunk of data in one pass."""
        try:
            chunk_size = len(chunk)
            self.logger.debug(f"Processing chunk of size {chunk_size}")

            # First transform the data to group by questions
            transformed_df = self.preprocessor.transform_stackoverflow_data(chunk)

            # Clean the transformed data
            processed_df = transformed_df.copy()

            # Process in parallel using ThreadPoolExecutor
            with ThreadPoolExecutor() as executor:
                # Clean question body HTML
                cleaned_results = list(executor.map(
                    self.preprocessor.clean_html,
                    processed_df['question_body']
                ))

                processed_df['question_body_cleaned'] = [result[0] for result in cleaned_results]
                processed_df['question_body_code_blocks'] = [result[1] for result in cleaned_results]

            # Calculate metrics and process remaining fields
            processed_df['question_body_url_count'] = processed_df['question_body'].apply(
                lambda x: len(self.preprocessor.url_pattern.findall(x))
            )
            processed_df['question_body_length'] = processed_df['question_body_cleaned'].str.len()

            # Convert dates
            processed_df['question_date'] = pd.to_datetime(
                processed_df['question_date'],
                errors='coerce'
            )

            # Store answers and answer_ids as JSON strings
            processed_df['answers'] = processed_df['answers'].apply(json.dumps)
            processed_df['answer_ids'] = processed_df['answer_ids'].apply(json.dumps)

            # Create preprocessing metadata
            processed_df['preprocessing_metadata'] = processed_df.apply(
                lambda row: json.dumps({
                    'question_urls': row['question_body_url_count'],
                    'question_length': row['question_body_length'],
                    'has_code': bool(row['question_body_code_blocks']),
                    'num_answers': len(json.loads(row['answers'])),
                }), axis=1
            )

            self.logger.debug(f"Successfully processed chunk of {chunk_size} rows")
            return processed_df

        except Exception as e:
            self.logger.error(f"Error processing chunk: {str(e)}")
            raise

    def bulk_insert_chunk(self, conn: sqlite3.Connection, chunk: pd.DataFrame):
        """Efficiently insert a processed chunk into the database."""
        try:
            chunk_size = len(chunk)
            self.logger.debug(f"Inserting chunk of {chunk_size} rows into database")

            # Prepare data for insertion
            data = [
                (
                    int(row['question_id']),
                    str(row['title']),
                    str(row['question_body']),
                    int(row['question_score']),
                    str(row['question_date']),
                    str(row['tags']),
                    str(row['answers']),
                    str(row['answer_ids']),
                    str(row['question_body_cleaned']),
                    json.dumps(row['question_body_code_blocks']),
                    int(row['question_body_url_count']),
                    int(row['question_body_length']),
                    str(row['preprocessing_metadata'])
                )
                for _, row in chunk.iterrows()
            ]

            # Bulk insert using executemany
            conn.executemany("""
                INSERT INTO stackoverflow_posts (
                    question_id, title, question_body, question_score,
                    question_date, tags, answers, answer_ids,
                    question_body_cleaned, question_body_code_blocks,
                    question_body_url_count, question_body_length,
                    preprocessing_metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, data)

            self.logger.debug(f"Successfully inserted {chunk_size} rows into database")

        except Exception as e:
            self.logger.error(f"Error inserting chunk: {str(e)}")
            raise

    def ingest_data(self, csv_path: str):
        """Optimized data ingestion method using parallel processing."""
        try:
            self.logger.info(f"Starting data ingestion from {csv_path}")

            # Create database schema
            self.create_database_schema()

            # Create connection pool
            self.logger.info(f"Creating database connection pool with {self.max_workers} connections")
            db_pool = []
            for _ in range(self.max_workers):
                conn = sqlite3.connect(self.db_path)
                conn.execute('PRAGMA journal_mode=WAL')
                conn.execute('PRAGMA synchronous=NORMAL')
                db_pool.append(conn)

            # Get total number of chunks for progress bar
            total_chunks = sum(1 for _ in pd.read_csv(csv_path, chunksize=self.chunk_size))
            self.logger.info(f"Found {total_chunks} total chunks to process")

            # Process chunks in parallel
            with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
                chunks = pd.read_csv(csv_path, chunksize=self.chunk_size)

                # Submit all chunks for processing
                future_to_chunk = {
                    executor.submit(self.transform_and_clean_chunk, chunk): i
                    for i, chunk in enumerate(chunks)
                }

                # Process results as they complete
                with tqdm(total=total_chunks, desc="Processing chunks") as pbar:
                    for future in concurrent.futures.as_completed(future_to_chunk):
                        chunk_idx = future_to_chunk[future]
                        try:
                            processed_chunk = future.result()
                            # Get a connection from the pool
                            conn = db_pool[chunk_idx % len(db_pool)]
                            # Insert the processed chunk
                            self.bulk_insert_chunk(conn, processed_chunk)
                            conn.commit()
                            pbar.update(1)
                        except Exception as e:
                            self.logger.error(f"Error processing chunk {chunk_idx}: {str(e)}")

            # Close all connections
            self.logger.info("Closing database connections")
            for conn in db_pool:
                conn.close()

            # Create indices after all data is loaded
            self.logger.info("Creating final database indices")
            with sqlite3.connect(self.db_path) as conn:
                self.create_indices(conn)

            self.logger.info("Completed data ingestion successfully")

        except Exception as e:
            self.logger.error(f"Fatal error during data ingestion: {str(e)}")
            raise

    def query_data(self, query: str) -> pd.DataFrame:
        """Execute a query and return results as DataFrame."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                return pd.read_sql_query(query, conn)
        except Exception as e:
            self.logger.error(f"Error executing query: {str(e)}")
            raise

    def get_post_statistics(self) -> Dict[str, Any]:
        """Get basic statistical summary of posts."""
        query = """
            SELECT
                COUNT(*) as total_posts,
                AVG(question_score) as avg_score,
                COUNT(DISTINCT question_id) as unique_questions,
                AVG(question_body_length) as avg_question_body_length
            FROM stackoverflow_posts
        """
        return self.query_data(query).iloc[0].to_dict()

    def connect_db(self) -> sqlite3.Connection:
        """Create and return a database connection."""
        return sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES)

In [9]:
if __name__ == "__main__":
    import os
    if os.path.exists('stackoverflow.db'):
        os.remove('stackoverflow.db')

    # Initialize with log file
    data_ingestion_pipeline = DataIngestionPipeline(
        db_path='stackoverflow.db',
        chunk_size=10000,
        log_file='data_ingestion_logs.csv'
    )

    data_ingestion_pipeline.ingest_data(dataset_file_path)

2024-11-05 04:19:38,626 - __main__ - INFO - Initialized pipeline with 2 workers
INFO:__main__:Initialized pipeline with 2 workers
2024-11-05 04:19:38,630 - __main__ - INFO - Starting data ingestion from /content/python_stackover_flow_dataset.csv
INFO:__main__:Starting data ingestion from /content/python_stackover_flow_dataset.csv
2024-11-05 04:19:38,631 - __main__ - INFO - Creating database schema
INFO:__main__:Creating database schema
2024-11-05 04:19:38,647 - __main__ - INFO - Database schema created successfully
INFO:__main__:Database schema created successfully
2024-11-05 04:19:38,657 - __main__ - INFO - Creating database connection pool with 2 connections
INFO:__main__:Creating database connection pool with 2 connections
2024-11-05 04:19:39,151 - __main__ - INFO - Found 3 total chunks to process
INFO:__main__:Found 3 total chunks to process
Processing chunks: 100%|██████████| 3/3 [00:05<00:00,  1.79s/it]
2024-11-05 04:19:45,325 - __main__ - INFO - Closing database connections
INFO

In [None]:
data_ingestion_pipeline.get_post_statistics()

{'total_posts': 5280.0,
 'avg_score': 35.29450757575758,
 'unique_questions': 5279.0,
 'avg_question_body_length': 705.8554924242425}

## Data Ingestion to Vector Database (ChromaDB)

# VectorIngestion pipeline

The `VectorIngestion` class is designed to initialize and manage the ingestion of vector embeddings into a vector database.

## Initialization

### `__init__(self, open_source_mode)`
The constructor initializes the vector ingestion pipeline.

- **Parameters:**
  - `open_source_mode` (bool): Determines whether to use open-source embeddings or VoyageAI embeddings.

- **Steps:**
  1. **Retrieve API Key:** Attempts to fetch the `VOYAGEAI_API_KEY` from user data.
  2. **Set Mode:** Stores the `open_source_mode` flag.
  3. **Initialize Hugging Face Client:** Creates an inference client for Hugging Face using a specific API key.
  4. **Set Index Name:** Sets the index name for the vector database to `"chroma_vectorDB"`.
  5. **Model and Collection Setup:**
     - If `open_source_mode` is `True`:
       - Downloads the Hugging Face embeddings model `"all-mpnet-base-v2"`.
       - Sets the collection name to `"hf_collection"`.
     - If `open_source_mode` is `False`:
       - Sets the VoyageAI API key.
       - Initializes the VoyageAI embeddings model with the provided API key and model `"voyage-large-2-instruct"`.
       - Sets the collection name to `"voyageai_collection"`.
  6. **Initialize ChromaDB:**
     - Creates a Chroma vector store instance with the specified collection name, embedding function, and a directory for persisting data locally.


## Data Parsing

### `data_parsing(self, df)`
This method processes a DataFrame to extract documents, metadata, and unique IDs for ingestion into the vector database.

- **Parameters:**
  - `df` (DataFrame): The input data to be processed.

- **Steps:**
  1. **Initialize Lists:** Creates empty lists for documents, metadata, and unique IDs.
  2. **Iterate Over DataFrame Rows:**
     - For each row, combines the title and question body to form a document. Joining the title and the question body gives the embedding model more context about the question, which yields better contextual embeddings.
     - Constructs a metadata dictionary with question ID, tags, title, question body, and answer IDs.
     - Generates a unique ID for each document using `uuid`.
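
Because a fresh random `uuid4` is generated on every run, re-running the ingestion against a persisted collection stores each question again under a new ID. One possible alternative, shown here only as a sketch and not as what the notebook does, is to derive the ID from the question ID with `uuid.uuid5`, so that repeated runs produce identical IDs. The helper `parse_rows`, the namespace string, and the sample rows are hypothetical; how a given vector store treats a repeated ID (overwrite, skip, or error) is an assumption to check against that store.

```python
import json
import uuid

# Hypothetical, fixed namespace for this project; any constant UUID works
# as long as it never changes between runs.
QUESTION_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "stackoverflow-python-questions")


def parse_rows(rows):
    """Build documents, metadata, and stable IDs from a list of row dictionaries."""
    docs, metadatas, ids = [], [], []
    for row in rows:
        # Title and body are joined exactly as in data_parsing
        docs.append(row["title"] + "\n\n" + row["question_body"])
        metadatas.append({
            "question_id": row["question_id"],
            "tags": row["tags"],
            "title": row["title"],
            "question_body": row["question_body"],
            "answer_ids": json.dumps(row["answer_ids"]),
        })
        # Same question_id -> same ID on every run
        ids.append(str(uuid.uuid5(QUESTION_NAMESPACE, str(row["question_id"]))))
    return docs, metadatas, ids


# Made-up sample rows, for illustration only
sample_rows = [
    {"question_id": 1, "tags": "python", "title": "Sample title A",
     "question_body": "Sample body A", "answer_ids": [10, 11]},
    {"question_id": 2, "tags": "python", "title": "Sample title B",
     "question_body": "Sample body B", "answer_ids": [12]},
]

docs, metadatas, ids = parse_rows(sample_rows)
```

With stable IDs, a second ingestion run maps each question to the same key instead of minting a new one.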


## Embedding Generation

### `generate_embeddings(self, documents, input_type)`
Generates embeddings for a list of documents using the specified input type.

- **Parameters:**
  - `documents` (list): The documents to generate embeddings for.
  - `input_type` (str): The type of input for the embedding model.

- **Steps:**

  1. **Set Batch Size:** Defines the batch size for processing documents.
  2. **Initialize Embeddings List:** Creates an empty list to store embeddings.
  3. **Iterate Over Documents in Batches:**
     - Generates embeddings for each batch using the VoyageAI model.
     - Appends the generated embeddings to the embeddings list.
  4. **Return Embeddings:** Outputs the list of embeddings.
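
The batching steps above can be sketched independently of any embedding provider. In this minimal example `embed_in_batches` and `fake_embed` are hypothetical names; `fake_embed` is a stand-in that returns a one-dimensional vector per text so the loop can run without an API key.

```python
from typing import Callable, List, Sequence


def embed_in_batches(
    documents: Sequence[str],
    embed_batch: Callable[[Sequence[str]], List[List[float]]],
    batch_size: int = 128,
) -> List[List[float]]:
    """Call embed_batch on consecutive slices and concatenate the results."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    embeddings: List[List[float]] = []
    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        embeddings.extend(embed_batch(batch))
    return embeddings


def fake_embed(batch: Sequence[str]) -> List[List[float]]:
    """Stand-in embedding function: one vector per text, holding its length."""
    return [[float(len(text))] for text in batch]


vectors = embed_in_batches(["a", "bb", "ccc"], fake_embed, batch_size=2)
```

The output order matches the input order because batches are processed sequentially and appended in turn.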


## Data Insertion to Vector Database

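### `data_insertion_to_vectordb(self, documents, metadatas, ids, batch_size=None)`
Inserts documents, metadata, and unique IDs into the vector database in batches.

- **Parameters:**
  - `documents` (list): The documents to insert.
  - `metadatas` (list): The metadata associated with the documents.
  - `ids` (list): The unique IDs for the documents.
  - `batch_size` (int, optional): The number of documents to insert per batch.

- **Steps:**
  1. **Slice Into Batches:** Walks through the documents, metadata, and IDs in slices of `batch_size`.
  2. **Create Document Objects:** Constructs `Document` objects for each document in the batch, combining content, metadata, and ID.
  3. **Add Documents to Vector Store:** Inserts each batch into the vector store using the `add_documents` method; a batch that fails is reported and skipped.
  4. **Report Result:** Prints how many documents were inserted out of the total.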





In [11]:
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC as Pinecone
from google.colab import userdata
from huggingface_hub import InferenceClient
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_voyageai import VoyageAIEmbeddings
from langchain_core.documents import Document
from langchain_chroma import Chroma
import voyageai
import uuid
import json

In [None]:
class VectorIngestion:
  def __init__(self,open_source_mode):

    self.VOYAGEAI_API_KEY = VoyageAI_API_KEY
    self.open_source_mode = open_source_mode
    self.hf_client = InferenceClient(api_key=hf_client_API_KEY)
    self.index_name = "chroma_vectorDB"
    # Default batch size used by data_insertion_to_vectordb when none is passed
    self.batch_size = 128


    if self.open_source_mode:
      # Download the Hugging Face embedding model
      embeddings_model = HuggingFaceEmbeddings(model_name="all-mpnet-base-v2")
      collection_name = "hf_collection"

    else:
      # Initialize Voyageai
      embeddings_model = VoyageAIEmbeddings(api_key = self.VOYAGEAI_API_KEY ,  model="voyage-large-2-instruct")
      collection_name = "voyageai_collection"

    # #initialize chromaDB
    self.vector_store = Chroma(
      collection_name=collection_name,
      embedding_function=embeddings_model,
      persist_directory="./chroma_db",  # Where to save data locally, remove if not necessary
    )

    print(f"ChromaDB Vector Store Intialized...\n")
    print(f"Index named {self.index_name} created...\n")
    print(f"Embedding Model Intialized...\n")
    print(f"HuggingFace LLM Client Intialized...\n")


  def data_parsing(self,df):
    print(f"creating the documents, metadata and uniques ids to insert into vector database")
    docs,metadatas,ids = [],[],[]
    for id, row in df.iterrows():

      docs.append(row['title'] + '\n\n' + row["question_body"])
      metadata = {"question_id":row["question_id"],
                  "tags":row['tags'],
                  "title":row["title"],
                  "question_body":row["question_body"],
                  "answer_ids" : json.dumps(row["answer_ids"])
                  }
      metadatas.append(metadata)
      ids.append(str(uuid.uuid4()))
    print(f"length of uniques question: {len(docs),len(metadatas),len(ids)}")
    return docs,metadatas,ids


  def generate_embeddings(self,documents,input_type):
      print(f"\nGenerating the embeddings of the {len(documents)} documents...")
      voyageai.api_key = self.VOYAGEAI_API_KEY
      vo = voyageai.Client()
      # Generate embeddings
      batch_size = 128
      embeddings = []

      for i in tqdm(range(0, len(documents), batch_size)):
          embeddings += vo.embed(
              documents[i:i + batch_size], model="voyage-large-2-instruct", input_type=input_type
          ).embeddings

      return embeddings


  def data_insertion_to_vectordb(
            self,
            documents: List[str],
            metadatas: List[Dict[str, Any]],
            ids: List[str],
            batch_size: int = None
        ) -> bool:
            """
            Insert documents into vector database in batches.

            Args:
                documents: List of text documents
                metadatas: List of metadata dictionaries
                ids: List of document IDs
                batch_size: Optional batch size override

            Returns:
                bool: True if insertion was successful
            """
            if batch_size is None:
                batch_size = self.batch_size

            total_docs = len(documents)
            successful_insertions = 0

            # Process in batches with progress bar
            with tqdm(total=total_docs, desc="Inserting documents") as pbar:
                for i in range(0, total_docs, batch_size):
                    # Get batch slices
                    batch_docs = documents[i:i + batch_size]
                    batch_metadata = metadatas[i:i + batch_size]
                    batch_ids = ids[i:i + batch_size]

                    # Create Document objects for the batch
                    batch_documents = [
                        Document(
                            page_content=doc,
                            metadata=metadata,
                            id=doc_id
                        )
                        for doc, metadata, doc_id in zip(batch_docs, batch_metadata, batch_ids)
                    ]

                    try:
                        # Insert batch into vector store
                        resp = self.vector_store.add_documents(
                            documents=batch_documents,
                            ids=batch_ids
                        )
                        successful_insertions += len(resp)
                        pbar.update(len(batch_docs))

                    except Exception as e:
                        print(f"Error inserting batch {i//batch_size}: {str(e)}")
                        continue

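            print(f"Successfully inserted {successful_insertions}/{total_docs} documents")
            # Return True only if every document was inserted, as the docstring promises
            return successful_insertions == total_docs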





The open-source model is faster because it is small, so it is preferable to keep `open_source_mode=True`.

In [13]:
open_source_mode = True
inject_vectors = VectorIngestion(open_source_mode)

ChromaDB Vector Store Intialized...

Index named chroma_vectorDB created...

Embedding Model Intialized...

HuggingFace LLM Client Intialized...



In [14]:
# Fetch all Python Stack Overflow records from the SQLite database as a DataFrame
sql_query = "select * from stackoverflow_posts"
df = data_ingestion_pipeline.query_data(sql_query)

This cell generates embeddings for the whole dataset and stores them in the vector DB. It takes roughly one and a half to two minutes to execute.

In [15]:
# Pass the DataFrame to data_parsing, then insert the parsed documents; embeddings are generated as they are stored in the vector database.
batch_size = 256
documents,metadatas,ids = inject_vectors.data_parsing(df)
inject_vectors.data_insertion_to_vectordb(documents,metadatas,ids,batch_size)

creating the documents, metadata and uniques ids to insert into vector database
length of uniques question: (5280, 5280, 5280)


Inserting documents: 100%|██████████| 5280/5280 [01:55<00:00, 45.54it/s]

Successfully inserted 5280/5280 documents





# Inference Pipeline (Query and Retrieve)

## Initialization

### `__init__(self, ingestion_pipeline, open_source_mode)`
Initializes the inference class, which extends the `VectorIngestion` class, and sets up logging to monitor operations. Persistent logging keeps track of each step in the pipeline, so if something goes wrong the logs can be traced in the CSV file.

- **Parameters:**
  - `ingestion_pipeline`: The data ingestion pipeline.
  - `open_source_mode` (bool): Determines whether to use open-source embeddings or VoyageAI embeddings.

- **Attributes:**
  - `ingestion_pipeline`: Stores the ingestion pipeline.
  - `model_name`: The name of the language model to be used (`"mistralai/Mistral-Nemo-Instruct-2407"`).
  - `sql_query`: A template SQL query to fetch answers from the database based on question IDs.
  - `prompt`: A pre-defined prompt template for generating responses from the LLM.


## Retriever

### `retriever(self, query, verbose)`
Retrieves relevant context from the vector database based on the user's query.

- **Parameters:**
  - `query` (str): The user's query.
  - `verbose` (bool): If `True`, prints additional debugging information.

- **Steps:**
  1. **Search Vector Store:** Calls the vector store to retrieve documents similar to the query.
  2. **Extract Metadata:** Extracts and stores metadata from the results.
  3. **Execute SQL Query:** Replaces placeholders in the SQL query and fetches answers.
  4. **Aggregate Context:** Joins the first two answers of each matched question into a single context string.
  5. **Print Debug Info:** If `verbose` is `True`, prints detailed information about the retrieved documents and context.

- **Returns:** Lists of question IDs, titles, tags, question bodies, scores, answer IDs, and the relevant context. In the current code the scores and answer-ID lists are returned empty.
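
The notebook fills the `_question_ids_` placeholder by string replacement. As a hedged alternative sketch, the same lookup can pass the IDs through `sqlite3` `?` placeholders and drop repeated IDs first. `fetch_context` is a hypothetical helper, and the two-column in-memory table is only a minimal stand-in for `stackoverflow_posts`; the `answers` column is assumed to hold a JSON list of objects with an `answer_body` key, as the retriever code implies.

```python
import json
import sqlite3


def fetch_context(conn, question_ids, answers_per_question=2):
    """Return the first few answers of each matched question as one string."""
    # Remove duplicate IDs while keeping their original order
    unique_ids = list(dict.fromkeys(question_ids))
    if not unique_ids:
        return ""
    # One "?" per ID; the values are bound by sqlite3, not pasted into the SQL
    placeholders = ",".join("?" for _ in unique_ids)
    sql = f"SELECT answers FROM stackoverflow_posts WHERE question_id IN ({placeholders})"
    rows = conn.execute(sql, unique_ids).fetchall()
    bodies = [
        answer["answer_body"]
        for (answers_json,) in rows
        for answer in json.loads(answers_json)[:answers_per_question]
    ]
    return "\n\n".join(bodies)


# Minimal stand-in table with made-up rows, for illustration only
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stackoverflow_posts (question_id INTEGER, answers TEXT)")
conn.executemany(
    "INSERT INTO stackoverflow_posts VALUES (?, ?)",
    [
        (1, json.dumps([{"answer_body": "first"}, {"answer_body": "second"},
                        {"answer_body": "third"}])),
        (2, json.dumps([{"answer_body": "other"}])),
    ],
)

context = fetch_context(conn, [1, 1, 2])
```

Only the number of placeholders is formatted into the SQL text; the ID values themselves never are.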



## LLM Call

### `llm_call(self, query, verbose, stream)`
Generates a response to the user's query using a language model.

- **Parameters:**
  - `query` (str): The user's query.
  - `verbose` (bool): If `True`, prints additional debugging information.
  - `stream` (bool): If `True`, streams the response.

- **Steps:**
  1. **Retrieve Context:** Calls the `retriever` method to get relevant context.
  2. **Prepare Prompt:** Replaces placeholders in the prompt template with the query and context.
  3. **Send LLM Request:** Sends the prepared prompt to the language model.
  4. **Print Response:** Prints the model's response, optionally streaming it if `stream` is `True`.
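
The prompt preparation and streamed printing steps can be sketched without calling a model. Here `build_prompt`, `collect_stream`, and `fake_chunk` are hypothetical names, the template is a shortened stand-in for the notebook's prompt, and `fake_chunk` only imitates the `choices[0].delta.content` shape that the notebook reads. The assumption that a chunk's content may be `None` should be checked against the client in use.

```python
from types import SimpleNamespace

# Shortened stand-in for the notebook's prompt template
PROMPT_TEMPLATE = "Query:\n__query__\n\nContext:\n__context__\n"


def build_prompt(template, query, context):
    """Fill the __query__ and __context__ placeholders."""
    return template.replace("__query__", query).replace("__context__", context)


def collect_stream(chunks):
    """Print streamed pieces as they arrive and return the full text."""
    parts = []
    for chunk in chunks:
        piece = chunk.choices[0].delta.content
        if piece:  # skip chunks whose content is None or empty
            parts.append(piece)
            print(piece, end="")
    return "".join(parts)


def fake_chunk(text):
    """Stand-in for one streamed chunk."""
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


prompt = build_prompt(PROMPT_TEMPLATE, "What is the GIL?", "retrieved answers go here")
answer = collect_stream([fake_chunk("The GIL "), fake_chunk("is a lock."), fake_chunk(None)])
```

Collecting the pieces in a list and joining once keeps the full response available for logging after the stream ends.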



In [16]:
class InferenceClass(VectorIngestion):
    def __init__(self, ingestion_pipeline, open_source_mode, log_file: str = 'logs/inference_logs.csv'):
        """Initialize the inference pipeline with logging."""
        super().__init__(open_source_mode)

        # Setup logging
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)  # Set level before adding handlers

        # Remove any existing handlers to avoid duplicates; iterate over a copy
        # because removeHandler mutates the list being looped over
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)

        # Console handler
        console_handler = logging.StreamHandler()
        console_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(console_formatter)
        console_handler.setLevel(logging.INFO)

        # CSV file handler
        csv_handler = CSVFileHandler(log_file)
        csv_handler.setLevel(logging.INFO)

        # Add both handlers
        self.logger.addHandler(console_handler)
        self.logger.addHandler(csv_handler)

        self.ingestion_pipeline = ingestion_pipeline
        self.model_name = "mistralai/Mistral-Nemo-Instruct-2407"
        self.logger.info(f"Initialized inference pipeline with model: {self.model_name}")


        self.sql_query = """
        SELECT answers
        FROM stackoverflow_posts
        WHERE question_id IN (_question_ids_);
        """

        self.prompt = """You are a helpful assistant. Your task is to answer the user's query related to Python based on the provided context. The context consists of answers from developers to questions asked on Stack Overflow.

Instructions:
Cohesive and Clear Response: Ensure your answer is well-organized, easy to understand, and directly addresses the user's query.
Include Code Examples: If a relevant code example is available in the provided context, include it in your response.
Contextual Limitation: If the answer to the query is not present in the provided context or context is empty, must respond with: "Sorry, the provided context does not contain an answer to the query."

If the answer to the query is not present in the provided context, do not add anything from your knowledge.
Your answer should be less than 500 words.

Query:
__query__

Context:
__context__
"""

    def retriever(self, query, verbose):
        """Retrieve relevant context from vector store."""
        try:
            self.logger.info(f"Starting retrieval for query: {query[:100]}...")  # Log first 100 chars of query

            question_ids = []
            titles = []
            tags = []
            question_bodies = []
            s_scores = []
            answer_ids = []
            relevant_context = []

            self.logger.info("Querying chromaDB vector stores...")
            results = self.vector_store.similarity_search(query, k=3)

            for res in results:
                metadata = res.metadata
                question_ids.append(metadata['question_id'])
                titles.append(metadata['title'])
                question_bodies.append(metadata['question_body'])
                tags.append(metadata['tags'])

            self.logger.debug(f"Retrieved {len(question_ids)} documents from vector store")

            sql_query = self.sql_query.replace("_question_ids_", ','.join(str(i) for i in question_ids))
            self.logger.debug("Executing SQL query to fetch answers")

            answers = self.ingestion_pipeline.query_data(sql_query)['answers'].to_list()
            relevant_context = "\n\n".join([ans['answer_body'] for answer in answers for ans in json.loads(answer)[:2]])

            total_inputs_vectors = len(question_ids)
            self.logger.info(f"Retrieved {total_inputs_vectors} relevant documents")

            if verbose:
                self.logger.info(f"Question IDs: {question_ids}")
                self.logger.debug(f"Titles: {titles}")
                self.logger.debug(f"Context length: {len(relevant_context)} characters")
                self.logger.debug(f"Context: {relevant_context}")
                print(f"\n\nContext: {relevant_context}\n\n")

            return question_ids, titles, tags, question_bodies, s_scores, answer_ids, relevant_context

        except Exception as e:
            self.logger.error(f"Error in retriever: {str(e)}")
            raise

    def llm_call(self, query, verbose, stream):
        """Make LLM API call with retrieved context."""
        try:
            self.logger.info("Starting LLM call process")

            # Get retrieved context
            retrieval_start = datetime.now()
            question_ids, titles, tags, question_bodies, s_scores, answer_ids, relevant_answers = self.retriever(query, verbose)
            retrieval_time = (datetime.now() - retrieval_start).total_seconds()
            self.logger.info(f"Context retrieval completed in {retrieval_time:.2f} seconds")

            # Prepare prompt
            prompt = self.prompt.replace("__query__", query).replace("__context__", str(relevant_answers))
            messages = [
                {"role": "user", "content": prompt}
            ]

            # Make LLM call
            self.logger.info(f"Making LLM API call to {self.model_name}")
            llm_start = datetime.now()
            response = self.hf_client.chat.completions.create(
                model=self.model_name,
                messages=messages,
                max_tokens=600,
                stream=stream
            )

            print("\n\n")
            print("-"*180)
            print(f"\n\n\nQuestion: {query}\n\nAnswer: ")

            # Handle streaming vs non-streaming response
            if stream:
                response_text = ""
                for chunk in response:
                    # delta.content may be None on some chunks, so fall back to an empty string
                    chunk_text = chunk.choices[0].delta.content or ""
                    response_text += chunk_text
                    print(chunk_text, end="")
            else:
                response_text = response.choices[0].message.content
                print(response_text)

            llm_time = (datetime.now() - llm_start).total_seconds()
            self.logger.info(f"LLM response completed in {llm_time:.2f} seconds")

            # Log response metadata
            self.logger.debug({
                "query_length": len(query),
                "context_length": len(relevant_answers),
                "response_length": len(response_text),
                "retrieval_time": retrieval_time,
                "llm_time": llm_time
            })

            print("\n")
            print("-"*180)

        except Exception as e:
            self.logger.error(f"Error in LLM call: {str(e)}")
            raise


In [17]:
#Initialize the inference class
inference_instance = InferenceClass(
    ingestion_pipeline=data_ingestion_pipeline,
    open_source_mode=open_source_mode,
    log_file='inference_logs.csv'
)

2024-11-05 04:21:53,037 - __main__ - INFO - Initialized inference pipeline with model: mistralai/Mistral-Nemo-Instruct-2407
INFO:__main__:Initialized inference pipeline with model: mistralai/Mistral-Nemo-Instruct-2407


ChromaDB Vector Store Intialized...

Index named chroma_vectorDB created...

Embedding Model Intialized...

HuggingFace LLM Client Intialized...



**For a single query, run this cell and pass the query**

In [21]:
# query = "What is python's list comprehension?"
# query = "how to sort python dictionary on the basis of values?"
# query = "what are the generators in python?"
# query = "what are the OOP concepts in python?"
# query = "what is the difference between lists and tuples?"
query = "what GIL lock in python?"
# query = "How to reverse a list in Python?"
# query = "what the whether like in new york"

verbose = True
stream = True
inference_instance.llm_call(query,verbose,stream)

2024-11-05 04:22:26,653 - __main__ - INFO - Starting LLM call process
INFO:__main__:Starting LLM call process
2024-11-05 04:22:26,657 - __main__ - INFO - Starting retrieval for query: what GIL lock in python?...
INFO:__main__:Starting retrieval for query: what GIL lock in python?...
2024-11-05 04:22:26,660 - __main__ - INFO - Querying chromaDB vector stores...
INFO:__main__:Querying chromaDB vector stores...
2024-11-05 04:22:26,744 - __main__ - INFO - Retrieved 3 relevant documents
INFO:__main__:Retrieved 3 relevant documents
2024-11-05 04:22:26,751 - __main__ - INFO - Question IDs: [105095, 105095, 105095]
INFO:__main__:Question IDs: [105095, 105095, 105095]
2024-11-05 04:22:26,755 - __main__ - INFO - Context retrieval completed in 0.10 seconds
INFO:__main__:Context retrieval completed in 0.10 seconds
2024-11-05 04:22:26,759 - __main__ - INFO - Making LLM API call to mistralai/Mistral-Nemo-Instruct-2407
INFO:__main__:Making LLM API call to mistralai/Mistral-Nemo-Instruct-2407




Context: <p>You will still need locks if you share state between threads. The GIL only protects the interpreter internally. You can still have inconsistent updates in your own code.</p>

<p>For example:</p>

<pre><code>#!/usr/bin/env python
import threading

shared_balance = 0

class Deposit(threading.Thread):
    def run(self):
        for _ in xrange(1000000):
            global shared_balance
            balance = shared_balance
            balance += 100
            shared_balance = balance

class Withdraw(threading.Thread):
    def run(self):
        for _ in xrange(1000000):
            global shared_balance
            balance = shared_balance
            balance -= 100
            shared_balance = balance

threads = [Deposit(), Withdraw()]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print shared_balance
</code></pre>

<p>Here, your code can be interrupted between reading the shared state (<code>balance = shared_balance</code>) and wri

2024-11-05 04:22:39,802 - __main__ - INFO - LLM response completed in 13.04 seconds
INFO:__main__:LLM response completed in 13.04 seconds




------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


**To run the inference in a loop, run this cell and enter the query**

In [20]:
verbose = True
stream = True

# Enter "exit" or an empty line to stop the loop
while True:
  query = input("Please enter your Python-related query: ").strip()
  if query.lower() in ("", "exit"):
    break
  inference_instance.llm_call(query,verbose,stream)



## Future Improvements:


  
- **Enhanced Embeddings:** Utilize better pre-trained embedding models to obtain more accurate contextual embeddings for queries and data, improving result accuracy.
  
- **Serverless Vector Databases:** Explore serverless vector databases such as Pinecone and Weaviate to reduce query latency and speed up batch upserts.
  
- **Hybrid Search Integration:** Integrate hybrid search techniques to enhance retrieval accuracy.
  
- **Advanced LLMs:** Use more robust large language models (LLMs) to generate more accurate and reliable query responses.

- **Persistent Storage:** Move to a database such as PostgreSQL for more efficient and scalable persistent storage.