In [1]:
import requests
import PyPDF2
import json
import re
import nltk
import os
import numpy as np
import torch
import gradio as gr
from io import BytesIO
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import T5ForConditionalGeneration, TrainingArguments, Trainer
from sentence_transformers import SentenceTransformer
from torch.nn.functional import cosine_similarity

In [2]:
# Download NLTK tokenizer if not available
nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /home/jbodrenko/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [3]:
# Model for Labeling the dataset with questions and answers
model_name = "tiiuae/Falcon3-1B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [4]:
# Define file path
dataset_file = "directive_dataset.json"
directive_pdf_url = "https://eur-lex.europa.eu/legal-content/EN/TXT/PDF/?uri=CELEX:32018L1972"

# Fetch and parse the directive text
def fetch_directive_pdf(url):
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception("Failed to fetch the directive PDF")
    
    pdf_file = BytesIO(response.content)
    reader = PyPDF2.PdfReader(pdf_file)
    text = "\n".join(page.extract_text() for page in reader.pages if page.extract_text())

    # Fix hyphenation and normalize spaces
    text = re.sub(r"(\w+)-\s+(\w+)", r"\1\2", text)  # Remove hyphenation
    text = re.sub(r"\s+", " ", text).strip()  # Normalize spaces
    text = re.sub(r'(\d{2}\.\d{2}\.\d{4})', r'[\1]', text)  # Wrap dates in square brackets
    text = re.sub(r'\bL\s+\d{3}/\d{2}\s+EN\b', r'[L 321/98 EN]', text)  # Wrap references

    return text

# Prepare dataset for Hugging Face tokenizers
def prepare_huggingface_dataset(text, tokenizer, max_length=500, min_length=10):
    sentences = sent_tokenize(text)
    sentence_list = []
    sent_lengths = []

    for sentence in sentences:
        tokenized_sentence = tokenizer(sentence, truncation=False, padding=False)
        sentence_length = len(tokenized_sentence['input_ids'])  # Token length

        if min_length < sentence_length <= max_length:
            sentence_list.append({"text": sentence})
            sent_lengths.append(sentence_length)
            
    sentence_lengths = {
        "avg_len": np.mean(sent_lengths),
        "max_len": max(sent_lengths),
        "min_len": min(sent_lengths), 
        "median_len": np.median(sent_lengths),
        "std_len": np.std(sent_lengths)
    }
    
    print(f"Median sentence length: {sentence_lengths['median_len']}\nAvg sentence length: {sentence_lengths['avg_len']}\nSentence length std: {sentence_lengths['std_len']}\nMax sentence length: {sentence_lengths['max_len']}\nMin sentence length: {sentence_lengths['min_len']}")

    return Dataset.from_list(sentence_list), sentence_lengths

# Load or create dataset
def load_or_create_dataset(tokenizer):
    if os.path.exists(dataset_file):
        print("Loading dataset from file...")
        with open(dataset_file, "r", encoding="utf-8") as f:
            data = json.load(f)
            dataset = Dataset.from_list(data[0])
            sentence_lengths = data[1]
            print(f"Median sentence length: {sentence_lengths['median_len']}\nAvg sentence length: {sentence_lengths['avg_len']}\nSentence length std: {sentence_lengths['std_len']}\nMax sentence length: {sentence_lengths['max_len']}\nMin sentence length: {sentence_lengths['min_len']}")
            
    else:
        print("Fetching and processing directive...")
        directive_text = fetch_directive_pdf(directive_pdf_url)
        dataset, sent_length = prepare_huggingface_dataset(directive_text, tokenizer)
        with open(dataset_file, "w", encoding="utf-8") as f:
            json.dump([dataset.to_list(),sent_length], f, indent=4, ensure_ascii=False)
    
    return dataset

In [5]:
# Load or create dataset and show basic statistics of snippet length (in tokens)
dataset = load_or_create_dataset(tokenizer)

Loading dataset from file...
Median sentence length: 56.0
Avg sentence length: 66.8248807975726
Sentence length std: 47.018184659819724
Max sentence length: 487
Min sentence length: 11


In [6]:
# Checking dataset structure
dataset

Dataset({
    features: ['text'],
    num_rows: 2307
})

In [7]:
def generate_qa(example, tokenizer, model):
    """
    Generates a structured question-answer pair from input text, ensuring proper extraction.
    """
    text = example['text']

    # Few-shot prompt for structured output
    prompt = (
        "Generate a meaningful question-answer pair from the following directive text.\n"
        f"Text: {text}\n"
        "Question:"
    )

    # Ensure padding token is correctly set
    tokenizer.pad_token = tokenizer.pad_token or tokenizer.eos_token  

    # Tokenize input with proper padding and truncation
    inputs = tokenizer(
        prompt, 
        return_tensors="pt", 
        truncation=True, 
        padding="max_length", 
        max_length=256
    )

    # Generate response
    with torch.no_grad():
        outputs = model.generate(
            inputs['input_ids'],
            attention_mask=inputs['attention_mask'], # which tokens to ignore in input
            max_length=512,  # truncation length
            num_return_sequences=1, # produce single answer per input
            pad_token_id=tokenizer.eos_token_id # what padding token was used
        )

    # Decode and clean output text
    output_text = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()

    # Improved regex to extract the first valid Q&A pair
    match = re.search(r'(?:Question|Q):\s*(.*?)\s*(?:Answer|A):\s*(.*)', output_text, re.DOTALL)

    # If result contains q-a pair in requested format
    if match:
        question = match.group(1).strip()
        answer = match.group(2).strip()

        # Clean up potential artifacts
        question = re.sub(r'^(question_\d+:|Solution:|\s*<\|assistant\|>\s*)', '', question, flags=re.IGNORECASE).strip()
        answer = re.sub(r'^(answer_\d+:)', '', answer, flags=re.IGNORECASE).strip()

        return {'question': question, 'answers': {'text': [answer]}}

    # Otherwise assume annotation result is invalid
    return {'question': None, 'answers': {'text': [None]}}


In [8]:
# Load or create subset of snippets for demo/testing purposes
subset_path = 'directive_subset.json'
model = AutoModelForCausalLM.from_pretrained(model_name) #model name is defined with the tokenizer before
def load_or_create_subset(dataset, subset_size, qa_generator, subset_path=subset_path, seed=None):
    '''Wrapper to reduce repetitive annotation work.'''
    
    subset_path = subset_path.replace('.json', f'_{subset_size}.json')
    
    if os.path.exists(subset_path):
        print(f"Loading subset from {subset_path}")
        with open(subset_path, "r", encoding="utf-8") as f:
            subset = Dataset.from_list(json.load(f))            
    else:
        print(f"Generating subset of {subset_size} snippets...")
        
        # Randomly selecting subset of text snippets from the dataset (uniform prob.)
        subset = dataset.shuffle(seed=seed).select(range(subset_size))
        
        # Generate a question-answer pair for each text snippet
        subset = subset.map(lambda example: generate_qa(example, tokenizer, model))
        
        # Save to file
        with open(subset_path, "w", encoding="utf-8") as f:
            json.dump(subset.to_list(), f, indent=4, ensure_ascii=False)
    
    return subset

In [9]:
subset_size = 50
qa_subset = load_or_create_subset(dataset=dataset, subset_size=subset_size, qa_generator=generate_qa, seed=None)

Loading subset from directive_subset_50.json


In [10]:
# Dataset state after annotation
qa_subset

Dataset({
    features: ['text', 'question', 'answers'],
    num_rows: 50
})

In [11]:
# Keeping only snippets where annotation results are valid
valid_indices = [i for i,entry in enumerate(qa_subset) if entry['question'] is not None]
qa_subset = qa_subset.select(valid_indices)

In [12]:
# Dataset state after removing snippets with invalid annotations
qa_subset

Dataset({
    features: ['text', 'question', 'answers'],
    num_rows: 25
})

In [13]:
# Dummy Train-test split without shuffling (need to be improved)
valid_len = len(valid_indices)
train_indices = round(0.8*valid_len)
test_indices = train_indices
train_set = qa_subset.select(range(train_indices))
test_set = qa_subset.select(range(train_indices, valid_len))

In [14]:
# Train set summary
train_set

Dataset({
    features: ['text', 'question', 'answers'],
    num_rows: 20
})

In [15]:
# Test set summary
test_set

Dataset({
    features: ['text', 'question', 'answers'],
    num_rows: 5
})

In [16]:
# Viewing the dataset contents to identify potential issues (
print(test_set[0])
print(train_set[0])

{'text': 'This would be the case for exam ple if network operators were to restr ict unreasonably end-user choice for access to internet portals and services.', 'question': 'What would be the case for example if network operators restricted unreasonably end-user choice for access to internet portals and services?', 'answers': {'text': ['network operators']}}
{'text': 'Those barriers should be reduced by the applicability of the same rules ensur ing a high common level of prot ection across the Union.', 'question': 'what is the main idea of the directive?', 'answers': {'text': ['those barriers should be reduced by the applicability of the same rules ensur ing a high common level of prot ection across the Union.']}}


In [17]:
# Tokenizer to convert text to number and add some attributes required by the qa model to be fine-tuned
model_checkpoint = "google/flan-t5-base"  # or "t5-small", "t5-large"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

def preprocess_function_no_context(examples):
    """
    Tokenizes question-answer pairs for training a generative model.
    The model is trained to generate answers from the given questions.
    """
    # Combine directive text and question
    inputs = [f"Context: {t} Question: {q}" for t, q in zip(examples["text"], examples["question"])]

    # Tokenize the input (context + question)
    model_inputs = tokenizer(
        inputs,  
        max_length=512,
        truncation=True,
        padding="max_length"
    )

    # Extract answers - empty strings need to be dealt with
    answers_text = [ans["text"][0] if ans["text"] else "" for ans in examples["answers"]]
    
    # Tokenize answers as labels
    labels = tokenizer(
        answers_text, 
        max_length=256, 
        truncation=True,
        padding="max_length"
    )

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

In [18]:
# Tokenize the annotated datasets
tokenized_train = train_set.map(preprocess_function_no_context, batched=True)
tokenized_test = test_set.map(preprocess_function_no_context, batched=True)

Map:   0%|          | 0/20 [00:00<?, ? examples/s]

Map:   0%|          | 0/5 [00:00<?, ? examples/s]

In [19]:
# The model to fine-tune
model = T5ForConditionalGeneration.from_pretrained(model_checkpoint)

In [20]:
# Defining training parameters
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",
    learning_rate=3e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
)



In [21]:
# Setting-up the training wrapper
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_test,  # If you split it earlier
)

In [22]:
# Fine-tuning
save_path = f"{subset_size}_snippet_{model_checkpoint.replace('/','_')}"
if not os.path.isfile(save_path):
    # Training
    trainer.train()

    # Saving the model to a file
    trainer.model.save_pretrained(f'{save_path}.model')
    
    # Retrieving the trained model from the trainer
    model = trainer.model
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
else:
    # Load model from saved results & tokenizer for its base model
    model = T5ForConditionalGeneration.from_pretrained(f'{save_path}.model')
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

Passing a tuple of `past_key_values` is deprecated and will be removed in Transformers v4.48.0. You should pass an instance of `EncoderDecoderCache` instead, e.g. `past_key_values=EncoderDecoderCache.from_legacy_cache(past_key_values)`.


Epoch,Training Loss,Validation Loss
1,No log,43.994389
2,No log,41.347374
3,No log,40.388622


In [28]:
# Load a sentence embedding model
embedder = SentenceTransformer("all-MiniLM-L6-v2")  # Efficient & fast

In [24]:
# Convert text snippets into embeddings to find most relevant context based on the question
document_embeddings = embedder.encode(qa_subset['text'], convert_to_tensor=True)

In [25]:
# Use cosine similarity on document and question embeddings to find the most relevant context snippet 
def retrieve_context(question, documents, document_embeddings, embedder):
    question_embedding = embedder.encode(question, convert_to_tensor=True)
    
    # Compute similarity scores
    similarities = cosine_similarity(question_embedding, document_embeddings)
    
    # Retrieve the most similar passage
    best_idx = torch.argmax(similarities).item()
    return documents[best_idx]

question = "What should the national regulator y author ities do to maintain access and competition in the market?"
retrieved_context = retrieve_context(question, qa_subset['text'], document_embeddings, embedder)
print("Retrieved Context:", retrieved_context)

Retrieved Context: How ever , the national regulator y author ities should still be able to imp ose obliga tions and conditions on under takings that control access to end-users in order to maintain access and comp etition in that market.


In [26]:
# Combine snippet and question and use the model to produce the answer
def generate_answer(question, model, tokenizer, context):
    input_text = f"Question: {question} Context: {context}"
    
    inputs = tokenizer(input_text, return_tensors="pt", truncation=True, padding=True)
    output = model.generate(**inputs, max_length=200)
    
    return tokenizer.decode(output[0], skip_special_tokens=True)

# Example
answer = generate_answer(question, model, tokenizer, retrieved_context)
print("Generated Answer:", answer)

Generated Answer: imp ose obliga tions and conditions on under takings


In [29]:
def generate_answer_from_model(question, model, tokenizer, context):
    # Incorporate
    input_text = f"Question: {question} Context: {context}"
    
    inputs = tokenizer(input_text, return_tensors="pt", truncation=True, padding=True)
    output = model.generate(**inputs, max_length=200)
    
    return tokenizer.decode(output[0], skip_special_tokens=True)

def generate_answer(question):
    # Get relevant context
    retrieved_context = retrieve_context(question, qa_subset['text'], document_embeddings, embedder)

    # Produce the answer with fine-tuned model using the question and context as input
    return generate_answer_from_model(question, model, tokenizer, retrieved_context)

# Running gradio demo interface
textbox = gr.Textbox(label="Type your question here:", placeholder="What is the directive about?", lines=10)

gr.Interface(fn=generate_answer, inputs=textbox, outputs="text").launch()


* Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.


