In [None]:
import os
import numpy as np
import torch
import torchvision
import torch.nn as nn
from PIL import Image
from pathlib import Path
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

%matplotlib inline

In [None]:
# Waste categories. The position of a name in this list is used as its
# integer class id by the manual loader further down.
classes = [
    "cardboard",
    "glass",
    "metal",
    "paper",
    "plastic",
    "trash",
]

In [None]:
#from google.colab import drive
#drive.mount('/content/gdrive')

#!ls gdrive/MyDrive/picketdataset

In [None]:
import pathlib
# Dataset root directory. The glob in the next cell looks for
# `<data_dir>/<subfolder>/*.jpg`, i.e. one sub-folder level below this path.
dataset_url = r".../model/Garbage-classification"
# Disabled alternative that downloads/extracts the archive instead of using a local path:
#data_dir = tf.keras.utils.get_file('Garbage-classification', origin=dataset_url, untar=True)
data_dir = pathlib.Path(dataset_url)

In [None]:
# Count every JPEG located exactly one folder below the dataset root.
jpg_paths = list(data_dir.glob('*/*.jpg'))
image_count = len(jpg_paths)
print(image_count)

In [None]:
# Loader settings shared by the training and validation pipelines.
batch_size = 32
# NOTE(review): the manual loader later in this notebook only keeps images whose
# array shape is (384, 512, 3), i.e. height=384 / width=512. If those are the same
# files, the values below resize them with height and width swapped — confirm.
img_height, img_width = 512, 384

# Training subset: the 80% side of a seeded 80/20 split.
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir,
    subset="training",
    validation_split=0.2,
    seed=123,
    batch_size=batch_size,
    image_size=(img_height, img_width),
)

In [None]:
# Validation subset: same directory, split fraction and seed as the training
# loader so the two subsets come from the same partition.
val_split_options = dict(
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(img_height, img_width),
    batch_size=batch_size,
)
val_ds = tf.keras.preprocessing.image_dataset_from_directory(data_dir, **val_split_options)

In [None]:
# Class names as reported by the dataset object built from `data_dir`.
# NOTE(review): `train_ds` is rebound to a cached/shuffled/prefetched dataset in a
# later cell; presumably `.class_names` is not available on that object, so this
# cell has to run before that one — confirm.
class_names = train_ds.class_names
print(class_names)

In [None]:
import matplotlib.pyplot as plt

# Show the first nine images of a single training batch in a 3x3 grid,
# each titled with its class name.
plt.figure(figsize=(10, 10))
for images, labels in train_ds.take(1):
    for slot in range(9):
        plt.subplot(3, 3, slot + 1)
        pixels = images[slot].numpy().astype("uint8")
        plt.imshow(pixels)
        plt.title(class_names[labels[slot]])
        plt.axis("off")

In [None]:
# Inspect a single batch to see the image and label tensor shapes.
for image_batch, labels_batch in train_ds.take(1):
    print(image_batch.shape)
    print(labels_batch.shape)

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# Training pipeline: cache elements, shuffle with a 1000-element buffer, then prefetch.
# NOTE: this rebinds `train_ds` / `val_ds`, so re-running the cell stacks the
# transformations on top of the previous result.
train_ds = train_ds.cache()
train_ds = train_ds.shuffle(1000)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

# Validation pipeline: same, without shuffling.
val_ds = val_ds.cache()
val_ds = val_ds.prefetch(buffer_size=AUTOTUNE)

In [None]:
# Standalone rescaling layer with scale factor 1/255; it is mapped over the
# training dataset in the next cell.
normalization_layer = layers.experimental.preprocessing.Rescaling(1./255)


In [None]:
def _rescale_images(images, labels):
    """Apply `normalization_layer` to the images and pass the labels through."""
    return normalization_layer(images), labels


normalized_ds = train_ds.map(_rescale_images)
first_batch = next(iter(normalized_ds))
image_batch, labels_batch = first_batch
first_image = image_batch[0]
# After rescaling the pixel values are expected to lie in `[0,1]`.
print(np.min(first_image), np.max(first_image))

In [None]:
num_classes = 6

# Input stage: rescale pixel values by 1/255 inside the model itself.
cnn_layers = [
    layers.experimental.preprocessing.Rescaling(1./255, input_shape=(img_height, img_width, 3)),
]

# Three conv + max-pool stages with 16, 32 and 64 filters (3x3 kernels).
for n_filters in (16, 32, 64):
    cnn_layers.append(layers.Conv2D(n_filters, 3, padding='same', activation='relu'))
    cnn_layers.append(layers.MaxPooling2D())

# Classifier head; the last Dense layer has no activation, so it outputs raw scores.
cnn_layers.append(layers.Flatten())
cnn_layers.append(layers.Dense(128, activation='relu'))
cnn_layers.append(layers.Dense(num_classes))

model = Sequential(cnn_layers)

In [None]:
# The model's last Dense layer has no activation, hence `from_logits=True`.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])

In [None]:
# Print the layer-by-layer summary of the Keras model defined above.
model.summary()

In [None]:
# Train for a fixed number of epochs, evaluating on the validation set after each one.
epochs = 10
history = model.fit(train_ds, validation_data=val_ds, epochs=epochs)

In [None]:
# Per-epoch curves recorded by `model.fit`.
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs_range = range(epochs)

# (subplot slot, training curve, its label, validation curve, its label, legend location, title)
panels = [
    (1, acc, 'Training Accuracy', val_acc, 'Validation Accuracy',
     'lower right', 'Training and Validation Accuracy'),
    (2, loss, 'Training Loss', val_loss, 'Validation Loss',
     'upper right', 'Training and Validation Loss'),
]

plt.figure(figsize=(8, 8))
for slot, train_curve, train_label, val_curve, val_label, legend_loc, panel_title in panels:
    plt.subplot(1, 2, slot)
    plt.plot(epochs_range, train_curve, label=train_label)
    plt.plot(epochs_range, val_curve, label=val_label)
    plt.legend(loc=legend_loc)
    plt.title(panel_title)
plt.show()

In [None]:
# Don't go any further down.

In [None]:
root_dir = Path("gdrive/MyDrive/picketdataset/Garbage-classification")

# Only images that decode to (height, width, channels) == (384, 512, 3) are kept.
# This is the same test as `img.T.shape == (3, 512, 384)`, without the transpose.
expected_shape = (384, 512, 3)

training = []      # decoded images, stored un-transposed as (384, 512, 3)
training_ids = []  # integer class id for each entry of `training`
testing = []
testing_ids = []

for class_id, classe in enumerate(classes):  # class id == position in `classes`
    # Sorted so that the train/test split is deterministic across runs.
    files = sorted(root_dir.glob("{classe}/*.jpg".format(classe=classe)))
    split_at = len(files) * 4 // 5  # first 4/5 -> training, remaining 1/5 -> testing

    for position, file in enumerate(files):
        with open(file, 'rb') as f:
            # Decode once and reuse the array for both the shape check and storage.
            image = plt.imread(f)
        if image.shape != expected_shape:
            continue  # skip images with an unexpected size or channel count

        if position < split_at:
            training.append(image)
            training_ids.append(class_id)
        else:
            testing.append(image)
            testing_ids.append(class_id)

In [None]:
# Report how many images ended up in each split (training first, then testing).
for split in (training, testing):
    print(len(split))

In [None]:
#print(training[0].T.shape)

In [None]:
# `.T` reverses the axis order of every image array
# (e.g. (384, 512, 3) becomes (3, 512, 384)).
training_files = [train_image.T for train_image in training]
testing_files = [test_image.T for test_image in testing]

In [None]:
#print(len(training_files))
#print(len(testing_files))

In [None]:
# Convert the lists of transposed images into arrays.
# NOTE: this rebinds `training` / `testing` from lists of un-transposed images
# to arrays of transposed ones.
training, testing = (
    np.asarray(image_list) for image_list in (training_files, testing_files)
)

In [None]:
# A notebook cell only renders its last expression, so show both shapes as one tuple
# (previously `training.shape` was evaluated and silently discarded).
training.shape, testing.shape

In [None]:
sample_idx = 500
print(classes[training_ids[sample_idx]])

# After the conversion cell above, `training` holds transposed images of shape
# (3, 512, 384), which `imshow` rejects. Undo the transpose when the channel
# axis comes first so the image is displayable in either notebook state.
sample_image = np.asarray(training[sample_idx])
if sample_image.ndim == 3 and sample_image.shape[0] == 3 and sample_image.shape[-1] != 3:
    sample_image = sample_image.T
plt.imshow(sample_image);


In [None]:
#np.save("gdrive/MyDrive/picketdataset/trainimgs.npy", training)

#np.save("gdrive/MyDrive/picketdataset/testimgs.npy", testing)

#np.save("gdrive/MyDrive/picketdataset/trainlabels.npy", training_ids)

#np.save("gdrive/MyDrive/picketdataset/testlabels.npy", testing_ids)

In [None]:
# Memory-map the saved arrays read-only instead of loading them fully into RAM.
training_loaded, testing_loaded, trainids_loaded, testids_loaded = (
    np.load(npy_path, mmap_mode="r")
    for npy_path in (
        "gdrive/MyDrive/picketdataset/trainimgs.npy",
        "gdrive/MyDrive/picketdataset/testimgs.npy",
        "gdrive/MyDrive/picketdataset/trainlabels.npy",
        "gdrive/MyDrive/picketdataset/testlabels.npy",
    )
)

In [None]:
# A notebook cell only renders its last expression, so show both shapes as one tuple
# (previously `training_loaded.shape` was evaluated and silently discarded).
training_loaded.shape, testing_loaded.shape

In [None]:
### Don't go any further down.

In [None]:
import torch
import torchvision
import torch.nn as nn
import numpy as np

In [None]:
# Pretrained ResNet-18 with its final child module dropped, used as a feature extractor.
res_model = torchvision.models.resnet18(pretrained=True)
feature_layers = list(res_model.children())[:-1]
newmodel = torch.nn.Sequential(*feature_layers)
newmodel.eval();  # inference mode; trailing `;` hides the module repr

In [None]:
# NOTE(review): in the visible notebook `x_std` is first assigned in the cell
# below, so on a fresh kernel (Restart & Run All) this line is expected to raise
# NameError — confirm, then move it after that cell or remove it.
x_std.shape

In [None]:
# Statistics are computed over three consecutive chunks of 673 images
# (the first 2019 entries of `training_loaded`) rather than in one pass.
# NOTE(review): 673 is hard-coded; presumably chosen so 3 * 673 matches the
# training-set size — confirm against `len(training_loaded)`.
chunk_size = 673
chunk_means, chunk_stds = [], []
for chunk_idx in range(3):
    chunk = training_loaded[chunk_idx * chunk_size: (chunk_idx + 1) * chunk_size]
    chunk_means.append(np.mean(chunk, axis=0))
    chunk_stds.append(np.std(chunk, axis=0))

# Average the per-chunk maps, then average over axes 1 and 2 to leave one value
# per entry of the leading axis.
x_mean = np.mean(np.mean(chunk_means, axis=0), axis=(1, 2))
x_std = np.mean(np.mean(chunk_stds, axis=0), axis=(1, 2))



class LinearModel(nn.Module):
    """
    Two-layer classifier head that maps a shape (N, 512) descriptor array for N images
    to a shape (N, 25) array of classification scores.

    NOTE(review): the output width is 25 while the notebook's `classes` list has
    6 entries; the width is kept unchanged here — confirm the intended value.
    """
    def __init__(self):
        super().__init__()
        self.dense1 = nn.Linear(512, 100)
        self.dense2 = nn.Linear(100, 25)
        self.relu = nn.ReLU()

    def forward(self, x):
        """
        Compute classification scores from image descriptors.

        Implemented as `forward` rather than by overriding `__call__`, so that
        `nn.Module.__call__` keeps dispatching here and registered hooks still run.
        Calling `model(x)` behaves as before.

        Parameters
        ----------
        x: shape (N, 512) descriptor vector for N images. Use resnet to convert images into image descriptors

        Returns
        --------
        Classification scores for each of the N images. shape (N, 25)
        """
        hidden = self.relu(self.dense1(x))
        return self.dense2(hidden)

In [None]:
# Fall back to the CPU when no CUDA device is present instead of failing on `.to('cuda')`.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pretrained ResNet-18 with its final child module dropped; the training loop
# reshapes its output to 512 values per image.
# NOTE: this rebinds `model`, which referred to the Keras model earlier in the notebook.
model = torchvision.models.resnet18(pretrained=True)
newmodel = torch.nn.Sequential(*(list(model.children())[:-1]))
newmodel.to(device)
newmodel.eval();  # inference mode; trailing `;` hides the long module repr

In [None]:
# Fall back to the CPU when no CUDA device is present instead of failing on `.to('cuda')`.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Trainable classifier head; only its parameters are handed to the optimizer.
last_layer = LinearModel()
last_layer.to(device)
optim = torch.optim.Adam(last_layer.parameters())

def accuracy(predictions, truth):
    """
    Fraction of rows whose highest-scoring class equals the true label.

    Parameters
    ----------
    predictions : array-like, shape=(M, D)
        Scores for D classes for each of M data points.
    truth : array-like, shape=(M,)
        True integer label in [0, D) for each data point.

    Returns
    -------
    float
        Mean of the element-wise match between predicted and true labels.
    """
    predicted_labels = np.argmax(predictions, axis=1)
    is_correct = predicted_labels == truth
    return np.mean(is_correct)

from statistics import mean


In [None]:
# NOTE(review): in the visible notebook `batch_indices` is first assigned inside
# the training loop in the next cell, so on a fresh kernel this line is expected
# to raise NameError; its value is also not displayed because it is not the
# cell's last expression — confirm and remove.
training_loaded[batch_indices].shape
def norm(x_arr, mean, std):
    """
    Standardize a batch of channel-first images with per-channel statistics.

    Parameters
    ----------
    x_arr : array-like, shape (N, C, H, W)
        Batch of images with the channel axis second.
    mean : array-like, shape (C,)
        Per-channel mean to subtract.
    std : array-like, shape (C,)
        Per-channel standard deviation to divide by.

    Returns
    -------
    numpy.ndarray, shape (N, C, H, W)
        `(x_arr - mean) / std`, with `mean` and `std` broadcast over the batch
        and spatial axes.

    Notes
    -----
    The previous `np.repeat(mean, N).reshape(N, 3, 1, 1)` laid the values out as
    [m0]*N + [m1]*N + [m2]*N before reshaping, so most samples were normalized
    with the wrong channel's statistics. Broadcasting a (1, C, 1, 1) view applies
    the same per-channel value to every sample. The debug `print` was removed.
    """
    # Shape (1, C, 1, 1) broadcasts against (N, C, H, W) without materializing copies.
    mean_b = np.asarray(mean).reshape(1, -1, 1, 1)
    std_b = np.asarray(std).reshape(1, -1, 1, 1)
    return (x_arr - mean_b) / std_b

from sklearn import preprocessing

In [None]:
##training
batch_size = 10
num_batches = len(training_loaded) // batch_size  # the trailing partial batch is dropped

# Use whichever device the classifier head lives on so inputs and weights always match.
device = next(last_layer.parameters()).device

# The loss module holds no state, so build it once instead of once per batch.
loss_function = nn.CrossEntropyLoss()

# NOTE(review): batches are fed to the pretrained backbone as raw values; the
# `x_mean` / `x_std` statistics computed earlier are not applied here — confirm intent.
for epoch_cnt in range(7):
    # Fresh random visiting order over all training indices for every epoch.
    idxs = np.arange(len(training_loaded))
    np.random.shuffle(idxs)
    print("epoch: " + str(epoch_cnt + 1))

    for batch_cnt in range(num_batches):
        batch_indices = idxs[batch_cnt*batch_size : (batch_cnt + 1)*batch_size]

        # random batch of our training data and its labels
        batch = torch.tensor(training_loaded[batch_indices], dtype=torch.float).to(device)
        truth = torch.tensor(trainids_loaded[batch_indices], dtype=torch.long).to(device)

        # The backbone's parameters are not in the optimizer, so skip building an
        # autograd graph through it (saves memory and stops its .grad buffers from
        # accumulating across iterations).
        with torch.no_grad():
            features = newmodel(batch)
        prediction = last_layer(features.reshape(batch_size, 512))

        # calculate loss and update only the classifier head
        loss = loss_function(prediction, truth)
        optim.zero_grad()
        loss.backward()
        optim.step()

        # take tensor, strip grad, convert to numpy
        acc = accuracy(prediction.cpu().detach().numpy(), truth.cpu().detach().numpy())

        if batch_cnt % 10 == 0:
            print("loss: " + str(loss.item()))
            print("accuracy: " + str(acc))