In [None]:
!pip install --no-deps bitsandbytes accelerate xformers==0.0.29.post3 peft trl triton cut_cross_entropy #unsloth_zoo
#!pip install "unsloth_zoo @ git+https://github.com/unslothai/unsloth-zoo.git"
!pip install sentencepiece protobuf "datasets>=3.4.1"  hf_transfer
!pip install --no-deps unsloth
!pip install --no-deps git+https://github.com/huggingface/transformers.git # Only for Gemma 3N
!pip install --no-deps --upgrade timm # Only for Gemma 3N
!pip install --upgrade unsloth unsloth_zoo huggingface-hub

In [None]:
from datasets import load_dataset
from unsloth.chat_templates import standardize_data_formats

In [None]:
#!huggingface-cli login

In [None]:
from PIL import Image
import numpy as np
import random

In [None]:
def create_dummy_image():
    """Return a 224x224 all-black RGB PIL image.

    The image is built from a zero-filled uint8 array of shape (224, 224, 3).

    Returns:
        PIL.Image.Image: the placeholder image.
    """
    # Import PIL locally: a later cell in this notebook rebinds the global
    # name `Image` to `datasets.Image`, so relying on the global would break
    # this function if it is (re)run after that cell.
    from PIL import Image as PILImage

    dummy_array = np.zeros((224, 224, 3), dtype=np.uint8)
    return PILImage.fromarray(dummy_array)

def create_text_placeholder_image():
    """Return a 224x224 white RGB image with the label "PREGUNTA DE TEXTO".

    The label is drawn in black at pixel position (50, 100) using the
    default font of `ImageDraw.text`.

    Returns:
        PIL.Image.Image: the placeholder image.
    """
    # Import PIL locally: a later cell rebinds the global name `Image` to
    # `datasets.Image`, which has no `new`. The previously imported
    # `ImageFont` was unused and has been dropped.
    from PIL import Image as PILImage, ImageDraw

    img = PILImage.new('RGB', (224, 224), color='white')
    draw = ImageDraw.Draw(img)
    draw.text((50, 100), "PREGUNTA DE TEXTO", fill='black')
    return img

# Single shared placeholder image; add_dummy_image_to_text_dataset attaches
# this same object to every text-only sample.
DUMMY_IMAGE = create_dummy_image()

In [None]:
# Identifier of the question/answer text dataset; passed to load_dataset below.
dataset_name = "sergioq2/coffe"

In [None]:
# Load only the first 5100 rows of the "train" split.
dataset_text = load_dataset(dataset_name, split = "train[:5100]")

In [None]:
def transform_to_conversations_format(example):
    """Wrap one question/answer row as a two-turn "from"/"value" conversation.

    Args:
        example: Mapping with the keys "preguntas" (question) and
            "respuestas" (answer).

    Returns:
        dict with "conversations" (a human turn followed by a gpt turn) and
        a constant "source" tag of "coffe_dataset".
    """
    question, answer = example["preguntas"], example["respuestas"]
    return {
        "conversations": [
            {"from": "human", "value": question},
            {"from": "gpt", "value": answer},
        ],
        "source": "coffe_dataset",
    }

In [None]:
# Apply transform_to_conversations_format to every row, producing the
# "conversations" and "source" fields.
transformed_dataset_text = dataset_text.map(transform_to_conversations_format)


In [None]:
# Drop the raw source columns that are no longer needed after the transform.
# NOTE: this rebinds transformed_dataset_text, so re-running only this cell
# would try to remove columns that are already gone.
columns_to_remove = ["Unnamed: 0", "texto", "preguntas", "respuestas"]
transformed_dataset_text = transformed_dataset_text.remove_columns(columns_to_remove)

In [None]:
# Normalise the conversation format with Unsloth's helper.
# NOTE(review): presumably this rewrites the "from"/"value" turns into
# "role"/"content" turns, which is what convert_text_to_messages reads —
# confirm against the unsloth chat_templates documentation.
formatted_dataset_text = standardize_data_formats(transformed_dataset_text)

In [None]:
def convert_text_to_messages(sample):
    """Convert a role/content conversation sample into the "messages" layout.

    Args:
        sample: Mapping with a 'conversations' list whose items carry 'role'
            and 'content'. Optional keys 'source' and 'text' are copied over.

    Returns:
        dict with:
          * "messages": a user turn and an assistant turn, each holding a
            single {"type": "text", "text": ...} part. If several turns share
            a role, the last one wins; if a role never appears, its text is
            None.
          * "source": sample['source'], defaulting to 'coffe_dataset'.
          * "original_text": sample['text'], defaulting to ''.
    """
    # Initialise both messages. Previously only assistant_msg was initialised,
    # so a conversation without a user turn raised UnboundLocalError.
    user_msg = None
    assistant_msg = None

    for conv in sample['conversations']:
        role = conv['role']
        if role == 'user':
            user_msg = conv['content']
        elif role == 'assistant':
            assistant_msg = conv['content']

    return {
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_msg}
                ],
            },
            {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": assistant_msg}
                ]
            },
        ],
        "source": sample.get('source', 'coffe_dataset'),
        "original_text": sample.get('text', '')
    }


In [None]:
# Materialise the converted samples as a plain Python list of dicts and
# report how many were produced.
text_dataset_converted = [convert_text_to_messages(s) for s in formatted_dataset_text]
print(f"Text converted: {len(text_dataset_converted)}")

In [None]:
def add_dummy_image_to_text_dataset(text_dataset):
    """Attach the shared DUMMY_IMAGE to the user turn of every text sample.

    Args:
        text_dataset: Iterable of samples in the "messages" layout, where
            messages[0] is the user turn (first content part holds the text)
            and messages[1] is the assistant turn.

    Returns:
        list of new sample dicts containing only "messages". The user turn
        holds the original text part followed by an image part, and the
        assistant turn reuses the original content object. Any other keys of
        the input samples are not carried over.
    """
    def _with_image(sample):
        # Read the user text first, then the assistant content, matching the
        # order in which a malformed sample would fail.
        question = sample["messages"][0]["content"][0]["text"]
        answer_content = sample["messages"][1]["content"]
        user_turn = {
            "role": "user",
            "content": [
                {"type": "text", "text": question},
                {"type": "image", "image": DUMMY_IMAGE},
            ],
        }
        assistant_turn = {"role": "assistant", "content": answer_content}
        return {"messages": [user_turn, assistant_turn]}

    return [_with_image(sample) for sample in text_dataset]


In [None]:
# Text-only Q&A samples, each now carrying the placeholder image in the user turn.
text_dataset_images = add_dummy_image_to_text_dataset(text_dataset_converted)

In [None]:
# Inspect the first sample to check the resulting structure.
text_dataset_images[0]

Function calling

In [None]:
# Identifier of the function-calling dataset; passed to load_dataset below.
fc_dataset = "sergioq2/functioncalling_coffedata"

In [None]:
# Load only the first 2700 rows of the "train" split.
dataset_fc = load_dataset(fc_dataset, split = "train[:2700]")

In [None]:
# Display the dataset summary.
dataset_fc

In [None]:
def transform_fc_to_conversations_format(example):
    """Wrap one function-calling row as a two-turn "from"/"value" conversation.

    Args:
        example: Mapping with the keys "query" (user request) and
            "function" (the target output for the assistant turn).

    Returns:
        dict with "conversations" (a human turn followed by a gpt turn) and
        a constant "source" tag of "function_calling_dataset".
    """
    query, function_call = example["query"], example["function"]
    return {
        "conversations": [
            {"from": "human", "value": query},
            {"from": "gpt", "value": function_call},
        ],
        "source": "function_calling_dataset",
    }

In [None]:
# Apply transform_fc_to_conversations_format to every row, producing the
# "conversations" and "source" fields.
transformed_dataset_fc = dataset_fc.map(transform_fc_to_conversations_format)

In [None]:
# Drop the raw columns now that their values live inside "conversations".
# NOTE: this rebinds transformed_dataset_fc, so re-running only this cell
# would try to remove columns that are already gone.
columns_to_remove_fc = ["query", "function"]
transformed_dataset_fc = transformed_dataset_fc.remove_columns(columns_to_remove_fc)

In [None]:
# Normalise the conversation format with Unsloth's helper.
# NOTE(review): presumably yields "role"/"content" turns, as required by
# convert_text_to_messages — confirm against the unsloth documentation.
formatted_dataset_fc = standardize_data_formats(transformed_dataset_fc)

In [None]:
# Reuse the text converter: the "function" value becomes the assistant text.
fc_dataset_converted = [convert_text_to_messages(s) for s in formatted_dataset_fc]

In [None]:
# Function-calling samples, each now carrying the placeholder image in the user turn.
fc_dataset_images = add_dummy_image_to_text_dataset(fc_dataset_converted)

In [None]:
# Inspect the first function-calling sample.
fc_dataset_images[0]

In [None]:
!pip install roboflow

Image

In [None]:
import os
# Read the Roboflow API key from the environment. The fallback is only a
# placeholder string, so the variable must be set for the download to work.
ROBOFLOW_API_KEY = os.getenv("ROBOFLOW_API_KEY", "your_roboflow_api_key_here")

In [None]:
from roboflow import Roboflow
# Download version 2 of project "coffe-mw9n0" (workspace "detection-3nbwx")
# in the "multiclass" export format.
# NOTE(review): the next cell hard-codes '/content/coffe-2/train' as the
# location of the downloaded files — confirm that matches where the download
# actually lands in this environment.
rf = Roboflow(api_key=ROBOFLOW_API_KEY)
project = rf.workspace("detection-3nbwx").project("coffe-mw9n0")
version = project.version(2)
dataset = version.download("multiclass")

In [None]:
import pandas as pd
from datasets import Dataset, Features, Image, Value
from PIL import Image as PILImage
import os

# Build an image/caption dataset from the multiclass export: one row per image
# listed in _classes.csv, with a Spanish sentence describing its labels.
# NOTE: the path is hard-coded to a /content/... location.
base_path = '/content/coffe-2/train'
csv_path = os.path.join(base_path, '_classes.csv')

df = pd.read_csv(csv_path)
# Parallel lists: image_paths[k] and texts[k] describe the same sample.
image_paths = []
texts = []

# Column names are stripped of surrounding whitespace for display.
# class_columns is only used in the print below.
class_columns = [col.strip() for col in df.columns if col != 'filename']
print(f"Cleaned class names detected: {class_columns}")

# Maps the (stripped) CSV column name to the disease name used in captions.
# Labels missing from this mapping are used unchanged.
class_name_mapping = {
    'Broca': 'Broca',
    'Mancha': 'Mancha de hierro',
    'Ojo': 'Ojo de gallo',
    'Rosado': 'Mal Rosado',
    'Roya': 'Roya'
}

for index, row in df.iterrows():
    filename = row['filename']
    image_full_path = os.path.join(base_path, filename)

    # Only rows whose image file exists contribute a sample; a path and its
    # caption are always appended together so the lists stay aligned.
    if os.path.exists(image_full_path):
        image_paths.append(image_full_path)

        # A class is active for this image when its column value equals 1.
        active_labels = [col.strip() for col, value in row.drop('filename').items() if value == 1]

        mapped_labels = [class_name_mapping.get(label, label) for label in active_labels]

        # Caption wording depends on the number of labels: singular sentence,
        # plural sentence, or "healthy plant" when no class is active.
        if mapped_labels:
            if len(mapped_labels) == 1:
                texts.append(f"La enfermedad que tiene la planta es {mapped_labels[0]}")
            else:
                texts.append(f"Las enfermedades que tiene la planta son {', '.join(mapped_labels)}")
        else:
            texts.append("La planta está sana")

    else:
        print(f"Warning: Image not found at path: {image_full_path}. Skipping.")

data = {
    'image': image_paths,
    'text': texts
}

# `Image` here is the `datasets` feature type imported at the top of this
# cell, not PIL's Image module; the 'image' column is given as file paths.
features = Features({
    'image': Image(),
    'text': Value(dtype='string')
})

dataset_images = Dataset.from_dict(data, features=features)

In [None]:
import pandas as pd
from datasets import Dataset, Features, Image, Value
from PIL import Image as PILImage, ImageEnhance, ImageFilter
import random
from collections import Counter

def apply_random_augmentation(image):
    """Apply one randomly chosen mild augmentation to an image.

    Exactly one of six transforms is picked with `random.choice`, and its
    strength is then drawn from a small range.

    Args:
        image: Image object compatible with PIL's ImageEnhance / ImageFilter
            helpers and exposing `rotate` and `filter`.

    Returns:
        The augmented image produced by the selected transform.
    """
    def _brightness(img):
        # Brightness factor within +/-5%.
        return ImageEnhance.Brightness(img).enhance(random.uniform(0.95, 1.05))

    def _contrast(img):
        # Contrast factor within +/-5%.
        return ImageEnhance.Contrast(img).enhance(random.uniform(0.95, 1.05))

    def _rotate(img):
        # Rotation by -3..3 degrees; the canvas expands and is filled with white.
        return img.rotate(random.randint(-3, 3), expand=True, fillcolor=(255, 255, 255))

    def _blur(img):
        # Very light Gaussian blur.
        return img.filter(ImageFilter.GaussianBlur(radius=random.uniform(0.1, 0.3)))

    def _saturation(img):
        # Colour (saturation) factor within +/-2%.
        return ImageEnhance.Color(img).enhance(random.uniform(0.98, 1.02))

    def _sharpness(img):
        # Sharpness factor within +/-2%.
        return ImageEnhance.Sharpness(img).enhance(random.uniform(0.98, 1.02))

    # The order matters for reproducibility under a fixed random seed.
    candidates = [_brightness, _contrast, _rotate, _blur, _saturation, _sharpness]
    chosen = random.choice(candidates)
    return chosen(image)

def _count_disease_mentions(texts):
    """Count how many caption sentences mention each disease name."""
    counts = Counter()
    for text in texts:
        if "enfermedad que tiene" in text:
            # Singular caption: the disease name follows the final "es ".
            counts[text.split("es ")[-1]] += 1
        elif "enfermedades que tiene" in text:
            # Plural caption: comma-separated disease names follow "son ".
            for disease in text.split("son ")[-1].split(", "):
                counts[disease.strip()] += 1
    return counts


def augment_dataset(dataset_images, multiplier_roya_broca=7, multiplier_others=4):
    """Oversample an image/caption dataset with mild random augmentations.

    Every original sample is kept. Samples whose caption contains "Roya" or
    "Broca" receive `multiplier_roya_broca - 1` augmented copies; all others
    receive `multiplier_others - 1`. Each copy is produced by
    `apply_random_augmentation` and keeps the original caption.

    Args:
        dataset_images: Indexable dataset whose rows expose 'image' (an object
            with `.copy()`) and 'text' (str).
        multiplier_roya_broca: Target copies (original included) for samples
            mentioning Roya or Broca.
        multiplier_others: Target copies (original included) for other samples.

    Returns:
        A `datasets.Dataset` with 'image' and 'text' columns containing the
        originals followed by their augmented copies.

    Side effects:
        Prints progress every 100 samples, any per-image augmentation error
        (the failing copy is skipped), and the per-disease caption counts of
        the result. The counts used to be computed and silently discarded.
    """
    all_images = []
    all_texts = []
    total = len(dataset_images)

    for i in range(total):
        # Read the row once instead of indexing the dataset twice per sample.
        sample = dataset_images[i]
        original_image = sample['image']
        original_text = sample['text']

        all_images.append(original_image)
        all_texts.append(original_text)

        if "Roya" in original_text or "Broca" in original_text:
            num_augmentations = multiplier_roya_broca - 1
        else:
            num_augmentations = multiplier_others - 1

        if i % 100 == 0:
            print(f"Processing image {i+1}/{total}")

        for _aug_idx in range(num_augmentations):
            # Best effort: a failing augmentation skips that copy only.
            try:
                img_copy = original_image.copy()
                augmented_img = apply_random_augmentation(img_copy)
                all_images.append(augmented_img)
                all_texts.append(original_text)
            except Exception as e:
                print(f"Error in image {i}: {e}")
                continue

    new_data = {
        'image': all_images,
        'text': all_texts
    }

    features = Features({
        'image': Image(),
        'text': Value(dtype='string')
    })

    augmented_dataset = Dataset.from_dict(new_data, features=features)

    # Report the resulting class balance, which is the purpose of the
    # class-specific multipliers.
    class_counts = _count_disease_mentions(all_texts)
    print(f"Class distribution after augmentation: {dict(class_counts)}")

    return augmented_dataset

# Override the function defaults (7 and 4). Multipliers include the original,
# so Roya/Broca samples get 3 augmented copies and every other sample gets 1.
augmented_dataset = augment_dataset(
    dataset_images,
    multiplier_roya_broca=4,
    multiplier_others=2
)

print(f"\nDataset ready: {len(augmented_dataset)} images")

In [None]:
def convert_image_to_messages(sample):
    """Turn an image/caption sample into a two-turn "messages" conversation.

    Args:
        sample: Mapping with "image" and "text" keys.

    Returns:
        dict with a single "messages" key. The user turn asks a fixed Spanish
        question and carries the image; the assistant turn carries the
        sample's text as one text part.
    """
    image, caption = sample["image"], sample["text"]

    user_turn = {
        "role": "user",
        "content": [
            {"type": "text", "text": "¿Qué enfermedad tiene esta planta de café?"},
            {"type": "image", "image": image},
        ],
    }
    assistant_turn = {
        "role": "assistant",
        "content": [{"type": "text", "text": caption}],
    }
    return {"messages": [user_turn, assistant_turn]}

In [None]:
# Materialise the image samples as a plain Python list of message dicts and
# report how many were produced.
image_dataset_converted = [convert_image_to_messages(s) for s in augmented_dataset]
print(f"Images converted: {len(image_dataset_converted)}")

In [None]:
def fix_assistant_content_format(dataset):
    """Flatten the assistant turn's content from a list of parts to a string.

    For each sample, if messages[1]['content'] is a list, it is replaced by
    the 'text' of its first part. Samples whose assistant content is not a
    list are returned as a shallow copy with their content unchanged.

    The input samples are left untouched. Previously the shallow
    `sample.copy()` shared the nested message dicts, so the assignment also
    rewrote the caller's data in place.

    Args:
        dataset: Iterable of dict samples in the "messages" layout, where
            messages[1] is the assistant turn.

    Returns:
        list of new sample dicts with the assistant content flattened.

    Raises:
        IndexError: if an assistant content list is empty.
    """
    fixed_dataset = []

    for sample in dataset:
        new_sample = sample.copy()
        assistant_content = sample['messages'][1]['content']

        if isinstance(assistant_content, list):
            # Copy the messages list and each message dict so that the write
            # below cannot reach the original sample. Content payloads such
            # as images are still shared and never copied.
            messages = [message.copy() for message in sample['messages']]
            messages[1]['content'] = assistant_content[0]['text']
            new_sample['messages'] = messages

        fixed_dataset.append(new_sample)

    return fixed_dataset

In [None]:
# Flatten the assistant content to a plain string in all three sample lists
# so that they share one format before being combined.
image_dataset_converted = fix_assistant_content_format(image_dataset_converted)
text_dataset_images = fix_assistant_content_format(text_dataset_images)
fc_dataset_images = fix_assistant_content_format(fc_dataset_images)

In [None]:
# Inspect the first image sample after the format fix.
image_dataset_converted[0]

In [None]:
# Inspect the first text sample after the format fix.
text_dataset_images[0]

In [None]:
# Inspect the first function-calling sample after the format fix.
fc_dataset_images[0]

In [None]:
# Concatenate the three sample lists and shuffle them in place so that the
# sources are interleaved.
# NOTE(review): no random.seed(...) call is visible before this shuffle, so
# the sample order (and the augmentations above) differ between runs.
combined_dataset_full = image_dataset_converted + text_dataset_images + fc_dataset_images
random.shuffle(combined_dataset_full)

In [None]:
from unsloth import FastVisionModel
import torch

# Load the "unsloth/gemma-3n-E2B-it" checkpoint through Unsloth with 4-bit
# loading and Unsloth's gradient-checkpointing mode requested.
# The second return value is bound to `tokenizer` and later passed as the
# trainer's processing_class and to the data collator.
model, tokenizer = FastVisionModel.from_pretrained(
    "unsloth/gemma-3n-E2B-it",
    load_in_4bit = True,
    use_gradient_checkpointing = "unsloth",
)

In [None]:
# Wrap the model with LoRA adapters. All four finetune_* switches are on, so
# vision and language layers as well as attention and MLP modules are
# selected for adaptation.
# NOTE: this rebinds `model`; re-running only this cell would wrap an already
# wrapped model.
model = FastVisionModel.get_peft_model(
    model,
    finetune_vision_layers     = True,
    finetune_language_layers   = True,
    finetune_attention_modules = True,
    finetune_mlp_modules       = True,

    r =32,
    lora_alpha = 64,
    lora_dropout = 0.03,
    bias = "none",
    random_state = 3407,
    use_rslora = True,
    loftq_config = None,
    target_modules = "all-linear",
    # NOTE(review): modules_to_save presumably makes these modules fully
    # trainable and stored with the adapter, which can be memory-heavy —
    # confirm this is intended alongside 4-bit loading.
    modules_to_save=[
        "lm_head",
        "embed_tokens",
    ],
)

In [None]:
import os
# Intended to turn off compilation / TorchDynamo before training.
# NOTE(review): confirm that PyTorch actually reads these variable names, and
# that setting them after `torch` has already been imported (an earlier cell
# imports it) still has an effect.
os.environ["PYTORCH_CUDA_COMPILE_DISABLE"] = "1"
os.environ["PYTORCH_DISABLE_DYNAMO"] = "1"

In [None]:
from unsloth.trainer import UnslothVisionDataCollator
from trl import SFTTrainer, SFTConfig

# Switch the model to training mode via Unsloth before building the trainer.
FastVisionModel.for_training(model)

# The training data is the shuffled Python list of message dicts, batched by
# Unsloth's vision data collator.
trainer = SFTTrainer(
    model=model,
    train_dataset=combined_dataset_full,
    processing_class=tokenizer,
    data_collator=UnslothVisionDataCollator(model, tokenizer),
    args = SFTConfig(
        # 4 samples per device x 8 accumulation steps = 32 samples per optimizer step.
        per_device_train_batch_size = 4,
        gradient_accumulation_steps = 8,
        gradient_checkpointing = True,

        gradient_checkpointing_kwargs = {"use_reentrant": False},
        max_grad_norm = 0.3,
        warmup_ratio = 0.05,
        # NOTE(review): both max_steps and num_train_epochs are set. In Hugging
        # Face TrainingArguments a positive max_steps takes precedence, so
        # training would stop after 100 steps — confirm which is intended.
        max_steps = 100,
        num_train_epochs = 3,
        learning_rate = 2e-4,
        logging_steps = 1,
        # NOTE(review): no save_steps is given; with the library default a
        # 100-step run may never write an intermediate checkpoint — verify.
        save_strategy="steps",
        optim = "adamw_torch_fused",
        weight_decay = 0.05,
        lr_scheduler_type = "cosine",
        seed = 3407,
        output_dir = "outputs",
        report_to = "none",
        # The next three settings leave the samples as they are for the
        # collator: no column pruning, no text field, no dataset preparation.
        remove_unused_columns = False,
        dataset_text_field = "",
        dataset_kwargs = {"skip_prepare_dataset": True}
    )
)

In [None]:
# Run fine-tuning. The return value is kept in trainer_stats, which the
# visible cells do not use afterwards.
trainer_stats = trainer.train()

In [None]:
import os
# Read the Hugging Face token from the environment. The fallback is only a
# placeholder string, so the variable must be set for the upload to succeed.
HUGGINGFACE_HUB_TOKEN = os.getenv("HUGGINGFACE_HUB_TOKEN", "your_huggingface_token_here")

In [None]:
# Upload the merged model and tokenizer to the Hub repository named below.
# The `if True:` guard always runs; change it to False to skip the upload.
if True:
    model.push_to_hub_merged(
        "sergioq2/gemma-3N-finetune-coffe_q4_off", tokenizer,
        token = HUGGINGFACE_HUB_TOKEN
    )