In [5]:
import boto3
import time
import json
import logging
import os

In [7]:
# Set up logging if not already done
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [8]:
# --- CONFIGURATION ---
S3_BUCKET_NAME = "resume-parser-waijian-20250525"
# Let's target one specific folder first
S3_PREFIX = "resume/ACCOUNTANT/"
TEXTRACT_ROLE_ARN = 'arn:aws:iam::747549824523:role/TextractS3AccessRole'
AWS_REGION = "ap-southeast-1"
# --- END CONFIGURATION ---

In [9]:
# Create Boto3 clients
s3_client = boto3.client('s3', region_name=AWS_REGION)
textract_client = boto3.client('textract', region_name=AWS_REGION)

logging.info(f"Setup complete. Bucket: {S3_BUCKET_NAME}, Prefix: {S3_PREFIX}")

2025-05-26 23:54:48,480 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-05-26 23:54:48,736 - INFO - Setup complete. Bucket: resume-parser-waijian-20250525, Prefix: resume/ACCOUNTANT/


In [10]:
def list_s3_files(bucket, prefix, max_keys=10):
    """Lists files in an S3 bucket under a given prefix."""
    try:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=max_keys)
        files = [item['Key'] for item in response.get('Contents', []) if item['Key'].lower().endswith('.pdf')]
        logging.info(f"Found {len(files)} PDFs in S3 under {prefix} (showing max {max_keys}).")
        return files
    except Exception as e:
        logging.error(f"Error listing S3 files: {e}")
        return []

pdf_files_to_process = list_s3_files(S3_BUCKET_NAME, S3_PREFIX, max_keys=5) # Let's start with 5
print("Files to process:")
print(pdf_files_to_process)

2025-05-26 23:54:51,218 - INFO - Found 5 PDFs in S3 under resume/ACCOUNTANT/ (showing max 5).


Files to process:
['resume/ACCOUNTANT/10554236.pdf', 'resume/ACCOUNTANT/10674770.pdf', 'resume/ACCOUNTANT/11163645.pdf', 'resume/ACCOUNTANT/11759079.pdf', 'resume/ACCOUNTANT/12065211.pdf']


In [11]:
def start_textract_job(bucket, document_key):
    """Starts an asynchronous Textract job."""
    if not document_key:
        logging.error("No document key provided.")
        return None

    logging.info(f"Starting Textract job for: {document_key}")
    try:
        response = textract_client.start_document_text_detection(
            DocumentLocation={
                'S3Object': {
                    'Bucket': bucket,
                    'Name': document_key
                }
            }
        )
        job_id = response['JobId']
        logging.info(f"Started Job with ID: {job_id}")
        return job_id
    except Exception as e:
        logging.error(f"Error starting Textract job for {document_key}: {e}")
        return None

# Start a job for the *first* PDF in our list (if any)
current_job_id = None
if pdf_files_to_process:
    first_pdf = pdf_files_to_process[0]
    current_job_id = start_textract_job(S3_BUCKET_NAME, first_pdf)
else:
    logging.warning("No PDFs found to process.")

2025-05-26 23:54:53,997 - INFO - Starting Textract job for: resume/ACCOUNTANT/10554236.pdf
2025-05-26 23:54:54,463 - INFO - Started Job with ID: c37f258c97e986fe107fa8ae5b44a3143a86f3055c27f79fa44d13308b761541


In [12]:
def check_textract_job_status(job_id):
    """Checks the status of a Textract job."""
    if not job_id:
        logging.error("No Job ID provided.")
        return None

    logging.info(f"Checking status for Job ID: {job_id}")
    try:
        response = textract_client.get_document_text_detection(JobId=job_id)
        status = response['JobStatus']
        logging.info(f"Current Status: {status}")
        return status
    except Exception as e:
        logging.error(f"Error checking job status {job_id}: {e}")
        return "FAILED" # Treat errors as failures

def wait_for_job_completion(job_id, delay=5, timeout=300):
    """Waits for a Textract job to complete by polling."""
    if not job_id: return False

    start_time = time.time()
    while time.time() - start_time < timeout:
        status = check_textract_job_status(job_id)
        if status == 'SUCCEEDED':
            logging.info(f"Job {job_id} SUCCEEDED!")
            return True
        elif status in ['FAILED', 'PARTIAL_SUCCESS']:
            logging.error(f"Job {job_id} finished with status: {status}")
            return False
        
        logging.info(f"Waiting for {delay} seconds...")
        time.sleep(delay)

    logging.error(f"Job {job_id} timed out after {timeout} seconds.")
    return False


# Wait for the job we started (if any)
job_succeeded = False
if current_job_id:
    job_succeeded = wait_for_job_completion(current_job_id)
else:
    logging.warning("No job was started.")

2025-05-26 23:54:56,889 - INFO - Checking status for Job ID: c37f258c97e986fe107fa8ae5b44a3143a86f3055c27f79fa44d13308b761541
2025-05-26 23:54:56,904 - INFO - Current Status: IN_PROGRESS
2025-05-26 23:54:56,905 - INFO - Waiting for 5 seconds...
2025-05-26 23:55:01,910 - INFO - Checking status for Job ID: c37f258c97e986fe107fa8ae5b44a3143a86f3055c27f79fa44d13308b761541
2025-05-26 23:55:02,152 - INFO - Current Status: SUCCEEDED
2025-05-26 23:55:02,153 - INFO - Job c37f258c97e986fe107fa8ae5b44a3143a86f3055c27f79fa44d13308b761541 SUCCEEDED!


In [13]:
def get_textract_results(job_id):
    """Retrieves all pages of results for a completed Textract job."""
    if not job_id: return None
    
    logging.info(f"Retrieving results for Job ID: {job_id}")
    all_blocks = []
    next_token = None
    
    try:
        while True:
            if next_token:
                response = textract_client.get_document_text_detection(JobId=job_id, NextToken=next_token)
            else:
                response = textract_client.get_document_text_detection(JobId=job_id)
            
            blocks = response.get('Blocks', [])
            all_blocks.extend(blocks)
            
            next_token = response.get('NextToken')
            if not next_token:
                break # No more pages
                
        logging.info(f"Retrieved {len(all_blocks)} blocks for Job ID: {job_id}")
        return all_blocks
        
    except Exception as e:
        logging.error(f"Error retrieving results for {job_id}: {e}")
        return None

def extract_text_from_blocks(blocks):
    """Extracts plain text (lines) from Textract blocks."""
    if not blocks: return ""
    
    lines = []
    for block in blocks:
        if block['BlockType'] == 'LINE':
            lines.append(block['Text'])
            
    return "\n".join(lines)

# Get and process results if the job succeeded
if job_succeeded and current_job_id:
    textract_blocks = get_textract_results(current_job_id)
    
    if textract_blocks:
        # Optional: Save the full JSON for inspection
        # with open(f'{current_job_id}.json', 'w') as f:
        #     json.dump(textract_blocks, f, indent=4)
        # logging.info(f"Saved full JSON to {current_job_id}.json")

        # Extract and print the plain text
        extracted_text = extract_text_from_blocks(textract_blocks)
        print("\n--- EXTRACTED TEXT (First 1000 chars) ---")
        print(extracted_text[:1000])
        print("----------------------------------------")
        
    else:
        logging.error("Failed to retrieve Textract blocks.")
else:
    logging.warning("Job did not succeed or no job was run. Cannot get results.")

2025-05-26 23:55:07,556 - INFO - Retrieving results for Job ID: c37f258c97e986fe107fa8ae5b44a3143a86f3055c27f79fa44d13308b761541
2025-05-26 23:55:08,339 - INFO - Retrieved 3732 blocks for Job ID: c37f258c97e986fe107fa8ae5b44a3143a86f3055c27f79fa44d13308b761541



--- EXTRACTED TEXT (First 1000 chars) ---
ACCOUNTANT
Summary
Financial Accountant specializing in financial planning, reporting and analysis within the Department of Defense.
Highlights
Account reconciliations
Results-oriented
Accounting operations professional
Financial reporting
Analysis of financial systems
Critical thinking
ERP (Enterprise Resource Planning) software.
Excellent facilitator
Accomplishments
Served on a tiger team which identified and resolved General Ledger postings in DEAMS totaling $360B in accounting adjustments. This allowed
for the first successful fiscal year-end close for 2012.
In collaboration with DFAS Europe, developed an automated tool that identified duplicate obligations. This tool allowed HQ USAFE to
deobligate over $5M in duplicate obligations.
Experience
Company Name July 2011 to November 2012 Accountant
City, State
Enterprise Resource Planning Office (ERO)
In this position as an Accountant assigned to the Defense Enterprise Accounting and Management

In [20]:
# Create a Boto3 client for Comprehend
comprehend_client = boto3.client('comprehend', region_name=AWS_REGION)

def find_entities_comprehend(text):
    """Uses AWS Comprehend to find named entities."""
    if not text:
        return []

    # Comprehend has a 5000 byte limit per call.
    # We should handle larger texts, but for a start, let's truncate.
    # A better approach would be to split the text.
    text_to_process = text[:4900] # Stay safely under the limit

    logging.info("Calling AWS Comprehend DetectEntities...")
    try:
        response = comprehend_client.detect_entities(
            Text=text_to_process,
            LanguageCode='en' # Assuming English resumes
        )
        entities = response.get('Entities', [])
        logging.info(f"Comprehend found {len(entities)} entities.")
        return entities
    except Exception as e:
        logging.error(f"Error calling AWS Comprehend: {e}")
        return []

# Find entities using Comprehend
if 'extracted_text' in locals():
    comprehend_entities = find_entities_comprehend(extracted_text)
    print("\n--- Found Entities (AWS Comprehend) ---")
    # Print a summary: Text, Type, Score
    for entity in comprehend_entities:
        print(f"  - Text: {entity['Text']}, Type: {entity['Type']}, Score: {entity['Score']:.2f}")
    print("---------------------------------------")
else:
    print("Cannot perform Comprehend search: 'extracted_text' not found.")

2025-05-27 00:26:15,355 - INFO - Calling AWS Comprehend DetectEntities...
2025-05-27 00:26:15,506 - INFO - Comprehend found 64 entities.



--- Found Entities (AWS Comprehend) ---
  - Text: Department of Defense, Type: ORGANIZATION, Score: 0.98
  - Text: DEAMS, Type: ORGANIZATION, Score: 0.63
  - Text: $360B, Type: QUANTITY, Score: 1.00
  - Text: first, Type: QUANTITY, Score: 0.86
  - Text: fiscal year-end, Type: DATE, Score: 0.73
  - Text: 2012, Type: DATE, Score: 0.97
  - Text: DFAS Europe, Type: ORGANIZATION, Score: 0.99
  - Text: HQ, Type: ORGANIZATION, Score: 0.86
  - Text: USAFE, Type: ORGANIZATION, Score: 0.79
  - Text: over $5M, Type: QUANTITY, Score: 0.97
  - Text: July 2011, Type: DATE, Score: 1.00
  - Text: November 2012, Type: DATE, Score: 1.00
  - Text: State
Enterprise Resource Planning Office, Type: ORGANIZATION, Score: 0.84
  - Text: ERO, Type: ORGANIZATION, Score: 0.82
  - Text: Defense Enterprise Accounting and Management System, Type: ORGANIZATION, Score: 0.93
  - Text: DEAMS, Type: ORGANIZATION, Score: 0.77
  - Text: ERO, Type: ORGANIZATION, Score: 0.60
  - Text: DEAMS, Type: ORGANIZATION, Score: 0.80


In [16]:
SKILL_KEYWORDS = [
    # Programming Languages
    'python', 'java', 'c++', 'c#', 'javascript', 'typescript', 'php', 'ruby', 'go', 'swift', 'kotlin', 'sql', 'pl/sql', 'scala',
    # Web Frameworks/Libraries
    'react', 'angular', 'vue', 'django', 'flask', 'spring', 'node.js', 'jquery', 'bootstrap', '.net',
    # Databases
    'mysql', 'postgresql', 'mongodb', 'redis', 'oracle', 'sql server', 'sqlite', 'cassandra', 'dynamodb',
    # Cloud/AWS
    'aws', 'azure', 'gcp', 'amazon web services', 'google cloud platform', 's3', 'ec2', 'lambda', 'rds', 'eks', 'ecs', 'textract', 'comprehend', 'sagemaker',
    # ML/Data Science
    'machine learning', 'data science', 'deep learning', 'nlp', 'natural language processing', 'pandas', 'numpy', 'scikit-learn', 'tensorflow', 'pytorch', 'keras', 'matplotlib', 'seaborn', 'spark', 'hadoop', 'airflow', 'mlflow',
    # DevOps/Tools
    'docker', 'kubernetes', 'jenkins', 'git', 'github', 'gitlab', 'ansible', 'terraform', 'ci/cd', 'jira',
    # OS
    'linux', 'windows', 'macos',
    # Methodologies
    'agile', 'scrum', 'kanban',
    # Other
    'api', 'rest', 'graphql', 'microservices', 'statistics', 'operations research', 'data analysis', 'etl', 'power bi', 'tableau'
]

In [17]:
def find_skills_keyword_based(text, skills_list):
    """Finds skills from a list in the given text."""
    found_skills = set() # Use a set to avoid duplicates

    if not text:
        return []

    # Convert text to lowercase for case-insensitive matching
    text_lower = text.lower()

    for skill in skills_list:
        # Check if the skill (as a whole word or phrase) is present.
        # We can use simple 'in' for now, but regex (word boundaries) would be more robust.
        if skill in text_lower:
            # We add the original cased skill for better presentation if needed,
            # but for consistency, let's add the lowercase version we searched for.
            found_skills.add(skill)

    return list(found_skills)

# Find skills in our extracted text
if 'extracted_text' in locals():
    found_skills_rule_based = find_skills_keyword_based(extracted_text, SKILL_KEYWORDS)
    print("\n--- Found Skills (Rule-Based) ---")
    print(found_skills_rule_based)
    print(f"Found {len(found_skills_rule_based)} skills.")
    print("---------------------------------")
else:
    print("Cannot perform rule-based search: 'extracted_text' not found.")


--- Found Skills (Rule-Based) ---
['rest', 'aws', 'rds', 'api', 'go']
Found 5 skills.
---------------------------------


In [21]:
def combine_results(rule_skills, comp_entities, exclude_types=None):
    """
    Combines skills from rule-based matching and entities from Comprehend,
    optionally excluding certain Comprehend types.
    """
    if exclude_types is None:
        exclude_types = ['PERSON', 'LOCATION', 'DATE', 'ORGANIZATION', 'QUANTITY']

    # Start with rule-based skills, converted to lowercase
    combined_set = set([skill.lower() for skill in rule_skills])

    # Process Comprehend entities
    for entity in comp_entities:
        # If the entity type is not in our exclusion list...
        if entity['Type'] not in exclude_types:
            # ...add its text (converted to lowercase) to the set.
            combined_set.add(entity['Text'].lower())

    return list(combined_set)

# --- Ensure we have the results before combining ---
# Make sure 'found_skills_rule_based' exists
if 'found_skills_rule_based' not in locals():
    found_skills_rule_based = []
    print("Warning: 'found_skills_rule_based' not found, using empty list.")

# Make sure 'comprehend_entities' exists
if 'comprehend_entities' not in locals():
    comprehend_entities = []
    print("Warning: 'comprehend_entities' not found, using empty list.")
# --- End Ensure ---


# Perform the combination
combined_keywords = combine_results(found_skills_rule_based, comprehend_entities)

print("\n--- Combined & Filtered Keywords/Entities ---")
print(combined_keywords)
print(f"Found {len(combined_keywords)} unique combined keywords/entities.")
print("---------------------------------------------")

# You might also want to keep the *full* Comprehend results for the UI,
# but for a 'skill list', this 'combined_keywords' is a good start.
# We can store the full results too:
full_details = {
    "RuleBasedSkills": found_skills_rule_based,
    "ComprehendEntities": comprehend_entities,
    "CombinedKeywords": combined_keywords,
    "RawText": extracted_text[:2000] # Store a snippet or full text
}

# print("\n--- Full Details (Sample) ---")
# print(json.dumps(full_details, indent=2, default=str)) # Use default=str for any non-serializable types


--- Combined & Filtered Keywords/Entities ---
['rest', 'aws', 'rds', 'control program', 'api', 'go']
Found 6 unique combined keywords/entities.
---------------------------------------------


In [22]:
import mlflow
import tempfile # We'll need this to save artifacts temporarily

# Set an experiment name. If it doesn't exist, MLflow creates it.
mlflow.set_experiment("Resume_Processing_Textract")

logging.info(f"MLflow tracking URI: {mlflow.get_tracking_uri()}")
logging.info(f"MLflow experiment set to 'Resume_Processing_Textract'")

2025/05/27 00:49:26 INFO mlflow.tracking.fluent: Experiment with name 'Resume_Processing_Textract' does not exist. Creating a new experiment.
2025-05-27 00:49:26,056 - INFO - MLflow tracking URI: file:///home/waijianlim/resume_parser_project/notebooks/mlruns
2025-05-27 00:49:26,056 - INFO - MLflow experiment set to 'Resume_Processing_Textract'


In [24]:
def process_resume_and_log(s3_bucket, s3_key, role_arn, skills_list, exclude_types):
    """
    Processes a single resume PDF from S3 using Textract, extracts skills,
    and logs parameters, metrics, and artifacts to MLflow.
    Returns True if successful, False otherwise.
    """
    logging.info(f"--- Starting MLflow run for: {s3_key} ---")

    # Start an MLflow run. Everything inside 'with' gets logged to this run.
    with mlflow.start_run():

        # 1. Log Parameters
        logging.info("Logging parameters...")
        mlflow.log_param("s3_bucket", s3_bucket)
        mlflow.log_param("s3_key", s3_key)
        mlflow.log_param("textract_role_arn", role_arn)
        mlflow.log_param("comprehend_exclude_types", str(exclude_types))
        mlflow.log_param("num_skill_keywords", len(skills_list))

        # 2. Run Textract (Start & Poll)
        job_id = start_textract_job(s3_bucket, s3_key)
        if not job_id:
            logging.error("Failed to start Textract job. Ending run.")
            mlflow.log_metric("status", 0) # Log 0 for failure
            return False

        mlflow.log_param("textract_job_id", job_id)
        job_succeeded = wait_for_job_completion(job_id)

        if not job_succeeded:
            logging.error("Textract job did not succeed. Ending run.")
            mlflow.log_metric("status", 0)
            return False

        # 3. Get Textract Results & Extract Text
        textract_blocks = get_textract_results(job_id)
        if not textract_blocks:
            logging.error("Failed to get Textract results. Ending run.")
            mlflow.log_metric("status", 0)
            return False

        extracted_text = extract_text_from_blocks(textract_blocks)

        # 4. Perform Skill Extraction (Rule-Based & Comprehend)
        found_skills_rule_based = find_skills_keyword_based(extracted_text, skills_list)
        comprehend_entities = find_entities_comprehend(extracted_text) # Assumes client exists
        combined_keywords = combine_results(found_skills_rule_based, comprehend_entities, exclude_types)

        # 5. Log Metrics
        logging.info("Logging metrics...")
        mlflow.log_metric("text_length_chars", len(extracted_text))
        mlflow.log_metric("num_textract_blocks", len(textract_blocks))
        mlflow.log_metric("num_rule_based_skills", len(found_skills_rule_based))
        mlflow.log_metric("num_comprehend_entities", len(comprehend_entities))
        mlflow.log_metric("num_combined_keywords", len(combined_keywords))
        mlflow.log_metric("status", 1) # Log 1 for success

        # 6. Log Artifacts
        logging.info("Logging artifacts...")
        with tempfile.TemporaryDirectory() as tmpdir:
            # Save extracted text
            text_path = os.path.join(tmpdir, "extracted_text.txt")
            with open(text_path, "w", encoding='utf-8') as f:
                f.write(extracted_text)
            mlflow.log_artifact(text_path, "extracted_content")

            # Save combined keywords
            keywords_path = os.path.join(tmpdir, "combined_keywords.json")
            with open(keywords_path, "w") as f:
                json.dump(combined_keywords, f, indent=4)
            mlflow.log_artifact(keywords_path, "extracted_content")

            # Save full Textract JSON (can be large!)
            # textract_path = os.path.join(tmpdir, "textract_output.json")
            # with open(textract_path, "w") as f:
            #     json.dump(textract_blocks, f, indent=4)
            # mlflow.log_artifact(textract_path, "raw_outputs")

            # Save skill list used
            skill_list_path = os.path.join(tmpdir, "skill_keywords_used.json")
            with open(skill_list_path, "w") as f:
                json.dump(skills_list, f, indent=4)
            mlflow.log_artifact(skill_list_path, "parameters")


        logging.info(f"--- MLflow run COMPLETED for: {s3_key} ---")
        return True

In [25]:
# Ensure we have our clients and lists defined from previous sections
# (s3_client, textract_client, comprehend_client, SKILL_KEYWORDS)

# Define the types to exclude for Comprehend
COMPREHEND_EXCLUDE = ['PERSON', 'LOCATION', 'DATE', 'ORGANIZATION', 'QUANTITY']

# Process the first PDF (if available)
if pdf_files_to_process:
    logging.info("\n>>> Processing first PDF...")
    process_resume_and_log(
        S3_BUCKET_NAME,
        pdf_files_to_process[0], # Process the first file
        TEXTRACT_ROLE_ARN,
        SKILL_KEYWORDS,
        COMPREHEND_EXCLUDE
    )

# Process the second PDF (if available)
if len(pdf_files_to_process) > 1:
    logging.info("\n>>> Processing second PDF...")
    process_resume_and_log(
        S3_BUCKET_NAME,
        pdf_files_to_process[1], # Process the second file
        TEXTRACT_ROLE_ARN,
        SKILL_KEYWORDS,
        COMPREHEND_EXCLUDE
    )
else:
    logging.info("Only one or zero PDFs found, not processing a second one.")

2025-05-27 00:57:27,378 - INFO - 
>>> Processing first PDF...
2025-05-27 00:57:27,379 - INFO - --- Starting MLflow run for: resume/ACCOUNTANT/10554236.pdf ---
2025-05-27 00:57:27,459 - INFO - Logging parameters...
2025-05-27 00:57:27,462 - INFO - Starting Textract job for: resume/ACCOUNTANT/10554236.pdf
2025-05-27 00:57:27,659 - INFO - Started Job with ID: 68dd7d3a6aec945de917cb0823bbca83cd3e5a9695a059b04a1efa3c955e6459
2025-05-27 00:57:27,661 - INFO - Checking status for Job ID: 68dd7d3a6aec945de917cb0823bbca83cd3e5a9695a059b04a1efa3c955e6459
2025-05-27 00:57:27,676 - INFO - Current Status: IN_PROGRESS
2025-05-27 00:57:27,677 - INFO - Waiting for 5 seconds...
2025-05-27 00:57:32,689 - INFO - Checking status for Job ID: 68dd7d3a6aec945de917cb0823bbca83cd3e5a9695a059b04a1efa3c955e6459
2025-05-27 00:57:32,702 - INFO - Current Status: IN_PROGRESS
2025-05-27 00:57:32,702 - INFO - Waiting for 5 seconds...
2025-05-27 00:57:37,714 - INFO - Checking status for Job ID: 68dd7d3a6aec945de917cb082