In [119]:
import gzip
import os
import random
import struct
from typing import Dict, List, Optional, Tuple, TypedDict
from hps import add_arguments, setup_hparams
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms as TF
from PIL import Image
from torch import Tensor
from torch.utils.data import Dataset
from tqdm import tqdm

from hps import Hparams
from utils import log_standardize, normalize
from torch.utils.data import DataLoader
import pgm
import trainer
import argparse
from vae import HVAE
from main import main

ModuleNotFoundError: No module named 'layers'

In [3]:
def _load_uint8(f):
    idx_dtype, ndim = struct.unpack("BBBB", f.read(4))[2:]
    shape = struct.unpack(">" + "I" * ndim, f.read(4 * ndim))
    buffer_length = int(np.prod(shape))
    data = np.frombuffer(f.read(buffer_length), dtype=np.uint8).reshape(shape)
    return data

def load_idx(path: str) -> np.ndarray:
    """Reads an array in IDX format from disk.

    Parameters
    ----------
    path : str or os.PathLike
        Path of the input file. Will uncompress with `gzip` if path ends in '.gz'.

    Returns
    -------
    np.ndarray
        Output array of dtype ``uint8``.

    References
    ----------
    http://yann.lecun.com/exdb/mnist/
    """
    # Normalise path-like objects (e.g. pathlib.Path) to a plain string so the
    # suffix test below works for them as well.
    path = os.fspath(path)
    open_fcn = gzip.open if path.endswith(".gz") else open
    with open_fcn(path, "rb") as f:
        return _load_uint8(f)


def _get_paths(root_dir, train):
    prefix = "train" if train else "t10k"
    images_filename = prefix + "-images-idx3-ubyte.gz"
    labels_filename = prefix + "-labels-idx1-ubyte.gz"
    metrics_filename = prefix + "-morpho.csv"
    images_path = os.path.join(root_dir, images_filename)
    labels_path = os.path.join(root_dir, labels_filename)
    metrics_path = os.path.join(root_dir, metrics_filename)
    return images_path, labels_path, metrics_path
def load_morphomnist_like(
    root_dir, train: bool = True, columns=None
) -> Tuple[np.ndarray, np.ndarray, pd.DataFrame]:
    """Load the images, labels and morphometrics table of one dataset split.

    Args:
        root_dir: path to data directory
        train: ``True`` reads the ``'train-*'`` files (training subset),
            ``False`` reads the ``'t10k-*'`` files (test subset)
        columns: morphometrics to read from the CSV; ``None`` (the default)
            reads the image index and every available metric: area, length,
            thickness, slant, width, and height
    Returns:
        images, labels, metrics
    """
    img_file, lbl_file, csv_file = _get_paths(root_dir, train)
    images, labels = load_idx(img_file), load_idx(lbl_file)

    # The CSV is indexed by its "index" column, so that column has to be read
    # even when the caller did not ask for it.
    must_add_index = columns is not None and "index" not in columns
    usecols = ["index", *columns] if must_add_index else columns
    metrics = pd.read_csv(csv_file, usecols=usecols, index_col="index")
    return images, labels, metrics

In [5]:
# Exploratory load of the training split (`train` defaults to True) with all
# metric columns (`columns` defaults to None); `data` is the tuple
# (images, labels, metrics).
# NOTE(review): the path is an absolute, machine-specific location, and `data`
# is not referenced again in the cells visible here.
data = load_morphomnist_like('/home/yasin/Desktop/causal-gen/datasets/morphomnist')

In [104]:
class MorphoMNIST(Dataset):
    """Morpho-MNIST images paired with morphometric attributes and digit labels.

    Every item is a dict holding the image under ``"x"`` plus either a single
    concatenated attribute vector under ``"pa"`` or one entry per attribute.

    Args:
        root_dir: data directory handed to ``load_morphomnist_like``.
        train: passed to ``load_morphomnist_like`` to choose between the
            training files (``True``) and the ``t10k`` files (``False``).
        transform: optional callable applied to each image on access.
        columns: attribute names to expose. ``"digit"`` comes from the label
            file and is always added to the samples; every other name is read
            from the metrics CSV. ``None`` exposes every metric column found
            in the CSV.
        norm: ``"[-1,1]"`` or ``"[0,1]"`` to rescale the metric columns with
            the bounds in ``self.min_max``, or ``None`` to leave them as loaded.
        concat_pa: if ``True`` items carry one ``"pa"`` vector; otherwise each
            attribute is returned under its own key.

    Raises:
        NotImplementedError: if ``norm`` is not one of the supported values.
        KeyError: if rescaling is requested for a column that has no bounds in
            ``self.min_max``.
    """

    def __init__(
        self,
        root_dir: str,
        train: bool = True,
        transform: Optional[torchvision.transforms.Compose] = None,
        columns: Optional[List[str]] = None,
        norm: Optional[str] = None,
        concat_pa: bool = True,
    ):
        # Reject unsupported modes before any file is read, and independently
        # of whether there is a metric column to normalize.
        if norm not in (None, "[-1,1]", "[0,1]"):
            raise NotImplementedError(f"{norm} not implemented.")

        self.train = train
        self.transform = transform
        self.columns = columns
        self.concat_pa = concat_pa
        self.norm = norm

        # "digit" is not a CSV column; it is derived from the label file below.
        # With columns=None the loader is asked for every metric column.
        if columns is None:
            cols_not_digit = None
        else:
            cols_not_digit = [c for c in columns if c != "digit"]
        images, labels, metrics_df = load_morphomnist_like(
            root_dir, train, cols_not_digit
        )
        # np.array(...) copies the loaded arrays before they are wrapped as
        # tensors; unsqueeze(1) inserts a channel axis after the sample axis.
        self.images = torch.from_numpy(np.array(images)).unsqueeze(1)
        self.labels = F.one_hot(
            torch.from_numpy(np.array(labels)).long(), num_classes=10
        )

        if self.columns is None:
            # Resolve the default to the metric columns actually present.
            cols_not_digit = list(metrics_df.columns)
            self.columns = cols_not_digit
        # Build each tensor from the column's values rather than from the
        # pandas Series object itself.
        self.samples = {
            k: torch.tensor(metrics_df[k].to_numpy()) for k in cols_not_digit
        }

        # Fixed (min, max) bounds used for rescaling, per metric column.
        self.min_max = {
            "thickness": [0.87598526, 6.255515],
            "intensity": [66.601204, 254.90317],
        }

        for k, v in self.samples.items():  # optional preprocessing
            print(f"{k} normalization: {norm}")
            if norm is None:
                continue
            if k not in self.min_max:
                raise KeyError(
                    f"no min/max bounds available to normalize column '{k}'"
                )
            x_min, x_max = self.min_max[k]
            # Only existing keys are reassigned, so the dict can be updated
            # while it is being iterated.
            if norm == "[-1,1]":
                self.samples[k] = normalize(v, x_min=x_min, x_max=x_max)
            else:  # "[0,1]" -- the only other value accepted above
                self.samples[k] = normalize(
                    v, x_min=x_min, x_max=x_max, zero_one=True
                )
        print(f"#samples: {len(metrics_df)}\n")

        # Inserted last, so the one-hot digit follows the metric columns in "pa".
        self.samples.update({"digit": self.labels})

    def __len__(self):
        """Return the number of images."""
        return len(self.images)

    def __getitem__(self, idx: int) -> Dict[str, Tensor]:
        """Return the sample at ``idx``.

        The result always contains ``"x"`` (the image, after ``transform`` if
        one was given). With ``concat_pa`` it also contains ``"pa"``: the
        metric values in the insertion order of ``self.samples`` followed by
        the 10-way one-hot digit. Otherwise each attribute is returned under
        its own key.
        """
        sample = {}
        sample["x"] = self.images[idx]

        if self.transform is not None:
            sample["x"] = self.transform(sample["x"])

        if self.concat_pa:
            sample["pa"] = torch.cat(
                [
                    # The one-hot digit is already a vector; each metric is a
                    # single value and is wrapped into a length-1 tensor.
                    v[idx] if k == "digit" else torch.tensor([v[idx]])
                    for k, v in self.samples.items()
                ],
                dim=0,
            )
        else:
            sample.update({k: v[idx] for k, v in self.samples.items()})
        return sample


def morphomnist(
    data_dir: str = "/home/yasin/Desktop/causal-gen/datasets/morphomnist",
):
    """Build the train / valid / test ``MorphoMNIST`` datasets.

    Args:
        data_dir: directory containing the dataset files. The default keeps the
            location this notebook was written against; pass another path to
            run it elsewhere.

    Returns:
        Dict with keys ``"train"``, ``"valid"`` and ``"test"``. ``"valid"`` and
        ``"test"`` are separate dataset objects that are both built with
        ``train=False``, i.e. the test set doubles as the validation set.
    """
    augmentation = {
        "train": TF.Compose(
            [
                TF.RandomCrop((32, 32), padding=4),
            ]
        ),
        "eval": TF.Compose(
            [
                TF.Pad(padding=2),  # (32, 32)
            ]
        ),
    }

    datasets = {}
    for split in ["train", "valid", "test"]:
        is_train = split == "train"
        datasets[split] = MorphoMNIST(
            root_dir=data_dir,
            train=is_train,  # test set is valid set
            # Random cropping only for training; fixed padding otherwise.
            transform=augmentation["train" if is_train else "eval"],
            columns=["thickness", "intensity", "digit"],
            norm="[-1,1]",
            concat_pa=True,
        )
    return datasets

In [105]:
def setup_dataloaders(bs):
    """Wrap the three MorphoMNIST splits in ``DataLoader`` objects.

    Args:
        bs: batch size used for every split.

    Returns:
        Dict with ``"train"``, ``"valid"`` and ``"test"`` loaders. Only the
        training loader shuffles and drops the last incomplete batch.
    """
    splits = morphomnist()
    # Settings shared by all three loaders.
    common = dict(batch_size=bs, num_workers=4, pin_memory=True)
    return {
        "train": DataLoader(splits["train"], shuffle=True, drop_last=True, **common),
        "valid": DataLoader(splits["valid"], shuffle=False, **common),
        "test": DataLoader(splits["test"], shuffle=False, **common),
    }

In [106]:
# Build the train/valid/test loaders with batch size 1. Constructing the
# underlying datasets prints the per-column normalization messages and the
# sample count of each split.
dataloader = setup_dataloaders(1)

thickness normalization: [-1,1]
max: 6.255515, min: 0.87598526
intensity normalization: [-1,1]
max: 254.90317, min: 66.601204
#samples: 60000

thickness normalization: [-1,1]
max: 6.255515, min: 0.87598526
intensity normalization: [-1,1]
max: 254.90317, min: 66.601204
#samples: 10000

thickness normalization: [-1,1]
max: 6.255515, min: 0.87598526
intensity normalization: [-1,1]
max: 254.90317, min: 66.601204
#samples: 10000


In [107]:
# enumerate() returns a one-shot iterator over the training loader: a cell that
# consumes it continues from where the previous run stopped until this cell is
# executed again.
loader = enumerate(dataloader['train'])

In [114]:
# Peek at a single batch from `loader`, then stop.
for i, batch in loader:
    print(batch['pa'])
    # Append two trailing singleton axes to "pa"; the commented-out .repeat
    # would additionally tile those two axes to 32 x 32.
    batch["pa"] = batch["pa"][..., None, None]#.repeat(1, 1, *(32,) * 2)
    print(batch['pa'].size())
    break

tensor([[-0.4486,  0.2572,  0.0000,  0.0000,  0.0000,  0.0000,  0.0000,  0.0000,
          0.0000,  1.0000,  0.0000,  0.0000]], dtype=torch.float64)
torch.Size([1, 12, 1, 1])
