In [5]:
# الخلية 1: تحميل المكتبات المطلوبة
# هذه المكتبات تستخدم لمعالجة النصوص، معالجة الصور، بناء الشبكات العصبية، والتدريب
import nltk
import os
import re
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import CocoCaptions
from torchvision import transforms, models
from torch.nn.utils.rnn import pack_padded_sequence
import torch.optim as optim

# Download the NLTK data packages needed for tokenizing the caption text
nltk.download('punkt')
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\20114\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\20114\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [6]:
# Cell 2: image transforms and dataset paths
# Images are resized to 224x224, converted to float tensors in [0, 1], and then
# normalized with the ImageNet channel statistics. The encoder uses ImageNet
# pretrained ResNet-50 weights, which were trained on inputs normalized this way;
# feeding un-normalized images shifts the input distribution the backbone expects.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# COCO data paths
# NOTE(review): `root` is an absolute, machine-specific path while `annFile` is
# relative to the working directory. Consider deriving both from one configurable
# data directory so the notebook runs on other machines.
root = r"D:\Level_3\term 1\Deep_Learning\Codes\Fenv\subset_coco\images\train2017"
annFile = r"subset_coco/annotations/captions_train2017_40k.json"

In [7]:
# Cell 3: load the COCO captions dataset and inspect it
# Builds the image/caption dataset from the configured image folder and annotation file
coco_train = CocoCaptions(
    root=root,
    annFile=annFile,
    transform=transform
)

print("Number of samples:", len(coco_train))

# Load one sample to check the shape of the data
img, captions = coco_train[5]
print(f"Image shape: {img.shape}")
print(f"Number of captions: {len(captions)}")
print(f"First caption: {captions[0]}")

loading annotations into memory...
Done (t=1.40s)
creating index...
index created!
Number of samples: 40000
Image shape: torch.Size([3, 224, 224])
Number of captions: 5
First caption: A piece of cake and coffee are on an outdoor table.


In [8]:
# Cell 3 (repeated): load the COCO captions dataset and inspect it
# NOTE(review): this cell is an exact copy of the preceding cell; it re-parses the
# annotation file and rebinds `coco_train`, `img` and `captions` to equivalent
# values. It can likely be deleted.
coco_train = CocoCaptions(
    root=root,
    annFile=annFile,
    transform=transform
)

print("Number of samples:", len(coco_train))

# Load one sample to check the shape of the data
img, captions = coco_train[5]
print(f"Image shape: {img.shape}")
print(f"Number of captions: {len(captions)}")
print(f"First caption: {captions[0]}")

loading annotations into memory...
Done (t=0.80s)
creating index...
index created!
Number of samples: 40000
Image shape: torch.Size([3, 224, 224])
Number of captions: 5
First caption: A piece of cake and coffee are on an outdoor table.


In [9]:
# الخلية 4: وظيفة تنظيف النصوص
# تقوم بتنظيف النص من الرموز الخاصة وتحويله إلى حروف صغيرة ثم تقسيمه إلى كلمات
def clean_caption(text):
    """Lower-case a caption, strip non-alphanumerics, and tokenize it.

    Every run of characters other than ASCII letters and digits is collapsed
    into a single space before the text is handed to ``nltk.word_tokenize``.

    Args:
        text: Raw caption string.

    Returns:
        The list of tokens produced by ``nltk.word_tokenize``.
    """
    normalized = re.sub(r"[^a-zA-Z0-9]+", " ", text.lower())
    return nltk.word_tokenize(normalized)

In [10]:
# الخلية 5: بناء المفردات (Vocabulary)
# هذه الفئة تبني مفردات من الكلمات المستخدمة في التعليقات مع تجاهل الكلمات النادرة
class Vocabulary:
    """Bidirectional token <-> index mapping built from caption text.

    Indices 0-3 are reserved for the special tokens ``<PAD>``, ``<SOS>``,
    ``<EOS>`` and ``<UNK>``. Regular words are added by :meth:`build` when they
    occur at least ``freq_threshold`` times in the captions passed to it.

    Attributes are plain dicts: ``itos`` maps index -> token and ``stoi`` maps
    token -> index.
    """

    def __init__(self, freq_threshold=5):
        """Create a vocabulary that holds only the four special tokens.

        Args:
            freq_threshold: Minimum number of occurrences (within one
                ``build`` call) for a word to be added.
        """
        self.freq_threshold = freq_threshold
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {v: k for k, v in self.itos.items()}

    def __len__(self):
        """Return the number of entries, special tokens included."""
        return len(self.itos)

    def tokenize(self, text):
        """Lower-case ``text``, replace non-alphanumeric runs with a space, and tokenize."""
        text = text.lower()
        text = re.sub(r"[^a-zA-Z0-9]+", " ", text)
        return nltk.word_tokenize(text)

    def build(self, captions):
        """Add every sufficiently frequent word in ``captions`` to the vocabulary.

        Calling ``build`` more than once is safe: words that already have an
        index keep it, and new words are appended after the highest index in
        use. Frequencies are counted per call, not accumulated across calls.

        Args:
            captions: Iterable of caption strings.
        """
        counter = Counter()
        for c in captions:
            counter.update(self.tokenize(c))

        # Continue after the last assigned index rather than restarting at 4,
        # so a repeated call cannot overwrite existing entries.
        idx = len(self.itos)
        for word, freq in counter.items():
            if freq >= self.freq_threshold and word not in self.stoi:
                self.stoi[word] = idx
                self.itos[idx] = word
                idx += 1

    def numericalize(self, text):
        """Convert ``text`` to a list of indices; unknown tokens map to ``<UNK>``."""
        unk = self.stoi["<UNK>"]
        return [self.stoi.get(t, unk) for t in self.tokenize(text)]

In [11]:
# Cell 6: build the vocabulary from every caption in the dataset
# Captions are read directly from the annotation index. Indexing `coco_train[i]`
# would also open, decode and transform each image, which is unnecessary here.
# Iterating `coco_train.ids` keeps the same image order as indexing the dataset,
# so the resulting word indices are unchanged.
all_caps = []

for img_id in coco_train.ids:
    ann_ids = coco_train.coco.getAnnIds(imgIds=img_id)
    all_caps.extend(ann["caption"] for ann in coco_train.coco.loadAnns(ann_ids))

vocab = Vocabulary(freq_threshold=5)
vocab.build(all_caps)

print("Vocabulary size:", len(vocab.itos))

Vocabulary size: 6431


In [12]:
# الخلية 7: فئة Dataset المخصصة لبيانات COCO
# تحول الصور والتعليقات إلى تنسيق يمكن للشبكة العصبية التعامل معه
class CocoDS(Dataset):
    """Dataset wrapper yielding ``(image, caption_token_ids)`` pairs.

    Wraps a ``CocoCaptions`` dataset and converts the first caption of each
    image to a 1-D tensor of vocabulary indices framed by ``<SOS>``/``<EOS>``.
    """

    def __init__(self, root, annFile, transform, vocab):
        """Store the vocabulary and open the underlying ``CocoCaptions`` dataset."""
        self.vocab = vocab
        self.ds = CocoCaptions(root, annFile, transform)

    def __len__(self):
        """Number of samples in the wrapped dataset."""
        return len(self.ds)

    def __getitem__(self, idx):
        """Return the image at ``idx`` and its first caption as an index tensor."""
        image, caption_list = self.ds[idx]
        stoi = self.vocab.stoi

        # Only the first caption of each image is used.
        token_ids = [
            stoi["<SOS>"],
            *self.vocab.numericalize(caption_list[0]),
            stoi["<EOS>"],
        ]
        return image, torch.tensor(token_ids)

In [13]:
# الخلية 8: وظيفة collate_fn لتنظيم الدفعات
# تقوم بترتيب البيانات في الدفعات حسب طول النصوص وإضافة حشوات (padding)
def collate_fn(batch):
    """Collate ``(image, caption)`` samples into a padded batch.

    The batch list is sorted in place by caption length, longest first, as
    required by ``pack_padded_sequence`` with its default ``enforce_sorted``.

    Args:
        batch: List of ``(image_tensor, caption_index_tensor)`` pairs.

    Returns:
        Tuple ``(images, captions, lengths)``: the stacked images, the captions
        padded with the ``<PAD>`` index (batch first), and a tensor holding the
        unpadded caption lengths.
    """
    batch.sort(key=lambda sample: len(sample[1]), reverse=True)
    images, token_seqs = zip(*batch)

    padded = torch.nn.utils.rnn.pad_sequence(
        token_seqs,
        batch_first=True,
        padding_value=vocab.stoi["<PAD>"],
    )
    seq_lengths = torch.tensor([len(seq) for seq in token_seqs])

    return torch.stack(images), padded, seq_lengths

In [14]:
# Cell 9: create the DataLoader
# Serves the dataset in shuffled batches of 20, collated by `collate_fn`
dataset = CocoDS(root, annFile, transform, vocab)

loader = DataLoader(
    dataset,
    batch_size=20,
    shuffle=True,
    collate_fn=collate_fn
)

print("DataLoader جاهز!")

loading annotations into memory...
Done (t=0.68s)
creating index...
index created!
DataLoader جاهز!


In [15]:
# Cell 10: select the compute device
# Uses CUDA when available, otherwise falls back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [16]:
# الخلية 11: بناء نموذج التشفير (Encoder) CNN
# يستخدم ResNet50 لاستخراج المميزات من الصور
class EncoderCNN(nn.Module):
    """ResNet-50 image encoder producing a global and a per-location feature set.

    The backbone is ResNet-50 (ImageNet weights) with its final average-pool
    and fully-connected layers removed. ``forward`` returns both a pooled,
    projected embedding and the spatial feature map flattened to a sequence.
    """

    def __init__(self, embed_size):
        """Build the backbone plus the linear/batch-norm projection head.

        Args:
            embed_size: Size of the pooled image embedding.
        """
        super().__init__()
        resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        # Keep everything except the last two children (avgpool and fc).
        conv_layers = list(resnet.children())[:-2]
        self.backbone = nn.Sequential(*conv_layers)
        self.avgpool = resnet.avgpool
        self.embed = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        """Encode a batch of images.

        Returns:
            Tuple ``(pooled, features_seq)``: ``pooled`` is ``(B, embed_size)``;
            ``features_seq`` is the feature map reshaped to ``(B, H*W, C)``.
        """
        fmap = self.backbone(images)
        batch, channels, height, width = fmap.shape

        # (B, C, H, W) -> (B, H, W, C) -> (B, H*W, C)
        region_feats = fmap.permute(0, 2, 3, 1).contiguous().view(batch, height * width, channels)

        global_feat = torch.flatten(self.avgpool(fmap), start_dim=1)
        global_feat = self.bn(self.embed(global_feat))
        return global_feat, region_feats

In [17]:
# الخلية 12: بناء نماذج فك التشفير المختلفة (Decoder)
# هنا نجد ثلاثة أنواع من فك التشفير: أساسي، مع انتباه (attention)، ومحول (transformer)
class DecoderRNN(nn.Module):
    """Baseline LSTM caption decoder driven by a single image embedding.

    The image embedding is prepended to the embedded caption tokens as the
    first time step, so the LSTM input has one more step than ``captions``.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        """Create the token embedding, the LSTM, and the vocabulary projection."""
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, features, captions, lengths):
        """Run the decoder over a packed batch.

        Args:
            features: Image embeddings, ``(B, embed_size)``.
            captions: Token indices, ``(B, T)``.
            lengths: Per-sample number of time steps to keep, counted on the
                concatenated sequence (image step included), sorted descending.

        Returns:
            Logits for the packed time steps, ``(sum(lengths), vocab_size)``.
        """
        image_step = features.unsqueeze(1)
        word_steps = self.embed(captions)
        sequence = torch.cat([image_step, word_steps], dim=1)

        packed_in = pack_padded_sequence(sequence, lengths.cpu(), batch_first=True)
        packed_out, _ = self.lstm(packed_in)
        return self.linear(packed_out.data)


class Attention(nn.Module):
    """Additive attention over a sequence of encoder features."""

    def __init__(self, encoder_dim, decoder_dim):
        """Create the projections used to score encoder positions.

        Args:
            encoder_dim: Feature size of each encoder position.
            decoder_dim: Size of the decoder hidden state.
        """
        super().__init__()
        self.encoder_att = nn.Linear(encoder_dim, decoder_dim)
        self.decoder_att = nn.Linear(decoder_dim, decoder_dim)
        self.full_att = nn.Linear(decoder_dim, 1)

    def forward(self, encoder_outputs, decoder_hidden):
        """Compute the attention context for one decoding step.

        Args:
            encoder_outputs: ``(B, N, encoder_dim)`` encoder features.
            decoder_hidden: ``(B, decoder_dim)`` decoder hidden state.

        Returns:
            Tuple ``(context, alpha)``: the attention-weighted sum of encoder
            features ``(B, encoder_dim)`` and the weights ``(B, N)``.
        """
        enc_proj = self.encoder_att(encoder_outputs)
        # Add a position axis so the hidden projection broadcasts over N.
        dec_proj = self.decoder_att(decoder_hidden).unsqueeze(1)

        scores = self.full_att(torch.tanh(enc_proj + dec_proj)).squeeze(2)
        alpha = F.softmax(scores, dim=1)

        context = (encoder_outputs * alpha.unsqueeze(2)).sum(dim=1)
        return context, alpha


class DecoderWithLSTM(nn.Module):
    """LSTM caption decoder that attends over encoder features at every step."""

    def __init__(self, embed_size, hidden_size, vocab_size, encoder_dim=2048, num_layers=1):
        """Create the embedding, attention module, LSTM, and output projection.

        Args:
            embed_size: Token embedding size.
            hidden_size: LSTM hidden size (also the attention size).
            vocab_size: Number of output classes.
            encoder_dim: Feature size of each encoder position.
            num_layers: Number of stacked LSTM layers.
        """
        super(DecoderWithLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.attention = Attention(encoder_dim, hidden_size)
        self.lstm = nn.LSTM(embed_size + encoder_dim, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, encoder_outputs, captions):
        """Decode with teacher forcing.

        Args:
            encoder_outputs: ``(B, N, encoder_dim)`` encoder features.
            captions: ``(B, T)`` input token indices.

        Returns:
            Logits of shape ``(B, T, vocab_size)``.
        """
        embeddings = self.embedding(captions)
        batch_size, num_steps = captions.size(0), captions.size(1)

        # The initial state needs one slice per LSTM layer. Sizing it from the
        # module, not a literal 1, keeps num_layers > 1 working. new_zeros
        # inherits the embeddings' dtype and device.
        state_shape = (self.lstm.num_layers, batch_size, self.lstm.hidden_size)
        h = embeddings.new_zeros(state_shape)
        c = embeddings.new_zeros(state_shape)

        if num_steps == 0:
            # torch.stack cannot stack an empty list; return an empty logits tensor.
            return embeddings.new_zeros((batch_size, 0, self.fc.out_features))

        outputs = []
        for t in range(num_steps):
            # Attention is queried with the top layer's hidden state.
            attention_weighted_encoding, _ = self.attention(encoder_outputs, h[-1])
            lstm_input = torch.cat((embeddings[:, t, :], attention_weighted_encoding), dim=1).unsqueeze(1)
            output, (h, c) = self.lstm(lstm_input, (h, c))
            outputs.append(self.fc(output.squeeze(1)))

        return torch.stack(outputs, dim=1)


class TransformerDecoder(nn.Module):
    """Transformer caption decoder with learned positional embeddings."""

    def __init__(self, vocab_size, embed_size, num_heads, hidden_dim, num_layers, max_len=50, encoder_dim=2048):
        """Create the embeddings, decoder stack, and output projection.

        Args:
            vocab_size: Number of output classes.
            embed_size: Model width (``d_model``).
            num_heads: Number of attention heads.
            hidden_dim: Feed-forward width inside each decoder layer.
            num_layers: Number of decoder layers.
            max_len: Longest supported input sequence (positional table size).
            encoder_dim: Feature size of the encoder outputs. When it differs
                from ``embed_size`` a linear projection maps the encoder
                features to the model width, which cross-attention requires.
        """
        super(TransformerDecoder, self).__init__()
        self.max_len = max_len
        self.token_embed = nn.Embedding(vocab_size, embed_size)
        self.pos_embed = nn.Embedding(max_len, embed_size)
        decoder_layer = nn.TransformerDecoderLayer(d_model=embed_size, nhead=num_heads, dim_feedforward=hidden_dim)
        self.transformer = nn.TransformerDecoder(decoder_layer, num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)
        self.scale = math.sqrt(embed_size)
        # The decoder layers expect memory with d_model features; the CNN
        # feature map has encoder_dim channels.
        if encoder_dim != embed_size:
            self.memory_proj = nn.Linear(encoder_dim, embed_size)
        else:
            self.memory_proj = nn.Identity()

    def forward(self, encoder_outputs, captions):
        """Decode with a causal mask.

        Args:
            encoder_outputs: ``(B, N, encoder_dim)`` encoder features.
            captions: ``(B, T)`` input token indices, ``T <= max_len``.

        Returns:
            Logits of shape ``(B, T, vocab_size)``.

        Raises:
            ValueError: If ``T`` exceeds ``max_len``.
        """
        seq_len = captions.size(1)
        if seq_len > self.max_len:
            raise ValueError(
                f"Caption length {seq_len} exceeds max_len={self.max_len} of the positional embedding"
            )

        positions = torch.arange(0, seq_len, device=captions.device).unsqueeze(0)
        tgt = self.token_embed(captions) * self.scale + self.pos_embed(positions)
        # The decoder layers are sequence-first: (T, B, E) and (N, B, E).
        tgt = tgt.permute(1, 0, 2)
        memory = self.memory_proj(encoder_outputs).permute(1, 0, 2)

        # Causal mask: -inf above the diagonal so step t cannot see later steps.
        tgt_mask = torch.triu(
            torch.full((seq_len, seq_len), float("-inf"), device=captions.device),
            diagonal=1,
        )
        output = self.transformer(tgt, memory, tgt_mask=tgt_mask)
        output = output.permute(1, 0, 2)
        return self.fc_out(output)

In [18]:
# الخلية 13: النموذج الرئيسي للتعليق على الصور
# يجمع بين المشفر وفك التشفير (بأحد الأنواع الثلاثة)
class CaptionModel(nn.Module):
    """Image-captioning model: CNN encoder plus one of three decoder kinds.

    ``decoder_type`` selects ``"baseline"`` (``DecoderRNN``), ``"attention"``
    (``DecoderWithLSTM``) or ``"transformer"`` (``TransformerDecoder``).
    """

    def __init__(self, embed_size, hidden_size, vocab_size, decoder_type="baseline", transformer_params=None):
        """Build the encoder and the requested decoder.

        Args:
            embed_size: Embedding size shared by encoder output and decoder.
            hidden_size: LSTM hidden size for the recurrent decoders.
            vocab_size: Number of output classes.
            decoder_type: ``"baseline"``, ``"attention"`` or ``"transformer"``.
            transformer_params: Optional dict with keys ``num_heads``,
                ``hidden_dim``, ``num_layers`` and ``max_len``; defaults are
                used when it is ``None``.

        Raises:
            ValueError: If ``decoder_type`` is not one of the supported names.
        """
        super().__init__()
        self.encoder = EncoderCNN(embed_size)
        self.decoder_type = decoder_type

        if decoder_type == "baseline":
            self.decoder = DecoderRNN(embed_size, hidden_size, vocab_size)
        elif decoder_type == "attention":
            self.decoder = DecoderWithLSTM(embed_size, hidden_size, vocab_size, encoder_dim=2048)
        elif decoder_type == "transformer":
            cfg = transformer_params
            if cfg is None:
                cfg = {"num_heads": 8, "hidden_dim": 2048, "num_layers": 2, "max_len": 50}
            self.decoder = TransformerDecoder(
                vocab_size,
                embed_size,
                cfg["num_heads"],
                cfg["hidden_dim"],
                cfg["num_layers"],
                cfg["max_len"],
            )
        else:
            raise ValueError("Unknown decoder type")

    def forward(self, images, captions, lengths=None):
        """Encode ``images`` and decode ``captions`` with the configured decoder.

        ``lengths`` is only consumed by the baseline decoder; the attention and
        transformer decoders receive the per-location encoder features instead
        of the pooled embedding.
        """
        pooled, features_seq = self.encoder(images)

        if self.decoder_type == "baseline":
            return self.decoder(pooled, captions, lengths)
        if self.decoder_type in ("attention", "transformer"):
            return self.decoder(features_seq, captions)

In [19]:
# Cell 14: set up the model, the loss function, and the optimizer
# Defines the hyperparameters and instantiates the model for training
EMBED_SIZE = 256
HIDDEN_SIZE = 512
# NOTE(review): NUM_EPOCHS is not referenced by the training call later in this
# notebook, which passes the literal 2. Confirm which value is intended.
NUM_EPOCHS = 5
LEARNING_RATE = 0.001

vocab_size = len(vocab.itos)
decoder_type = "attention"
model = CaptionModel(EMBED_SIZE, HIDDEN_SIZE, vocab_size, decoder_type=decoder_type).to(device)

# Padding positions are excluded from the loss
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<PAD>"])
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [20]:
# الخلية 15: وظيفة حساب الـ accuracy
# تقوم بحساب دقة النموذج بتجاهل كلمات الحشو (PAD)
def calculate_accuracy(predictions, targets, vocab):
    """Token-level accuracy in percent, ignoring ``<PAD>`` targets.

    Args:
        predictions: Logits of shape ``(num_tokens, vocab_size)``.
        targets: Target indices of shape ``(num_tokens,)``.
        vocab: Object whose ``stoi`` mapping contains ``"<PAD>"``.

    Returns:
        Percentage (0-100) of non-pad positions whose arg-max prediction
        equals the target; ``0.0`` when every target is padding.
    """
    predicted_words = predictions.max(dim=1).indices

    keep = targets != vocab.stoi["<PAD>"]
    total = keep.sum().item()
    if total == 0:
        return 0.0

    # Score only the positions that carry a real token.
    correct = (predicted_words[keep] == targets[keep]).sum().item()
    return correct / total * 100

In [21]:
# الخلية 16: وظيفة التدريب مع حساب الـ accuracy
# تقوم بتدريب النموذج وحساب الدقة في كل خطوة
def train_model(model, loader, criterion, optimizer, num_epochs, device, vocab):
    """Train ``model`` with teacher forcing and checkpoint it after every epoch.

    For every batch the target is the caption shifted left by one
    (``captions[:, 1:]``). The decoder input depends on the decoder type:

    * ``"attention"`` / ``"transformer"``: ``captions[:, :-1]``, so step ``t``
      sees tokens up to ``t`` and predicts token ``t + 1``.
    * ``"baseline"``: the decoder itself prepends the image embedding as the
      first step, so it is fed ``captions[:, 1:-1]``. The image step then
      predicts the first word and each word predicts its successor. Feeding
      ``captions[:, :-1]`` here would shift inputs one step against targets.

    Args:
        model: Caption model exposing ``decoder_type``.
        loader: Iterable of ``(images, captions, lengths)`` batches, sorted by
            descending length within each batch.
        criterion: Loss taking ``(logits, targets)``.
        optimizer: Optimizer over the model parameters.
        num_epochs: Number of passes over ``loader``.
        device: Device the batches are moved to.
        vocab: Vocabulary used to mask padding in the accuracy.

    Returns:
        The trained model.

    Raises:
        ValueError: If ``loader`` is empty or ``model.decoder_type`` is unknown.
    """
    model.train()
    total_step = len(loader)
    if total_step == 0:
        raise ValueError("loader is empty; nothing to train on")

    for epoch in range(num_epochs):
        epoch_loss = 0.0
        epoch_accuracy = 0.0
        total_samples = 0

        for i, (images, captions, lengths) in enumerate(loader):
            images = images.to(device)
            captions = captions.to(device)
            batch_size = images.size(0)
            targets = captions[:, 1:]

            optimizer.zero_grad()

            if model.decoder_type == "baseline":
                # One step per target token: the image step plus every word except <EOS>.
                decode_lengths = lengths.cpu() - 1
                flat_outputs = model(images, captions[:, 1:-1], decode_lengths)
                flat_targets = pack_padded_sequence(targets, decode_lengths, batch_first=True)[0]
            elif model.decoder_type in ("attention", "transformer"):
                outputs = model(images, captions[:, :-1])
                flat_outputs = outputs.reshape(-1, outputs.size(2))
                flat_targets = targets.reshape(-1)
            else:
                raise ValueError(f"Unknown decoder type: {model.decoder_type!r}")

            loss = criterion(flat_outputs, flat_targets)
            # Accuracy for this batch, padding excluded
            accuracy = calculate_accuracy(flat_outputs, flat_targets, vocab)

            loss.backward()
            optimizer.step()

            epoch_loss += loss.item() * batch_size
            epoch_accuracy += accuracy * batch_size
            total_samples += batch_size

            if (i+1) % 100 == 0:
                print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{total_step}], Loss: {loss.item():.4f}, Accuracy: {accuracy:.2f}%")

        # Sample-weighted averages over the epoch
        avg_epoch_loss = epoch_loss / total_samples
        avg_epoch_accuracy = epoch_accuracy / total_samples
        print(f"Epoch [{epoch+1}/{num_epochs}] - Average Loss: {avg_epoch_loss:.4f}, Average Accuracy: {avg_epoch_accuracy:.2f}%")

        # Checkpoint after every epoch
        save_model(model, epoch+1)

    print("Training finished.")
    return model


# الخلية 17: وظيفة حفظ النموذج
# تحفظ النموذج والأوزان بعد كل epoch
def save_model(model, epoch):
    """Write the checkpoint files for ``epoch`` to the working directory.

    Three files are produced: the state dict (``model_epoch_{epoch}.pth``), the
    pickled module (``model_complete_epoch_{epoch}.pkl``) and a metadata dict
    (``training_info_epoch_{epoch}.pth``).

    Args:
        model: Model to save.
        epoch: Epoch number used in the file names and metadata.
    """
    # Weights only
    torch.save(model.state_dict(), f'model_epoch_{epoch}.pth')

    # NOTE(review): this pickles the whole module, so it can only be loaded
    # where the same class definitions are importable. The state dict above is
    # the portable artifact.
    torch.save(model, f'model_complete_epoch_{epoch}.pkl')

    # Record the decoder type of the model actually being saved. The
    # module-level `decoder_type` is only a fallback, since it can differ from
    # the model passed in.
    training_info = {
        'epoch': epoch,
        'vocab_size': len(vocab.itos),
        'embed_size': EMBED_SIZE,
        'hidden_size': HIDDEN_SIZE,
        'decoder_type': getattr(model, 'decoder_type', decoder_type)
    }
    torch.save(training_info, f'training_info_epoch_{epoch}.pth')

    print(f"Model saved after epoch {epoch}")

In [None]:
# Cell 18: start training
# Runs the training loop for 2 epochs and rebinds `model` to the returned model
model = train_model(model, loader, criterion, optimizer, 2, device, vocab)

Epoch [1/2], Step [100/2000], Loss: 4.8017, Accuracy: 29.03%
Epoch [1/2], Step [200/2000], Loss: 4.1667, Accuracy: 29.89%
Epoch [1/2], Step [300/2000], Loss: 3.7918, Accuracy: 38.53%
Epoch [1/2], Step [400/2000], Loss: 3.4692, Accuracy: 38.67%
Epoch [1/2], Step [500/2000], Loss: 3.6239, Accuracy: 37.12%
Epoch [1/2], Step [600/2000], Loss: 3.7009, Accuracy: 33.48%
Epoch [1/2], Step [700/2000], Loss: 3.8100, Accuracy: 37.89%
Epoch [1/2], Step [800/2000], Loss: 3.2850, Accuracy: 38.30%


In [None]:
# الخلية 19: وظيفة توليد تعليقات باستخدام Beam Search
# تستخدم خوارزمية Beam Search لتوليد أفضل التعليقات للصور
def generate_caption_beam(model, image, vocab, device, beam_size=3, max_len=20):
    """Generate a caption for one image with beam search.

    Works with decoders that expose ``embedding``, ``attention``, ``lstm`` and
    ``fc`` (the attention LSTM decoder). The model is switched to eval mode.

    Args:
        model: Caption model with ``encoder`` and ``decoder``.
        image: Single image tensor without a batch dimension.
        vocab: Vocabulary providing ``stoi`` and ``itos``.
        device: Device to run on.
        beam_size: Number of hypotheses kept after each step.
        max_len: Maximum number of generated tokens.

    Returns:
        The best hypothesis as a space-joined string, with ``<SOS>``,
        ``<EOS>`` and ``<PAD>`` removed.

    Raises:
        ValueError: If the decoder lacks a required sub-module or
            ``beam_size`` is smaller than 1.
    """
    decoder = model.decoder
    missing = [name for name in ("embedding", "attention", "lstm", "fc") if not hasattr(decoder, name)]
    if missing:
        raise ValueError(
            f"generate_caption_beam requires an attention LSTM decoder; missing: {', '.join(missing)}"
        )
    if beam_size < 1:
        raise ValueError("beam_size must be at least 1")

    sos_idx = vocab.stoi["<SOS>"]
    eos_idx = vocab.stoi["<EOS>"]
    pad_idx = vocab.stoi["<PAD>"]

    model.eval()

    with torch.no_grad():
        image = image.unsqueeze(0).to(device)
        _, encoder_outputs = model.encoder(image)

        # One state slice per LSTM layer, so stacked LSTMs work too.
        state_shape = (decoder.lstm.num_layers, 1, decoder.lstm.hidden_size)
        sequences = [
            (
                [sos_idx],
                0.0,
                torch.zeros(state_shape, device=device),
                torch.zeros(state_shape, device=device),
            )
        ]

        for _ in range(max_len):
            # Every hypothesis has ended: further steps would change nothing.
            if all(beam[0][-1] == eos_idx for beam in sequences):
                break

            all_candidates = []

            for seq, score, h, c in sequences:
                if seq[-1] == eos_idx:
                    # Finished hypotheses are carried over unchanged.
                    all_candidates.append((seq, score, h, c))
                    continue

                last_word = torch.tensor([seq[-1]], device=device)
                embed = decoder.embedding(last_word)
                attn_output, _ = decoder.attention(encoder_outputs, h[-1])
                lstm_input = torch.cat((embed, attn_output), dim=1).unsqueeze(1)
                output, (h_new, c_new) = decoder.lstm(lstm_input, (h, c))
                log_probs = torch.log_softmax(decoder.fc(output.squeeze(1)), dim=1)

                # topk cannot return more entries than the vocabulary holds.
                k = min(beam_size, log_probs.size(1))
                top_log_probs, top_indices = log_probs.topk(k)

                for log_p, token in zip(top_log_probs[0].tolist(), top_indices[0].tolist()):
                    all_candidates.append((seq + [token], score + log_p, h_new, c_new))

            sequences = sorted(all_candidates, key=lambda x: x[1], reverse=True)[:beam_size]

        best_sequence = sequences[0][0]

        caption = [
            vocab.itos[idx]
            for idx in best_sequence
            if idx not in (sos_idx, eos_idx, pad_idx)
        ]

    return " ".join(caption)

In [None]:
# Cell 20: try the model by generating a caption for one image
# Generates a caption for a chosen sample and prints it next to the reference captions
# NOTE(review): the sample is drawn from `coco_train`, the dataset used for
# training, so this is a sanity check and not a held-out evaluation.
idx = 11
image, captions = coco_train[idx]

generated_caption = generate_caption_beam(
    model=model,
    image=image,
    vocab=vocab,
    device=device,
    beam_size=3,
    max_len=20
)

print("Generated Caption:")
print(generated_caption)

print("\nGround Truth Captions:")
for i, c in enumerate(captions):
    print(f"{i+1}. {c}")

In [None]:
# الخلية 21: تحميل نموذج محفوظ
# وظيفة لتحميل نموذج محفوظ مسبقاً للاستمرار في التدريب أو التقييم
def load_model(epoch, device):
    """Rebuild a model from the checkpoint files written for ``epoch``.

    The architecture is taken from the saved metadata
    (``training_info_epoch_{epoch}.pth``), falling back to the notebook's
    current settings for any key that is missing. The weights are mapped onto
    ``device`` while loading, so a checkpoint written on a GPU can be restored
    on a CPU-only machine.

    Args:
        epoch: Epoch number used in the checkpoint file names.
        device: Device the model and its weights are placed on.

    Returns:
        Tuple ``(model, training_info)`` with the model in eval mode.
    """
    # Load the training metadata first so the model matches the checkpoint
    training_info = torch.load(f'training_info_epoch_{epoch}.pth', map_location='cpu')

    model = CaptionModel(
        training_info.get('embed_size', EMBED_SIZE),
        training_info.get('hidden_size', HIDDEN_SIZE),
        training_info.get('vocab_size', vocab_size),
        decoder_type=training_info.get('decoder_type', decoder_type),
    ).to(device)

    # Load the weights, remapping storages onto the requested device
    model.load_state_dict(torch.load(f'model_epoch_{epoch}.pth', map_location=device))

    model.eval()
    return model, training_info


# مثال لتحميل النموذج من epoch 1
# loaded_model, info = load_model(1, device)
# print(f"Loaded model from epoch {info['epoch']}")

In [None]:
# الخلية 22: تقييم النموذج على مجموعة اختبار
# يمكن استخدام هذه الوظيفة لتقييم أداء النموذج على بيانات جديدة
def evaluate_model(model, test_loader, device, vocab):
    """Compute the teacher-forced token accuracy of ``model`` on ``test_loader``.

    Targets are the captions shifted left by one. The attention and transformer
    decoders are fed ``captions[:, :-1]``. The baseline decoder prepends the
    image embedding itself, so it is fed ``captions[:, 1:-1]`` to keep each
    step aligned with the token it should predict.

    Args:
        model: Caption model exposing ``decoder_type``.
        test_loader: Iterable of ``(images, captions, lengths)`` batches,
            sorted by descending length within each batch.
        device: Device the batches are moved to.
        vocab: Vocabulary used to mask padding in the accuracy.

    Returns:
        Sample-weighted mean of the per-batch accuracies, in percent.

    Raises:
        ValueError: If ``test_loader`` yields no samples or
            ``model.decoder_type`` is unknown.
    """
    model.eval()
    total_accuracy = 0.0
    total_samples = 0

    with torch.no_grad():
        for images, captions, lengths in test_loader:
            images = images.to(device)
            captions = captions.to(device)
            batch_size = images.size(0)
            targets = captions[:, 1:]

            if model.decoder_type == "baseline":
                # One step per target token: the image step plus every word except <EOS>.
                decode_lengths = lengths.cpu() - 1
                flat_outputs = model(images, captions[:, 1:-1], decode_lengths)
                flat_targets = pack_padded_sequence(targets, decode_lengths, batch_first=True)[0]
            elif model.decoder_type in ("attention", "transformer"):
                outputs = model(images, captions[:, :-1])
                flat_outputs = outputs.reshape(-1, outputs.size(2))
                flat_targets = targets.reshape(-1)
            else:
                raise ValueError(f"Unknown decoder type: {model.decoder_type!r}")

            accuracy = calculate_accuracy(flat_outputs, flat_targets, vocab)
            total_accuracy += accuracy * batch_size
            total_samples += batch_size

    if total_samples == 0:
        raise ValueError("test_loader produced no samples; cannot compute accuracy")

    avg_accuracy = total_accuracy / total_samples
    print(f"Test Accuracy: {avg_accuracy:.2f}%")
    return avg_accuracy