In [None]:
import os
# Environment switches applied up front, before the ML libraries are imported:
#   TRANSFORMERS_NO_TF -> disable TensorFlow, use PyTorch only
#   WANDB_DISABLED     -> disable Weights & Biases logging
os.environ.update({
    "TRANSFORMERS_NO_TF": "1",
    "WANDB_DISABLED": "true",
})


In [None]:
import re

import pandas as pd
import numpy as np
import torch
from tqdm import tqdm
from datasets import Dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer,
    GenerationConfig,
    DataCollatorForLanguageModeling,
    DataCollatorForSeq2Seq,
)
from trl import SFTTrainer
from peft import prepare_model_for_kbit_training, LoraConfig, get_peft_model

In [None]:
# --- Run configuration ---
SEED = 42
CSV_PATH = "data10000.csv"                  # Path to your dataset file
MODEL_NAME = "meta-llama/Meta-Llama-3-8B-Instruct"
MAX_LEN = 6144                              # Based on p99≈5985, saves compute

# Read the description / SVG pairs from disk.
dataset = pd.read_csv(CSV_PATH)

# Build one training string per row: the instruction prompt, a newline,
# then the target SVG markup.
dataset["text"] = [
    f"Given the following description: {description}, generate the corresponding SVG string.\n{svg}"
    for description, svg in zip(dataset["description"], dataset["svg"])
]

# Load the tokenizer and extend its vocabulary with SVG-specific tokens.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

# Harvest candidate tokens from every SVG string in the dataset.
tag_pattern = re.compile(r"</?\w+")          # Match SVG/XML tags
attr_pattern = re.compile(r'\b\w+=')         # Match attributes (fill=, stroke=, etc.)

svg_corpus = dataset['svg'].tolist()
svg_tokens = set()
for svg in svg_corpus:
    svg_tokens.update(tag_pattern.findall(svg))
    svg_tokens.update(attr_pattern.findall(svg))

# Build the vocabulary mapping once instead of once per candidate token.
existing_vocab = tokenizer.get_vocab()

# Sort the additions: set iteration order for strings can differ between
# interpreter sessions, and add_tokens assigns ids in the order given, so an
# unsorted list would give the same token different ids on different runs.
new_tokens = sorted(t for t in svg_tokens if t not in existing_vocab)
print(f"Adding {len(new_tokens)} new tokens...")
tokenizer.add_tokens(new_tokens)

# Reuse EOS as the padding token and cap sequences at MAX_LEN.
tokenizer.pad_token = tokenizer.eos_token
tokenizer.model_max_length = MAX_LEN

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Adding 10 new tokens...


In [None]:
# Split into train / validation / test at an 80 / 10 / 10 ratio.
ds_all = Dataset.from_pandas(dataset[["text"]], preserve_index=False)

# Step 1: hold out 20% of the rows.
first_split = ds_all.train_test_split(test_size=0.2, seed=SEED)
training, temp = first_split["train"], first_split["test"]

# Step 2: cut the held-out rows in half for validation and testing.
second_split = temp.train_test_split(test_size=0.5, seed=SEED)
validation, testing = second_split["train"], second_split["test"]

print(f"training: {len(training)}, validation: {len(validation)}, testing: {len(testing)}")

training: 8011, validation: 1001, testing: 1002


In [None]:
# Tokenization + masking
def tokenize_and_mask(ex):
    """Tokenize one example and build labels that ignore the prompt tokens.

    Args:
        ex: Mapping with a ``"text"`` entry holding the prompt followed by the
            SVG markup.

    Returns:
        The tokenizer output for ``ex["text"]`` (truncated to ``MAX_LEN``,
        unpadded) with an extra ``"labels"`` list of the same length as
        ``"input_ids"``. Positions belonging to the prompt are set to -100;
        all other positions carry the corresponding input id. If the text
        contains no ``"<svg"`` marker, labels are a plain copy of the ids.
    """
    text = ex["text"]
    enc = tokenizer(
        text,
        truncation=True,
        max_length=MAX_LEN,
        padding=False,  # Dynamic padding
    )
    input_ids = enc["input_ids"]
    labels = list(input_ids)

    svg_start = text.find("<svg")
    if svg_start != -1:
        # Encode the prompt with the SAME settings as the full text, so any
        # special tokens the tokenizer prepends are counted in the prompt too.
        prompt_ids = tokenizer(
            text[:svg_start],
            truncation=True,
            max_length=MAX_LEN,
            padding=False,
        )["input_ids"]

        # Mask only the leading tokens both encodings share. zip() stops at
        # the shorter sequence, so the mask can never exceed len(input_ids)
        # (which matters when truncation cuts inside the prompt), and any
        # token that differs at the prompt/SVG boundary stays a target.
        prompt_len = 0
        for prompt_tok, full_tok in zip(prompt_ids, input_ids):
            if prompt_tok != full_tok:
                break
            prompt_len += 1
        labels[:prompt_len] = [-100] * prompt_len

    enc["labels"] = labels
    return enc

def _tokenize_split(split):
    """Run tokenize_and_mask over one split and expose it as torch tensors."""
    tokenized = split.map(tokenize_and_mask, batched=False, remove_columns=["text"])
    tokenized.set_format(type="torch")
    return tokenized


# Replace each raw-text split with its tokenized counterpart, in the same
# order as before: training, validation, testing.
training = _tokenize_split(training)
validation = _tokenize_split(validation)
testing = _tokenize_split(testing)

# Data collator for Trainer/SFTTrainer
# DataCollatorForLanguageModeling(mlm=False) rebuilds `labels` from `input_ids`
# and masks every pad-id position, which would throw away the prompt mask
# produced by tokenize_and_mask and (because pad == eos here) also hide EOS.
# DataCollatorForSeq2Seq keeps the labels it is given and pads them with -100.
data_collator = DataCollatorForSeq2Seq(
    tokenizer=tokenizer,
    padding=True,             # pad each batch to its longest sequence
    pad_to_multiple_of=8,
    label_pad_token_id=-100,  # padded label positions are ignored by the loss
)

print("Dataset is ready.")

Map:   0%|          | 0/8011 [00:00<?, ? examples/s]

Map:   0%|          | 0/1001 [00:00<?, ? examples/s]

Map:   0%|          | 0/1002 [00:00<?, ? examples/s]

Dataset is ready.


In [None]:
# Model loading (4-bit QLoRA)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
)

model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True,
    torch_dtype=torch.float16,
)
model.resize_token_embeddings(len(tokenizer))  # adjust for new tokens

# The KV cache is not useful during training and conflicts with gradient
# checkpointing, so switch it off explicitly.
model.config.use_cache = False

# Prepare for LoRA
model = prepare_model_for_kbit_training(model)

# The tokenizer was extended with new SVG tokens, so the resized embedding
# matrix and output head contain rows that have never been trained. LoRA
# adapters on q_proj / v_proj alone never update those rows; listing the two
# layers in `modules_to_save` makes PEFT train them and store them with the
# adapter.
# NOTE: this keeps full trainable copies of both matrices, so expect
# noticeably higher GPU memory use and a larger saved adapter.
lora_config = LoraConfig(
    r=8,
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],
    modules_to_save=["embed_tokens", "lm_head"],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
)
model = get_peft_model(model, lora_config)
model.gradient_checkpointing_enable()
model.print_trainable_parameters()



Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

trainable params: 3,407,872 || all params: 8,033,751,040 || trainable%: 0.042419437483589235


In [None]:
# training setup
from transformers import TrainerCallback
from datetime import datetime, timedelta
import time

# Hyper-parameters for the Trainer run.
# Effective batch size = per_device_train_batch_size * gradient_accumulation_steps = 16.
training_args = TrainingArguments(
    output_dir="qlora_lora_out",
    per_device_train_batch_size=1,
    gradient_accumulation_steps=16,
    num_train_epochs=3,
    learning_rate=2e-4,
    fp16=True,
    seed=SEED,                # same seed as the dataset split, instead of the library default
    logging_steps=1,          # update logs every step → smoother tqdm + ETA
    save_strategy="epoch",
    save_total_limit=2,
    report_to="none",
    disable_tqdm=False,       # keep tqdm enabled
)
# Custom callback: elapsed time / ETA reporting
class ETAProgressCallback(TrainerCallback):
    """Show elapsed time, ETA and estimated finish time during training.

    Args:
        every_steps: Print a progress line whenever the global step is a
            multiple of this value. Values below 1 are treated as 1.
    """

    def __init__(self, every_steps: int = 20):
        # Clamp to at least 1 so the modulo in on_log can never divide by zero.
        self.every_steps = max(1, int(every_steps))
        self._start_time = None

    @staticmethod
    def _format_hms(seconds: float) -> str:
        """Render a non-negative duration in seconds as 'Hh Mm Ss'."""
        hours, rest = divmod(int(seconds), 3600)
        minutes, secs = divmod(rest, 60)
        return f"{hours}h {minutes}m {secs}s"

    def on_train_begin(self, args, state, control, **kwargs):
        """Record the start time and announce the planned number of steps."""
        self._start_time = time.time()
        print(f"🚀 Training started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if state.max_steps:
            print(f"Total steps (estimated): {state.max_steps}")

    def on_log(self, args, state, control, **kwargs):
        """Print elapsed time and ETA every `every_steps` global steps."""
        if self._start_time is None or not state.global_step:
            return
        if state.global_step % self.every_steps != 0:
            return

        elapsed = time.time() - self._start_time

        # Fall back to an epoch-based estimate if max_steps is not set yet.
        total_steps = state.max_steps
        if not total_steps:
            if not state.epoch:
                return  # nothing to extrapolate from yet
            total_steps = (state.global_step / state.epoch) * state.num_train_epochs

        progress = min(1.0, state.global_step / max(1, total_steps))
        if progress <= 0:
            return

        # Linear extrapolation: time left = elapsed * (fraction left / fraction done).
        remaining = elapsed * (1 / progress - 1)
        eta_dt = datetime.now() + timedelta(seconds=remaining)
        # tqdm.write prints without breaking an active progress bar.
        tqdm.write(
            f"[step {state.global_step}/{int(total_steps)}] "
            f"elapsed: {self._format_hms(elapsed)} | "
            f"ETA: {self._format_hms(remaining)} "
            f"(~{eta_dt.strftime('%H:%M:%S')})"
        )

    def on_train_end(self, args, state, control, **kwargs):
        """Print the finish timestamp and, when known, the total duration."""
        print(f"✅ Training finished at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if self._start_time is not None:
            total = time.time() - self._start_time
            print(f"⏱ Total time: {self._format_hms(total)}")

# Wire the model, datasets, collator and progress callback into a Trainer.
eta_callback = ETAProgressCallback(every_steps=20)  # change to 10/50 as you like
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=training,
    eval_dataset=validation,
    data_collator=data_collator,
    callbacks=[eta_callback],
)

# ===== Train + save =====
start_time = time.time()
train_output = trainer.train()
elapsed = time.time() - start_time

# Break the wall-clock duration into whole hours / minutes / seconds.
hours, leftover = divmod(int(elapsed), 3600)
minutes, seconds = divmod(leftover, 60)
print(f"\n✅ Training complete in {hours}h {minutes}m {seconds}s")
print("Metrics:", train_output.metrics)

# Persist the adapter first, then the tokenizer, into the same directory.
for artifact in (model, tokenizer):
    artifact.save_pretrained("qlora_meta_llama3_8B")
print("💾 Saved to qlora_meta_llama3_8B")


🚀 Training started at 2025-08-12 12:09:01
Total steps (estimated): 1500


KeyboardInterrupt: 

In [None]:
# Save model and tokenizer
model.save_pretrained("qlora_meta_llama3_8B")
tokenizer.save_pretrained("qlora_meta_llama3_8B")

# Only claim completion if the trainer actually reached its final step; an
# interrupted run still gets saved, but is reported as partial.
steps_done = trainer.state.global_step
steps_planned = trainer.state.max_steps
if steps_planned > 0 and steps_done >= steps_planned:
    print("Training complete. Model and tokenizer saved.")
else:
    print(
        f"Training stopped at step {steps_done}/{steps_planned}; "
        "partial model and tokenizer saved."
    )