In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets
from torchvision.transforms import ToTensor, Lambda, Compose, Normalize
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (24,8)

In [None]:
# Hyperparameter definieren
learning_rate = 1e-3
batch_size = 64

**Train dataset**

In [None]:
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)


In [None]:
def standardize(image: torch.Tensor) -> torch.Tensor:
    image /= 255.
    return image

In [None]:
transforms=Compose([
    ToTensor(),
    Normalize((0.1307,), (0.3081,)),
    AddGaussianNoise(0., 100.),
    Lambda(standardize)
])

In [None]:
# Download training data from open datasets.
training_data = datasets.CIFAR10(
    root="data",
    train=True,
    download=True,
    transform=transforms
)

# Download test data from open datasets.
test_data = datasets.CIFAR10(
    root="data",
    train=False,
    download=True,
    transform=transforms
)

In [None]:
training_data.data.shape, test_data.data.shape

In [None]:
training_data.data.min(), training_data.data.max()

In [None]:
# daten zu float 32 konvertieren
# alles durch 255 dividieren
# training_data.data/255 - > 0 bis 1
# image vor dem AE mit Noise und nach der Eingabe - hoffentlich dann ohne Noise ausgeben

train_noise = torch.tensor(np.random.randn(50000, 32, 32, 3)*100, dtype=torch.float32)

test_noise = torch.tensor(np.random.randn(10000, 32, 32, 3)*100, dtype=torch.float32)

In [None]:
img_noise = train_noise.detach().numpy()*255
plt.imshow(img_noise[0]);

In [None]:

plt.imshow(test_data.data[0])

In [None]:
train = (torch.tensor(training_data.data).type(torch.float32) + train_noise)/255.
test = (torch.tensor(test_data.data).type(torch.float32) + test_noise)/255.

# train = (training_data.data.type(torch.float32) + train_noise)/255.
# test = (test_data.data.type(torch.float32) + test_noise)/255.

In [None]:
train.shape, test.shape

In [None]:
training_data_ae = TensorDataset(train, train)
test_data_ae = TensorDataset(test, test)

In [None]:
#training_data_ae.transform = transforms
#test_data_ae.transform = transforms

In [None]:


# Create data loaders.
train_dataloader = DataLoader(training_data_ae, batch_size=batch_size)
test_dataloader = DataLoader(test_data_ae, batch_size=batch_size)

for X, y in test_dataloader:
    print("Shape of X [Batch Size, Channels, H, W]: ", X.shape)
    print("Shape of y: ", y.shape, y.dtype) # SAME AS INPUT
    break

In [None]:
data, labels = next(iter(test_dataloader))
print(torch.min(data), torch.max(data))
img = data.detach().numpy()*255
img = img.astype(int)
plt.imshow(img[0]);

**Examine Dataset Range**

In [None]:
data, labels = next(iter(train_dataloader))
print(torch.min(data), torch.max(data))

**Model**

In [None]:
class DenoisingAutoencoder(nn.Module):
    def __init__(self):
        super(DenoisingAutoencoder, self).__init__()
        self.flatten = nn.Flatten()
        self.encoder = nn.Sequential(
            nn.Linear(32*32*3, 128), nn.ReLU(), # use tanh
            nn.Linear(128, 64), nn.ReLU(),
            # nn.Linear(64, 32), nn.ReLU(),
            # nn.Linear(32, 8), nn.ReLU(),
        )

        self.decoder = nn.Sequential(
            # nn.Linear(8, 32), nn.ReLU(),
            # nn.Linear(32, 64), nn.ReLU(),
            nn.Linear(64, 128), nn.ReLU(),
            nn.Linear(128, 32*32*3),nn.Sigmoid()  # Data value range [0, 1]
        )

    def forward(self, x):
        x = self.flatten(x)
        x = self.encoder(x)
        out = self.decoder(x)
        out = torch.reshape(out, (-1, 32, 32, 3))
        return out

model = DenoisingAutoencoder()

In [None]:
# Initialize the loss function
loss_fn = nn.CrossEntropyLoss()

In [None]:
# Define optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Training Loop
# In TF kann man einfach fitten, in PyTorch muss man selbst loopen

def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) # Von da wissen wir die Anzahl der Zyklen
    for batch, (X, y) in enumerate(dataloader): # X == y
        # Compute prediction and loss
        recon = model(X) # reconstructed data
        loss = loss_fn(recon, X) # benchmark recon against X

        # Backpropagation
        optimizer.zero_grad()

        # Gradienten auf x Berechnen
        # In tensorflow: Apply gradient oder Gewicht = Gewicht + LearningRate*Gradient
        loss.backward()
        # Uberschreiben aller Gewichte und Biases mit neuen Werten
        optimizer.step()

        if batch % 500 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss, Train: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [None]:
def test_loop(test_dataloader, model, loss_fn):
    size = len(test_dataloader.dataset)
    test_loss = 0.
    with torch.no_grad():
        for batch, (X, y) in enumerate(test_dataloader):
            pred = model(X)
            test_loss += loss_fn(pred, y).item()


    test_loss /= size
    print(f"Avg loss, Test: {test_loss:>8f} \n")

    return test_loss

In [None]:
epochs = 10
test_losses = []
for t in range(epochs):
    #print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loss = test_loop(test_dataloader, model, loss_fn)
    test_losses.append(test_loss)

print("Done!")


In [None]:
plt.plot(np.array(test_losses));

In [None]:
data, labels = next(iter(test_dataloader))
print(torch.min(data), torch.max(data))
img = data.detach().numpy()*255
img = img.astype(int)
plt.imshow(img[0]);

In [None]:
img.shape

In [None]:
img_new = model(data)


In [None]:
img_new = (img_new.detach().numpy()*255).astype(int)
print(img_new)

In [None]:
plt.imshow(img_new[0])