<a href="https://colab.research.google.com/github/villurignanesh-sbu/PWA_App/blob/main/ESE577_Fina_Project_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install google-generativeai
!pip install datasets
!pip install -U bitsandbytes
!pip install transformers
!pip install -U peft
!pip install -U "huggingface_hub[cli]"
!pip install -U trl
!pip install pytesseract
!pip install evaluate

Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Downloading pytesseract-0.3.13-py3-none-any.whl (14 kB)
Installing collected packages: pytesseract
Successfully installed pytesseract-0.3.13


In [None]:
import dataclasses
import os
from collections import Counter
from getpass import getpass

import google.generativeai as genai
import numpy as np
import pandas as pd
import pytesseract
import torch
from datasets import Dataset, DatasetDict
from evaluate import load
from peft import LoraConfig, prepare_model_for_kbit_training, get_peft_model
from PIL import Image
from sklearn.model_selection import train_test_split
from transformers import AutoModelForCausalLM, AutoTokenizer, \
    BitsAndBytesConfig, TrainingArguments, pipeline, logging
from trl import SFTTrainer

Create the dataset and split it into training and testing sets.

In [None]:
# Load the question/answer pairs and hold out 30% of the rows for testing.
qa_pairs_data = pd.read_csv('qa_pairs_merged.csv')
# head() has to be called: printing the bare attribute shows the bound method's
# repr instead of the first rows.
print(qa_pairs_data.head())
X = qa_pairs_data['Question']
y = qa_pairs_data['Answer']
# A fixed random_state keeps the shuffled split identical from run to run.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.30, random_state=42, shuffle=True
)
# Re-join each split into a two-column frame (Question, Answer).
train_data = pd.concat([X_train, y_train], axis=1)
test_data = pd.concat([X_test, y_test], axis=1)
print("Training")
print(train_data.head())
print("Testing")
print(test_data.head())

In [None]:
# Convert the pandas DataFrames to Hugging Face datasets.
# The frames come out of a shuffled split, so their index is no longer the
# default range; preserve_index=False stops that index from being stored as an
# extra `__index_level_0__` column next to Question/Answer.
dataset_train = Dataset.from_pandas(train_data, preserve_index=False)
dataset_test = Dataset.from_pandas(test_data, preserve_index=False)

# Create a DatasetDict containing both train and test datasets
dataset = DatasetDict({"train": dataset_train, "test": dataset_test})

In [None]:
!huggingface-cli login --token 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

In [None]:
!pip3 install -U bitsandbytes
import bitsandbytes as bnb
print(bnb.__version__)

In [None]:
# Base checkpoint and its 4-bit loading options.
# Reference: https://huggingface.co/blog/4bit-transformers-bitsandbytes
base_model = "mistralai/Mistral-7B-Instruct-v0.2"

_quant_options = dict(
    load_in_4bit=True,                      # load weights in 4-bit
    bnb_4bit_quant_type="nf4",              # NF4 quantisation type
    bnb_4bit_use_double_quant=True,         # double quantisation enabled
    bnb_4bit_compute_dtype=torch.bfloat16,  # compute dtype: bfloat16
)
bnb_config = BitsAndBytesConfig(**_quant_options)

model = AutoModelForCausalLM.from_pretrained(base_model, quantization_config=bnb_config)

# Training-time switches on the freshly loaded model: no KV cache,
# pretraining_tp forced to 1, gradient checkpointing turned on.
model.config.use_cache = False
model.config.pretraining_tp = 1
model.gradient_checkpointing_enable()

In [None]:
# Load the tokenizer that belongs to `base_model` and configure it for training.
tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token  # EOS doubles as the padding token
tokenizer.padding_side = "right"
tokenizer.add_eos_token = True
# Show the BOS/EOS tokens as this cell's output.
(tokenizer.bos_token, tokenizer.eos_token)

In [None]:
# LoRA config
# NOTE: this cell rebinds `model`; re-running it without reloading the base
# model would wrap an already wrapped model.
model = prepare_model_for_kbit_training(model)
# Reference: https://huggingface.co/docs/peft/en/quicktour
peft_config = LoraConfig(
    r=8,
    lora_alpha=32,
    lora_dropout=0.1,
    # Declare the task so PEFT builds its causal-LM wrapper instead of the
    # generic one.
    task_type="CAUSAL_LM",
)
model = get_peft_model(model, peft_config)

In [None]:
# Hyperparameters
# The evaluation-schedule keyword is `eval_strategy` in newer transformers
# releases and `evaluation_strategy` in older ones. The notebook installs an
# unpinned version, so pick whichever name the installed TrainingArguments
# actually declares (preferring the new one).
_training_arg_names = {field.name for field in dataclasses.fields(TrainingArguments)}
_eval_strategy_key = (
    "eval_strategy" if "eval_strategy" in _training_arg_names else "evaluation_strategy"
)

training_arguments = TrainingArguments(
    output_dir="./ese577_model",
    num_train_epochs=3,  # Number of epochs
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    learning_rate=5e-5,
    warmup_steps=10,
    logging_dir="./logs",
    save_strategy="epoch",  # must match the evaluation schedule for load_best_model_at_end
    save_total_limit=2,
    load_best_model_at_end=True,
    report_to="none",
    **{_eval_strategy_key: "epoch"},
)


In [None]:
# Fetch the two ready-made metric objects by name, in one pass.
# NOTE(review): "f1" in `evaluate` looks like the classification F1 over integer
# labels - confirm it is the right metric for free-text answers.
metric_f1, metric_exact_match = (load(name) for name in ("f1", "exact_match"))

def _token_f1(prediction, reference):
    """Token-overlap F1 (0.0-1.0) between two whitespace-tokenised strings."""
    pred_tokens = prediction.split()
    ref_tokens = reference.split()
    if not pred_tokens or not ref_tokens:
        # Two empty answers agree; exactly one empty answer is a complete miss.
        return float(pred_tokens == ref_tokens)
    shared = sum((Counter(pred_tokens) & Counter(ref_tokens)).values())
    if shared == 0:
        return 0.0
    precision = shared / len(pred_tokens)
    recall = shared / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)


def _is_text_batch(batch):
    """True when `batch` is a list/tuple holding only strings (an empty one counts)."""
    return isinstance(batch, (list, tuple)) and all(isinstance(item, str) for item in batch)


def _decode_token_batches(predictions, references):
    """
    Turn Trainer-style arrays into strings using the notebook-level `tokenizer`.

    `predictions` may be logits (one more axis than the labels) or token ids;
    `references` are label ids.
    """
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    pred_ids = np.asarray(predictions)
    label_ids = np.asarray(references)
    if pred_ids.ndim == label_ids.ndim + 1:
        # Logits: keep only the most likely token id at every position.
        pred_ids = pred_ids.argmax(axis=-1)
    # A causal LM predicts token t+1 at position t, so shift to align both sides.
    pred_ids = pred_ids[:, :-1]
    label_ids = label_ids[:, 1:]
    # NOTE(review): -100 is assumed to be the "ignore this position" label value
    # produced by the data collator - confirm for the collator in use.
    keep = label_ids != -100
    pad_id = tokenizer.pad_token_id
    pred_ids = np.where(keep, pred_ids, pad_id)
    label_ids = np.where(keep, label_ids, pad_id)
    decoded_predictions = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    decoded_references = tokenizer.batch_decode(label_ids, skip_special_tokens=True)
    return decoded_predictions, decoded_references


def compute_metrics(eval_preds):
    """
    Computes Exact Match (EM) and token-overlap F1 for the predictions.

    eval_preds: pair of (predictions, labels). Either two sequences of strings,
        or the arrays a Trainer hands over (logits/token ids and label ids). In
        the array case the scores are teacher-forced next-token predictions
        decoded to text, not free-running generations.

    Returns a dict {"f1": float, "exact_match": float}, both in the range 0.0-1.0.
    Raises ValueError when predictions and references differ in length.
    """
    predictions, references = eval_preds

    if not _is_text_batch(predictions):
        predictions, references = _decode_token_batches(predictions, references)

    # Post-process predictions and references
    processed_predictions = [pred.strip() for pred in predictions]
    processed_references = [ref.strip() for ref in references]

    if len(processed_predictions) != len(processed_references):
        raise ValueError(
            f"Got {len(processed_predictions)} predictions but "
            f"{len(processed_references)} references."
        )
    if not processed_predictions:
        # Nothing to score; avoid dividing by zero.
        return {"f1": 0.0, "exact_match": 0.0}

    pairs = list(zip(processed_predictions, processed_references))
    f1 = sum(_token_f1(pred, ref) for pred, ref in pairs) / len(pairs)
    em = sum(pred == ref for pred, ref in pairs) / len(pairs)

    # Return results as a dictionary
    return {"f1": f1, "exact_match": em}


In [None]:
# Trainer
def _as_training_text(example):
    """
    Join one question/answer pair into the single string the model is trained on.

    The question is wrapped in the same `[INST]@ESE577. ... [/INST]` template that
    build_prompt uses at inference time, followed by the answer.
    NOTE(review): the literal "<s>" is left out on the assumption that the
    tokenizer inserts the BOS token itself - confirm.
    """
    return {"text": f"[INST]@ESE577. {example['Question']}. [/INST] {example['Answer']}"}


def _logits_to_token_ids(logits, labels):
    """Reduce logits to arg-max token ids before they are accumulated for the metrics."""
    if isinstance(logits, tuple):
        logits = logits[0]
    return logits.argmax(dim=-1)


# Training on the "Question" column alone would never show the model an answer,
# so both splits get a combined "text" column and the trainer reads that instead.
sft_dataset = dataset.map(_as_training_text)

trainer = SFTTrainer(
    model=model,
    train_dataset=sft_dataset["train"],
    eval_dataset=sft_dataset["test"],
    peft_config=peft_config,
    max_seq_length=512,
    dataset_text_field="text",
    tokenizer=tokenizer,
    args=training_arguments,
    packing=False,
    compute_metrics=compute_metrics,
    # Keep only token ids during evaluation instead of full vocabulary-sized logits.
    preprocess_logits_for_metrics=_logits_to_token_ids,
)
trainer.train()

In [None]:
# Persist the trained model to a local directory, then switch the in-memory
# model over to inference settings.
_SAVE_DIR = "ESE577_chatbot"
trainer.model.save_pretrained(_SAVE_DIR)
model.config.use_cache = True  # was switched off before training
model.eval()

In [None]:
# Set logging verbosity
logging.set_verbosity(logging.CRITICAL)

# NOTE(review): the training tokenizer was configured with add_eos_token=True,
# which presumably appends </s> to every encoded prompt. Generation uses its own
# tokenizer so prompts are not terminated before the model gets to answer.
generation_tokenizer = AutoTokenizer.from_pretrained(base_model, trust_remote_code=True)
generation_tokenizer.pad_token = generation_tokenizer.eos_token

# Define models and pipelines
# Passing only the checkpoint name makes the pipeline load a second copy of the
# model. Reuse the 4-bit quantisation config so that copy is not loaded
# unquantised, and let device_map place it.
base_pipe = pipeline(
    task="text-generation",
    model=base_model,
    tokenizer=generation_tokenizer,
    model_kwargs={"quantization_config": bnb_config},
    device_map="auto",
    max_length=512,
    truncation=True,
)
finetuned_pipe = pipeline(
    task="text-generation",
    model=model,
    tokenizer=generation_tokenizer,
    max_length=512,
    truncation=True,
)

def build_prompt(question):
    """
    Wrap a question in the instruction template used for ESE577 prompts.

    Returns the question placed between the "<s>[INST]@ESE577. " opener and the
    ". [/INST]" closer.
    """
    opener, closer = "<s>[INST]@ESE577. ", ". [/INST]"
    return "{}{}{}".format(opener, question, closer)

def extract_text_from_image(image_path):
    """
    Extract text from an image using OCR (Tesseract).

    image_path: path of the image file to read.

    Returns the recognised text with surrounding whitespace removed, or None
    when the image cannot be opened or processed (the error is printed rather
    than raised, so the interactive loop keeps running).
    """
    try:
        # The context manager closes the image file once OCR has finished.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return text.strip()
    except Exception as e:
        print(f"Error processing image: {e}")
        return None

def upload_and_process_image():
    """
    Use Google Colab's upload functionality to handle image input.

    Prompts for an image upload, runs OCR on the first uploaded file, prints the
    extracted text and then asks for a question about it.

    Returns (question, extracted_text), or (None, None) when the Colab upload
    helper is unavailable, nothing was uploaded, no text could be extracted, or
    no question was entered.
    """
    # `files` only exists inside the Colab runtime; import it here so the rest
    # of the notebook still loads elsewhere and the failure is explained.
    try:
        from google.colab import files
    except ImportError:
        print("Image upload requires the Google Colab runtime (google.colab is not available).")
        return None, None

    print("Please upload an image file:")
    uploaded = files.upload()  # Upload files

    if not uploaded:
        print("No file uploaded. Please try again.")
        return None, None

    # Get the uploaded file path (first uploaded file only)
    image_path = next(iter(uploaded))
    text = extract_text_from_image(image_path)

    if not text:
        print("Could not extract text from image. Please try again.")
        return None, None

    print("\nExtracted text from image:")
    print(text)
    print()

    question = input("Enter your question about the extracted text: ").strip()
    if not question:
        print("No question provided.")
        return None, None

    return question, text

def process_text_input():
    """
    Read a typed question from the user.

    Returns (question, question) - the question also serves as its own context -
    or (None, None) when nothing but whitespace was entered.
    """
    typed = input("Enter your ESE577-related question: ").strip()
    if typed:
        return typed, typed
    print("No question provided.")
    return None, None

def get_model_responses(question, context):
    """
    Generate and display responses from both the base and fine-tuned models.

    question: the user's question.
    context: text the question refers to. For typed input it is the question
        itself; for image input it is the text extracted from the image.

    Prints both generations; returns nothing.
    """
    # Only fold the context into the prompt when it adds something beyond the
    # question (i.e. the image/OCR path).
    if context and context != question:
        prompt = build_prompt(f"Context: {context}\n\nQuestion: {question}")
    else:
        prompt = build_prompt(question)

    # Text-generation pipelines take the prompt as their positional input and
    # return a list with one dict per generated sequence.
    base_answer = base_pipe(prompt)
    fine_tuned_answer = finetuned_pipe(prompt)

    print("\nPretrained Model Response:")
    print(base_answer[0]["generated_text"])

    print("\nFine-Tuned Model Response:")
    print(fine_tuned_answer[0]["generated_text"])

    print()

# Main loop: keep asking for an input type until the user submits an empty line.
_input_handlers = {'text': process_text_input, 'image': upload_and_process_image}

while True:
    input_type = input("Enter input type ('text'/'image') or press Enter to exit: ").lower().strip()

    # An empty answer ends the session.
    if input_type == "":
        print("Exiting...")
        break

    # Anything other than the two known types is rejected and asked again.
    handler = _input_handlers.get(input_type)
    if handler is None:
        print("Invalid input type. Please enter 'text' or 'image'.")
        continue

    question, context = handler()

    # Only query the models when both a question and a context were obtained.
    if question and context:
        get_model_responses(question, context)
