In [0]:
%pip install --upgrade mlflow
%pip install --upgrade databricks-sdk
%pip install --upgrade accelerate
%pip install --upgrade bitsandbytes
%pip install --upgrade nvidia-ml-py3
%pip install --upgrade transformers

# NOTE(review): every package above is upgraded to whatever is latest at run time;
# pin versions (pkg==x.y.z) if this notebook has to be reproducible.
# Restart the Python process so the freshly installed versions are the ones imported below.
dbutils.library.restartPython()

In [0]:
from utils import print_gpu_utilization, print_available_vram, test_load_gpu

In [0]:
print_gpu_utilization()

In [0]:
print_available_vram()

### Log in to Hugging Face (the model is gated)

In [0]:
from huggingface_hub import login

# The access token is read from a Databricks secret (scope 'william_smith_secrets',
# key 'HF_KEY') instead of being hard-coded in the notebook.
login(token=dbutils.secrets.get('william_smith_secrets', 'HF_KEY'))

### Load the model in 8-bit because the V100 has only 16 GB of VRAM

In [0]:
import torch
import torch.distributed as dist
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import transformers

model_id = "meta-llama/Meta-Llama-3-8B-Instruct"

# Tokenizer: pad on the left, and reuse the EOS token for padding when the
# checkpoint does not define a pad token of its own.
tokenizer = AutoTokenizer.from_pretrained(model_id, padding_side="left")
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Model: weights quantized to 8-bit through bitsandbytes and placed on the CUDA device.
quantization_config = BitsAndBytesConfig(load_in_8bit=True)
model = AutoModelForCausalLM.from_pretrained(
    model_id, quantization_config=quantization_config, device_map="cuda"
)

In [0]:
print_gpu_utilization()

```
def pipeline(
    task: str = None,
    model: Optional[Union[str, "PreTrainedModel", "TFPreTrainedModel"]] = None,
    config: Optional[Union[str, PretrainedConfig]] = None,
    tokenizer: Optional[Union[str, PreTrainedTokenizer, "PreTrainedTokenizerFast"]] = None,
    feature_extractor: Optional[Union[str, PreTrainedFeatureExtractor]] = None,
    image_processor: Optional[Union[str, BaseImageProcessor]] = None,
    processor: Optional[Union[str, ProcessorMixin]] = None,
    framework: Optional[str] = None,
    revision: Optional[str] = None,
    use_fast: bool = True,
    token: Optional[Union[str, bool]] = None,
    device: Optional[Union[int, str, "torch.device"]] = None,
    device_map=None,
    torch_dtype=None,
    trust_remote_code: Optional[bool] = None,
    model_kwargs: Dict[str, Any] = None,
    pipeline_class: Optional[Any] = None,
    **kwargs,
)
```

In [0]:
from transformers import GenerationConfig

# Greedy decoding, capped at 20 *generated* tokens.
# `max_new_tokens` is used rather than `max_length` because `max_length` counts the
# prompt tokens as well, and the classification prompts used in this notebook are
# already far longer than 20 tokens on their own.
gen_config = GenerationConfig(
    max_new_tokens=20,
    do_sample=False,
    # A GenerationConfig built from scratch carries no special-token ids of its own,
    # so take the stop token(s) from the model and the pad token from the tokenizer.
    eos_token_id=model.generation_config.eos_token_id,
    pad_token_id=tokenizer.pad_token_id,
)

# `model` is already instantiated (8-bit, on the GPU), so neither `model_kwargs` nor
# `device_map` is passed: both only take effect when the pipeline itself has to load
# the model from a name or path.
pipeline = transformers.pipeline(
    task="text-generation",
    model=model,
    tokenizer=tokenizer,
    generation_config=gen_config,
    return_full_text=False,
)

In [0]:
%sh 

nvidia-smi

In [0]:
# Smoke test: send one short prompt through the pipeline before running the real workload.
with torch.no_grad():
  print(pipeline("What is the capital of France"))

In [0]:
print_gpu_utilization()


### Load the data

In [0]:
medquad_df = spark.read.table("will_smith.datasets.medquad_qna")
display(medquad_df)

In [0]:
# Distinct question types, collected on the driver as a plain Python list.
# DataFrame.collect() is used (the same idiom as the question sampling further down)
# instead of dropping to `.rdd`, which is not available on Spark Connect-based compute.
# The values are ordered so that the prompt built from this list is the same on every run.
qtype_list = [
    row.qtype
    for row in medquad_df.select("qtype").distinct().orderBy("qtype").collect()
]
print(qtype_list)

## Generate inference 

In [0]:
# NOTE(review): the name says "first", but index 1 picks the second of the two rows
# fetched, and no ordering is applied before limit(2) — confirm which row is intended.
first_question_text = (
    medquad_df.select("question")
    .limit(2)
    .collect()[1]["question"]
)

In [0]:
# Single classification prompt: the sample question plus the list of allowed categories.
user_input = (
    f"Classify the following question: {first_question_text} "
    f"as one of the categories: {qtype_list} without explanation. "
    "Return only the category name and output less than 15 tokens."
)
print(user_input)

In [0]:
with torch.no_grad():
  display(pipeline(user_input))

In [0]:
import timeit

# Wall-clock time for a single prompt through the pipeline.
start_time = timeit.default_timer()

with torch.no_grad():
    outputs = pipeline(user_input)

elapsed_single_batch = timeit.default_timer() - start_time
print(f"Time taken: {elapsed_single_batch} seconds")

In [0]:
display(outputs)

## Continuous batch

In [0]:
# Bring at most 50 question values onto the driver as a Python list.
batch_questions = [
    row.question
    for row in medquad_df.select("question").limit(50).collect()
]

In [0]:
# Wrap every question in the classification instruction (same wording as the single-prompt cell).
updated_batch_questions = [
    (
        f"Classify the following question: {question} "
        f"as one of the categories: {qtype_list} without explanation. "
        "Return only the category name and output less than 15 tokens."
    )
    for question in batch_questions
]

In [0]:
def micro_batch_inference(pipeline, input_list, batch_size: int = 64):
  """Run `pipeline` over `input_list` in consecutive chunks of at most `batch_size` items.

  Args:
    pipeline: Callable generation pipeline. It is invoked as
      `pipeline(chunk, pad_token_id=..., batch_size=...)`, must expose
      `tokenizer.eos_token_id`, and must return an iterable with one result per prompt.
    input_list: Sequence of prompts (must support `len()` and slicing).
    batch_size: Maximum number of prompts handed to the pipeline per call; must be >= 1.

  Returns:
    A tuple `(output_list, batch_generation_elapsed)`: the per-prompt results in input
    order, and the wall-clock seconds spent on the whole run.

  Raises:
    ValueError: if `batch_size` is smaller than 1.
  """
  import timeit

  if batch_size < 1:
    raise ValueError(f"batch_size must be >= 1, got {batch_size}")

  start_time = timeit.default_timer()
  output_list = []

  for start in range(0, len(input_list), batch_size):
    # Slicing clamps at the end of the list, so the final (shorter) chunk needs no special case.
    chunk = input_list[start:start + batch_size]

    print(f"Generating inferences on batch {start} to {start + len(chunk) - 1}...")

    with torch.no_grad():
      # Forward the chunk size as the pipeline's own batch_size; without it the pipeline
      # uses its default batch size of 1 and processes the prompts one at a time.
      generated_texts = pipeline(
        chunk,
        pad_token_id=pipeline.tokenizer.eos_token_id,
        batch_size=len(chunk),
      )

    output_list.extend(generated_texts)

    print(f"Items completed: {len(output_list)}")

    # Drop the reference to this chunk's results and release cached GPU memory
    # before the next chunk is processed.
    del generated_texts
    torch.cuda.empty_cache()

  batch_generation_elapsed = timeit.default_timer() - start_time
  print(f"Time taken: {batch_generation_elapsed} seconds")

  return output_list, batch_generation_elapsed

In [0]:
output_list, batch_generation_elapsed = micro_batch_inference(pipeline, updated_batch_questions, 10)

In [0]:
(len(output_list))

In [0]:
for item in output_list:
  print(item)

## Free up resources

In [0]:
print_gpu_utilization()

In [0]:
%restart_python