# <center>**`Project Details`**</center>

#### **Purpose**:

This project matches a resume to a job description. A poorly aligned resume can lead to missed opportunities, even when the candidate is a strong fit. The project showcases how AI agents can help applicants tailor their resumes more strategically, uncover hidden gaps, and present a stronger case to recruiters, all with minimal effort.

A [GitHub repo](https://github.com/vikrambhat2/MultiAgents-with-CrewAI-ResumeJDMatcher) tackling this problem already exists; our goal is to improve it by separating the **Backend** and **Frontend** logic.

##### **Why Split**?

 - Separation of concerns: The backend holds the business logic, agent orchestration, model calls, state, data processing, and API endpoints, while the frontend focuses on UI/UX, user interaction, session management, file upload, and displaying results.
 - Scalability: The backend can scale independently of the UI (and can even serve other clients).
 - Security: Sensitive logic, API keys, and resource-intensive processing are kept server-side.
 - Performance: Streamlit remains snappy, while heavy lifting is offloaded to the backend.
 
##### **Responsibilities**

  - *<u>Backend</u>*: 
    - Expose REST API endpoints:
        - `/match`: Accepts resume + JD, returns match results and insights.
        - `/enhance`: Accepts resume + JD, returns resume improvement suggestions.
        - `/cover-letter`: Accepts resume + JD, returns a cover letter.
    - Agent orchestration: All CrewAI workflows run here.
    - Input validation, error handling.
    - PDF/text parsing if desired (or can also be handled in frontend, see below).
    - Optional: Authentication, user/session management, logging, monitoring.
    - Optional: Serve as an async queue for heavy jobs if latency is an issue (using Celery/RQ, etc.).
 
  - *<u>Frontend</u> (Streamlit)*:
    - UI for uploading files, entering/pasting text.
    - Visualization: Render reports, scores, enhanced resume, cover letter, etc.
    - API client: Handles all interaction with the FastAPI backend.
    - Light preprocessing: E.g., local PDF parsing if you want to send plain text to backend (saves bandwidth).
    - Session/user state, feedback, download links, etc.

Here is how the system works (Flow):

 1. User uploads resume & JD (PDF or text) → Streamlit UI

 2. Frontend extracts or passes files → Sends to FastAPI (as text or file)

 3. FastAPI endpoint receives, orchestrates CrewAI agents, returns structured results

 4. Streamlit displays results, progress, suggestions, etc.
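
The four steps above can be sketched with plain functions standing in for each tier. This is a minimal illustration, not the project's code: `extract_text`, `handle_match`, `render`, and `MatchResult` are hypothetical names, and the word-overlap score is only a stand-in for the CrewAI agents that the real backend would orchestrate.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class MatchResult:
    score: float
    missing_terms: list[str]


def extract_text(upload: bytes | str) -> str:
    """Step 2 (frontend): turn an upload into plain text.

    Real PDF parsing is out of scope here; bytes are assumed to be UTF-8 text.
    """
    return upload.decode("utf-8") if isinstance(upload, bytes) else upload


def handle_match(resume_text: str, jd_text: str) -> MatchResult:
    """Step 3 (backend): validate input and return a structured result.

    The word-overlap score below is a placeholder for the agent workflow.
    """
    if not resume_text.strip() or not jd_text.strip():
        raise ValueError("resume and job description must be non-empty")
    resume_terms = set(resume_text.lower().split())
    jd_terms = set(jd_text.lower().split())
    overlap = resume_terms & jd_terms
    score = round(100.0 * len(overlap) / len(jd_terms), 1)
    return MatchResult(score=score, missing_terms=sorted(jd_terms - resume_terms))


def render(result: MatchResult) -> str:
    """Step 4 (frontend): format the result for display."""
    missing = ", ".join(result.missing_terms) or "none"
    return f"Match score: {result.score}/100 (missing: {missing})"


# Steps 1-4 end to end, with in-memory values standing in for uploads.
resume = extract_text(b"python fastapi docker")
jd = extract_text("python fastapi aws")
print(render(handle_match(resume, jd)))  # Match score: 66.7/100 (missing: aws)
```

Keeping the backend step behind a single function with a structured return value is what lets the frontend stay a thin client: it only needs to know the shape of the result, not how it was produced.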

#### **Constraints**:

 - None


#### **Tools**:

 - Use local **ollama** model

#### **Requirements**:
 - Make it work as expected


***

## <center>**`Implementation`**</center>

## **`Backend`**

### Storage

#### DB

In [35]:
%%writefile ../backend/app/storage/db.py
# backend/app/storage/db.py

import os
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from backend.app.storage.models import Base

DATA_DIR = os.environ.get("DATA_DIR", "./data")
DB_PATH = os.path.join(DATA_DIR, "app.sqlite3")
ENGINE = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(bind=ENGINE, autoflush=False, autocommit=False)

def init_db():
    # For tests/dev, ensure tables exist. In prod, prefer `alembic upgrade head`.
    if os.environ.get("DB_BOOTSTRAP", "orm") == "orm":
        Base.metadata.create_all(bind=ENGINE)

def ensure_dirs():
    os.makedirs(DATA_DIR, exist_ok=True)
    os.makedirs(os.path.join(DATA_DIR, "artifacts"), exist_ok=True)
    init_db()

@contextmanager
def get_session():
    s = SessionLocal()
    try:
        yield s
    finally:
        s.close()

Overwriting ../backend/app/storage/db.py


#### Models

In [12]:
%%writefile ../backend/app/storage/models.py
# backend/app/storage/models.py
from __future__ import annotations
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Enum, JSON, Text, Integer, DateTime, Index
import enum
import uuid
from datetime import datetime, timezone

def utcnow() -> datetime:
    return datetime.now(timezone.utc)

class Base(DeclarativeBase):
    pass

class RunStatus(str, enum.Enum):
    queued = "queued"
    running = "running"
    succeeded = "succeeded"
    failed = "failed"

class Run(Base):
    __tablename__ = "runs"

    id: Mapped[str] = mapped_column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    payload_hash: Mapped[str] = mapped_column(String, index=True)
    status: Mapped[RunStatus] = mapped_column(Enum(RunStatus), index=True, default=RunStatus.queued)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)

    # normalized inputs
    resume_text: Mapped[str] = mapped_column(Text)
    jd_text: Mapped[str] = mapped_column(Text)
    params: Mapped[dict] = mapped_column(JSON, default=dict)

    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)
    started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)

    __table_args__ = (
        Index("ix_runs_payload_hash_status", "payload_hash", "status"),
    )

class Artifact(Base):
    __tablename__ = "artifacts"

    id: Mapped[str] = mapped_column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    run_id: Mapped[str] = mapped_column(String, index=True)
    name: Mapped[str] = mapped_column(String)   # filename only
    kind: Mapped[str] = mapped_column(String)   # e.g., "scorecard", "trace"
    mime: Mapped[str] = mapped_column(String)   # e.g., "application/json"
    path: Mapped[str] = mapped_column(String)   # absolute or relative path on disk

    size_bytes: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    sha256: Mapped[str] = mapped_column(String, index=True)

    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, nullable=False)

    __table_args__ = (
        Index("ix_artifacts_run_name", "run_id", "name"),
    )


Overwriting ../backend/app/storage/models.py


#### Artifacts

In [13]:
%%writefile ../backend/app/storage/artifacts.py
# backend/app/storage/artifacts.py
import os
import json
import hashlib
import tempfile
from typing import List
from sqlalchemy.orm import Session
from .models import Artifact
from datetime import datetime, timezone

DATA_DIR = os.environ.get("DATA_DIR", "./data")

MIME_BY_NAME = {
    "scorecard.json": "application/json",
    "scorecard.md": "text/markdown",
    "graph_trace.jsonl": "application/json",
    "gaps.csv": "text/csv",
    # add more as needed
}

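def run_dir(run_id: str) -> str:
    # Reject run IDs that could escape the artifacts directory
    if not run_id or run_id in (".", "..") or "/" in run_id or "\\" in run_id:
        raise ValueError("invalid run id")
    base = os.path.join(DATA_DIR, "artifacts")
    path = os.path.join(base, run_id)
    os.makedirs(path, exist_ok=True)
    return path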

def _safe_name(name: str) -> str:
    # Reject path separators and traversal
    bn = os.path.basename(name)
    if bn != name or ".." in name or "/" in name or "\\" in name:
        raise ValueError("invalid artifact name")
    return bn

def _sha256_bytes(data: bytes) -> str:
    h = hashlib.sha256()
    h.update(data)
    return h.hexdigest()

def _to_bytes(content: str | dict) -> bytes:
    if isinstance(content, dict):
        return json.dumps(content, ensure_ascii=False, indent=2).encode("utf-8")
    return content.encode("utf-8")

def _atomic_write(target_path: str, data: bytes) -> None:
    # Write to temp file then rename to target (atomic on POSIX)
    d = os.path.dirname(target_path)
    os.makedirs(d, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(prefix=".tmp_", dir=d)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, target_path)
    finally:
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except Exception:
            pass

def write_artifact(
    s: Session,
    run_id: str,
    name: str,
    kind: str,
    mime: str | None,
    content: str | dict,
) -> Artifact:
    name = _safe_name(name)
    rd = run_dir(run_id)
    fpath = os.path.join(rd, name)
    data = _to_bytes(content)

    # write atomically
    _atomic_write(fpath, data)

    size = len(data)
    digest = _sha256_bytes(data)
    mime = mime or MIME_BY_NAME.get(name, "application/octet-stream")

    a = Artifact(
        run_id=run_id,
        name=name,
        kind=kind,
        mime=mime,
        path=fpath,
        size_bytes=size,
        sha256=digest,
        created_at=datetime.now(timezone.utc),
    )
    s.add(a)
    s.commit()
    return a

def list_artifacts_for_run(s: Session, run_id: str) -> List[Artifact]:
    # sorted by created_at then name for stability
    return (
        s.query(Artifact)
        .filter(Artifact.run_id == run_id)
        .order_by(Artifact.created_at.asc(), Artifact.name.asc())
        .all()
    )


Overwriting ../backend/app/storage/artifacts.py
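
To illustrate the two safeguards used in this module, the following self-contained sketch re-creates the filename check and the write-then-rename pattern with only the standard library. `safe_name`, `atomic_write`, and `try_name` are simplified, hypothetical stand-ins for `_safe_name` and `_atomic_write`, not replacements for them.

```python
import hashlib
import os
import tempfile


def safe_name(name: str) -> str:
    """Accept a bare filename; reject anything with path components."""
    if not name or name in (".", "..") or "/" in name or "\\" in name:
        raise ValueError("invalid artifact name")
    return name


def atomic_write(target_path: str, data: bytes) -> None:
    """Write to a temp file in the same directory, then rename over the target."""
    directory = os.path.dirname(target_path)
    fd, tmp_path = tempfile.mkstemp(prefix=".tmp_", dir=directory)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, target_path)  # readers never see a partial file
    finally:
        if os.path.exists(tmp_path):  # only true if the rename did not happen
            os.remove(tmp_path)


def try_name(name: str) -> bool:
    """Return True if the name passes the check, False if it is rejected."""
    try:
        safe_name(name)
        return True
    except ValueError:
        return False


with tempfile.TemporaryDirectory() as demo_dir:
    payload = b'{"ok": true}'
    path = os.path.join(demo_dir, safe_name("scorecard.json"))
    atomic_write(path, payload)
    with open(path, "rb") as f:
        stored = f.read()
    digest = hashlib.sha256(stored).hexdigest()
    leftovers = [n for n in os.listdir(demo_dir) if n.startswith(".tmp_")]

print(stored == payload, len(digest), leftovers)  # True 64 []
print(try_name("../secrets.txt"), try_name("scorecard.md"))  # False True
```

The temp file is created in the target's own directory because a rename is only atomic within one filesystem.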


### Migrations

In [14]:
%%writefile ../migrations/env.py
# migrations/env.py
from __future__ import annotations
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
import os
import sys

# Ensure backend package is importable
sys.path.append(os.path.abspath("."))

from backend.app.storage.models import Base  # noqa: E402

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata

def run_migrations_offline():
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()


Overwriting ../migrations/env.py


#### Versions

In [15]:
%%writefile ../migrations/versions/0001_baseline.py
# migrations/versions/0001_baseline.py
"""baseline schema

Revision ID: 0001_baseline
Revises:
Create Date: 2025-10-06 00:00:00
"""
from alembic import op
import sqlalchemy as sa

revision = "0001_baseline"
down_revision = None
branch_labels = None
depends_on = None

def upgrade() -> None:
    op.create_table(
        "runs",
        sa.Column("id", sa.String(), primary_key=True),
        sa.Column("payload_hash", sa.String(), index=True),
        sa.Column("status", sa.Enum("queued", "running", "succeeded", "failed", name="runstatus"), index=True),
        sa.Column("error", sa.Text(), nullable=True),
        sa.Column("resume_text", sa.Text()),
        sa.Column("jd_text", sa.Text()),
        sa.Column("params", sa.JSON()),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
    )
    op.create_index("ix_runs_payload_hash_status", "runs", ["payload_hash", "status"])

    op.create_table(
        "artifacts",
        sa.Column("id", sa.String(), primary_key=True),
        sa.Column("run_id", sa.String(), index=True),
        sa.Column("name", sa.String()),
        sa.Column("kind", sa.String()),
        sa.Column("mime", sa.String()),
        sa.Column("path", sa.String()),
        sa.Column("size_bytes", sa.Integer(), nullable=False, server_default="0"),
        sa.Column("sha256", sa.String(), index=True),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
    )
    op.create_index("ix_artifacts_run_name", "artifacts", ["run_id", "name"])

def downgrade() -> None:
    op.drop_index("ix_artifacts_run_name", table_name="artifacts")
    op.drop_table("artifacts")
    op.drop_index("ix_runs_payload_hash_status", table_name="runs")
    op.drop_table("runs")
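    # The named enum type only exists on PostgreSQL; SQLite has no DROP TYPE.
    if op.get_context().dialect.name == "postgresql":
        op.execute("DROP TYPE IF EXISTS runstatus")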


Overwriting ../migrations/versions/0001_baseline.py


### Core

#### Graph

In [16]:
%%writefile ../backend/app/core/graph.py
# backend/app/core/graph.py

from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Dict, Any
import re
import json

@dataclass
class GraphState:
    resume_text: str
    jd_text: str
    skills_resume: List[str] = field(default_factory=list)
    skills_jd: List[str] = field(default_factory=list)
    coverage: float = 0.0
    scorecard: Dict[str, Any] = field(default_factory=dict)
    logs: List[Dict[str, Any]] = field(default_factory=list)

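# Lookarounds instead of \b so that tokens ending in "+" or "#" (e.g. "c++", "c#")
# can match; the final character may not be "." so sentence-ending periods are dropped.
SKILL_REGEX = re.compile(r"(?<![A-Za-z0-9])([A-Za-z][A-Za-z0-9+#.]{0,29}[A-Za-z0-9+#])(?![A-Za-z0-9])")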

COMMON_STOP = {
    "and","or","the","a","an","with","for","to","of","in","on","at","is","are","be",
    "experience","team","work","project","projects","skills","tool","tools","stack",
    "senior","lead","junior","engineer","developer","manager","product","data",
}

ALIAS = {
    "py": "python",
    "js": "javascript",
    "ts": "typescript",
    "node": "nodejs",
    "postgres": "postgresql",
    "xgboost": "xgboost",
    "ml": "machine learning",
    "dl": "deep learning",
    "llm": "large language model",
    "ai": "ai",
}

def _norm_text(t: str) -> str:
    t = t.replace("\r\n", "\n")
    t = re.sub(r"[\x00-\x08\x0B-\x1F\x7F]", " ", t)
    return re.sub(r"\s+", " ", t).strip()

def _extract_skills(text: str) -> List[str]:
    raw = [m.group(1).lower() for m in SKILL_REGEX.finditer(text)]
    mapped = [ALIAS.get(x, x) for x in raw]
    dedup = []
    for x in mapped:
        if x in COMMON_STOP:
            continue
        if x.isdigit():
            continue
        if len(x) < 2:
            continue
        if x not in dedup:
            dedup.append(x)
    return dedup[:128]  # cap for deterministic behavior

def _coverage(a: List[str], b: List[str]) -> float:
    if not b:
        return 0.0
    return round(100.0 * len(set(a) & set(b)) / len(set(b)), 1)

def node_normalize(state: GraphState) -> GraphState:
    state.resume_text = _norm_text(state.resume_text)
    state.jd_text = _norm_text(state.jd_text)
    state.logs.append({"node":"normalize_text", "ok": True})
    return state

def node_extract_skills(state: GraphState) -> GraphState:
    state.skills_resume = _extract_skills(state.resume_text)
    state.skills_jd = _extract_skills(state.jd_text)
    state.logs.append({
        "node": "extract_skills_rule_based",
        "resume_count": len(state.skills_resume),
        "jd_count": len(state.skills_jd),
    })
    return state

def node_score_rule_based(state: GraphState) -> GraphState:
    cov = _coverage(state.skills_resume, state.skills_jd)
    # very simple dimensions
    dims = {
        "skills_match": cov,
        "keyword_density": min(100.0, round(len(state.skills_resume)/3, 1)),
        "ats_hygiene": 80.0,  # placeholder constant
    }
    overall = round(0.6 * dims["skills_match"] + 0.25 * dims["keyword_density"] + 0.15 * dims["ats_hygiene"], 1)
    state.coverage = cov
    state.scorecard = {
        "overall_score": overall,
        "dimensions": dims,
        "coverage_terms_overlap": sorted(list(set(state.skills_resume) & set(state.skills_jd)))[:25],
    }
    state.logs.append({"node": "score_rule_based", "coverage": cov, "overall": overall})
    return state

def node_build_scorecard(state: GraphState) -> GraphState:
    # no-op here, but good place to format artifacts later
    state.logs.append({"node": "build_scorecard", "ok": True})
    return state

def run_minimal_graph(resume_text: str, jd_text: str) -> GraphState:
    state = GraphState(resume_text=resume_text, jd_text=jd_text)
    for step in (node_normalize, node_extract_skills, node_score_rule_based, node_build_scorecard):
        state = step(state)
    return state

def scorecard_markdown(state: GraphState) -> str:
    sc = state.scorecard
    dims = sc.get("dimensions", {})
    overlap = sc.get("coverage_terms_overlap", [])
    md = [
        "# Scorecard",
        f"**Overall**: {sc.get('overall_score', 0)}/100",
        "",
        "## Dimensions",
        f"- Skills Match: {dims.get('skills_match', 0)}",
        f"- Keyword Density: {dims.get('keyword_density', 0)}",
        f"- ATS Hygiene: {dims.get('ats_hygiene', 0)}",
        "",
        "## Overlap Terms",
        (", ".join(overlap) if overlap else "_none_"),
    ]
    return "\n".join(md)

def trace_jsonl(state: GraphState) -> str:
    return "\n".join(json.dumps(e, ensure_ascii=False) for e in state.logs)


Overwriting ../backend/app/core/graph.py
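
As a worked example of the scoring arithmetic in `node_score_rule_based`, the sketch below recomputes the three dimensions and the weighted total for hand-picked skill lists. The skill lists and the helper names `coverage` and `overall_score` are illustrative; the weights (0.6, 0.25, 0.15) and the constant `ats_hygiene` value of 80.0 are copied from the module above.

```python
def coverage(resume_skills: list[str], jd_skills: list[str]) -> float:
    """Percentage of distinct JD skills that also appear in the resume."""
    if not jd_skills:
        return 0.0
    shared = set(resume_skills) & set(jd_skills)
    return round(100.0 * len(shared) / len(set(jd_skills)), 1)


def overall_score(resume_skills: list[str], jd_skills: list[str]) -> dict:
    dims = {
        "skills_match": coverage(resume_skills, jd_skills),
        # one point per three extracted resume terms, capped at 100
        "keyword_density": min(100.0, round(len(resume_skills) / 3, 1)),
        "ats_hygiene": 80.0,  # placeholder constant, as in the graph module
    }
    overall = round(
        0.6 * dims["skills_match"]
        + 0.25 * dims["keyword_density"]
        + 0.15 * dims["ats_hygiene"],
        1,
    )
    return {"overall_score": overall, "dimensions": dims}


# Illustrative input: 6 resume terms, 4 JD terms, 3 of them shared.
resume_skills = ["python", "fastapi", "postgresql", "docker", "aws", "redis"]
jd_skills = ["python", "fastapi", "postgresql", "kubernetes"]

card = overall_score(resume_skills, jd_skills)
print(card["dimensions"])     # skills_match 75.0, keyword_density 2.0, ats_hygiene 80.0
print(card["overall_score"])  # 0.6*75 + 0.25*2 + 0.15*80 = 45 + 0.5 + 12 = 57.5
```

One consequence worth noting: because `_extract_skills` caps its output at 128 terms, `keyword_density` can never exceed 128 / 3 ≈ 42.7, so in practice the overall score is driven almost entirely by `skills_match` and the constant `ats_hygiene` term.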


#### Run Manager

In [17]:
%%writefile ../backend/app/core/run_manager.py
# backend/app/core/run_manager.py

from __future__ import annotations
from sqlalchemy.orm import Session
from backend.app.storage.models import Run, RunStatus
from backend.app.storage.artifacts import write_artifact
import datetime
from backend.app.core.graph import run_minimal_graph, scorecard_markdown, trace_jsonl

class RunManager:
    def __init__(self, s: Session, run: Run):
        self.s = s
        self.run = run

    def _update_status(self, status: RunStatus, error: str | None = None):
        self.run.status = status
        self.run.error = error
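        # datetime.timezone.utc works on all supported Python versions
        # (the datetime.UTC alias requires Python 3.11+)
        if status == RunStatus.running:
            self.run.started_at = datetime.datetime.now(datetime.timezone.utc)
        if status in (RunStatus.succeeded, RunStatus.failed):
            self.run.finished_at = datetime.datetime.now(datetime.timezone.utc)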
        self.s.add(self.run)
        self.s.commit()

    def execute(self):
        self._update_status(RunStatus.running)

        state = run_minimal_graph(self.run.resume_text, self.run.jd_text)

        write_artifact(self.s, self.run.id, "scorecard.json", "scorecard", "application/json", state.scorecard)
        write_artifact(self.s, self.run.id, "scorecard.md", "scorecard", "text/markdown", scorecard_markdown(state))
        write_artifact(self.s, self.run.id, "graph_trace.jsonl", "trace", "application/json", trace_jsonl(state))

        self._update_status(RunStatus.succeeded)

Overwriting ../backend/app/core/run_manager.py


#### Queue

In [18]:
%%writefile ../backend/app/core/queue.py
# backend/app/core/queue.py
import os
from arq import create_pool
from arq.connections import RedisSettings
from backend.app.storage.db import get_session, ensure_dirs
from backend.app.storage.models import Run, RunStatus
from backend.app.core.run_manager import RunManager

REDIS_URL = os.environ.get("REDIS_URL", "redis://host.docker.internal:6379/0")

async def enqueue_run(run_id: str):
    redis = await create_pool(RedisSettings.from_dsn(REDIS_URL))
    await redis.enqueue_job("run_match_job", run_id=run_id)

async def run_match_job(ctx, run_id: str):
    ensure_dirs()
    # Do all heavy work here (worker process with its own loop)
    with get_session() as s:
        run = s.get(Run, run_id)
        if not run:
            return
        if run.status not in (RunStatus.queued, RunStatus.failed):
            return
        mgr = RunManager(s, run)
        try:
            mgr.execute()
        except Exception as e:
            # Discard any half-finished transaction, then record the failure
            # (this also stamps finished_at).
            s.rollback()
            mgr._update_status(RunStatus.failed, error=str(e))

class WorkerSettings:
    redis_settings = RedisSettings.from_dsn(REDIS_URL)
    functions = [run_match_job]
    max_jobs = 10
    retry_jobs = True


Overwriting ../backend/app/core/queue.py
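
The status checks in `run_match_job` and `RunManager._update_status` imply a small state machine: a worker picks up a run only when it is `queued` or `failed`, moves it to `running`, and finishes in `succeeded` or `failed`. The sketch below spells that out. The `ALLOWED` table and the `transition` helper are illustrative and do not exist in the project; `RunStatus` is redefined locally so the example stands alone.

```python
import enum


class RunStatus(str, enum.Enum):
    queued = "queued"
    running = "running"
    succeeded = "succeeded"
    failed = "failed"


# Transitions inferred from the worker code; `failed -> running` models a retry.
ALLOWED = {
    RunStatus.queued: {RunStatus.running},
    RunStatus.running: {RunStatus.succeeded, RunStatus.failed},
    RunStatus.failed: {RunStatus.running},
    RunStatus.succeeded: set(),
}


def transition(current: RunStatus, new: RunStatus) -> RunStatus:
    """Return the new status, or raise if the move is not allowed."""
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {new.value}")
    return new


# A run that fails once, is retried, and then succeeds.
status = RunStatus.queued
for step in (RunStatus.running, RunStatus.failed, RunStatus.running, RunStatus.succeeded):
    status = transition(status, step)
print(status.value)  # succeeded
```

Making `succeeded` terminal matches the guard at the top of `run_match_job`, which returns early for any run that is not `queued` or `failed`.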


### Worker

In [19]:
%%writefile ../backend/worker.py
# backend/worker.py

# Convenient entrypoint to run worker without module path issues
from typing import cast
from arq import run_worker
from arq.typing import WorkerSettingsType
from backend.app.core.queue import WorkerSettings

if __name__ == "__main__":
    run_worker(cast(WorkerSettingsType, WorkerSettings))


Overwriting ../backend/worker.py


### Data Models

In [20]:
%%writefile ../backend/app/models/schemas.py
# backend/app/models/schemas.py
from typing import Optional, Dict, List, Any
from pydantic import BaseModel, Field

class RunRequest(BaseModel):
    resume_text: str = Field(..., min_length=1, description="Plain text resume")
    jd_text: str = Field(..., min_length=1, description="Plain text job description")
    params: Optional[Dict[str, Any]] = None

class RunResponse(BaseModel):
    run_id: str
    status: str

class ArtifactMeta(BaseModel):
    name: str
    kind: str
    mime: str
    size_bytes: Optional[int] = None

class RunStatusResponse(BaseModel):
    run_id: str
    status: str
    error: Optional[str] = None
    artifacts: List[ArtifactMeta] = Field(default_factory=list)


Overwriting ../backend/app/models/schemas.py


### API Routes

In [21]:
%%writefile ../backend/app/api/routes.py
# backend/app/api/routes.py
import json
import hashlib

from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy import select

from backend.app.models.schemas import RunRequest, RunResponse, RunStatusResponse, ArtifactMeta
from backend.app.storage.db import get_session, ensure_dirs
from backend.app.storage.models import Run, RunStatus
from backend.app.core.queue import enqueue_run
from backend.app.storage.artifacts import list_artifacts_for_run


router = APIRouter()

MAX_LEN = 2_000_000  # max characters per field (about 2 MB of ASCII text)


@router.post("/runs", response_model=RunResponse, status_code=202)
async def create_run(req: RunRequest) -> RunResponse:
    """Queue a new run. FastAPI will validate the body into RunRequest automatically."""

    if len(req.resume_text) > MAX_LEN or len(req.jd_text) > MAX_LEN:
        raise HTTPException(status_code=413, detail="payload too large")

    ensure_dirs()

    # Simple idempotency hash
    h = hashlib.sha256(
        (
            req.resume_text.strip()
            + "\n---\n"
            + req.jd_text.strip()
            + json.dumps(req.params or {}, sort_keys=True)
        ).encode("utf-8")
    ).hexdigest()

    with get_session() as s:
        existing = (
            s.execute(
                select(Run)
                .where(Run.payload_hash == h, Run.status == RunStatus.succeeded)
                .order_by(Run.finished_at.desc())   # take newest
            )
            .scalars()
            .first()
        )
        if existing:
            return RunResponse(run_id=existing.id, status=existing.status.value)

        run = Run(
            payload_hash=h,
            status=RunStatus.queued,
            resume_text=req.resume_text,
            jd_text=req.jd_text,
            params=req.params or {},
        )
        s.add(run)
        s.commit()
        s.refresh(run)

    # enqueue the job
    await enqueue_run(run_id=run.id)

    return RunResponse(run_id=run.id, status=str(RunStatus.queued.value))


@router.get("/runs/{run_id}", response_model=RunStatusResponse)
async def get_run(run_id: str) -> RunStatusResponse:
    with get_session() as s:
        run = s.get(Run, run_id)
        if not run:
            raise HTTPException(status_code=404, detail="run not found")

        artifacts = []
        if run.status == RunStatus.succeeded:
            artifacts = [
                ArtifactMeta(name=a.name, kind=a.kind, mime=a.mime, size_bytes=a.size_bytes)
                for a in list_artifacts_for_run(s, run_id)
            ]

        return RunStatusResponse(
            run_id=run.id, status=str(run.status.value), error=run.error, artifacts=artifacts
        )


@router.get("/artifacts/{run_id}")
async def list_artifacts(run_id: str):
    with get_session() as s:
        run = s.get(Run, run_id)
        if not run:
            raise HTTPException(status_code=404, detail="run not found")
        arts = list_artifacts_for_run(s, run_id)
        files = [
            {"name": a.name, "kind": a.kind, "mime": a.mime, "size_bytes": a.size_bytes}
            for a in arts
        ]
        return {"run_id": run_id, "files": files}


@router.get("/artifacts/{run_id}/{name}")
async def get_artifact(run_id: str, name: str):
    with get_session() as s:
        run = s.get(Run, run_id)
        if not run:
            raise HTTPException(status_code=404, detail="run not found")
        arts = list_artifacts_for_run(s, run_id)
        for a in arts:
            if a.name == name:
                # Serve with content-disposition for nice filename in downloads
                return FileResponse(
                    a.path,
                    media_type=a.mime,
                    filename=a.name,
                )
    raise HTTPException(status_code=404, detail="artifact not found")

Overwriting ../backend/app/api/routes.py
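
The idempotency hash in `create_run` decides whether a request reuses an earlier succeeded run. The standalone sketch below mirrors that computation so its behavior can be checked in isolation. `payload_hash` is a hypothetical helper name, and the `lang` parameter is invented for the demonstration.

```python
from __future__ import annotations

import hashlib
import json


def payload_hash(resume_text: str, jd_text: str, params: dict | None = None) -> str:
    """Mirror of the hash built in create_run: trimmed texts plus sorted params."""
    material = (
        resume_text.strip()
        + "\n---\n"
        + jd_text.strip()
        + json.dumps(params or {}, sort_keys=True)
    )
    return hashlib.sha256(material.encode("utf-8")).hexdigest()


a = payload_hash("  A B C\n", "A B", {"ui_threshold": 65, "lang": "en"})
b = payload_hash("A B C", "A B ", {"lang": "en", "ui_threshold": 65})
c = payload_hash("A B C", "A B", {"lang": "en", "ui_threshold": 70})

print(a == b)  # True: whitespace padding and key order do not matter
print(a == c)  # False: a different parameter value yields a new run
```

Since the Streamlit client sends `ui_threshold` inside `params`, moving the slider changes the hash and triggers a fresh run even though the threshold is labeled as UI-only. If that is not intended, one option would be to exclude UI-only keys before hashing.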


### Main app

In [22]:
%%writefile ../backend/app/main.py
# backend/app/main.py

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from backend.app.api.routes import router

app = FastAPI(title="JobMatch-AI API", version="0.1.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

@app.get("/healthz")
async def healthz():
    return {"ok": True}

app.include_router(router, prefix="")

Overwriting ../backend/app/main.py


### Tests

#### ConfTest

In [33]:
%%writefile ../tests/conftest.py
# tests/conftest.py
import os
import sys
import tempfile
from pathlib import Path
import pytest

# Set DATA_DIR & ENV *before* importing any backend code
TEST_DATA_DIR = tempfile.mkdtemp(prefix="jobmatch_test_data_")
os.environ["DATA_DIR"] = TEST_DATA_DIR
os.environ["ENV"] = "test"

# Add repo root to PYTHONPATH for tests
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Force AnyIO to use asyncio to avoid trio dependency
@pytest.fixture
def anyio_backend():
    return "asyncio"


Overwriting ../tests/conftest.py


#### API Test

##### Without Redis

In [31]:
%%writefile ../tests/test_api_contract.py
# tests/test_api_contract.py
import pytest
import uuid
from httpx import AsyncClient, ASGITransport
from backend.app.main import app
from backend.app.storage.db import ensure_dirs


@pytest.mark.anyio
async def test_healthz():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        r = await ac.get("/healthz")
        assert r.status_code == 200
        assert r.json()["ok"] is True


@pytest.mark.anyio
async def test_create_and_poll_run(monkeypatch):
    ensure_dirs()

    async def _noop_enqueue_run(*args, **kwargs):
        return None

    import backend.app.api.routes as routes_mod
    monkeypatch.setattr(routes_mod, "enqueue_run", _noop_enqueue_run)

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        payload = {
            "resume_text": f"A B C {uuid.uuid4()}",
            "jd_text": "A B",
            "params": {}
        }
        r = await ac.post("/runs", json=payload)
        assert r.status_code in (200, 202)
        run_id = r.json()["run_id"]

        r2 = await ac.get(f"/runs/{run_id}")
        assert r2.status_code == 200
        body = r2.json()
        assert body["run_id"] == run_id
        assert body["status"] in ("queued", "running", "succeeded", "failed")


Overwriting ../tests/test_api_contract.py


##### With Redis

In [25]:
%%writefile ../tests/test_api_contract.py
# tests/test_api_contract.py
import pytest
from httpx import AsyncClient, ASGITransport
from backend.app.main import app
from backend.app.storage.db import ensure_dirs

@pytest.mark.anyio
async def test_healthz():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        r = await ac.get("/healthz")
        assert r.status_code == 200
        assert r.json()["ok"] is True

@pytest.mark.anyio
async def test_create_and_poll_run():
    # Requires a reachable Redis at REDIS_URL: enqueue_run is not stubbed here.
    ensure_dirs()
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        r = await ac.post("/runs", json={"resume_text": "A B C", "jd_text": "A B", "params": {}})
        assert r.status_code in (200, 202)
        run_id = r.json()["run_id"]

        r2 = await ac.get(f"/runs/{run_id}")
        assert r2.status_code == 200
        body = r2.json()
        assert body["run_id"] == run_id
        assert body["status"] in ("queued", "running", "succeeded", "failed")

Overwriting ../tests/test_api_contract.py


#### Test graph

In [26]:
%%writefile ../tests/test_graph_unit.py
# tests/test_graph_unit.py

from backend.app.core.graph import run_minimal_graph

def test_graph_basic():
    resume = "Built APIs in Python & FastAPI. Used PostgreSQL and Docker. Deployed on AWS."
    jd = "Looking for a Python developer with FastAPI, PostgreSQL, and AWS experience."
    st = run_minimal_graph(resume, jd)
    assert st.scorecard["overall_score"] > 50
    assert "python" in st.skills_resume
    assert "fastapi" in st.skills_resume
    assert st.coverage >= 50.0

Overwriting ../tests/test_graph_unit.py


## **`Frontend`**

### Streamlit App

In [27]:
%%writefile ../frontend/streamlit_app/main.py
# frontend/streamlit_app/main.py
import os
import asyncio
import httpx
import streamlit as st

API_BASE = os.environ.get("API_BASE", "http://localhost:8000")

st.set_page_config(page_title="JobMatch-AI", layout="wide")
st.title("JobMatch-AI — Resume ↔ JD Matcher")

with st.sidebar:
    st.markdown("**Status**: Test (stub graph)")
    st.markdown("API: " + API_BASE)

resume_text = st.text_area("Paste your Resume (plain text)", height=220)
jd_text = st.text_area("Paste the Job Description (plain text)", height=220)
params_col1, params_col2 = st.columns(2)
with params_col1:
    threshold = st.slider("Minimum score threshold (UI only)", 0, 100, 65)

run_btn = st.button("Run Match", type="primary", use_container_width=True)

status_placeholder = st.empty()
artifacts_placeholder = st.empty()

async def poll_status(run_id: str):
    status_placeholder.info(f"Run `{run_id}` queued…")
    async with httpx.AsyncClient(timeout=30.0) as client:
        while True:
            r = await client.get(f"{API_BASE}/runs/{run_id}")
            r.raise_for_status()
            data = r.json()
            if data["status"] == "succeeded":
                status_placeholder.success(f"Run `{run_id}`: succeeded ✅")
                return data
            if data["status"] == "failed":
                status_placeholder.error(f"Run `{run_id}`: failed ❌ — {data.get('error')}")
                return data
            status_placeholder.info(f"Run `{run_id}`: {data['status']}…")
            await asyncio.sleep(1.0)

if run_btn:
    if not resume_text.strip() or not jd_text.strip():
        st.warning("Please paste both Resume and Job Description.")
    else:
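        with st.spinner("Submitting…"):
            try:
                resp = httpx.post(
                    f"{API_BASE}/runs",
                    json={"resume_text": resume_text, "jd_text": jd_text, "params": {"ui_threshold": threshold}},
                    timeout=30.0,
                )
            except httpx.HTTPError as exc:
                # Backend unreachable or timed out: report it and stop this rerun
                st.error(f"Could not reach the API: {exc}")
                st.stop()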
            if resp.status_code not in (200, 202):
                st.error(f"Error: {resp.text}")
            else:
                run_id = resp.json()["run_id"]
                data = asyncio.run(poll_status(run_id))
                if data and data.get("artifacts"):
                    with artifacts_placeholder.container():
                        st.subheader("Artifacts")
                        for a in data["artifacts"]:
                            url = f"{API_BASE}/artifacts/{run_id}/{a['name']}"
                            st.write(f"- **{a['name']}** ({a['mime']}) — [download]({url})")


Overwriting ../frontend/streamlit_app/main.py
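
`poll_status` above loops until the run reaches a terminal state, so a stuck worker would leave the page waiting indefinitely. One possible refinement is a deadline combined with gradually increasing delays. The sketch below is transport-agnostic and is not part of the app: `poll_until_done` is a hypothetical helper, `fetch_status` stands in for the `GET /runs/{run_id}` request, and the timing defaults are arbitrary.

```python
import time
from typing import Callable


def poll_until_done(
    fetch_status: Callable[[], dict],
    timeout_s: float = 60.0,
    initial_delay_s: float = 0.5,
    max_delay_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the status is terminal or the deadline would be exceeded."""
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        data = fetch_status()
        if data["status"] in ("succeeded", "failed"):
            return data
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"run still {data['status']} after {timeout_s}s")
        sleep(delay)
        delay = min(max_delay_s, delay * 2)  # exponential backoff, capped


# Demo with a fake backend that succeeds on the third poll.
responses = iter([{"status": "queued"}, {"status": "running"}, {"status": "succeeded"}])
delays: list[float] = []
result = poll_until_done(lambda: next(responses), sleep=delays.append)
print(result["status"], delays)  # succeeded [0.5, 1.0]
```

In the async version used by the Streamlit app, the same structure would apply with `await asyncio.sleep(delay)` in place of the injected `sleep` callable.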
