<a href="https://colab.research.google.com/github/matteraggi/FineTuningAI/blob/main/ProgettoFINAL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**INSTALLS**

In [1]:
!pip install --upgrade transformers torch datasets peft tf-keras accelerate bitsandbytes trl evaluate datasets radon zss torchvision
# IMPORTANTE! RIAVVIARE IL RUNTIME DOPO L'ESECUZIONE

Collecting transformers
  Downloading transformers-4.52.4-py3-none-any.whl.metadata (38 kB)
Collecting torch
  Downloading torch-2.7.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (29 kB)
Collecting datasets
  Downloading datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting tf-keras
  Downloading tf_keras-2.19.0-py3-none-any.whl.metadata (1.8 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.46.0-py3-none-manylinux_2_24_x86_64.whl.metadata (10 kB)
Collecting trl
  Downloading trl-0.18.1-py3-none-any.whl.metadata (11 kB)
Collecting evaluate
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Collecting radon
  Downloading radon-6.0.1-py2.py3-none-any.whl.metadata (8.2 kB)
Collecting zss
  Downloading zss-1.2.0.tar.gz (9.8 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting torchvision
  Downloading torchvision-0.22.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (6.1 kB)
Collecting sympy>=1.13.3 (from torch)
  Downloading sympy-1.14.0-py3-non

**IMPORTS**

In [1]:
# Importing stock ml libraries
import numpy as np
import pandas as pd
import torch
import gc
from tabulate import tabulate
from datasets import load_dataset
from trl import SFTTrainer
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model
from google.colab import files
import json
import random
from peft import PeftModel


**Svuota la cache**

In [2]:
# Garbace collect
gc.collect()
# Svuota la cache
torch.cuda.empty_cache()

**VARIABLES**

In [3]:
# The model that you want to train from the Hugging Face hub

model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"


# The instruction dataset to use
dataset_name = "bigcode/self-oss-instruct-sc2-exec-filter-50k"
# Fine-tuned model name
new_model = "Our_Finetuned_Model"

# Dimension of dataset subset used
dataset_range = 4000

################################################################################
# QLoRA parameters
################################################################################

# LoRA attention dimension. It determines the size and parameter count of the low-rank adaptation
lora_r = 64  # XXXXXXXXXXX
# Alpha parameter for LoRA scaling factor that determines the impact of the low-rank matrices on the original model's output.
# Controls the overall strength of the low-rank adaptation.
lora_alpha = 2*lora_r  # "Often set to 2-4 times lora_r"
# Dropout probability for LoRA layers (considera aumento a 0.1-0.2)
lora_dropout = 0.1

################################################################################
# bitsandbytes parameters
################################################################################

# Activate 4-bit precision base model loading
use_4bit = True
# Compute dtype for 4-bit base models
bnb_4bit_compute_dtype = "float16"
# Quantization type (fp4 or nf4)
bnb_4bit_quant_type = "nf4"
# Activate nested quantization for 4-bit base models (double quantization)
use_nested_quant = False  # "Double quantization can sometimes improve performance but increases complexity.  It's often left disabled initially."

################################################################################
# TrainingArguments parameters
################################################################################

# Output directory where the model predictions and checkpoints will be stored
output_dir = "./results"

# Enable fp16/bf16 training (set bf16 to True with an A100)
fp16 = True
bf16 = False  # CONFERMO CHE NON E' SUPPORTATO SU COLAB

# Number of training epochs
num_train_epochs = 3  # Sembra arrivare a un plateau già alla terza
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4  # Abbassare in caso di training più lunghi

# Batch size per training e per evaluation
per_device_train_batch_size = per_device_eval_batch_size = 2
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = 4 # Moltiplica la batch_size vera per ottenere quella simulata. Dinimuire per velocità, alzare per stabilità.

# Enable gradient checkpointing
gradient_checkpointing = True

# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3  # Stabilizza il training, ma rallentandolo. Range ottimale 0.1-0.5

# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.1  # Aumentare in caso di overfitting a 0.1-0.2

# Optimizer to use
optim = "paged_adamw_32bit"

# Learning rate schedule
lr_scheduler_type = "cosine"

# Number of training steps (overrides num_train_epochs)
max_steps = -1  # '-1' mantains num_train_epochs

# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.05 # Aumentare a 0.1 in caso di instabilità eccessiva iniziale

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True

# Save checkpoint every tot updates steps
save_steps = 100
# Log every tot updates steps
logging_steps = 100
# Evaluation strategy ("no", "epochs", "steps")
eval_strategy = "steps"
# Evaluate the model every tot steps (if the strategy is "steps")
eval_steps = 100

################################################################################
# SFT parameters
################################################################################

# Maximum sequence length to use; tronca l'input a tot tokens
max_seq_length = 1024

# Pack multiple short examples in the same input sequence to increase efficiency
packing = False  # "Can improve efficiency if your dataset has many short sequences."

# Load the entire model on the GPU 0
device_map = {"": 0}

### IGNORAMI - Carica il modello di base per allenarlo

In [None]:
compute_dtype = getattr(torch, bnb_4bit_compute_dtype)
bnb_config = BitsAndBytesConfig(
  load_in_4bit=use_4bit,
  bnb_4bit_quant_type=bnb_4bit_quant_type,
  bnb_4bit_compute_dtype=compute_dtype,
  bnb_4bit_use_double_quant=use_nested_quant,
)
# Il warning di sliding window è un false warning: https://huggingface.co/deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B/discussions/27

# Load base model
base_model = AutoModelForCausalLM.from_pretrained(
  model_name,
  quantization_config=bnb_config,
  device_map=device_map
)

# Load base tokenizer
tokenizer_base = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
print(tokenizer_base.pad_token, tokenizer_base.eos_token)
tokenizer_base.pad_token = tokenizer_base.eos_token
print(tokenizer_base.pad_token, tokenizer_base.eos_token)
tokenizer_base.padding_side = 'right'

config.json:   0%|          | 0.00/679 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.55G [00:00<?, ?B/s]

Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


generation_config.json:   0%|          | 0.00/181 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/3.07k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

<｜end▁of▁sentence｜> <｜end▁of▁sentence｜>
<｜end▁of▁sentence｜> <｜end▁of▁sentence｜>


In [None]:
# Ultime configurazioni per il modello
base_model.config.use_cache = False
base_model.config.pretraining_tp = 1
if gradient_checkpointing:
  base_model.gradient_checkpointing_enable()  # Attiva gradient checkpointing per ridurre l'uso di memoria

# for name, module in base_model.named_modules():
#    print(name)

# Load LoRA configuration
# Example configuration for target modules (can vary per model)
peft_config = LoraConfig(
  lora_alpha=lora_alpha,
  lora_dropout=lora_dropout,
  r=lora_r,
  bias="none",
  task_type="CAUSAL_LM",
  #target_modules=["q_proj", "v_proj"] # also ["o_proj", "k_proj"] ?
)

print(base_model)

Qwen2ForCausalLM(
  (model): Qwen2Model(
    (embed_tokens): Embedding(151936, 1536)
    (layers): ModuleList(
      (0-27): 28 x Qwen2DecoderLayer(
        (self_attn): Qwen2Attention(
          (q_proj): Linear4bit(in_features=1536, out_features=1536, bias=True)
          (k_proj): Linear4bit(in_features=1536, out_features=256, bias=True)
          (v_proj): Linear4bit(in_features=1536, out_features=256, bias=True)
          (o_proj): Linear4bit(in_features=1536, out_features=1536, bias=False)
        )
        (mlp): Qwen2MLP(
          (gate_proj): Linear4bit(in_features=1536, out_features=8960, bias=False)
          (up_proj): Linear4bit(in_features=1536, out_features=8960, bias=False)
          (down_proj): Linear4bit(in_features=8960, out_features=1536, bias=False)
          (act_fn): SiLU()
        )
        (input_layernorm): Qwen2RMSNorm((1536,), eps=1e-06)
        (post_attention_layernorm): Qwen2RMSNorm((1536,), eps=1e-06)
      )
    )
    (norm): Qwen2RMSNorm((1536,), eps

### Carica il modello finetunato da git per valutarlo

In [4]:
#Da Git:
!apt-get install git
!rm -rf FineTuningAI
!git clone https://github.com/matteraggi/FineTuningAI.git

#Dal Drive (cambiare path manualmente):
#from google.colab import drive
#drive.mount('/content/drive')
#!cp -r /content/drive/MyDrive/checkpoint_800 /content/

# Percorso alla directory del modello
checkpoint_path = "/content/FineTuningAI/models/DS_Finetuned_6"

compute_dtype = getattr(torch, bnb_4bit_compute_dtype)
bnb_config = BitsAndBytesConfig(
  load_in_4bit=use_4bit,
  bnb_4bit_quant_type=bnb_4bit_quant_type,
  bnb_4bit_compute_dtype=compute_dtype,
  bnb_4bit_use_double_quant=use_nested_quant,
)
# Il warning di sliding window è un false warning: https://huggingface.co/deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B/discussions/27

# Load (partially) finetuned model
finetuned_model = AutoModelForCausalLM.from_pretrained(
  checkpoint_path,
  device_map=device_map,
  quantization_config=bnb_config,
)

# Load tokenizer from checkpoint
tokenizer_finetuned = AutoTokenizer.from_pretrained(checkpoint_path)
tokenizer_finetuned.pad_token = tokenizer_finetuned.eos_token
tokenizer_finetuned.padding_side = 'right'

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git is already the newest version (1:2.34.1-1ubuntu1.12).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.
Cloning into 'FineTuningAI'...
remote: Enumerating objects: 386, done.[K
remote: Counting objects: 100% (4/4), done.[K
remote: Compressing objects: 100% (4/4), done.[K
remote: Total 386 (delta 0), reused 0 (delta 0), pack-reused 382 (from 2)[K
Receiving objects: 100% (386/386), 679.29 MiB | 27.19 MiB/s, done.
Resolving deltas: 100% (214/214), done.
Updating files: 100% (80/80), done.


config.json:   0%|          | 0.00/679 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.55G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/181 [00:00<?, ?B/s]

In [5]:
# Ultime configurazioni per il modello
finetuned_model.config.use_cache = False
finetuned_model.config.pretraining_tp = 1
if gradient_checkpointing:
  finetuned_model.gradient_checkpointing_enable()  # Attiva gradient checkpointing per ridurre l'uso di memoria

# for name, module in finetuned_model.named_modules():
#    print(name)

# Load LoRA configuration
# Example configuration for target modules (can vary per model)
peft_config = LoraConfig(
  lora_alpha=lora_alpha,
  lora_dropout=lora_dropout,
  r=lora_r,
  bias="none",
  task_type="CAUSAL_LM",
  #target_modules=["q_proj", "v_proj"] # o ["o_proj", "k_proj"] ?
)

print(finetuned_model)

Qwen2ForCausalLM(
  (model): Qwen2Model(
    (embed_tokens): Embedding(151936, 1536)
    (layers): ModuleList(
      (0-27): 28 x Qwen2DecoderLayer(
        (self_attn): Qwen2Attention(
          (q_proj): lora.Linear4bit(
            (base_layer): Linear4bit(in_features=1536, out_features=1536, bias=True)
            (lora_dropout): ModuleDict(
              (default): Dropout(p=0.1, inplace=False)
            )
            (lora_A): ModuleDict(
              (default): Linear(in_features=1536, out_features=64, bias=False)
            )
            (lora_B): ModuleDict(
              (default): Linear(in_features=64, out_features=1536, bias=False)
            )
            (lora_embedding_A): ParameterDict()
            (lora_embedding_B): ParameterDict()
            (lora_magnitude_vector): ModuleDict()
          )
          (k_proj): Linear4bit(in_features=1536, out_features=256, bias=True)
          (v_proj): lora.Linear4bit(
            (base_layer): Linear4bit(in_features=1536, o

### Caricamento e tokenizzazione dataset

**DATASET LOAD AND PROCESSING**

In [None]:
# Load dataset (you can process it here)
dataset = load_dataset(dataset_name, split="train")
# Use only a random subset of the dataset 'dataset_range'-wide
dataset = dataset.shuffle(seed=42).select(range(dataset_range))

# # Split the subset in training and validation
# split_dataset = dataset.train_test_split(test_size=0.2, seed=42)
# train_dataset = split_dataset["train"]
# val_dataset = split_dataset["test"]

# Use the full dataset for training only
train_dataset = dataset
# Per evitare OOM, è stata rimossa la validazione. Alla fine il codice verrà sempre valutato con humaneval.

README.md:   0%|          | 0.00/1.04k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/90.1M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/50661 [00:00<?, ? examples/s]

In [None]:
from statistics import mean, median

lengths = []

for ex in dataset:
    # Ricostruisci l'input esattamente come nel preprocessing
    full_text = f"{ex['instruction'].strip()}\n{ex['seed'].strip()}"  # strip è opzionale
    # Tokenizzazione completa con special tokens (esattamente come nel preprocessing)
    input_ids = tokenizer_base(full_text, truncation=False)["input_ids"]
    #input_ids = tokenizer_finetuned(full_text, truncation=False)["input_ids"]
    lengths.append(len(input_ids))

# Analisi
max_len = max(lengths)
avg_len = mean(lengths)
med_len = median(lengths)
over_512 = sum(l > 512 for l in lengths) / len(lengths) * 100
over_1024 = sum(l > 1024 for l in lengths) / len(lengths) * 100
over_2048 = sum(l > 2048 for l in lengths) / len(lengths) * 100

print(f"Lunghezza massima: {max_len} token")
print(f"Lunghezza media: {avg_len:.2f} token")
print(f"Lunghezza mediana: {med_len} token")
print(f"Percentuale > 512 token: {over_512:.2f}%")
print(f"Percentuale > 1024 token: {over_1024:.2f}%")
print(f"Percentuale > 2048 token: {over_2048:.2f}%")

Lunghezza massima: 1575 token
Lunghezza media: 238.48 token
Lunghezza mediana: 205.0 token
Percentuale > 512 token: 4.20%
Percentuale > 1024 token: 0.38%
Percentuale > 2048 token: 0.00%


In [None]:
# # Preprocessing per SFTTrainer: inserisce tutto in un singolo campo (separando istruzione e seed con /n) e applica truncation/padding
# def preprocess_function(examples):
#     inputs = []
#     for instr, seed in zip(examples["instruction"], examples["seed"]):
#         inputs.append(f"{instr.strip()}\n{seed.strip()}")

#     model_inputs = tokenizer(inputs, truncation=True, padding="max_length", max_length=max_seq_length)
#     model_inputs["labels"] = model_inputs["input_ids"].copy()
#     return model_inputs

# # Applica il preprocessing
# tokenized_train_dataset = train_dataset.map(preprocess_function, batched=True)
# #tokenized_val_dataset = val_dataset.map(preprocess_function, batched=True)

In [None]:
#print dataset columns
print(dataset.column_names)
#print dataset example
print(dataset[0])
#print dataset example with instruction and seed
print(f"Instruction: {dataset[0]['instruction']}")
print(f"Seed: {dataset[0]['seed']}")

['fingerprint', 'sha1', 'seed', 'response', 'concepts', 'prompt', 'instruction', 'id']
{'fingerprint': None, 'sha1': 'a1ea8d1a52a57874b576afc6a4c45e7d624d409e', 'seed': 'def box_enum(typ, val, c):\n    """\n    Fetch an enum member given its native value.\n    """\n    valobj = c.box(typ.dtype, val)\n    # Call the enum class with the value object\n    cls_obj = c.pyapi.unserialize(c.pyapi.serialize_object(typ.instance_class))\n    return c.pyapi.call_function_objargs(cls_obj, (valobj,))', 'response': 'Here is a Python function that implements this approach:\n\n```python\ndef serialize_objects(objects):\n    """\n    Serialize a list of objects into a list of dictionaries.\n    Each dictionary represents the object\'s class name and attributes.\n    """\n    serialized_objects = []\n    for obj in objects:\n        serialized_object = {\n            "class_name": obj.__class__.__name__,\n            "attributes": {}\n        }\n        for attr_name, attr_value in obj.__dict__.items():

In [None]:
def preprocess_function(examples):
    input_ids_list = []
    label_ids_list = []
    attention_mask_list = []

    for instr, seed in zip(examples["instruction"], examples["seed"]):
        prompt = instr.strip() + "\n" # Aggiungi uno /n alla fine di ogni insturction
        completion = seed.strip()   # gli strip non sono necessari nel nostro dataset, non cambiano nulla

        prompt_ids = tokenizer_base(prompt, add_special_tokens=False)["input_ids"]
        completion_ids = tokenizer_base(completion, add_special_tokens=False)["input_ids"]

        input_ids = prompt_ids + completion_ids
        labels = [-100] * len(prompt_ids) + completion_ids  # 'svalutiamo' l'istruzione stessa per fare in modo che il modello impari solo a generare codice

        # Padding/truncation
        if len(input_ids) > max_seq_length:
            input_ids = input_ids[:max_seq_length]
            labels = labels[:max_seq_length]
        else:
            pad_len = max_seq_length - len(input_ids)
            input_ids += [tokenizer_base.pad_token_id] * pad_len
            labels += [-100] * pad_len

        attention_mask = [1 if token != tokenizer_base.pad_token_id else 0 for token in input_ids] # 1 per token validi, 0 per padding (in DS, padding=eos)

        input_ids_list.append(input_ids)
        label_ids_list.append(labels)
        attention_mask_list.append(attention_mask)

    return {
        "input_ids": input_ids_list,  # frasi tokenizzate
        "labels": label_ids_list, # maschera corretta per supervisionare solo la parte target
        "attention_mask": attention_mask_list
    }

# Applica il preprocessing
tokenized_train_dataset = train_dataset.map(preprocess_function, batched=True)
# tokenized_val_dataset = val_dataset.map(preprocess_function, batched=True)

Map:   0%|          | 0/4000 [00:00<?, ? examples/s]

In [None]:
vocab_size_non = non_finetuned_model.config.vocab_size
vocab_size_finetuned = finetuned_model.config.vocab_size
print("Vocab size for non finetuned model:", vocab_size_non, "\nVocab size for finetuned model:", vocab_size_finetuned)
print("Max input_id:", max([max(x) for x in tokenized_train_dataset["input_ids"] if len(x) > 0]))

Vocab size for non finetuned model: 49152 
Vocab size for finetuned model: 151936
Max input_id: 151643


### Training

In [None]:
# Set training parameters
training_arguments = TrainingArguments(
  output_dir=output_dir,
  num_train_epochs=num_train_epochs,
  per_device_train_batch_size=per_device_train_batch_size,
  gradient_accumulation_steps=gradient_accumulation_steps,
  optim=optim,
  # eval_strategy=eval_strategy,
  # eval_steps=eval_steps,
  save_steps=save_steps,
  logging_steps=logging_steps,
  learning_rate=learning_rate,
  weight_decay=weight_decay,
  fp16=fp16,
  bf16=bf16,
  max_grad_norm=max_grad_norm,
  max_steps=max_steps,
  warmup_ratio=warmup_ratio,
  group_by_length=group_by_length,
  lr_scheduler_type=lr_scheduler_type,
  report_to="tensorboard",
  gradient_checkpointing=gradient_checkpointing,
  label_names=["labels"], # Risolve: No label_names provided for model class `PeftModelForCausalLM`. Since `PeftModel` hides base models input arguments, if label_names is not given, label_names can't be set automatically within `Trainer`. Note that empty label_names list will be used instead.
)

# Train the model with the modified configuration
trainer = SFTTrainer(
  model=base_model,
  train_dataset=tokenized_train_dataset,
  #eval_dataset=tokenized_val_dataset,
  peft_config=peft_config,
  args=training_arguments,
)

Truncating train dataset:   0%|          | 0/4000 [00:00<?, ? examples/s]

In [None]:
# Start training

# Training da zero
trainer.train()

# Training se già parzialmente allenato
#trainer.train(resume_from_checkpoint=checkpoint_path)

Step,Training Loss
100,1.9457
200,0.3332
300,0.3082
400,0.3141
500,0.3164
600,0.3039
700,0.3044
800,0.3099
900,0.2944
1000,0.3009


TrainOutput(global_step=1500, training_loss=0.41304088465372724, metrics={'train_runtime': 10447.7841, 'train_samples_per_second': 1.149, 'train_steps_per_second': 0.144, 'total_flos': 1.14457602686976e+17, 'train_loss': 0.41304088465372724})

### Save model / checkpoint

**NON RUNNARE DI DEFAULT**
Se ha finito l'allenamento, salva il modello e pushalo sul git

In [None]:
# Save the fine-tuned model
#trainer.model.save_pretrained(new_model)
trainer.save_model("./results/DS_Finetuned_3")

In [None]:
from google.colab import userdata
!apt-get install git
!git config --global user.email {userdata.get('GitEmail')}
!git config --global user.name {userdata.get('GitUsername')}
!git clone https://github.com/matteraggi/FineTuningAI.git
!cd FineTuningAI  # Go to the *existing* FineTuningAI directory
!mv results/DS_Finetuned_3 FineTuningAI/models/DS_Finetuned_3/
!cd FineTuningAI && git add models/DS_Finetuned_3
!cd FineTuningAI && git commit -m "Saved 'DS_Finetuned_3', new finetuned model."
!git config --global credential.helper store
!cd FineTuningAI && git push https://{userdata.get('PAT')}@github.com/matteraggi/FineTuningAI.git main

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git is already the newest version (1:2.34.1-1ubuntu1.12).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.
fatal: destination path 'FineTuningAI' already exists and is not an empty directory.
mv: cannot stat 'results/DS_Finetuned_3': No such file or directory
[main c5a9fbb] Saved 'DS_Finetuned_3', new finetuned model.
 7 files changed, 757946 insertions(+)
 create mode 100644 models/DS_Finetuned_3/README.md
 create mode 100644 models/DS_Finetuned_3/adapter_config.json
 create mode 100644 models/DS_Finetuned_3/adapter_model.safetensors
 create mode 100644 models/DS_Finetuned_3/special_tokens_map.json
 create mode 100644 models/DS_Finetuned_3/tokenizer.json
 create mode 100644 models/DS_Finetuned_3/tokenizer_config.json
 create mode 100644 models/DS_Finetuned_3/training_args.bin
Enumerating objects: 13, done.
Counting objects: 100% (13/13), done.
Delta compression using up to 2

**NON RUNNARE DI DEFAULT**
Salva il checkpoint sul drive (cambiare manualmente il nome della cartella)

In [None]:
from google.colab import drive
drive.mount('/content/drive')
!cp -r /content/results/DS_Finetuned_3 /content/drive/MyDrive/

### HumanEval evaluation

**Caricare il benchmark HumanEval**
Dopo il fine-tuning, devi confrontare il tuo modello con uno pre-addestrato (StarCoder2 senza fine-tuning) e con il modello fine-tunato.

In [None]:
gc.collect()
torch.cuda.empty_cache()
#del non_finetuned_model

In [6]:
SYSTEM_PROMPT = (
    "# Complete the following Python function by implementing its logic.\n"
    "# Return only the code of the function. Do not include comments like TODO or placeholders.\n"
    "# Do not leave the function body empty or incomplete.\n"
    "# If examples or docstrings are present, use them to infer the logic.\n"
    "# Make sure the function is correct and complete.\n\n"
)
#SYSTEM_PROMPT = """Complete the following python function as described in its description.
#                    In said description, there are also lines beginning with >>> that symbolize execution examples, useful for understanding how the function should work.\n"""

In [7]:
# Configurazione
base_model_name = "bigcode/starcoder2-3b"   # Usiamo starcoder per il confronto
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device in use:", device) #DEBUG

# Caricamento del dataset HumanEval e selezione casuale di 10 esempi
humaneval = load_dataset("openai/openai_humaneval", split="test")
#sampled_humaneval = random.sample(list(humaneval), 10)








###############################################################################
sampled_humaneval = random.sample(list(humaneval), 30)

# Salva il set di test una sola volta
with open("fixed_sampled_humaneval.json", "w") as f:
    json.dump(sampled_humaneval, f, indent=4)
# Carica il dataset salvato
#with open("fixed_sampled_humaneval.json", "r") as f:
    #sampled_humaneval = json.load(f)
###############################################################################






# Configurazione quantizzazione     # Rimosso 04/05/2025
# quantization_config = BitsAndBytesConfig(
#     load_in_4bit=True,
#     bnb_4bit_quant_type="nf4",
#     bnb_4bit_compute_dtype=torch.float16,
# )
compute_dtype = getattr(torch, bnb_4bit_compute_dtype)  # Aggiunto 04/05/2025
bnb_config = BitsAndBytesConfig(
  load_in_4bit=use_4bit,
  bnb_4bit_quant_type=bnb_4bit_quant_type,
  bnb_4bit_compute_dtype=compute_dtype,
  bnb_4bit_use_double_quant=use_nested_quant,
)

# #DEBUGGING
# import os
# os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
#base_model_name = "bigcode/starcoder2-3b"   # Usiamo starcoder per il confronto

# Caricamento del modello base
non_finetuned_model = AutoModelForCausalLM.from_pretrained(
    base_model_name, device_map=device_map, quantization_config=bnb_config  #cambiata device_map da '"auto"' a 'device_map'. Cambiata quantizzazione come visto sopra.
)
non_finetuned_model.eval()

# Load starcoder (base model) tokenizer
tokenizer_base = AutoTokenizer.from_pretrained(base_model_name)
tokenizer_base.pad_token = tokenizer_base.eos_token
tokenizer_base.padding_side = 'right'

# Funzione per generare codice con il prompt modificato
def generate_code(model, prompt, tokenizer):
    # Aggiungiamo un contesto iniziale per istruire il modello correttamente
    full_prompt = SYSTEM_PROMPT + prompt
    inputs = tokenizer(full_prompt, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=256, # era 200
            do_sample=True,
            temperature=0.8,
            top_p=0.95, # NUOVO
            repetition_penalty=1.1, # NUOVO
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
        )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)



# Valutazione dei modelli
base_results = []
fine_tuned_results = []

for i, example in enumerate(sampled_humaneval):
    prompt = example['prompt']

    base_code = generate_code(non_finetuned_model, prompt, tokenizer_base)
    base_results.append({"prompt": prompt, "code": base_code})

    fine_tuned_code = generate_code(finetuned_model, prompt, tokenizer_finetuned)
    fine_tuned_results.append({"prompt": prompt, "code": fine_tuned_code})

    print(f"Processed example {i+1}/10")
    gc.collect()
    torch.cuda.empty_cache()

# Salvataggio dei risultati in JSON
results = {"base_results": base_results, "fine_tuned_results": fine_tuned_results}

with open("model_results.json", "w") as f:
    json.dump(results, f, indent=4)

# Creazione e stampa tabella
df = pd.DataFrame([
    {"Prompt": b['prompt'], "Non-Finetuned Code": b['code'], "Finetuned Code": f['code']}
    for b, f in zip(base_results, fine_tuned_results)
])

files.download("model_results.json")

print(tabulate(df, headers="keys", tablefmt="grid", numalign="left", stralign="left"))

device in use: cuda


README.md:   0%|          | 0.00/6.52k [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/83.9k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/164 [00:00<?, ? examples/s]

config.json:   0%|          | 0.00/700 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/12.1G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/7.88k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/777k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/442k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.06M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/958 [00:00<?, ?B/s]

Processed example 1/10
Processed example 2/10
Processed example 3/10
Processed example 4/10
Processed example 5/10
Processed example 6/10
Processed example 7/10
Processed example 8/10
Processed example 9/10
Processed example 10/10
Processed example 11/10
Processed example 12/10
Processed example 13/10
Processed example 14/10
Processed example 15/10
Processed example 16/10
Processed example 17/10
Processed example 18/10
Processed example 19/10
Processed example 20/10
Processed example 21/10
Processed example 22/10
Processed example 23/10
Processed example 24/10
Processed example 25/10
Processed example 26/10
Processed example 27/10
Processed example 28/10
Processed example 29/10
Processed example 30/10


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

+----+-----------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

In [8]:
from datasets import load_dataset
import re
import json
import torch
import gc
import pandas as pd
from tabulate import tabulate
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from google.colab import files

# Configurazione
base_model_name = "bigcode/starcoder2-3b"
device = "cuda" if torch.cuda.is_available() else "cpu"
print("device in use:", device)

# Caricamento del dataset HumanEval
humaneval = load_dataset("openai/openai_humaneval", split="test")

#sampled_humaneval = random.sample(list(humaneval), 10)


###############################################################################
#sampled_humaneval = random.sample(list(humaneval), 10)

# Salva il set di test una sola volta
#with open("fixed_sampled_humaneval.json", "w") as f:
    #json.dump(sampled_humaneval, f, indent=4)
# Carica il dataset salvato
with open("fixed_sampled_humaneval.json", "r") as f:
    sampled_humaneval = json.load(f)
###############################################################################


# === Funzione per estrarre la docstring dal codice ===
def estrai_docstring(code):
    """Estrae il contenuto tra triple virgolette singole o doppie."""
    match = re.search(r'("""|\'\'\')(.*?)(\1)', code, re.DOTALL)
    return match.group(2).strip() if match else ""

# === Quantizzazione ===
compute_dtype = getattr(torch, bnb_4bit_compute_dtype)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=use_4bit,
    bnb_4bit_quant_type=bnb_4bit_quant_type,
    bnb_4bit_compute_dtype=compute_dtype,
    bnb_4bit_use_double_quant=use_nested_quant,
)

# === Carica modello base ===
non_finetuned_model = AutoModelForCausalLM.from_pretrained(
    base_model_name, device_map=device_map, quantization_config=bnb_config
)
non_finetuned_model.eval()

# === Tokenizer ===
tokenizer_base = AutoTokenizer.from_pretrained(base_model_name)
tokenizer_base.pad_token = tokenizer_base.eos_token
tokenizer_base.padding_side = 'right'

# === Funzione generazione ===
def generate_code_2(model, prompt, tokenizer):
    if not prompt.strip():
        return ""
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=256,
            do_sample=True,
            temperature=0.8,
            top_p=0.95,
            repetition_penalty=1.1,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
        )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# === Valutazione ===
base_results_2 = []
fine_tuned_results_2 = []

for i, example in enumerate(sampled_humaneval):
    docstring = estrai_docstring(example["prompt"])
    if not docstring.strip():
        print(f"⚠️  Prompt vuoto per esempio {i+1}, salto...")
        continue

    prompt_2 = docstring

    base_code_2 = generate_code_2(non_finetuned_model, prompt_2, tokenizer_base)
    base_results_2.append({"prompt": prompt_2, "code": base_code_2})

    fine_tuned_code_2 = generate_code_2(finetuned_model, prompt_2, tokenizer_finetuned)
    fine_tuned_results_2.append({"prompt": prompt_2, "code": fine_tuned_code_2})

    print(f"✅ Processed example {i+1}/10")
    gc.collect()
    torch.cuda.empty_cache()

# === Salva risultati ===
results = {"base_results_2": base_results_2, "fine_tuned_results_2": fine_tuned_results_2}
with open("model_results.json", "w") as f:
    json.dump(results, f, indent=4)

# === Tabella riassuntiva ===
df = pd.DataFrame([
    {"Prompt": b['prompt'], "Non-Finetuned Code": b['code'], "Finetuned Code": f['code']}
    for b, f in zip(base_results_2, fine_tuned_results_2)
])
files.download("model_results.json")
print(tabulate(df, headers="keys", tablefmt="grid", numalign="left", stralign="left"))


device in use: cuda
✅ Processed example 1/10
✅ Processed example 2/10
✅ Processed example 3/10
✅ Processed example 4/10
✅ Processed example 5/10
✅ Processed example 6/10
✅ Processed example 7/10
✅ Processed example 8/10
✅ Processed example 9/10
✅ Processed example 10/10
✅ Processed example 11/10
✅ Processed example 12/10
✅ Processed example 13/10
✅ Processed example 14/10
✅ Processed example 15/10
✅ Processed example 16/10
✅ Processed example 17/10
✅ Processed example 18/10
✅ Processed example 19/10
✅ Processed example 20/10
✅ Processed example 21/10
✅ Processed example 22/10
✅ Processed example 23/10
✅ Processed example 24/10
✅ Processed example 25/10
✅ Processed example 26/10
✅ Processed example 27/10
✅ Processed example 28/10
✅ Processed example 29/10
✅ Processed example 30/10


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

+----+----------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|    | Prompt         

### Save/Load results

**SAVE IN DRIVE**

In [None]:
from google.colab import drive
drive.mount('/content/drive')
# Save as JSON in Drive
with open("/content/drive/MyDrive/model_results.json", "w") as f:
    json.dump(results, f, indent=4)



Mounted at /content/drive


**SAVE IN GIT**

In [None]:
from google.colab import userdata
import os

# Install git (se non già installato)
!apt-get install git -qq

# Configura Git (modo corretto per Colab)
git_email = userdata.get('GitEmail')
git_username = userdata.get('GitUsername')
pat = userdata.get('PAT')

!git config --global user.email "{git_email}"
!git config --global user.name "{git_username}"

# Clona il repository (se non esiste già o forza il reclone)
if not os.path.exists('FineTuningAI'):
    !git clone https://{pat}@github.com/matteraggi/FineTuningAI.git
else:
    !rm -rf FineTuningAI
    !git clone https://{pat}@github.com/matteraggi/FineTuningAI.git

# Copia i risultati
!cp model_results.json FineTuningAI/results/

# Commit e push
%cd FineTuningAI
!git add results/model_results.json
!git commit -m "Save model results $(date +'%Y-%m-%d %H:%M:%S')"
!git push origin main
%cd ..

Cloning into 'FineTuningAI'...
remote: Enumerating objects: 241, done.[K
remote: Counting objects: 100% (29/29), done.[K
remote: Compressing objects: 100% (23/23), done.[K
remote: Total 241 (delta 9), reused 15 (delta 3), pack-reused 212 (from 1)[K
Receiving objects: 100% (241/241), 364.12 MiB | 33.41 MiB/s, done.
Resolving deltas: 100% (124/124), done.
Updating files: 100% (46/46), done.
/content/FineTuningAI
[main 424aab1] Save model results 2025-04-06 11:16:46
 1 file changed, 86 insertions(+), 86 deletions(-)
 rewrite results/model_results.json (98%)
Enumerating objects: 7, done.
Counting objects: 100% (7/7), done.
Delta compression using up to 2 threads
Compressing objects: 100% (3/3), done.
Writing objects: 100% (4/4), 4.15 KiB | 4.15 MiB/s, done.
Total 4 (delta 1), reused 0 (delta 0), pack-reused 0
remote: Resolving deltas: 100% (1/1), completed with 1 local object.[K
To https://github.com/matteraggi/FineTuningAI.git
   57a9714..424aab1  main -> main
/content


**LOAD FROM DRIVE**

In [None]:
from google.colab import drive
drive.mount('/content/drive')
import json

# Load JSON file from Drive
with open("/content/drive/MyDrive/model_results.json", "r") as f:
    results = json.load(f)

# Now results['base_results'] and results['fine_tuned_results'] hold the data
base_results = results['base_results']
fine_tuned_results = results['fine_tuned_results']


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**LOAD FROM GIT**

In [None]:
import json
!apt-get install git
!git clone https://github.com/matteraggi/FineTuningAI.git

with open("FineTuningAI/results/model_results.json", "r") as f:
    results = json.load(f)

base_results = results['base_results']
fine_tuned_results = results['fine_tuned_results']

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git is already the newest version (1:2.34.1-1ubuntu1.12).
0 upgraded, 0 newly installed, 0 to remove and 30 not upgraded.
fatal: destination path 'FineTuningAI' already exists and is not an empty directory.


### **Complex checking** holy shit me vs God he loses

In [9]:
!pip install pylint

Collecting pylint
  Downloading pylint-3.3.7-py3-none-any.whl.metadata (12 kB)
Collecting astroid<=3.4.0.dev0,>=3.3.8 (from pylint)
  Downloading astroid-3.3.10-py3-none-any.whl.metadata (4.4 kB)
Collecting isort!=5.13,<7,>=4.2.5 (from pylint)
  Downloading isort-6.0.1-py3-none-any.whl.metadata (11 kB)
Collecting mccabe<0.8,>=0.6 (from pylint)
  Downloading mccabe-0.7.0-py2.py3-none-any.whl.metadata (5.0 kB)
Collecting tomlkit>=0.10.1 (from pylint)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Downloading pylint-3.3.7-py3-none-any.whl (522 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m522.6/522.6 kB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading astroid-3.3.10-py3-none-any.whl (275 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m275.4/275.4 kB[0m [31m25.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading isort-6.0.1-py3-none-any.whl (94 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m94.2/94.2 kB[

In [10]:
#LAVORA SU QUESTO!

import ast
import radon.complexity
import subprocess
import json
import tempfile
import zss
import re
import os
import pandas as pd
from tabulate import tabulate
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datasets import load_dataset
import torch
import random
import gc
from radon.complexity import cc_visit
import io
import sys
import difflib

# Configurazione
os.environ["HF_ALLOW_CODE_EVAL"] = "1"
base_model_name = "bigcode/starcoder2-3b"
device = "cuda" if torch.cuda.is_available() else "cpu"

# Caricamento del modello base
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)
device_map = {"": device}
non_finetuned_model = AutoModelForCausalLM.from_pretrained(
    base_model_name, device_map=device_map, quantization_config=bnb_config
)
non_finetuned_model.eval()

# Tokenizer
tokenizer_base = AutoTokenizer.from_pretrained(base_model_name)
tokenizer_base.pad_token = tokenizer_base.eos_token
tokenizer_base.padding_side = 'right'

SYSTEM_PROMPT = "Generate Python code for the following task:\n"

def estrai_docstring(code):
    match = re.search(r'("""|\'\'\')(.*?)(\1)', code, re.DOTALL)
    return match.group(2).strip() if match else ""

class CodeEvaluator:
    def __init__(self):
        self.tokenizer = None
        self.model = None

    def initialize_models(self):
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            base_model_name, device_map={"": device}, quantization_config=bnb_config
        ).eval()
    def run_pylint_analysis(self, code: str) -> dict:
      results = {
        "errors": 0,
        "warnings": 0,
        "conventions": 0,
        "refactors": 0,
        "messages": []
      }

      with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as temp_file:
        temp_file.write(code)
        temp_file_path = temp_file.name

      try:
        # Run pylint and capture the output
        completed = subprocess.run(
            ["pylint", "--disable=all", "--enable=E,W,C,R", "--score=n", temp_file_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        #print("--- STDOUT ---")
        #print(completed.stdout)
        #print("--- STDERR ---")
        #print(completed.stderr)
        # Parse stdout line by line
        for line in completed.stdout.splitlines():
          results["messages"].append(line)

          if re.search(r'\bE\d{4}\b', line):
            results["errors"] += 1
          elif re.search(r'\bW\d{4}\b', line):
            results["warnings"] += 1
          elif re.search(r'\bC\d{4}\b', line):
            results["conventions"] += 1
          elif re.search(r'\bR\d{4}\b', line):
            results["refactors"] += 1

      except Exception as e:
        results["messages"].append(f"Errore durante analisi pylint: {str(e)}")

      finally:
        os.remove(temp_file_path)

      return results


    def extract_pure_code(self, generated: str, prompt: str) -> str:
      """
      Rimuove il prompt ripetuto dal codice generato e isola solo la parte di codice Python.
      Supporta codice dentro
python ...
 oppure codice nudo.
      """
      # 1. Rimuove prompt (se presente all'inizio)
      generated = generated.strip()
      if generated.startswith(prompt.strip()):
        generated = generated[len(prompt):].strip()

      # 2. Estrae codice da blocchi markdown ```python
      match = re.search(r"```(?:python)?(.*?)```", generated, re.DOTALL)
      if match:
        return match.group(1).strip()

      # 3. Ritorna testo 'pulito'
      return generated.strip()

    def preprocess_code(self, code: str) -> str:
        if not code:
            return ""
        matches = re.findall(r'(def\s+\w+\(.*?\):.*?(?=\n\s*def\s|\Z))', code, re.DOTALL)
        code = matches[0] if matches else code
        code = re.sub(r'#.*', '', code)
        code = re.sub(r'\.\.\..*', '', code)
        code = re.sub(r'\s+\n', '\n', code)
        return code.strip()

    def is_valid_python(self, code: str) -> bool:
      try:
        ast.parse(code)
        return True
      except:
        return False
    def safe_parse(self, code):
      try:
        return ast.parse(code)
      except:
        return None
    def is_sum_function_correct(self, code: str) -> bool:
      try:
        tree = safe_parse(code)
        if not tree:
          return False
        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef) and node.name == "sum":
                args = node.args.args
                if len(args) == 2:
                    for n in ast.walk(node):
                        if isinstance(n, ast.Return) and isinstance(n.value, ast.BinOp):
                            if isinstance(n.value.op, ast.Add):
                                return True
        return False
      except:
        return False
    def calculate_complexity(self, code: str) -> float:
      try:
        blocks = cc_visit(code)
        return sum(b.complexity for b in blocks) / len(blocks) if blocks else 0
      except:
        return 0


    def ast_to_node(self, node):
        if isinstance(node, ast.AST):
            children = [self.ast_to_node(child) for child in ast.iter_child_nodes(node)]
            return zss.Node(type(node).__name__, children)
        return zss.Node(str(node))

    def count_control_structures(self, code: str) -> int:
      try:
        tree = safe_parse(code)
        if not tree:
          return 0
        return sum(isinstance(n, (ast.If, ast.For, ast.While)) for n in ast.walk(tree))
      except:
        return 0

    def has_function_def(self, code: str) -> bool:
      try:
        tree = safe_parse(code)
        if not tree:
          return False
        return any(isinstance(n, ast.FunctionDef) for n in ast.walk(tree))
      except:
        return False
    def evaluate_code(self, code: str, canonical: str = None) -> dict:
      clean_code = code.strip()
      tree = self.safe_parse(clean_code)
      if tree:
        implementation_complete = any(isinstance(n, (ast.Return, ast.Yield, ast.Raise)) for n in ast.walk(tree))
      else:
        implementation_complete = False
      return {
        "syntactic_correct": self.is_valid_python(clean_code),
        "is_sum_function_correct": self.is_sum_function_correct(clean_code),
        "avg_complexity": self.calculate_complexity(clean_code),
        "code_length": len(clean_code),
        "has_function_def": self.has_function_def(clean_code),
        "has_return": "return" in clean_code.lower(),
        "implementation_complete": implementation_complete,
        "num_structures": self.count_control_structures(clean_code),
        **self.run_pylint_analysis(clean_code)
      }


    def generate_code(self, model, prompt, tokenizer):
        if not prompt.strip():
            return ""
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=256,
                do_sample=True,
                temperature=0.8,
                top_p=0.95,
                repetition_penalty=1.1,
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.pad_token_id,
            )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)


    def run_evaluation(self):
        humaneval = load_dataset("openai/openai_humaneval", split="test")
        with open("fixed_sampled_humaneval.json", "r") as f:
            sampled_humaneval = json.load(f)

        self.initialize_models()
        results = {"base": [], "fine_tuned": []}

        for example in sampled_humaneval:
          prompt = example['prompt']
          canonical = example['canonical_solution']

          raw_base = self.generate_code(non_finetuned_model, SYSTEM_PROMPT + prompt, tokenizer_base)
          base_code = self.extract_pure_code(raw_base, prompt)
          base_metrics = self.evaluate_code(base_code, canonical)

          raw_finetuned = self.generate_code(finetuned_model, SYSTEM_PROMPT + prompt, tokenizer_finetuned)
          fine_tuned_code = self.extract_pure_code(raw_finetuned, prompt)
          fine_tuned_metrics = self.evaluate_code(fine_tuned_code, canonical)

          results["base"].append({
                "prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt,
                "code": base_code[:200] + "..." if len(base_code) > 200 else base_code,
                **base_metrics
            })

          results["fine_tuned"].append({
                "prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt,
                "code": fine_tuned_code[:200] + "..." if len(fine_tuned_code) > 200 else fine_tuned_code,
                **fine_tuned_metrics
          })
          gc.collect()

        self.analyze_results(results)

    def analyze_results(self, results):
        base_df = pd.DataFrame(results["base"])
        fine_tuned_df = pd.DataFrame(results["fine_tuned"])

        print("\n=== Metriche Aggregate ===")
        agg_metrics = [
            ["Sintatticamente Corretto", base_df["syntactic_correct"].mean(), fine_tuned_df["syntactic_correct"].mean()],
            ["Errori pylint", base_df["errors"].mean(), fine_tuned_df["errors"].mean()],
            ["Warning pylint", base_df["warnings"].mean(), fine_tuned_df["warnings"].mean()],
            ["Complessità Ciclomatica", base_df["avg_complexity"].mean(), fine_tuned_df["avg_complexity"].mean()],
            ["Implementazione Completa", base_df["implementation_complete"].mean(), fine_tuned_df["implementation_complete"].mean()]
        ]
        print(tabulate(agg_metrics, headers=["Metrica", "Base", "Fine-Tuned"], floatfmt=".2f", tablefmt="grid"))

        print("\n=== Esempi Dettagliati ===")
        valid_examples = 0
        for base, fine_tuned in zip(results["base"], results["fine_tuned"]):
          #if not base["syntactic_correct"] and not fine_tuned["syntactic_correct"]:
            #continue
          valid_examples += 1
          print(f"\nEsempio {valid_examples}:")
          print(f"Prompt: {base['prompt']}\n")

          print("🔹 Base Model:")
          print(f"Codice:\n{base['code']}")
          for k, v in base.items():
            if k not in ['prompt', 'code', 'errors', 'warnings', 'conventions', 'refactors']:
                print(f"- {k}: {v}")

          if "messages" in base:
            print("Messaggi Pylint:")
            for msg in base["messages"]:
              print(f"  • {msg}")

          print("\n🔸 Fine-Tuned Model:")
          print(f"Codice:\n{fine_tuned['code']}")
          for k, v in fine_tuned.items():
            if k not in ['prompt', 'code', 'errors', 'warnings', 'conventions', 'refactors']:
                print(f"- {k}: {v}")

          if "messages" in fine_tuned:
            print("Messaggi Pylint:")
            for msg in fine_tuned["messages"]:
              print(f"  • {msg}")

        if valid_examples == 0:
            print("Nessun esempio sintatticamente corretto trovato.")

if __name__ == "__main__":
    evaluator = CodeEvaluator()
    evaluator.run_evaluation()


=== Metriche Aggregate ===
+--------------------------+--------+--------------+
| Metrica                  |   Base |   Fine-Tuned |
| Sintatticamente Corretto |   0.07 |         0.33 |
+--------------------------+--------+--------------+
| Errori pylint            |   0.93 |         1.03 |
+--------------------------+--------+--------------+
+--------------------------+--------+--------------+
| Complessità Ciclomatica  |   0.18 |         0.33 |
+--------------------------+--------+--------------+
| Implementazione Completa |   0.07 |         0.13 |
+--------------------------+--------+--------------+

=== Esempi Dettagliati ===

Esempio 1:
Prompt: 

def sort_even(l: list):
    """This function takes a list l and returns a list l' such that
    l'...

🔹 Base Model:
Codice:
Generate Python code for the following task:


def sort_even(l: list):
    """This function takes a list l and returns a list l' such that
    l' is identical to l in the odd indicies, while its value...
- syntacti

In [11]:
#LAVORA SU QUESTO (PROMPT DOCSTRING)!

import ast
import radon.complexity
import subprocess
import json
import tempfile
import zss
import re
import os
import pandas as pd
from tabulate import tabulate
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datasets import load_dataset
import torch
import random
import gc
from radon.complexity import cc_visit
import io
import sys
import difflib
# Configurazione
os.environ["HF_ALLOW_CODE_EVAL"] = "1"
base_model_name = "bigcode/starcoder2-3b"
device = "cuda" if torch.cuda.is_available() else "cpu"

# Caricamento del modello base
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)
device_map = {"": device}
non_finetuned_model = AutoModelForCausalLM.from_pretrained(
    base_model_name, device_map=device_map, quantization_config=bnb_config
)
non_finetuned_model.eval()

# Tokenizer
tokenizer_base = AutoTokenizer.from_pretrained(base_model_name)
tokenizer_base.pad_token = tokenizer_base.eos_token
tokenizer_base.padding_side = 'right'

SYSTEM_PROMPT = "Generate Python code for the following task:\n"



class CodeEvaluator:
    def __init__(self):
        self.tokenizer = None
        self.model = None
    def estrai_docstring(self, code):
      """Estrae il contenuto tra triple virgolette singole o doppie."""
      match = re.search(r'("""|\'\'\')(.*?)(\1)', code, re.DOTALL)
      return match.group(2).strip() if match else ""
    def initialize_models(self):
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            base_model_name, device_map={"": device}, quantization_config=bnb_config
        ).eval()
    def run_pylint_analysis(self, code: str) -> dict:
      results = {
        "errors": 0,
        "warnings": 0,
        "conventions": 0,
        "refactors": 0,
        "messages": []
      }

      with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as temp_file:
        temp_file.write(code)
        temp_file_path = temp_file.name

      try:
        # Run pylint and capture the output
        completed = subprocess.run(
            ["pylint", "--disable=all", "--enable=E,W,C,R", "--score=n", temp_file_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        #print("--- STDOUT ---")
        #print(completed.stdout)
        #print("--- STDERR ---")
        #print(completed.stderr)
        # Parse stdout line by line
        for line in completed.stdout.splitlines():
          results["messages"].append(line)

          if re.search(r'\bE\d{4}\b', line):
            results["errors"] += 1
          elif re.search(r'\bW\d{4}\b', line):
            results["warnings"] += 1
          elif re.search(r'\bC\d{4}\b', line):
            results["conventions"] += 1
          elif re.search(r'\bR\d{4}\b', line):
            results["refactors"] += 1

      except Exception as e:
        results["messages"].append(f"Errore durante analisi pylint: {str(e)}")

      finally:
        os.remove(temp_file_path)

      return results


    def extract_pure_code(self, generated: str, prompt: str) -> str:
      """
      Rimuove il prompt ripetuto dal codice generato e isola solo la parte di codice Python.
      Supporta codice dentro
python ...
 oppure codice nudo.
      """
      # 1. Rimuove prompt (se presente all'inizio)
      generated = generated.strip()
      if generated.startswith(prompt.strip()):
        generated = generated[len(prompt):].strip()

      # 2. Estrae codice da blocchi markdown ```python
      match = re.search(r"```(?:python)?(.*?)```", generated, re.DOTALL)
      if match:
        return match.group(1).strip()

      # 3. Ritorna testo 'pulito'
      return generated.strip()

    def preprocess_code(self, code: str) -> str:
        if not code:
            return ""
        matches = re.findall(r'(def\s+\w+\(.*?\):.*?(?=\n\s*def\s|\Z))', code, re.DOTALL)
        code = matches[0] if matches else code
        code = re.sub(r'#.*', '', code)
        code = re.sub(r'\.\.\..*', '', code)
        code = re.sub(r'\s+\n', '\n', code)
        return code.strip()

    def is_valid_python(self, code: str) -> bool:
      try:
        ast.parse(code)
        return True
      except:
        return False
    def safe_parse(self, code):
      try:
        return ast.parse(code)
      except:
        return None
    def is_sum_function_correct(self, code: str) -> bool:
      try:
        tree = safe_parse(code)
        if not tree:
          return False
        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef) and node.name == "sum":
                args = node.args.args
                if len(args) == 2:
                    for n in ast.walk(node):
                        if isinstance(n, ast.Return) and isinstance(n.value, ast.BinOp):
                            if isinstance(n.value.op, ast.Add):
                                return True
        return False
      except:
        return False
    def calculate_complexity(self, code: str) -> float:
      try:
        blocks = cc_visit(code)
        return sum(b.complexity for b in blocks) / len(blocks) if blocks else 0
      except:
        return 0


    def ast_to_node(self, node):
        if isinstance(node, ast.AST):
            children = [self.ast_to_node(child) for child in ast.iter_child_nodes(node)]
            return zss.Node(type(node).__name__, children)
        return zss.Node(str(node))

    def count_control_structures(self, code: str) -> int:
      try:
        tree = safe_parse(code)
        if not tree:
          return 0
        return sum(isinstance(n, (ast.If, ast.For, ast.While)) for n in ast.walk(tree))
      except:
        return 0

    def has_function_def(self, code: str) -> bool:
      try:
        tree = safe_parse(code)
        if not tree:
          return False
        return any(isinstance(n, ast.FunctionDef) for n in ast.walk(tree))
      except:
        return False
    def evaluate_code(self, code: str, canonical: str = None) -> dict:
      clean_code = code.strip()
      tree = self.safe_parse(clean_code)
      if tree:
        implementation_complete = any(isinstance(n, (ast.Return, ast.Yield, ast.Raise)) for n in ast.walk(tree))
      else:
        implementation_complete = False
      return {
        "syntactic_correct": self.is_valid_python(clean_code),
        "is_sum_function_correct": self.is_sum_function_correct(clean_code),
        "avg_complexity": self.calculate_complexity(clean_code),
        "code_length": len(clean_code),
        "has_function_def": self.has_function_def(clean_code),
        "has_return": "return" in clean_code.lower(),
        "implementation_complete": implementation_complete,
        "num_structures": self.count_control_structures(clean_code),
        **self.run_pylint_analysis(clean_code)
      }


    def generate_code(self, model, prompt, tokenizer):
        if not prompt.strip():
            return ""
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=256,
                do_sample=True,
                temperature=0.8,
                top_p=0.95,
                repetition_penalty=1.1,
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.pad_token_id,
            )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)


    def run_evaluation(self):
        humaneval = load_dataset("openai/openai_humaneval", split="test")
        with open("fixed_sampled_humaneval.json", "r") as f:
            sampled_humaneval = json.load(f)

        self.initialize_models()
        results = {"base": [], "fine_tuned": []}
        for example in sampled_humaneval:
          prompt = estrai_docstring(example["prompt"])
          canonical = example['canonical_solution']

          raw_base = self.generate_code(non_finetuned_model, SYSTEM_PROMPT + prompt, tokenizer_base)
          base_code = self.extract_pure_code(raw_base, prompt)
          base_metrics = self.evaluate_code(base_code, canonical)

          raw_finetuned = self.generate_code(finetuned_model, SYSTEM_PROMPT + prompt, tokenizer_finetuned)
          fine_tuned_code = self.extract_pure_code(raw_finetuned, prompt)
          fine_tuned_metrics = self.evaluate_code(fine_tuned_code, canonical)

          results["base"].append({
                "prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt,
                "code": base_code[:200] + "..." if len(base_code) > 200 else base_code,
                **base_metrics
            })

          results["fine_tuned"].append({
                "prompt": prompt[:100] + "..." if len(prompt) > 100 else prompt,
                "code": fine_tuned_code[:200] + "..." if len(fine_tuned_code) > 200 else fine_tuned_code,
                **fine_tuned_metrics
          })
          gc.collect()

        self.analyze_results(results)

    def analyze_results(self, results):
        base_df = pd.DataFrame(results["base"])
        fine_tuned_df = pd.DataFrame(results["fine_tuned"])

        print("\n=== Metriche Aggregate ===")
        agg_metrics = [
            ["Sintatticamente Corretto", base_df["syntactic_correct"].mean(), fine_tuned_df["syntactic_correct"].mean()],
            ["Errori pylint", base_df["errors"].mean(), fine_tuned_df["errors"].mean()],
            ["Warning pylint", base_df["warnings"].mean(), fine_tuned_df["warnings"].mean()],
            ["Complessità Ciclomatica", base_df["avg_complexity"].mean(), fine_tuned_df["avg_complexity"].mean()],
            ["Implementazione Completa", base_df["implementation_complete"].mean(), fine_tuned_df["implementation_complete"].mean()]
        ]
        print(tabulate(agg_metrics, headers=["Metrica", "Base", "Fine-Tuned"], floatfmt=".2f", tablefmt="grid"))

        print("\n=== Esempi Dettagliati ===")
        valid_examples = 0
        for base, fine_tuned in zip(results["base"], results["fine_tuned"]):
          #if not base["syntactic_correct"] and not fine_tuned["syntactic_correct"]:
            #continue
          valid_examples += 1
          print(f"\nEsempio {valid_examples}:")
          print(f"Prompt: {base['prompt']}\n")

          print("🔹 Base Model:")
          print(f"Codice:\n{base['code']}")
          for k, v in base.items():
            if k not in ['prompt', 'code', 'errors', 'warnings', 'conventions', 'refactors']:
                print(f"- {k}: {v}")

          if "messages" in base:
            print("Messaggi Pylint:")
            for msg in base["messages"]:
              print(f"  • {msg}")

          print("\n🔸 Fine-Tuned Model:")
          print(f"Codice:\n{fine_tuned['code']}")
          for k, v in fine_tuned.items():
            if k not in ['prompt', 'code', 'errors', 'warnings', 'conventions', 'refactors']:
                print(f"- {k}: {v}")

          if "messages" in fine_tuned:
            print("Messaggi Pylint:")
            for msg in fine_tuned["messages"]:
              print(f"  • {msg}")

        if valid_examples == 0:
            print("Nessun esempio sintatticamente corretto trovato.")

if __name__ == "__main__":
    evaluator = CodeEvaluator()
    evaluator.run_evaluation()


=== Metriche Aggregate ===
+--------------------------+--------+--------------+
| Metrica                  |   Base |   Fine-Tuned |
| Sintatticamente Corretto |   0.07 |         0.50 |
+--------------------------+--------+--------------+
| Errori pylint            |   0.93 |         0.70 |
+--------------------------+--------+--------------+
+--------------------------+--------+--------------+
| Complessità Ciclomatica  |   0.13 |         1.33 |
+--------------------------+--------+--------------+
| Implementazione Completa |   0.07 |         0.37 |
+--------------------------+--------+--------------+

=== Esempi Dettagliati ===

Esempio 1:
Prompt: This function takes a list l and returns a list l' such that
    l' is identical to l in the odd ind...

🔹 Base Model:
Codice:
Generate Python code for the following task:
This function takes a list l and returns a list l' such that
    l' is identical to l in the odd indicies, while its values at the even indicies are equal
...
- syntacti