In [1]:
import json
import os
import re
from collections import Counter
from typing import List

import nltk
import numpy as np
import pandas as pd
from nltk.tokenize import word_tokenize
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer, CrossEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import GridSearchCV
from sklearn.svm import LinearSVC
from tqdm import tqdm

# Setup NLTK and directories
def setup_environment():
    """Download the NLTK tokenizer data and create the results directory.

    ``word_tokenize`` relies on the Punkt sentence-tokenizer data. Recent
    NLTK releases (3.8.2 and later) load it from the ``punkt_tab`` resource
    while older releases use ``punkt``, so both are requested. The return
    value of ``nltk.download`` is ignored, as before.
    """
    for resource in ('punkt', 'punkt_tab'):
        nltk.download(resource, quiet=True)
    os.makedirs('data/results', exist_ok=True)

# Preprocess text
def preprocess_text(text: str) -> str:
    """Normalise a text into a lowercase, single-space-separated token string.

    Steps, in order:
      1. cast to ``str`` and lowercase;
      2. delete the words putusan / nomor / tahun / pengadilan / hakim;
      3. expand the abbreviation "uu no" to "undang-undang nomor";
      4. collapse "pasal <digits>" to "pasal";
      5. replace every non-word, non-space character with a space;
      6. collapse whitespace and strip;
      7. tokenize with ``word_tokenize`` and re-join with single spaces.

    Args:
        text: Raw text; any value is accepted because it is passed to ``str``.

    Returns:
        The normalised string, or the literal ``'empty'`` when nothing is
        left after cleaning or when any step raises (the error is printed).
    """
    try:
        text = str(text).lower()
        text = re.sub(r'\b(?:putusan|nomor|tahun|pengadilan|hakim)\b', '', text)
        # Word boundaries keep the abbreviation rule from firing inside longer
        # words: without them "uu notaris" became "undang-undang nomortaris".
        # The "nomor" inserted here survives because the removal above has
        # already run; the hyphen is turned into a space two steps below.
        text = re.sub(r'\buu\s+no\b', 'undang-undang nomor', text)
        text = re.sub(r'pasal\s+\d+', 'pasal', text)
        text = re.sub(r'[^\w\s]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return ' '.join(word_tokenize(text)) if text else 'empty'
    except Exception as e:
        print(f"Error preprocessing text: {e}")
        return 'empty'

# Load data
def load_data():
    """Read the case table and the evaluation queries from disk.

    Returns:
        A 5-tuple ``(df, texts, case_ids, queries, case_solutions)``:
        the DataFrame read from ``data/processed/cases.csv``; the
        preprocessed ``ringkasan_fakta`` column as a list (missing values are
        blanked first); the ``case_id`` column as a list; the parsed content
        of ``data/eval/queries.json``; and a dict mapping each query item's
        ``case_id`` to its ``solution`` (``''`` when the key is absent).

    Raises:
        Any exception raised while reading or parsing; it is printed and then
        re-raised unchanged.
    """
    try:
        cases = pd.read_csv('data/processed/cases.csv')
        fact_summaries = cases['ringkasan_fakta'].fillna('')
        cleaned_texts = fact_summaries.apply(preprocess_text).tolist()
        ids = cases['case_id'].tolist()

        with open('data/eval/queries.json', 'r', encoding='utf-8') as handle:
            query_items = json.load(handle)

        # Later items overwrite earlier ones that share a case_id.
        solutions_by_case = {}
        for entry in query_items:
            solutions_by_case[entry['case_id']] = entry.get('solution', '')

        return cases, cleaned_texts, ids, query_items, solutions_by_case
    except Exception as e:
        print(f"Error loading data: {e}")
        raise

# Setup TF-IDF
def setup_tfidf(texts: List[str]) -> tuple:
    """Fit a TF-IDF vectorizer on the corpus.

    The vectorizer uses 1- to 3-grams, keeps at most 4000 features and drops
    the stop words listed below.

    Args:
        texts: Preprocessed documents, one string per document.

    Returns:
        ``(vectorizer, tfidf_matrix)``: the fitted ``TfidfVectorizer`` and
        the matrix produced by fitting it on ``texts``.
    """
    corpus_stop_words = [
        'dan', 'di', 'dari', 'ke', 'pada', 'dengan', 'untuk',
        'yang', 'ini', 'itu', 'adalah', 'tersebut', 'sebagai', 'oleh',
        'atau', 'tetapi', 'karena', 'jika', 'dalam', 'bagi', 'tentang',
        'melalui', 'serta', 'maka', 'lagi', 'sudah', 'belum', 'hanya',
        'saja', 'bahwa', 'apa', 'siapa', 'bagaimana', 'kapan', 'dimana',
        'kenapa', 'sejak', 'hingga', 'agar', 'supaya', 'meskipun', 'walau',
        'kecuali', 'terhadap', 'antara', 'selain', 'setiap', 'sebelum',
        'sesudah',
    ]
    tfidf = TfidfVectorizer(
        stop_words=corpus_stop_words,
        ngram_range=(1, 3),
        max_features=4000,
    )
    document_term_matrix = tfidf.fit_transform(texts)
    return tfidf, document_term_matrix

# Extract features for Logistic Regression and SVM
def extract_features(query_vec, doc_vec, query_text: str, doc_text: str) -> np.ndarray:
    """Build the feature vector for one (query, document) pair.

    Args:
        query_vec: Query TF-IDF row; must expose ``toarray()``.
        doc_vec: Document TF-IDF row; must expose ``toarray()``.
        query_text: Preprocessed query, split on whitespace for term overlap.
        doc_text: Preprocessed document, split on whitespace likewise.

    Returns:
        A 1-D array: the dense query vector, then the dense document vector,
        then three scalars (cosine similarity, overlap, overlap again).
    """
    dense_query = query_vec.toarray()[0]
    dense_doc = doc_vec.toarray()[0]
    similarity = cosine_similarity([dense_query], [dense_doc])[0][0]

    query_terms = set(query_text.split())
    shared_terms = query_terms & set(doc_text.split())
    # Share of distinct query terms that also occur in the document; the
    # max(..., 1) guards against an empty query.
    overlap = len(shared_terms) / max(len(query_terms), 1)

    # NOTE(review): the last scalar ("coverage") is a copy of `overlap`, so
    # two features are always identical -- confirm whether a document-side
    # ratio was intended.
    pair_scalars = [similarity, overlap, overlap]
    return np.concatenate([dense_query, dense_doc, pair_scalars])

# Prepare training data
def prepare_training_data(queries, case_ids, texts, vectorizer, tfidf_matrix,
                          *, n_negatives: int = 10, random_state=None):
    """Build a pairwise training set from the labelled queries.

    For every query whose ``case_id`` exists in ``case_ids``, up to
    ``n_negatives`` other cases are sampled without replacement. Each
    (positive, negative) pair contributes two rows: ``pos - neg`` labelled 1
    and ``neg - pos`` labelled 0.

    Args:
        queries: Iterable of dicts with at least ``'query'`` and ``'case_id'``.
        case_ids: Case identifiers, aligned with ``texts`` and the rows of
            ``tfidf_matrix``.
        texts: Preprocessed case texts.
        vectorizer: Fitted vectorizer used to transform each query.
        tfidf_matrix: TF-IDF matrix of the cases.
        n_negatives: Maximum number of negatives sampled per query.
        random_state: Optional seed for reproducible sampling. When ``None``
            the global ``np.random`` state is used, as before.

    Returns:
        ``(X_train, y_train)`` as NumPy arrays.
    """
    # First-occurrence position of every id: same answer as list.index, but
    # O(1) per query instead of a linear scan.
    position_of = {}
    for position, cid in enumerate(case_ids):
        position_of.setdefault(cid, position)

    rng = np.random if random_state is None else np.random.RandomState(random_state)

    X_train = []
    y_train = []
    for item in queries:
        query = preprocess_text(item['query'])
        query_vec = vectorizer.transform([query])
        true_id = item['case_id']
        true_idx = position_of.get(true_id)
        if true_idx is None:
            print(f"Case ID {true_id} not found in case_ids")
            continue

        pos_features = extract_features(query_vec, tfidf_matrix[true_idx], query, texts[true_idx])
        neg_indices = [i for i in range(len(case_ids)) if i != true_idx]
        neg_samples = rng.choice(
            neg_indices,
            size=min(n_negatives, len(neg_indices)),
            replace=False,
        )
        for neg_idx in neg_samples:
            neg_features = extract_features(query_vec, tfidf_matrix[neg_idx], query, texts[neg_idx])
            # Emit both orientations so the two classes stay balanced.
            X_train.append(pos_features - neg_features)
            y_train.append(1)
            X_train.append(neg_features - pos_features)
            y_train.append(0)
    return np.array(X_train), np.array(y_train)

# Train models
def train_models(X_train, y_train):
    """Grid-search a logistic regression and a linear SVM over ``C``.

    Both estimators use ``max_iter=5000`` and balanced class weights, and are
    tuned with 3-fold cross-validation scored by accuracy. The best
    parameters and best CV accuracy of each search are printed.

    Args:
        X_train: Feature matrix.
        y_train: Labels.

    Returns:
        ``(logreg, svm)``: the two fitted ``GridSearchCV`` objects.
    """
    c_grid = {'C': [0.01, 0.1, 1, 10, 100, 1000]}

    def _tune(estimator):
        # One shared search recipe for both estimators.
        search = GridSearchCV(estimator, c_grid, cv=3, scoring='accuracy')
        search.fit(X_train, y_train)
        return search

    # Logistic Regression
    logreg = _tune(LogisticRegression(max_iter=5000, class_weight='balanced'))
    print(f'Best Logistic Regression Parameters: {logreg.best_params_}')
    print(f'Best CV Accuracy (LogReg): {logreg.best_score_:.2f}')

    # SVM
    svm = _tune(LinearSVC(max_iter=5000, class_weight='balanced'))
    print(f'Best SVM Parameters: {svm.best_params_}')
    print(f'Best CV Accuracy (SVM): {svm.best_score_:.2f}')

    return logreg, svm

# Setup Indo-BERT and BM25
def setup_indobert_and_bm25(texts: List[str]):
    """Load the neural models and index the corpus for hybrid retrieval.

    Args:
        texts: Preprocessed documents; they are embedded with the bi-encoder
            and whitespace-split for BM25.

    Returns:
        ``(bi_encoder, cross_encoder, doc_embeddings, bm25)``, where
        ``doc_embeddings`` are the normalised NumPy embeddings of ``texts``.

    Raises:
        Any exception raised during loading, encoding or indexing; it is
        printed and then re-raised unchanged.
    """
    try:
        encoder = SentenceTransformer('indobenchmark/indobert-base-p1')
        reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        corpus_embeddings = encoder.encode(
            texts,
            convert_to_numpy=True,
            normalize_embeddings=True,
            show_progress_bar=True,
        )
        tokenized_corpus = [document.split() for document in texts]
        lexical_index = BM25Okapi(tokenized_corpus)
    except Exception as e:
        print(f"Error setting up Indo-BERT/BM25: {e}")
        raise
    else:
        return encoder, reranker, corpus_embeddings, lexical_index

# Retrieval functions
def logreg_retrieve(query: str, vectorizer, tfidf_matrix, case_ids, texts, logreg, k: int = 5) -> List[str]:
    """Rank all cases for a query with the logistic-regression scorer.

    Args:
        query: Raw query text; it is preprocessed here.
        vectorizer: Fitted vectorizer used to transform the query.
        tfidf_matrix: TF-IDF matrix of the cases, one row per case.
        case_ids: Case identifiers aligned with the matrix rows.
        texts: Preprocessed case texts aligned with the matrix rows.
        logreg: Fitted model exposing ``decision_function``.
        k: Number of case ids to return.

    Returns:
        The ids of the ``k`` highest-scoring cases, best first.
    """
    cleaned_query = preprocess_text(query)
    cleaned_vec = vectorizer.transform([cleaned_query])

    def _score(row: int):
        pair_features = extract_features(cleaned_vec, tfidf_matrix[row], cleaned_query, texts[row])
        return logreg.decision_function([pair_features])[0]

    scored = []
    for row in range(tfidf_matrix.shape[0]):
        row_score = _score(row)
        scored.append((case_ids[row], row_score))

    # Stable descending sort: equal scores keep corpus order.
    ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
    return [cid for cid, _ in ranked[:k]]

def svm_retrieve(query: str, vectorizer, tfidf_matrix, case_ids, texts, svm, k: int = 5) -> List[str]:
    """Rank all cases for a query with the linear-SVM scorer.

    Args:
        query: Raw query text; it is preprocessed here.
        vectorizer: Fitted vectorizer used to transform the query.
        tfidf_matrix: TF-IDF matrix of the cases, one row per case.
        case_ids: Case identifiers aligned with the matrix rows.
        texts: Preprocessed case texts aligned with the matrix rows.
        svm: Fitted model exposing ``decision_function``.
        k: Number of case ids to return.

    Returns:
        The ids of the ``k`` cases with the largest decision value, best
        first.
    """
    normalized = preprocess_text(query)
    normalized_vec = vectorizer.transform([normalized])

    candidates = []
    n_docs = tfidf_matrix.shape[0]
    for doc_idx in range(n_docs):
        pair_features = extract_features(
            normalized_vec, tfidf_matrix[doc_idx], normalized, texts[doc_idx]
        )
        margin = svm.decision_function([pair_features])[0]
        candidates.append((case_ids[doc_idx], margin))

    # Stable sort, so ties stay in corpus order.
    candidates.sort(key=lambda entry: entry[1], reverse=True)
    top_candidates = candidates[:k]
    return [entry[0] for entry in top_candidates]

def indobert_retrieve(query: str, bi_encoder, cross_encoder, doc_embeddings, bm25, case_ids, texts,
                      k: int = 10, alpha: float = 0.6, top_n: int = 5) -> List[str]:
    """Hybrid dense + BM25 retrieval followed by cross-encoder re-ranking.

    The query is preprocessed and embedded, then scored against every
    document by cosine similarity and by BM25 (divided by its maximum). The
    two scores are mixed as ``alpha * dense + (1 - alpha) * bm25``. The ``k``
    best candidates are re-scored with the cross-encoder and the ``top_n``
    best of those are returned.

    Args:
        query: Raw query text.
        bi_encoder: Model exposing ``encode`` for the query embedding.
        cross_encoder: Model exposing ``predict`` on ``[query, text]`` pairs.
        doc_embeddings: Document embeddings aligned with ``case_ids``.
        bm25: BM25 index exposing ``get_scores``.
        case_ids: Case identifiers aligned with ``texts``.
        texts: Preprocessed case texts.
        k: Number of first-stage candidates sent to the re-ranker.
        alpha: Weight of the dense similarity in the mixed score.
        top_n: Number of ids returned after re-ranking (previously a
            hard-coded 5).

    Returns:
        Up to ``top_n`` case ids, best re-ranked candidate first. Empty when
        ``k`` or ``top_n`` is not positive.
    """
    query = preprocess_text(query)
    query_vec = bi_encoder.encode([query], convert_to_numpy=True, normalize_embeddings=True)[0]
    sim_scores = cosine_similarity([query_vec], doc_embeddings)[0]

    bm25_scores = np.asarray(bm25.get_scores(query.split()), dtype=float)
    # The epsilon avoids a division by zero when no query term matches.
    bm25_scores = bm25_scores / (np.max(bm25_scores) + 1e-10)

    combined = alpha * sim_scores + (1 - alpha) * bm25_scores

    # Reverse first, then slice: `[-k:]` would select the whole corpus for
    # k == 0. For k > 0 the result is identical to the previous expression.
    top_k_idx = np.argsort(combined)[::-1][:max(k, 0)]
    if len(top_k_idx) == 0 or top_n <= 0:
        return []

    rerank_pairs = [[query, texts[i]] for i in top_k_idx]
    rerank_scores = cross_encoder.predict(rerank_pairs)
    reranked_idx = np.argsort(rerank_scores)[::-1][:top_n]
    return [case_ids[top_k_idx[i]] for i in reranked_idx]

# Predict outcome
def predict_outcome(query: str, retrieve_fn, case_solutions) -> tuple:
    """Predict a solution by majority vote over the retrieved cases.

    Args:
        query: Query text handed to ``retrieve_fn``.
        retrieve_fn: Callable taking the query and returning an ordered
            sequence of case ids, best match first.
        case_solutions: Mapping from case id to its solution string.

    Returns:
        ``(predicted, top_ids)``. ``predicted`` is the most frequent
        non-blank solution among the retrieved cases, or
        ``'Tidak ditemukan'`` when none of them has one. ``top_ids`` is the
        value returned by ``retrieve_fn``, unchanged.
    """
    top_5_ids = retrieve_fn(query)
    solutions = [case_solutions.get(cid, '') for cid in top_5_ids]
    filtered = [s for s in solutions if s not in ('', None, 'nan')]
    if not filtered:
        return 'Tidak ditemukan', top_5_ids

    # Counter keeps insertion order, so on a tie the solution of the
    # best-ranked case wins. The previous max(set(...)) tie-break followed
    # set iteration order, which changes between runs for strings.
    predicted = Counter(filtered).most_common(1)[0][0]
    return predicted, top_5_ids

# Main function
def main():
    """Run the whole pipeline and write one prediction file per retriever.

    Loads the data, fits TF-IDF, trains the logistic-regression and SVM
    rankers, and sets up the Indo-BERT/BM25 retriever. Every case's own
    ``ringkasan_fakta`` is then used as a query against each retriever, and
    the predictions are saved under ``data/results/``.
    """
    setup_environment()

    # Load data
    df, texts, case_ids, queries, case_solutions = load_data()

    # Setup TF-IDF
    vectorizer, tfidf_matrix = setup_tfidf(texts)

    # Prepare and train models
    X_train, y_train = prepare_training_data(queries, case_ids, texts, vectorizer, tfidf_matrix)
    logreg, svm = train_models(X_train, y_train)

    # Setup Indo-BERT and BM25
    bi_encoder, cross_encoder, doc_embeddings, bm25 = setup_indobert_and_bm25(texts)

    # One retrieval callable per method; the key doubles as the output file
    # prefix. Dict order fixes the per-case evaluation order.
    retrievers = {
        'logreg': lambda q: logreg_retrieve(q, vectorizer, tfidf_matrix, case_ids, texts, logreg),
        'svm': lambda q: svm_retrieve(q, vectorizer, tfidf_matrix, case_ids, texts, svm),
        'indobert': lambda q: indobert_retrieve(
            q, bi_encoder, cross_encoder, doc_embeddings, bm25, case_ids, texts
        ),
    }
    results = {name: [] for name in retrievers}

    # Blank missing summaries exactly as load_data does for the corpus;
    # otherwise NaN would be queried as the literal token "nan".
    query_texts = df['ringkasan_fakta'].fillna('')

    # Run predictions. Iterating the columns (rather than df.loc[i]) does not
    # depend on the DataFrame having a default 0..n-1 index.
    for query_text, case_id in tqdm(zip(query_texts, df['case_id']), total=len(df), desc='Predicting'):
        for name, retrieve_fn in retrievers.items():
            predicted, top_ids = predict_outcome(query_text, retrieve_fn, case_solutions)
            results[name].append({
                'query_id': case_id,
                'predicted_solution': predicted,
                # map(str, ...) keeps join from failing on non-string ids.
                'top_5_case_ids': ', '.join(map(str, top_ids)),
            })

    # Save results
    for name, rows in results.items():
        pd.DataFrame(rows).to_csv(f'data/results/{name}_predictions.csv', index=False, encoding='utf-8')

    for name in results:
        print(f'✅ Saved to data/results/{name}_predictions.csv')

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # Print the short message, then re-raise so the traceback and the
        # failure status are not lost (same print-and-raise pattern used by
        # load_data and setup_indobert_and_bm25).
        print(f"An error occurred: {e}")
        raise


Best Logistic Regression Parameters: {'C': 1000}
Best CV Accuracy (LogReg): 0.64




Best SVM Parameters: {'C': 100}
Best CV Accuracy (SVM): 0.63


No sentence-transformers model found with name indobenchmark/indobert-base-p1. Creating a new one with mean pooling.


Batches:   0%|          | 0/2 [00:00<?, ?it/s]

Predicting: 100%|██████████| 60/60 [00:39<00:00,  1.52it/s]

✅ Saved to data/results/logreg_predictions.csv
✅ Saved to data/results/svm_predictions.csv
✅ Saved to data/results/indobert_predictions.csv



