In [None]:
import torch
import numpy as np
from matplotlib import pyplot as plt

In [None]:
rng = torch.Generator().manual_seed(137)

We will start today's tutorial with the MNIST dataset, which is widely used in the deep learning.
This is labeled black-white images of numbers from 0 to 9.

In [None]:
import torchvision

transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor() # Let's assume that the all data is in PyTorch Tensor format
])

# Load the MNIST dataset
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# Get the data and labels as tensors
x_train, y_train = train_dataset.data, train_dataset.targets
x_test, y_test = test_dataset.data, test_dataset.targets

fig,ax=plt.subplots(ncols=6)
for i,a in zip(range(6),ax):
  a.imshow(x_train[i], cmap='gray')
  a.set_title(y_train[i].numpy())
  a.axis('off')

In [None]:
print(x_train[0].shape)

#1. Sequential Architecture-Building
Let's start from the fully-connected (FC) deep neural network. Probably the easiest way to build it is to use the sequential method.

In [None]:
# Start model definition by using Sequential()
model_fc = torch.nn.Sequential()

# From now on, let's build it by adding each layer or function one by one.
# To add a layer, we use add_module()
model_fc.add_module('flatten',torch.nn.Flatten()) # Make the 2D image into 1D array
model_fc.add_module('fc1',torch.nn.Linear(28*28,128)) # Fully-connected hidden layer 1 with 128 nodes
model_fc.add_module('relu1',torch.nn.ReLU()) # Add nonlinearity by normalizing it between 0-1
model_fc.add_module('fc2',torch.nn.Linear(128,128)) # Fully-connected hidden lyaer 2 with 128 nodes
model_fc.add_module('relu2',torch.nn.ReLU()) # Same as before
model_fc.add_module('fc3',torch.nn.Linear(128,10)) # Output layer with 10 node (0-9)

In [None]:
# Move the model to GPU if possible
# You need to change your runtime environment at the upper right menu
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_fc.to(device)
print(device)

# Let's print the summary of the model
import torchsummary
torchsummary.summary(model_fc,(28,28),device=str(device))

For each hidden layer, we applied sigmoid function as an **activation function**.
Activation function is a function used for adding **nonlinearity** to the process.

There could be many different choices of activation functions, including your own custom functions.
Some famous activation functions are shown below.
While ReLU (rectified linear unit) and its deviations are famous nowadays, there is no global strict rule for selecting activation functions.
However, you need to be careful for the activation function at the output layer, as different activation functions give different target set.
![Activation Function](https://cdn-images-1.medium.com/max/1200/1*ZafDv3VUm60Eh10OeJu1vw.png)

(https://cdn-images-1.medium.com/max/1200/1*ZafDv3VUm60Eh10OeJu1vw.png)

Now let's compile the model.

One of the important thing is the definition of **loss function**, the function that we want to minimize.
If the output is the continous variables, then the loss function can be the **mean square error**,
$$ \mathcal{L}_{\rm mse} = \sum_i (y_i^{\rm true} - y_i^{\rm predict})^2 \, ,$$
which is similar to the ordinary $\chi^2$-fitting.

On the other hand, if the output is the categorical probability, then the loss function can be **categorical cross-entropy**,
$$ \mathcal{L}_{\rm cce} = -\sum_i p_i^{\rm true} \log_{10} p_i^{\rm predict} \, .$$

Note that one can still select different kinds of loss function, depending on your problem.

Another important thing is the learning rate, which determines how fast the weights will be changed during the learning process.
If the learning rate is too low, it takes too much time to reach the minimum of the loss function and stuck in the nearest local minima rather than the global minimum.
On the other hand, if the learning rate is too high, it can easily escape the minimum.
![Choice of Learning Rate](https://pyimagesearch.com/wp-content/uploads/2019/08/keras_learning_rate_finder_header.png)

(https://pyimagesearch.com/wp-content/uploads/2019/08/keras_learning_rate_finder_header.png)

In [None]:
optimizer = torch.optim.SGD(model_fc.parameters(),lr=1e-2) # There are numerous optimizer options, but I used a simple one
criterion = torch.nn.CrossEntropyLoss() # This loss function is designed for classification

Let's perform the actual fit!

You can decide the number of epochs (a cycle that you perform the training and validation), the number of minibatches (a number of minimal training patches you use for a single fitting), and the split ratio between training and validation samples.

In [None]:
from tqdm import tqdm # for adding the visual progress bar

# Split training dataset further --- training & validation
len_train = int(0.8*len(train_dataset))
len_valid = len(train_dataset) - len_train
trainset, validset = torch.utils.data.random_split(
    train_dataset,
     [len_train, len_valid],
    generator=rng
)

# Define the number of epochs and batch size
epochs = 10
batch_size = 64

# Create DataLoaders for training and validation sets
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
validloader = torch.utils.data.DataLoader(validset, batch_size=batch_size, shuffle=False)

# Lists to store loss values for plotting
train_loss_history = []
valid_loss_history = []

# Training loop
for epoch in range(epochs):
    running_loss = 0.0
    model_fc.train() # Set the model to training mode
    for data in tqdm(trainloader):
        inputs, labels = data

        # Move data to the same device as the model
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model_fc(inputs)

        # Calculate loss
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Print statistics
        running_loss += loss.item()

    # Calculate average training loss for the epoch
    epoch_train_loss = running_loss / len(trainloader)
    train_loss_history.append(epoch_train_loss)

    # Validation step
    model_fc.eval() # Set the model to evaluation mode
    valid_loss = 0.0
    with torch.no_grad(): # Disable gradient calculation for validation
        for data in validloader:
            inputs, labels = data

            # Move data to the same device as the model
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model_fc(inputs)

            loss = criterion(outputs, labels)
            valid_loss += loss.item()

    # Calculate average validation loss for the epoch
    epoch_valid_loss = valid_loss / len(validloader)
    valid_loss_history.append(epoch_valid_loss)

    print(f'Epoch {epoch + 1}, Training Loss: {epoch_train_loss:.4f}, Validation Loss: {epoch_valid_loss:.4f}')

print('Finished Training')

Before applying the trained model to the samples, let's check if the training is done well.
One way is to check the evolution of loss function per epoch, both for train and validation samples.

In [None]:
# Let's plot the evolution of train and validation losses over epoch
arrEpoch = np.arange(1,epochs+1)
plt.plot(arrEpoch,train_loss_history,'r-',label="Train Loss")
plt.plot(arrEpoch,valid_loss_history,'b-',label="Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss (Categorical Cross-Entropy)")
plt.yscale("log")
plt.legend()

If both the train and validation losses continue to decrease, then it may mean that there is a room for improvement if you keep the learning process more.

On the other hand, if the train loss keeps decreasing and the validation loss starts to increase, then probably the learning process is in **overfitting** --- that is, the model eventually just copy the train sample without maintaining the general nature.
You probably need to save the model before it happens and stop the learning.   

Let's evaluate the accuracy of our new model by applying them to the test samples.

In [None]:
# Create a DataLoader for the test set
testloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

model_fc.eval() # Set the model to evaluation mode
test_loss = 0.0
correct_predictions = 0
total_samples = 0

with torch.no_grad(): # Disable gradient calculation for evaluation
    for data in testloader:
        inputs, labels = data

        # Move data to the same device as the model
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model_fc(inputs)
        loss = criterion(outputs, labels)

        test_loss += loss.item()

        # Get predictions
        _, predicted = torch.max(outputs.data, 1)
        total_samples += labels.size(0)
        correct_predictions += (predicted == labels).sum().item()

# Calculate average test loss and accuracy
avg_test_loss = test_loss / len(testloader)
accuracy = correct_predictions / total_samples

print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {accuracy:.4f}')

Let's predict the test samples by using our new model, and compare it to the actual values.

In [None]:
model_fc.eval() # Set the model to evaluation mode
with torch.no_grad(): # Disable gradient calculation for inference
    # Get predictions for the first few test samples
    outputs = model_fc(x_test[:4].float().to(device))
    _, predicted = torch.max(outputs.data, 1)

fig,ax=plt.subplots(ncols=4)
for i,a in zip(range(4),ax):
  a.imshow(x_test[i], cmap='gray')
  a.set_title('Pred: {} Real: {}'.format(predicted.cpu()[i].numpy(),y_test[i].numpy()))
  a.axis('off')

## 2. More general method

The sequantial method is the easiest way to build the deep learning architectures.
However, as we have seen, one can only build the "sequential" architectures:
$$ A \to B \to C \to \cdots \to E \, .$$

However, in some cases you would like to build more complex architectures, such as
$$ A \to \left\{ \begin{array}{c} B \to \cdots \to C \\ D \to \cdots \to E \end{array} \right\} \to F \, .$$
To build such complex architectures, one needs a full control of inputs and outputs of each layer.

Let's rebuild the fully-connected architecture that we used in the previous section with a more general method.

In [None]:
# Define a class that inherits from nn.Module
class SimpleNN(torch.nn.Module):
    def __init__(self):
        super().__init__()
        # Input layer: Flatten the image. The input size will be 28x28.
        self.flatten = torch.nn.Flatten()
        self.fc1 = torch.nn.Linear(28*28, 128) # Fully-connected hidden layer 1 with 128 nodes
        self.fc2 = torch.nn.Linear(128, 128) # Fully-connected hidden layer 2 with 128 nodes
        self.fc3 = torch.nn.Linear(128, 10) # Output layer with 10 nodes

    def forward(self, x):
        # Flatten the input image
        x = self.flatten(x)
        # Apply the layers and activation functions
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Instantiate the model
model_fc_new = SimpleNN().to(device)

In [None]:
# Let's print the summary of the model
torchsummary.summary(model_fc_new,(28,28),device=str(device))

#3. Homework: Galaxy morphology classification

We just learned a basic of deep learning by using the MNIST dataset. This dataset has been tested by tons of different machine learning methods for a long time. Probably that's why your deep learning architecture works well.

But... will your model work well with other problems as well? Let's try it with galaxy image dataset with morphology (spiral or elliptical).



First, let's download the data.

In [None]:
!pip install gdown

In [None]:
!gdown https://drive.google.com/uc?id=1QjTECPvkzrXUJxishXS-0KTknNq0erly

In [None]:
!unzip -q gz_data.zip

Let's have a look at example images:

In [None]:
import PIL
import glob

In [None]:
fig, ax = plt.subplots(ncols=2)
for s, a in zip(['elliptical', 'spiral'], ax) :
  a.imshow(PIL.Image.open(glob.glob(f'curated_data/{s}/*.jpg')[0]))
  a.set_title(s)
  a.axis('off')

The original RGB image has a size of 424 x 424 pixels. This might be too large for the tutorial. Let's reduce the image size so that we can train our deep learning models in a reasonable time.

In [None]:
transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize((27,27)),
    torchvision.transforms.ToTensor()
])
dataset = torchvision.datasets.ImageFolder('curated_data', transform=transform)

Let's split our data set into training, validation, and test data.

It's very important to do things reproducibly!

In [None]:
len_train = int(0.7*len(dataset))
len_valid = int(0.2*len(dataset))
len_test = len(dataset) - len_train - len_valid
trainset, validset, testset = torch.utils.data.random_split(
    dataset,
     [len_train, len_valid, len_test],
    generator=rng
)

In [None]:
print(f'Data set sizes:\n\ttraining:\t{len(trainset)}\n\tvalidation:\t{len(validset)}\n\ttesting:\t{len(testset)}')

In [None]:
fig,ax=plt.subplots(ncols=6)
for i,a in zip(range(6),ax):
  a.imshow(np.transpose(trainset[i][0],(1,2,0)))
  a.set_title(trainset[i][1])
  a.axis('off')

In [None]:
# Model definition
model_morph = ...

model_morph.to(device)

In [None]:
optimizer_morph = ...
criterion_morph = ...

In [None]:
# Create DataLoaders for training and validation sets
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
validloader = torch.utils.data.DataLoader(validset, batch_size=batch_size, shuffle=False)

# Lists to store loss values for plotting
train_loss_history = []
valid_loss_history = []

epochs = ...

# Training loop
for epoch in range(epochs):
    ...

    print(f'Epoch {epoch + 1}, Training Loss: {epoch_train_loss:.4f}, Validation Loss: {epoch_valid_loss:.4f}')

print('Finished Training')

In [None]:
# Create a DataLoader for the test set
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False)

model_morph.eval() # Set the model to evaluation mode
test_loss = 0.0
correct_predictions = 0
total_samples = 0

with torch.no_grad(): # Disable gradient calculation for evaluation
    for data in testloader:
        ...

# Calculate average test loss and accuracy
avg_test_loss = test_loss / len(testloader)
accuracy = correct_predictions / total_samples

print(f'Test Loss: {avg_test_loss:.4f}, Test Accuracy: {accuracy:.4f}')

In [None]:
model_morph.eval() # Set the model to evaluation mode
with torch.no_grad(): # Disable gradient calculation for inference
    # Get predictions for the first few test samples
    inputs = torch.from_numpy(np.array([testset[i][0] for i in range(4)]))
    outputs = model_morph(inputs.to(device))
    _, predicted = torch.max(outputs.data, 1)

fig,ax=plt.subplots(ncols=4)
for i,a in zip(range(4),ax):
  a.imshow(np.transpose(testset[i][0],(1,2,0)))
  a.set_title('Pred: {} Real: {}'.format(predicted.cpu()[i],testset[i][1]))
  a.axis('off')