In [None]:
### Change the file paths below
file_input_path = "butbi_cleaned_250624.xlsx"  # training spreadsheet, read with pd.read_excel
base_image_input = "images_butbi"  # root folder of the training images (one sub-folder per brand)
model_save_path = "model_classification_butbi"  # where the best model state_dict is saved and later loaded
num_classes = "num_classes.txt"  # NOTE(review): the training cell rebinds this name to an int; the file name is hard-coded where it is written/read
file_output_after_download_images_test = "output.xlsx"  # prediction rows whose image was downloaded successfully
otput_file = "but_bi_2008_AI_AT.xlsx"  # spreadsheet of products to predict on (misspelled name is referenced as-is by a later cell)
output_file_path_name = "VIT_AI_butbi.xlsx"  # final export with the predicted_brand and score columns

In [None]:
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from io import BytesIO
import requests
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
from transformers import ViTFeatureExtractor, ViTModel




In [None]:
#### Step 1: Download images (change the file path, the image folder name, and the brand_gop column if needed)
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from io import BytesIO
import requests
from tqdm import tqdm
import transformers
# Read the training data
file_path = file_input_path
data = pd.read_excel(file_path)

# Create the root image folder; the per-brand sub-folders are created during download
base_image_folder = base_image_input
os.makedirs(base_image_folder, exist_ok=True)

# Download every image into the sub-folder of its brand
def download_images(df, base_image_folder, timeout=30):
    """Download the thumbnail of every row of ``df`` into a per-brand folder.

    Each image is saved as ``<base_image_folder>/<brand>/<index label>.jpg``.
    The brand folder name is the ``brand_clean`` value stripped, lower-cased and
    with spaces replaced by underscores, which is the same name the training
    cell derives before looking the image up. The file is named after the row's
    index label because ``CustomDataset`` builds its path from
    ``dataframe.index``.

    Args:
        df: dataframe with ``url_thumbnail`` and ``brand_clean`` columns.
        base_image_folder: root folder that receives the brand sub-folders.
        timeout: seconds to wait for the server before giving up on one image.

    Failures (missing brand, network error, unreadable image) are printed and
    skipped so that a single bad row cannot abort the whole download.
    """
    def fetch_image(idx, row):
        try:
            brand = row['brand_clean']
            if pd.isna(brand):
                raise ValueError("missing brand_clean value")
            # Same folder name the dataset computes: strip, lower-case, spaces -> underscores.
            brand = str(brand).strip().lower().replace(' ', '_')
            brand_folder = os.path.join(base_image_folder, brand)
            os.makedirs(brand_folder, exist_ok=True)
            # Without a timeout a stalled server would block this worker forever.
            response = requests.get(row['url_thumbnail'], timeout=timeout)
            response.raise_for_status()  # Raise if the request did not succeed
            img = Image.open(BytesIO(response.content)).convert("RGB")
            img.save(os.path.join(brand_folder, f"{idx}.jpg"))
        except Exception as e:
            print(f"Error downloading image at index {idx}: {e}")

    with ThreadPoolExecutor(max_workers=30) as executor:
        # Pass the index label (not the position) so file names match dataframe.index.
        list(tqdm(executor.map(lambda pos: fetch_image(df.index[pos], df.iloc[pos]), range(len(df))), total=len(df), desc='Downloading Images'))

# Download the images of the whole training table into base_image_folder
download_images(data, base_image_folder)


In [None]:
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from io import BytesIO
import requests
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
from transformers import ViTFeatureExtractor, ViTModel
from transformers import AutoImageProcessor, AutoModel



# Read the training data
data = pd.read_excel(file_input_path)

def normalized(text):
    """Return a canonical string form of a brand value.

    ``None`` becomes an empty string, an exact ``str`` is lower-cased and
    stripped of surrounding whitespace, and any other value is converted with
    ``str()`` without further changes.
    """
    if text is None:
        return ''
    if type(text) is str:
        return text.lower().strip()
    return str(text)

# Normalize every brand value so that equal brands end up with one class name
data['brand_clean'] = data['brand_clean'].apply(normalized)

# Prepare the data
class CustomDataset(Dataset):
    """Image dataset driven by a dataframe with a ``brand_clean`` column.

    Images are looked up at ``<folder_image_path>/<brand>/<index label>.jpg``,
    where ``<brand>`` is the row's brand with spaces replaced by underscores and
    lower-cased. A sample whose file is missing or unreadable is returned as a
    zero tensor with label ``-1`` so that it can be filtered out later.
    """

    def __init__(self, dataframe, folder_image_path: str, transform=None):
        self.data = dataframe
        self.transform = transform
        self.folder_image_path = folder_image_path
        # Class ids follow the sorted order of the distinct brand values.
        self.classes = sorted(self.data['brand_clean'].unique())
        self.class_to_idx = {}
        for position, name in enumerate(self.classes):
            self.class_to_idx[name.replace(' ', '_').lower()] = position

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        brand = self.data.iloc[idx]['brand_clean'].replace(' ', '_').lower()
        file_name = f"{self.data.index[idx]}.jpg"
        image_path = os.path.join(self.folder_image_path, brand, file_name)

        image = None
        if os.path.exists(image_path):
            try:
                image = Image.open(image_path).convert("RGB")
            except Exception as e:
                print(f"Error loading image at {image_path}: {e}")
        if image is None:
            # Placeholder sample; label -1 marks it as invalid.
            return torch.zeros((3, 224, 224)), -1

        label_idx = self.class_to_idx[brand]
        if self.transform:
            image = self.transform(image)
        return image, label_idx

print('Preparing dataset...')
# .copy() makes df an independent frame, so the column assignment below does not
# write into a filtered slice of `data` (which triggers SettingWithCopyWarning).
df = data[(data['brand_clean'].notnull())].copy()
# Brand names double as folder names: spaces become underscores, all lower-case.
df['brand_clean'] = df['brand_clean'].str.replace(' ', '_').str.lower()

# Preprocessing and augmentation pipeline applied to every image of custom_dataset.
# NOTE(review): the train, valid and test subsets all wrap the same custom_dataset,
# so the random augmentations below also run during validation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Build the full dataset over the prepared dataframe and the downloaded images
custom_dataset = CustomDataset(dataframe=df, transform=transform, folder_image_path=base_image_input)
print(f'Loaded full dataset with {len(custom_dataset)} samples.')

# Positional indices of every sample in the dataset
indices = list(range(len(custom_dataset)))

# Split indices into train, valid, and test sets
if len(indices) == 0:
    raise ValueError("No valid indices found. Ensure that the dataset is not empty.")

# 70% train; the remaining 30% is halved into valid and test below
train_indices, remaining_indices = train_test_split(indices, test_size=0.3, random_state=42)
if len(train_indices) == 0:
    raise ValueError("Train set is empty after splitting. Adjust test_size or ensure more data is available.")

valid_indices, test_indices = train_test_split(remaining_indices, test_size=0.5, random_state=42)

# Check number of samples in each split
print(f'Number of train samples: {len(train_indices)}')
print(f'Number of valid samples: {len(valid_indices)}')
print(f'Number of test samples: {len(test_indices)}')

# Create Subset and DataLoader for each set
BATCH_SIZE = 6

# Filter out invalid samples before a batch is assembled
def collate_fn(batch):
    """Collate a batch after dropping every sample whose label is ``-1``.

    Returns an empty image tensor of shape ``(0, 3, 224, 224)`` together with
    an empty ``long`` label tensor when no sample remains; otherwise the result
    of the default collate function on the remaining samples.
    """
    valid_samples = [sample for sample in batch if sample[1] != -1]
    if not valid_samples:
        empty_images = torch.zeros((0, 3, 224, 224))
        empty_labels = torch.zeros((0,), dtype=torch.long)
        return empty_images, empty_labels
    return torch.utils.data.dataloader.default_collate(valid_samples)

# One loader per split; collate_fn removes the samples flagged with label -1,
# so a batch can hold fewer than BATCH_SIZE items (or none at all).
train_loader = DataLoader(Subset(custom_dataset, train_indices), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(Subset(custom_dataset, valid_indices), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(Subset(custom_dataset, test_indices), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

# Split sizes count indices, including samples that collate_fn will later drop
print("Number of train samples: ", len(train_indices))
print("Number of valid samples: ", len(valid_indices))
print("Number of test samples: ", len(test_indices))
print("Detected Classes are: ", len(custom_dataset.classes), "labels")

# google/vit-huge-patch14-224-in21k
# google/vit-base-patch16-224
# Model definition: pretrained ViT backbone followed by a linear head
class ViTForImageClassification2(nn.Module):
    """ViT backbone with a single linear classification layer on top.

    Args:
        num_labels: number of output classes of the linear head.
    """

    def __init__(self, num_labels=10):
        super().__init__()
        # self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224')
        self.vit = ViTModel.from_pretrained("google/vit-large-patch16-224-in21k")

        hidden_size = self.vit.config.hidden_size
        self.classifier = nn.Linear(hidden_size, num_labels)
        self.num_labels = num_labels

    def forward(self, images):
        """Return the class logits for a batch of pixel tensors."""
        backbone_out = self.vit(pixel_values=images)
        # Only the first token of the last hidden state feeds the classifier.
        first_token = backbone_out.last_hidden_state[:, 0]
        return self.classifier(first_token)

    
    


# Initialize model with one output per detected brand class
model = ViTForImageClassification2(num_labels=len(custom_dataset.classes))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("device",device)
model.to(device)

# Adam over all model parameters, i.e. the backbone is fine-tuned together with the head
optimizer = optim.Adam(model.parameters(), lr=2e-5)
criterion = nn.CrossEntropyLoss()

# Function to calculate accuracy
def calculate_accuracy(outputs, labels):
    """Return the fraction of rows whose highest-scoring class equals the label.

    Args:
        outputs: tensor of per-class scores, one row per sample.
        labels: tensor of class indices, one per sample.

    Returns:
        Accuracy as a float in ``[0, 1]``; ``0.0`` for an empty batch instead of
        raising ``ZeroDivisionError``.
    """
    total = len(labels)
    if total == 0:
        return 0.0
    _, preds = torch.max(outputs, dim=1)
    corrects = (preds == labels).sum().item()
    return corrects / total


# Start at +inf so that the first epoch's validation loss always counts as an improvement
best_val_loss  = float('inf')
# Train the model
EPOCHS = 6
for epoch in tqdm(range(EPOCHS),"Processs..."):
    model.train()
    running_loss = 0.0
    running_corrects = 0
    train_seen = 0  # samples that actually reached the model (collate_fn drops invalid ones)
    for images, labels in train_loader:
        if len(images) == 0:
            # Every sample of this batch was invalid and removed by collate_fn.
            continue
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_count = len(labels)
        # criterion returns the batch mean, so weight it by the batch size.
        running_loss += loss.item() * batch_count
        running_corrects += calculate_accuracy(outputs, labels) * batch_count
        train_seen += batch_count

    # Average over the samples really processed, not over the split size, which
    # still counts samples and batches that were filtered out.
    epoch_loss = running_loss / train_seen if train_seen else 0.0
    epoch_acc = running_corrects / train_seen if train_seen else 0.0

    val_loss = 0.0
    val_corrects = 0
    val_seen = 0
    model.eval()
    with torch.no_grad():
        for images, labels in valid_loader:
            if len(images) == 0:
                continue
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            batch_count = len(labels)
            val_loss += loss.item() * batch_count
            val_corrects += calculate_accuracy(outputs, labels) * batch_count
            val_seen += batch_count

    val_loss = val_loss / val_seen if val_seen else 0.0
    val_acc = val_corrects / val_seen if val_seen else 0.0
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Save the model weights of the best epoch so far
        torch.save(model.state_dict(), model_save_path)
        print(f"Model saved at {model_save_path}")
        # A separate name is used so the notebook-level `num_classes` setting is not overwritten.
        class_count = len(custom_dataset.classes)
        with open("num_classes.txt", "w") as f:
            f.write(str(class_count))
        print(f"Number of classes saved at num_classes.txt")



    print(f'Epoch {epoch + 1}/{EPOCHS}, Training Loss: {epoch_loss:.4f}, Training Accuracy: {epoch_acc:.4f}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}')



In [None]:
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from io import BytesIO
import requests
from tqdm import tqdm
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch
from transformers import ViTForImageClassification, ViTConfig

# Read the data to predict on
test_file_path = otput_file
test_data = pd.read_excel(test_file_path)

# Create the folder that stores the downloaded images
test_image_folder = "imgaes_base_output"
os.makedirs(test_image_folder, exist_ok=True)

# Image download helper
def download_image(row, timeout=30):
    """Download one row's thumbnail into ``test_image_folder``.

    Args:
        row: mapping with ``url_thumbnail`` and ``product_base_id`` entries.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The path of the saved ``<product_base_id>.jpg`` file, or ``None`` when
        the download or the image decoding failed (the error is printed).
    """
    img_url = row['url_thumbnail']
    img_id = str(row['product_base_id'])  # the product_base_id field names the image file
    img_path = os.path.join(test_image_folder, f"{img_id}.jpg")
    try:
        # Without a timeout a stalled server would block this worker forever.
        response = requests.get(img_url, timeout=timeout)
        response.raise_for_status()
        img = Image.open(BytesIO(response.content)).convert("RGB")
        img.save(img_path)
        return img_path
    except Exception as e:
        print(f"Error downloading image {img_url}: {e}")
        return None

# Download the images into the folder (results keep the row order of test_data)
print("Downloading test images...")
with ThreadPoolExecutor(max_workers=10) as executor:
    image_paths = list(tqdm(executor.map(download_image, [row for _, row in test_data.iterrows()]), total=len(test_data), desc='Downloading Test Images'))

# Drop the rows whose image could not be downloaded (download_image returned None)
test_data['image_path'] = image_paths
test_data = test_data.dropna(subset=['image_path']).reset_index(drop=True)

print("Finished downloading test images.")

# Dataset class for the prediction (test) data
class TestBrandDataset(Dataset):
    """Unlabelled image dataset that reads file paths from an ``image_path`` column."""

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return the (optionally transformed) RGB image of row ``idx``.

        Raises:
            FileNotFoundError: if the stored path does not exist.
        """
        img_path = self.df.iloc[idx]['image_path']
        if os.path.exists(img_path):
            img = Image.open(img_path).convert("RGB")
            return self.transform(img) if self.transform else img
        raise FileNotFoundError(f"File not found: {img_path}")


## Save test data
# index=False: the next cell reloads this file with pd.read_excel defaults, so a
# written index would come back as an extra "Unnamed: 0" column in the final export.
test_data.to_excel(file_output_after_download_images_test, index=False)


In [None]:
import os
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from io import BytesIO
import requests
from tqdm import tqdm
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch
from transformers import ViTForImageClassification, ViTConfig


# Reload the prediction rows that were saved after the image download
test_data = pd.read_excel(file_output_after_download_images_test)


class CustomDataset(Dataset):
    """Labelled image dataset driven by a dataframe with a ``brand_clean`` column.

    The image of a row is expected at
    ``<folder_image_path>/<brand>/<index label>.jpg`` with ``<brand>`` being the
    brand value with spaces replaced by underscores and lower-cased. Missing or
    unreadable files yield a zero tensor with label ``-1``.
    """

    def __init__(self, dataframe, folder_image_path: str, transform=None):
        self.data = dataframe
        self.transform = transform
        self.folder_image_path = folder_image_path
        # The position in this sorted list is the class id.
        self.classes = sorted(self.data['brand_clean'].unique())
        self.class_to_idx = {}
        for class_id, class_name in enumerate(self.classes):
            self.class_to_idx[class_name.replace(' ', '_').lower()] = class_id

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        brand = self.data.iloc[idx]['brand_clean'].replace(' ', '_').lower()
        file_name = f"{self.data.index[idx]}.jpg"
        image_path = os.path.join(self.folder_image_path, brand, file_name)

        image = None
        if os.path.exists(image_path):
            try:
                image = Image.open(image_path).convert("RGB")
            except Exception as e:
                print(f"Error loading image at {image_path}: {e}")
        if image is None:
            # Placeholder sample; label -1 marks it as invalid.
            return torch.zeros((3, 224, 224)), -1

        label_idx = self.class_to_idx[brand]
        if self.transform:
            image = self.transform(image)
        return image, label_idx
    
class TestBrandDataset(Dataset):
    """Unlabelled image dataset that reads file paths from an ``image_path`` column."""

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return the (optionally transformed) RGB image of row ``idx``.

        Raises:
            FileNotFoundError: if the stored path does not exist.
        """
        img_path = self.df.iloc[idx]['image_path']
        if os.path.exists(img_path):
            img = Image.open(img_path).convert("RGB")
            return self.transform(img) if self.transform else img
        raise FileNotFoundError(f"File not found: {img_path}")

# Augmentation pipeline; in this cell it is only handed to the CustomDataset
# that is rebuilt further down to recover the class names.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


# Prepare the test data (deterministic preprocessing only, no augmentation)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
test_dataset = TestBrandDataset(test_data, transform=test_transform)
# shuffle=False keeps the predictions aligned with the rows of test_data
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Check whether a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Use the number of classes from custom_dataset
# num_labels = len(custom_dataset.classes)

# The class count is read back from the text file written next to the saved model
with open("num_classes.txt", "r") as f:
    num_classes = int(f.read().strip())
print(f"Number of classes loaded: {num_classes}")

# google/vit-base-patch16-224
# google/vit-large-patch16-224-in21k
# Build the model configuration with the saved number of classes
config = ViTConfig.from_pretrained('google/vit-large-patch16-224-in21k', num_labels=num_classes)

# Create the classification model that will receive the trained weights
model = ViTForImageClassification.from_pretrained('google/vit-large-patch16-224-in21k', config=config)

# Load the saved state dict; map_location lets a checkpoint that was written on
# a GPU be loaded on a CPU-only machine as well.
state_dict = torch.load(model_save_path, map_location=device)

# Keep only the keys this architecture knows (key set computed once, not per key)
model_keys = set(model.state_dict().keys())
filtered_state_dict = {k: v for k, v in state_dict.items() if k in model_keys}

# NOTE(review): `df` is not defined in this cell; it must still be the training
# dataframe from the training cell so the class order matches the trained model.
custom_dataset = CustomDataset(dataframe=df, transform=transform, folder_image_path=base_image_input)


load_result = model.load_state_dict(filtered_state_dict, strict=False)
# strict=False hides missing weights; a classification head that was not loaded
# would stay randomly initialised and make every prediction meaningless.
missing_head_keys = [k for k in load_result.missing_keys if k.startswith('classifier')]
if missing_head_keys:
    raise RuntimeError(f"Classifier weights missing from '{model_save_path}': {missing_head_keys}")
model.to(device)
model.eval()

predictions = []  # predicted class index per image
scores = []  # softmax probability of the predicted class per image
print("Predicting...")
with torch.no_grad():
    for images in tqdm(test_dataloader, desc='Predicting'):
        images = images.to(device)
        outputs = model(images).logits
        preds = torch.argmax(outputs, dim=1)
        predictions.extend(preds.cpu().numpy())
        scores.extend(torch.softmax(outputs, dim=1).max(dim=1).values.cpu().numpy())

# Decode the labels and save the results
# Class indices are mapped back to names through custom_dataset.classes;
# underscores are turned back into spaces for the export.
decoded_predictions = [str(custom_dataset.classes[pred]).replace("_"," ") for pred in predictions]

test_data['predicted_brand'] = decoded_predictions
test_data['score'] = scores
print("Export files")
output_file_path = output_file_path_name
test_data.to_excel(output_file_path, index=False)
print(f"Kết quả dự đoán đã được lưu vào '{output_file_path}'")