In [69]:
#final
import os
import re
import json
import csv
import fitz  # PyMuPDF for PDF text extraction
from pathlib import Path
import unicodedata

# Input files: the codebook PDF that describes each variable, and the CSV of responses
codebook_pdf = "anes/anes_timeseries_2024_userguidecodebook_20250219-2.pdf"
data_csv = "anes/anes_timeseries_2024_csv_20250219.csv"
# Directory that receives one JSON file per respondent
output_dir = "respondents"

# Create the output directory up front; an already-existing directory is left untouched
Path(output_dir).mkdir(parents=False, exist_ok=True)

def clean_text(text):
    """Normalize a fragment of text extracted from the codebook PDF.

    Processing order:
      1. NFKC-normalize the string (e.g. ligatures become plain letters).
      2. Remove soft hyphens (U+00AD) together with any whitespace that
         follows them, so a word broken at a line end is rejoined.
      3. Remove U+2010 HYPHEN characters.
      4. Collapse every whitespace run to a single space.
      5. Rejoin words split by a hard hyphen followed by a space
         ("hyphen- ated" -> "hyphenated").
      6. Cut a "<digits> CODEBOOK: VARIABLES" marker and everything after it.
      7. Strip leading/trailing whitespace.

    Args:
        text (str): Raw text; may contain newlines.

    Returns:
        str: The cleaned, single-line text.
    """
    text = unicodedata.normalize("NFKC", text)
    # Hyphen removal happens BEFORE whitespace collapsing: removing a character
    # that sat between two spaces would otherwise leave a double space behind,
    # and a soft hyphen at a line break would leave the word split in two.
    text = re.sub(r"\xad\s*", "", text)
    text = text.replace("\u2010", "")
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"(\w)- (\w)", r"\1\2", text)
    # After collapsing there are no newlines left, so ".*" runs to the end of the string.
    text = re.sub(r"\d+\s*CODEBOOK: VARIABLES.*", "", text)
    # Strip last: cutting the marker above can expose a trailing space.
    return text.strip()

def remove_prefixes(text):
    """Strip one leading mode/wave prefix (such as "PRE:" or "WEB:") from a summary.

    The match is case-insensitive and anchored at the very start of the string,
    so only the first prefix is removed. The result is whitespace-stripped.
    """
    prefix_pattern = r'^(PRE:|PAPER:|WEB:|POST:|RESTRICTED:|FTF:|CASI:|CAPI:|VIDEO:|PAPI:)\s*'
    without_prefix = re.sub(prefix_pattern, '', text, flags=re.IGNORECASE)
    return without_prefix.strip()

def extract_codebook_data(pdf_path):
    """Extract variable codes, summaries, question texts and answer options from the codebook PDF.

    The text of every page is concatenated and scanned for entries of the form
    "V###### PRE: <summary> [Question <text>] Value Labels <labels> Survey Question".
    Only entries whose header carries the "PRE:" prefix are matched by the pattern.

    An entry is skipped when:
      * its value labels contain "-2. Data will be available in forthcoming release",
      * its summary or value labels mention "RESTRICTED" or "OPEN-END RESPONSE",
      * it has no question text (or the text is literally "none"),
      * no answer option could be parsed.

    Args:
        pdf_path (str): Path to the codebook PDF.

    Returns:
        dict: Maps variable code -> {"summary": str, "question_text": str,
        "possible_answers": [{"code": str, "text": str}, ...]}.
    """
    # The context manager releases the PDF handle even if extraction raises.
    with fitz.open(pdf_path) as doc:
        text = "\n".join(page.get_text("text") for page in doc)

    var_pattern = re.compile(
        r"(V\d{6})\s+PRE:\s*(.*?)\n(?:Question\s*(.*?)\n)?Value Labels\s*(.*?)\nSurvey Question",
        re.DOTALL
    )

    questions = {}

    for match in var_pattern.finditer(text):
        var_code = match.group(1).strip()
        summary = clean_text(remove_prefixes(match.group(2)))
        # The "Question" line is optional in the pattern, so this may be None.
        question_text = clean_text(match.group(3)) if match.group(3) else None
        answer_section = match.group(4).strip()

        # Skip checks run first so that nothing below ever touches a None
        # question_text (the thermometer branch used to raise TypeError on it).
        if "-2. Data will be available in forthcoming release" in answer_section:
            continue
        if "RESTRICTED" in summary.upper() or "RESTRICTED" in answer_section.upper():
            continue
        if "OPEN-END RESPONSE" in summary.upper() or "OPEN-END RESPONSE" in answer_section.upper():
            continue
        if not question_text or question_text.lower() == "none":
            continue

        # One "<code>. <label>" option per line; codes may be negative.
        answers = []
        for line in answer_section.split("\n"):
            ans_match = re.match(r"^(-?\d+)\.\s*(.*)$", line)
            if ans_match:
                answers.append({"code": ans_match.group(1), "text": clean_text(ans_match.group(2))})

        # State lists are re-parsed from the whole section instead of line by line.
        # The code group accepts a leading minus: without it "-1. Inapplicable"
        # was stored under code "1" and shadowed "1. Alabama" in later lookups.
        # NOTE(review): the label class only allows letters, whitespace and
        # parentheses, so labels with other punctuation are cut short - confirm
        # this is acceptable for the non-state labels in these sections.
        if "Alabama" in answer_section and "2. Alaska" in answer_section:
            states = re.findall(r"(-?\d+)\.\s*([A-Za-z\s\(\)]+)", answer_section)
            answers = [{"code": state[0], "text": clean_text(state[1])} for state in states]

        if not answers:
            continue

        # Append the rating hint to feeling-thermometer questions exactly once.
        if "FEELING THERMOMETER" in summary and "Rate between 0-100" not in question_text:
            question_text += " (Rate between 0-100)"

        questions[var_code] = {
            "summary": summary,
            "question_text": question_text,
            "possible_answers": answers
        }

    print(f"✅ Extracted {len(questions)} valid questions from the Codebook PDF.")
    return questions

def generate_json(data_csv, questions, output_dir):
    """Write one JSON file per CSV row, pairing each codebook question with the row's answer.

    For every question in ``questions`` the respondent's raw CSV value is
    translated to the matching answer label (first option whose code equals the
    value); values without a matching option are kept as-is. Empty values, and
    variables that are not columns of the CSV, yield ``None``.

    Args:
        data_csv (str): Path to the response CSV. The respondent id is read
            from column "V240001" ("unknown" if that column is absent).
        questions (dict): Output of ``extract_codebook_data``.
        output_dir (str): Directory for the "respondent_<id>.json" files;
            created if it does not exist.

    Returns:
        None
    """
    # Make the function usable on its own, not only after a prior mkdir.
    os.makedirs(output_dir, exist_ok=True)

    # Build each code -> label table once instead of scanning the answer list
    # for every cell of every row. setdefault keeps the FIRST label for a
    # duplicated code, matching a first-match linear search.
    answer_lookup = {}
    for var_code, question_data in questions.items():
        labels = {}
        for ans in question_data["possible_answers"]:
            labels.setdefault(ans["code"], ans["text"])
        answer_lookup[var_code] = labels

    # utf-8-sig reads plain UTF-8 as well, but also drops a leading BOM that
    # would otherwise become part of the first column name.
    with open(data_csv, newline='', encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # DictReader fills missing trailing fields of a short row with None.
            raw_id = row.get("V240001", "unknown")
            respondent_id = ("unknown" if raw_id is None else raw_id).strip()
            respondent_record = {"respondent_id": respondent_id, "responses": []}

            for var_code, question_data in questions.items():
                if var_code in row:
                    response = (row[var_code] or "").strip()
                    response_text = answer_lookup[var_code].get(response, response)
                    response_text = clean_text(response_text) if response_text else None
                else:
                    response_text = None

                respondent_record["responses"].append({
                    "variable_code": var_code,
                    "question_summary": question_data["summary"],
                    "full_question_text": question_data["question_text"],
                    "possible_answers": question_data["possible_answers"],
                    "respondent_answer": response_text
                })

            out_path = os.path.join(output_dir, f"respondent_{respondent_id}.json")
            with open(out_path, 'w', encoding='utf-8') as jsonfile:
                # ensure_ascii=False writes non-ASCII characters literally instead of \uXXXX escapes.
                json.dump(respondent_record, jsonfile, indent=2, ensure_ascii=False)

    print("✅ Generated JSON files for respondents.")

# Run both stages: parse the codebook PDF, then write one JSON file per CSV row
codebook_data = extract_codebook_data(pdf_path=codebook_pdf)
generate_json(data_csv=data_csv, questions=codebook_data, output_dir=output_dir)


✅ Extracted 386 valid questions from the Codebook PDF.
✅ Generated JSON files for respondents.


In [70]:
import os
import re
import json
import csv
import fitz  # PyMuPDF for PDF text extraction
from pathlib import Path
import unicodedata

# Locations of the two inputs (codebook PDF, response CSV) ...
codebook_pdf = "anes/anes_timeseries_2024_userguidecodebook_20250219-2.pdf"
data_csv = "anes/anes_timeseries_2024_csv_20250219.csv"
# ... and of the folder the per-respondent JSON files are written to
output_dir = "respondents"

# Make sure the output folder is present; no error if it already exists
Path(output_dir).mkdir(parents=False, exist_ok=True)

def clean_text(text):
    """Normalize a fragment of text extracted from the codebook PDF.

    Processing order:
      1. NFKC-normalize the string (e.g. ligatures become plain letters).
      2. Remove soft hyphens (U+00AD) together with any whitespace that
         follows them, so a word broken at a line end is rejoined.
      3. Remove U+2010 HYPHEN characters.
      4. Collapse every whitespace run to a single space.
      5. Rejoin words split by a hard hyphen followed by a space
         ("hyphen- ated" -> "hyphenated").
      6. Cut a "<digits> CODEBOOK: VARIABLES" marker and everything after it.
      7. Strip leading/trailing whitespace.

    Args:
        text (str): Raw text; may contain newlines.

    Returns:
        str: The cleaned, single-line text.
    """
    text = unicodedata.normalize("NFKC", text)
    # Hyphen removal happens BEFORE whitespace collapsing: removing a character
    # that sat between two spaces would otherwise leave a double space behind,
    # and a soft hyphen at a line break would leave the word split in two.
    text = re.sub(r"\xad\s*", "", text)
    text = text.replace("\u2010", "")
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"(\w)- (\w)", r"\1\2", text)
    # After collapsing there are no newlines left, so ".*" runs to the end of the string.
    text = re.sub(r"\d+\s*CODEBOOK: VARIABLES.*", "", text)
    # Strip last: cutting the marker above can expose a trailing space
    # (which would also defeat exact label comparisons such as == "inapplicable").
    return text.strip()

def remove_prefixes(text):
    """Drop a single leading label like "PRE:", "POST:" or "WEB:" from a summary string.

    Matching ignores case and only applies at the beginning of the string;
    surrounding whitespace is trimmed from the result.
    """
    leading_prefix = re.compile(
        r'^(PRE:|PAPER:|WEB:|POST:|RESTRICTED:|FTF:|CASI:|CAPI:|VIDEO:|PAPI:)\s*',
        flags=re.IGNORECASE,
    )
    return leading_prefix.sub('', text).strip()

def extract_codebook_data(pdf_path):
    """Extract variable codes, summaries, question texts and answer options from the codebook PDF.

    The text of every page is concatenated and scanned for entries of the form
    "V###### PRE: <summary> [Question <text>] Value Labels <labels> Survey Question".
    Only entries whose header carries the "PRE:" prefix are matched by the pattern.

    Any option labelled exactly "Inapplicable" is stored under code "-1", and
    state lists always contain a "-1" / "Inapplicable" option.

    An entry is skipped when:
      * its value labels contain "-2. Data will be available in forthcoming release",
      * its summary or value labels mention "RESTRICTED" or "OPEN-END RESPONSE",
      * it has no question text (or the text is literally "none"),
      * no answer option could be parsed.

    Args:
        pdf_path (str): Path to the codebook PDF.

    Returns:
        dict: Maps variable code -> {"summary": str, "question_text": str,
        "possible_answers": [{"code": str, "text": str}, ...]}.
    """
    # The context manager releases the PDF handle even if extraction raises.
    with fitz.open(pdf_path) as doc:
        text = "\n".join(page.get_text("text") for page in doc)

    var_pattern = re.compile(
        r"(V\d{6})\s+PRE:\s*(.*?)\n(?:Question\s*(.*?)\n)?Value Labels\s*(.*?)\nSurvey Question",
        re.DOTALL
    )

    questions = {}

    for match in var_pattern.finditer(text):
        var_code = match.group(1).strip()
        summary = clean_text(remove_prefixes(match.group(2)))
        # The "Question" line is optional in the pattern, so this may be None.
        question_text = clean_text(match.group(3)) if match.group(3) else None
        answer_section = match.group(4).strip()

        # Skip checks run first so that nothing below ever touches a None
        # question_text (the thermometer branch used to raise TypeError on it).
        if "-2. Data will be available in forthcoming release" in answer_section:
            continue
        if "RESTRICTED" in summary.upper() or "RESTRICTED" in answer_section.upper():
            continue
        if "OPEN-END RESPONSE" in summary.upper() or "OPEN-END RESPONSE" in answer_section.upper():
            continue
        if not question_text or question_text.lower() == "none":
            continue

        # One "<code>. <label>" option per line; codes may be negative.
        answers = []
        for line in answer_section.split("\n"):
            ans_match = re.match(r"^(-?\d+)\.\s*(.*)$", line)
            if ans_match:
                # "label", not "text": the latter is the full PDF text above.
                code, label = ans_match.group(1), clean_text(ans_match.group(2))

                # Force the "Inapplicable" option onto code -1.
                if label.lower() == "inapplicable":
                    code = "-1"

                answers.append({"code": code, "text": label})

        # State lists are re-parsed from the whole section instead of line by line.
        # The code group accepts a leading minus: without it "-1. Inapplicable"
        # was stored under code "1" and shadowed "1. Alabama" in later lookups.
        # NOTE(review): the label class only allows letters, whitespace and
        # parentheses, so labels with other punctuation are cut short - confirm
        # this is acceptable for the non-state labels in these sections.
        if "Alabama" in answer_section and "2. Alaska" in answer_section:
            states = re.findall(r"(-?\d+)\.\s*([A-Za-z\s\(\)]+)", answer_section)
            answers = [{"code": state[0], "text": clean_text(state[1])} for state in states]

            # Guarantee an "Inapplicable" option, without duplicating one that
            # was already parsed now that negative codes are captured.
            if not any(ans["code"] == "-1" for ans in answers):
                answers.insert(0, {"code": "-1", "text": "Inapplicable"})

        if not answers:
            continue

        # Append the rating hint to feeling-thermometer questions exactly once.
        if "FEELING THERMOMETER" in summary and "Rate between 0-100" not in question_text:
            question_text += " (Rate between 0-100)"

        questions[var_code] = {
            "summary": summary,
            "question_text": question_text,
            "possible_answers": answers
        }

    print(f"✅ Extracted {len(questions)} valid questions from the Codebook PDF.")
    return questions

def generate_json(data_csv, questions, output_dir):
    """Write one JSON file per CSV row, pairing each codebook question with the row's answer.

    For every question in ``questions`` the respondent's raw CSV value is
    translated to the matching answer label (first option whose code equals the
    value); values without a matching option are kept as-is. Empty values, and
    variables that are not columns of the CSV, yield ``None``.

    Args:
        data_csv (str): Path to the response CSV. The respondent id is read
            from column "V240001" ("unknown" if that column is absent).
        questions (dict): Output of ``extract_codebook_data``.
        output_dir (str): Directory for the "respondent_<id>.json" files;
            created if it does not exist.

    Returns:
        None
    """
    # Make the function usable on its own, not only after a prior mkdir.
    os.makedirs(output_dir, exist_ok=True)

    # Build each code -> label table once instead of scanning the answer list
    # for every cell of every row. setdefault keeps the FIRST label for a
    # duplicated code, matching a first-match linear search.
    answer_lookup = {}
    for var_code, question_data in questions.items():
        labels = {}
        for ans in question_data["possible_answers"]:
            labels.setdefault(ans["code"], ans["text"])
        answer_lookup[var_code] = labels

    # utf-8-sig reads plain UTF-8 as well, but also drops a leading BOM that
    # would otherwise become part of the first column name.
    with open(data_csv, newline='', encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # DictReader fills missing trailing fields of a short row with None.
            raw_id = row.get("V240001", "unknown")
            respondent_id = ("unknown" if raw_id is None else raw_id).strip()
            respondent_record = {"respondent_id": respondent_id, "responses": []}

            for var_code, question_data in questions.items():
                if var_code in row:
                    response = (row[var_code] or "").strip()
                    response_text = answer_lookup[var_code].get(response, response)
                    response_text = clean_text(response_text) if response_text else None
                else:
                    response_text = None

                respondent_record["responses"].append({
                    "variable_code": var_code,
                    "question_summary": question_data["summary"],
                    "full_question_text": question_data["question_text"],
                    "possible_answers": question_data["possible_answers"],
                    "respondent_answer": response_text
                })

            out_path = os.path.join(output_dir, f"respondent_{respondent_id}.json")
            with open(out_path, 'w', encoding='utf-8') as jsonfile:
                # ensure_ascii=False writes non-ASCII characters literally instead of \uXXXX escapes.
                json.dump(respondent_record, jsonfile, indent=2, ensure_ascii=False)

    print("✅ Generated JSON files for respondents.")

# Stage 1: parse the codebook PDF. Stage 2: emit one JSON file per respondent row.
codebook_data = extract_codebook_data(pdf_path=codebook_pdf)
generate_json(data_csv=data_csv, questions=codebook_data, output_dir=output_dir)


✅ Extracted 386 valid questions from the Codebook PDF.
✅ Generated JSON files for respondents.
