<a href="https://colab.research.google.com/github/mazenhider/-/blob/main/classication_video_blip.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ========================================
# STEP 0: تثبيت المكتبات المطلوبة
# ========================================
!pip install opencv-python tqdm gdown --quiet

import os
import shutil
from tqdm import tqdm

# ========================================
# STEP 1: ربط Google Drive
# ========================================
from google.colab import drive
drive.mount('/content/drive')

# مسار التخزين داخل الدرايف (يمكن تغييره لو تحب)
base_drive_path = "/content/drive/MyDrive/UCF101_project"
os.makedirs(base_drive_path, exist_ok=True)

zip_path = os.path.join(base_drive_path, "UCF-101.zip")

# رابط النسخة المضغوطة (~6.5GB)
hf_url = "https://huggingface.co/datasets/quchenyuan/UCF101-ZIP/resolve/main/UCF-101.zip"

# تحميل الملف (لو مش موجود مسبقاً في الدرايف)
if not os.path.exists(zip_path):
    print("📥 تحميل UCF101 ZIP ...")
    !gdown {hf_url} -O {zip_path}

# ========================================
# STEP 2: فك ضغط ZIP داخل الدرايف
# ========================================
dataset_folder = os.path.join(base_drive_path, "UCF-101")
if not os.path.exists(dataset_folder):
    print("📂 فك ضغط UCF101 ...")
    !unzip -q {zip_path} -d {base_drive_path}

print(f"✅ تم فك الضغط، المجلد موجود في: {dataset_folder}")

# ========================================
# STEP 3: اختيار الفئات المطلوبة
# ========================================
selected_classes = ["BasketballDunk", "HorseRiding", "PlayingGuitar", "PushUps"]
subset_folder = os.path.join(base_drive_path, "videos_subset")
os.makedirs(subset_folder, exist_ok=True)

# نسخ الفيديوهات المختارة إلى مجلد جديد
for cls in selected_classes:
    src_folder = os.path.join(dataset_folder, cls)
    dst_folder = os.path.join(subset_folder, cls)
    os.makedirs(dst_folder, exist_ok=True)

    if not os.path.exists(src_folder):
        print(f"⚠️ لم يتم العثور على الفئة: {cls}")
        continue

    for file in os.listdir(src_folder):
        if file.endswith(".avi"):
            shutil.copy2(os.path.join(src_folder, file), dst_folder)

print(f"✅ تم نسخ الفيديوهات المختارة إلى: {subset_folder}")

# ========================================
# STEP 4: إنشاء ملف ZIP للفيديوهات المختارة
# ========================================
zip_subset_path = os.path.join(base_drive_path, "videos_subset.zip")
shutil.make_archive(zip_subset_path.replace(".zip", ""), 'zip', subset_folder)
print(f"✅ تم إنشاء ملف ZIP: {zip_subset_path}")

# ========================================
# STEP 5: حساب حجم المجلد
# ========================================
def get_folder_size(folder_path):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(folder_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total_size += os.path.getsize(fp)
    return total_size

size_bytes = get_folder_size(subset_folder)
size_gb = size_bytes / (1024**3)

print(f"💾 حجم المجلد videos_subset: {size_gb:.2f} GB")


Mounted at /content/drive
📥 تحميل UCF101 ZIP ...
Downloading...
From: https://huggingface.co/datasets/quchenyuan/UCF101-ZIP/resolve/main/UCF-101.zip
To: /content/drive/MyDrive/UCF101_project/UCF-101.zip
100% 6.96G/6.96G [01:29<00:00, 77.7MB/s]
📂 فك ضغط UCF101 ...
✅ تم فك الضغط، المجلد موجود في: /content/drive/MyDrive/UCF101_project/UCF-101
✅ تم نسخ الفيديوهات المختارة إلى: /content/drive/MyDrive/UCF101_project/videos_subset
✅ تم إنشاء ملف ZIP: /content/drive/MyDrive/UCF101_project/videos_subset.zip
💾 حجم المجلد videos_subset: 0.23 GB


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
subset_folder = "/content/drive/MyDrive/UCF101_project/videos_subset"


In [None]:
import os

# مسار المجلد المختار
subset_folder = "/content/drive/MyDrive/UCF101_project/UCF-101"

# التأكد من وجود المجلد
if os.path.exists(subset_folder):
    print("✅ المجلد موجود:", subset_folder)

    # عرض الفئات الموجودة بداخله
    classes = os.listdir(subset_folder)
    print("الفئات الموجودة داخل المجلد:")
    for cls in classes:
        cls_path = os.path.join(subset_folder, cls)
        # عدد ملفات الفيديو لكل فئة
        videos = [f for f in os.listdir(cls_path) if f.endswith(".avi")]
        print(f"- {cls}: {len(videos)} فيديوهات")
else:
    print("⚠️ المجلد غير موجود")


✅ المجلد موجود: /content/drive/MyDrive/UCF101_project/UCF-101
الفئات الموجودة داخل المجلد:
- BreastStroke: 101 فيديوهات
- Fencing: 111 فيديوهات
- HighJump: 123 فيديوهات
- ApplyLipstick: 114 فيديوهات
- BodyWeightSquats: 112 فيديوهات
- PlayingPiano: 105 فيديوهات
- PommelHorse: 123 فيديوهات
- CliffDiving: 0 فيديوهات
- FieldHockeyPenalty: 111 فيديوهات
- SalsaSpin: 133 فيديوهات
- FloorGymnastics: 125 فيديوهات
- Skijet: 100 فيديوهات
- Haircut: 130 فيديوهات
- TrampolineJumping: 119 فيديوهات
- LongJump: 131 فيديوهات
- ParallelBars: 114 فيديوهات
- Rafting: 111 فيديوهات
- ShavingBeard: 161 فيديوهات
- SkyDiving: 110 فيديوهات
- SumoWrestling: 116 فيديوهات
- TableTennisShot: 0 فيديوهات
- Typing: 135 فيديوهات
- Nunchucks: 132 فيديوهات
- PushUps: 0 فيديوهات
- JugglingBalls: 121 فيديوهات
- Kayaking: 141 فيديوهات
- Skiing: 135 فيديوهات
- Swing: 131 فيديوهات
- HorseRace: 0 فيديوهات
- Archery: 145 فيديوهات
- PlayingFlute: 155 فيديوهات
- SoccerPenalty: 137 فيديوهات
- FrontCrawl: 137 فيديوهات
- Basketball:

In [None]:
import os
import shutil

base_folder = "/content/drive/MyDrive/UCF101_project/UCF-101"
subset_folder = os.path.join(base_folder, "videos_subset")
os.makedirs(subset_folder, exist_ok=True)

classes = ["BasketballDunk", "HorseRiding", "PlayingGuitar", "Archery"]

for cls in classes:
    src_folder = os.path.join(base_folder, cls)
    dst_folder = os.path.join(subset_folder, cls)
    os.makedirs(dst_folder, exist_ok=True)

    if not os.path.exists(src_folder):
        print(f"⚠️ الفئة غير موجودة: {cls}")
        continue

    for file in os.listdir(src_folder):
        if file.endswith(".avi"):
            shutil.copy2(os.path.join(src_folder, file), dst_folder)

print(f"✅ تم نسخ الفيديوهات إلى: {subset_folder}")


✅ تم نسخ الفيديوهات إلى: /content/drive/MyDrive/UCF101_project/UCF-101/videos_subset


In [1]:
# ========================================
# 1️⃣ تثبيت المكتبات المطلوبة
# ========================================
!pip install torch torchvision transformers accelerate datasets gradio opencv-python --quiet

# ========================================
# 2️⃣ ربط Google Drive
# ========================================
from google.colab import drive
import os
drive.mount('/content/drive')

# ========================================
# 3️⃣ إعداد المسارات والفئات
# ========================================
dataset_folder = "/content/drive/MyDrive/UCF101_project/UCF-101"
save_model_path = "/content/drive/MyDrive/UCF101_project/blip_finetunned"
os.makedirs(save_model_path, exist_ok=True)

classes = ["ApplyLipstick", "PlayingPiano", "TrampolineJumping", "ShavingBeard"]
class_to_idx = {cls: idx for idx, cls in enumerate(classes)}

# ========================================
# 4️⃣ تحويل الفيديو إلى frames
# ========================================
import cv2
import numpy as np

def video_to_frames(video_path, max_frames=16):
    cap = cv2.VideoCapture(video_path)
    frames = []
    count = 0
    while cap.isOpened() and count < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frames.append(frame)
        count += 1
    cap.release()
    frames = np.array(frames)
    return frames

# ========================================
# 5️⃣ Dataset و DataLoader
# ========================================
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class VideoFramesDataset(Dataset):
    def __init__(self, root_dir, classes, max_frames=16):
        self.samples = []
        self.labels = []
        self.classes = classes
        self.class_to_idx = {cls: idx for idx, cls in enumerate(classes)}
        self.max_frames = max_frames

        for cls in classes:
            cls_folder = os.path.join(root_dir, cls)
            if not os.path.exists(cls_folder):
                print(f"⚠️ الفئة غير موجودة: {cls}")
                continue
            for file in os.listdir(cls_folder):
                if file.endswith(".avi"):
                    self.samples.append(os.path.join(cls_folder, file))
                    self.labels.append(self.class_to_idx[cls])

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        video_path = self.samples[idx]
        frames = video_to_frames(video_path, self.max_frames)
        frames = torch.tensor(frames).permute(0,3,1,2).float()  # (T, C, H, W)
        label = self.labels[idx]
        return frames, label

# تجهيز البيانات
dataset = VideoFramesDataset(dataset_folder, classes, max_frames=16)
train_indices, test_indices = train_test_split(range(len(dataset)), test_size=0.2, stratify=dataset.labels)
train_dataset = torch.utils.data.Subset(dataset, train_indices)
test_dataset = torch.utils.data.Subset(dataset, test_indices)

train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=2)

# ========================================
# 6️⃣ إعداد نموذج BLIP + MLP Classifier مع Fine-tuning جزئي
# ========================================
from transformers import BlipForConditionalGeneration, BlipProcessor
import torch.nn as nn
import torch.optim as optim

model_name = "Salesforce/blip-image-captioning-base"
processor = BlipProcessor.from_pretrained(model_name)
blip_model = BlipForConditionalGeneration.from_pretrained(model_name)
device = "cuda" if torch.cuda.is_available() else "cpu"
blip_model.to(device)

# Freeze معظم layers
for name, param in blip_model.vision_model.named_parameters():
    param.requires_grad = False

# فتح آخر layer فقط للتدريب
for param in blip_model.vision_model.encoder.layers[-1].parameters():
    param.requires_grad = True

# إنشاء classifier
dummy_frame = torch.randn(1,3,224,224).to(device)
with torch.no_grad():
    dummy_output = blip_model.vision_model(dummy_frame).last_hidden_state.mean(dim=1)
hidden_dim = dummy_output.size(1)
classifier = nn.Linear(hidden_dim, len(classes)).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(list(classifier.parameters()) + list(blip_model.vision_model.encoder.layers[-1].parameters()), lr=1e-4)

# ========================================
# 7️⃣ تدريب الموديل
# ========================================
epochs = 3  # يمكن زيادتها حسب الحاجة
for epoch in range(epochs):
    blip_model.train()
    classifier.train()

    for frames, labels in train_loader:
        batch_size, T, C, H, W = frames.shape
        frames = frames.to(device)
        labels = labels.to(device)

        video_embeds = []
        for t in range(T):
            pixel_values = processor(images=frames[:,t].permute(0,2,3,1).cpu().numpy(),
                                     return_tensors="pt").pixel_values.to(device)
            output = blip_model.vision_model(pixel_values).last_hidden_state.mean(dim=1)
            video_embeds.append(output)
        video_embeds = torch.stack(video_embeds).mean(dim=0)

        optimizer.zero_grad()
        logits = classifier(video_embeds)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1} done, Loss: {loss.item():.4f}")

# ========================================
# 8️⃣ تقييم الموديل
# ========================================
blip_model.eval()
classifier.eval()
correct = 0
total = 0
with torch.no_grad():
    for frames, labels in test_loader:
        batch_size, T, C, H, W = frames.shape
        frames = frames.to(device)
        labels = labels.to(device)

        video_embeds = []
        for t in range(T):
            pixel_values = processor(images=frames[:,t].permute(0,2,3,1).cpu().numpy(),
                                     return_tensors="pt").pixel_values.to(device)
            output = blip_model.vision_model(pixel_values).last_hidden_state.mean(dim=1)
            video_embeds.append(output)
        video_embeds = torch.stack(video_embeds).mean(dim=0)

        logits = classifier(video_embeds)
        _, predicted = torch.max(logits, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Accuracy: {correct/total*100:.2f}%")

# ========================================
# 9️⃣ حفظ الموديل على Drive
# ========================================
blip_model.save_pretrained(save_model_path)
torch.save(classifier.state_dict(), os.path.join(save_model_path, "classifier.pth"))
print(f"✅ تم حفظ الموديل على: {save_model_path}")

# ========================================
# 10️⃣ واجهة Gradio لتصنيف + وصف الفيديو
# ========================================
import gradio as gr

def classify_and_describe_video(video_path):
    frames = video_to_frames(video_path, max_frames=8)
    frames_tensor = torch.tensor(frames).permute(0,3,1,2).float().to(device)
    T, C, H, W = frames_tensor.shape

    # تصنيف الفيديو
    video_embeds = []
    for t in range(T):
        pixel_values = processor(images=frames_tensor[t].permute(1,2,0).cpu().numpy(),
                                 return_tensors="pt").pixel_values.to(device)
        with torch.no_grad():
            output = blip_model.vision_model(pixel_values).last_hidden_state.mean(dim=1)
        video_embeds.append(output)
    video_embeds = torch.stack(video_embeds).mean(dim=0)
    logits = classifier(video_embeds)
    pred_class = classes[torch.argmax(logits).item()]

    # وصف الفيديو باستخدام BLIP (أخذ أول 3 frames فقط)
    descriptions = []
    for t in range(min(3, T)):
        pixel_values = processor(images=frames[t], return_tensors="pt").pixel_values.to(device)
        with torch.no_grad():
            output_ids = blip_model.generate(pixel_values)
        desc = processor.decode(output_ids[0], skip_special_tokens=True)
        descriptions.append(desc)
    video_description = " ".join(descriptions)

    return f"Class: {pred_class}", video_description

iface = gr.Interface(
    fn=classify_and_describe_video,
    inputs=gr.Video(source="upload"),  # التغيير هنا
    outputs=[gr.Textbox(label="Predicted Class"),
             gr.Textbox(label="Video Description")],
    title="Video Classification + Description with BLIP Fine-tuning",
    description="Upload a video. The model predicts the class and generates a short description."
)

iface.launch()



Mounted at /content/drive


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Fetching 1 files:   0%|          | 0/1 [00:00<?, ?it/s]

preprocessor_config.json:   0%|          | 0.00/287 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/506 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/990M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/990M [00:00<?, ?B/s]

Epoch 1 done, Loss: 0.0031


It looks like you are trying to rescale already rescaled images. If the input images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again.


Epoch 2 done, Loss: 0.0014
Epoch 3 done, Loss: 0.0006
Accuracy: 100.00%
✅ تم حفظ الموديل على: /content/drive/MyDrive/UCF101_project/blip_finetunned


TypeError: Video.__init__() got an unexpected keyword argument 'source'

In [3]:
import gradio as gr

def classify_and_describe_video(video_path):
    frames = video_to_frames(video_path, max_frames=8)
    frames_tensor = torch.tensor(frames).permute(0,3,1,2).float().to(device)
    T, C, H, W = frames_tensor.shape

    # تصنيف الفيديو
    video_embeds = []
    for t in range(T):
        pixel_values = processor(images=frames_tensor[t].permute(1,2,0).cpu().numpy(),
                                 return_tensors="pt").pixel_values.to(device)
        with torch.no_grad():
            output = blip_model.vision_model(pixel_values).last_hidden_state.mean(dim=1)
        video_embeds.append(output)
    video_embeds = torch.stack(video_embeds).mean(dim=0)
    logits = classifier(video_embeds)
    pred_class = classes[torch.argmax(logits).item()]

    # وصف الفيديو باستخدام BLIP (أخذ أول 3 frames فقط)
    descriptions = []
    for t in range(min(3, T)):
        pixel_values = processor(images=frames[t], return_tensors="pt").pixel_values.to(device)
        with torch.no_grad():
            output_ids = blip_model.generate(pixel_values)
        desc = processor.decode(output_ids[0], skip_special_tokens=True)
        descriptions.append(desc)
    video_description = " ".join(descriptions)

    return f"Class: {pred_class}", video_description

iface = gr.Interface(
    fn=classify_and_describe_video,
    inputs=gr.Video(),  # فقط بدون type أو source
    outputs=[gr.Textbox(label="Predicted Class"),
             gr.Textbox(label="Video Description")],
    title="Video Classification + Description with BLIP Fine-tuning",
    description="Upload a video. The model predicts the class and generates a short description."
)

iface.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://18bf5a97b1ec49f8f1.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




TypeError: Video.__init__() got an unexpected keyword argument 'source'

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://e475e3f09fbe73af2a.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [None]:
from google.colab import drive
import os
drive.mount('/content/drive')

# مسار الحفظ داخل Google Drive
save_model_path_drive = "/content/drive/MyDrive/UCF101_project/blip_finetuned"
os.makedirs(save_model_path_drive, exist_ok=True)

# حفظ موديل BLIP
blip_model.save_pretrained(save_model_path_drive)

# حفظ Classifier
torch.save(classifier.state_dict(), os.path.join(save_model_path_drive, "classifier.pth"))

print(f"✅ تم حفظ الموديل على Google Drive: {save_model_path_drive}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ تم حفظ الموديل على Google Drive: /content/drive/MyDrive/UCF101_project/blip_finetuned
