# Prerequisite: run `!pip install av` in a cell first, then restart the session.

In [None]:
# Mount Google Drive at /content/drive; the dataset and output paths used
# later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from transformers import pipeline
from transformers import VivitImageProcessor

# Hub checkpoint id shared by the image processor and the pipeline below.
MODEL_CHECKPOINT = "kkumtori/vivit-b-16x2-kinetics400-finetuned-0505-mediapipe"

# Load the frame preprocessor first ...
image_processor = VivitImageProcessor.from_pretrained(MODEL_CHECKPOINT)

# ... then build the pipeline from the same checkpoint and explicitly attach
# the processor loaded above to it.
video_cls = pipeline(model=MODEL_CHECKPOINT)
video_cls.image_processor = image_processor

In [None]:
import os
from collections import defaultdict
import numpy as np
import av
import torch
from transformers import VivitImageProcessor, VivitForVideoClassification

def read_video_pyav(container, indices):
    """Decode the frames at the requested positions from a video container.

    Args:
        container: Opened container exposing ``seek`` and
            ``decode(video=0)`` (a PyAV container in this notebook).
        indices: Sequence of frame positions to keep. Only ``indices[0]``
            and ``indices[-1]`` bound the scan, so the sequence is presumably
            expected in ascending order.

    Returns:
        ``np.stack`` of each kept frame's ``to_ndarray(format="rgb24")``
        result, in decode order (one leading axis entry per kept frame).
    """
    container.seek(0)
    first_wanted, last_wanted = indices[0], indices[-1]

    selected = []
    for position, frame in enumerate(container.decode(video=0)):
        # Nothing past the last requested position is needed; stop decoding.
        if position > last_wanted:
            break
        if position < first_wanted or position not in indices:
            continue
        selected.append(frame.to_ndarray(format="rgb24"))

    return np.stack(selected)

def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    """Pick ``clip_len`` frame indices from a randomly placed window.

    The window spans ``int(clip_len * frame_sample_rate)`` frames. Its end is
    drawn with ``np.random.randint`` and the indices are spread evenly across
    it with ``np.linspace``, then clipped to ``[start, end - 1]``.

    Args:
        clip_len: Number of indices to return.
        frame_sample_rate: Multiplier that turns ``clip_len`` into the window
            span (in frames).
        seg_len: Number of frames available in the video.

    Returns:
        ``np.int64`` array of ``clip_len`` frame indices.

    Raises:
        ValueError: If ``seg_len`` is smaller than the window span.
    """
    converted_len = int(clip_len * frame_sample_rate)
    if seg_len < converted_len:
        raise ValueError(
            f"seg_len ({seg_len}) is smaller than the required window span "
            f"({converted_len} = clip_len * frame_sample_rate)"
        )

    if seg_len == converted_len and converted_len > 0:
        # The window covers the whole video, so there is exactly one valid
        # placement. np.random.randint(n, n) would raise "low >= high" here.
        end_idx = seg_len
    else:
        end_idx = np.random.randint(converted_len, seg_len)

    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    # linspace includes end_idx itself; clip so the last index stays inside
    # the window.
    return np.clip(indices, start_idx, end_idx - 1).astype(np.int64)

def process_video_files(folder_path):
    """Collect last-layer hidden states for every ``.mp4`` in per-class folders.

    Each immediate sub-directory of ``folder_path`` is treated as one class,
    and its name is the class label. For every ``.mp4`` file inside, 32 frames
    are sampled, preprocessed with the image processor of the module-level
    ``video_cls`` pipeline, and passed through its model with
    ``output_hidden_states=True``.

    Args:
        folder_path: Directory containing one sub-directory per class.

    Returns:
        ``defaultdict(list)`` mapping class label to a list of NumPy arrays,
        one per batch entry of the model's last ``hidden_states`` element
        (presumably one array per video, shaped (tokens, hidden_dim); TODO confirm).

    Note:
        Errors from frame sampling or decoding (for example a video too short
        for a 32-frame clip) are not caught here and abort the whole run.
    """
    image_processor = video_cls.image_processor
    model = video_cls.model
    feature_dict = defaultdict(list)

    # The folder name is used as the class label. Sorting makes the traversal
    # order (and so the order of stored features) reproducible across runs.
    for class_label in sorted(os.listdir(folder_path)):
        class_path = os.path.join(folder_path, class_label)
        if not os.path.isdir(class_path):
            continue

        # Visit every file inside the class folder.
        for filename in sorted(os.listdir(class_path)):
            if not filename.endswith(".mp4"):
                continue
            file_path = os.path.join(class_path, filename)

            # Close the container as soon as the frames are decoded so file
            # handles do not pile up over a large dataset.
            with av.open(file_path) as container:
                # Sample 32 frames.
                indices = sample_frame_indices(
                    clip_len=32,
                    frame_sample_rate=1,
                    seg_len=container.streams.video[0].frames,
                )
                video = read_video_pyav(container=container, indices=indices)

            # Prepare the frames in the format the model expects.
            inputs = image_processor(list(video), return_tensors="pt")

            # Forward pass without gradient tracking.
            with torch.no_grad():
                outputs = model(**inputs, output_hidden_states=True)

            # Store the last hidden layer's features under the class label,
            # one entry per item along the leading (batch) axis.
            last_hidden = outputs.hidden_states[-1]
            for clip_features in last_hidden:
                feature_dict[class_label].append(clip_features.numpy())

    return feature_dict

# Set the dataset folder path and run feature extraction on it.
# `all_features` is reused by later cells (max pooling and pickling).
folder_path = '/content/drive/MyDrive/기컴비_텀프/data/train_dataset/mediapipe/train'
all_features = process_video_files(folder_path)
print(all_features)


In [None]:
import numpy as np

def max_pooling_features(feature_dict):
    """Reduce each class's feature list to its element-wise maximum.

    Args:
        feature_dict: Mapping of class label to a list of equally shaped
            feature arrays.

    Returns:
        Plain ``dict`` mapping each class label to the maximum taken over the
        list axis (axis 0 of the stacked array).
    """
    return {
        label: np.array(per_class_features).max(axis=0)
        for label, per_class_features in feature_dict.items()
    }

# Output of process_video_files(): feature_dict
# Input of max_pooling_features(): feature_dict
max_pooled_features = max_pooling_features(all_features)
print(max_pooled_features)

In [None]:
# Number of entries in the pooled dict (one key per class label).
print(len(max_pooled_features))

In [None]:
import pickle

# Save both feature dicts to Google Drive as pickle files.
for pickle_path, payload in (
    ("/content/drive/MyDrive/max_pooled_features.pkl", max_pooled_features),
    ("/content/drive/MyDrive/all_features.pkl", all_features),
):
    with open(pickle_path, 'wb') as f:
        pickle.dump(payload, f)

# Loading a saved file back (only unpickle files you created yourself):
# with open("/content/drive/MyDrive/max_pooled_features.pkl", 'rb') as f:
#     loaded_dict = pickle.load(f)