In [None]:
# !pip install "ray==2.30.0"
!pip install "ray[serve]==2.30.0" peft accelerate requests diffusers transformers bitsandbytes fastapi==0.96  "datasets==2.16.1"

In [None]:
import ray

In [None]:
# Connect to the KubeRay head service via Ray Client. The runtime_env installs the
# worker-side dependencies needed by the Serve replicas and the Ray Train workers.
ray.init(
    address="ray://ray-cluster-kuberay-head-svc:10001",
    runtime_env={
        "pip": [
            "IPython",
            "peft",
            "boto3==1.26",
            "botocore==1.29",
            "datasets==2.16.1",
            "fastapi==0.96",
            "accelerate>=0.16.0",
            # Gemma architecture support was added in transformers 4.38.0.
            "transformers>=4.38.0",
            "bitsandbytes==0.43.1",
            # NOTE(review): this pin was originally tied to mlflow<=2.2, which this
            # notebook does not use — confirm whether it is still needed.
            "numpy<1.24",
            "torch",
        ]
    },
)

In [None]:
from pprint import pprint

# Show the resource totals reported by the connected Ray cluster (e.g. to check
# that GPUs are visible before deploying or training).
pprint(ray.cluster_resources())

In [None]:
import os
from getpass import getpass

model_id = "google/gemma-7b"
revision = "float16"
# Access token for the gated Gemma repo. Never commit tokens to a notebook (revoke
# any that were): read HF_TOKEN from the environment, otherwise prompt for it.
access_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face access token: ")

In [None]:
# Import here: this cell runs before the cell that defines PredictDeployment, so
# relying on that later import raises NameError on a fresh kernel.
from ray import serve

# Start Serve for this session (not detached); the HTTP proxy binds to all interfaces
# so the head service is reachable from other pods.
serve.start(detached=False, http_options={"host": "0.0.0.0"})

In [None]:
import pandas as pd

from ray import serve
from starlette.requests import Request


@serve.deployment(ray_actor_options={"num_gpus": 1})
class PredictDeployment:
    """Ray Serve deployment that generates text with a Hugging Face causal LM.

    Each replica reserves one GPU and loads ``model_id`` in float16 with
    ``device_map="auto"``. HTTP requests carry a JSON list of
    ``{"text": str | list[str]}`` objects; all prompts are generated as one batch.
    """

    def __init__(self, model_id: str, revision: str = None, token: str = None):
        """Load the model and tokenizer.

        Args:
            model_id: Hugging Face Hub repo id (or local path) of the model.
            revision: Optional branch/tag/commit of the model weights.
            token: Optional Hugging Face access token, needed for gated repos
                unless credentials are already configured on the replica.
                ``None`` keeps the library's default credential lookup.
        """
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            revision=revision,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
            device_map="auto",  # automatically makes use of all GPUs available to the Actor
            token=token,
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id, token=token)
        # Batched prompts are padded to a common length; a decoder-only model must be
        # padded on the left so generation continues from each prompt's real last token.
        self.tokenizer.padding_side = "left"
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def generate(self, text: "str | list[str]") -> pd.DataFrame:
        """Sample a continuation for each prompt.

        Args:
            text: One prompt string or a list of prompt strings.

        Returns:
            DataFrame with one row per prompt in a ``"responses"`` column holding
            the decoded prompt + continuation (special/pad tokens stripped).
        """
        prompts = [text] if isinstance(text, str) else list(text)
        if not prompts:
            return pd.DataFrame([], columns=["responses"])

        # padding=True is required to stack prompts of different lengths into one tensor.
        encoded = self.tokenizer(prompts, return_tensors="pt", padding=True).to(
            self.model.device
        )
        gen_tokens = self.model.generate(
            input_ids=encoded["input_ids"],
            attention_mask=encoded["attention_mask"],  # lets the model ignore padding
            do_sample=True,
            temperature=0.9,
            max_length=100,  # counts prompt (incl. padding) + generated tokens
        )
        return pd.DataFrame(
            self.tokenizer.batch_decode(gen_tokens, skip_special_tokens=True),
            columns=["responses"],
        )

    async def __call__(self, http_request: Request) -> pd.DataFrame:
        """Collect every prompt from the JSON body and generate them as a batch.

        The body is a list of objects whose ``"text"`` is a string or a list of
        strings; a single such object is also accepted.
        """
        json_request = await http_request.json()
        if isinstance(json_request, dict):
            json_request = [json_request]
        prompts = []
        for prompt in json_request:
            text = prompt["text"]
            if isinstance(text, list):
                prompts.extend(text)
            else:
                prompts.append(text)
        return self.generate(prompts)

In [None]:
# Bind constructor arguments to the deployment, then deploy it on the cluster.
deployment = PredictDeployment.bind(
    model_id=model_id,
    revision=revision,
)
serve.run(deployment)

In [None]:
import requests

prompt = "to create a Google Kubernetes Engine cluster"
sample_input = dict(text=prompt)

# The deployment's __call__ expects a JSON list of {"text": ...} objects.
response = requests.post(
    "http://ray-cluster-kuberay-head-svc:8000/",
    json=[sample_input],
)
output = response.json()
print(output)

In [None]:
# Tear down Ray Serve and the base-model deployment before fine-tuning
# (presumably to release the replica's reserved GPU for the training workers).
serve.shutdown()

In [None]:
from datasets import load_dataset

# Dolly-15k instruction-tuning data (its "train" split) from the Hugging Face Hub.
dataset = load_dataset(path="databricks/databricks-dolly-15k", split="train")

In [None]:
import ray
# Convert the Hugging Face dataset into a Ray Dataset. from_huggingface converts via
# Arrow instead of iterating the rows one by one in Python as from_items does.
ray_dataset = ray.data.from_huggingface(dataset)
# Hold out the last 50 rows for validation (an int test_size is a row count).
train_dataset, validation_dataset = ray_dataset.train_test_split(test_size=50)

In [None]:
# Training topology and per-device batch size; read by train_func and TorchTrainer.
use_gpu, num_workers, batch_size = True, 1, 16

In [None]:
from transformers import AutoTokenizer
# Driver-side tokenizer; template_dataset uses it to append tokenizer.eos_token.
# Gemma is natively supported by transformers (>=4.38), so trust_remote_code is not
# needed — enabling it would execute arbitrary Python shipped in the model repo.
tokenizer = AutoTokenizer.from_pretrained(model_id, token=access_token)

def format_dolly(sample):
    """Render one Dolly record as a Human/Assistant prompt.

    The instruction, the context (only when non-empty) and the response are
    joined with blank lines.
    """
    parts = [f"### Human: {sample['instruction']}"]
    if len(sample["context"]) > 0:
        parts.append(f"{sample['context']}")
    parts.append(f"### Assistant\n{sample['response']}")
    return "\n\n".join(parts)

# template dataset to add prompt to each sample
def template_dataset(sample):
    """Add a ``"text"`` field holding the formatted prompt followed by the EOS token.

    The input record is modified in place and also returned (the form ``Dataset.map`` expects).
    """
    prompt_text = format_dolly(sample)
    sample["text"] = f"{prompt_text}{tokenizer.eos_token}"
    return sample

# Keep the raw `dataset` intact and bind the templated rows to a new name: overwriting
# `dataset` removed the source columns, so re-running this cell raised KeyError.
formatted_dataset = dataset.map(template_dataset, remove_columns=list(dataset.features))

In [None]:
import ray.train
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback

def train_func(config):
    """Ray Train per-worker loop: LoRA fine-tuning of ``model_id`` on Dolly-15k.

    Runs on each TorchTrainer worker and reads the notebook-level ``model_id``,
    ``revision``, ``access_token``, ``batch_size`` and ``use_gpu`` (presumably
    captured when Ray serializes this function).

    Steps:
    1. Load and split Dolly-15k.
    2. Render rows with the "### Human / ### Assistant" template.
    3. Tokenize a small subset.
    4. Load the model 4-bit quantized and add a LoRA adapter.
    5. Train with a Hugging Face ``Trainer`` that reports to Ray Train.

    Args:
        config: ``train_loop_config`` dict; every key is optional:
            ``learning_rate`` (default 2e-5), ``epochs`` (2), ``weight_decay`` (0.01),
            ``seed`` (42, used for the train/test split), ``train_size`` (100) and
            ``eval_size`` (10): the number of leading rows of each split to use.
    """
    # All imports live inside the function because it executes in remote worker processes.
    import torch
    from datasets import load_dataset
    from peft import LoraConfig, get_peft_model
    from ray.train.huggingface.transformers import RayTrainReportCallback, prepare_trainer
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        BitsAndBytesConfig,
        DataCollatorForLanguageModeling,
        Trainer,
        TrainingArguments,
    )

    print(f"Is CUDA available: {torch.cuda.is_available()}")

    tokenizer = AutoTokenizer.from_pretrained(model_id, token=access_token)

    def format_dolly(sample):
        """Render one Dolly row as a Human/Assistant prompt (context only if non-empty)."""
        instruction = f"### Human: {sample['instruction']}"
        context = f"{sample['context']}" if len(sample["context"]) > 0 else None
        response = f"### Assistant\n{sample['response']}"
        # join all the parts together
        return "\n\n".join(part for part in (instruction, context, response) if part is not None)

    def template_dataset(sample):
        """Store the rendered prompt followed by the EOS token in ``sample["text"]``."""
        sample["text"] = f"{format_dolly(sample)}{tokenizer.eos_token}"
        return sample

    def tokenize_function(examples):
        # NOTE(review): no explicit max_length, so padding/truncation follow
        # tokenizer.model_max_length — confirm that is appropriate for this model.
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    def prepare_split(split, num_rows):
        """Take the first ``num_rows`` rows of ``split``, then template and tokenize them."""
        # Select before mapping so only the rows actually used are processed.
        subset = split.select(range(min(num_rows, len(split))))
        templated = subset.map(template_dataset, remove_columns=list(subset.features))
        return templated.map(tokenize_function, batched=True)

    # Seeded so every run (and every worker) sees the same train/test split.
    splits = load_dataset("databricks/databricks-dolly-15k", split="train").train_test_split(
        test_size=0.1, seed=config.get("seed", 42)
    )
    small_train_dataset = prepare_split(splits["train"], config.get("train_size", 100))
    small_eval_dataset = prepare_split(splits["test"], config.get("eval_size", 10))

    # create LoRA config
    lora_config = LoraConfig(
        r=8,
        lora_alpha=32,
        lora_dropout=0.1,
        bias="none",
        task_type="CAUSAL_LM",
    )

    # NOTE(review): compute dtype is bfloat16 while torch_dtype below is float16 —
    # confirm this mix is intended and that the GPU supports bfloat16.
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )

    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        revision=revision,
        torch_dtype=torch.float16,
        device_map="auto",  # automatically makes use of all GPUs available to the Actor
        token=access_token,
        quantization_config=quantization_config,
    )

    # add LoRA adapter layer to the base model
    # NOTE(review): peft.prepare_model_for_kbit_training(model) was left disabled in
    # the original notebook; consider enabling it for the 4-bit model — confirm.
    model = get_peft_model(model, lora_config)

    # NOTE(review): newer transformers releases rename `evaluation_strategy` to
    # `eval_strategy` and `no_cuda` to `use_cpu`; update if the installed version requires it.
    training_args = TrainingArguments(
        output_dir="checkpoints",
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        learning_rate=config.get("learning_rate", 2e-5),
        num_train_epochs=config.get("epochs", 2),
        weight_decay=config.get("weight_decay", 0.01),
        push_to_hub=False,
        disable_tqdm=True,  # declutter the output a little
        no_cuda=not use_gpu,  # you need to explicitly set no_cuda if you want CPUs
        report_to="none",
    )

    # mlm=False: causal-LM objective; the collator also pads batches dynamically.
    data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

    trainer = Trainer(
        model=model,
        tokenizer=tokenizer,
        args=training_args,
        train_dataset=small_train_dataset,
        eval_dataset=small_eval_dataset,
        data_collator=data_collator,
    )

    # Forward metrics and checkpoints to Ray Train, then let Ray prepare the trainer
    # for its distributed environment.
    trainer.add_callback(RayTrainReportCallback())
    trainer = prepare_trainer(trainer)
    trainer.train()

In [None]:
from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig, CheckpointConfig

# One training worker per `num_workers`, each with a GPU when `use_gpu` is set.
scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

# Persist results to GCS and keep only the single best checkpoint by eval_loss.
checkpoint_config = CheckpointConfig(
    num_to_keep=1,
    checkpoint_score_attribute="eval_loss",
    checkpoint_score_order="min",
)
run_config = RunConfig(
    storage_path="gs://ray-llm-bucket/",
    checkpoint_config=checkpoint_config,
)

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=scaling_config,
    run_config=run_config,
)

result = trainer.fit()

In [None]:
import pandas as pd

from ray import serve
from starlette.requests import Request


@serve.deployment(ray_actor_options={"num_gpus": 1})
class PredictDeployment:
    """Ray Serve deployment that generates text with the fine-tuned checkpoint.

    Each replica reserves one GPU, downloads the Ray Train checkpoint at
    ``checkpoint_uri`` to local disk and loads it in float16 with
    ``device_map="auto"``. HTTP requests carry a JSON list of
    ``{"text": str | list[str]}`` objects; all prompts are generated as one batch.
    """

    def __init__(
        self,
        model_id: str,
        revision: str = None,
        checkpoint_uri: str = "gs://ray-llm-bucket/TorchTrainer_2024-07-10_20-12-26/TorchTrainer_bd37f_00000_0_2024-07-10_13-12-32/checkpoint_000002",
        token: str = None,
    ):
        """Download the checkpoint and load the model and tokenizer.

        Args:
            model_id: Hub repo id of the base model; the tokenizer is loaded from it.
            revision: Optional branch/tag/commit forwarded to ``from_pretrained``.
            checkpoint_uri: Ray Train checkpoint location (``gs://`` URI or local path).
            token: Optional Hugging Face access token for gated repos; ``None``
                keeps the library's default credential lookup.
        """
        import os

        import torch
        from ray.train import Checkpoint
        from ray.train.huggingface.transformers import RayTrainReportCallback
        from transformers import AutoModelForCausalLM, AutoTokenizer

        # transformers' from_pretrained cannot read gs:// URIs, so materialize the
        # Ray Train checkpoint on local disk first.
        local_dir = Checkpoint(checkpoint_uri).to_directory()
        # RayTrainReportCallback stores the Hugging Face checkpoint in a subdirectory
        # named CHECKPOINT_NAME; fall back to the root if it is absent.
        model_dir = os.path.join(local_dir, RayTrainReportCallback.CHECKPOINT_NAME)
        if not os.path.isdir(model_dir):
            model_dir = local_dir

        # NOTE(review): train_func wraps the model with peft.get_peft_model, so this
        # directory presumably holds a LoRA adapter; loading it via AutoModelForCausalLM
        # relies on transformers' PEFT integration to fetch the base model — confirm.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_dir,
            revision=revision,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
            device_map="auto",  # automatically makes use of all GPUs available to the Actor
            token=token,
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id, token=token)
        # Batched prompts are padded to a common length; a decoder-only model must be
        # padded on the left so generation continues from each prompt's real last token.
        self.tokenizer.padding_side = "left"
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def generate(self, text: "str | list[str]") -> pd.DataFrame:
        """Sample a continuation for each prompt.

        Args:
            text: One prompt string or a list of prompt strings.

        Returns:
            DataFrame with one row per prompt in a ``"responses"`` column holding
            the decoded prompt + continuation (special/pad tokens stripped).
        """
        prompts = [text] if isinstance(text, str) else list(text)
        if not prompts:
            return pd.DataFrame([], columns=["responses"])

        # padding=True is required to stack prompts of different lengths into one tensor.
        encoded = self.tokenizer(prompts, return_tensors="pt", padding=True).to(
            self.model.device
        )
        gen_tokens = self.model.generate(
            input_ids=encoded["input_ids"],
            attention_mask=encoded["attention_mask"],  # lets the model ignore padding
            do_sample=True,
            temperature=0.9,
            max_length=100,  # counts prompt (incl. padding) + generated tokens
        )
        return pd.DataFrame(
            self.tokenizer.batch_decode(gen_tokens, skip_special_tokens=True),
            columns=["responses"],
        )

    async def __call__(self, http_request: Request) -> pd.DataFrame:
        """Collect every prompt from the JSON body and generate them as a batch.

        The body is a list of objects whose ``"text"`` is a string or a list of
        strings; a single such object is also accepted.
        """
        json_request = await http_request.json()
        if isinstance(json_request, dict):
            json_request = [json_request]
        prompts = []
        for prompt in json_request:
            text = prompt["text"]
            if isinstance(text, list):
                prompts.extend(text)
            else:
                prompts.append(text)
        return self.generate(prompts)