**프루닝 및 baseline 모델과의 속도차이 비교**

In [None]:
from huggingface_hub import login
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch, time
import torch.nn as nn

# Install the required packages
# NOTE(review): this install runs after the imports at the top of the cell, so a
# newly installed version is presumably only picked up after a kernel restart — confirm.
!pip install -q "torch>=2.4.0" "transformers>=4.51.3" accelerate sentencepiece

# Log in to Hugging Face
!huggingface-cli login

# Load the tokenizer and the model
model_id = "google/gemma-3-1b-pt"
tokenizer = AutoTokenizer.from_pretrained(model_id)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    dtype=torch.float32,
    low_cpu_mem_usage=True,
    device_map="auto",
    attn_implementation="eager"
)

# Check layer names: print the name of every nn.Linear submodule
for name, module in model.named_modules():
    if isinstance(module, nn.Linear):
        print(name)

model.eval()
print(model.__class__.__name__, model.config.model_type, model.config.hidden_size)


**추론 함수 및 프루닝 전 모델 추론 속도 테스트**

In [None]:
def quick_infer(model, tokenizer, prompt="Hello", max_new_tokens=80):
    """Generate a completion for ``prompt`` on the CPU and report the wall-clock time.

    Args:
        model: Causal language model exposing ``.to()`` and ``.generate()``.
        tokenizer: Tokenizer used to encode ``prompt`` and decode the output.
        prompt: Text to complete.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The time spent inside ``generate`` in seconds.

    Note:
        For a torch ``nn.Module``, ``.to("cpu")`` moves the parameters in place,
        so the caller's model is left on the CPU after this call.
    """
    model_cpu = model.to("cpu")  # inference speed is measured on the CPU

    inputs = tokenizer(prompt, return_tensors="pt")
    # perf_counter is monotonic and high-resolution, which suits interval timing
    start = time.perf_counter()
    with torch.no_grad():
        output = model_cpu.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=True, temperature=0.2, top_p=0.8)
    elapsed = time.perf_counter() - start

    # Generation can stop before max_new_tokens (e.g. at EOS), so report the
    # number of tokens that were actually produced after the prompt.
    n_new = output[0].shape[-1] - inputs["input_ids"].shape[-1]

    text = tokenizer.decode(output[0], skip_special_tokens=True)
    print(f"\n--- Prompt ---\n{prompt}\n")
    print(f"--- Output ---\n{text}\n")
    print(f"[Elapsed] {elapsed:.2f} sec for {n_new} new tokens (max {max_new_tokens}) on CPU\n")
    return elapsed

# Example run (baseline measurement before pruning)
quick_infer(model, tokenizer, prompt="Is the temperature okay now?", max_new_tokens=100)

In [None]:
def find_transformer_layers(model):
    """Return the ``layers`` attribute of the model's inner container, or None.

    The container is the first truthy one of ``model.model`` and
    ``model.transformer``; if neither is present, ``model`` itself is used.
    """
    for attr in ("model", "transformer"):
        inner = getattr(model, attr, None)
        if inner:
            return getattr(inner, "layers", None)
    # No wrapper attribute found: look for the layers directly on the model.
    return getattr(model, "layers", None)

def get_mlp(block):
    """Return the ``(gate_proj, up_proj, down_proj)`` modules of ``block.mlp``."""
    feed_forward = block.mlp
    return tuple(
        getattr(feed_forward, name)
        for name in ("gate_proj", "up_proj", "down_proj")
    )

@torch.no_grad()
def structured_prune_mlp(model, keep_ratio=0.9):
    """Shrink the gated MLP of every transformer block by dropping low-norm channels.

    For each block, every intermediate channel is scored by the norm of its row
    in ``up_proj`` plus the norm of its row in ``gate_proj``. The highest-scoring
    channels are kept in their original order, and the three projections are
    replaced in place by smaller ``nn.Linear`` modules. Finally
    ``model.config.intermediate_size`` is set to the new width.

    Args:
        model: Model whose blocks are located by ``find_transformer_layers`` and
            whose blocks expose ``mlp.gate_proj`` / ``mlp.up_proj`` / ``mlp.down_proj``.
        keep_ratio: Fraction of intermediate channels to keep, in ``[0, 1]``.
            At least 16 channels are kept (or all of them if the layer is narrower).

    Raises:
        ValueError: If ``keep_ratio`` is outside ``[0, 1]`` or no transformer
            layers can be found on ``model``.
    """
    if not 0.0 <= keep_ratio <= 1.0:
        raise ValueError(f"keep_ratio must be in [0, 1], got {keep_ratio!r}")

    layers = find_transformer_layers(model)
    if layers is None or len(layers) == 0:
        raise ValueError("could not locate transformer layers on the given model")

    # The target width is derived from the first block and applied to all blocks
    # (assumes every block shares the same intermediate width).
    down0 = get_mlp(layers[0])[2]
    inter = down0.in_features
    # Keep at least 16 channels, but never ask top-k for more channels than exist.
    new_inter = min(inter, max(16, int(round(inter * keep_ratio))))
    print(f"[INFO] intermediate {inter} → {new_inter} (keep {keep_ratio*100:.0f}%)")

    def clone_linear(old, new_out, new_in, sel_out=None, sel_in=None):
        """Build a smaller nn.Linear holding the selected rows/columns of ``old``."""
        new = nn.Linear(new_in, new_out, bias=old.bias is not None,
                        dtype=old.weight.dtype, device=old.weight.device)
        W = old.weight.data
        if sel_out is not None:
            W = W[sel_out, :]
        if sel_in is not None:
            W = W[:, sel_in]
        new.weight.data.copy_(W)
        if old.bias is not None:
            b = old.bias.data
            # The bias has one entry per output feature, so only sel_out applies.
            if sel_out is not None:
                b = b[sel_out]
            new.bias.data.copy_(b)
        return new

    for li, block in enumerate(layers):
        gate, up, down = get_mlp(block)
        # Channel importance: row norms of the up and gate projections.
        score = torch.norm(up.weight, dim=1) + torch.norm(gate.weight, dim=1)
        # Sort the kept indices so the surviving channels keep their relative order.
        keep_idx = torch.topk(score, k=new_inter).indices.sort()[0]

        # gate/up lose output rows; down loses the matching input columns.
        new_gate = clone_linear(gate, new_inter, gate.in_features, sel_out=keep_idx)
        new_up   = clone_linear(up,   new_inter, up.in_features,   sel_out=keep_idx)
        new_down = clone_linear(down, down.out_features, new_inter, sel_in=keep_idx)

        block.mlp.gate_proj, block.mlp.up_proj, block.mlp.down_proj = new_gate, new_up, new_down
        if li % 4 == 0:
            print(f"  - layer {li}: kept {new_inter}/{inter}")

    # Keep the config in sync so the pruned shapes survive save/reload.
    model.config.intermediate_size = new_inter
    print("[DONE] structured pruning complete.")

In [None]:
# Prune on the GPU when one is available; otherwise fall back to the CPU so this
# cell also works on a CPU-only runtime.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
structured_prune_mlp(model, keep_ratio=0.9)

**프루닝된 모델 추론 속도 테스트**

In [None]:
# Same prompt and token budget as the baseline run, now on the pruned model
quick_infer(model, tokenizer, "Is the temperature okay now?", max_new_tokens=100)

**규칙 기반 엔진**

In [None]:
def sensor_status(temp_c, hum, co2, lux):
    """Classify each sensor reading as "LOW", "OK" or "HIGH".

    A reading below its lower bound is "LOW", above its upper bound is "HIGH",
    and anything else (bounds included) is "OK".

    Returns:
        Dict with keys "Temp", "Humidity", "CO2" and "Light", in that order.
    """
    # (label, reading, lower bound, upper bound) — the reference ranges
    ranges = (
        ("Temp", temp_c, 20.0, 25.0),
        ("Humidity", hum, 65.0, 75.0),
        ("CO2", co2, 800, 1000),
        ("Light", lux, 15000, 70000),
    )

    def classify(value, lower, upper):
        return "LOW" if value < lower else "HIGH" if value > upper else "OK"

    return {
        label: classify(value, lower, upper)
        for label, value, lower, upper in ranges
    }

def case_id_from_ok(status):
    """Map the OK/not-OK pattern of the four sensors to a case number 1..16.

    Case 1 means every sensor is "OK". Each sensor that is not "OK" adds a fixed
    weight (Temp 8, Humidity 4, CO2 2, Light 1), so case 16 means none is "OK".
    """
    is_ok = {sensor: state == "OK" for sensor, state in status.items()}

    case_no = 1
    for sensor, weight in (("Temp", 8), ("Humidity", 4), ("CO2", 2), ("Light", 1)):
        if not is_ok[sensor]:
            case_no += weight
    return case_no

def build_instruction(temp_c, hum_pct, co2_ppm, light_lux, user_question):
    """Build the instruction text for a set of sensor readings and a question.

    The result has three lines: a "Case #N: <summary>" header, a "Status: ..."
    line with the per-sensor LOW/OK/HIGH labels, and "Q: <user_question>".
    """
    readings = sensor_status(temp_c, hum_pct, co2_ppm, light_lux)
    case_no = case_id_from_ok(readings)

    # One summary sentence per case number, keyed 1..16.
    case_summaries = dict(enumerate((
        "All parameters are within the optimal range.",
        "Only light is abnormal.",
        "Only CO2 is abnormal.",
        "CO2 and light are abnormal.",
        "Only humidity is abnormal.",
        "Humidity and light are abnormal.",
        "Humidity and CO2 are abnormal.",
        "Humidity, CO2, and light are abnormal.",
        "Only temperature is abnormal.",
        "Temperature and light are abnormal.",
        "Temperature and CO2 are abnormal.",
        "Temperature, CO2, and light are abnormal.",
        "Temperature and humidity are abnormal.",
        "Temperature, humidity, and light are abnormal.",
        "Temperature, humidity, and CO2 are abnormal.",
        "All parameters are abnormal.",
    ), start=1))

    header = f"Case #{case_no}: {case_summaries[case_no]}"
    sensor_labels = ", ".join(
        f"{sensor}={readings[sensor]}"
        for sensor in ("Temp", "Humidity", "CO2", "Light")
    )
    question_line = f"Q: {user_question}"

    return "\n".join((header, "Status: " + sensor_labels, question_line))

**프루닝된 모델을 LoRA 파인튜닝**

In [None]:
from peft import LoraConfig, get_peft_model, TaskType

# Move the pruned model to the GPU before attaching the adapters
model = model.to("cuda")

# LoRA settings: rank 16, applied to the attention projections (q/k/v/o)
# and to the MLP projections (up/down/gate)
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=16, lora_alpha=16, lora_dropout=0.05,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj",
                    "up_proj", "down_proj", "gate_proj"]
)
# Wrap the model with the LoRA adapters
model = get_peft_model(model, lora_config)

In [None]:
from datasets import load_dataset

# Load the JSONL file
dataset = load_dataset("json", data_files="/content/smartfarm_qa_balanced_v5.jsonl")["train"]
dataset = dataset.train_test_split(test_size=0.2, seed=42)   # train:eval = 80:20

train_dataset = dataset["train"]
eval_dataset  = dataset["test"]

# Tokenization function (supports batched=True)
def tokenize_fn(examples):
    """Tokenize a batch of examples as "<instruction>\\nA: <output> <eos>".

    Each text is truncated/padded to exactly 256 tokens.
    """
    eos = tokenizer.eos_token
    texts = []
    for instruction, answer in zip(examples["instruction"], examples["output"]):
        texts.append(f"{instruction}\nA: {answer} {eos}")
    return tokenizer(texts, truncation=True, max_length=256, padding="max_length")

# Tokenize both splits in batches
train_dataset = train_dataset.map(tokenize_fn, batched=True)
eval_dataset  = eval_dataset.map(tokenize_fn, batched=True)

print(train_dataset[0])  # inspect one tokenized example

In [None]:
import wandb
import transformers
from transformers import TrainingArguments, Trainer, DataCollatorForLanguageModeling, EarlyStoppingCallback
from transformers.trainer_utils import IntervalStrategy

# Log in to Weights & Biases and start a run for this training job
wandb.login()
wandb.init(project="tomato-qa", name="tomato-qa-pruning-lora")

# Causal-LM collator (mlm=False)
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

# NOTE(review): evaluation and checkpointing happen every 3000 steps. If the run
# has fewer than 3000 optimizer steps (depends on the dataset size, which is not
# visible here), no evaluation runs and early stopping never triggers — confirm.
training_args = TrainingArguments(
    output_dir="./lora-tomato",
    logging_dir="./logs",
    per_device_train_batch_size=4,
    gradient_accumulation_steps=2,
    learning_rate=5e-5,
    num_train_epochs=3,
    logging_strategy="steps",
    logging_steps=1000,
    save_strategy="steps",
    save_steps=3000,
    eval_strategy=IntervalStrategy.STEPS,
    eval_steps=3000,
    report_to="wandb",
    run_name="tomato-qa-pruning-lora",
    fp16=True,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False
)

# Trainer with early stopping (patience 3) on the eval split
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

# Print the W&B dashboard URL, then start training
print("🔗 대시보드 URL:", wandb.run.get_url())
trainer.train()

In [None]:
# Query the fine-tuned model with a rule-engine prompt built from sample readings
user_question = "Is the enviroment okay now?"
prompt = build_instruction(
    temp_c=24, hum_pct=75.0, co2_ppm=1900, light_lux=15000,
    user_question=user_question
)
# End the prompt with the answer marker used by the fine-tuning template
# ("<instruction>\nA: <output>"), so the model is queried in the format it was
# trained on; run_with_preprocessing appends the same marker.
prompt = f"{prompt}\nA:"
quick_infer(model, tokenizer, prompt, max_new_tokens=100)

**GGUF 변환**

In [None]:
from peft import PeftModel
import os, torch

merged_out = "/content/merged_gemma3_1b_pruned"  # output directory for the pruned + LoRA-merged model
os.makedirs(merged_out, exist_ok=True)

# Check whether the current model is a PeftModel
print(type(model))

# Merge the LoRA weights
# NOTE(review): `model` is rebound to the merged model here, so any later cell
# that uses `model` no longer sees the PEFT wrapper.
model = model.merge_and_unload()  # fold the PEFT adapter into the base weights
model.save_pretrained(merged_out)
tokenizer.save_pretrained(merged_out)

print("LoRA 병합 완료:", merged_out)

In [None]:
# 1. Get llama.cpp (shallow clone) and make it the working directory
!git clone --depth 1 https://github.com/ggerganov/llama.cpp
%cd llama.cpp

# 2. Build with CMake
!cmake -B build
!cmake --build build -j4

# 3. Install the libraries needed by the conversion script
!pip install mistral_common sentencepiece

In [None]:
# 4. Convert the Hugging Face model to GGUF (intended to force the EOS/EOG metadata)
# The script path is relative, so the working directory must be the llama.cpp checkout.
# NOTE(review): upstream convert_hf_to_gguf.py appears to treat `--metadata` as the
# path of a metadata override file, not as key=value pairs — confirm that these
# two flags really set the EOS/EOG token ids and are not silently ignored.
!python3 convert_hf_to_gguf.py \
  --outfile /content/gemma3_1b-pruned-f16.gguf \
  --outtype f16 \
  --metadata tokenizer.ggml.eos_token_id=1 \
  --metadata special_eog_token_ids="1,106" \
  /content/merged_gemma3_1b_pruned

**4비트 양자화**

In [None]:
# Run Q4_K_M quantization on the f16 GGUF file
!/content/llama.cpp/build/bin/llama-quantize \
  /content/gemma3_1b-pruned-f16.gguf \
  /content/gemma3_1b-pruned-q4_k_m.gguf \
  q4_k_m

# List the generated files
!ls -lh /content/*.gguf

**Q5_K_M 양자화**

In [None]:
# Run Q5_K_M quantization on the f16 GGUF file
!/content/llama.cpp/build/bin/llama-quantize \
  /content/gemma3_1b-pruned-f16.gguf \
  /content/gemma3_1b-pruned-q5_k_m.gguf \
  q5_k_m

# List the converted model files
!ls -lh /content/*.gguf

**래퍼 실행 함수**

In [None]:
import subprocess

def run_with_preprocessing(user_question,
                           temp_c=22, hum_pct=10, co2_ppm=10, light_lux=10,
                           model_path="/content/gemma3_1b-pruned-q5_k_m.gguf",
                           llama_cli="./build/bin/llama-cli"):
    """Build the rule-engine prompt, run it through llama-cli and print the answer.

    Args:
        user_question: Question text placed on the "Q:" line of the prompt.
        temp_c, hum_pct, co2_ppm, light_lux: Sensor readings passed to
            ``build_instruction``.
        model_path: Path of the GGUF model file given to llama-cli.
        llama_cli: Path of the llama-cli executable. The default is relative,
            so it only resolves when the working directory is the llama.cpp
            checkout.

    Returns:
        The extracted answer text (also printed).
    """
    # Pre-processing: rule-engine prompt plus the answer marker
    prompt = build_instruction(temp_c, hum_pct, co2_ppm, light_lux, user_question)
    prompt = f"{prompt}\nA:"

    # Run llama.cpp
    result = subprocess.run([
        llama_cli,
        "-m", model_path,
        "-p", prompt,
        "-n", "256",
        "--temp", "0.2",
        "--top-p", "0.9",
        "--top-k", "40",
        "--repeat-penalty", "1.2",
        "--threads", "4",
        "-c", "512"
    ], capture_output=True, text=True)

    raw_out = result.stdout
    err_out = result.stderr

    # Surface failures instead of silently printing an empty answer.
    if result.returncode != 0:
        print(f"[WARN] llama-cli exited with code {result.returncode}")
        print(err_out)

    print("=== Raw Output ===")
    print(raw_out)

    # Keep only the text after the prompt. Prefer splitting on the full prompt:
    # splitting on the first "A:" picks the wrong spot when the user question
    # itself contains "A:". Fall back to "A:" if the prompt is not echoed verbatim.
    if prompt in raw_out:
        answer = raw_out.split(prompt, 1)[-1]
    elif "A:" in raw_out:
        answer = raw_out.split("A:", 1)[-1]
    else:
        answer = raw_out

    # Cut at the EOS marker
    # NOTE(review): "</s>" may not be the EOS text of this model's tokenizer — confirm.
    answer = answer.split("</s>")[0].strip()

    print("\n=== Model Output ===")
    print(answer)
    return answer


In [None]:
# Interactive inference test: keep asking until the user types "exit" (any case).
# On the Raspberry Pi, the live sensor readings must be passed in as arguments.
while (question := input("\n❓ 질문을 입력하세요 (종료하려면 exit): ")).lower() != "exit":
    run_with_preprocessing(question)

**어댑터 저장**

In [None]:
# Save the LoRA adapter
# NOTE(review): if the merge cell has already run, `model` is the result of
# merge_and_unload(), so this presumably saves the full merged model rather
# than only the adapter — confirm, or save the adapter before merging.
model.save_pretrained("/content/lora_adapter")
tokenizer.save_pretrained("/content/lora_adapter")