In [14]:
from enum import Enum
from pathlib import Path
import requests
import json
import shutil

from pypdf import PdfReader
from kaggle.api.kaggle_api_extended import KaggleApi
import kagglehub

In [15]:
# Create a Kaggle API client and authenticate it.
# NOTE(review): presumably this picks up Kaggle credentials from the local
# environment/config — confirm. `kg_api` is not referenced again in the visible cells.
kg_api = KaggleApi()
kg_api.authenticate()

In [16]:
# Directory (relative to the notebook's working directory) that the commented-out
# download cell moves the Kaggle dataset into, and that ingestion later reads from.
DatasetSavePath = Path("../Datasets/Source1/raw")

In [18]:
# DatasetTempPath = kagglehub.dataset_download("snehaanbhawal/resume-dataset")
# shutil.move(DatasetTempPath, DatasetSavePath)

# Core Functionality (Required)
## 1. Resume Ingestion and Processing
- Index multiple resumes: Ingest and process multiple resume files (PDF and plain text).
- Parse content: Extract text from resumes.
- Chunk text: Split parsed text into meaningful chunks (100–500 tokens with overlap).
- Generate embeddings: Produce vector embeddings for each chunk using a free-tier
model.

In [20]:
# Alias of DatasetSavePath; this is the folder passed to IngestFolders further down.
datasetPath = DatasetSavePath

In [21]:
class FileType(Enum):
    """File categories recognised by the ingestion code."""
    NAF = 0  # default type of a File; mapped to the empty suffix and rejected by File.read()
    TXT = 1  # plain-text file (".txt")
    PDF = 2  # PDF document (".pdf")

In [22]:
# Lower-case file suffix associated with each FileType; NAF maps to the empty suffix.
type_suffix_conversion = dict(
    [
        (FileType.NAF, ""),
        (FileType.TXT, ".txt"),
        (FileType.PDF, ".pdf"),
    ]
)
# Reverse lookup (suffix -> FileType); the empty suffix is deliberately left out.
suffix_type_conversion = dict(
    (suffix, file_type)
    for file_type, suffix in type_suffix_conversion.items()
    if suffix
)

In [24]:
class File:
    """A file on disk tagged with a FileType, plus the text last read from it.

    Attributes:
        file_path: Location of the file.
        file_type: Category that read() uses to pick the extraction method.
        content: Text produced by the most recent read(); "" until then.
    """

    def __init__(self, file_path: Path, file_type: FileType = FileType.NAF) -> None:
        self.content: str = ""
        self.file_type: FileType = file_type
        self.file_path = file_path

    def __getitem__(self, idx: int) -> str:
        """Return the character of `content` at position `idx`.

        Raises:
            IndexError: If `idx` is at or past the end of `content`.
        """
        content_length = len(self.content)
        if idx >= content_length:
            raise IndexError("Index out of range")
        return self.content[idx]

    def read(self) -> str:
        """Load the file's text into `content` and return it.

        PDF files are parsed with pypdf and their page texts joined with
        newlines; TXT files are read as UTF-8.

        Raises:
            ValueError: If the file type is neither PDF nor TXT.
        """
        kind = self.file_type
        if kind == FileType.PDF:
            pdf = PdfReader(self.file_path)
            page_texts = [page.extract_text() for page in pdf.pages]
            self.content = "\n".join(page_texts)
            return self.content
        if kind == FileType.TXT:
            with open(self.file_path, "r", encoding="utf-8") as handle:
                self.content = handle.read()
            return self.content
        raise ValueError(f"Unsupported file type: {self.file_type}")

In [25]:
class Files:
    """A simple ordered container of File objects."""

    def __init__(self, file_list: list[File] = []) -> None:
        # Copy into a fresh list so neither the shared default argument nor the
        # caller's list is ever mutated by add().
        self.file_list = [*file_list]

    def __getitem__(self, index: int) -> File:
        """Return the File stored at `index`."""
        return self.file_list[index]

    def add(self, file: File) -> None:
        """Append `file` to the end of the container."""
        self.file_list.append(file)

In [8]:
class Collection:
    """Files belonging to one directory, grouped by FileType.

    Attributes:
        path: Directory this collection describes.
        data: Mapping from FileType to the files registered under that type.
    """

    def __init__(self, path: Path) -> None:
        self.path: Path = path
        self.data: dict[FileType, list[File]] = {}

    def add_file_with_type(self, file: File, file_type: FileType) -> None:
        """Register a single file under `file_type`."""
        self.data.setdefault(file_type, []).append(file)

    def add_files_with_type(self, files: list[File], file_type: FileType) -> None:
        """Register several files under `file_type`, keeping any already present.

        The files are copied into the collection's own list, so later additions
        to the collection never modify the list object passed in by the caller.
        """
        # extend (not assign): assigning would discard files added earlier for
        # this type and would alias the caller's list.
        self.data.setdefault(file_type, []).extend(files)

In [9]:
class IngestCollection:
    """Scans one directory and groups its files by FileType into a Collection."""

    def __init__(self, path: Path) -> None:
        self.path: Path = path
        self.collection: Collection = Collection(path)

    def _files_with_types(self, path: Path) -> dict[FileType, list[File]]:
        """Group the regular files directly inside `path` by their suffix.

        The suffix is lower-cased before it is compared with the entries of
        `type_suffix_conversion`. Files whose suffix is not listed there are
        ignored; files without a suffix match the empty suffix and are
        therefore filed under FileType.NAF.

        Returns:
            A dict with one (possibly empty) list of File objects per FileType
            in `type_suffix_conversion`.
        """
        files_by_type = {ftype: [] for ftype in type_suffix_conversion}

        # iterdir() yields entries in arbitrary order; sort so that the result
        # is the same on every run and platform.
        for entry in sorted(path.iterdir()):
            if not entry.is_file():
                continue
            suffix = entry.suffix.lower()
            for file_type, known_suffix in type_suffix_conversion.items():
                if suffix == known_suffix:
                    files_by_type[file_type].append(File(entry, file_type))
                    break
        return files_by_type

    def ingest(self) -> None:
        """(Re)populate the collection from the files currently in `self.path`."""
        # Scan first, so a failing directory listing leaves the collection untouched.
        files_by_type = self._files_with_types(self.path)
        # Start from a clean slate: calling ingest() again must neither keep
        # entries for files that disappeared nor accumulate duplicates.
        self.collection.data.clear()
        for file_type, files in files_by_type.items():
            if files:
                self.collection.add_files_with_type(files, file_type)

    def get_collection(self) -> Collection:
        """Return the Collection filled by ingest()."""
        return self.collection

In [10]:
from typing import Optional


class IngestFolders:
    """Runs an IngestCollection over every immediate sub-folder of `path`.

    Attributes:
        path: Parent directory whose sub-folders are ingested.
        folder_names: Names of the ingested sub-folders, filled by ingest().
        ingesters: One IngestCollection per sub-folder, in the same order as
            `folder_names`.
    """

    def __init__(self, path: Path) -> None:
        self.path: Path = path
        self.folder_names: list[str] = []
        self.ingesters: list[IngestCollection] = []

    def _folders_in_path(self) -> list[Path]:
        """Return the sub-directories of `self.path` in sorted order."""
        # iterdir() yields entries in arbitrary order; sorting keeps the folder
        # order (and everything derived from it) reproducible.
        return sorted(entry for entry in self.path.iterdir() if entry.is_dir())

    def ingest(self) -> None:
        """Ingest every sub-folder, replacing the results of any earlier call."""
        names: list[str] = []
        ingesters: list[IngestCollection] = []
        for folder in self._folders_in_path():
            ingester = IngestCollection(folder)
            ingester.ingest()
            names.append(folder.name)
            ingesters.append(ingester)
        # Replace the contents in place only after every folder succeeded, so a
        # second ingest() does not append duplicates and a failure midway does
        # not leave half-updated state.
        self.folder_names[:] = names
        self.ingesters[:] = ingesters

    def get_collections(self) -> list[Collection]:
        """Return one Collection per ingested folder, aligned with get_folder_names()."""
        return [ingester.get_collection() for ingester in self.ingesters]

    def get_folder_names(self) -> list[str]:
        """Return the names of the ingested folders."""
        return self.folder_names

In [11]:
# Scan every sub-folder of datasetPath and group the files found there by type.
Ingester = IngestFolders(datasetPath)
Ingester.ingest()

In [12]:
# NOTE: despite its name, `files` holds one Collection per ingested sub-folder
# (not File objects); `folder_names` lists the folder names in the same order.
files = Ingester.get_collections()
folder_names = Ingester.get_folder_names()

In [13]:
class Reader:
    """Extracts the text of a single File.

    PDF files are parsed with pypdf, TXT files are read through File.read(),
    and any other file type yields an empty string.
    """

    def __init__(self, file: File) -> None:
        self.file = file
        # Only PDFs need a parser object; it stays None for every other type.
        self.reader = None
        if file.file_type == FileType.PDF:
            self.reader = PdfReader(file.file_path)

    def get_text(self) -> str:
        """Return the text of the wrapped file.

        Returns:
            For PDFs, the text of every page, each followed by a newline.
            For TXT files, the file's content as returned by File.read().
            For all other types, "".
        """
        if self.reader is not None:
            # join() instead of repeated `+=` avoids rebuilding the string per page.
            return "".join(page.extract_text() + "\n" for page in self.reader.pages)
        if self.file.file_type == FileType.TXT:
            # Plain-text resumes used to come back as "" here; delegate to
            # File.read() so they are handled the same way as everywhere else.
            return self.file.read()
        return ""

In [14]:
class ReadCollection:
    """Wraps every file of a Collection in a Reader, keeping the per-type grouping."""

    def __init__(self, collection: Collection) -> None:
        self.collection = collection
        # One Reader per file, stored under the same FileType key as in the collection.
        self.readers: dict[FileType, list[Reader]] = {
            kind: [Reader(member) for member in members]
            for kind, members in collection.data.items()
        }

    def get_texts(self) -> dict[FileType, list[str]]:
        """Return the text of every wrapped file, grouped by FileType."""
        extracted = {}
        for kind in self.readers:
            extracted[kind] = [reader.get_text() for reader in self.readers[kind]]
        return extracted

In [15]:
class ReadCollections:
    """Reads several Collections and merges their texts per FileType."""

    def __init__(self, collections: list[Collection]) -> None:
        self.collections = collections
        self.read_collections: list[ReadCollection] = list(map(ReadCollection, collections))

    def get_texts(self) -> dict[FileType, list[str]]:
        """Return all texts from all collections, concatenated per FileType.

        Texts keep the order of the collections and, within a collection, the
        order of its files.
        """
        merged = {}
        for wrapper in self.read_collections:
            for kind, kind_texts in wrapper.get_texts().items():
                merged.setdefault(kind, []).extend(kind_texts)
        return merged

In [16]:
# processed_texts = ReadCollections(files).get_texts()

In [17]:
# def save_texts_to_files(texts: dict[FileType, list[str]], output_dir: Path) -> None:
#     output_dir.mkdir(parents=True, exist_ok=True)
#     for file_type, text_list in texts.items():
#         suffix = type_suffix_conversion[FileType.TXT]
#         for idx, text in enumerate(text_list):
#             file_name = f"{file_type.name.lower()}_{idx}{suffix}"
#             with open(output_dir / file_name, "w", encoding="utf-8") as f:
#                 f.write(text)

In [18]:
# save_texts_to_files(processed_texts, Path("../Data/Datasets/Source2/data/processed"))
# print("Files saved successfully.")

In [19]:
def read_txt_files(texts_path: Path) -> list[str]:
    """Read every ``*.txt`` file directly inside `texts_path` as UTF-8 text.

    Files are read in sorted path order: the order in which glob() yields
    matches depends on the file system, and positional access to the result
    (e.g. ``texts[0]``) should give the same file on every run.

    Args:
        texts_path: Directory to search (not recursive).

    Returns:
        The contents of the matching files; an empty list if none match.
    """
    return [
        txt_file.read_text(encoding="utf-8")
        for txt_file in sorted(texts_path.glob("*.txt"))
    ]

In [20]:
# Folder the plain-text resumes are loaded from.
# NOTE(review): the cells that would write these files are commented out above, so
# the folder must already exist on disk; its root ("../Data/Datasets/Source2") also
# differs from DatasetSavePath ("../Datasets/Source1") — confirm this is intended.
# If no .txt file is found, `texts` is empty and the later `chunked_texts_list[0][0]`
# lookup fails with an IndexError.
textsPath = Path("../Data/Datasets/Source2/data/processed")
texts = read_txt_files(textsPath)

In [21]:
def chunk_sliding_window(text: str, window_size: int = 150, step_size: int = 100) -> list[str]:
    """Split `text` into overlapping chunks with a sliding character window.

    Windows start every `step_size` characters and span `window_size`
    characters. The last window may be shorter than `window_size`, so text
    shorter than one window, or the tail left after the last full window, is
    kept instead of being dropped. Sliding stops as soon as a window reaches
    the end of the text, so no chunk lies entirely inside the previous one.

    Args:
        text: The text to split.
        window_size: Maximum chunk length, in characters.
        step_size: Distance between consecutive chunk starts, in characters.

    Returns:
        The chunks in order of appearance; an empty list for empty text.

    Raises:
        ValueError: If `window_size` or `step_size` is not positive.
    """
    if window_size <= 0 or step_size <= 0:
        raise ValueError("window_size and step_size must be positive")

    chunks = []
    for start in range(0, len(text), step_size):
        chunks.append(text[start:start + window_size])
        if start + window_size >= len(text):
            # This window already covers the end of the text.
            break
    return chunks

In [22]:
def chunked_texts(texts: list[str], window_size: int = 150, step_size: int = 100) -> list[list[str]]:
    """Chunk every text with chunk_sliding_window.

    Args:
        texts: The texts to split.
        window_size: Window length passed on to chunk_sliding_window.
        step_size: Step length passed on to chunk_sliding_window.

    Returns:
        One list of chunks per input text, in the same order as `texts`.
    """
    return [chunk_sliding_window(text, window_size, step_size) for text in texts]

In [23]:
# One list of chunks per loaded text, in the same order as `texts`.
chunked_texts_list = chunked_texts(texts)

In [24]:
# Sanity check: show the first chunk of the first text.
print(chunked_texts_list[0][0])

ACCOUNTANT
Summary
Financial Accountant specializing in financial planning, reporting and analysis within the Department of Defense.
Highlights
Accoun


In [44]:
# Settings for the embedding request sent to a server on localhost.
# NOTE(review): the route looks like an OpenAI-compatible embeddings endpoint — confirm.
# The recorded run of the request below returned 404, so verify that the server
# exposes this route and that `model_name` is an embedding-capable model loaded there.
url: str = "http://localhost:1234/v1/embeddings"
headers: dict[str, str] = {"Content-Type": "application/json"}
model_name: str = "google/gemma-3n-e4b"

In [48]:
# Request an embedding for the first chunk of the first text.
payload: dict = {"model": model_name, "input": chunked_texts_list[0][0]}
# requests waits indefinitely unless a timeout is given; bound the wait so an
# unreachable server cannot hang the kernel.
response = requests.post(
    url=url, headers=headers, json=payload, timeout=30
)
print(response)
if not response.ok:
    # Show the server's explanation so failed calls (e.g. a 404) can be diagnosed.
    print(response.text)

<Response [404]>


## 2. Vector Database Integration
- Store embeddings: Save generated embeddings in a vector database.
- Efficient retrieval: Implement top‑K similarity search for queries.

In [59]:
# TODO: placeholder cell — the vector database integration is not implemented yet.
print("Hello :D")

Hello :D


## 3. Retrieval-Augmented Generation (RAG) Chatbot
- Job description input: Accept a text description of the job role.
- Retrieve relevant chunks: Perform vector search to find top resume chunks.
- LLM-based matching: Use an LLM to generate conversational answers about
candidate fit, citing retrieved content.
- Conversational interface: Support follow-up questions for deeper insights.

In [3]:
# TODO: placeholder cell — the RAG chatbot is not implemented yet.
print("Hello :D")

Hello :D


# Optional Bonus Features
## A. Web User Interface
- Frontend (Bonus): Implement a simple web UI for file upload, job description entry, and
chat interaction.
- Tech Stack: Next.js with TypeScript
- Hosting: Vercel (Hobby/Free Tier)

In [4]:
# TODO: placeholder cell — the web user interface is not implemented yet.
print("Hello :D")

Hello :D


## B. SQL-Based Metadata Search
- Extract metadata: Tag resumes with structured metadata (skills, titles, experience).
- Metadata storage: Save tags in a relational database (e.g., PostgreSQL alongside
vector store).
- Metadata API: Expose an endpoint for SQL queries, e.g.
  `SELECT * FROM resume_metadata WHERE skills @> ARRAY['TypeScript'] AND years_experience >= 5`.

In [5]:
# TODO: placeholder cell — the SQL-based metadata search is not implemented yet.
print("Hello :D")

Hello :D
