In [None]:
!pip install PyPDF2 python-docx pytesseract pdf2image Pillow langdetect google-genai

Collecting PyPDF2
  Using cached pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Collecting python-docx
  Downloading python_docx-1.2.0-py3-none-any.whl.metadata (2.0 kB)
Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Collecting pdf2image
  Downloading pdf2image-1.17.0-py3-none-any.whl.metadata (6.2 kB)
Collecting Pillow
  Downloading pillow-11.3.0-cp39-cp39-win_amd64.whl.metadata (9.2 kB)
Collecting langdetect
  Downloading langdetect-1.0.9.tar.gz (981 kB)
     ---------------------------------------- 0.0/981.5 kB ? eta -:--:--
     -------------------- ----------------- 524.3/981.5 kB 5.6 MB/s eta 0:00:01
     ---------------------------------------- 981.5/981.5 kB 2.9 MB/s  0:00:00
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started


In [1]:
import os
import shutil
from pathlib import Path
from langdetect import detect
from PyPDF2 import PdfReader
from docx import Document
from pdf2image import convert_from_path
import pytesseract
import re
from PIL import Image
# Locations used by the cleaning step: where raw CVs are read from,
# where accepted files are copied to, and where the per-file log is written.
RAW_DIR = "data/raw"
CLEAN_DIR = "data/cleaned"
LOG_FILE = "docs/cleaning_log.txt"

# Create the output folders up front (no error if they already exist).
os.makedirs(CLEAN_DIR, exist_ok=True)
os.makedirs("docs", exist_ok=True)

# Function to redact emails and phone numbers
def redact_pii(text):
    """Replace e-mail addresses and long digit runs with placeholder tokens.

    E-mail addresses become ``[REDACTED_EMAIL]``; any standalone run of ten or
    more digits becomes ``[REDACTED_PHONE]``. E-mails are handled first, so a
    digit run inside an address is redacted as part of the address.

    Returns the redacted copy of ``text``.
    """
    substitutions = (
        (r'\b[\w\.-]+@[\w\.-]+\.\w+\b', '[REDACTED_EMAIL]'),
        (r'\b\d{10,}\b', '[REDACTED_PHONE]'),
    )
    for pattern, placeholder in substitutions:
        text = re.sub(pattern, placeholder, text)
    return text

def is_pdf(file_path):
    """Return True when the path string ends in ".pdf" (case-insensitive)."""
    return file_path.lower()[-4:] == ".pdf"

def is_docx(file_path):
    """Return True when the path string ends in ".docx" (case-insensitive)."""
    return file_path.lower()[-5:] == ".docx"

def detect_language(text):
    """Return the language code reported by ``detect`` for ``text``.

    Any ordinary error raised by the detector yields the string "unknown"
    instead of propagating. Only ``Exception`` subclasses are caught, so
    KeyboardInterrupt and SystemExit still stop the run.
    """
    try:
        return detect(text)
    except Exception:
        return "unknown"

def extract_text_pdf(file_path):
    """Return the text of a PDF, falling back to OCR when no text is found.

    The text of all pages is concatenated. If that result is empty or
    whitespace only, the pages are rendered with ``convert_from_path`` and
    each image is passed through ``pytesseract.image_to_string``.

    Returns:
        The extracted text, or None when reading or OCR raises an ordinary
        exception. KeyboardInterrupt and SystemExit are not swallowed.
    """
    try:
        reader = PdfReader(file_path)
        # A page's extract_text() result may be falsy; count it as no text.
        text = "".join(page.extract_text() or "" for page in reader.pages)
        # If no text found -> scanned PDF -> use OCR
        if not text.strip():
            page_images = convert_from_path(file_path)
            text = "".join(pytesseract.image_to_string(img) for img in page_images)
        return text
    except Exception:
        return None
   
# import fitz  # PyMuPDF
# import pytesseract

# def extract_text_pdf(file_path):
#     try:
#         doc = fitz.open(file_path)
#         text = ""
#         for page in doc:
#             # Extract text directly
#             page_text = page.get_text()
#             if page_text.strip():
#                 text += page_text
#             else:
#                 # If no text, use OCR on the page image
#                 pix = page.get_pixmap(dpi=300)
#                 img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
#                 text += pytesseract.image_to_string(img, lang='eng')
#         return text if text.strip() else None
#     except Exception as e:
#         print(f"Error reading PDF {file_path}: {e}")
#         return None

def extract_text_docx(file_path):
    """Return the paragraph text of a DOCX file, one paragraph per line.

    Only ``doc.paragraphs`` is read here.

    Returns:
        The joined text, or None when opening or reading the document raises
        an ordinary exception. KeyboardInterrupt and SystemExit are not
        swallowed.
    """
    try:
        doc = Document(file_path)
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)
    except Exception:
        return None

def process_files():
    """Screen every entry in RAW_DIR and copy the accepted files to CLEAN_DIR.

    Each entry receives exactly one verdict, written to LOG_FILE as
    ``<filename> : <verdict>``:

    * "unsupported format"  - name does not end in .pdf or .docx
    * "duplicate"           - same bytes as a file already seen in this run
    * "corrupted or empty"  - unreadable, or no text could be extracted
    * "non-English (<lang>)" - detected language is not "en"
    * "cleaned"             - copied unchanged into CLEAN_DIR

    Entries are visited in sorted name order so that the run, and the choice
    of which copy of a duplicate is kept, is deterministic.
    """
    # stdlib; imported here so the function does not rely on the cell's import list
    import hashlib

    cleaning_log = []
    seen_digests = set()
    for filename in sorted(os.listdir(RAW_DIR)):
        file_path = os.path.join(RAW_DIR, filename)

        # Unsupported format
        if not (is_pdf(file_path) or is_docx(file_path)):
            cleaning_log.append((filename, "unsupported format"))
            continue

        # Skip duplicates. A directory listing never repeats a name, so
        # duplicates are recognised by content hash instead of by filename.
        try:
            with open(file_path, "rb") as raw_file:
                digest = hashlib.sha256(raw_file.read()).hexdigest()
        except OSError:
            cleaning_log.append((filename, "corrupted or empty"))
            continue
        if digest in seen_digests:
            cleaning_log.append((filename, "duplicate"))
            continue
        seen_digests.add(digest)

        # Extract text
        text = extract_text_pdf(file_path) if is_pdf(file_path) else extract_text_docx(file_path)
        if not text or len(text.strip()) == 0:
            cleaning_log.append((filename, "corrupted or empty"))
            continue

        # Detect language
        lang = detect_language(text)
        if lang != "en":
            cleaning_log.append((filename, f"non-English ({lang})"))
            continue

        # Redact PII
        # NOTE(review): the redacted text is computed but never written anywhere;
        # the file copied below is the original, so e-mails and phone numbers are
        # still present in CLEAN_DIR. Persisting the redacted text needs a matching
        # change in whatever reads CLEAN_DIR.
        text = redact_pii(text)

        # Save cleaned file
        shutil.copy(file_path, os.path.join(CLEAN_DIR, filename))
        cleaning_log.append((filename, "cleaned"))

    # Save log (explicit UTF-8 so non-ASCII filenames do not fail under a
    # narrower platform default encoding)
    with open(LOG_FILE, "w", encoding="utf-8") as f:
        for logged_name, verdict in cleaning_log:
            f.write(f"{logged_name} : {verdict}\n")
    print(f"Cleaning done. Log saved to {LOG_FILE}")

if __name__ == "__main__":
    process_files()


Cleaning done. Log saved to docs/cleaning_log.txt


In [2]:
import os
from pathlib import Path
from PyPDF2 import PdfReader
from docx import Document

# Input folder (accepted CVs) and output folder (one text file per CV)
CLEAN_DIR = "data/cleaned"
TEXT_DIR = "data/text"

# Make sure the output folder exists before any text file is written
os.makedirs(TEXT_DIR, exist_ok=True)

# Extract text from PDF
def extract_pdf(file_path):
    """Return the concatenated text of all pages of a PDF.

    When the pages yield no text (empty or whitespace only), the PDF is
    rendered to images and read with OCR, mirroring ``extract_text_pdf`` in
    the cleaning cell. Without this, a scanned PDF that was accepted there
    produces an empty text file here.

    Errors are not caught; they propagate to the caller.
    """
    reader = PdfReader(file_path)
    # A page's extract_text() result may be falsy; count it as no text.
    text = "".join(page.extract_text() or "" for page in reader.pages)
    if not text.strip():
        # Imported lazily: only scanned PDFs need the OCR stack, and this cell
        # does not import it at the top.
        from pdf2image import convert_from_path
        import pytesseract

        text = "".join(
            pytesseract.image_to_string(img) for img in convert_from_path(file_path)
        )
    return text

# Extract text from DOCX
def extract_docx(file_path):
    """Return the text of every paragraph in a DOCX file, newline-separated."""
    lines = []
    for paragraph in Document(file_path).paragraphs:
        lines.append(paragraph.text)
    return "\n".join(lines)

# Parse all cleaned CVs: pick the extractor from the (case-insensitive)
# file extension and write the result to TEXT_DIR as "<original name>.txt".
_extractors = ((".pdf", extract_pdf), (".docx", extract_docx))

for filename in os.listdir(CLEAN_DIR):
    file_path = os.path.join(CLEAN_DIR, filename)
    _lowered = filename.lower()
    _extractor = next((fn for ext, fn in _extractors if _lowered.endswith(ext)), None)
    if _extractor is None:
        continue  # skip unsupported files

    text = _extractor(file_path)

    # Save extracted text
    text_file = os.path.join(TEXT_DIR, filename + ".txt")
    with open(text_file, "w", encoding="utf-8") as f:
        f.write(text)

print(f"Parsing done. Text files saved in {TEXT_DIR}")


Parsing done. Text files saved in data/text


In [3]:
import os
import json
import re
from pathlib import Path

# Input folder (plain-text CVs) and output folder (one JSON file per CV)
TEXT_DIR = "data/text"
JSON_DIR = "data/json"

# Make sure the output folder exists before any JSON file is written
os.makedirs(JSON_DIR, exist_ok=True)

def extract_education(text):
    """Find degree entries of the form "<degree keyword> ... YYYY - YYYY".

    Returns a list with one dict per match. Only ``degree``, ``start`` and
    ``end`` are filled from the text; the remaining fields are placeholders.
    """
    degree_span = re.compile(
        r"(BSc|MSc|PhD|Bachelor|Master|Doctor).+?([0-9]{4})\s*-\s*([0-9]{4})"
    )
    return [
        {
            "degree": hit.group(1),
            "field": "",  # could be parsed further
            "university": "",
            "country": "",
            "start": int(hit.group(2)),
            "end": int(hit.group(3)),
            "gpa": None,
            "scale": None,
        }
        for hit in degree_span.finditer(text)
    ]

def extract_experience(text):
    """Find job entries of the form "<title>, <org>, YYYY ... YYYY".

    Returns a list with one dict per match. ``duration_months`` is the
    difference between the two years multiplied by twelve; ``domain`` is a
    placeholder.
    """
    experience = []
    for hit in re.finditer(r"(\w.+?),\s*(\w.+?),\s*([0-9]{4}).*?([0-9]{4})", text):
        first_year = int(hit.group(3))
        last_year = int(hit.group(4))
        experience.append({
            "title": hit.group(1),
            "org": hit.group(2),
            "start": first_year,
            "end": last_year,
            "duration_months": (last_year - first_year) * 12,
            "domain": ""
        })
    return experience

def extract_publications(text):
    """Find publication entries of the form '"<title>", <venue>, YYYY'.

    Returns a list with one dict per match. ``type`` is always "conference";
    the author, impact-factor and domain fields are placeholders.
    """
    quoted_entry = re.compile(r'"(.+?)",\s*(.+?),\s*([0-9]{4})')
    return [
        {
            "title": hit.group(1),
            "venue": hit.group(2),
            "year": int(hit.group(3)),
            "type": "conference",
            "authors": [],
            "author_position": None,
            "journal_if": None,
            "domain": "",
        }
        for hit in quoted_entry.finditer(text)
    ]

def extract_awards(text):
    """Record every literal occurrence of "Best Paper Award" or "Awarded".

    Returns a list with one dict per occurrence; the matched phrase is used
    as the title, and issuer/year are placeholders.
    """
    return [
        {"title": hit.group(0), "issuer": "", "year": None, "type": "award"}
        for hit in re.finditer(r'Best Paper Award|Awarded', text)
    ]

# Build one JSON file per parsed CV using the regex extractors above.
for filename in os.listdir(TEXT_DIR):
    # Only .txt entries are parsed CVs; anything else (for example a
    # sub-directory) would make open() fail or produce a misnamed output.
    if not filename.endswith(".txt"):
        continue
    file_path = os.path.join(TEXT_DIR, filename)
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    data = {
        "education": extract_education(text),
        "experience": extract_experience(text),
        "publications": extract_publications(text),
        "awards": extract_awards(text)
    }

    # Swap only the final extension; str.replace would also rewrite a ".txt"
    # that appears earlier in the name.
    json_name = os.path.splitext(filename)[0] + ".json"
    json_file = os.path.join(JSON_DIR, json_name)
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

print(f"JSON extraction done. Files saved in {JSON_DIR}")


JSON extraction done. Files saved in data/json


In [7]:
! pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.2.1-py3-none-any.whl.metadata (25 kB)
Downloading python_dotenv-1.2.1-py3-none-any.whl (21 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.2.1


In [12]:
import os
import json
import re
from pathlib import Path
from google import genai
from google.genai import types
import dotenv
# Load environment settings (presumably from a local .env file) and read the API key
dotenv.load_dotenv()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
# ---------------------- CONFIG ----------------------
TEXT_DIR = "data/text"
JSON_DIR = "data/json"
# NOTE(review): "data/raw" is also the folder the cleaning cell lists as its
# input (its RAW_DIR). Raw LLM replies written here will show up as
# "unsupported format" entries the next time that cell runs, and this
# assignment rebinds the RAW_DIR that process_files() reads. Consider a
# dedicated folder and constant name.
RAW_DIR = "data/raw"  # Store raw LLM responses
os.makedirs(JSON_DIR, exist_ok=True)
os.makedirs(RAW_DIR, exist_ok=True)

# Initialize Gemini client
client = genai.Client(api_key=GOOGLE_API_KEY)

# ---------------------- FUNCTION ----------------------
def _empty_cv_schema():
    """Return a fresh result with the four top-level CV keys, all empty."""
    return {"education": [], "experience": [], "publications": [], "awards": []}


def _parse_llm_json(raw_text):
    """Best-effort conversion of an LLM reply string into parsed JSON.

    Tries the reply as-is first. On failure, keeps only the span from the
    first "{" to the last "}" (drops surrounding prose or code fences),
    removes trailing commas before a closing bracket, and tries again.
    Returns the empty schema when the reply is empty or still unparsable.
    """
    cleaned = raw_text.replace("\ufeff", "")  # remove BOM if any
    if not cleaned:
        return _empty_cv_schema()
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        pass

    # Fix common issues: extra text around the object, trailing commas
    first_brace = cleaned.find("{")
    last_brace = cleaned.rfind("}")
    if first_brace != -1 and last_brace != -1:
        cleaned = cleaned[first_brace:last_brace + 1]
    cleaned = re.sub(r',\s*([}\]])', r'\1', cleaned)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        # Fallback to empty schema
        return _empty_cv_schema()


def extract_json_with_gemini(text):
    """
    Send CV text to Gemini LLM and get structured JSON safely.

    Args:
        text: plain text of one CV; it is embedded verbatim in the prompt.

    Returns:
        (parsed_json, raw_response_text) where raw_response_text is the
        model's reply with surrounding whitespace stripped ("" when the reply
        carries no text) and parsed_json is the parsed reply, or the empty
        schema when nothing parsable was returned.

    Errors raised by the API call itself are not caught here.
    """
    prompt = f"""
    Extract structured CV information in the following JSON schema:
    {{
      "education": [{{"degree":"","field":"","university":"","country":"","start":null,"end":null,"gpa":null,"scale":null}}],
      "experience": [{{"title":"","org":"","start":null,"end":null,"duration_months":null,"domain":""}}],
      "publications": [{{"title":"","venue":"","year":null,"type":"","authors":[],"author_position":null,"journal_if":null,"domain":""}}],
      "awards": [{{"title":"","issuer":"","year":null,"type":""}}]
    }}

    CV Text:
    {text}

    Return **only valid JSON**. Do not add any explanation or extra text.
    """

    # Call Gemini model
    resp = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config=types.GenerateContentConfig(temperature=0)
    )

    # resp.text can be None when the response has no text part; treat that
    # as an empty reply instead of failing on None.strip().
    raw_text = (resp.text or "").strip()

    return _parse_llm_json(raw_text), raw_text

# ---------------------- MAIN LOOP ----------------------
# For every parsed CV: ask Gemini for structured JSON, then store both the
# reply text (RAW_DIR) and the parsed result (JSON_DIR).
_txt_names = [name for name in os.listdir(TEXT_DIR) if name.endswith(".txt")]

for filename in _txt_names:
    with open(os.path.join(TEXT_DIR, filename), "r", encoding="utf-8") as f:
        text = f.read()

    # Extract JSON using Gemini
    structured_json, raw_resp = extract_json_with_gemini(text)

    # Output paths derived from the input name
    raw_file = os.path.join(RAW_DIR, filename.replace(".txt", "_raw.txt"))
    json_file = os.path.join(JSON_DIR, filename.replace(".txt", ".json"))

    # Save raw response
    with open(raw_file, "w", encoding="utf-8") as f:
        f.write(raw_resp)

    # Save cleaned JSON
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(structured_json, f, indent=2)

print(f"Gemini JSON extraction done. Raw responses in {RAW_DIR}, cleaned JSON in {JSON_DIR}")


Gemini JSON extraction done. Raw responses in data/raw, cleaned JSON in data/json


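## Variant: Gemini extraction with regex section pre-filtering

Instead of sending the full CV text, first isolate the Education, Experience, Publications and Awards sections with regex, then send only those sections to Gemini.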

In [None]:
import re

def extract_sections(text):
    """
    Extract main CV sections using regex.

    A section starts at the first occurrence (case-insensitive) of one of its
    header phrases and runs until the next line that looks like "Header:" or
    the end of the text. The colon that follows the header itself is not part
    of the captured content.

    Returns a dictionary with keys: 'education', 'experience', 'publications',
    'awards'. A section that is not found, or a falsy ``text``, yields ''.
    """
    sections = {'education': '', 'experience': '', 'publications': '', 'awards': ''}
    if not text:
        return sections

    # Header phrases per section (matched case-insensitively)
    headers = {
        'education': r"Education|Academic Background|Academic Qualifications",
        'experience': r"Experience|Work History|Employment",
        'publications': r"Publications|Research Papers|Articles",
        'awards': r"Awards|Honors|Achievements",
    }
    # A section ends right before the next "Some Header:" line, or at end of text
    section_end = r"(?=\n[A-Z][a-zA-Z ]{2,}:|\Z)"

    for section, header in headers.items():
        # \s*:? consumes the header's own trailing colon so it does not end up
        # at the start of the captured content.
        pattern = rf"({header})\s*:?(.*?){section_end}"
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        if match:
            sections[section] = match.group(2).strip()  # only the content, not the header

    return sections


In [None]:
def extract_json_with_gemini_sections(sections):
    """
    Send only relevant CV sections to Gemini LLM.

    Args:
        sections: mapping with the keys 'education', 'experience',
            'publications' and 'awards'; each value is embedded verbatim in
            the prompt.

    Returns:
        (data, raw_text) where raw_text is the model's reply with surrounding
        whitespace stripped and otherwise unmodified ("" when the reply
        carries no text), and data is the parsed reply, or the empty schema
        when nothing parsable was returned.

    Errors raised by the API call itself are not caught here.
    """
    import re

    prompt = f"""
    Extract structured CV information in the following JSON schema:
    {{
      "education": [{{"degree":"","field":"","university":"","country":"","start":null,"end":null,"gpa":null,"scale":null}}],
      "experience": [{{"title":"","org":"","start":null,"end":null,"duration_months":null,"domain":""}}],
      "publications": [{{"title":"","venue":"","year":null,"type":"","authors":[],"author_position":null,"journal_if":null,"domain":""}}],
      "awards": [{{"title":"","issuer":"","year":null,"type":""}}]
    }}

    Only use the following CV content. Do not invent information:
    Education: {sections['education']}
    Experience: {sections['experience']}
    Publications: {sections['publications']}
    Awards: {sections['awards']}

    Return only valid JSON. Do not add explanations or extra text.
    """

    resp = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config=types.GenerateContentConfig(temperature=0)
    )

    # resp.text can be None when the response has no text part; treat that
    # as an empty reply instead of failing on None.strip().
    raw_text = (resp.text or "").strip()

    # Safe JSON parsing
    try:
        data = json.loads(raw_text)
    except json.JSONDecodeError:
        # Repair a copy so the returned raw_text stays exactly what the model
        # sent: keep the outermost {...} span and drop trailing commas.
        candidate = raw_text
        first_brace = candidate.find("{")
        last_brace = candidate.rfind("}")
        if first_brace != -1 and last_brace != -1:
            candidate = candidate[first_brace:last_brace + 1]
        candidate = re.sub(r',\s*([}\]])', r'\1', candidate)
        try:
            data = json.loads(candidate)
        except json.JSONDecodeError:
            data = {"education": [], "experience": [], "publications": [], "awards": []}

    return data, raw_text


In [None]:
# For every parsed CV: isolate its sections with regex, send only those to
# Gemini, then store the reply text (RAW_DIR) and the parsed result (JSON_DIR).
_txt_names = [name for name in os.listdir(TEXT_DIR) if name.endswith(".txt")]

for filename in _txt_names:
    with open(os.path.join(TEXT_DIR, filename), "r", encoding="utf-8") as f:
        text = f.read()

    # Step 1: extract relevant sections using regex
    sections = extract_sections(text)

    # Step 2: send sections to Gemini
    structured_json, raw_resp = extract_json_with_gemini_sections(sections)

    # Output paths derived from the input name
    raw_file = os.path.join(RAW_DIR, filename.replace(".txt", "_raw.txt"))
    json_file = os.path.join(JSON_DIR, filename.replace(".txt", ".json"))

    # Step 3: save raw response
    with open(raw_file, "w", encoding="utf-8") as f:
        f.write(raw_resp)

    # Step 4: save structured JSON
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(structured_json, f, indent=2)
