In [3]:
pip install -U kubeflow-training

Collecting kubeflow-training
  Obtaining dependency information for kubeflow-training from https://files.pythonhosted.org/packages/bc/d8/216bcea878fb7b1dcb02b15e6e95564dc45003f4c6e7c241344b93fbf1f6/kubeflow_training-1.9.0-py3-none-any.whl.metadata
  Downloading kubeflow_training-1.9.0-py3-none-any.whl.metadata (1.7 kB)
Collecting kubernetes>=27.2.0 (from kubeflow-training)
  Obtaining dependency information for kubernetes>=27.2.0 from https://files.pythonhosted.org/packages/df/14/a59acfe4b3095f2a4fd8d13b348853a69c8f1ed4bce9af00d1b31351a88e/kubernetes-32.0.0-py2.py3-none-any.whl.metadata
  Downloading kubernetes-32.0.0-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting retrying>=1.3.3 (from kubeflow-training)
  Obtaining dependency information for retrying>=1.3.3 from https://files.pythonhosted.org/packages/8f/04/9e36f28be4c0532c0e9207ff9dc01fb13a2b0eb036476a213b0000837d0e/retrying-1.3.4-py3-none-any.whl.metadata
  Downloading retrying-1.3.4-py3-none-any.whl.metadata (6.9 kB)
Collecting 

In [30]:
def train_func():
    """Fine-tune Llama-3.2-1B-Instruct on GSM8K with supervised fine-tuning.

    Steps performed:
      1. Configure a logger that writes timestamped records to the console.
      2. Load the meta-llama/Llama-3.2-1B-Instruct model and tokenizer, using
         the EOS token as the padding token.
      3. Load the openai/gsm8k dataset (main configuration) and render every
         question/answer pair through the tokenizer chat template as a
         system + user + assistant conversation stored in a "prompt" column.
      4. Train with TRL SFTTrainer for 8 epochs, evaluating once per epoch on
         the test split, with checkpoint saving disabled.
      5. Save the final model to the trainer output directory (/tmp) and log
         details about the trainer's parallel setup.

    All imports are local to the function. It is passed as train_func to
    TrainingClient().create_job elsewhere in this notebook, so it is
    presumably meant to be self-contained when executed on the workers.

    Takes no arguments and returns None.
    """
    import logging
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        TrainingArguments,
    )
    from trl import SFTTrainer
    from datasets import load_dataset

    # Console logger with ISO-like timestamps for the job logs.
    log_formatter = logging.Formatter(
        "%(asctime)s %(levelname)-8s %(message)s", "%Y-%m-%dT%H:%M:%SZ"
    )
    logger = logging.getLogger(__file__)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(log_formatter)
    logger.addHandler(console_handler)
    logger.setLevel(logging.INFO)

    # Create system prompt
    # NOTE(review): every line after the first carries the function's 4-space
    # indentation into the prompt text. Left untouched because changing it
    # would change the training data; confirm whether this is intended.
    system_message = """Solve the given high school math problem by providing a clear explanation of each step leading to the final solution.

    Provide a detailed breakdown of your calculations, beginning with an explanation of the problem and describing how you derive each formula, value, or conclusion. Use logical steps that build upon one another, to arrive at the final answer in a systematic manner.

    # Steps

    1. **Understand the Problem**: Restate the given math problem and clearly identify the main question and any important given values.
    2. **Set Up**: Identify the key formulas or concepts that could help solve the problem (e.g., algebraic manipulation, geometry formulas, trigonometric identities).
    3. **Solve Step-by-Step**: Iteratively progress through each step of the math problem, justifying why each consecutive operation brings you closer to the solution.
    4. **Double Check**: If applicable, double check the work for accuracy and sense, and mention potential alternative approaches if any.
    5. **Final Answer**: Provide the numerical or algebraic solution clearly, accompanied by appropriate units if relevant.

    # Notes

    - Always clearly define any variable or term used.
    - Wherever applicable, include unit conversions or context to explain why each formula or step has been chosen.
    - Assume the level of mathematics is suitable for high school, and avoid overly advanced math techniques unless they are common at that level.
    """

    model_name = "meta-llama/Llama-3.2-1B-Instruct"
    model = AutoModelForCausalLM.from_pretrained(
        pretrained_model_name_or_path=model_name,
    )
    tokenizer = AutoTokenizer.from_pretrained(
        pretrained_model_name_or_path=model_name,
    )
    # Reuse the EOS token for padding.
    tokenizer.pad_token = tokenizer.eos_token

    # Inspired by https://medium.com/@alexandros_chariton/how-to-fine-tune-llama-3-2-instruct-on-your-own-data-a-detailed-guide-e5f522f397d7
    def format_dataset(example):
        """Render one GSM8K record as a chat-formatted training string.

        Builds a system/user/assistant conversation from the record's
        'question' and 'answer' fields and returns it, untokenized, under
        the "prompt" key.
        """
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": example['question']},
            {"role": "assistant", "content": example['answer']}
        ]
        # The assistant turn is already present, so no generation prompt
        # is appended.
        prompt = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=False
        )
        return {"prompt": prompt}

    dataset = load_dataset("openai/gsm8k", "main")
    # After mapping, only the rendered "prompt" column remains.
    train_data = dataset["train"].map(format_dataset, remove_columns=['question', 'answer'])
    eval_data = dataset["test"].map(format_dataset, remove_columns=['question', 'answer'])
    # Sanity check: show one fully rendered example. Index the row first so
    # only a single record is read instead of the whole "prompt" column.
    print(train_data[0]['prompt'])

    trainer = SFTTrainer(
        model=model,
        train_dataset=train_data,
        eval_dataset=eval_data,
        tokenizer=tokenizer,
        dataset_text_field="prompt",
        args=TrainingArguments(output_dir="/tmp",
                               per_device_train_batch_size=1,
                               per_device_eval_batch_size=1,
                               num_train_epochs=8,
                               logging_dir="/logs",
                               eval_strategy="epoch",
                               save_strategy="no"),
    )

    # Train and save the model.
    trainer.train()
    trainer.save_model()
    # Report how the trainer was parallelised.
    logger.info("parallel_mode: '{0}'".format(trainer.args.parallel_mode))
    logger.info("is_model_parallel: '{0}'".format(trainer.is_model_parallel))
    logger.info("model_wrapped: '{0}'".format(trainer.model_wrapped))

In [31]:
from kubeflow.training import TrainingClient

In [33]:
from kubernetes.client import (
    V1EnvVar,
    V1EnvVarSource,
    V1SecretKeySelector
)

# Environment for the training container.
# HF_TOKEN is read from key "HF_TOKEN" of the "hf-token" secret rather than
# being written into the notebook.
hf_token_env = V1EnvVar(
    name="HF_TOKEN",
    value_from=V1EnvVarSource(
        secret_key_ref=V1SecretKeySelector(name="hf-token", key="HF_TOKEN"),
    ),
)
# Sets NCCL_DEBUG to INFO in the container.
nccl_debug_env = V1EnvVar(name="NCCL_DEBUG", value="INFO")

# Submit a PyTorchJob that runs train_func on a single worker requesting
# 2 GPUs, with the number of processes per worker left to "auto".
TrainingClient().create_job(
    name="pytorch-ddp2",
    job_kind="PyTorchJob",
    base_image="quay.io/modh/training:py311-cuda121-torch241",
    train_func=train_func,
    num_workers=1,
    num_procs_per_worker="auto",
    resources_per_worker={"gpu": 2},
    env_vars=[hf_token_env, nccl_debug_env],
)
