### ***Install required packages***

In [None]:
# Python dependencies used by the cells below: the YOLO classifier (ultralytics),
# the video downloader (yt-dlp), speech-to-text (openai-whisper) and the Ollama
# client library.
! pip install ultralytics
! pip install yt-dlp
! pip install openai-whisper
! pip install ollama
# Install the Ollama runtime with the install script from ollama.com, then pull
# the vision-language model that is queried at the end of this notebook.
# NOTE(review): `ollama pull` presumably needs a running Ollama server — confirm
# that the install script starts one in this environment.
! curl -fsSL https://ollama.com/install.sh | sh
! ollama pull openbmb/minicpm-o2.6:8b

### ***List video file paths manually***

In [None]:
import os

# Root folder that holds the hand-picked evaluation clips.
test_video_dir = os.path.join("data", "20250913", "test")

# Full path of every clip: the root folder joined with each file name, in order.
video_files = list(
    map(
        lambda clip_name: os.path.join(test_video_dir, clip_name),
        (
            "abnormal1.mp4",
            "abnormal2.mp4",
            "normal1.mp4",
            "normal2.mp4",
            "normal3.mp4",
            "normal4.mp4",
        ),
    )
)

### ***Download videos from a URL list and add them to the video file paths***

In [None]:
import os
import yt_dlp as ydl

def download_video(url, label="normal", out_dir="videos"):
    """Download one video with yt-dlp into a per-label sub-folder.

    Args:
        url: Address of the video to download.
        label: Class label; also the name of the sub-folder created under
            ``out_dir``.
        out_dir: Root folder for all downloads.

    Returns:
        A ``(filename, label)`` tuple, where ``filename`` is the path yt-dlp
        derives from the output template for the downloaded video.
    """
    # Create the label sub-folder if it does not exist yet.
    target_dir = os.path.join(out_dir, label)
    os.makedirs(target_dir, exist_ok=True)

    options = dict(
        format='mp4',
        outtmpl=os.path.join(target_dir, '%(title)s.%(ext)s'),
        noplaylist=True,
        hls_prefer_native=True,  # download HLS fragments reliably
        retries=10,
        quiet=False,
    )

    with ydl.YoutubeDL(options) as downloader:
        metadata = downloader.extract_info(url, download=True)
        return downloader.prepare_filename(metadata), label

# (URL, label) pairs to download
urls_labels = [
    ('https://url/to/your/video1', 'normal'),
    ('https://url/to/your/video2', 'normal2'),
]

# Download every entry; each result is a (filename, label) tuple.
video_files = list(map(lambda pair: download_video(pair[0], pair[1]), urls_labels))

print("다운로드 완료:")
for v, lbl in video_files:
    print("{} -> {}".format(v, lbl))


### ***Parse labels from directory paths***
- The name of each sub-folder is the label of the videos inside it
- `video_files_with_label` is a list of dicts
- Each dict holds a video's file path and its label

In [None]:
import glob
import os

# Collect every file found anywhere below the test folder.
video_files = [
    os.path.join(folder, file_name)
    for folder, _subdirs, file_names in os.walk(test_video_dir)
    for file_name in file_names
]

# The label of a file is the name of the folder that directly contains it.
video_files_with_label = [
    {'path': video_path, 'label': os.path.basename(os.path.dirname(video_path))}
    for video_path in video_files
]

print("찾은 동영상 파일:")
for record in video_files_with_label:
    print('path', record['path'])
    print('label', record['label'])

# Frame sampling interval used by the inference cell.
frame_interval = 1


### ***Download latest classification model***

In [None]:
# Fetch the classifier weights (20250920_epoch30.pt) from GitHub; wget saves the
# file into the current working directory.
! wget https://raw.githubusercontent.com/suriseven/abnormality_classifier/main/20250920_epoch30.pt

### ***Load downloaded model***

In [None]:
# ultralytics must be installed first (see the install cell at the top).
from ultralytics import YOLO

# Load the classifier weights fetched by the download cell. wget stores the file
# under its remote name in the current working directory, so it is loaded from
# there rather than from a machine-specific Windows training-run path.
model = YOLO("20250920_epoch30.pt")

# Frame sampling interval: 1 runs inference on every frame, N on every N-th frame.
frame_interval = 1

### ***Check hash value for downloaded model***
- In this example, the hash value should be
    ```
    9547daaf222158b0672a0adab22242cb
    ```

In [None]:
# Print the MD5 digest of the downloaded weights so it can be compared with the
# expected value listed in the markdown above.
! md5sum /content/20250920_epoch30.pt

### ***Do inference***

In [None]:
import cv2
from collections import Counter

# One record per sampled frame:
# (video, frame_id, pred_class, true_class, prob_dict)
predictions = []

for entry in video_files_with_label:
    video, true_class = entry['path'], entry['label']
    cap = cv2.VideoCapture(video)
    frame_id = 0

    # Read frames until the capture reports that no frame is left.
    ret, frame = cap.read()
    while ret:
        # Only every `frame_interval`-th frame is classified.
        if frame_id % frame_interval == 0:
            result = model.predict(frame, imgsz=1088, verbose=False)[0]

            # Map each class name to its probability.
            names = result.names
            prob_dict = {
                names[idx]: float(prob)
                for idx, prob in enumerate(result.probs.data.tolist())
            }

            # Top-1 prediction
            pred_class = names[result.probs.top1]

            record = (video, frame_id, pred_class, true_class, prob_dict)
            predictions.append(record)
            print(record)

        frame_id += 1
        ret, frame = cap.read()

    cap.release()

# 3. Frame-level accuracy: share of sampled frames whose prediction equals the label
total = len(predictions)
correct = sum(pred == true for (_, _, pred, true, _) in predictions)
if total > 0:
    accuracy = correct / total
else:
    accuracy = 0

print("\nFrame-level results")
print("Total frames tested: {}".format(total))
print("Correct predictions: {}".format(correct))
print("Accuracy: {:.2%}".format(accuracy))

# Show the first few records together with their class probabilities
print("\nSample predictions with softmax:")
for v, f, pred, true, prob_dict in predictions[:5]:
    print("{} frame {}: pred={}, true={}, probs={}".format(v, f, pred, true, prob_dict))

# 4. Video-level (majority vote)
# Group the frame predictions by video in a single pass instead of rescanning
# the whole prediction list once per video.
preds_by_video = {}
for (v, _, pred, _, _) in predictions:
    preds_by_video.setdefault(v, []).append(pred)

# Ground truth is the folder-derived label stored in video_files_with_label —
# the same source the frame-level results use — rather than a substring guess
# on the file path.
video_results = {}
for entry in video_files_with_label:
    video = entry['path']
    true_class = entry['label']
    preds = preds_by_video.get(video, [])
    if not preds:
        # No frame was decoded for this video, so there is nothing to vote on.
        print(f"No predictions for {video}; skipped in video-level results")
        continue
    majority_pred = Counter(preds).most_common(1)[0][0]
    video_results[video] = (majority_pred, true_class)

correct_videos = sum(1 for pred, true in video_results.values() if pred == true)
# Score only the videos that produced a vote; an empty result set yields 0.
accuracy_video = correct_videos / len(video_results) if video_results else 0

print("\nVideo-level results:")
for v, (pred, true) in video_results.items():
    print(f"{v}: predicted={pred}, true={true}")
print(f"Video-level accuracy: {accuracy_video:.2%}")

### ***Get representative frames for each video using K-means***

In [None]:
import cv2
import os
import numpy as np
from sklearn.cluster import KMeans

# Maximum number of K-means clusters (= representative frames) per video
n_clusters = 3
output_dir = r"video_frames"
os.makedirs(output_dir, exist_ok=True)


for video_dict in video_files_with_label:
    video_path = video_dict['path']
    video_label = video_dict['label']

    video_preds = [(f, pred, true, prob) for (v, f, pred, true, prob) in predictions if v == video_path]
    if len(video_preds) == 0:
        # Keep the key present so later cells can iterate without a KeyError.
        video_dict["representative_frames"] = []
        continue

    # One-column matrix of the "abnormal" probability of every sampled frame.
    abnormal_probs = np.array([p["abnormal"] for (_, _, _, p) in video_preds]).reshape(-1, 1)

    # K-means cannot form more clusters than there are distinct values: with
    # fewer samples it raises, and with duplicated values it leaves clusters
    # empty. Cap the cluster count for short or very uniform videos.
    effective_clusters = min(n_clusters, len(np.unique(abnormal_probs)))

    # K-means
    kmeans = KMeans(n_clusters=effective_clusters, random_state=42, n_init=10)
    kmeans.fit(abnormal_probs)
    labels = kmeans.labels_
    centers = kmeans.cluster_centers_

    representatives = []

    # Output folder and file-name stem are the same for every cluster of a video.
    video_name = os.path.basename(video_path).replace(".mp4","").replace(" ", "_")
    frame_dir = os.path.join(output_dir, video_label, video_name)

    # Capture used to grab each representative frame
    cap = cv2.VideoCapture(video_path)
    try:
        for cluster_id in range(effective_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            if cluster_indices.size == 0:
                # Defensive: an empty cluster has no member to represent it.
                continue

            # The representative is the member closest to the cluster centre.
            distances = np.abs(abnormal_probs[cluster_indices] - centers[cluster_id])
            closest_idx = cluster_indices[np.argmin(distances)]

            frame_id, pred, true, prob_dict = video_preds[closest_idx]

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
            ret, frame = cap.read()
            if ret:
                frame_filename = f"{video_name}_cluster{cluster_id}_frame{frame_id}.jpg"
                frame_path = os.path.join(frame_dir, frame_filename)
                os.makedirs(frame_dir, exist_ok=True)
                cv2.imwrite(frame_path, frame)
            else:
                frame_path = None
                print(f"Failed to read frame {frame_id} from {video_path}")

            representatives.append({
                "cluster": cluster_id,
                "center": float(centers[cluster_id][0]),
                "frame_id": frame_id,
                "abnormal_prob": float(prob_dict["abnormal"]),
                "pred": pred,
                "true": true,
                "frame_path": frame_path
            })
    finally:
        # Release the capture even if seeking or writing a frame fails.
        cap.release()

    # Attach the representative frames to the video's dict
    video_dict["representative_frames"] = representatives

### ***Extract text from video using whisper STT model***

In [None]:
import whisper

# Whisper "base" checkpoint used for every transcription below.
stt_model = whisper.load_model("base")

for entry in video_files_with_label:
    video_path, label = entry['path'], entry['label']
    print(video_path)

    # Transcribe the video and keep only the recognised text.
    text = stt_model.transcribe(video_path)["text"]

    print(f"Video: {video_path}, Label: {label}")
    print(f"Transcribed Text:\n{text}\n")

    # Store the transcript next to the path and label for the later cells.
    entry['text'] = text


### ***Ask VLM to make a decision based on the given results***

In [None]:
import ollama
import re

# Upper bound on how often one frame is re-asked when the reply carries no
# rating; without it a model that never answers in the expected format would
# loop forever.
MAX_OLLAMA_ATTEMPTS = 3
# The prompt asks the model to put its rating into the reply as [[<float>]].
RATING_PATTERN = re.compile(r"\[\[([0-9]*\.?[0-9]+)\]\]")

# video_files_with_label: representative frame information + extracted text using STT
for video_dict in video_files_with_label:
    video_text = video_dict['text']

    # Videos without representative frames are simply skipped.
    for rep in video_dict.get("representative_frames", []):
        frame_path = rep['frame_path']
        frame_pred = rep['pred']
        frame_prob = rep['abnormal_prob']

        prompt = f"""{frame_path}
For the image attached, our classification model has recognized it as {frame_pred} with probability {frame_prob:.4f}.
The image is one frame of the video.
The voice inside video is transcribed as {video_text}
In this case, describe what you can see in that picutre and classify yourself if the content of video is normal or abnormal        
Your response should end with probability score as 0.0 ~ 1.0
If it is normal then the confidence score is near 0.0
If it is abnormal then the confidence score is near 1.0
Please make sure you should create your own text and evaluate the rate number
Remember that your response should end with float number wrapped with two brackets for example
[[0.1]]
[[0.3]]
[[0.86]]
        """

        print(frame_path)

        # Attach the frame itself so the model can actually look at it; a frame
        # that could not be saved (frame_path is None) is queried text-only.
        message = {'role': 'user', 'content': prompt}
        if frame_path:
            message['images'] = [frame_path]

        # Always define both keys so later cells can read them.
        rep['ollama_response'] = None
        rep['ollama_rating'] = None

        for _attempt in range(MAX_OLLAMA_ATTEMPTS):
            response = ollama.chat(model="openbmb/minicpm-o2.6:8b", messages=[message])
            text = response['message']['content']
            print(text)

            # Save the Ollama response; the rating is added once one is found.
            rep['ollama_response'] = text

            numbers = [float(n) for n in RATING_PATTERN.findall(text)]
            if numbers:
                # The reply is asked to END with the rating, so the last match
                # is the model's final answer.
                rep['ollama_rating'] = numbers[-1]
                break
        else:
            print(f"No rating found for {frame_path} after {MAX_OLLAMA_ATTEMPTS} attempts")

In [None]:
# Verify: print everything collected for each video and its representative frames
for v in video_files_with_label:
    print(f"\nVideo: {v['path']}, Label: {v['label']}, Text: {v['text']}")
    for r in v.get("representative_frames", []):
        report_lines = [
            f"[Cluster {r['cluster']}]",
            f"center={r['center']:.4f}, frame_sequence={r['frame_id']}",
            f"abnormal_prob={r['abnormal_prob']:.4f}, pred={r['pred']}, true={r['true']}",
            f"frame_path={r['frame_path']}",
            f"ollama_response={r['ollama_response']}",
            f"ollama_rating={r['ollama_rating']}",
        ]
        # One field per line, followed by a blank separator line.
        print("\n".join(report_lines) + "\n")