In [1]:
%pip install pdfminer.six pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.0.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
# Part 1: Import Libraries and Initialize Directories
import os
import re
import csv
import logging
from datetime import datetime
import pandas as pd

# Define base directory (aligned with 01_scraper.py).
# The project root is taken to be the parent of the current working directory,
# i.e. the notebook is expected to be run from the project's 'notebooks' folder.
# NOTE: the previous `try/except NameError` fallback was unreachable, because
# os.getcwd() never raises NameError; it has been removed.
BASE_DIR = os.path.dirname(os.getcwd())  # Parent of 'notebooks'

# Define paths (all derived from BASE_DIR)
PATH_OUTPUT = os.path.join(BASE_DIR, 'data', 'raw')  # Where text files are read from
PATH_CSV = os.path.join(BASE_DIR, 'data', 'processed', 'cases.csv')  # Where CSV output is saved
LOG_DIR = os.path.join(BASE_DIR, 'logs')  # Directory holding the log file
LOG_PATH = os.path.join(LOG_DIR, 'metadata_extraction.log')  # Log file written by this notebook

# Longest path (in characters) accepted before any filesystem call is attempted
MAX_PATH_LENGTH = 260

def validate_path(path):
    """Return `path` unchanged if it is no longer than MAX_PATH_LENGTH characters.

    Raises:
        ValueError: if `path` is longer than MAX_PATH_LENGTH characters.
    """
    fits_limit = len(path) <= MAX_PATH_LENGTH
    if fits_limit:
        return path
    raise ValueError(f"Path {path} exceeds Windows maximum length of {MAX_PATH_LENGTH} characters")

# Initialize logging, then ensure directories exist.
# Order matters on a fresh kernel:
#   1. The log directory must exist before FileHandler can open LOG_PATH, so it
#      is created first, without logging (a failure here simply propagates).
#   2. Logging must be configured before the first logging call; otherwise the
#      root logger only emits WARNING and above and the "Directory ensured"
#      INFO messages would be dropped and never reach the log file.
validate_path(LOG_DIR)
os.makedirs(LOG_DIR, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_PATH, mode='w', encoding='utf-8'),  # Overwrite mode
        logging.StreamHandler()
    ],
    force=True  # Replace handlers left over from a previous run of this cell
)

# Ensure directories exist (LOG_DIR is already present; it is listed again so
# that it is reported in the log alongside the other directories)
for path in [LOG_DIR, os.path.dirname(PATH_CSV), PATH_OUTPUT]:
    try:
        validate_path(path)
        os.makedirs(path, exist_ok=True)
        logging.info(f"Directory ensured: {path}")
    except ValueError as e:
        logging.error(f"Path validation failed: {e}")
        raise
    except Exception as e:
        logging.error(f"Failed to create directory {path}: {e}")
        raise

logging.info("Starting metadata extraction process at %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

# Part 2: Utility Functions
# Dotted titles removed from names (dr., drh., prof., ir., hj., h., sh., mh.,
# s.h., m.h.). The leading \b anchors each title at the start of a word so the
# tail of an ordinary word followed by a period (e.g. "Muh.", "Abdullah.") is
# not mistaken for "h." / "sh." and chopped off.
_TITLE_PATTERN = re.compile(
    r'\b(?:drh|dr|prof|ir|hj|m\.h|s\.h|sh|mh|h)\.',
    re.IGNORECASE,
)

def clean_name(name):
    """Clean the extracted name by removing titles and extra spaces.

    Args:
        name: Raw name string, possibly containing dotted titles.

    Returns:
        The name with the known titles removed (case-insensitively), runs of
        whitespace collapsed to single spaces and surrounding whitespace
        trimmed.
    """
    without_titles = _TITLE_PATTERN.sub('', name)
    return ' '.join(without_titles.split())

# Map Indonesian month names (lower-case keys) to English month names so that
# dates can be parsed with datetime.strptime's %B directive.
# Besides the standard spellings, the common variant spellings "pebruari" and
# "nopember" are accepted as well.
month_map = {
    'januari': 'January',
    'februari': 'February',
    'maret': 'March',
    'april': 'April',
    'mei': 'May',
    'juni': 'June',
    'juli': 'July',
    'agustus': 'August',
    'september': 'September',
    'oktober': 'October',
    'november': 'November',
    'desember': 'December',
    # Variant spellings
    'pebruari': 'February',
    'nopember': 'November',
}

# Part 3: Metadata Extraction Function
def _collapse_whitespace(value):
    """Collapse every run of whitespace in `value` to one space and trim the ends."""
    return re.sub(r'\s+', ' ', value).strip()


def _parse_date_string(date_text, date_formats):
    """Return the first successful strptime parse of `date_text`, or None if no format fits."""
    for fmt in date_formats:
        try:
            return datetime.strptime(date_text, fmt)
        except ValueError:
            continue
    return None


def extract_metadata(text, file_name):
    """Extract case metadata from the text of one decision using regex heuristics.

    Args:
        text: Full text of the decision.
        file_name: Name of the source file; used for `case_id` and log messages.

    Returns:
        dict with the keys `case_id`, `nomor_perkara`, `tahun_putusan`,
        `bulan_putusan`, `tanggal_putusan`, `jenis_perkara`,
        `tingkat_pemeriksaan`, `lembaga_peradilan`, `pasal`, `hakim_ketua`,
        `ringkasan_fakta`, `jumlah_kata_putusan` and `full_text`. Fields that
        cannot be determined keep their default ('' or 0), except
        `tingkat_pemeriksaan` (defaults to 'Pertama') and `lembaga_peradilan`
        (defaults to 'Unknown').
    """
    # Strip only a trailing '.txt' (not every occurrence inside the name)
    case_id = file_name[:-len('.txt')] if file_name.endswith('.txt') else file_name
    metadata = {
        'case_id': case_id,
        'nomor_perkara': '',
        'tahun_putusan': '',
        'bulan_putusan': '',
        'tanggal_putusan': '',
        'jenis_perkara': '',
        'tingkat_pemeriksaan': '',
        'lembaga_peradilan': '',
        'pasal': '',
        'hakim_ketua': '',
        'ringkasan_fakta': '',
        'jumlah_kata_putusan': 0,
        'full_text': text
    }

    # Lower-cased copy used by all keyword checks below (computed once)
    lowered = text.lower()

    # Log a sample of the text to verify content
    logging.debug(f"Sample text for {file_name}: {text[:200]}")

    # Extract nomor_perkara with multiple patterns (most specific first)
    patterns = [
        r'(?:penetapan|putusan)\s+nomor\s*(\d+\s*pdt\s*p\s*\d{4})(?:\s*pn\s*\w+)?',
        r'(?:penetapan|putusan)\s+nomor\s*(\d+\s*pdt\s*g\s*\d{4})(?:\s*pa\s*\w+)?',
        r'(?:penetapan|putusan)\s+nomor\s*(\d+\s*pdt\s*\d{4})(?:\s*pt\s*\w+)?',
        r'(?:penetapan|putusan)\s+nomor\s*(\d+\s*(?:pk|k)\s*pid\s*sus\s*\d{4})',
        r'nomor\s*(\d+\s*(?:pk|k|pdt\s*p|pdt\s*g|pdt)\s*(?:pid\s*sus\s*)?\d{4})'
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            # Normalize internal whitespace (the former str.replace('\s+', ' ')
            # looked for the literal characters "\s+" and therefore did nothing)
            metadata['nomor_perkara'] = _collapse_whitespace(match.group(1))
            logging.info(f"Extracted nomor_perkara from pattern: {metadata['nomor_perkara']} for {file_name}")
            break
    else:
        logging.warning(f"No nomor_perkara found in {file_name}. Sample text: {text[:200]}")

    # Extract case year from nomor_perkara for validation and tahun_putusan
    case_year = None
    case_year_match = re.search(r'(\d{4})$', metadata['nomor_perkara'])
    if case_year_match:
        case_year = int(case_year_match.group(1))
        metadata['tahun_putusan'] = str(case_year)
        logging.info(f"Extracted case year from nomor_perkara: {case_year} for {file_name}")

    # Extract tanggal/bulan/tahun putusan (most specific pattern first)
    date_patterns = [
        r'(?:putusan|ditetapkan)\s+.*?tanggal\s+(\d{1,2}\s+\w+\s+\d{4})',
        r'tanggal\s+putusan\s+(\d{1,2}\s+\w+\s+\d{4})',
        r'tanggal\s+(\d{1,2}\s+\w+\s+\d{4})(?=\s*(?:diucapkan|ditetapkan|sidang))',
        r'(\d{1,2}\s+\w+\s+\d{4})(?=\s*(?:diucapkan|ditetapkan|sidang))',
        r'(\d{1,2}\s+\w+\s+\d{4})'
    ]
    date_formats = ['%d %B %Y', '%d %b %Y', '%d %m %Y']

    date_found = False
    for pattern in date_patterns:
        for match in re.finditer(pattern, text, re.IGNORECASE):
            full_date = match.group(1)
            # Translate Indonesian month names so strptime can parse them
            for id_month, en_month in month_map.items():
                full_date = re.sub(rf'\b{id_month}\b', en_month, full_date, flags=re.IGNORECASE)
            date_obj = _parse_date_string(full_date, date_formats)
            if date_obj is None:
                continue
            # Validate once per candidate; retrying other formats on the same
            # string cannot change the year and only duplicated the warning.
            extracted_year = date_obj.year
            if case_year and abs(extracted_year - case_year) > 5:
                logging.warning(f"Extracted year {extracted_year} differs significantly from case year {case_year} in {file_name}. Using case year.")
                continue
            if extracted_year < 2000:
                logging.warning(f"Extracted year {extracted_year} seems too early for {file_name}. Skipping.")
                continue
            # The year from nomor_perkara takes precedence over the parsed year
            metadata['tahun_putusan'] = str(case_year) if case_year else str(extracted_year)
            metadata['bulan_putusan'] = date_obj.strftime('%B')
            metadata['tanggal_putusan'] = str(date_obj.day)
            date_found = True
            logging.info(f"Extracted date: {full_date} (Year: {metadata['tahun_putusan']}, Month: {metadata['bulan_putusan']}, Day: {metadata['tanggal_putusan']}) for {file_name}")
            break
        if date_found:
            break
    if not date_found and case_year:
        logging.warning(f"No valid decision date found in {file_name}. Using case year {case_year} as tahun_putusan.")
        metadata['tahun_putusan'] = str(case_year)
    elif not date_found:
        logging.warning(f"No valid decision date or case year found in {file_name}.")

    # Jenis perkara ('korupsi' also covers the phrase 'tindak pidana korupsi')
    if 'korupsi' in lowered:
        metadata['jenis_perkara'] = 'Pidana Khusus Korupsi'
        logging.info(f"Set jenis_perkara to 'Pidana Khusus Korupsi' for {file_name}")
    elif 'pdt' in lowered:
        metadata['jenis_perkara'] = 'Perdata'
        logging.info(f"Set jenis_perkara to 'Perdata' for {file_name}")
    else:
        logging.warning(f"No jenis_perkara identified in {file_name}.")

    # Tingkat pemeriksaan (first matching keyword wins; default 'Pertama')
    if 'peninjauan kembali' in lowered:
        metadata['tingkat_pemeriksaan'] = 'Peninjauan Kembali'
    elif 'kasasi' in lowered:
        metadata['tingkat_pemeriksaan'] = 'Kasasi'
    elif 'banding' in lowered:
        metadata['tingkat_pemeriksaan'] = 'Banding'
    else:
        metadata['tingkat_pemeriksaan'] = 'Pertama'
    logging.info(f"Set tingkat_pemeriksaan to '{metadata['tingkat_pemeriksaan']}' for {file_name}")

    # Lembaga peradilan (first matching keyword wins; default 'Unknown')
    if 'mahkamah agung' in lowered:
        metadata['lembaga_peradilan'] = 'Mahkamah Agung'
    elif 'pengadilan tinggi' in lowered:
        metadata['lembaga_peradilan'] = 'Pengadilan Tinggi'
    elif 'pengadilan agama' in lowered:
        metadata['lembaga_peradilan'] = 'Pengadilan Agama'
    elif 'pengadilan negeri' in lowered:
        metadata['lembaga_peradilan'] = 'Pengadilan Negeri'
    else:
        metadata['lembaga_peradilan'] = 'Unknown'
    logging.info(f"Set lembaga_peradilan to '{metadata['lembaga_peradilan']}' for {file_name}")

    # Pasal: prefer "pasal X juncto pasal Y undang undang nomor N tahun YYYY",
    # otherwise fall back to the first bare "pasal X [ayat Y]"
    match = re.search(r'(pasal\s+\d+(?:\s+ayat\s+\d+)?)\s+juncto\s+pasal\s+\d+\s+undang\s+undang\s+nomor\s+(\d+\s+tahun\s+\d{4})', text, re.IGNORECASE)
    if match:
        article = _collapse_whitespace(match.group(1))
        law = _collapse_whitespace(match.group(2))
        metadata['pasal'] = f"{article}, Undang-Undang Nomor {law}"
        logging.info(f"Extracted pasal: {metadata['pasal']} for {file_name}")
    else:
        match = re.search(r'(pasal\s+\d+(?:\s+ayat\s+\d+)?)', text, re.IGNORECASE)
        if match:
            metadata['pasal'] = _collapse_whitespace(match.group(1))
            logging.info(f"Extracted fallback pasal: {metadata['pasal']} for {file_name}")
        else:
            logging.warning(f"No pasal found in {file_name}.")

    # Hakim ketua
    match = re.search(r'ketua\s+majelis\s+([^\s][^,]+?)(?=\s+dan\s+[^\s].*?\s+hakim\s+hakim\s+anggota)', text, re.IGNORECASE)
    if match:
        metadata['hakim_ketua'] = clean_name(match.group(1))
        logging.info(f"Extracted hakim_ketua: {metadata['hakim_ketua']} for {file_name}")
    else:
        logging.warning(f"No hakim_ketua found in {file_name}")

    # Ringkasan fakta: text following one of several lead-in phrases, up to
    # "menimbang" / "demikian" (letters optionally spaced out) or end of text
    fact_patterns = [
        r'm\s*e\s*n\s*g\s*a\s*d\s*i\s*l\s*i\s+([^\s].*?)(?=\s*m\s*e\s*n\s*i\s*m\s*b\s*a\s*n\s*g|\s*d\s*e\s*m\s*i\s*k\s*i\s*a\s*n|\s*$)',
        r'(?:terdakwa|terpidana)\s+[^\s]+.*?didakwa\s+dengan\s+dakwaan\s+sebagai\s+berikut\s+([^\s].*?)(?=\s*m\s*e\s*n\s*i\s*m\s*b\s*a\s*n\s*g|\s*d\s*e\s*m\s*i\s*k\s*i\s*a\s*n|\s*$)',
        r'm\s*e\s*m\s*e\s*r\s*i\s*k\s*s\s*a\s+\s*p\s*e\s*r\s*k\s*a\s*r\s*a\s+([^\s].*?)(?=\s*m\s*e\s*n\s*i\s*m\s*b\s*a\s*n\s*g|\s*d\s*e\s*m\s*i\s*k\s*i\s*a\s*n|\s*$)',
        r'karena\s+didakwa\s+dengan\s+dakwaan\s+sebagai\s+berikut\s+([^\s].*?)(?=\s*m\s*e\s*n\s*i\s*m\s*b\s*a\s*n\s*g|\s*d\s*e\s*m\s*i\s*k\s*i\s*a\s*n|\s*$)'
    ]
    for idx, pattern in enumerate(fact_patterns, 1):
        match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
        if match:
            fact_text = ' '.join(match.group(1).split())
            # Snippets of 50 characters or fewer are ignored; try the next pattern
            if len(fact_text) > 50:
                # Keep at most 1000 characters, marking truncation with '...'
                metadata['ringkasan_fakta'] = fact_text[:1000] + '...' if len(fact_text) > 1000 else fact_text
                logging.info(f"Extracted ringkasan_fakta using pattern {idx} for {file_name}: {metadata['ringkasan_fakta'][:200]}")
                break
    else:
        logging.warning(f"No ringkasan_fakta found in {file_name}.")

    # Word count of the full text (whitespace-separated tokens)
    metadata['jumlah_kata_putusan'] = len(text.split())
    logging.info(f"Extracted jumlah_kata_putusan: {metadata['jumlah_kata_putusan']} for {file_name}")

    return metadata

# Part 4: CSV Saving Function
def save_to_csv(metadata_list):
    """Write the given metadata dicts to PATH_CSV as a UTF-8 CSV with a header row.

    The parent directory of PATH_CSV is created if missing and any existing
    file is overwritten. Each written row and the final count are logged.

    Args:
        metadata_list: Iterable of dicts keyed by the column names below.

    Raises:
        Exception: any error raised while creating the directory or writing the
            file is logged and re-raised.
    """
    columns = [
        'case_id', 'nomor_perkara', 'tahun_putusan', 'bulan_putusan', 'tanggal_putusan',
        'jenis_perkara', 'tingkat_pemeriksaan', 'lembaga_peradilan',
        'pasal', 'hakim_ketua', 'ringkasan_fakta', 'jumlah_kata_putusan', 'full_text'
    ]
    try:
        target_dir = os.path.dirname(PATH_CSV)
        os.makedirs(target_dir, exist_ok=True)
        with open(PATH_CSV, 'w', newline='', encoding='utf-8') as handle:
            dict_writer = csv.DictWriter(handle, fieldnames=columns)
            dict_writer.writeheader()
            for row in metadata_list:
                dict_writer.writerow(row)
                logging.info(f"Wrote metadata for case_id: {row['case_id']}")
        entry_count = len(metadata_list)
        logging.info(f"Saved {entry_count} metadata entries to {PATH_CSV}")
    except Exception as err:
        logging.error(f"Failed to save CSV to {PATH_CSV}: {err}")
        raise

# Part 5: Main Processing Function
def _read_stripped_text(file_path, encoding):
    """Return the contents of `file_path` decoded with `encoding`, stripped of surrounding whitespace."""
    with open(file_path, 'r', encoding=encoding) as f:
        return f.read().strip()


def process_text_files():
    """Extract metadata from every .txt file in PATH_OUTPUT and save it as a CSV.

    Files are processed in sorted name order so the CSV row order is
    reproducible. Each file is read as UTF-8; if that fails with a
    UnicodeDecodeError it is re-read as latin-1. Empty files are skipped, and a
    failure on one file is logged without stopping the remaining files.

    Returns:
        None. Results are written through save_to_csv and summarized via
        print/logging.
    """
    metadata_list = []
    if not os.path.exists(PATH_OUTPUT):
        logging.error(f"Directory {PATH_OUTPUT} does not exist. Please ensure 01_scraper.py has run successfully.")
        print(f"Error: Directory {PATH_OUTPUT} does not exist.")
        return

    # os.listdir() returns entries in arbitrary order; sort for a stable output
    text_files = sorted(f for f in os.listdir(PATH_OUTPUT) if f.endswith('.txt'))
    if not text_files:
        logging.warning(f"No .txt files found in {PATH_OUTPUT}. Please check if 01_scraper.py generated text files.")
        print(f"No .txt files found in {PATH_OUTPUT}.")
        return
    logging.info(f"Found {len(text_files)} text files to process in {PATH_OUTPUT}")

    for text_file in text_files:
        file_path = os.path.join(PATH_OUTPUT, text_file)
        encoding = 'utf-8'
        try:
            try:
                text = _read_stripped_text(file_path, encoding)
            except UnicodeDecodeError:
                logging.warning(f"Encoding error in {text_file}. Trying 'latin-1' encoding.")
                encoding = 'latin-1'
                text = _read_stripped_text(file_path, encoding)
            retried = encoding == 'latin-1'

            if not text:
                if retried:
                    logging.warning(f"Text file {text_file} is empty after retry")
                else:
                    logging.warning(f"Text file {text_file} is empty")
                continue

            metadata = extract_metadata(text, text_file)
            metadata_list.append(metadata)
            if retried:
                logging.info(f"Processed text file with latin-1 encoding: {text_file}")
            else:
                logging.info(f"Processed text file: {text_file}")
        except Exception as e:
            # Best effort: log the failure and move on to the next file
            if encoding == 'latin-1':
                logging.error(f"Failed to process {text_file} with latin-1 encoding: {e}")
            else:
                logging.error(f"Error processing {text_file}: {e}")

    if metadata_list:
        save_to_csv(metadata_list)
        print(f"Processed {len(metadata_list)} files and saved to {PATH_CSV}")
    else:
        logging.warning("No files processed or no metadata extracted")
        print("No files processed or no metadata extracted")

# Part 6: Main Execution
# Run the whole pipeline (read text files, extract metadata, write the CSV)
# only when this code is executed as the main module, not when it is imported.
if __name__ == "__main__":
    process_text_files()

2025-06-25 10:35:34,075 - INFO - Directory ensured: d:\AL FITRA\STUDI UMM\SEMESTER 6\TEORI\PENALARAN KOMPUTER\SOURCE CODE\PENALARAN KOMPUTER\CBR_Penalararan_Komputer\CBR\logs
2025-06-25 10:35:34,076 - INFO - Directory ensured: d:\AL FITRA\STUDI UMM\SEMESTER 6\TEORI\PENALARAN KOMPUTER\SOURCE CODE\PENALARAN KOMPUTER\CBR_Penalararan_Komputer\CBR\data\processed
2025-06-25 10:35:34,078 - INFO - Directory ensured: d:\AL FITRA\STUDI UMM\SEMESTER 6\TEORI\PENALARAN KOMPUTER\SOURCE CODE\PENALARAN KOMPUTER\CBR_Penalararan_Komputer\CBR\data\raw
2025-06-25 10:35:34,080 - INFO - Starting metadata extraction process at 10:35:34  on Wednesday, June 25, 2025
2025-06-25 10:35:34,085 - INFO - Found 50 text files to process in d:\AL FITRA\STUDI UMM\SEMESTER 6\TEORI\PENALARAN KOMPUTER\SOURCE CODE\PENALARAN KOMPUTER\CBR_Penalararan_Komputer\CBR\data\raw
2025-06-25 10:35:34,089 - INFO - Extracted nomor_perkara from 'penetapan/putusan nomor pid sus' pattern: 1159pk pid sus 2024 for case_001.txt
2025-06-25 10: