<a href="https://colab.research.google.com/github/xbeat/CPF/blob/main/AI/CPF3_SML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; later cells read and write
# synthetic_data.json under /content/drive/MyDrive.
drive.mount('/content/drive')

In [None]:
import json
import random

# Message templates keyed by indicator id. Every entry provides sentence
# patterns containing an {action} placeholder plus the actions that may fill it.
# Key order is significant only insofar as it fixes the order of the id list
# handed to random.choice by the generator.
vulnerability_templates = {
    # Request attributed to the CEO.
    "1.1": {
        "patterns": ["CEO requests: {action} now."],
        "actions": ["transfer funds", "share credentials"],
    },
    # Request with an explicit one-hour deadline.
    "2.1": {
        "patterns": ["URGENT: {action} in 1hr."],
        "actions": ["approve transfer", "reset password"],
    },
    # Request that refers to an earlier favour.
    "3.1": {
        "patterns": ["I helped you, please {action}."],
        "actions": ["share file", "approve request"],
    },
}

def generate_synthetic_data(
    num_samples=1000,
    output_path="/content/drive/MyDrive/synthetic_data.json",
    seed=None,
    templates=None,
):
    """Generate templated messages and save them as a JSON list.

    Each sample is a dict with three keys: ``text`` (a pattern with its
    ``{action}`` placeholder filled in), ``label`` (the indicator id the
    pattern was taken from) and ``severity`` (``"green"``, ``"yellow"`` or
    ``"red"``).

    NOTE(review): ``severity`` is drawn uniformly at random and does not depend
    on ``text`` or ``label``, so a classifier trained to predict it from the
    text has no signal to learn. Behaviour is kept as-is; confirm whether a
    deterministic indicator -> severity mapping was intended.

    Args:
        num_samples: Number of samples to generate.
        output_path: JSON file to write. Defaults to the Google Drive location
            the other notebook cells read from.
        seed: Optional seed. When given, a private ``random.Random(seed)`` is
            used so the output is reproducible; when ``None`` the module-level
            ``random`` state is used.
        templates: Optional mapping of indicator id to a dict with ``patterns``
            and ``actions`` lists. Defaults to the module-level
            ``vulnerability_templates``.

    Returns:
        The list of generated sample dicts (the same data written to disk).

    Raises:
        IndexError: If samples are requested but ``templates`` (or one of its
            ``patterns``/``actions`` lists) is empty.
    """
    if templates is None:
        templates = vulnerability_templates
    rng = random if seed is None else random.Random(seed)

    # Loop invariants, built once instead of on every iteration.
    indicators = list(templates.keys())
    severities = ["green", "yellow", "red"]

    samples = []
    for _ in range(num_samples):
        # Draw order (indicator, pattern, action, severity) is kept stable so a
        # given RNG state always yields the same sequence of samples.
        indicator = rng.choice(indicators)
        spec = templates[indicator]
        template = rng.choice(spec["patterns"])
        action = rng.choice(spec["actions"])
        severity = rng.choice(severities)
        samples.append(
            {"text": template.format(action=action), "label": indicator, "severity": severity}
        )

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(samples, f, indent=2)
    return samples

# Run in Colab.
# Seed the global RNG so the generated file is the same on every run, and bind
# the result so the cell does not echo all 1000 records as its output.
random.seed(42)
samples = generate_synthetic_data()
samples[:3]

In [None]:
# Confirm the synthetic data file exists on Drive before training reads it.
!ls /content/drive/MyDrive/synthetic_data.json

In [None]:
!pip install transformers datasets torch

from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset

# Load the synthetic samples from Drive as a single "train" split
data_file = "/content/drive/MyDrive/synthetic_data.json"
dataset = load_dataset("json", data_files=data_file, split="train")

# Tokenizer for the base checkpoint (was microsoft/phi-3-mini-4k-instruct)
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

# Preprocessing
def preprocess(examples):
    """Tokenize a batch of texts and attach integer severity labels.

    Args:
        examples: A batch (it is mapped with ``batched=True``) exposing a
            ``"text"`` column and a ``"severity"`` column of
            ``"green"``/``"yellow"``/``"red"`` strings.

    Returns:
        The tokenizer output, padded/truncated to 128 tokens, with an added
        ``"label"`` entry holding 0, 1 or 2 for green, yellow or red.

    Raises:
        KeyError: If a severity value is not one of the three known names.
    """
    severity_to_id = {"green": 0, "yellow": 1, "red": 2}
    encoded = tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=128,
    )
    encoded["label"] = list(map(severity_to_id.__getitem__, examples["severity"]))
    return encoded

dataset = dataset.map(preprocess, batched=True)

# Fixed seed so the 80/20 split (and therefore the eval metrics) is the same
# after a kernel restart; splits are picked by name instead of by dict order.
split = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset, eval_dataset = split["train"], split["test"]

# Model - same checkpoint as the tokenizer. The class names are stored in the
# model config so inference reports "green"/"yellow"/"red" instead of LABEL_0..2.
# The ids match the mapping used in preprocess().
id2label = {0: "green", 1: "yellow", 2: "red"}
label2id = {name: idx for idx, name in id2label.items()}
model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=3,
    id2label=id2label,
    label2id=label2id,
)

# Training
args = TrainingArguments(
    output_dir="./results",
    report_to="none",
    # Optimisation settings
    num_train_epochs=3,
    per_device_train_batch_size=8,  # increased
    learning_rate=2e-5,  # chosen learning rate
    warmup_steps=100,  # warmup
    weight_decay=0.01,  # regularisation
    # Evaluate and checkpoint once per epoch, then reload the best checkpoint
    # as ranked by eval_loss when training finishes.
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
)

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)
trainer.train()

# Save to Hugging Face.
# Trainer.push_to_hub() takes a commit message as its first positional argument,
# not a repo id, so the repo id is given to the model's and tokenizer's own
# push_to_hub(), whose first argument is the target repo.
# NOTE(review): assumes the session is already authenticated with the Hub
# (e.g. via huggingface_hub.notebook_login()) - confirm.
trainer.model.push_to_hub("CPF3-org/cpf-poc-model")
tokenizer.push_to_hub("CPF3-org/cpf-poc-model")

In [None]:
# FINE-TUNING
!pip install transformers datasets torch huggingface_hub

from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset
from huggingface_hub import HfApi

# Load data: the JSON file on Drive becomes one "train" split
dataset = load_dataset(
    "json",
    split="train",
    data_files="/content/drive/MyDrive/synthetic_data.json",
)

# Tokenizer and preprocessing
base_checkpoint = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(base_checkpoint)

def preprocess(examples):
    """Tokenize a batch of texts and add the numeric severity label.

    ``examples`` is a batch (the function is mapped with ``batched=True``)
    with ``"text"`` and ``"severity"`` columns. Texts are padded/truncated to
    128 tokens; severities green/yellow/red become labels 0/1/2. An unknown
    severity raises ``KeyError``.
    """
    batch = tokenizer(examples["text"], padding="max_length", max_length=128, truncation=True)
    label_ids = {"green": 0, "yellow": 1, "red": 2}
    targets = []
    for severity in examples["severity"]:
        targets.append(label_ids[severity])
    batch["label"] = targets
    return batch

dataset = dataset.map(preprocess, batched=True)

# Fixed seed so the 80/20 split (and therefore the eval metrics) is the same
# after a kernel restart; splits are picked by name instead of by dict order.
split = dataset.train_test_split(test_size=0.2, seed=42)
train_dataset, eval_dataset = split["train"], split["test"]

# Model. The class names are stored in the model config so the uploaded model
# reports "green"/"yellow"/"red" at inference instead of LABEL_0..2. The ids
# match the mapping used in preprocess().
id2label = {0: "green", 1: "yellow", 2: "red"}
label2id = {name: idx for idx, name in id2label.items()}
model = AutoModelForSequenceClassification.from_pretrained(
    "distilbert-base-uncased",
    num_labels=3,
    id2label=id2label,
    label2id=label2id,
)

# Training
args = TrainingArguments(
    # Where checkpoints go; no external experiment tracker.
    output_dir="./results",
    report_to="none",
    # Schedule and optimiser
    num_train_epochs=3,
    per_device_train_batch_size=8,  # increased
    learning_rate=2e-5,  # chosen learning rate
    warmup_steps=100,  # warmup
    weight_decay=0.01,  # regularisation
    # Per-epoch evaluation and checkpointing; the best checkpoint by eval_loss
    # is reloaded at the end.
    eval_strategy="epoch",
    save_strategy="epoch",
    metric_for_best_model="eval_loss",
    load_best_model_at_end=True,
)

trainer = Trainer(
    args=args,
    model=model,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)
trainer.train()

# Correct upload (replaces trainer.push_to_hub): write the model and tokenizer
# into one local folder, then upload that folder to the Hub.
trainer.save_model("./cpf-model-final")
tokenizer.save_pretrained("./cpf-model-final")

api = HfApi()
# upload_folder needs an existing repo; create it when missing. exist_ok=True
# makes this a no-op on re-runs.
api.create_repo(repo_id="CPF3-org/cpf-poc-model", repo_type="model", exist_ok=True)
api.upload_folder(
    folder_path="./cpf-model-final",
    repo_id="CPF3-org/cpf-poc-model",
    repo_type="model"
)

print("✅ Modello caricato in CPF3-org/cpf-poc-model")

In [None]:
# In Colab, check the severity distribution of the generated data:
import json
from collections import Counter

with open("/content/drive/MyDrive/synthetic_data.json", "r", encoding="utf-8") as f:
    data = json.load(f)

# Counter keeps first-seen key order, so converting it back to a plain dict
# prints the same mapping a manual tally would.
severity_count = dict(Counter(item["severity"] for item in data))

print(severity_count)

In [None]:
# Direct test of the uploaded model
from transformers import pipeline

model = pipeline(
    "text-classification",
    model="CPF3-org/cpf-poc-model",
)

# Two sentences built from the training templates plus one unrelated sentence.
tests = [
    "CEO requests: transfer funds now.",
    "URGENT: approve transfer in 1hr.",
    "Normal meeting tomorrow."
]

for sample in tests:
    prediction = model(sample)
    print(f"'{sample}' -> {prediction}")