In [None]:
!pip install einops pyvi
# !pip install --upgrade keras
# !pip install -q -U tensorflow-addons
# !pip install 'keras<3.0.0' mediapipe-model-maker

In [None]:
import os
import json
import numpy as np
import pandas as pd
import logging
import cv2
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Dense, Dropout, concatenate, Add, Dot, Softmax, Lambda, Reshape
from tensorflow.keras.callbacks import LearningRateScheduler
# import tensorflow_addons as tfa
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, confusion_matrix, ConfusionMatrixDisplay
from transformers import AutoImageProcessor, AutoModelForImageClassification, AutoTokenizer, AutoModel
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.utils import resample
import unicodedata
import regex as re
from pyvi import ViTokenizer
from transformers import logging
logging.set_verbosity_error() 

In [None]:
import warnings
warnings.filterwarnings("ignore")

## Data processing

In [None]:
class VietnameseTextPreprocessor:
    """Text clean-up for Vietnamese captions.

    `preprocess` chains three steps: NFC Unicode normalisation, lower-casing
    plus tone-mark repositioning for every whitespace-separated word, and
    collapsing of characters repeated three or more times.
    """

    # One row per vowel: column 0 is the untoned vowel, columns 1-5 are its five
    # toned forms, and the last column is an ASCII spelling that is never added
    # to `vowel_to_ids`.
    vowel_map = [
        ['a', 'à', 'á', 'ả', 'ã', 'ạ', 'a'],
        ['ă', 'ằ', 'ắ', 'ẳ', 'ẵ', 'ặ', 'aw'],
        ['â', 'ầ', 'ấ', 'ẩ', 'ẫ', 'ậ', 'aa'],
        ['e', 'è', 'é', 'ẻ', 'ẽ', 'ẹ', 'e'],
        ['ê', 'ề', 'ế', 'ể', 'ễ', 'ệ', 'ee'],
        ['i', 'ì', 'í', 'ỉ', 'ĩ', 'ị', 'i'],
        ['o', 'ò', 'ó', 'ỏ', 'õ', 'ọ', 'o'],
        ['ô', 'ồ', 'ố', 'ổ', 'ỗ', 'ộ', 'oo'],
        ['ơ', 'ờ', 'ớ', 'ở', 'ỡ', 'ợ', 'ow'],
        ['u', 'ù', 'ú', 'ủ', 'ũ', 'ụ', 'u'],
        ['ư', 'ừ', 'ứ', 'ử', 'ữ', 'ự', 'uw'],
        ['y', 'ỳ', 'ý', 'ỷ', 'ỹ', 'ỵ', 'y']
    ]

    # ASCII tone letters in the same column order as `vowel_map`; not referenced
    # by any method of this class.
    tone_map = ['', 'f', 's', 'r', 'x', 'j']
    # char -> (row, column) in `vowel_map`. Filled by `initialize_vowel_to_ids`,
    # which has to run once before the tone methods can recognise any vowel.
    vowel_to_ids = {}

    @classmethod
    def initialize_vowel_to_ids(cls):
        """Populate `vowel_to_ids` from `vowel_map`, skipping each row's last column."""
        for i in range(len(cls.vowel_map)):
            for j in range(len(cls.vowel_map[i]) - 1):
                cls.vowel_to_ids[cls.vowel_map[i][j]] = (i, j)

    @staticmethod
    def unicode_normalize(text):
        """Return `text` in Unicode NFC form."""
        return unicodedata.normalize('NFC', text)

    @classmethod
    def is_valid_vietnamese_word(cls, word):
        """Return False when the vowels of `word` are not one contiguous run.

        A word with no vowel, or a single vowel, is accepted.
        """
        chars = list(word)
        vowel_index = -1
        for index, char in enumerate(chars):
            x, y = cls.vowel_to_ids.get(char, (-1, -1))
            if x != -1:
                if vowel_index == -1:
                    vowel_index = index
                else:
                    # A gap between two vowels means the word is rejected.
                    if index - vowel_index != 1:
                        return False
                    vowel_index = index
        return True

    @classmethod
    def standardize_vietnamese_tone(cls, word):
        """Re-place the tone mark of a single lower-cased word.

        Words rejected by `is_valid_vietnamese_word` are returned unchanged.
        Otherwise every vowel is reduced to its untoned form and the last tone
        seen is put back on one vowel:

        - fewer than two counted vowels: the word is returned as given, except
          after a leading 'qu' / 'gi', where the tone is re-applied explicitly;
        - a word containing 'ê' or 'ơ': the tone goes on that vowel;
        - exactly two vowels: the tone goes on the first one when the word ends
          with the vowel pair, otherwise on the second;
        - three or more vowels: the tone goes on the second one.
        """
        if not cls.is_valid_vietnamese_word(word):
            return word

        chars = list(word)
        tone = 0
        vowel_indices = []
        is_qu_or_gi = False
        for index, char in enumerate(chars):
            x, y = cls.vowel_to_ids.get(char, (-1, -1))
            if x == -1:
                continue
            elif x == 9 and index != 0 and chars[index - 1] == 'q':  # check 'qu'
                chars[index] = 'u'
                is_qu_or_gi = True
            elif x == 5 and index != 0 and chars[index - 1] == 'g':  # check 'gi'
                chars[index] = 'i'
                is_qu_or_gi = True
            if y != 0:
                # Remember the tone and strip it from the vowel for now.
                tone = y
                chars[index] = cls.vowel_map[x][0]
            # The 'u' / 'i' at position 1 of 'qu' / 'gi' is not counted as a vowel.
            if not is_qu_or_gi or index != 1:
                vowel_indices.append(index)

        if len(vowel_indices) < 2:
            if is_qu_or_gi:
                if len(chars) == 2:
                    x, y = cls.vowel_to_ids.get(chars[1])
                    chars[1] = cls.vowel_map[x][tone]
                else:
                    x, y = cls.vowel_to_ids.get(chars[2], (-1, -1))
                    if x != -1:
                        chars[2] = cls.vowel_map[x][tone]
                    else:
                        chars[1] = cls.vowel_map[5][tone] if chars[1] == 'i' else cls.vowel_map[9][tone]
                return ''.join(chars)
            return word

        for index in vowel_indices:
            x, y = cls.vowel_to_ids[chars[index]]
            if x == 4 or x == 8:  # ê, ơ
                chars[index] = cls.vowel_map[x][tone]
                return ''.join(chars)

        if len(vowel_indices) == 2:
            if vowel_indices[-1] == len(chars) - 1:
                x, y = cls.vowel_to_ids[chars[vowel_indices[0]]]
                chars[vowel_indices[0]] = cls.vowel_map[x][tone]
            else:
                x, y = cls.vowel_to_ids[chars[vowel_indices[1]]]
                chars[vowel_indices[1]] = cls.vowel_map[x][tone]
        else:
            x, y = cls.vowel_to_ids[chars[vowel_indices[1]]]
            chars[vowel_indices[1]] = cls.vowel_map[x][tone]
        return ''.join(chars)

    @staticmethod
    def _split_punctuation(word):
        """Split `word` into (leading punctuation, core, trailing punctuation).

        Punctuation means any character whose Unicode general category starts
        with 'P'. Returns None when the core is empty or contains anything other
        than letters (category 'L*') and '.', i.e. the same words the pattern
        ^\\p{P}*([\\p{L}.]*\\p{L}+)\\p{P}*$ would accept.
        """
        start, end = 0, len(word)
        while start < end and unicodedata.category(word[start]).startswith('P'):
            start += 1
        while end > start and unicodedata.category(word[end - 1]).startswith('P'):
            end -= 1
        core = word[start:end]
        if not core:
            return None
        for ch in core:
            if ch != '.' and not unicodedata.category(ch).startswith('L'):
                return None
        return word[:start], core, word[end:]

    @classmethod
    def standardize_sentence_tone(cls, sentence):
        """Lower-case `sentence` and standardise the tone mark of each word.

        The sentence is split on whitespace and re-joined with single spaces.
        Punctuation around a word is kept as is; words that are not made of
        letters and dots (numbers, URLs, 'a/b', ...) are left untouched.

        The previous implementation spliced '/' markers into the word and split
        on '/' again, which removed every '/' the word already contained, and
        its character class was missing the backslash of \\p{L}.
        """
        sentence = sentence.lower()
        words = sentence.split()
        for index, word in enumerate(words):
            parts = cls._split_punctuation(word)
            if parts is None:
                continue
            lead, core, trail = parts
            words[index] = lead + cls.standardize_vietnamese_tone(core) + trail
        return ' '.join(words)

    @classmethod
    def fix_repeated_chars(cls, sentence):
        """Collapse any character repeated three or more times in a row to one.

        NOTE(review): this applies to every character, digits included
        ('1000' becomes '10'); confirm that is intended.
        """
        return re.sub(r'(.)\1{2,}', r'\1', sentence)

    @classmethod
    def preprocess(cls, text):
        """Run NFC normalisation, tone standardisation and repeat collapsing on `text`."""
        text = cls.unicode_normalize(text)
        text = cls.standardize_sentence_tone(text)
        text = cls.fix_repeated_chars(text)
        return text

# Build the vowel lookup once; the tone methods silently do nothing without it.
VietnameseTextPreprocessor.initialize_vowel_to_ids()

## Feature extraction

In [None]:
class FeatureExtractor:
    """Turn one (text, caption text, image file) sample into feature rows.

    Image features are the logits of `image_model`; text features are the
    mean of `text_model`'s last hidden state over the token axis. Every public
    method handles a single sample and returns an array with one leading row.
    """

    def __init__(self, processor, text_model, image_model, tokenizer, device,
                 image_dim=1000, text_dim=1024):
        """
        Parameters:
        - processor: callable invoked as processor(images=..., return_tensors="pt").
        - text_model: model whose output exposes `last_hidden_state`.
        - image_model: model whose output exposes `logits`.
        - tokenizer: callable that encodes a text for `text_model`.
        - device: device the encoded inputs are moved to.
        - image_dim: length of the zero vector used when an image cannot be processed.
        - text_dim: length of the zero vector used when a text cannot be processed.
        """
        self.device = device
        self.processor = processor
        self.text_model = text_model
        self.image_model = image_model
        self.tokenizer = tokenizer
        # Fallback sizes; they should match the real output sizes of the two models.
        self.image_dim = image_dim
        self.text_dim = text_dim

    def _load_image(self, image_path):
        """Read the image at `image_path`; raise ValueError when it cannot be read."""
        # NOTE(review): OpenCV's imread decodes to BGR channel order and the array is
        # handed to the processor as is. Confirm the downstream weights were trained on
        # features extracted the same way before adding a BGR->RGB conversion here.
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Image not found at path: {image_path}")
        return image

    def _process_image(self, image):
        """Run the image model on one image and return its logits as a NumPy array."""
        inputs = self.processor(images=image, return_tensors="pt").to(self.device)
        with torch.no_grad():
            outputs = self.image_model(**inputs)
        return outputs.logits.cpu().numpy().squeeze()

    def extract_image_features(self, image_name, path_prefix=""):
        """Extract features for one image file.

        Parameters:
        - image_name: file name, appended to `path_prefix` by plain string concatenation.
        - path_prefix: directory prefix (include the trailing separator).

        Returns an array holding one row. If loading or inference fails, the
        error is printed and the row is `image_dim` zeros (best effort).
        """
        image_path = path_prefix + image_name
        try:
            features = self._process_image(self._load_image(image_path))
        except Exception as e:
            print(f"Error processing image '{image_name}': {str(e)}")
            features = np.zeros(self.image_dim)
        return np.array([features])

    def _tokenize_text(self, text):
        """Encode `text`, run the text model and mean-pool the last hidden state."""
        inputs = self.tokenizer(text, return_tensors="pt", padding=True, truncation=True).to(self.device)
        with torch.no_grad():
            outputs = self.text_model(**inputs)
        # Plain mean over axis 1 (no attention-mask weighting).
        return outputs.last_hidden_state.mean(dim=1).cpu().numpy().squeeze()

    def extract_text_features(self, text):
        """Extract features for one text.

        Returns an array holding one row. If tokenisation or inference fails,
        the error is printed and the row is `text_dim` zeros (best effort).
        """
        try:
            features = self._tokenize_text(text)
        except Exception as e:
            print(f"Error processing text: {str(e)}")
            features = np.zeros(self.text_dim)
        return np.array([features])

    def extract_features(self, texts, caption_texts, images, path_prefix=""):
        """
        Extract text, image and caption-text features for one sample.

        Parameters:
        - texts: the primary text of the sample.
        - caption_texts: the text describing the image.
        - images: the image file name.
        - path_prefix: prefix prepended to the image file name.

        Returns:
        - Tuple of arrays (text_features, image_features, caption_text_features).
        """
        image_features = self.extract_image_features(images, path_prefix=path_prefix)
        text_features = self.extract_text_features(texts)
        caption_features = self.extract_text_features(caption_texts)

        return text_features, image_features, caption_features

## SarcasmModel

In [None]:
class SarcasmModel:
    """Keras classifier that fuses pre-extracted image, text and (optionally)
    caption feature vectors. The built network is exposed as `self.model`."""

    def __init__(self, config, n_classes, combined_layer_size, is_caption=False):
        """Store the settings and build the network immediately.

        `config` supplies the input sizes, the per-branch layer widths and the
        dropout rate; `combined_layer_size` is the width of the first fused layer.
        """
        self.config = config
        self.n_classes = n_classes
        self.is_caption = is_caption
        self.combined_layer_size = combined_layer_size
        self.model = self.build_model()

    def build_model(self):
        """Assemble and return the (uncompiled) functional model."""
        cfg = self.config

        # Inputs and branches are created in a fixed order: image, text, caption.
        image_input = Input(shape=(cfg.image_dim,), name='image_input')
        text_input = Input(shape=(cfg.text_dim,), name='text_input')
        inputs = [image_input, text_input]
        branches = [
            self._create_branch(image_input, cfg.image_branch_layers),
            self._create_branch(text_input, cfg.text_branch_layers),
        ]

        if self.is_caption:
            caption_input = Input(shape=(cfg.caption_dim,), name='caption_input')
            inputs.append(caption_input)
            branches.append(self._create_branch(caption_input, cfg.caption_branch_layers))

        fused = concatenate(branches)

        # Two fused blocks: full width, then half width.
        for width in (self.combined_layer_size, int(self.combined_layer_size/2)):
            fused = Dense(width, activation='relu')(fused)
            fused = Dropout(cfg.dropout_rate)(fused)

        output = Dense(self.n_classes, activation='softmax', name="output")(fused)
        return Model(inputs=inputs, outputs=output)

    def _create_branch(self, input_layer, layer_sizes):
        """Stack one Dense(relu) + Dropout pair per entry of `layer_sizes`."""
        tensor = input_layer
        for units in layer_sizes:
            tensor = Dropout(self.config.dropout_rate)(
                Dense(units, activation='relu')(tensor)
            )
        return tensor

## Testing model

In [None]:
class TestingModel:
    """Inference wrapper around a Keras model plus its label mapping.

    `map_label` maps each class name to the index of its output column.
    """

    def __init__(self, model, map_label, is_caption=False):
        self.model = model
        self.map_label = map_label
        self.is_caption = is_caption

    def decode_labels(self, one_hot_labels):
        """Return the class name of the arg-max column for every row."""
        reverse_mapping = {v: k for k, v in self.map_label.items()}
        return [reverse_mapping[idx] for idx in np.argmax(one_hot_labels, axis=1)]

    def dict_labels(self, list_probs):
        """Return {class name: probability} for the FIRST row of `list_probs`.

        Each probability is looked up through the class index stored in
        `map_label`, so the result agrees with `decode_labels` even when the
        mapping is not declared in index order. (Pairing the dict's key order
        with the columns, as before, mislabels such mappings.)
        """
        first_row = list_probs[0]
        return {label: first_row[idx] for label, idx in self.map_label.items()}

    def predict(self, image_features, text_features, caption_features=None):
        """Run the model on the given feature arrays.

        The caption input is passed only when the wrapper was created with
        `is_caption=True` and `caption_features` is not None.

        Returns (labels, probabilities): the decoded label of every row, and
        the per-class probability dict of the first row only.
        """
        if self.is_caption and caption_features is not None:
            model_inputs = [image_features, text_features, caption_features]
        else:
            model_inputs = [image_features, text_features]
        predictions = self.model.predict(model_inputs, verbose=0)

        probabilities = self.dict_labels(predictions.tolist())
        return self.decode_labels(predictions), probabilities

    def save_model(self, path):
        """Save the wrapped model to `path`."""
        self.model.save(path)
        print(f"Model saved at {path}")

    def load_model(self, path):
        """Replace the wrapped model with the one loaded from `path` (not compiled)."""
        self.model = tf.keras.models.load_model(path, compile=False)
        print(f"Model loaded from {path}")

In [None]:
class Config:
    """Paths and network sizes shared by the inference cells."""

    def __init__(self):
        # Input locations. `test_json_path` points at a CSV file despite its name.
        self.test_path = '/kaggle/input/vimmsd/test-images/'
        self.test_json_path = '/kaggle/input/vimmsd/vimmsd-private-test-new-translate.csv'

        # Length of each pre-extracted feature vector.
        self.image_dim = 1000
        self.text_dim = 1024
        self.caption_dim = 1024

        # Dense layer widths of each input branch.
        self.image_branch_layers = [1024, 512]
        self.text_branch_layers = [512, 256]
        self.caption_branch_layers = [512, 256]

        # Fused head.
        self.combined_layer_size = 1024
        self.dropout_rate = 0.3

    def display(self):
        """Print every setting as `name: value`, one per line, in definition order."""
        for name, setting in vars(self).items():
            print(f"{name}: {setting}")
 

In [None]:
import numpy as np
class WeightEnsembleVoting:
    """Combine per-model class probabilities by averaging them label by label."""

    def __init__(self):
        """Nothing to configure."""

    def predict(self, list_of_dicts):
        """
        Pick the label with the highest mean probability.

        Parameters:
            list_of_dicts (list of dict): One {label: probability} dict per model,
                                          e.g., [{"a": 0.5}, {"a": 0.3, "c": 0.8}].
                                          A label is averaged only over the models
                                          that report it.

        Returns:
            final_label (str): The label with the highest mean probability; on a
                               tie the label seen first wins.
            max_prob (float): That label's mean probability.
        """
        # Gather, per label, every probability reported for it (first-seen order).
        votes = {}
        for model_probs in list_of_dicts:
            for label in model_probs:
                votes.setdefault(label, []).append(model_probs[label])

        label_means = {}
        for label, values in votes.items():
            label_means[label] = np.mean(values)

        winner = max(label_means, key=label_means.get)
        return winner, label_means[winner]

In [None]:
# Use the GPU for the PyTorch encoders when one is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Shared settings object: test-set paths plus feature and layer sizes.
config = Config()

### Data

In [None]:
# Image side: the processor prepares model inputs and the classification model's
# logits are used as the image feature vector (see FeatureExtractor._process_image).
processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-384")
image_model = AutoModelForImageClassification.from_pretrained("google/vit-base-patch16-384").to(device)

# Text side: tokenizer and encoder used for both the caption and the image-caption text.
# NOTE(review): trust_remote_code=True presumably lets the hub repository's own Python
# code run locally; confirm the repo/revision is trusted, ideally by pinning a revision.
tokenizer = AutoTokenizer.from_pretrained("jinaai/jina-embeddings-v3")
text_model = AutoModel.from_pretrained("jinaai/jina-embeddings-v3", 
                                            trust_remote_code=True,
                                            torch_dtype=torch.float32).to(device)

In [None]:
# Bundle the image and text encoders behind a single feature-extraction helper.
feature_extractor = FeatureExtractor(processor, text_model, image_model, tokenizer, device)

In [None]:
# Class name -> output index for each classifier variant. TestingModel uses these
# to turn arg-max indices back into label names.

# Two-class model.
map_label2 = {
    'not-sarcasm': 0,
    'multi-sarcasm': 1
}

# Three-class variant without 'not-sarcasm'; only referenced by the commented-out
# `model3` in the model-loading cell.
map_label3 = {
    'image-sarcasm': 1,
    'text-sarcasm': 2,
    'multi-sarcasm': 0
}

# Three-class variant without 'multi-sarcasm'; only referenced by the commented-out
# `model3_not` in the model-loading cell.
map_label3_not = {
    'image-sarcasm': 1,
    'text-sarcasm': 2,
    'not-sarcasm': 0
}

# Four-class model.
map_label4 = {
    'not-sarcasm': 0,
    'image-sarcasm': 1,
    'text-sarcasm': 2,
    'multi-sarcasm': 3
}

In [None]:
# Ensemble combiner used to merge the per-model probability dicts.
wev = WeightEnsembleVoting()

# Two-class classifier with the extra caption input.
# NOTE(review): TestingModel.load_model replaces the wrapped model with the one read
# from the .h5 file, so the network built by SarcasmModel on the first line is discarded.
model2 = SarcasmModel(config, 2, 1024, is_caption=True).model
model2 = TestingModel(model2, map_label2, is_caption=True)
model2.load_model("/kaggle/input/weight-best-model/best_2class_new.h5")

# Disabled experiments: three- and four-class models without the caption input.
# model3 = SarcasmModel(config, 3, 768).model
# model3 = TestingModel(model3, map_label3)
# model3.load_model("/kaggle/input/weight-best-model/best_3class_multi_image_text.h5")

# model3_not = SarcasmModel(config, 3, 768).model
# model3_not = TestingModel(model3_not, map_label3_not)
# model3_not.load_model("/kaggle/input/weight-best-model/best_3class_not_image_text.h5")

# model4 = SarcasmModel(config, 4, 768).model
# model4 = TestingModel(model4, map_label4)
# model4.load_model("/kaggle/input/weight-best-model/best_4class_44.44.h5")

# Four-class classifier with the extra caption input (same build-then-load pattern).
model4_caption = SarcasmModel(config, 4, 1024, is_caption=True).model
model4_caption = TestingModel(model4_caption, map_label4, is_caption=True)
model4_caption.load_model("/kaggle/input/weight-best-model/best_4class_new.h5")

In [None]:
test_df = pd.read_csv(config.test_json_path)

# `preprocess` itself begins with NFC normalisation, and NFC is idempotent, so a
# single pass gives the same result as normalising first and preprocessing after.
test_df['caption'] = test_df['caption'].apply(VietnameseTextPreprocessor.preprocess)

In [None]:
from tqdm import tqdm
# Per-row inference over the test set.
# results   : row index (as str) -> ensemble label, written to JSON below
# list_prob : mean probability of the chosen label, one entry per row
# labels    : chosen label, one entry per row
results = {}
list_prob = []
labels = []
for i in tqdm(test_df.index):
    image = test_df.image[i]
    caption = test_df.caption[i]
    caption_image = test_df.caption_image[i]
    # Returned order is (text, image, caption-text) features.
    text_test_features, image_test_features, caption_image_test_features  = feature_extractor.extract_features(caption, caption_image, image, config.test_path)
    pred2, dict_pred2 = model2.predict(image_test_features, text_test_features, caption_image_test_features)
    # pred3_not, dict_pred3_not = model3_not.predict(image_test_features, text_test_features)
    # pred3, dict_pred3 = model3.predict(image_test_features, text_test_features)
    # pred4, dict_pred4 = model4.predict(image_test_features, text_test_features)
    pred4_caption, dict_pred4_caption = model4_caption.predict(image_test_features, text_test_features, caption_image_test_features)
    # Disabled experiment: cascade from the two-class model into the three-class model.
    # temp = {}
    # if pred2[0] == "not-sarcasm":
    #     temp["not-sarcasm"] = dict_pred2.get("not-sarcasm")
    # else:
    #     pred3, dict_pred3 = model3.predict(image_test_features, text_test_features)
    #     temp.update(dict_pred3)
    # Average the two probability dicts label by label and keep the best label.
    # Labels reported by only one model are averaged over that model alone.
    predict, prob = wev.predict([dict_pred2, dict_pred4_caption])
    list_prob.append(prob)
    labels.append(predict)
    results[str(i)] = predict

In [None]:
# predictions, prob, dict_pred4 = model4.predict(image_test_features, text_test_features)
# results = {str(i): pred for i, pred in enumerate(predictions)}

In [None]:
# Raw ensemble predictions, before the duplicate-caption voting step.
output = dict(results=results, phase='test')

with open('results1.json', 'w') as f:
    f.write(json.dumps(output, indent=2))

In [None]:
# Sanity check: read back the file written above and show the label distribution.
# The same relative path as the writer is used, so this no longer depends on the
# working directory being /kaggle/working.
df = pd.read_json("results1.json")
df.value_counts()

In [None]:
# Attach the per-row ensemble label and its mean probability (both lists were
# filled in `test_df.index` order by the inference loop).
test_df["label"] = labels
test_df["prob"] = list_prob

## Caption Label Voting by Majority and Probability

In [None]:
class CaptionProcessor:
    """Give every row that shares a caption the same label.

    Expects the columns 'caption', 'label' and 'prob'. The frame passed in is
    copied, so the caller's frame is never modified.
    """

    def __init__(self, df):
        self.df = df.copy()

    def _get_duplicate_captions(self):
        """Return all rows whose 'caption' occurs more than once."""
        return self.df[self.df.duplicated('caption', keep=False)]

    def _select_label(self, group):
        """Pick the label for one group of rows sharing a caption.

        The most frequent label wins. When several labels share the highest
        count, the one with the highest mean 'prob' among those tied labels
        wins; labels with fewer votes never take part in the tie-break.
        """
        label_counts = group['label'].value_counts()
        top_labels = label_counts[label_counts == label_counts.max()].index

        if len(top_labels) == 1:
            # Unanimous group or strict majority.
            return top_labels[0]

        # Tie on count: compare mean probabilities of the tied labels only.
        tied_rows = group[group['label'].isin(top_labels)]
        return tied_rows.groupby('label')['prob'].mean().idxmax()

    def process(self):
        """Overwrite 'label' for duplicated captions and return the processed copy."""
        duplicate_captions = self._get_duplicate_captions()

        for caption, group in duplicate_captions.groupby('caption'):
            selected_label = self._select_label(group)
            self.df.loc[self.df['caption'] == caption, 'label'] = selected_label

        return self.df


In [None]:
# Named `caption_processor` so the image `processor` loaded earlier is not overwritten;
# re-running the feature-extractor cell afterwards would otherwise pick up the wrong object.
caption_processor = CaptionProcessor(test_df)
processed_df = caption_processor.process()

In [None]:
# Final submission: labels after duplicate-caption voting, keyed by row position.
results = {
    str(position): label
    for position, label in enumerate(processed_df["label"].tolist())
}

output = dict(results=results, phase='test')

with open('results.json', 'w') as f:
    f.write(json.dumps(output, indent=2))

In [None]:
# Sanity check on the final file, read from the same relative path it was written to.
df = pd.read_json("results.json")
df.value_counts()