# 01 Data Collection — Raw Data Archive (v1.0.0)

## Purpose
Scrape LinkedIn job listings and extract raw metadata with maximum fidelity. Build immutable, reproducible datasets suitable for downstream preprocessing and analysis.

## Pipeline (v1.0.0 — Raw Only)
Frozen architecture with no parsing or normalization:
1. **Phase 1 — Listing Scrape**: Fetch job cards (title, company, location, link, company URL) with exponential backoff
2. **Phase 2 — Detail Extraction**: Visit each job page → extract full description (raw text + HTML), insight panel, salary, applicant count, easy apply flags, seniority/industry/function/type fields, embedded JSON-LD
3. **Phase 3 — Company Pages** *(optional)*: Visit company about pages (deduplicated) → extract about section (text + HTML), industry, size, HQ, type, specialties
4. **Phase 4 — Save Raw Data**: Immutable archive to `data/raw/jobs/` and `data/raw/companies/` with v1.0.0 schema
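
Phase 2 mentions capturing embedded JSON-LD. The following is a minimal sketch of how such blocks could be collected from a page's HTML using only the standard library. The name `extract_json_ld` and the sample HTML are assumptions for illustration; they are not taken from `src.scraping`, whose actual extraction logic is not shown here.

```python
import json
from html.parser import HTMLParser


class _JsonLdCollector(HTMLParser):
    """Collects the text of every <script type="application/ld+json"> element."""

    def __init__(self):
        super().__init__()
        self._in_json_ld = False
        self._buffer = []
        self.blocks = []

    def handle_starttag(self, tag, attrs):
        if tag == "script" and dict(attrs).get("type") == "application/ld+json":
            self._in_json_ld = True
            self._buffer = []

    def handle_data(self, data):
        # Script content may arrive in several chunks, so buffer it.
        if self._in_json_ld:
            self._buffer.append(data)

    def handle_endtag(self, tag):
        if tag == "script" and self._in_json_ld:
            self.blocks.append("".join(self._buffer))
            self._in_json_ld = False


def extract_json_ld(html):
    """Return decoded JSON-LD payloads; keep undecodable blocks as raw strings."""
    collector = _JsonLdCollector()
    collector.feed(html)
    collector.close()
    payloads = []
    for block in collector.blocks:
        try:
            payloads.append(json.loads(block))
        except json.JSONDecodeError:
            payloads.append(block.strip())  # preserve the raw text for fidelity
    return payloads


# Hypothetical page fragment used only to exercise the sketch.
sample_html = """
<html><head>
<script type="application/ld+json">{"@type": "JobPosting", "title": "Data Scientist"}</script>
<script type="text/javascript">var x = 1;</script>
</head></html>
"""
print(extract_json_ld(sample_html))
```

Keeping undecodable blocks as raw strings rather than dropping them is in the spirit of a raw-only archive, though the real scraper may handle that case differently.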

## Schema & Formats
- **Job Records**: Nested JSON with sections: scrape_metadata, job_identity, job_card_raw, job_page_raw (30+ fields), company_info, quality_tracking, hashing
- **Company Records**: Nested JSON with sections: company_identity, company_page_raw, hashing, timestamps, quality_tracking
- **Storage**: JSON files with SHA-256 content hashing for change detection
- **Versions**: `SCRAPER_VERSION="1.0.0"`, `RAW_SCHEMA_VERSION="1.0.0"` (frozen)
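
To illustrate how SHA-256 content hashing can support change detection, here is a small sketch. The canonicalization step (sorted keys, compact separators) and the function name `content_hash` are assumptions; the project's own hashing may instead digest raw HTML or text directly.

```python
import hashlib
import json


def content_hash(payload):
    """SHA-256 hex digest of a JSON-serializable payload, stable across key order."""
    canonical = json.dumps(
        payload, sort_keys=True, ensure_ascii=False, separators=(",", ":")
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Illustrative payloads; the field names are hypothetical.
first_scrape = {"title": "Data Scientist", "description_text": "Build models."}
reordered = {"description_text": "Build models.", "title": "Data Scientist"}
edited = {"title": "Data Scientist", "description_text": "Build and ship models."}

print(content_hash(first_scrape) == content_hash(reordered))  # same content, same hash
print(content_hash(first_scrape) == content_hash(edited))     # changed content, new hash
```

Serializing with sorted keys means two scrapes that differ only in field order produce the same digest, so a differing digest points at a real content change.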

## IO
- **Input**: Search parameters (keywords, location, limit)
- **Output (Raw Jobs)**: `data/raw/linkedin_raw_jobs_<timestamp>.json`
- **Output (Raw Companies)**: `data/raw/linkedin_raw_companies_<timestamp>.json` 
- **Reports**: `outputs/tables/scrape_report_*.json`, `outputs/logs/scrape_*.log`
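
Since output files carry a timestamp in their name, downstream code usually needs to locate the newest one. The sketch below picks the most recently modified file matching the raw jobs pattern. The helper name `load_latest_raw_jobs` is hypothetical, and `raw_dir` is a parameter so it can be pointed at whichever directory a run actually wrote to.

```python
import glob
import json
import os


def load_latest_raw_jobs(raw_dir="data/raw"):
    """Load the most recently modified linkedin_raw_jobs_*.json, or None if absent."""
    pattern = os.path.join(raw_dir, "linkedin_raw_jobs_*.json")
    candidates = glob.glob(pattern)
    if not candidates:
        return None
    # Modification time is used so the sketch does not depend on the
    # timestamp format embedded in the file name.
    latest = max(candidates, key=os.path.getmtime)
    with open(latest, encoding="utf-8") as f:
        return json.load(f)


raw_jobs = load_latest_raw_jobs()
print("No raw job files found." if raw_jobs is None else f"Loaded {len(raw_jobs)} records")
```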

## Key Features
- **Exponential backoff** on HTTP 429 rate-limit responses (delay grows as 2^attempt, with random jitter)
- **Content hashing** for deduplication and change detection (SHA-256)
- **Interim checkpointing** (saves every 10 records to prevent data loss)
- **Quality tracking** (high/medium/low extraction quality ratings)
- **Company deduplication** (one fetch per unique company, caching)
- **Frozen schema** (immutable v1.0.0 for reproducibility)
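
The backoff behavior listed above can be sketched as follows. This is an illustration under stated assumptions, not the scraper's code: the `RateLimited` exception, the one-second base, the 60-second cap, and the one-second jitter window are all hypothetical choices.

```python
import random
import time


class RateLimited(Exception):
    """Hypothetical signal that the server answered with HTTP 429."""


def backoff_delay(attempt, base=1.0, cap=60.0):
    """Delay in seconds: base * 2^attempt plus up to 1 s of random jitter, capped."""
    return min(cap, base * (2 ** attempt) + random.uniform(0, 1))


def fetch_with_backoff(fetch, max_attempts=5, sleep=time.sleep):
    """Call fetch() until it succeeds, sleeping between rate-limited attempts."""
    for attempt in range(max_attempts):
        try:
            return fetch()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the error to the caller
            sleep(backoff_delay(attempt))


# Usage: a fake fetcher that is rate-limited twice, then succeeds.
calls = {"n": 0}


def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RateLimited()
    return "<html>ok</html>"


waits = []  # record the delays instead of actually sleeping
result = fetch_with_backoff(flaky_fetch, sleep=waits.append)
print(result, len(waits))
```

Passing `sleep` as a parameter keeps the retry loop testable without real waiting; the jitter spreads retries out so that parallel workers do not all retry at the same instant.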


In [1]:
import sys, os

# Ensure the project root is on the path so `src.*` imports work
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

import pandas as pd
import json
import logging
from datetime import datetime

import pyarrow # For efficient Parquet handling (future)

# Project modules
from src.scraping.linkedin_scraper import LinkedInScraper
from src.scraping.raw_schema_v1 import SCRAPER_VERSION, RAW_SCHEMA_VERSION
from src.pipeline import (
    setup_logging,
    run_pipeline,
    save_scrape_report,
)

# Ensure output directories exist (anchored at project_root, not the current directory)
for d in ["data/raw", "data/raw/jobs", "data/raw/companies",
          "data/raw/crawl_logs", "outputs/logs", "outputs/tables"]:
    os.makedirs(os.path.join(project_root, d), exist_ok=True)

print(f"Scraper version: {SCRAPER_VERSION}")
print(f"Raw schema version: {RAW_SCHEMA_VERSION}")
print(f"Pipeline: Raw-Only Mode (v{SCRAPER_VERSION})")


Scraper version: 1.0.0
Raw schema version: 1.0.0
Pipeline: Raw-Only Mode (v1.0.0)


## Configuration
Set search parameters and scraping options. Keep `LIMIT` at 20–30 during development; the run recorded below uses 100.


In [3]:
# ── Search keywords (select one per run) ──
KEYWORDS_LIST = [ 
    # Data Science / Core Roles
    "Data Scientist",            # 0
    "Machine Learning Engineer", # 1
    "AI Engineer",               # 2
    "Deep Learning Engineer",    # 3
    "Applied Data Scientist",    # 4
    "Research Scientist",        # 5
    "Data Analyst",              # 6
    "Business Intelligence Analyst", # 7
    # GenAI / LLM / Emerging AI
    "Generative AI Engineer",    # 8
    "LLM Engineer",              # 9
    "Prompt Engineer",           # 10
    "NLP Engineer",              # 11
    "Large Language Models Engineer", # 12
    "AI Researcher",             # 13
    # Data / Analytics Infrastructure
    "Data Engineer",             # 14
    "ML Ops Engineer",           # 15
    "MLOps Engineer",            # 16
    "AI Infrastructure Engineer", # 17
    "Data Platform Engineer",    # 18
    "ETL Engineer",              # 19
    # Specialized / Advanced Roles
    "Computer Vision Engineer",  # 20
    "Robotics Engineer",         # 21
    "Forecasting Analyst",       # 22
    "Quantitative Analyst",      # 23
    "Research Engineer",         # 24
    # Internships / Entry-Level
    "Data Science Intern",       # 25
    "ML Intern",                 # 26
    "AI Intern",                 # 27
    "Data Analyst Intern",       # 28
]

# Select keyword by index (0 = "Data Scientist", 1 = "Machine Learning Engineer", etc.)
KEYWORD_INDEX = 0
KEYWORDS = KEYWORDS_LIST[KEYWORD_INDEX]

# ── Other search parameters ──
LOCATION = ""          # Empty = worldwide
LIMIT = 100             # Full run; use 20-30 during development

# ── Scraping options ──
SCRAPE_COMPANY_PAGES = False   # Visit company about pages (deduplicated)

# ── Output paths (use project root for absolute path) ──
RAW_OUTPUT_DIR = os.path.join(project_root, "data/raw/jobs")

print(f"Selected keyword: '{KEYWORDS}' (index {KEYWORD_INDEX})")
print(f"Location: '{LOCATION or 'worldwide'}' | Limit: {LIMIT}")
print(f"Company pages: {SCRAPE_COMPANY_PAGES} | Raw Schema v1.0.0")

Selected keyword: 'Data Scientist' (index 0)
Location: 'worldwide' | Limit: 100
Company pages: False | Raw Schema v1.0.0


## Run Pipeline (v1.0.0 — Raw Only)
Execute the raw data collection pipeline. No parsing or normalization is performed.

**Process:**
1. **Phase 1**: Fetch job listings from LinkedIn search
2. **Phase 2**: Visit each job detail page → extract full raw fields
3. **Phase 3** (optional): Fetch company about pages if `SCRAPE_COMPANY_PAGES=True`
4. **Save**: Raw JSON with v1.0.0 schema to `data/raw/`

Output records include all raw field values, content hashing, extraction quality metrics, and metadata for reproducibility.


In [4]:
# Set working directory to project root for pipeline paths
os.chdir(project_root)

# ── Run Raw-Only Pipeline (v1.0.0) ──
df = run_pipeline(
    keywords=KEYWORDS,
    location=LOCATION,
    limit=LIMIT,
    raw_output_dir=RAW_OUTPUT_DIR,
    scrape_company_pages=SCRAPE_COMPANY_PAGES
)

if df is not None and not df.empty:
    print(f"\n{'='*60}")
    print(f"SUCCESS: {len(df)} raw job records captured (v1.0.0)")
    print(f"Columns: {len(df.columns)} raw fields")
    print(f"Location: {RAW_OUTPUT_DIR}/")
    print(f"{'='*60}")
else:
    print("Pipeline returned no data. Check logs in outputs/logs/")


2026-02-16 20:34:39,611 | INFO    | src.pipeline | TOP APPLICANT — Data Collection Pipeline v1.0.0
2026-02-16 20:34:39,612 | INFO    | src.pipeline | Raw Schema Version: 1.0.0
2026-02-16 20:34:39,612 | INFO    | src.pipeline | Scraper Version: 1.0.0
2026-02-16 20:34:39,613 | INFO    | src.pipeline | Search: 'Data Scientist' | Location: '' | Limit: 100
2026-02-16 20:34:39,615 | INFO    | src.pipeline | Mode: Raw Data Collection Only (No Parsing)
2026-02-16 20:34:39,619 | INFO    | src.scraping.linkedin_scraper | Loaded 0 existing job IDs to skip.
2026-02-16 20:34:39,620 | INFO    | src.scraping.linkedin_scraper | Phase 1: Scraping up to 100 listings for 'Data Scientist' in ''
2026-02-16 20:34:41,481 | INFO    | src.scraping.linkedin_scraper |   [1] Data Scientist Intern @ Tinder
2026-02-16 20:34:43,782 | INFO    | src.scraping.linkedin_scraper |   [2] Data Scientist II, Growth @ Tinder
2026-02-16 20:34:43,783 | INFO    | src.scraping.linkedin_scraper |   [3] Data Scientist (Entry-Level)


SUCCESS: 100 raw job records captured (v1.0.0)
Columns: 9 raw fields
Location: c:\top-applicant\data/raw/jobs/


## Inspect Raw Results
Quick validation of captured raw data and extraction quality metrics.
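
Because each top-level section is stored as a nested object, column-level coverage mostly shows whether a section exists at all. A sketch of per-field coverage over nested records might look like the following. The helper names and the field names inside each section are assumptions for illustration.

```python
from collections import Counter


def iter_leaf_paths(record, prefix=""):
    """Yield (dotted_path, value) for every leaf in a nested dict."""
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            yield from iter_leaf_paths(value, path)
        else:
            yield path, value


def nested_coverage(records):
    """Count, per leaf path, how many records hold a non-empty value."""
    filled = Counter()
    for record in records:
        for path, value in iter_leaf_paths(record):
            if value not in (None, "", []):
                filled[path] += 1
    return filled


# Illustrative records; the field names inside each section are hypothetical.
records = [
    {"job_page_raw": {"salary_text": "$120k", "description_text": "Build models."},
     "quality_tracking": {"extraction_quality": "high"}},
    {"job_page_raw": {"salary_text": None, "description_text": "Analyze data."},
     "quality_tracking": {"extraction_quality": "medium"}},
]

for path, count in sorted(nested_coverage(records).items()):
    print(f"{path:40s} {count}/{len(records)}")
```

With a DataFrame, the same records could be obtained via `df.to_dict("records")` before calling `nested_coverage`.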


In [8]:
if df is not None and not df.empty:
    print("─── Raw Schema (v1.0.0) ───")
    print(f"Total records: {len(df)}")
    print(f"Total columns: {len(df.columns)}")
    
    # Show nested structure
    print("\nNested sections in each record:")
    nested_sections = [
        'scrape_metadata', 'job_identity', 'job_card_raw', 'job_page_raw',
        'company_info', 'quality_tracking', 'hashing'
    ]
    for section in nested_sections:
        if section in df.columns:
            print(f"  • {section}")
    
    # Field coverage
    print("\n─── Field Coverage (Raw Fields) ───")
    coverage_cols = [col for col in df.columns if 'raw' in col or 'hash' in col]
    if coverage_cols:
        for col in coverage_cols[:15]:  # Show first 15
            non_null = df[col].notna().sum()
            pct = non_null / len(df) * 100
            print(f"  {col:40s} {non_null:3d}/{len(df)} ({pct:5.1f}%)")
    
    # Quality metrics
    print("\n─── Extraction Quality ───")
    if 'quality_tracking' in df.columns or any('extraction_quality' in str(col) for col in df.columns):
        try:
            # Try to extract quality data from nested structure
            quality_stats = {"high": 0, "medium": 0, "low": 0}
            for record in df.itertuples():
                if hasattr(record, 'quality_tracking') and isinstance(record.quality_tracking, dict):
                    qual = record.quality_tracking.get('extraction_quality', 'unknown')
                    if qual in quality_stats:
                        quality_stats[qual] += 1
            
            if sum(quality_stats.values()) > 0:
                total = sum(quality_stats.values())
                print(f"  High quality:   {quality_stats['high']:3d} ({quality_stats['high']/total*100:5.1f}%)")
                print(f"  Medium quality: {quality_stats['medium']:3d} ({quality_stats['medium']/total*100:5.1f}%)")
                print(f"  Low quality:    {quality_stats['low']:3d} ({quality_stats['low']/total*100:5.1f}%)")
        except Exception:
            print("  (Quality metrics not available in current format)")
    
    # Sample record structure
    print("\n─── Sample Record Keys (Top Level) ───")
    sample = df.iloc[0]  # a pandas Series, not a dict, so the dict branch below is skipped
    if isinstance(sample, dict):
        for k in list(sample.keys())[:10]:
            print(f"  • {k}")
    else:
        print("  (Inspect via DataFrame directly)")
else:
    print("No data available. Run the pipeline cell above first.")


─── Raw Schema (v1.0.0) ───
Total records: 100
Total columns: 9

Nested sections in each record:
  • scrape_metadata
  • job_identity
  • job_card_raw
  • job_page_raw
  • company_info
  • quality_tracking
  • hashing

─── Field Coverage (Raw Fields) ───
  raw_schema_version                       100/100 (100.0%)
  job_card_raw                             100/100 (100.0%)
  job_page_raw                             100/100 (100.0%)
  hashing                                  100/100 (100.0%)

─── Extraction Quality ───
  High quality:    55 ( 55.0%)
  Medium quality:  45 ( 45.0%)
  Low quality:      0 (  0.0%)

─── Sample Record Keys (Top Level) ───
  (Inspect via DataFrame directly)


## Data Quality & Next Steps
Summary of data collection and recommendations for preprocessing.
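
One use of the stored content hashes is comparing two runs. The sketch below assumes each run has been reduced to a mapping from job ID to content hash; the function name `diff_runs`, the IDs, and the shortened hash strings are all hypothetical.

```python
def diff_runs(previous, current):
    """Compare two {job_id: content_hash} mappings from separate runs."""
    prev_ids, curr_ids = set(previous), set(current)
    return {
        "added": sorted(curr_ids - prev_ids),
        "removed": sorted(prev_ids - curr_ids),
        "changed": sorted(
            job_id for job_id in prev_ids & curr_ids
            if previous[job_id] != current[job_id]
        ),
    }


# Illustrative data only; real hashes would be 64-character SHA-256 digests.
run_a = {"job-1": "aaa", "job-2": "bbb", "job-3": "ccc"}
run_b = {"job-2": "bbb", "job-3": "c0c", "job-4": "ddd"}

print(diff_runs(run_a, run_b))
```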


In [9]:
if df is not None and not df.empty:
    print("─── Data Collection Summary ───")
    print(f"Records collected: {len(df)}")
    print(f"Schema version: 1.0.0 (frozen)")
    print(f"Company pages scraped: {SCRAPE_COMPANY_PAGES}")
    
    print("\n─── Next Steps ───")
    print(f"1. Raw data saved to: {RAW_OUTPUT_DIR}/")
    print(f"2. Load with:")
    print(f"     import json")
    print(f"     # Find latest file in {RAW_OUTPUT_DIR}/")
    print(f"     with open('data/raw/linkedin_raw_jobs_<timestamp>.json') as f:")
    print(f"         raw_jobs = json.load(f)")
    print(f"3. Process raw fields using preprocessing pipelines")
    print(f"4. Content hashing enables change detection across runs")
    
    print("\n─── Raw Fields Available ───")
    print("Job records include:")
    print("  • Scrape metadata (timestamp, user agent, keywords)")
    print("  • Job identity (URL, hashed job ID)")
    print("  • Job card fields (title, company, location, posted date)")
    print("  • Full job page (description text + HTML, salary, applicants, criteria)")
    print("  • Embedded data (JSON-LD, job-specific JSON)")
    print("  • Extraction quality (high/medium/low, selector hits, retry count)")
    print("  • Content hashes (for deduplication and change tracking)")
    
    if SCRAPE_COMPANY_PAGES:
        print("\n  + Company records include:")
        print("    • Company metadata (identity, URLs)")
        print("    • About section (text + HTML)")
        print("    • Company facts (industry, size, HQ, type, specialties)")
        print("    • Timestamps (first/last seen)")
    
    print("\n─── Data Preservation ───")
    print("✓ All raw data is immutable (v1.0.0 schema frozen)")
    print("✓ Content hashing detects accidental modifications")
    print("✓ Multiple runs can be compared for changes")
    print("✓ Archive in data/raw/ for historical tracking")
    
else:
    print("No data available. Run the pipeline cell to collect data.")


─── Data Collection Summary ───
Records collected: 100
Schema version: 1.0.0 (frozen)
Company pages scraped: False

─── Next Steps ───
1. Raw data saved to: c:\top-applicant\data/raw/jobs/
2. Load with:
     import json
     # Find latest file in c:\top-applicant\data/raw/jobs/
     with open('data/raw/linkedin_raw_jobs_<timestamp>.json') as f:
         raw_jobs = json.load(f)
3. Process raw fields using preprocessing pipelines
4. Content hashing enables change detection across runs

─── Raw Fields Available ───
Job records include:
  • Scrape metadata (timestamp, user agent, keywords)
  • Job identity (URL, hashed job ID)
  • Job card fields (title, company, location, posted date)
  • Full job page (description text + HTML, salary, applicants, criteria)
  • Embedded data (JSON-LD, job-specific JSON)
  • Extraction quality (high/medium/low, selector hits, retry count)
  • Content hashes (for deduplication and change tracking)

─── Data Preservation ───
✓ All raw data is immutable (v1.0