In [1]:
# Preprocessing

In [None]:
import os
import json
import numpy as np
from PIL import Image
import cv2
from cv2 import dnn_superres
from torch import nn
from transformers import Trainer, TrainingArguments, ViTForImageClassification, ViTImageProcessor
import time
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from torch.utils.data import Dataset
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelEncoder
from sklearn import preprocessing
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.python.keras.callbacks import ModelCheckpoint
from datasets import load_metric


In [None]:
# Initialize empty datasets
X_train = []
y_train = []
X_test = []
y_test = []

# Set train-test split ratio
split = 0.90

# Path to training data: one sub-folder per class, each containing that class's images
path = "data/train"

# Load the super-resolution model (FSRCNN, x3) used for image upscaling
sr = dnn_superres.DnnSuperResImpl_create()
path_ = "FSRCNN_x3.pb"
sr.readModel(path_)
sr.setModel("fsrcnn", 3)

# Preprocess every image and collect the results per class folder:
# {class folder name: [224x224x3 nested pixel lists, ...]}
json_format = {}
for folder in os.listdir(path):
    images_list = []
    for image_name in os.listdir(os.path.join(path, folder)):
        # Pipeline per image: force RGB -> upscale -> denoise -> resize to 224x224
        image = Image.open(os.path.join(path, folder, image_name)).convert('RGB')
        image = sr.upsample(np.asarray(image))
        image = cv2.fastNlMeansDenoisingColored(image, None, 7, 21, 16, 16)
        image = Image.fromarray(image)
        image = image.resize((224, 224))
        img_array = np.asarray(image).reshape((224, 224, 3))
        # JSON cannot store ndarrays, so pixels are stored as nested Python lists
        images_list.append(img_array.tolist())
    json_format[folder] = images_list

# Save preprocessed images to a JSON file.
# The file is written to data/full_data.json because that is the path the
# training and prediction cells read; the context manager guarantees the
# handle is flushed and closed before those cells open the file.
with open("data/full_data.json", "w") as f:
    json.dump(json_format, f)

In [None]:
# Train ViT Models

In [None]:
# Load the ViT image processor
model_name_or_path = 'google/vit-base-patch16-224-in21k'
feature_extractor = ViTImageProcessor.from_pretrained(model_name_or_path)

# Load preprocessed data: {class name: list of images stored as nested pixel lists}
with open('data/full_data.json') as f:
    json_load = json.load(f)

# Train the ViT models in a loop (one independently sampled classifier per iteration)
for clf in range(0, 100):
    print(f"Training classifier {clf}")
    start_time = time.time()

    # Prepare train and test datasets
    x_train, y_train, x_test, y_test = [], [], [], []

    for data_class in json_load:
        # Determine the limit for training data using a gamma distribution.
        # Every model gets a different distribution of class images and therefore
        # different strengths at predicting specific classes.
        limit = 0
        while limit < 10:
            limit = int((np.random.gamma(4, 2) + 2) * 2)

        data = np.asarray(json_load[data_class])
        # Cap the training count at len(data) - 1 so a non-empty class always
        # keeps at least one image for the test split.
        limit = min(limit, len(data) - 1)

        # Shuffle in place along the first axis, then cut into train / test
        np.random.shuffle(data)
        train = data[:limit]
        test = data[limit:]

        x_train.extend(train)
        y_train.extend([data_class] * len(train))
        x_test.extend(test)
        y_test.extend([data_class] * len(test))

    # Report the shapes without stacking the lists into one array: np.array(x_test)
    # would copy every held-out image just to read a shape.
    train_shape = (len(x_train),) + (x_train[0].shape if x_train else ())
    test_shape = (len(x_test),) + (x_test[0].shape if x_test else ())
    print(f"Train data shape: {train_shape}")
    print(f"Test data shape: {test_shape}")

    # Convert train and test data into DataFrames of PIL images plus class names
    train_df = pd.DataFrame(
        {"pixel_values": [Image.fromarray(img.astype('uint8'), 'RGB') for img in x_train], "label": y_train})
    test_df = pd.DataFrame(
        {"pixel_values": [Image.fromarray(img.astype('uint8'), 'RGB') for img in x_test], "label": y_test})

    # Class labels. The position of a name in this list is its integer class id
    # (see collate_fn), so the positional loss weights in CustomTrainer follow
    # this directory-listing order.
    labels = os.listdir("data/train")

    # Training dataset: every item is randomly augmented before feature extraction
    class CustomDatasetAug(Dataset):
        """Pairs images with labels and applies a random Keras augmentation per access.

        `images` and `labels` are indexed with the same `idx`; each item is a dict
        with the processed image under "image" and the untouched label under "label".
        """

        def __init__(self, images, labels):
            self.images, self.labels = images, labels
            # Augmentation settings: rotation, zoom, horizontal flip and shifts
            augmentation_options = dict(
                rotation_range=20,
                zoom_range=0.15,
                horizontal_flip=True,
                width_shift_range=0.15,
                height_shift_range=0.15,
            )
            self.datagen = ImageDataGenerator(**augmentation_options)

        def __len__(self):
            # Dataset size is defined by the number of labels
            return len(self.labels)

        def __getitem__(self, idx):
            image, label = self.images[idx], self.labels[idx]
            # Augment on the raw array, then hand a PIL image to the processor
            pixels = np.array(image)
            augmented_img = Image.fromarray(self.datagen.random_transform(pixels))
            return {"image": feature_extractor(augmented_img), "label": label}


    # Evaluation dataset: images go to the processor unchanged (no augmentation)
    class CustomDataset(Dataset):
        """Pairs images with labels; each item is {"image": processed image, "label": label}."""

        def __init__(self, images, labels):
            self.images, self.labels = images, labels

        def __len__(self):
            # Dataset size is defined by the number of labels
            return len(self.labels)

        def __getitem__(self, idx):
            image, label = self.images[idx], self.labels[idx]
            return {"image": feature_extractor(image), "label": label}


    # Build the datasets: augmented items for training, untouched items for evaluation
    ds_train = CustomDatasetAug(images=train_df['pixel_values'], labels=train_df['label'])
    ds_test = CustomDataset(images=test_df['pixel_values'], labels=test_df['label'])


    # Custom Trainer with weighted loss for handling class imbalance
    class CustomTrainer(Trainer):
        """Trainer whose loss is a class-weighted cross-entropy over the model's logits."""

        # Positional per-class loss weights: entry i applies to integer class id i.
        CLASS_WEIGHTS = (0.7, 0.5, 0.5, 0.5, 2.0, 0.5, 0.5, 2.0, 1.0, 0.5, 0.5, 0.5)

        def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
            """Compute the weighted cross-entropy loss for one batch.

            Args:
                model: The model being trained; called as ``model(**inputs)``.
                inputs: Batch dict; must contain "labels" next to the model inputs.
                return_outputs: When true, return ``(loss, outputs)`` instead of ``loss``.
                num_items_in_batch: Accepted so the override stays callable when the
                    Trainer passes this argument; it is not used here.

            Returns:
                The loss tensor, or ``(loss, outputs)`` when ``return_outputs`` is true.
            """
            labels = inputs.get("labels")
            outputs = model(**inputs)
            logits = outputs.get("logits")
            # Create the weights on the logits' device instead of forcing CUDA, so the
            # loss also works on CPU or on a non-default GPU.
            weights = torch.tensor(self.CLASS_WEIGHTS, dtype=torch.float32, device=logits.device)
            loss_fn = nn.CrossEntropyLoss(weight=weights)
            # The class count is read from the logits rather than model.config, which a
            # wrapped model (e.g. nn.DataParallel) does not expose.
            loss = loss_fn(logits.view(-1, logits.size(-1)), labels.view(-1))
            return (loss, outputs) if return_outputs else loss


    # Accuracy metric used to score evaluation predictions
    metric = load_metric("accuracy")


    def compute_metrics(p):
        """Score a prediction object: arg-max over axis 1 of `p.predictions` vs `p.label_ids`."""
        predicted_ids = np.argmax(p.predictions, axis=1)
        return metric.compute(predictions=predicted_ids, references=p.label_ids)


    # Data collator: turns a list of dataset items into one batch of tensors
    def collate_fn(batch):
        """Stack each item's first "pixel_values" entry and map label names to ids.

        A label's id is its position in the module-level `labels` list.
        """
        pixel_tensors = []
        label_ids = []
        for sample in batch:
            pixel_tensors.append(torch.tensor(sample['image']["pixel_values"][0]))
            label_ids.append(labels.index(sample['label']))
        return {
            'pixel_values': torch.stack(pixel_tensors),
            'labels': torch.tensor(label_ids)
        }


    # Model setup: a fresh pretrained ViT with a classification head sized to `labels`
    id_to_label = {str(i): c for i, c in enumerate(labels)}
    label_to_id = {c: str(i) for i, c in enumerate(labels)}
    model = ViTForImageClassification.from_pretrained(
        model_name_or_path,
        num_labels=len(labels),
        id2label=id_to_label,
        label2id=label_to_id,
    )

    # Training configuration; each classifier writes to its own output directory
    training_args = TrainingArguments(
        output_dir=f"./vit_bagging/clf_{clf}",
        # schedule / optimisation
        num_train_epochs=10,
        learning_rate=1e-4,
        lr_scheduler_type="cosine_with_restarts",
        auto_find_batch_size=True,
        gradient_checkpointing=True,
        # evaluation, checkpointing and logging cadence
        evaluation_strategy="steps",
        eval_steps=100,
        save_strategy="steps",
        save_steps=100,
        save_total_limit=2,
        load_best_model_at_end=True,
        logging_steps=10,
        # keep every key the custom datasets emit; collate_fn consumes them
        remove_unused_columns=False,
    )

    # Wire the model, data and metric together
    trainer = CustomTrainer(
        model=model,
        args=training_args,
        train_dataset=ds_train,
        eval_dataset=ds_test,
        data_collator=collate_fn,
        compute_metrics=compute_metrics,
        tokenizer=feature_extractor,
    )

    # Train, then persist the model, the training metrics and the trainer state
    train_results = trainer.train()
    trainer.save_model()
    train_metrics = train_results.metrics
    trainer.log_metrics("train", train_metrics)
    trainer.save_metrics("train", train_metrics)
    trainer.save_state()

    # Evaluate on the held-out split and persist those metrics as well
    metrics = trainer.evaluate(ds_test)
    trainer.log_metrics("eval", metrics)
    trainer.save_metrics("eval", metrics)

    elapsed = time.time() - start_time
    print(f"--- {elapsed} seconds ---")


In [None]:
# Predict on Train Data

In [None]:
# Load the image processor used to turn raw pixel arrays into model inputs
model_name_or_path = 'google/vit-base-patch16-224-in21k'
feature_extractor = ViTImageProcessor.from_pretrained(model_name_or_path)

# Load the dataset: {class name: list of images stored as nested pixel lists}
with open('data/full_data.json') as f:
    json_load = json.load(f)

print("Loaded data")

imgs = []
labels = []

# Loop through each class in the dataset
for data_class in json_load:
    data = np.asarray(json_load[data_class])

    # Extract pixel values for every image of the class (no-op for an empty class)
    imgs.extend(feature_extractor(img)["pixel_values"][0] for img in data)

    # One label entry per image, in the same order as `imgs`
    labels.extend([data_class] * len(data))

print(f"Total labels: {len(labels)}")

# Folder that holds one sub-directory per trained bagging model
folder = "vit_bagging"
print("Preprocessed")

# Number of images sent through a model per forward pass
batch_size = 40
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# all_results[m][k] holds the logits of model m for image k
all_results = []

# Loop through each model in the bagging folder
for model_path in os.listdir(folder):
    print(f"Processing model: {model_path}")

    # Load the model once per directory; reloading it for every batch only
    # repeats identical disk reads and host-to-device transfers.
    model = ViTForImageClassification.from_pretrained(
        os.path.join(folder, model_path), local_files_only=True).to(device)
    model.eval()

    r = []
    # Inference only: no_grad avoids building autograd graphs for each batch
    with torch.no_grad():
        # Stepping from 0 covers every image, including a final partial batch and
        # the case of batch_size or fewer images in total.
        for start in range(0, len(imgs), batch_size):
            img_batch = imgs[start:start + batch_size]
            batch_tensor = torch.tensor(np.asarray(img_batch)).to(device)
            r.extend(model(batch_tensor).logits.tolist())

    all_results.append(r)

# Save the results in a JSON file
json_format = {
    "res": all_results,
    "labels": labels
}

with open("bagging_results.json", "w") as f:
    json.dump(json_format, f)

In [None]:
# Train Stacking Model

In [None]:
# Load the bagging results
with open('bagging_results.json') as f:
    json_load = json.load(f)

data = np.asarray(json_load["res"])
labels = np.asarray(json_load["labels"])

# `data` is indexed [model][sample][logit]. Swap the first two axes so each sample
# becomes a sequence of per-model logit vectors: [sample][model][logit].
X = np.ascontiguousarray(np.swapaxes(data, 0, 1))
assert X.ndim == 3, f"expected (samples, models, logits), got shape {X.shape}"
print(f"Shape of data for stacking: {X.shape}")

# Sequence length (number of base models) and feature size (logits per model) are
# read from the data, so the cell keeps working when the number of images or of
# trained base models changes.
n_models, n_logits = X.shape[1], X.shape[2]

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, np.array(labels), test_size=0.2, random_state=1)

# Encode labels to integers (LabelEncoder assigns ids in sorted class-name order)
le = LabelEncoder()
le.fit(list(os.listdir("data/train")))

print(f"Label classes: {le.classes_}")

y_train = le.transform(y_train)
y_test = le.transform(y_test)

# Make the LSTM input layout explicit: (samples, models, logits)
X_train = X_train.reshape((len(X_train), n_models, n_logits))
X_test = X_test.reshape((len(X_test), n_models, n_logits))

# Define the LSTM model
model = models.Sequential([
    keras.Input(shape=(n_models, n_logits)),
    layers.LSTM(512, return_sequences=True),
    layers.Dropout(0.3),
    # NOTE(review): 265 units looks like it may be a typo for 256 - confirm before changing
    layers.LSTM(265, return_sequences=True),
    layers.Dropout(0.3),
    layers.LSTM(128, return_sequences=True),
    layers.Dropout(0.3),
    layers.LSTM(128, return_sequences=True),
    layers.Dropout(0.3),
    layers.LSTM(128, return_sequences=False),
    layers.Dropout(0.3),
    layers.Dense(64, activation="relu"),
    layers.Dropout(0.3),
    layers.Dense(32, activation="relu"),
    layers.Dense(12, activation="softmax"),
])

# Compile the model
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

# Define class weights for handling class imbalance (keys are the encoded class ids)
cls_weight = {0: 0.5, 1: 2, 2: 0.5, 3: 0.5, 4: 0.5, 5: 0.5, 6: 0.5, 7: 10, 8: 2, 9: 0.5, 10: 0.5, 11: 0.5}

# Define model checkpoint callback to save the best model
checkpoint = ModelCheckpoint("models/bagging-{epoch:02d}.model", monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

# Train the model
model.fit(X_train, y_train, batch_size=32, epochs=40, validation_data=(X_test, y_test), callbacks=[checkpoint], class_weight=cls_weight)

# Evaluate the model and print the classification report
y_pred = model.predict(X_test)
res = np.argmax(y_pred, axis=1)

print(classification_report(y_test, res))


In [None]:
# Preprocess Test Data for Bagging

In [None]:
# Load the test data
all_imgs = []
all_filenames = []

# NOTE(review): machine-specific absolute path that is not referenced in this cell
test_folder = r"C:\Users\Admin\Downloads\Captcha Dateien\Captcha Dateien\test\test_data\test"

# Loop through test data files (test/x_test0.json ... test/x_test8.json)
for i in range(9):
    with open(f'test/x_test{i}.json') as f:
        json_load = json.load(f)
        images = np.asarray(json_load["X_test"])
        file_names = list(json_load["filenames"])
        all_imgs.extend(images)
        all_filenames.extend(file_names)

print("Loaded test data")

# Preprocess test images (`feature_extractor` and `folder` come from an earlier cell)
imgs = [feature_extractor(im)["pixel_values"][0] for im in all_imgs]

print("Preprocessed test images")

# Number of images sent through a model per forward pass
batch_size = 40
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# all_results[m][k] holds the logits of model m for test image k
all_results = []

# Loop through each model in the bagging folder
for model_path in os.listdir(folder):
    print(f"Processing model: {model_path}")

    # Load the model once per directory; reloading it for every batch only
    # repeats identical disk reads and host-to-device transfers.
    model = ViTForImageClassification.from_pretrained(
        os.path.join(folder, model_path), local_files_only=True).to(device)
    model.eval()

    r = []
    # Inference only: no_grad avoids building autograd graphs for each batch
    with torch.no_grad():
        # Stepping from 0 covers every image, including a final partial batch and
        # the case of batch_size or fewer images in total.
        for start in range(0, len(imgs), batch_size):
            img_batch = imgs[start:start + batch_size]
            batch_tensor = torch.tensor(np.asarray(img_batch)).to(device)
            r.extend(model(batch_tensor).logits.tolist())

    all_results.append(r)

# Save the results in a JSON file
json_format = {
    "res": all_results,
    "filenames": all_filenames
}

with open("bagging_results_test_more.json", "w") as f:
    json.dump(json_format, f)

In [None]:
# Predict Test Data and Save Results

In [None]:
# Load the bagging results
with open('bagging_results_test_more.json') as f:
    json_load = json.load(f)

data = np.asarray(json_load["res"])
# NOTE: despite the name, `labels` holds the test image file names here
labels = np.asarray(json_load["filenames"])

# `data` is indexed [model][sample][logit]. Swap the first two axes so each sample
# becomes a sequence of per-model logit vectors: [sample][model][logit].
X_test = np.ascontiguousarray(np.swapaxes(data, 0, 1))

# Output columns. The stacking model was trained on LabelEncoder ids, which are
# assigned in sorted class-name order, so the class columns must be sorted too;
# a raw directory listing has no guaranteed order and could mislabel the columns.
class_names = sorted(os.listdir("data/train"))
output_labels = ["ImageName"] + class_names

# Make sure the output directory exists before any CSV is written
os.makedirs("new_bagging_outputs", exist_ok=True)

# Loop through relevant models and make predictions
for model_name in os.listdir("relevant_models"):
    if model_name in ["dense_1", "dense_2"]:
        # These models take a trailing channel axis: (samples, models, logits, 1)
        X_test_reshaped = X_test[..., np.newaxis]
    else:
        X_test_reshaped = X_test

    # Load the trained model
    model = keras.models.load_model(os.path.join("relevant_models", model_name))

    # Predict all images in one call instead of one predict() call per image
    predictions = model.predict(X_test_reshaped)

    # Build the table in one step: image name first, then one column per class
    csv_df = pd.DataFrame(predictions, columns=class_names)
    csv_df.insert(0, "ImageName", labels)

    # Save predictions to CSV
    csv_df.to_csv(f"new_bagging_outputs/{model_name}.csv", index=False)