Przed oddaniem zadania upewnij się, że wszystko działa poprawnie.
**Uruchom ponownie kernel** (z paska menu: Kernel$\rightarrow$Restart) a następnie
**wykonaj wszystkie komórki** (z paska menu: Cell$\rightarrow$Run All).

Upewnij się, że wypełniłeś wszystkie pola `TU WPISZ KOD` lub `TU WPISZ ODPOWIEDŹ`, oraz
że podałeś swoje imię i nazwisko poniżej:

In [None]:
NAME = ""

---

# 2. Wielomodalne autokodery

Przejdziemy teraz do implementacji modelu **wielomodalnego autokodera**. W przypadku rozważanych przez nas danych, autokoder ten będzie posiadać dwa wejścia oraz dwa wyjścia (wcześniej wyznaczone wektory cech obrazków oraz tekstów).

In [None]:
from typing import Dict, List, Type

import pandas as pd
import torch
from sklearn.decomposition import PCA
from torch import nn
from torch.nn import functional as F

from src.dataset import DataModule
from src.downstream import evaluate_classification
from src.nn.unimodal import UnimodalAE
from src.train import extract_embeddings, train_model
from src.visualization import make_interactive_scatter_plot, visualize_most_similar

## Zadanie 2.1 (2 pkt)
Zaczniemy od implementacji modułu kodera wielomodalnego. Należy uzupełnić poniższą implementację w taki sposób, aby:
- dla każdej modalności (określonej przez parametr `modality_names`) został utworzony modal perceptrona wielowarstwowego (MLP), który będzie przekształcać cechy w danej modalności (pamiętaj aby odpowiednio przypisać moduły PyTorchowe – np. `ModuleList` albo `ModuleDict`)
- MLP dla każdej modalności będzie posiadać taką samą architekturę (z wyłączeniem wymiaru wejściwego) - wykorzystaj podane w konstruktorze parametry dla tych sieci MLP:
  * `in_dims` - wymiary danych wejściowych dla każdej modalności,
  * `hidden_dims` - rozmiary warstw ukrytych, takie same dla każdego MLP,
  * `out_dim` - wyjściowy rozmiar, również takie same dla każdego MLP.
- w metodzie `forward()` przekształć odpowiednie modalności przez przypisane do nich sieci MLP, na wyjściu zwróć listę wektorów

In [None]:
class MultimodalEncoder(nn.Module):

    def __init__(
        self,
        modality_names: List[str],
        in_dims: Dict[str, int],
        hidden_dims: List[int],
        out_dim: int,
        last_activation: Type[nn.Module],
    ):
        super().__init__()

        self.modality_names = modality_names
        
        # TU WPISZ KOD
        raise NotImplementedError()

    def forward(self, x: Dict[str, torch.Tensor]) -> List[torch.Tensor]:
        # TU WPISZ KOD
        raise NotImplementedError()
        
    @staticmethod
    def from_hparams(hparams):
        return MultimodalEncoder(
            modality_names=hparams["modality_names"],
            in_dims=hparams["data_dims"],
            hidden_dims=hparams["hidden_dims"],
            out_dim=hparams["emb_dim"],
            last_activation=nn.Tanh,
        )


## Zadanie 2.2 (2 pkt)
Zaimplementuj dwie strategie łączenia wektorów ukrytych z różnych modalności w jeden wielomodalny wektor reprezentacji:
- w klasie `AvgFusion` zaimplementuj uśrednianie wektorów z różnych modalności
- w klasie `MLPFusion` skonkatenuj wektory z różnych modalności a następnie przekształć wynik przez sieć MLP (parametry sieci podane w konstruktorze)

In [None]:
class AvgFusion(nn.Module):
    
    def forward(self, h: List[torch.Tensor]) -> torch.Tensor:
        # TU WPISZ KOD
        raise NotImplementedError()
    
    
class MLPFusion(nn.Module):
    
    def __init__(
        self,
        modality_dim: int,
        num_modalities: int,
        hidden_dims: List[int],
        out_dim: int,
        last_activation: Type[nn.Module],
    ):
        super().__init__()
        
        # TU WPISZ KOD
        raise NotImplementedError()
        
    def forward(self, h: List[torch.Tensor]) -> torch.Tensor:
        # TU WPISZ KOD
        raise NotImplementedError()

## Zadanie 2.3 (2 pkt)
Analogicznie do kodera wielomodalnego, musimy zaimplementować moduł wielomodalnego dekodera.
- dla każdej modalności utwórz sieć MLP, która będzie dekodować (rekonstruować) oryginalne atrybuty obiektu w danej modalności:
  * `in_dim` określa wymiar wejściowego wielomodalnego wektora reprezentacji (wspólne dla wszystkich modalności)
  * `hidden_dims` określa rozmiary warstw ukrytych modeli MLP (wspólne dla wszystkich modalności)
  * `out_dims` określa wymiary atrybutów (które chcemy zrekonstruować) w każdej modalności
- w metodzie `forward()` zastosuj utworzone sieci MLP na wielomodalnej reprezentacji `z` i zwróć słownik, w którym klucze określają nazwy modalności a skojarzone wartości to rekonstrukcje atrybutów w danej modalności

In [None]:
class MultimodalDecoder(nn.Module):

    def __init__(
        self,
        modality_names: List[str],
        in_dim: int,
        hidden_dims: List[int],
        out_dims: Dict[str, int],
        last_activation: Type[nn.Module],
    ):
        super().__init__()

        self.modality_names = modality_names
        
        # TU WPISZ KOD
        raise NotImplementedError()

    def forward(self, z: torch.Tensor) -> Dict[str, torch.Tensor]:
        # TU WPISZ KOD
        raise NotImplementedError()

## Zadanie 2.4 (2 pkt)
Przeanalizuj implementację klasy bazowej `BaseAE` a następnie dokończ implementację właściwego wielomodalnego autokodera:
- w metodzie `forward()` zastosuj wielomodalny koder `encoder` na podanych danych wejściowych, a następnie połącz listę ukrytych wektorów w jedną wielomodalną reprezentację, wykorzystując moduł fuzji `fusion`
- w metodzie `_common_step()` zaimplementuj krok uczenia autokodera:
  * wyznacz wielomodalną reprezentację `z`
  * przeprowadź rekonstrukcję oryginalnych cech `x_rec` na podstawie reprezentacji `z`
  * oblicz funkcję kosztu jako błąd średniokwadratowy (`MSE`) po każdej modalności, a wartości tych funkcji kosztu uśrednij względem wszystkich modalności

In [None]:
from src.nn.ae import BaseAE


class MultimodalAE(BaseAE):

    def __init__(self, hparams):
        encoder_cls = hparams["encoder_cls"]
        
        super().__init__(
            hparams=hparams,
            encoder=encoder_cls.from_hparams(hparams),
            decoder=MultimodalDecoder(
                modality_names=hparams["modality_names"],
                in_dim=hparams["emb_dim"],
                hidden_dims=hparams["hidden_dims"][::-1],
                out_dims=hparams["data_dims"],
                last_activation=nn.Identity,
            ),
        )
        
        if hparams["fusion"] == "Avg":
            self.fusion = AvgFusion()
        elif hparams["fusion"] == "MLP":
            self.fusion = MLPFusion(
                modality_dim=hparams["emb_dim"],
                num_modalities=len(hparams["modality_names"]),
                hidden_dims=[hparams["emb_dim"], hparams["emb_dim"]],
                out_dim=hparams["emb_dim"],
                last_activation=nn.Tanh,
            )
        else:
            raise ValueError(f"Unknown fusion module: \"{hparams['fusion']}\"")

    def forward(self, batch) -> torch.Tensor:
        # TU WPISZ KOD
        raise NotImplementedError()

    def _common_step(self, batch) -> torch.Tensor:
        # TU WPISZ KOD
        raise NotImplementedError()


In [None]:
%load_ext tensorboard
%tensorboard --logdir ./data/logs --port 6006

In [None]:
default_hparams = {
    "encoder_cls": MultimodalEncoder,
    "modality_names": ["img_emb", "text_emb"],
    "data_dims": {"img_emb": 2048, "text_emb": 384}, 
    "batch_size": 64,
    "num_epochs": 30,
    "hidden_dims": [256, 256, 256],
    "emb_dim": 128,
    "lr": 1e-3,
    "weight_decay": 5e-4,
}

In [None]:
datamodule = DataModule(batch_size=default_hparams["batch_size"])

In [None]:
train_model(
    model_cls=MultimodalAE,
    hparams={
        "name": "ImageTextAvgAE",
        "fusion": "Avg",
        **default_hparams,
    },
    datamodule=datamodule,
)

In [None]:
train_model(
    model_cls=MultimodalAE,
    hparams={
        "name": "ImageTextMLPAE",
        "fusion": "MLP",
        **default_hparams,
    },
    datamodule=datamodule,
)

In [None]:
multimodal_avg_emb = extract_embeddings(
    model_cls=MultimodalAE, 
    name="ImageTextAvgAE",
    datamodule=datamodule,
)

multimodal_mlp_emb = extract_embeddings(
    model_cls=MultimodalAE, 
    name="ImageTextMLPAE",
    datamodule=datamodule,
)


In [None]:
make_interactive_scatter_plot(
    title="Multimodal embeddings (Avg)",
    z_2d=PCA(n_components=2).fit_transform(multimodal_avg_emb),
    df=datamodule.df["all"],
)

In [None]:
make_interactive_scatter_plot(
    title="Multimodal embeddings (MLP)",
    z_2d=PCA(n_components=2).fit_transform(multimodal_mlp_emb),
    df=datamodule.df["all"],
)

In [None]:
_ = visualize_most_similar(
    title="Most similar by multimodal embedding (Avg)",
    anchor_index=339,
    z=multimodal_avg_emb,
    df=datamodule.df["all"],
)

In [None]:
_ = visualize_most_similar(
    title="Most similar by multimodal embedding (MLP)",
    anchor_index=339,
    z=multimodal_mlp_emb,
    df=datamodule.df["all"],
)

In [None]:
evaluate_classification(
    model_names=[
        (UnimodalAE, "ImageAE"), 
        (UnimodalAE, "TextAE"), 
        (MultimodalAE, "ImageTextAvgAE"),
        (MultimodalAE, "ImageTextMLPAE"),
    ],
    datamodule=datamodule,
)

# Maskowane uczenie
Dotychczas wielomodalny autokoder był uczony w taki sposób, że zarówno na wejściu jak i na wyjściu otrzymywał informacje o obrazku, jak i tekście. Teraz zobaczymy jak model się będzie zachowywać w sytuacji, kiedy jedna z modalności będzie **maskowana na wejściu** (można w ten sposób symulować sytuacje, gdy jedna z modalności nie jest dostępna – *brakująca wartość*). 

Zaczniemy od przygotowania nowej implementacji `MultimodalEncoder`.

## Zadanie 2.5 (2 pkt)
Uzupełnij poniższą implementację klasy `MaskedMultimodalEncoder`, która będzie maskować cechy z wybranej modalności z określonym prawdopodobieństwem `p_m`:

- poprzez maskowanie danej modalności rozumiemy zastąpienie wektora cech w tej modalności dla konkretnego obiektu, wektorem składającym się z samych zer
- to czy pojedynczy obiekt będzie poddawany maskowaniu określamy na podstawie prawdopodobieństwa $p_m \in [0, 1]$
- zakładamy, że maskowanie dotyczy tylko etapu uczenia, natomiast w trakcie inferencji używamy dostępnych cech bez jakiejkolwiek modyfikacji

In [None]:
class MaskedMultimodalEncoder(nn.Module):

    def __init__(
        self,
        modality_names: List[str],
        in_dims: Dict[str, int],
        hidden_dims: List[int],
        out_dim: int,
        last_activation: Type[nn.Module],
        masked_modality: str,
        p_m: float,
    ):
        super().__init__()

        self.modality_names = modality_names
        
        # TU WPISZ KOD
        raise NotImplementedError()

    def forward(self, x: Dict[str, torch.Tensor]) -> List[torch.Tensor]:
        # TU WPISZ KOD
        raise NotImplementedError()
        
    @staticmethod
    def from_hparams(hparams):
        return MaskedMultimodalEncoder(
            modality_names=hparams["modality_names"],
            in_dims=hparams["data_dims"],
            hidden_dims=hparams["hidden_dims"],
            out_dim=hparams["emb_dim"],
            last_activation=nn.Tanh,
            masked_modality=hparams["masked_modality"],
            p_m=hparams["p_m"],
        )

In [None]:
train_model(
    model_cls=MultimodalAE,
    hparams={
        **default_hparams,
        "name": "MaskedImage_ImageTextAvgAE",
        "fusion": "Avg",
        "encoder_cls": MaskedMultimodalEncoder,
        "masked_modality": "img_emb",
        "p_m": 1.0,
    },
    datamodule=datamodule,
)

train_model(
    model_cls=MultimodalAE,
    hparams={
        **default_hparams,
        "name": "MaskedText_ImageTextAvgAE",
        "fusion": "Avg",
        "encoder_cls": MaskedMultimodalEncoder,
        "masked_modality": "text_emb",
        "p_m": 1.0,
    },
    datamodule=datamodule,
)

In [None]:
masked_image_avg_emb = extract_embeddings(
    model_cls=MultimodalAE, 
    name="MaskedImage_ImageTextAvgAE",
    datamodule=datamodule,
)

masked_text_avg_emb = extract_embeddings(
    model_cls=MultimodalAE, 
    name="MaskedText_ImageTextAvgAE",
    datamodule=datamodule,
)


In [None]:
make_interactive_scatter_plot(
    title="Masked Image (p_m = 1.0) - Multimodal embeddings (Avg)",
    z_2d=PCA(n_components=2).fit_transform(masked_image_avg_emb),
    df=datamodule.df["all"],
)

In [None]:
make_interactive_scatter_plot(
    title="Masked Text (p_m = 1.0) - Multimodal embeddings (Avg)",
    z_2d=PCA(n_components=2).fit_transform(masked_text_avg_emb),
    df=datamodule.df["all"],
)

In [None]:
evaluate_classification(
    model_names=[
        (UnimodalAE, "ImageAE"), 
        (UnimodalAE, "TextAE"), 
        (MultimodalAE, "ImageTextAvgAE"),
        (MultimodalAE, "ImageTextMLPAE"),
        (MultimodalAE, "MaskedImage_ImageTextAvgAE"),
        (MultimodalAE, "MaskedText_ImageTextAvgAE"),
    ],
    datamodule=datamodule,
)

## Zadanie 2.6 (2 pkt)
Zbadaj jak wartość parametru `p_m` wpływa na jakość otrzymywanych multimodalnych wektorów reprezentacji? (Skrajne wartości `p_m = 0` oraz `p_m = 1` zbadaliśmy w poprzednich przykładach). Skomentuj otrzymane wyniki.

In [None]:
def check_masking_probability_performance():
    # TU WPISZ KOD
    raise NotImplementedError()