In [1]:
import argparse
import resource
import transformers
import torch
import random

import numpy as np

from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def measure_flops(model, sequence_length=50):
    """
    Measure FLOPs and parameter count using ptflops.
    Note: For quantized models, reported FLOPs might not fully reflect low-precision ops.
    """
    from ptflops import get_model_complexity_info

    dummy_input_shape = (1, sequence_length)  
    macs, params = get_model_complexity_info(
        model, dummy_input_shape, as_strings=True,
        print_per_layer_stat=True, verbose=True
    )
    print("=== FLOPs and Parameter Count ===")
    print("MACs:", macs)
    print("Params:", params)


def measure_memory(model, tokenizer, prompt="Test prompt"):
    """
    Measure memory usage during a forward pass using torch.profiler.
    """
    import torch.profiler

    # Create a dummy input
    inputs = tokenizer(prompt, return_tensors="pt")
    inputs = {key: value.to("cpu") for key, value in inputs.items()}

    with torch.profiler.profile(
        activities=[torch.profiler.ProfilerActivity.CPU],
        profile_memory=True,
        record_shapes=True,
    ) as prof:
        model(**inputs)

    print("=== Memory Usage (sorted by CPU memory consumption) ===")
    print(prof.key_averages().table(sort_by="cpu_memory_usage", row_limit=10))


def nshot_chats(nshot_data: list, n: int, question: str) -> dict:

    def question_prompt(s):
        return f'Question: {s}'

    def answer_prompt(s):
        return f'Answer: {s}'

    chats = []

    random.seed(42)
    for qna in random.sample(nshot_data, n):
        chats.append(
            {"role": "user", "content": question_prompt(qna["question"])})
        chats.append(
            {"role": "assistant", "content": answer_prompt(qna["answer"])})

    chats.append({"role": "user", "content": question_prompt(question)+" Let's think step by step. At the end, you MUST finish the sentence with '####' followed by the answer as an integer."})

    return chats


def extract_ans_from_response(answer: str, eos=None):
    if eos:
        answer = answer.split(eos)[0].strip()

    answer = answer.split('####')[-1].strip()

    for remove_char in [',', '$', '%', 'g']:
        answer = answer.replace(remove_char, '')

    try:
        return int(answer)
    except ValueError:
        return answer

In [None]:
args = argparse.Namespace(
    model="meta-llama/Llama-3.2-1B-Instruct",
    token="",
    device="cpu",
    quantize=False,
    eval=["gsm8k"],
)

model = AutoModelForCausalLM.from_pretrained(args.model, token=args.token).to(args.device)
if args.quantize:
    model = torch.quantization.quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8)

tokenizer = AutoTokenizer.from_pretrained(args.model, token=args.token, return_dict=True)

generator = transformers.pipeline("text-generation", model=model, tokenizer=tokenizer, device=args.device, max_new_tokens=512)

def get_response(chats):
    # chats = tokenizer.apply_chat_template(chats, tokenize=False, add_generation_prompt=True)
    gen_text = generator(chats)  # First return sequence
    return gen_text[0]["generated_text"][-1]["content"]


def extract_answer(answer: str, eos=None):
    if eos:
        answer = answer.split(eos)[0].strip()

    answer = answer.split('####')[-1].strip()
    # Removing possible non-numeric characters generated by the LLM
    for remove_char in [',', '$', '%', 'g']:
        answer = answer.replace(remove_char, '')

    try:
        return int(answer)
    except ValueError:
        return answer


Device set to use cpu


In [11]:
# Measure FLOPs and parameter count
# print(f"Model {args.model} has FLOPs of {measure_flops(model)}")
NUMBER_SHOT = 5
correct = 0

if "gsm8k" in args.eval:
    train_data = list(load_dataset("gsm8k", "main", split="train[:20]"))
    test_data = list(load_dataset("gsm8k", "main", split="test"))

    for i in tqdm(range((len(test_data)))):
        messages = nshot_chats(nshot_data=train_data, n=NUMBER_SHOT, question=test_data[i]['question'])
        response = get_response(messages)

        if extract_answer(response) == extract_answer(test_data[i]['answer']):
            correct += 1
    print(f"Accuracy of {args.model}: {correct/len(test_data)}")

  0%|          | 0/1319 [00:00<?, ?it/s]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
  0%|          | 1/1319 [00:11<4:09:24, 11.35s/it]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Anwer after #### $20
$20
Anwer after #### 18
18


  0%|          | 2/1319 [00:19<3:33:38,  9.73s/it]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Anwer after #### 3
3
Anwer after #### 3
3


  0%|          | 3/1319 [00:36<4:42:33, 12.88s/it]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Anwer after #### 70000
70000
Anwer after #### 70000
70000


  0%|          | 4/1319 [00:47<4:24:17, 12.06s/it]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Anwer after #### 540
540
Anwer after #### 540
540


  0%|          | 5/1319 [00:56<3:57:24, 10.84s/it]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Anwer after #### 140
140
Anwer after #### 20
20


  0%|          | 6/1319 [01:07<3:58:34, 10.90s/it]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Anwer after #### 40
40
Anwer after #### 64
64


  1%|          | 7/1319 [01:18<3:58:40, 10.91s/it]Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.


Anwer after #### 260
260
Anwer after #### 260
260


  1%|          | 7/1319 [01:35<4:57:54, 13.62s/it]


KeyboardInterrupt: 