In [1]:
from __future__ import annotations
from pathlib import Path
from typing import Optional, Union, TYPE_CHECKING

import numpy as np
import cv2
from PIL import Image
import tifffile

def load_image (
    observation_id: Union[int, str],
    patches_path: Union[str, Path],
    *,
    landcover_mapping: Optional[npt.NDArray] = None,
    return_arrays: bool = True,
) -> list[Patches]:
    
    observation_id = str(observation_id)
    region = "patches-fr"
    subfolder1 = observation_id[-2:]
    subfolder2 = observation_id[-4:-2]

    filename = Path(patches_path) / region / subfolder1 / subfolder2 / observation_id

    rgb_filename = filename.with_name(filename.stem + "_rgb.jpg")
    img = Image.open(rgb_filename)
    img = transform(img)
    return img

In [2]:
import torch
import torchvision
from torchvision import transforms
from PIL import Image
import pandas as pd

normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]
)
transform = transforms.Compose([
    transforms.Resize(64),
    transforms.ToTensor(),
    normalize
])

In [3]:
from torch.utils.data import Dataset, DataLoader

class CustomImageDataset(Dataset):
    
    def __init__ (self,transform=None, target_transform=None):
        df_obs_train = pd.read_csv("../data/observations/observations_fr_train.csv", sep=";", index_col="observation_id", nrows = 52000)
        self.img_labels = df_obs_train.species_id.values
        self.observation_ids = df_obs_train.index
        self.patch_extractor = PatchExtractor()
        self.patch_extractor.add_all_bioclimatic_rasters()
        self.patch_extractor.append('sndppt')
        self.transform = transform
        self.target_transform = target_transform
        
    def __len__(self):
        return len(self.img_labels)
    
    def __getitem__(self, index):
        images = []
        images.append(load_image(self.observation_ids[index], "../data"))
        images = images + [environmental_patches]
        label = self.img_labels[index]
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [4]:
dataset = CustomImageDataset()
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [50000, 2000])
train_loader = DataLoader(train_dataset, batch_size=1000,num_workers = 0,shuffle = True,drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=1,num_workers = 0,shuffle = True,drop_last=True)

In [5]:
img, target = iter(train_loader).next()

RuntimeError: stack expects each tensor to be equal size, but got [1, 64, 64] at entry 0 and [3, 64, 64] at entry 1

In [6]:
import torch.nn as nn
import torch.nn.functional as F

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.conv1 = nn.Conv2d(3, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv3 = nn.Conv2d(20, 30, kernel_size=5)
        self.fc1 = nn.Linear(480, 1000)
        self.fc2 = nn.Linear(1000, 17036)
    
    def forward(self, x):
        x = self.conv1(x)
        x = F.max_pool2d(x, 2)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.max_pool2d(x, 2)
        x = F.relu(x)
        x = self.conv3(x)
        x = F.max_pool2d(x, 2)
        x = F.relu(x)
        x = x.view(-1, 480)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

In [7]:
def train(epoche, model, train_loader, optimizer, loss_fn):
    for j in range(0, epoche):
        for i, data in enumerate(train_loader):
            inputs, labels = data
            optimizer.zero_grad()
            inputs = inputs.float()
        
            inputs = inputs.cuda()
            labels = labels.cuda()
            outputs = model(inputs)
        
            loss = loss_fn(outputs, labels)
            loss.backward()
        
            optimizer.step()

In [8]:
import gc

model = NeuralNetwork()
model = model.cuda()

gc.collect()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
loss_fn = nn.CrossEntropyLoss(ignore_index = 1)
train(2, model, train_loader, optimizer, loss_fn)

In [9]:
def is_in_top_30(outputs, target):
    values, indices = torch.topk(outputs, 30)
    for i in indices[0]:
        if(i == target[0]):
            return True
    return False

df_obs_train = pd.read_csv("../data/observations/observations_fr_train.csv", sep=";", index_col="observation_id")
correct = 0
counter = 0
model.eval()

for i, data in enumerate(test_loader):
        inputs, labels = data
        labels = labels.cuda()
        inputs = inputs.float()
        inputs = inputs.cuda()
        outputs = model(inputs)
        counter += 1
       
        if(is_in_top_30(outputs, labels)):
            correct += 1
        
print(correct, " aus " , counter)