In [None]:
!pip install datasets
!pip install evaluate

In [None]:
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict, load_dataset, interleave_datasets, load_from_disk
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, GenerationConfig, TrainingArguments, Trainer
import torch
import time
import evaluate
import pandas as pd
import numpy as np

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Show whether a CUDA device is visible; later cells move models and inputs to 'cuda'.
torch.cuda.is_available()

In [None]:
# Checkpoint name shared by the tokenizer and model loads in this notebook.
model_name='t5-small'

# Tokenizer for the checkpoint above; used for preprocessing and for decoding generations.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Load the seq2seq model with torch_dtype=torch.bfloat16 and move it to the GPU.
# `original_model` is kept untouched and used as the zero-shot baseline in later cells.
original_model = AutoModelForSeq2SeqLM.from_pretrained(model_name, torch_dtype=torch.bfloat16)
original_model = original_model.to('cuda')

**LOAD DATASET**

In [None]:
# Colab-only: open the browser file-upload dialog.
# NOTE(review): presumably used to upload qa.csv, which the following cells read — confirm.
from google.colab import files
uploaded = files.upload()

In [None]:
import pandas as pd

# Read the QA table and preview it. The preprocessing cell below relies on the
# columns "question", "context" and "answer" being present.
df = pd.read_csv("qa.csv")
print(df.head())
print(df.columns)


In [None]:
from datasets import Dataset, DatasetDict
import pandas as pd
from sklearn.model_selection import train_test_split

# Step 1: read the raw question/context/answer table.
df = pd.read_csv("qa.csv")

# Step 2: trim surrounding whitespace in each text column.
for text_column in ("question", "context", "answer"):
    df[text_column] = df[text_column].str.strip()

# Step 3: keep 80% for training, then halve the remaining 20% into validation and test.
train_df, temp_df = train_test_split(df, test_size=0.2, random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Step 4: wrap each pandas frame in a Hugging Face Dataset.
split_frames = {"train": train_df, "validation": val_df, "test": test_df}
dataset = DatasetDict({
    split_name: Dataset.from_pandas(frame)
    for split_name, frame in split_frames.items()
})

# Step 5: persist the splits so they can be reloaded later (optional).
dataset.save_to_disk("qa_dataset")

# Step 6: print a summary of the splits.
print(dataset)


In [None]:
from datasets import DatasetDict

# Drop the '__index_level_0__' column (the pandas index carried over when the
# splits were built with Dataset.from_pandas) from every split of `dataset`.
# The column is only removed where it is actually present, so re-running this
# cell after it has already succeeded does not raise.
INDEX_COLUMN = '__index_level_0__'
dataset = DatasetDict({
    split_name: (
        split.remove_columns([INDEX_COLUMN])
        if INDEX_COLUMN in split.column_names
        else split
    )
    for split_name, split in dataset.items()
})


In [None]:
# Show the splits with their columns and row counts.
print(dataset)


In [None]:
# Persist the current `dataset` splits to their own directory on disk.
dataset.save_to_disk("qa_dataset_cleaneddd")

In [None]:
# Display the first example of the test split.
dataset['test'][0]

**Preprocess the Datasets**

In [None]:
def tokenize_function(example):
    """Tokenize one batch of QA rows into model inputs and training labels.

    Intended for ``dataset.map(tokenize_function, batched=True)``. Uses the
    module-level ``tokenizer``.

    Args:
        example: batch mapping with the keys 'context', 'question' and
            'answer', each holding a list of strings of equal length.

    Returns:
        The same batch with three added keys:
        'input_ids'      - token ids of the prompt built from context and question,
        'attention_mask' - 1 for real prompt tokens, 0 for padding,
        'labels'         - token ids of the answer, with padding positions set
                           to -100 so they are ignored by the loss.
    """
    start_prompt = "Tables:\n"
    middle_prompt = "\n\nQuestion:\n"
    end_prompt = "\n\nAnswer:\n"

    prompts = [
        start_prompt + context + middle_prompt + question + end_prompt
        for context, question in zip(example['context'], example['question'])
    ]

    # Plain Python lists are enough here: `map` stores them as-is, so there is
    # no need to materialize torch tensors for every batch.
    model_inputs = tokenizer(prompts, padding="max_length", truncation=True)
    answer_ids = tokenizer(example['answer'], padding="max_length", truncation=True)["input_ids"]

    # Padding in the labels must not contribute to the loss; -100 is the value
    # the Hugging Face seq2seq losses ignore.
    pad_id = tokenizer.pad_token_id
    example['input_ids'] = model_inputs["input_ids"]
    # Without the mask the encoder would attend to the padding tokens.
    example['attention_mask'] = model_inputs["attention_mask"]
    example['labels'] = [
        [-100 if token_id == pad_id else token_id for token_id in sequence]
        for sequence in answer_ids
    ]

    return example

# The dataset contains three splits (train, validation, test); `map` applies
# tokenize_function to every split in batches.
# NOTE: an existing "tokenized_datasets" directory is always reused, so delete
# it after changing tokenize_function or the underlying data.

try:
    tokenized_datasets = load_from_disk("tokenized_datasets")
    print("Loaded Tokenized Dataset")
except FileNotFoundError:
    # No cached copy on disk yet: tokenize now and cache the result. Any other
    # failure (including a keyboard interrupt) is deliberately not swallowed.
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    tokenized_datasets = tokenized_datasets.remove_columns(['question', 'context', 'answer'])

    tokenized_datasets.save_to_disk("tokenized_datasets")
    print("Tokenized and Saved Dataset")

In [None]:
# Sanity-check the tokenized data: split names, the fields of one example, and
# the first ten token ids of the first training example's input and labels.
print(tokenized_datasets.keys())
print(tokenized_datasets['train'][0].keys())
print(tokenized_datasets['train'][0]['input_ids'][:10])
print(tokenized_datasets['train'][0]['labels'][:10])
print(tokenized_datasets)

In [None]:
# Report the (rows, columns) shape of each tokenized split, followed by the
# full DatasetDict summary.
print(f"Shapes of the datasets:")
print(f"Training: {tokenized_datasets['train'].shape}")
print(f"Validation: {tokenized_datasets['validation'].shape}")
print(f"Test: {tokenized_datasets['test'].shape}")

print(tokenized_datasets)

**Test the Model with Zero-Shot Inference**

In [None]:
# Pick one held-out example and compare the untuned model's generation with
# the human answer.
index = 0
sample = dataset['test'][index]

question = sample['question']
context = sample['context']
answer = sample['answer']

# Same Tables / Question / Answer layout that tokenize_function builds.
prompt = f"""Tables:
{context}

Question:
{question}

Answer:
"""

inputs = tokenizer(prompt, return_tensors='pt').to('cuda')

generated_ids = original_model.generate(inputs["input_ids"], max_new_tokens=200)
output = tokenizer.decode(generated_ids[0], skip_special_tokens=True)

# Separator line of 99 dashes between the report sections.
dash_line = '-' * 99
print(dash_line)
print(f'INPUT PROMPT:\n{prompt}')
print(dash_line)
print(f'BASELINE HUMAN ANSWER:\n{answer}\n')
print(dash_line)
print(f'MODEL GENERATION - ZERO SHOT:\n{output}')

**Perform Full Fine-Tuning**

In [None]:
# Reuse a previously saved fine-tuned checkpoint when one can be loaded;
# otherwise start from the base checkpoint and flag that training is needed.
try:
    finetuned_model = AutoModelForSeq2SeqLM.from_pretrained("finetuned_model_2_epoch")
    finetuned_model = finetuned_model.to('cuda')
    to_train = False

except OSError:
    # from_pretrained raises OSError when the checkpoint cannot be found or
    # read. Other errors (e.g. CUDA problems, interrupts) are not hidden.
    to_train = True
    finetuned_model = AutoModelForSeq2SeqLM.from_pretrained(model_name, torch_dtype=torch.bfloat16)
    finetuned_model = finetuned_model.to('cuda')
    tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
%%time

# Train only when the previous cell could not load a saved checkpoint.
if to_train:
    # Timestamped output directory so repeated runs do not share a folder.
    output_dir = f'./sql-training-{str(int(time.time()))}'

    training_args = TrainingArguments(
        output_dir=output_dir,
        learning_rate=5e-3,
        num_train_epochs=2,
        per_device_train_batch_size=16,     # batch size per device during training
        per_device_eval_batch_size=16,      # batch size for evaluation
        weight_decay=0.01,
        logging_steps=50,
        eval_strategy='steps',        # evaluation strategy to adopt during training
        eval_steps=500,                     # number of steps between evaluation
    )

    # Full fine-tuning: the whole model is handed to the Trainer, with the
    # tokenized train split for training and the validation split for evaluation.
    trainer = Trainer(
        model=finetuned_model,
        args=training_args,
        train_dataset=tokenized_datasets['train'],
        eval_dataset=tokenized_datasets['validation'],
    )

    trainer.train()

    # Save under the directory name that the loading cells read from.
    finetuned_model.save_pretrained("finetuned_model_2_epoch")

In [None]:
# Load the saved fine-tuned checkpoint from disk and move it to the GPU.
# This replaces whatever `finetuned_model` object the earlier cells left behind.
finetuned_model = AutoModelForSeq2SeqLM.from_pretrained("finetuned_model_2_epoch")
finetuned_model = finetuned_model.to('cuda')

**Test the Fine-Tuned Model with Zero-Shot Inference**

In [None]:
# Pick one held-out example and compare the fine-tuned model's generation with
# the human answer.
index = 0
# index = len(dataset['test'])-200
sample = dataset['test'][index]

question = sample['question']
context = sample['context']
answer = sample['answer']

# Same Tables / Question / Answer layout that tokenize_function builds.
prompt = f"""Tables:
{context}

Question:
{question}

Answer:
"""

inputs = tokenizer(prompt, return_tensors='pt').to('cuda')

generated_ids = finetuned_model.generate(inputs["input_ids"], max_new_tokens=200)
output = tokenizer.decode(generated_ids[0], skip_special_tokens=True)

# Separator line of 99 dashes between the report sections.
dash_line = '-' * 99
print(dash_line)
print(f'INPUT PROMPT:\n{prompt}')
print(dash_line)
print(f'BASELINE HUMAN ANSWER:\n{answer}\n')
print(dash_line)
print(f'FINE-TUNED MODEL - ZERO SHOT:\n{output}')


**Evaluate the Model Quantitatively (with ROUGE Metric)**

In [None]:
# Run both models over the first 25 test examples only, because generation is slow.
NUM_EVAL_EXAMPLES = 25

# Slice the test split once and read the three columns from that slice.
eval_rows = dataset['test'][0:NUM_EVAL_EXAMPLES]
questions = eval_rows['question']
contexts = eval_rows['context']
human_baseline_answers = eval_rows['answer']

original_model_answers = []
finetuned_model_answers = []

# Generation settings are identical for every call, so build them once
# instead of twice per example.
generation_config = GenerationConfig(max_new_tokens=300)

for idx, question in enumerate(questions):

    prompt = f"""Tables:
{contexts[idx]}

Question:
{question}

Answer:
"""

    input_ids = tokenizer(prompt, return_tensors="pt").input_ids
    input_ids = input_ids.to('cuda')

    # Baseline: the untuned checkpoint.
    original_model_outputs = original_model.generate(input_ids=input_ids, generation_config=generation_config)
    original_model_text_output = tokenizer.decode(original_model_outputs[0], skip_special_tokens=True)
    original_model_answers.append(original_model_text_output)

    # Candidate: the fine-tuned checkpoint, on exactly the same prompt.
    finetuned_model_outputs = finetuned_model.generate(input_ids=input_ids, generation_config=generation_config)
    finetuned_model_text_output = tokenizer.decode(finetuned_model_outputs[0], skip_special_tokens=True)
    finetuned_model_answers.append(finetuned_model_text_output)

# Side-by-side table: human answer, baseline answer, fine-tuned answer.
zipped_summaries = list(zip(human_baseline_answers, original_model_answers, finetuned_model_answers))

df = pd.DataFrame(zipped_summaries, columns = ['human_baseline_answers', 'original_model_answers', 'finetuned_model_answers'])

In [None]:
!pip install rouge_score


In [None]:
# Score both sets of generations against the human answers with ROUGE.
rouge = evaluate.load('rouge')

# Options shared by both ROUGE computations.
rouge_options = {'use_aggregator': True, 'use_stemmer': True}

original_model_results = rouge.compute(
    predictions=original_model_answers,
    references=human_baseline_answers[0:len(original_model_answers)],
    **rouge_options,
)
print('ORIGINAL MODEL:')
print(original_model_results)


finetuned_model_results = rouge.compute(
    predictions=finetuned_model_answers,
    references=human_baseline_answers[0:len(finetuned_model_answers)],
    **rouge_options,
)
print('FINE-TUNED MODEL:')
print(finetuned_model_results)

In [None]:
# Load the accuracy, F1 and recall metrics from the `evaluate` library.
# NOTE(review): no cell visible in this notebook references these three objects
# afterwards (the later evaluation cell uses sklearn.metrics) — confirm they are still needed.
accuracy_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")
recall_metric = evaluate.load("recall")

In [None]:
from sklearn.metrics import accuracy_score, f1_score, recall_score
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import string, re

# ====================
# FUNGSI NORMALISASI TEKS
# ====================
def normalize_text(s):
    """Return `s` normalized for answer comparison.

    Steps, in order: lower-case, delete every character in
    ``string.punctuation``, replace the stand-alone words a/an/the with a
    space, then collapse all whitespace runs to single spaces.
    """
    lowered = s.lower()
    without_punctuation = lowered.translate(str.maketrans('', '', string.punctuation))
    without_articles = re.sub(r'\b(a|an|the)\b', ' ', without_punctuation)
    return ' '.join(without_articles.split())

# ====================
# METRIK MANUAL
# ====================
def compute_exact_match(pred, truth):
    """Return 1 when `pred` and `truth` are equal after normalize_text, else 0."""
    is_match = normalize_text(pred) == normalize_text(truth)
    return 1 if is_match else 0

def compute_f1(pred, truth):
    """Token-level F1 between a predicted and a reference answer.

    Both strings are passed through normalize_text and split on whitespace.
    Overlap is counted per token occurrence (multiset intersection), so an
    answer that repeats a token is not penalized against an identical
    reference.

    Args:
        pred: predicted answer string.
        truth: reference answer string.

    Returns:
        F1 as a float in [0.0, 1.0]. If either side is empty after
        normalization, the result is 1.0 when both are empty and 0.0 otherwise,
        which keeps this metric consistent with compute_exact_match.
    """
    from collections import Counter  # local import keeps the cell's import line untouched

    pred_tokens = normalize_text(pred).split()
    truth_tokens = normalize_text(truth).split()

    if not pred_tokens or not truth_tokens:
        return float(pred_tokens == truth_tokens)

    # Counter & Counter keeps, per token, the smaller of the two counts.
    overlap = sum((Counter(pred_tokens) & Counter(truth_tokens)).values())
    if overlap == 0:
        return 0.0

    precision = overlap / len(pred_tokens)
    recall = overlap / len(truth_tokens)
    return 2 * (precision * recall) / (precision + recall)

def compute_bleu(pred, truth):
    """Sentence-level BLEU of `pred` against the single reference `truth`.

    Both strings are lower-cased and split on whitespace; smoothing method 4
    of NLTK's SmoothingFunction is applied.
    """
    hypothesis = pred.lower().split()
    references = [truth.lower().split()]
    return sentence_bleu(
        references,
        hypothesis,
        smoothing_function=SmoothingFunction().method4,
    )

def compute_execution_match(pred, truth):
    """Return 1 if `pred` and `truth` evaluate to equal Python values, else 0.

    Both strings are evaluated as Python expressions. Any failure while
    evaluating or comparing (syntax error, unknown name, division by zero, an
    expression that tries to exit the interpreter, ...) counts as a mismatch.

    SECURITY: eval() executes arbitrary code. `pred` is model output, i.e.
    untrusted input, so only run this in a throw-away sandbox. If the answers
    are plain literals, prefer ast.literal_eval; it is not used here because it
    would stop arithmetic answers such as "3 + 5" from being evaluated.
    """
    try:
        return int(eval(pred) == eval(truth))
    except (Exception, SystemExit):
        # Best-effort scoring: a bad expression is a miss, not a crash.
        # KeyboardInterrupt is intentionally left alone so the user can stop the cell.
        return 0

# ====================
# EXAMPLE ANSWERS
# ====================
# Hard-coded demo data for exercising the metric helpers. It is stored under
# its own names so the model outputs collected by the inference cell
# (finetuned_model_answers / human_baseline_answers) are not overwritten.
# To score the real model outputs, iterate over those two lists below instead.
example_predictions = ["3 + 5", "the capital is paris", "no", "1 / 0", "yes"]
example_references = ["8", "paris is the capital", "no", "error", "yes"]

# ====================
# EVALUATION
# ====================
em_scores = []
f1_scores = []
bleu_scores = []
execution_scores = []
# Every reference is treated as the positive class for the sklearn metrics.
true_labels = [1] * len(example_references)
predicted_labels = []

for pred, truth in zip(example_predictions, example_references):
    em = compute_exact_match(pred, truth)
    f1 = compute_f1(pred, truth)
    bleu = compute_bleu(pred, truth)
    exec_match = compute_execution_match(pred, truth)

    em_scores.append(em)
    f1_scores.append(f1)
    bleu_scores.append(bleu)
    execution_scores.append(exec_match)
    predicted_labels.append(em)  # treat exact match as the binary 1/0 label

# ====================
# FINAL RESULTS
# ====================
accuracy = accuracy_score(true_labels, predicted_labels)
f1_binary = f1_score(true_labels, predicted_labels)
recall = recall_score(true_labels, predicted_labels)

print("==== EVALUASI MODEL ====")
print(f"Accuracy: {accuracy:.2%}")
print(f"F1 Score: {f1_binary:.2%}")
print(f"Recall: {recall:.2%}")
print(f"Exact Match: {sum(em_scores)/len(em_scores):.2%}")
print(f"Average F1 (Token-Based): {sum(f1_scores)/len(f1_scores):.2%}")
print(f"Average BLEU Score: {sum(bleu_scores)/len(bleu_scores):.2%}")
print(f"Execution Match Score: {sum(execution_scores)/len(execution_scores):.2%}")


In [None]:
!pip install pyswip


In [None]:
!apt-get install swi-prolog


In [None]:
from pyswip import Prolog

# Exact Match (EM)
def exact_match(predictions, references):
    """
    Compute the Exact Match (EM) score between predictions and references.

    Args:
        predictions: list of predicted strings.
        references: list of reference strings, aligned with `predictions`.

    Returns:
        The fraction of positions where prediction and reference are equal,
        divided by the number of references. Returns 0.0 when `references`
        is empty instead of raising ZeroDivisionError.
    """
    if not references:
        return 0.0
    matches = sum(pred == ref for pred, ref in zip(predictions, references))
    return matches / len(references)

# Execution Match (XM) using PySWIP (Prolog)
def execution_match(predictions, references):
    """
    Compute the Execution Match (XM) score between predictions and references using Prolog.

    For each pair, the prediction is asserted as a clause, the reference is
    run as a query, and the pair counts as a match when the query yields at
    least one solution. The asserted clause is retracted afterwards.

    Args:
        predictions: list of Prolog clause strings produced by the model.
        references: list of Prolog goal strings, aligned with `predictions`.

    Returns:
        Number of matching pairs divided by the number of references;
        0.0 when `references` is empty.

    Notes:
        A prediction that Prolog refuses to assert counts as a miss and does
        not abort the remaining pairs. Errors raised while querying the
        reference still propagate, but the asserted clause is retracted first.
        SECURITY: predictions are model output; asserting them runs untrusted
        text through the Prolog engine, so use a disposable environment.
    """
    if not references:
        return 0.0

    prolog = Prolog()
    match_score = 0

    for pred, ref in zip(predictions, references):
        clause = f"{pred}"
        try:
            prolog.assertz(clause)  # add the predicted clause to the knowledge base
        except Exception:
            # Malformed model output: nothing was asserted, so score it as a miss.
            continue

        try:
            if list(prolog.query(ref)):  # run the reference as a goal
                match_score += 1
        finally:
            # Always clean up so one pair's clause cannot satisfy a later query.
            prolog.retractall(clause)

    return match_score / len(references)

# Example usage:
predictions = ["penandatangan(indonesia, menteri_luar_negeri)"]  # make sure there is no doubled period
references = ["penandatangan(indonesia, menteri_luar_negeri)"]

# Compute the EM and XM metrics
em_score = exact_match(predictions, references)
xm_score = execution_match(predictions, references)

print(f"Exact Match (EM) Score: {em_score}")
print(f"Execution Match (XM) Score: {xm_score}")
