# pre works

In [None]:
# -- 필요한 라이브러리 설치 --
!pip install transformers
!pip install Keras-Preprocessing
# !pip install fasttext

In [None]:
# -- import --
import pandas as pd
import torch
from transformers import ElectraForSequenceClassification, AutoTokenizer, AdamW, ElectraTokenizer
from transformers import get_linear_schedule_with_warmup
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from keras_preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
import random
import numpy as np
from sklearn.metrics import classification_report
# Use the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on CPU-only runtimes (same expression the later cells use).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# # -- making temporary df to test multimodal --
# df_for_multimodal = pd.DataFrame()
# for cat_id in df['cat_id'].unique():
#     df_for_multimodal = pd.concat([df_for_multimodal, df[df['cat_id'] == cat_id].head(1)])
# df_for_multimodal = df_for_multimodal.reset_index(drop=True)
# df_for_multimodal.to_csv('/content/drive/MyDrive/df_for_multimodal.csv', index=False)

# -- required data --
import pandas as pd
import re

# -- small 170-row dataset for quick tests --
# df = pd.read_csv('/content/drive/MyDrive/df_for_multimodal.csv')
# df['product_name'] = df['product_name'].apply(lambda x : re.sub('[^가-힣\sa-zA-Z]', '', x))
# df.info()

# -- file that was actually used when crawling the images --
df = pd.read_csv('/content/drive/MyDrive/bungae_df_for_image_crawling.csv')
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100169 entries, 0 to 100168
Data columns (total 7 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   product_id    100169 non-null  int64  
 1   product_name  100169 non-null  object 
 2   image_url     100169 non-null  object 
 3   image_cnt     100169 non-null  float64
 4   cat_id        100169 non-null  object 
 5   main_cat      100169 non-null  object 
 6   mid_cat       100169 non-null  object 
dtypes: float64(1), int64(1), object(5)
memory usage: 5.3+ MB


In [None]:
# -- 다시 필요한 작업 --
# 1. 이미지를 크롤링 하면서 사라진 것들이 있음. 이미지가 없는 row는 삭제해주기
# 2. 메인 카테고리로 할 것이기 때문에 이미지 폴더 대분류 기준으로 다시 만들고 이미지들도 그에 맞게 옮겨주기

# --> 주피터 노트북에서 작업

# ----- main below ------

# Multimodal test model (Fasttext + MobileNet2) -> failed

In [46]:
# -- model --
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.models import mobilenet_v2
import fasttext
from sklearn.preprocessing import LabelEncoder
import gensim


# Define the multi-modal model
class MultiModalModel(nn.Module):
    """Classifier that fuses averaged FastText word vectors with MobileNetV2 image features.

    Each modality is projected to 128 dimensions, the two projections are
    concatenated (256 dimensions) and mapped to ``num_classes`` logits.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Load the FastText word embedding model
        self.word_embedding = gensim.models.KeyedVectors.load_word2vec_format('/content/drive/MyDrive/wiki.ko.vec', binary=False)
        self.embedding_dim = self.word_embedding.vector_size

        # Load the MobileNetV2 pre-trained model and drop its classification head
        self.image_model = mobilenet_v2(pretrained=True)
        self.image_model.classifier = nn.Identity()

        # Text processing layers
        self.text_fc = nn.Linear(self.embedding_dim, 128)

        # Image processing layers
        self.image_fc = nn.Linear(1280, 128)

        # Output layer (128 text features + 128 image features)
        self.output_fc = nn.Linear(256, num_classes)

    def _embed_text(self, text):
        """Return the mean word vector of one whitespace-tokenized text.

        Out-of-vocabulary tokens contribute a zero vector; a text without any
        token yields a zero vector instead of failing on an empty stack.
        """
        vectors = [
            torch.from_numpy(self.word_embedding.get_vector(token))
            if token in self.word_embedding.key_to_index
            else torch.zeros(self.embedding_dim)
            for token in text.split()
        ]
        if not vectors:
            return torch.zeros(self.embedding_dim)
        return torch.mean(torch.stack(vectors), dim=0)

    def forward(self, texts, images):
        """Compute class logits for a batch.

        Args:
            texts: A single string or a sequence (list/tuple) of strings, one per image.
            images: Image tensor batch accepted by MobileNetV2.

        Returns:
            Tensor of shape (batch, num_classes) with unnormalized scores.
        """
        # A bare string is treated as a batch of one.
        if isinstance(texts, str):
            texts = [texts]

        # Text processing: one embedding per sample, stacked to (batch, embedding_dim)
        text_embedding = torch.stack([self._embed_text(text) for text in texts])
        text_embedding = text_embedding.to(images.device)
        text_output = F.relu(self.text_fc(text_embedding))

        # Image processing: project the 1280-d backbone features down to 128
        image_output = F.relu(self.image_fc(self.image_model(images)))

        # Combine modalities
        combined = torch.cat((text_output, image_output), dim=1)

        # Classification
        return self.output_fc(combined)

# Custom Dataset class
class MultiModalDataset(Dataset):
    """Serve index-aligned (text, image, label) triples one sample at a time."""

    def __init__(self, texts, images, labels):
        self.texts, self.images, self.labels = texts, images, labels

    def __len__(self):
        # The label container defines the dataset size.
        return len(self.labels)

    def __getitem__(self, index):
        # Items are returned exactly as stored; nothing is loaded or converted here.
        return self.texts[index], self.images[index], self.labels[index]


In [47]:
# -- text data --
texts = df['product_name'].tolist()

# -- image paths --
# One path per row, in row order (df is expected to carry its default 0..n-1 index).
images = [
    f'data/drive/MyDrive/bungae_fashion_image/{cat_id}/{prd_id}_image.jpg'
    for cat_id, prd_id in zip(df['cat_id'], df['product_id'])
]

# -- label --
# Integer-encode the category ids; the encoded column is also stored on df.
encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['cat_id'])
labels = torch.tensor(df['label'].tolist())

# Create the multi-modal dataset and data loader
dataset = MultiModalDataset(texts, images, labels)
dataloader = DataLoader(dataset, shuffle=True, batch_size=2)


In [48]:
# Initialize the model
# Size the output layer from the fitted label encoder so it always matches the
# label ids produced for the loss, instead of a hard-coded class count.
num_classes = len(encoder.classes_)
model = MultiModalModel(num_classes)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)




In [None]:
from tqdm.notebook import tqdm

# Training loop: one optimizer step per batch, logging the loss of every batch
for epoch in range(10):
    for batch_texts, batch_images, batch_labels in dataloader:
        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and loss
        outputs = model(batch_texts, batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Print the loss for monitoring
        print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')

# Multimodal test model (Koelectra + MobileNet2) -> working

In [69]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from transformers import ElectraTokenizer, ElectraModel
from PIL import Image


# Custom Dataset
class MultiModalDataset(Dataset):
    """Dataset of (text, image tensor, label) samples.

    Images are opened from the stored paths on access, converted to RGB,
    resized to 224x224 and normalized with mean 0.5 / std 0.5.
    """

    def __init__(self, text_data, image_data, labels, text_tokenizer):
        self.text_data = text_data
        self.image_data = image_data
        self.labels = labels
        # The tokenizer is only stored here; it is not used by this class.
        self.text_tokenizer = text_tokenizer
        # NOTE(review): torchvision's pretrained MobileNetV2 weights are documented
        # with ImageNet mean/std preprocessing; the 0.5/0.5 values are kept as-is —
        # confirm that this is intended.
        preprocessing_steps = [
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,)),
        ]
        self.transform = transforms.Compose(preprocessing_steps)

    def __len__(self):
        # One sample per text entry.
        return len(self.text_data)

    def __getitem__(self, idx):
        sample_text = self.text_data[idx]
        rgb_image = Image.open(self.image_data[idx]).convert("RGB")
        image_tensor = self.transform(rgb_image)
        return sample_text, image_tensor, self.labels[idx]

# Define your data examples (text, image paths, and labels)
text_data = df['product_name'].tolist()

# One image path per row, in row order (df is expected to carry its default 0..n-1 index).
image_data = [
    f'data/drive/MyDrive/bungae_fashion_image/{cat_id}/{prd_id}_image.jpg'
    for cat_id, prd_id in zip(df['cat_id'], df['product_id'])
]

# Integer-encode the category ids; the encoded column is also stored on df.
encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['cat_id'])
labels = df['label']

# Initialize the text tokenizer
text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

# Create the custom dataset
dataset = MultiModalDataset(text_data, image_data, labels, text_tokenizer)

# Define data loaders
batch_size = 8
data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

# Define the multi-modal model
class MultiModalModel(nn.Module):
    """Classifier that fuses a KoELECTRA text encoding with MobileNetV2 image features.

    The first-token ([CLS]) hidden state of the text encoder and the globally
    pooled image feature map are concatenated, passed through a linear fusion
    layer and mapped to ``num_classes`` logits.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Text Model (KoElectra)
        self.text_model = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
        self.text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

        # Image Model (MobileNetV2)
        self.image_model = models.mobilenet_v2(pretrained=True)
        self.image_model.classifier = nn.Identity()  # Remove the final fully connected layer

        # Fusion Layer
        fusion_dim = self.text_model.config.hidden_size + self.image_model.last_channel
        self.fusion_layer = nn.Linear(fusion_dim, fusion_dim)

        # Output Layer
        self.output_layer = nn.Linear(fusion_dim, num_classes)

    def forward(self, text_inputs, image_inputs):
        """Compute class logits for a batch.

        Args:
            text_inputs: List of raw strings, one per image.
            image_inputs: Image tensor batch, already on the model's device.

        Returns:
            Tensor of shape (batch, num_classes) with unnormalized scores.
        """
        # Text Encoding
        encoded = self.text_tokenizer.batch_encode_plus(
            text_inputs,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        # The tokenizer builds CPU tensors; move them next to the images so the
        # encoder does not mix devices once the model lives on the GPU.
        target_device = image_inputs.device
        text_input_ids = encoded["input_ids"].to(target_device)
        # Pass the mask so padded positions are ignored by self-attention.
        text_attention_mask = encoded["attention_mask"].to(target_device)
        text_hidden = self.text_model(input_ids=text_input_ids, attention_mask=text_attention_mask)[0]
        text_outputs = text_hidden[:, 0, :]  # Use the CLS token embedding

        # Image Encoding
        image_outputs = self.image_model.features(image_inputs)
        image_outputs = torch.nn.functional.adaptive_avg_pool2d(image_outputs, 1).reshape(image_outputs.size(0), -1)

        # Fusion
        fusion_inputs = torch.cat((text_outputs, image_outputs), dim=1)
        fused_outputs = self.fusion_layer(fusion_inputs)

        # Output
        logits = self.output_layer(fused_outputs)

        return logits



In [None]:
# Initialize the multi-modal model
# Size the output layer from the fitted label encoder so it always matches the
# label ids fed to the loss, instead of a hard-coded class count.
num_classes = len(encoder.classes_)
model = MultiModalModel(num_classes)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = criterion.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch_texts, batch_images, batch_labels in data_loader:
        batch_texts = list(batch_texts)
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()

        # Forward pass
        outputs = model(batch_texts, batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Report the mean loss over the epoch rather than the loss of whichever
    # batch happened to come last.
    epoch_loss = running_loss / max(len(data_loader), 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

# After training, you can use the model for predictions on new data

# After training, you can use the model for predictions on new data

# Multimodal (KoElectra + MobileNet2) -> train, val, and test

In [None]:
# -- 필요한 라이브러리 설치 --
!pip install transformers
!pip install Keras-Preprocessing

In [None]:
# -- required data --
# NOTE(review): the path below ends at the Drive folder itself and the result is
# not assigned to anything; presumably a CSV file name still has to be filled in
# and the frame bound to `df` — confirm which file this cell should load.
pd.read_csv('/content/drive/MyDrive/')

In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import ElectraTokenizer, ElectraModel
from torchvision.models import mobilenet_v2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image

# Custom Dataset
class MultiModalDataset(Dataset):
    """Dataset of (text, transformed image, label) samples addressed by position.

    Args:
        text_data: Sequence of texts.
        image_data: Sequence of image file paths, aligned with ``text_data``.
        labels: Labels aligned with ``text_data`` (list, array or pandas Series).
        text_tokenizer: Stored on the instance; not used by this class.
        transform: Callable applied to each RGB PIL image.
    """

    def __init__(self, text_data, image_data, labels, text_tokenizer, transform):
        self.text_data = text_data
        self.image_data = image_data
        # Materialize the labels so that labels[idx] is always a positional
        # lookup; indexing a pandas Series with an int goes by index *label*,
        # which picks the wrong row (or raises) for a shuffled split.
        self.labels = list(labels)
        self.text_tokenizer = text_tokenizer
        self.transform = transform

    def __len__(self):
        return len(self.text_data)

    def __getitem__(self, idx):
        text = self.text_data[idx]
        image = Image.open(self.image_data[idx]).convert("RGB")
        image = self.transform(image)
        label = self.labels[idx]

        return text, image, label

# Split data into train, validation, and test sets
# (80% train; the remaining 20% is split 70/30 into validation and test)
train_data, val_data = train_test_split(df, test_size=0.2, random_state=42)
val_data, test_data = train_test_split(val_data, test_size=0.3, random_state=42)

# Integer-encode the category ids; the encoded column is also stored on df.
encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['cat_id'])

# One image path per row, keyed by df's own index so that every split can look
# up exactly its rows.
# -- image_path still needs to be adjusted --
image_paths = pd.Series(
    [
        f'data/drive/MyDrive/bungae_fashion_image/{cat_id}/{prd_id}_image.jpg'
        for cat_id, prd_id in zip(df['cat_id'], df['product_id'])
    ],
    index=df.index,
)


def _split_to_lists(split):
    """Return the texts, image paths and labels of *split*, all in the split's row order."""
    rows = split.index
    return (
        split['product_name'].tolist(),
        image_paths.loc[rows].tolist(),
        df.loc[rows, 'label'].tolist(),
    )


# Define your data examples (text, image paths, and labels).
# All three lists of a split are taken in the same (shuffled) row order;
# appending paths while walking df in ascending order would pair each text
# with another row's image.
text_train_data, image_train_data, labels_train = _split_to_lists(train_data)
text_val_data, image_val_data, labels_val = _split_to_lists(val_data)
text_test_data, image_test_data, labels_test = _split_to_lists(test_data)

In [None]:
# Batch size shared by all three loaders
batch_size = 8

# Initialize the text tokenizer
text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-small-v3-discriminator")

# Define transformations for image data
# NOTE(review): torchvision's pretrained MobileNetV2 weights are documented with
# ImageNet mean/std preprocessing; the 0.5/0.5 values are kept as-is — confirm
# that this is intended.
preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
]
image_transform = transforms.Compose(preprocessing_steps)

# Create the custom datasets (same tokenizer and transform for every split)
train_dataset = MultiModalDataset(
    text_data=text_train_data,
    image_data=image_train_data,
    labels=labels_train,
    text_tokenizer=text_tokenizer,
    transform=image_transform,
)
val_dataset = MultiModalDataset(
    text_data=text_val_data,
    image_data=image_val_data,
    labels=labels_val,
    text_tokenizer=text_tokenizer,
    transform=image_transform,
)
test_dataset = MultiModalDataset(
    text_data=text_test_data,
    image_data=image_test_data,
    labels=labels_test,
    text_tokenizer=text_tokenizer,
    transform=image_transform,
)

# Define data loaders: only the training data is shuffled
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

In [None]:
# Define the multi-modal model
class MultiModalModel(nn.Module):
    """Classifier that fuses a KoELECTRA (small) text encoding with MobileNetV2 image features.

    The first-token ([CLS]) hidden state of the text encoder and the globally
    pooled image feature map are concatenated, passed through a linear fusion
    layer and mapped to ``num_classes`` logits.

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Text Model (KoElectra)
        self.text_model = ElectraModel.from_pretrained("monologg/koelectra-small-v3-discriminator")
        self.text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-small-v3-discriminator")

        # Image Model (MobileNetV2)
        self.image_model = mobilenet_v2(pretrained=True)
        self.image_model.classifier = nn.Identity()  # Remove the final fully connected layer

        # Fusion Layer
        fusion_dim = self.text_model.config.hidden_size + self.image_model.last_channel
        self.fusion_layer = nn.Linear(fusion_dim, fusion_dim)

        # Output Layer
        self.output_layer = nn.Linear(fusion_dim, num_classes)

    def forward(self, text_inputs, image_inputs):
        """Compute class logits for a batch.

        Args:
            text_inputs: List of raw strings, one per image.
            image_inputs: Image tensor batch, already on the model's device.

        Returns:
            Tensor of shape (batch, num_classes) with unnormalized scores.
        """
        # Text Encoding
        encoded = self.text_tokenizer.batch_encode_plus(
            text_inputs,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        # The tokenizer builds CPU tensors; move them next to the images so the
        # encoder does not mix devices once the model lives on the GPU.
        target_device = image_inputs.device
        text_input_ids = encoded["input_ids"].to(target_device)
        # Pass the mask so padded positions are ignored by self-attention.
        text_attention_mask = encoded["attention_mask"].to(target_device)
        text_hidden = self.text_model(input_ids=text_input_ids, attention_mask=text_attention_mask)[0]
        text_outputs = text_hidden[:, 0, :]  # Use the CLS token embedding

        # Image Encoding
        image_outputs = self.image_model.features(image_inputs)
        image_outputs = torch.nn.functional.adaptive_avg_pool2d(image_outputs, 1).reshape(image_outputs.size(0), -1)

        # Fusion
        fusion_inputs = torch.cat((text_outputs, image_outputs), dim=1)
        fused_outputs = self.fusion_layer(fusion_inputs)

        # Output
        logits = self.output_layer(fused_outputs)

        return logits

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over *loader* and return (mean batch loss, accuracy in percent).

    With an optimizer the model is switched to training mode and updated after
    every batch; without one it is switched to evaluation mode and only scored.
    """
    is_training = optimizer is not None
    if is_training:
        model.train()
    else:
        model.eval()

    batch_losses = []
    correct = 0
    seen = 0

    for batch_texts, batch_images, batch_labels in loader:
        batch_texts = list(batch_texts)
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        if is_training:
            optimizer.zero_grad()

        # Forward pass
        outputs = model(batch_texts, batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        if is_training:
            loss.backward()
            optimizer.step()

        batch_losses.append(loss.item())
        _, predicted = outputs.max(1)
        seen += batch_labels.size(0)
        correct += predicted.eq(batch_labels).sum().item()

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    mean_loss = sum(batch_losses) / max(len(batch_losses), 1)
    accuracy = 100.0 * correct / max(seen, 1)
    return mean_loss, accuracy


# Initialize the multi-modal model
# Size the output layer from the fitted label encoder so it always matches the
# label ids fed to the loss, instead of a hard-coded class count.
num_classes = len(encoder.classes_)
model = MultiModalModel(num_classes)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = criterion.to(device)

# Per-epoch history
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

for epoch in range(num_epochs):
    # Training
    train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # Validation (no gradients needed)
    with torch.no_grad():
        val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)
    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)

    # Print the loss and accuracy for every epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

# Multimodal with data augmentation and dropout (KoElectra + MobileNetV2)

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from transformers import ElectraTokenizer, ElectraModel
from PIL import Image


# Custom Dataset
class MultiModalDataset(Dataset):
    """Dataset of (text, image, label) samples with an optional image transform.

    Images are opened from the stored paths on access and converted to RGB;
    when no transform is given the PIL image is returned unchanged.
    """

    def __init__(self, text_data, image_data, labels, text_tokenizer, transform=None):
        self.text_data, self.image_data, self.labels = text_data, image_data, labels
        # The tokenizer is only stored here; it is not used by this class.
        self.text_tokenizer = text_tokenizer
        self.transform = transform

    def __len__(self):
        # One sample per text entry.
        return len(self.text_data)

    def __getitem__(self, idx):
        sample_text = self.text_data[idx]
        sample_image = Image.open(self.image_data[idx]).convert("RGB")
        if self.transform:
            sample_image = self.transform(sample_image)
        return sample_text, sample_image, self.labels[idx]


# Define your data examples (text, image paths, and labels)
text_data = df['product_name'].tolist()

# One image path per row, in row order (df is expected to carry its default 0..n-1 index).
image_data = [
    f'data/drive/MyDrive/bungae_fashion_image/{cat_id}/{prd_id}_image.jpg'
    for cat_id, prd_id in zip(df['cat_id'], df['product_id'])
]

# Integer-encode the category ids; the encoded column is also stored on df.
encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['cat_id'])
labels = df['label']

# Initialize the text tokenizer
text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

# Define data augmentation transforms: resize, random flip / small rotation,
# then tensor conversion and normalization.
# NOTE(review): torchvision's pretrained MobileNetV2 weights are documented with
# ImageNet mean/std preprocessing; the 0.5/0.5 values are kept as-is — confirm
# that this is intended.
augmentation_steps = [
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
]
image_transform = transforms.Compose(augmentation_steps)

# Create the custom dataset with data augmentation
dataset = MultiModalDataset(
    text_data,
    image_data,
    labels,
    text_tokenizer,
    transform=image_transform,
)


In [None]:
# Split the dataset into train, validation, and test sets (80% / 10% / 10%)
train_size = int(0.8 * len(dataset))
val_size = (len(dataset) - train_size) // 2
test_size = len(dataset) - train_size - val_size

train_dataset, val_split, test_split = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])

# Validation and test samples must not be randomly flipped or rotated, so they
# are served from a second dataset that reuses the training preprocessing with
# the random steps removed.
random_steps = (transforms.RandomHorizontalFlip, transforms.RandomRotation)
eval_transform = transforms.Compose(
    [step for step in image_transform.transforms if not isinstance(step, random_steps)]
)
eval_dataset = MultiModalDataset(text_data, image_data, labels, text_tokenizer, transform=eval_transform)
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)

# Define data loaders for train, validation, and test sets
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [None]:
# Define the multi-modal model
class MultiModalModel(nn.Module):
    """KoELECTRA + MobileNetV2 fusion classifier with dropout before the output layer.

    The first-token ([CLS]) hidden state of the text encoder and the globally
    pooled image feature map are concatenated, passed through a linear fusion
    layer, regularized with dropout and mapped to ``num_classes`` logits.

    Args:
        num_classes: Number of output classes.
        dropout_rate: Dropout probability applied to the fused features.
    """

    def __init__(self, num_classes, dropout_rate=0.5):
        super().__init__()

        # Text Model (KoElectra)
        self.text_model = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
        self.text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

        # Image Model (MobileNetV2)
        self.image_model = models.mobilenet_v2(pretrained=True)
        self.image_model.classifier = nn.Identity()  # Remove the final fully connected layer

        # Fusion Layer
        fusion_dim = self.text_model.config.hidden_size + self.image_model.last_channel
        self.fusion_layer = nn.Linear(fusion_dim, fusion_dim)

        # Output Layer
        self.output_layer = nn.Linear(fusion_dim, num_classes)

        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, text_inputs, image_inputs):
        """Compute class logits for a batch.

        Args:
            text_inputs: List of raw strings, one per image.
            image_inputs: Image tensor batch, already on the model's device.

        Returns:
            Tensor of shape (batch, num_classes) with unnormalized scores.
        """
        # Text Encoding
        encoded = self.text_tokenizer.batch_encode_plus(
            text_inputs,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        # The tokenizer builds CPU tensors; move them next to the images so the
        # encoder does not mix devices once the model lives on the GPU.
        target_device = image_inputs.device
        text_input_ids = encoded["input_ids"].to(target_device)
        # Pass the mask so padded positions are ignored by self-attention.
        text_attention_mask = encoded["attention_mask"].to(target_device)
        text_hidden = self.text_model(input_ids=text_input_ids, attention_mask=text_attention_mask)[0]
        text_outputs = text_hidden[:, 0, :]  # Use the CLS token embedding

        # Image Encoding
        image_outputs = self.image_model.features(image_inputs)
        image_outputs = torch.nn.functional.adaptive_avg_pool2d(image_outputs, 1).reshape(image_outputs.size(0), -1)

        # Fusion
        fusion_inputs = torch.cat((text_outputs, image_outputs), dim=1)
        fused_outputs = self.fusion_layer(fusion_inputs)

        # Dropout
        fused_outputs = self.dropout(fused_outputs)

        # Output
        logits = self.output_layer(fused_outputs)

        return logits


# Initialize the multi-modal model
# Size the output layer from the fitted label encoder so it always matches the
# label ids fed to the loss, instead of a hard-coded class count.
num_classes = len(encoder.classes_)
model = MultiModalModel(num_classes)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over *loader* and return (mean batch loss, accuracy in percent).

    With an optimizer the model is switched to training mode and updated after
    every batch; without one it is switched to evaluation mode and only scored.
    """
    is_training = optimizer is not None
    if is_training:
        model.train()
    else:
        model.eval()

    batch_losses = []
    correct = 0
    seen = 0

    for batch_texts, batch_images, batch_labels in loader:
        batch_texts = list(batch_texts)
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        if is_training:
            optimizer.zero_grad()

        # Forward pass
        outputs = model(batch_texts, batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        if is_training:
            loss.backward()
            optimizer.step()

        batch_losses.append(loss.item())
        _, predicted = outputs.max(1)
        seen += batch_labels.size(0)
        correct += predicted.eq(batch_labels).sum().item()

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    mean_loss = sum(batch_losses) / max(len(batch_losses), 1)
    accuracy = 100.0 * correct / max(seen, 1)
    return mean_loss, accuracy


# Training loop
num_epochs = 10

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = criterion.to(device)

for epoch in range(num_epochs):
    # Training pass (updates the model)
    avg_train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)

    # Validation pass (no gradients needed)
    with torch.no_grad():
        avg_val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)

    # Print the loss and accuracy for every epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

# After training, you can use the model for predictions on new data


# with SMOTE

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from transformers import ElectraTokenizer, ElectraModel
from PIL import Image
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler


# Custom Dataset
class MultiModalDataset(Dataset):
    """Dataset of (text, image, label) samples with an optional image transform.

    Images are opened from the stored paths on access and converted to RGB;
    when no transform is given the PIL image is returned unchanged.
    """

    def __init__(self, text_data, image_data, labels, text_tokenizer, transform=None):
        self.text_data, self.image_data, self.labels = text_data, image_data, labels
        # The tokenizer is only stored here; it is not used by this class.
        self.text_tokenizer = text_tokenizer
        self.transform = transform

    def __len__(self):
        # One sample per text entry.
        return len(self.text_data)

    def __getitem__(self, idx):
        sample_text = self.text_data[idx]
        sample_image = Image.open(self.image_data[idx]).convert("RGB")
        if self.transform:
            sample_image = self.transform(sample_image)
        return sample_text, sample_image, self.labels[idx]


import numpy as np
from imblearn.over_sampling import RandomOverSampler

# Define your data examples (text, image paths, and labels)
text_data = list(df['product_name'])

# One image path per row, in row order (df is expected to carry its default 0..n-1 index).
image_data = [
    f'data/drive/MyDrive/bungae_fashion_image/{cat_id}/{prd_id}_image.jpg'
    for cat_id, prd_id in zip(df['cat_id'], df['product_id'])
]

encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['cat_id'])
# Plain list so that labels[i] is always a positional lookup.
labels = df['label'].tolist()

# Initialize the text tokenizer
text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

# Define data augmentation transforms (training only)
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Same preprocessing without the random steps, for validation and test
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Create the custom datasets: augmented for training, deterministic for evaluation
dataset = MultiModalDataset(text_data, image_data, labels, text_tokenizer, transform=image_transform)
eval_dataset = MultiModalDataset(text_data, image_data, labels, text_tokenizer, transform=eval_transform)

# Split the dataset into train, validation, and test sets (80% / 10% / 10%)
train_size = int(0.8 * len(dataset))
val_size = (len(dataset) - train_size) // 2
test_size = len(dataset) - train_size - val_size

train_split, val_split, test_split = torch.utils.data.random_split(dataset, [train_size, val_size, test_size])

# Rebalance the classes of the training split only.
# - fit_resample takes exactly one feature matrix and one target, so the row
#   positions are resampled and texts/images are looked up through them.
# - SMOTE interpolates numeric features and cannot synthesize product names or
#   image paths, so minority rows are duplicated at random instead.
# - Resampling after the split keeps duplicated rows out of validation/test.
# - With the default strategy every class is raised to the majority count, so
#   no separate undersampling step is needed afterwards.
train_positions = np.asarray(train_split.indices).reshape(-1, 1)
train_labels = [labels[position] for position in train_split.indices]
balanced_positions, _ = RandomOverSampler().fit_resample(train_positions, train_labels)

train_dataset = torch.utils.data.Subset(dataset, balanced_positions.ravel().tolist())
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)

# Define data loaders for train, validation, and test sets
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define the multi-modal model
class MultiModalModel(nn.Module):
    """KoELECTRA + MobileNetV2 fusion classifier with dropout before the output layer.

    The first-token ([CLS]) hidden state of the text encoder and the globally
    pooled image feature map are concatenated, passed through a linear fusion
    layer, regularized with dropout and mapped to ``num_classes`` logits.

    Args:
        num_classes: Number of output classes.
        dropout_rate: Dropout probability applied to the fused features.
    """

    def __init__(self, num_classes, dropout_rate=0.5):
        super().__init__()

        # Text Model (KoElectra)
        self.text_model = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
        self.text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

        # Image Model (MobileNetV2)
        self.image_model = models.mobilenet_v2(pretrained=True)
        self.image_model.classifier = nn.Identity()  # Remove the final fully connected layer

        # Fusion Layer
        fusion_dim = self.text_model.config.hidden_size + self.image_model.last_channel
        self.fusion_layer = nn.Linear(fusion_dim, fusion_dim)

        # Output Layer
        self.output_layer = nn.Linear(fusion_dim, num_classes)

        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, text_inputs, image_inputs):
        """Compute class logits for a batch.

        Args:
            text_inputs: List of raw strings, one per image.
            image_inputs: Image tensor batch, already on the model's device.

        Returns:
            Tensor of shape (batch, num_classes) with unnormalized scores.
        """
        # Text Encoding
        encoded = self.text_tokenizer.batch_encode_plus(
            text_inputs,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        )
        # The tokenizer builds CPU tensors; move them next to the images so the
        # encoder does not mix devices once the model lives on the GPU.
        target_device = image_inputs.device
        text_input_ids = encoded["input_ids"].to(target_device)
        # Pass the mask so padded positions are ignored by self-attention.
        text_attention_mask = encoded["attention_mask"].to(target_device)
        text_hidden = self.text_model(input_ids=text_input_ids, attention_mask=text_attention_mask)[0]
        text_outputs = text_hidden[:, 0, :]  # Use the CLS token embedding

        # Image Encoding
        image_outputs = self.image_model.features(image_inputs)
        image_outputs = torch.nn.functional.adaptive_avg_pool2d(image_outputs, 1).reshape(image_outputs.size(0), -1)

        # Fusion
        fusion_inputs = torch.cat((text_outputs, image_outputs), dim=1)
        fused_outputs = self.fusion_layer(fusion_inputs)

        # Dropout
        fused_outputs = self.dropout(fused_outputs)

        # Output
        logits = self.output_layer(fused_outputs)

        return logits


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over *loader* and return (mean batch loss, accuracy in percent).

    With an optimizer the model is switched to training mode and updated after
    every batch; without one it is switched to evaluation mode and only scored.
    """
    is_training = optimizer is not None
    if is_training:
        model.train()
    else:
        model.eval()

    batch_losses = []
    correct = 0
    seen = 0

    for batch_texts, batch_images, batch_labels in loader:
        batch_texts = list(batch_texts)
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        if is_training:
            optimizer.zero_grad()

        # Forward pass
        outputs = model(batch_texts, batch_images)
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        if is_training:
            loss.backward()
            optimizer.step()

        batch_losses.append(loss.item())
        _, predicted = outputs.max(1)
        seen += batch_labels.size(0)
        correct += predicted.eq(batch_labels).sum().item()

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    mean_loss = sum(batch_losses) / max(len(batch_losses), 1)
    accuracy = 100.0 * correct / max(seen, 1)
    return mean_loss, accuracy


# Initialize the multi-modal model
# Size the output layer from the fitted label encoder so it always matches the
# label ids fed to the loss, instead of a hard-coded class count.
num_classes = len(encoder.classes_)
model = MultiModalModel(num_classes)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = criterion.to(device)

for epoch in range(num_epochs):
    # Training pass (updates the model)
    avg_train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)

    # Validation pass (no gradients needed)
    with torch.no_grad():
        avg_val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)

    # Print the loss and accuracy for every epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%, Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

# After training, you can use the model for predictions on new data


In [None]:
# -- version 2 --

In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from transformers import ElectraTokenizer, ElectraModel
from PIL import Image
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler


# Custom Dataset
class MultiModalDataset(Dataset):
    """Dataset of aligned (text, image, label) samples.

    The three input sequences are parallel: position ``i`` of each one
    describes the same sample. Images are opened from their path only when
    a sample is requested.

    Args:
        text_data: Indexable collection of raw text entries.
        image_data: Indexable collection of image file paths.
        labels: Indexable collection of labels.
        text_tokenizer: Tokenizer object; stored on the instance but not
            applied here (items carry the raw text).
        transform: Optional callable applied to the RGB ``PIL.Image``.
    """

    def __init__(self, text_data, image_data, labels, text_tokenizer, transform=None):
        self.text_data, self.image_data, self.labels = text_data, image_data, labels
        self.text_tokenizer, self.transform = text_tokenizer, transform

    def __len__(self):
        """Return the number of samples (the length of the text collection)."""
        return len(self.text_data)

    def __getitem__(self, idx):
        """Return the ``(text, image, label)`` triple stored at ``idx``."""
        sample_text = self.text_data[idx]

        # Open the file and force three channels regardless of source mode.
        picture = Image.open(self.image_data[idx]).convert("RGB")
        picture = self.transform(picture) if self.transform else picture

        return sample_text, picture, self.labels[idx]


# Build the parallel sample lists (text, image path, label) from `df`.
text_data = df['product_name'].tolist()

# Image files are addressed as <cat_id>/<product_id>_image.jpg. Walking the two
# columns together replaces one `df.loc` lookup per cell and does not depend on
# the frame still carrying a 0..n-1 index.
image_data = [
    f'data/drive/MyDrive/bungae_fashion_image/{cat_id}/{prd_id}_image.jpg'
    for cat_id, prd_id in zip(df['cat_id'], df['product_id'])
]

# Encode the category ids as integer class labels.
encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['cat_id'])

# The dataset looks labels up by sample position, so give it a Series whose
# index is guaranteed to be 0..n-1 (a no-op when `df` already has a RangeIndex).
labels = df['label'].reset_index(drop=True)

# Initialize the text tokenizer
text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

# Training transform: resize, light random augmentation, tensor conversion and
# normalization.
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Evaluation transform: identical preprocessing WITHOUT the random flip and
# rotation, so validation/test metrics are deterministic and measured on
# unaugmented images.
eval_image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Two views over the same samples: augmented for training, plain for evaluation.
dataset = MultiModalDataset(text_data, image_data, labels, text_tokenizer, transform=image_transform)
eval_dataset = MultiModalDataset(text_data, image_data, labels, text_tokenizer, transform=eval_image_transform)

# Split sizes: 80% train, remainder halved into validation and test
# (test receives the extra sample when the remainder is odd).
train_size = int(0.8 * len(dataset))
val_size = (len(dataset) - train_size) // 2
test_size = len(dataset) - train_size - val_size

# Shuffle the sample indices once with a fixed seed so the split is
# reproducible across runs, then carve out three disjoint index ranges.
split_generator = torch.Generator().manual_seed(42)
shuffled_indices = torch.randperm(len(dataset), generator=split_generator).tolist()
train_indices = shuffled_indices[:train_size]
val_indices = shuffled_indices[train_size:train_size + val_size]
test_indices = shuffled_indices[train_size + val_size:]

# Only the training subset reads through the augmenting dataset.
train_dataset = torch.utils.data.Subset(dataset, train_indices)
val_dataset = torch.utils.data.Subset(eval_dataset, val_indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_indices)

# Balance the training classes.
#
# SMOTE / RandomUnderSampler resample numeric feature matrices. They cannot be
# applied to raw product-name strings paired with image tensors: the earlier
# approach called `.reshape` on `str` objects, materialized every training image
# in memory at once, and re-paired images with texts by indexing the image
# tensor with label values. Instead, keep the lazy (text, image, label) dataset
# intact and draw training samples with probability inversely proportional to
# their class frequency, so every class is seen about equally often per epoch.
train_labels = torch.as_tensor(
    labels.iloc[list(train_dataset.indices)].to_numpy(),
    dtype=torch.long,
)
class_counts = torch.bincount(train_labels)
# Every label present in `train_labels` has a count >= 1, so no division by zero.
sample_weights = 1.0 / class_counts[train_labels].double()

train_sampler = torch.utils.data.WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),  # one "epoch" keeps the original size
    replacement=True,                 # required so rare classes can repeat
)

# Define data loaders for train, validation, and test sets.
# `sampler` and `shuffle=True` are mutually exclusive; the sampler already
# randomizes the training order.
batch_size = 8
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define the multi-modal model
class MultiModalModel(nn.Module):
    """Text + image classifier fusing KoELECTRA and MobileNetV2 features.

    The text branch takes the first-token ([CLS]) hidden state of KoELECTRA,
    the image branch takes globally average-pooled MobileNetV2 feature maps.
    Both vectors are concatenated, passed through a linear fusion layer and
    dropout, and projected to class logits.

    Args:
        num_classes: Number of output classes (size of the logit vector).
        dropout_rate: Dropout probability applied after the fusion layer.
    """

    def __init__(self, num_classes, dropout_rate=0.5):
        super().__init__()

        # Text Model (KoElectra)
        self.text_model = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
        self.text_tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

        # Image Model (MobileNetV2)
        self.image_model = models.mobilenet_v2(pretrained=True)
        self.image_model.classifier = nn.Identity()  # Remove the final fully connected layer

        # Fusion Layer: operates on the concatenated text and image features.
        fusion_dim = self.text_model.config.hidden_size + self.image_model.last_channel
        self.fusion_layer = nn.Linear(fusion_dim, fusion_dim)

        # Output Layer
        self.output_layer = nn.Linear(fusion_dim, num_classes)

        # Dropout
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, text_inputs, image_inputs):
        """Compute class logits for a batch.

        Args:
            text_inputs: Sequence of raw strings, one per sample; tokenized
                inside this method.
            image_inputs: Image batch tensor fed to the MobileNetV2 feature
                extractor.

        Returns:
            Logits tensor with one row per sample and ``num_classes`` columns.
        """
        # Run everything on whichever device the model's weights live on.
        device = self.output_layer.weight.device

        # Text Encoding. The tokenizer always produces CPU tensors, so they are
        # moved to the model's device explicitly; otherwise a CUDA model fails
        # with a device mismatch.
        encoded = self.text_tokenizer(
            list(text_inputs),
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt",
        ).to(device)

        # Pass the attention mask along with the ids: the batch is padded to a
        # common length and padding positions must not be attended to.
        text_outputs = self.text_model(
            input_ids=encoded["input_ids"],
            attention_mask=encoded["attention_mask"],
        )[0][:, 0, :]  # Use the CLS token embedding

        # Image Encoding: feature maps -> global average pool -> flat vector.
        image_inputs = image_inputs.to(device)
        image_outputs = self.image_model.features(image_inputs)
        image_outputs = torch.nn.functional.adaptive_avg_pool2d(image_outputs, 1).reshape(image_outputs.size(0), -1)

        # Fusion
        fusion_inputs = torch.cat((text_outputs, image_outputs), dim=1)
        fused_outputs = self.fusion_layer(fusion_inputs)

        # Dropout
        fused_outputs = self.dropout(fused_outputs)

        # Output
        logits = self.output_layer(fused_outputs)

        return logits


# Initialize the multi-modal model with one output per encoded category, taken
# from the fitted label encoder so the logit size always matches the labels.
num_classes = len(encoder.classes_)
model = MultiModalModel(num_classes)

# Pick the device and move the model (and loss) there before building the
# optimizer.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss().to(device)
# NOTE(review): 1e-3 is a high learning rate for fine-tuning pretrained
# encoders; confirm this is intended.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    samples_seen = 0

    for text, image, label in train_loader:
        # `text` is a batch of Python strings: it has no `.to()` / `.size()`.
        # It stays on the host and is tokenized inside the model.
        text = list(text)
        image = image.to(device)
        label = label.to(device)

        optimizer.zero_grad()

        logits = model(text, image)
        loss = criterion(logits, label)

        loss.backward()
        optimizer.step()

        # `loss` is the batch mean; weight it by the batch size so the epoch
        # figure is a true per-sample average even with a short last batch.
        batch_count = label.size(0)
        running_loss += loss.item() * batch_count
        samples_seen += batch_count

    # Guard against an empty loader rather than dividing by zero.
    train_loss = running_loss / max(samples_seen, 1)

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.4f}")
