# <center>**`Project Details`**</center>

#### **Purpose**:

Tis project goal is matching a resume to a job description. A poorly aligned resume can lead to missed opportunities, even when the candidate is a strong fit. The project aims to showcase how we can make use of AI agents to help applicants tailor their resumes more strategically, uncover hidden gaps, and present a stronger case to recruiters — all with minimal effort.

A [Github repo](https://github.com/vikrambhat2/MultiAgents-with-CrewAI-ResumeJDMatcher) tackling this project exist and our goal will be to improve it by seperating the **Backend** and the **Frontend** logics.

##### **Why Split**?

 - The backend will hold business logic, agent orchestration, model calls, state, data processing, API endpoints, while the frontend focuses on the UI/UX, user interaction, session management, file upload, displaying results.
 - Scalability: Backend can scale independently of UI (and can even serve other clients)
 - Security: Sensitive logic, API keys, and resource-intensive processing are kept server-side
 - Performance: Streamlit remains snappy, while heavy lifting is offloaded to backend
 
##### **Responsibilities**

  - *<u>Backend</u>*: 
    - Expose REST API endpoints:
        - `/match`: Accepts resume + JD, returns match results and insights.
        - `/enhance`: Accepts resume + JD, returns resume improvement suggestions.
        - `/cover-letter`: Accepts resume + JD, returns a cover letter.
    - Agent orchestration: All CrewAI workflows run here.
    - Input validation, error handling.
    - PDF/text parsing if desired (or can also be handled in frontend, see below).
    - Optional: Authentication, user/session management, logging, monitoring.
    - Optional: Serve as an async queue for heavy jobs if latency is an issue (using Celery/RQ, etc.).
 
 - *<u>Frontend</u>(Streamlit)*
    - UI for uploading files, entering/pasting text.
    - Visualization: Render reports, scores, enhanced resume, cover letter, etc.
    - API client: Handles all interaction with FastAPI backend.
    - Light preprocessing: E.g., local PDF parsing if you want to send plain text to backend (saves bandwidth).
    - Session/user state, feedback, download links, etc.

Here is how the system works (Flow):

 1. User uploads resume & JD (PDF or text) → Streamlit UI

 2. Frontend extracts or passes files → Sends to FastAPI (as text or file)

 3. FastAPI endpoint receives, orchestrates CrewAI agents, returns structured results

 4. Streamlit displays results, progress, suggestions, etc.

#### **Constraints**:

 - None


#### **Tools**:

 - Use local **ollama** model

#### **Requirements**:
 - Make it work as expected


***

## <center>**`Implementation`**</center>

## **`Backend`**

#### Config

In [17]:
%%writefile ../backend/app/config.py
# backend/app/config.py

from pydantic import BaseModel, Field
import os
from dotenv import load_dotenv

load_dotenv()

class Settings(BaseModel):
    # LLM config
    LLM_PROVIDER: str = Field(default=os.getenv("LLM_PROVIDER", "ollama"))
    LLM_API_KEY: str = Field(default=os.getenv("LLM_API_KEY", "ollama"))
    LLM_BASE_URL: str = Field(default=os.getenv("LLM_BASE_URL", "http://ollama:11434"))
    #LLM_BASE_URL: str = Field(default=os.getenv("LLM_BASE_URL", "http://host.docker.internal:11434"))
    LLM_MODEL_NAME: str = Field(default=os.getenv("LLM_MODEL_NAME", "llama3.2"))
    LLM_TEMPERATURE: str = Field(default=float(os.getenv("LLM_TEMPERATURE", "0.2")))

    # Celery/Redis
    REDIS_URL: str = Field(default=os.getenv("REDIS_URL", "redis://host.docker.internal:6379/0"))

    def full_model_id(self) -> str:
        """
        Return provider-prefixed model id for LiteLLM, e.g.:
        - 'ollama/llama3.2'
        - 'openai/gpt-4o-mini'
        - 'groq/llama3-8b-8192'
        """
        provider = self.LLM_PROVIDER.strip().lower()
        # If already prefixed, keep as is
        if "/" in self.LLM_MODEL_NAME:
            return self.LLM_MODEL_NAME
        return f"{provider}/{self.LLM_MODEL_NAME}"


settings = Settings()

Overwriting ../backend/app/config.py


### Core

#### PDF Parsing

In [2]:
%%writefile ../backend/app/core/pdf_parser.py

#backend/app/core/pdf_parser.py
from typing import Union
from pathlib import Path
from PyPDF2 import PdfReader


class PDFParser:
    """Handles PDF and plain text extraction."""

    def extract_text(self, file: Union[Path, bytes]) -> str:
        if isinstance(file, Path):
            with open(file, "rb") as f:
                reader = PdfReader(f)
                return self._extract_all(reader)
        elif isinstance(file, bytes):
            from io import BytesIO
            reader = PdfReader(BytesIO(file))
            return self._extract_all(reader)
        else:
            raise ValueError("Unsupported file type for PDFParser.")
        
    def _extract_all(self, reader: PdfReader) -> str:
        text = []
        for page in reader.pages:
            page_text = page.extract_text()
            if page_text:
                text.append(page_text)
        return "\n".join(text).strip()

Overwriting ../backend/app/core/pdf_parser.py


#### Agents

In [3]:
%%writefile ../backend/app/core/agents.py
# backend/app/core/agents.py

from dataclasses import dataclass
from typing import Any
from crewai import Agent, LLM

@dataclass
class MatcherAgents:
    resume_parser: Agent
    jd_parser: Agent
    matcher: Agent
    enhancer: Agent
    cover_letter: Agent

class AgentsFactory:
    """Factory that builds all CrewAI agents with a shared LLM."""
    def __init__(self, llm: LLM):
        self.llm = llm

    def build(self) -> MatcherAgents:
        resume_parser = Agent(
            role="Resume Parsing Specialist",
            goal="Extract structured data (skills, experience, education, tools) from a resume.",
            backstory="You are meticulous and consistent. Output JSON only.",
            llm=self.llm,
            verbose=False
        )
        jd_parser = Agent(
            role="Job Description Analyst",
            goal="Extract required skills, responsibilities, and must-haves from a JD.",
            backstory="You identify core requirements and hiring signals. Output JSON only.",
            llm=self.llm,
            verbose=False
        )
        matcher = Agent(
            role="Resume-JD Matcher",
            goal="Compare parsed resume vs parsed JD. Score 0-100 and list strengths and gaps.",
            backstory="You are objective and concise. Output JSON only.",
            llm=self.llm,
            verbose=False
        )
        enhancer = Agent(
            role="Resume Enhancer",
            goal="Suggest resume improvements aligned with the JD and rewrite 3–5 key bullets.",
            backstory="Keep it ATS-friendly and specific. Output Markdown.",
            llm=self.llm,
            verbose=False
        )
        cover_letter = Agent(
            role="Cover Letter Writer",
            goal="Draft a tailored one-page cover letter aligned with resume and JD.",
            backstory="Professional, concise, concrete achievements. Output Markdown.",
            llm=self.llm,
            verbose=False
        )

        return MatcherAgents(
            resume_parser=resume_parser,
            jd_parser=jd_parser,
            matcher=matcher,
            enhancer=enhancer,
            cover_letter=cover_letter
        )

Writing ../backend/app/core/agents.py


#### Agent Orchestrator

In [None]:
#%%writefile ../backend/app/core/agent_orchestrator.py

# backend/app/core/agent_orchestrator.py
from typing import Dict, Any
from crewai import Task, Crew, LLM, Process
from backend.app.config import settings
from backend.app.core.agents import AgentsFactory

class AgentOrchestrator:
    """Handles agent pipeline for resume-JD matching."""
    def __init__(self):
        # We will build LLM here
        model_id = settings.full_model_id()
        self.llm = LLM(
            model=model_id,
            base_url=settings.LLM_BASE_URL,
            api_key=settings.LLM_API_KEY,
            temperature=settings.LLM_TEMPERATURE,
        )

    def _common_validate(self, data: Dict[str, Any]):
        resume = (data or {}).get("resume") or ""
        jd = (data or {}).get("jd") or ""
        if not resume.strip() or not jd.strip():
            raise ValueError("Both 'resume' and 'jd' text are required.")
        return resume, jd
    
    def _build_parsing_tasks(self, agents, resume: str, jd: str):
        resume_task = Task(
            description=f"Extract structured JSON from the resume text below.\nReturn keys: skills, experience, education, tools.\n\nRESUME:\n{resume}",
            expected_output="Valid JSON with keys: skills, experience, education, tools.",
            agent=agents.resume_parser
        )
        jd_task = Task(
            description=f"Extract structured JSON from the job description below.\nReturn keys: must_haves, nice_to_haves, responsibilities, keywords.\n\nJD:\n{jd}",
            expected_output="Valid JSON with keys: must_haves, nice_to_haves, responsibilities, keywords.",
            agent=agents.jd_parser
        )
        return resume_task, jd_task

    def run(self, job_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Executes the specified agent pipeline.
        Args:
            job_type: 'match', 'enhance', or 'cover_letter'
            data: Dict with 'resume' and 'jd' (plain text)
        Returns:
            Dict with results
        """

        job_type = (job_type or "").lower()
        if job_type not in {"match", "enhance", "cover_letter"}:
            raise ValueError(f"Unsupported job_type: {job_type}")
        
        resume, jd = self._common_validate(data)
        agents = AgentsFactory(self.llm).build()

        if job_type == "match":
            resume_task, jd_task = self._build_parsing_tasks(agents, resume, jd)
            match_task = Task(
                description="Compare the parsed resume vs parsed JD and return a JSON with keys: "
                            "- match_score: integer from 0-100, " \
                            "- strengths: list of matching skills from the resume and the JD, " \
                            "- gaps: list of gaps in the resume compared to the JD, " \
                            "- summary: string to summarize the evaluation.",
                expected_output="Valid JSON with keys: match_score, strengths, gaps, summary.",
                agent=agents.matcher,
                context=[resume_task, jd_task]
            )
            crew = Crew(
                agents=[agents.resume_parser, agents.jd_parser, agents.matcher],
                tasks=[resume_task, jd_task, match_task],
                process=Process.sequential,
                verbose=False,
                name="MatchCrew",
                description="Parses resume and JD, then computes a structured match report."
            )
            result = crew.kickoff()
            return self._safe_parse_result(result, kind="match")
        
        if job_type == "enhance":
            resume_task, jd_task = self._build_parsing_tasks(agents, resume, jd)
            enhance_task = Task(
                description="Using parsed resume and JD, suggest concrete improvements and rewrite 3–5 bullets. "
                            "Return Markdown with sections: 'Improvements' (bulleted) and 'Rewritten Bullets'.",
                expected_output="Markdown with 'Improvements' and 'Rewritten Bullets' sections.",
                agent=agents.enhancer,
                context=[resume_task, jd_task]
            )
            crew = Crew(
                agents=[agents.resume_parser, agents.jd_parser, agents.enhancer],
                tasks=[resume_task, jd_task, enhance_task],
                process=Process.sequential,
                verbose=False,
                name="EnhanceCrew",
                description="Parses resume and JD, then produces targeted enhancements."
            )
            result = crew.kickoff()
            return {"status": "done", "result": {"resume_enhancement_md": getattr(result, "raw", str(result))}}

        if job_type == "cover_letter":
            resume_task, jd_task = self._build_parsing_tasks(agents, resume, jd)
            cl_task = Task(
                description="Draft a tailored one-page cover letter in Markdown based on parsed resume and JD.",
                expected_output="A Markdown-formatted cover letter.",
                agent=agents.cover_letter,
                context=[resume_task, jd_task]
            )
            crew = Crew(
                agents=[agents.resume_parser, agents.jd_parser, agents.cover_letter],
                tasks=[resume_task, jd_task, cl_task],
                process=Process.sequential,
                verbose=False,
                name="CoverLetterCrew",
                description="Parses resume and JD, then writes a tailored cover letter."
            )
            result = crew.kickoff()
            return {"status": "done", "result": {"cover_letter_md": getattr(result, "raw", str(result))}}

        raise RuntimeError("Unreachable branch.")

    def _safe_parse_result(self, crew_result, kind: str) -> Dict[str, Any]:
        raw = getattr(crew_result, "raw", None)
        if not raw:
            return {"status": "done", "result": {"raw": str(crew_result)}}
        # The matcher agent is instructed to output JSON, but we guard anyway.
        try:
            import json
            parsed = json.loads(raw)
            return {"status": "done", "result": parsed}
        except Exception:
            return {"status": "done", "result": {"raw": raw}}


Overwriting ../backend/app/core/agent_orchestrator.py


### Job Queueing + Celery for distributed background job handling

#### Queueing

In [12]:
%%writefile ../backend/app/core/async_queue.py
# backend/app/core/async_queue.py

from celery.result import AsyncResult
from backend.app.core.tasks import run_agent_job
from backend.worker.worker import celery_app

class AsyncJobQueueCelery:
    """Async job queue using Celery."""
    def submit_job(self, job_type: str, payload: dict) -> str:
        # Ensure we don't pass 'job_type' twice (in task arg and inside payload)
        clean_payload = dict(payload or {})
        clean_payload.pop("job_type", None)
        celery_result = run_agent_job.delay(job_type, clean_payload)
        return celery_result.id
    
    def get_status(self, job_id: str) -> dict:
        result = AsyncResult(job_id, app=celery_app)
        status = result.status
        value = result.result if result.successful() else None
        return {"status": status, "result": value}
    
# Singleton
queue = AsyncJobQueueCelery()

Overwriting ../backend/app/core/async_queue.py


#### Celery config

In [10]:
%%writefile ../backend/celeryconfig.py
# backend/celeryconfig.py

import os

# redis is in another docker container
# if it's not the case for you,
# use : "redis://localhost:6379/0"

BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://host.docker.internal:6379/0")
RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", BROKER_URL)

broker_url = BROKER_URL
result_backend = RESULT_BACKEND


task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "UTC"
enable_utc = True

# Optional routing example (future: create dedicated queues)
task_queues = None

Overwriting ../backend/celeryconfig.py


#### Celery worker

In [9]:
%%writefile ../backend/worker/worker.py
# backend/worker/worker.py

from celery import Celery

# Create Celery app
celery_app = Celery("resume_jd_matcher")
celery_app.config_from_object("backend.celeryconfig")

# Ensure tasks are imported on worker start
import backend.app.core.tasks

Overwriting ../backend/worker/worker.py


#### Celery Task

In [13]:
%%writefile ../backend/app/core/tasks.py
# backend/app/core/tasks.py

from celery.utils.log import get_task_logger
from backend.worker.worker import celery_app
from backend.app.core.agent_orchestrator import AgentOrchestrator

logger = get_task_logger(__name__)

@celery_app.task(
    name="run_agent_job",
    bind=False,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_jitter=True,
    retry_kwargs={"max_retries": 3},
    soft_time_limit=180,  # seconds
    time_limit=240        # hard limit)
)
def run_agent_job(job_type: str, data: dict):
    logger.info("Starting job type=%s", job_type)
    orchestrator = AgentOrchestrator()
    result = orchestrator.run(job_type, data or {})
    logger.info("Finished job type=%s", job_type)
    return result

Overwriting ../backend/app/core/tasks.py


### Backend api

#### Data models

In [7]:
%%writefile ../backend/app/models/job_models.py

#backend/app/models/job_models.py

from pydantic import BaseModel, Field
from typing import Optional, Dict

class ResumeJDRequest(BaseModel):
    job_type: str = Field(..., description="One of: match, enhance, cover_letter")
    resume: Optional[str] = Field(default=None, description="Plain text resume")
    jd: Optional[str] = Field(default=None, description="Plain text job description")

class PDFUploadResponse(BaseModel):
    extracted_text: str

class JobSubmitResponse(BaseModel):
    job_id: str

class JobStatusResponse(BaseModel):
    status: str
    result: Optional[Dict] = None

Overwriting ../backend/app/models/job_models.py


#### Router

In [8]:
%%writefile ../backend/app/api/routes.py
#backend/app/api/routes.py

from fastapi import APIRouter, File, UploadFile
from backend.app.core.pdf_parser import PDFParser
from backend.app.core.async_queue import queue
from backend.app.models.job_models import(
    ResumeJDRequest,
    PDFUploadResponse,
    JobSubmitResponse,
    JobStatusResponse
)


api_router = APIRouter()

@api_router.get("/health", tags=["Health"])
def health_check():
    return {"status": "ok"}

@api_router.post("/parse-pdf", response_model=PDFUploadResponse, tags=["Parsing"])
async def parse_pdf_endpoint(file: UploadFile = File(...)):
    """Extract text from uploaded PDF file."""
    content = await file.read()
    parser = PDFParser()
    text = parser.extract_text(content)
    return PDFUploadResponse(extracted_text=text)

@api_router.post("/submit-job", response_model=JobSubmitResponse, tags=["Jobs"])
async def submit_job(request: ResumeJDRequest):
    """Submit a matching/enhancing/cover letter job."""
    job_id = queue.submit_job(request.job_type, request.dict())
    return JobSubmitResponse(job_id=job_id)

@api_router.get("/job-status/{job_id}", response_model=JobStatusResponse, tags=["Jobs"])
async def job_status(job_id: str):
    status = queue.get_status(job_id)
    return JobStatusResponse(**status)

Overwriting ../backend/app/api/routes.py


#### App

In [37]:
%%writefile ../backend/app/main.py
#backend/app/main.py

from fastapi import FastAPI
from backend.app.api.routes import api_router

class JDMatcherApp:
    def __init__(self):
        self.app = FastAPI(
            title="Resume-JD Matcher API",
            description="Backend for matching candidate resumes to job descriptions using AI agents.",
            version="0.1.0"
        )
        self.include_routers()


    def include_routers(self):
        self.app.include_router(api_router)

def get_app():
    """Entrypoint for ASGI"""
    return JDMatcherApp().app

# Run with 'uvicorn backend.app.main:get_app'
app = get_app()

Overwriting ../backend/app/main.py


## **`Frontend`**