## Libraries Import

In [1]:
## Imports
import os 
import time

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential

from tensorflow.keras.layers import Dense, Input, concatenate, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.optimizers import Adam

import scipy
import cv2
import matplotlib.pyplot as plt

from PIL import Image

# Dataset

## Images

In [2]:
all_images = []
all_indexes = []
folder = "dataset/nft images2/"
for filename in os.listdir(folder):
    img = cv2.imread(os.path.join(folder,filename))
    if img is not None:
        all_indexes.append(filename.strip(".jpg"))
        im_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        res = cv2.resize(im_rgb, dsize=(512, 512), interpolation=cv2.INTER_CUBIC)
        all_images.append(res)
        
all_indexes = [int(x) for x in all_indexes]

## Labels

In [3]:
nft_data = pd.read_excel("dataset/nft_sales.xlsx", engine='openpyxl')
event_total_price = nft_data['event_total_price'][all_indexes]
event_total_price = np.array(event_total_price)

labels = []
for price in event_total_price:
    if price >= 0.5:
        labels.append(1)
    elif price <= 0.1:
        labels.append(3)
    else:
        labels.append(2)

# Split, normalize and one hot encode data

In [4]:
n_classes = 3
class_labels = list(range(n_classes))

def one_hot_encoder(y):
    nb_classes = len(class_labels)
    nb_samples = len(y)
    one_hot_y = np.zeros((nb_samples, nb_classes), dtype=bool)
    for i in range(0,nb_samples):
        j = int(y[i]-1)
        one_hot_y[i, j] = 1
        
    return one_hot_y

def one_hot_decoder(one_hot_y):
    nb_samples, nb_classes = one_hot_y.shape
    y = np.zeros((nb_samples))
    for i in range(0,nb_samples):
        y[i] = np.where(one_hot_y[i]==1)[0][0] +1
    return y

def normalize_prediction(y_oh_pred):
    nb_samples, nb_classes = y_oh_pred.shape
    y_oh_pred_norm = np.zeros((nb_samples, nb_classes))
    for i in range(0,nb_samples):
        max_pred = np.max(y_oh_pred[i,:])
        j = np.where(y_oh_pred[i]==max_pred)[0][0]
        y_oh_pred_norm[i, j] = 1
    return y_oh_pred_norm

def get_acc(y_ts, y_pred):
    confusion_matrix = np.zeros((len(class_labels), len(class_labels)))
    nb_samples = len(y_ts)
    
    for i in range(0, nb_samples) : 
        actual_class = int(y_ts[i]) - 1
        predicted_class = int(y_pred[i]) - 1

        confusion_matrix[ predicted_class , actual_class ] = confusion_matrix[ predicted_class , actual_class ] + 1
        
    acc_rep = {}
    oa_acc = 0
    aa_acc = 0
    acc_rep["OA"]= 0
    acc_rep["AA"]= 0
    for class_ in range(0, len(class_labels)) :
        if confusion_matrix[class_, class_] == 0:
            acc = 0
        else:
            acc = confusion_matrix[class_,class_]/np.sum(confusion_matrix[:,class_]) * 100

        acc_rep["class " + str(class_ + 1)] = acc
        aa_acc+=acc
        oa_acc+=confusion_matrix[class_,class_]
    acc_rep["OA"]= oa_acc/np.sum(confusion_matrix[:,:])*100
    acc_rep["AA"]= aa_acc/len(class_labels)
    
    return acc_rep

In [5]:
X_train, X_test, y_train, y_test = train_test_split(all_images, labels, test_size=0.33, random_state=42)

X_train = np.array(X_train)
X_test = np.array(X_test)
y_train = np.array(y_train)
y_test = np.array(y_test)

X_train = X_train/255.0
X_test = X_test/255.0

In [6]:
y_train_OH = one_hot_encoder(y_train)
y_test_OH = one_hot_encoder(y_test)

# Building the CNN

In [7]:
N_input = X_train[0].shape
N_output =  len(class_labels)

model = Sequential()
model.add(Input(shape=N_input))
model.add(Conv2D(32, (3,3), activation='relu'))
model.add(MaxPooling2D(2,2))
model.add(Conv2D(16, (3,3), activation='relu'))
model.add(MaxPooling2D(2,2))
model.add(Conv2D(8, (3,3), activation='relu'))
model.add(MaxPooling2D(2,2))
model.add(Conv2D(4, (3,3), activation='relu'))
model.add(MaxPooling2D(2,2))
# flatten output of conv
model.add(Flatten())
# hidden layer
model.add(Dense(100, activation='relu'))
model.add(Dense(50, activation='relu'))

# output layer
model.add(Dense(N_output, activation='softmax'))

model.compile(loss='categorical_crossentropy', metrics=['accuracy'], optimizer='adam')

model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d (Conv2D)              (None, 510, 510, 32)      896       
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 255, 255, 32)      0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 253, 253, 16)      4624      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 126, 126, 16)      0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 124, 124, 8)       1160      
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 62, 62, 8)         0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 60, 60, 4)         2

In [None]:
model.fit(X_train, y_train_OH, batch_size=64, epochs=30)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30

# Test

In [None]:
y_pred_cnn = model.predict(y_test_OH)
y_pred_OH = normalize_prediction(y_pred_cnn)
y_pred = one_hot_decoder(y_pred_OH)
acc = get_acc(test_mask, y_pred)
acc