In [1]:
import torch
import torchvision

In [2]:
import torch.nn as nn
import torch.nn.functional as F


class SimpleCNNNet(nn.Module):
    def __init__(self):
        super(SimpleCNNNet, self).__init__()
        # Convolutional Layer
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3)
        # Dropout Layer: 30% der Neuronen werden zufällig "ausgeschaltet"
        self.dropout = nn.Dropout(p=0.3)
        # Hidden Layer - Anzahl der Neuronen: (I - K + 1)^2 * F / 2*P
        # I = 28: Bild mit 28x28 Pixel
        # K = 3: Kernel Größe
        # F = 16: Anzahl der Filter
        # P = 2: Pooling Operation1
        self.hidden_layer = nn.Linear(2704, 128) 
        self.output_layer = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.max_pool2d(x , (2,2))
        x = self.dropout(x)
        x = torch.flatten(x, 1)
        x = self.hidden_layer(x)
        x = self.output_layer(x)
        return x


In [3]:
from torchvision import datasets, transforms

train_dataset = datasets.MNIST(
    "../mnist_data",
    download=True,
    train=True,
    transform=transforms.Compose([transforms.ToTensor()]),
)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataset = datasets.MNIST(
    "../mnist_data",
    download=True,
    train=False,
    transform=transforms.Compose([transforms.ToTensor()]),
)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64, shuffle=True)

In [4]:
len(train_dataset) + len(test_dataset)

70000

In [5]:
import torch.optim as optim
if torch.cuda.is_available():  
    device = "cuda:0" 
else:  
    device = "cpu" 
model = SimpleCNNNet().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
loss_function = nn.CrossEntropyLoss()

In [6]:
def train(model, train_loader, optimizer, loss_function, device="cpu", epochs=5):
    model.train()

    for epoch in range(epochs):
        running_loss = 0.0
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            predictions = model(data)
            batch_loss = loss_function(predictions, target)
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item()
            if (batch_idx+1) % 100 == 0:
                print(f"Epoch: {epoch}, Batch: {batch_idx+1}, Average Loss: {running_loss/(batch_idx + 1)}")
train(model, train_loader, optimizer, loss_function, device=device)

Epoch: 0, Batch: 100, Average Loss: 2.130849679708481
Epoch: 0, Batch: 200, Average Loss: 1.8365431880950929
Epoch: 0, Batch: 300, Average Loss: 1.525294956366221
Epoch: 0, Batch: 400, Average Loss: 1.2994868168234825
Epoch: 0, Batch: 500, Average Loss: 1.143743477702141
Epoch: 0, Batch: 600, Average Loss: 1.030202533081174
Epoch: 0, Batch: 700, Average Loss: 0.9449341111736638
Epoch: 0, Batch: 800, Average Loss: 0.8754402166232467
Epoch: 0, Batch: 900, Average Loss: 0.8214053858816623
Epoch: 1, Batch: 100, Average Loss: 0.36910633444786073
Epoch: 1, Batch: 200, Average Loss: 0.3710865820944309
Epoch: 1, Batch: 300, Average Loss: 0.36160923798878986
Epoch: 1, Batch: 400, Average Loss: 0.35808429021388294
Epoch: 1, Batch: 500, Average Loss: 0.3568768500983715
Epoch: 1, Batch: 600, Average Loss: 0.35514166614661613
Epoch: 1, Batch: 700, Average Loss: 0.3541108517668077
Epoch: 1, Batch: 800, Average Loss: 0.3505493401456624
Epoch: 1, Batch: 900, Average Loss: 0.3470132754991452
Epoch: 2, 

In [7]:
def test(model, test_loader, loss_function, device="cpu"):
    model.eval()
    batch_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            predictions = model(data)
            batch_loss += loss_function(predictions, target).item()
            predicted_labels = predictions.argmax(dim=1, keepdim=True)
            correct += predicted_labels.eq(target.view_as(predicted_labels)).sum().item()
    average_loss = batch_loss / len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'Average loss: {average_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.3f}%)\n')
test(model, test_loader, loss_function)

Average loss: 0.2286, Accuracy: 9339/10000 (93.390%)



# Fruit Model

In [8]:
from torchvision import models, transforms, datasets
fruit_model = models.resnet18(pretrained=True)
number_features = fruit_model.fc.in_features
for param in fruit_model.parameters():
    param.requires_grad = False
fruit_model.fc = nn.Linear(number_features, 5)

In [9]:
from torchvision import transforms, datasets
data_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

image_dataset = datasets.ImageFolder("data", data_transform)

In [10]:
train_size = int(0.8 * len(image_dataset))
test_size = len(image_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(image_dataset, [train_size, test_size])
train_loader = torch.utils.data.DataLoader(train_dataset, 
                                               batch_size=32,
                                             shuffle=True, 
                                               num_workers=4)

test_loader = torch.utils.data.DataLoader(test_dataset, 
                                               batch_size=32,
                                             shuffle=True, 
                                               num_workers=4)

In [11]:
import torch.optim as optim
optimizer = optim.SGD(fruit_model.parameters(), lr=0.01)
loss_function = nn.CrossEntropyLoss()

train(fruit_model, train_loader, optimizer, loss_function, device=device)

In [12]:
test(fruit_model, test_loader, loss_function)

Average loss: 0.3163, Accuracy: 121/128 (94.531%)



In [13]:
from PIL import Image
def predict(model, tranform_function, image_path):
    image = Image.open(image_path)
    transformed = tranform_function(image).float().unsqueeze(0)
    predicted = model(transformed)
    probabilities = F.softmax(predicted, dim=1)
    return probabilities
predict(fruit_model, data_transform, "evaluation/apple.jpg")

tensor([[0.7438, 0.0643, 0.0832, 0.0971, 0.0117]], grad_fn=<SoftmaxBackward>)

In [14]:
apple = "evaluation/apple.jpg"
print(f"Orange Probabilities: {predict(fruit_model, data_transform, apple)}")
banana = "evaluation/banana.jpg"
print(f"Orange Probabilities: {predict(fruit_model, data_transform, banana)}")
orange = "evaluation/orange.jpg"
print(f"Orange Probabilities: {predict(fruit_model, data_transform, orange)}")

Orange Probabilities: tensor([[0.7438, 0.0643, 0.0832, 0.0971, 0.0117]], grad_fn=<SoftmaxBackward>)
Orange Probabilities: tensor([[0.0485, 0.8450, 0.0239, 0.0554, 0.0273]], grad_fn=<SoftmaxBackward>)
Orange Probabilities: tensor([[0.2148, 0.0457, 0.0173, 0.6945, 0.0277]], grad_fn=<SoftmaxBackward>)


# TorchServe

In [15]:
torch.save(fruit_model.state_dict(), "fruit_model.pth")


In [16]:
import torch
example_input = torch.rand(1, 3, 224, 224)
traced_script_module = torch.jit.trace(fruit_model, example_input)
traced_script_module.save("fruit_model.pt")

In [20]:
!torch-model-archiver --model-name fruit_model --version 1.0 --serialized-file fruit_model.pt --extra-files index_to_name.json --handler image_classifier

In [21]:
!mv fruit_model.mar models/

In [22]:
!torchserve --start --ncs --model-store models --models fruit_model.mar

In [27]:
!curl -X POST http://127.0.0.1:8080/predictions/fruit_model -T data/strawberry/00000001.jpg


[
  {
    "strawberry": 0.9564064145088196
  },
  {
    "apple": 0.02362699992954731
  },
  {
    "orange": 0.010261671617627144
  },
  {
    "grape": 0.006195542402565479
  },
  {
    "banana": 0.003509427886456251
  }
]