In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as spio
import os
import cv2

import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Activation, Dense, Dropout, Conv2D, MaxPooling2D, AveragePooling2D, Flatten, BatchNormalization
from matplotlib import image, pyplot
from sklearn.metrics import mean_squared_error
from scipy import stats
from scipy.stats import pearsonr
from sklearn.model_selection import train_test_split

In [None]:
def createTrainingData():
  """Load every image under `path_test/<category>` and append it to `training`.

  Reads the module-level names `path_test`, `CATEGORIES`, `CAT` and `IMG_SIZE`
  and appends `[resized_image, target]` pairs to the module-level list
  `training`, where `target` is the `CAT` entry at the same position as the
  category folder name in `CATEGORIES`.

  Files that OpenCV cannot decode are skipped with a message rather than
  crashing the whole load.
  """
  for class_num, category in enumerate(CATEGORIES):
    path = os.path.join(path_test, category)
    for img in os.listdir(path):
      file_path = os.path.join(path, img)
      img_array = cv2.imread(file_path)
      if img_array is None:
        # cv2.imread signals failure by returning None (e.g. Thumbs.db,
        # a sub-directory or a corrupt file); cv2.resize would raise on it.
        print("Skipping unreadable image:", file_path)
        continue
      new_array = cv2.resize(img_array, (IMG_SIZE, IMG_SIZE))
      training.append([new_array, CAT[class_num]])

# Side length (pixels) every photo is resized to, and the channel count.
IMG_SIZE = 400
IMG_CHANNELS = 3
# Sub-folder names and the target value attached to each (same order).
CATEGORIES = ["0","20","50","80","100","150","200","300","400"]
CAT = [0,4.54,11.31,18.05,22.52,33.63,44.64,66.37,87.72]

# Filled in place by createTrainingData() with [image, target] pairs.
training = []
# createTrainingData() reads the module-level `path_test`, so the loop
# variable has to keep exactly that name.
for path_test in (
    "C:/Users/User/Desktop/Rivendell/bulb/const",
    "C:/Users/User/Desktop/Rivendell/sun/const",
):
  createTrainingData()

In [None]:
# An integer test_size is an absolute number of samples: 40 photos are held
# out for testing, then 40 of the remainder for validation.
# The seed is fixed so the held-out photos stay the same across kernel restarts.
SPLIT_SEED = 42
train_val_img, test_img = train_test_split(training, test_size=40, random_state=SPLIT_SEED) # 50
train_img, val_img = train_test_split(train_val_img, test_size=40, random_state=SPLIT_SEED) # 30

In [None]:
# Total number of loaded [image, target] pairs.
print(len(training))

In [None]:
# Number of [image, target] pairs in the training split.
print(len(train_img))

In [None]:
# Shape of the validation split viewed as an object array of [image, target] rows.
print(np.asarray(val_img, dtype=object).shape)

In [None]:
# Shape of the test split viewed as an object array of [image, target] rows.
print(np.asarray(test_img, dtype=object).shape)

In [None]:
# Second column of every training row, i.e. the target values of the split.
print(np.asarray(train_img, dtype=object)[:, 1])

In [None]:
# Crop origins are drawn uniformly from [a, h - im_size) on both axes.
a = 1
h = 400
im_samples = 512  # random crops yielded per image
im_size = 50      # side length of each square crop
#im_rep = 11
def create_images(im_names):
  """Yield `im_samples` random square crops for every `[image, value]` pair.

  Each item is `(crop, target)`: `crop` is an `im_size` x `im_size` window of
  the image, divided by 128 and shifted by -1; `target` is the pair's value
  wrapped in a one-element array. All crops of one image are yielded before
  moving on to the next image.
  """
  for sample in im_names:
    im, conc = sample[0], sample[1]
    for _ in range(im_samples):
      # Column offset is drawn before the row offset.
      ix = np.random.randint(low=a, high=h-im_size)
      iy = np.random.randint(low=a, high=h-im_size)
      patch = np.array(im[iy:iy+im_size, ix:ix+im_size, :])
      yield patch / 128. - 1., np.array([conc]) # np.array([conc / 50.])

def samples_count(im_names):
  """Return how many crops `create_images(im_names)` yields in total."""
  return im_samples * len(im_names)

In [None]:
# Number of crops produced by one pass over the training split.
samples_count(train_img)

In [None]:
# Number of crops produced by one pass over the validation split.
samples_count(val_img)

In [None]:
# Number of crops produced by one pass over the test split.
samples_count(test_img)

In [None]:
# Every split streams (crop, target) pairs with the same static signature:
# an im_size x im_size x 3 crop and a one-element target vector.
patch_signature = (
    tf.TensorSpec((im_size, im_size, 3), dtype="float32"),
    tf.TensorSpec((1,), dtype="float32"),
)

# The lambdas look up the split lists when the dataset is iterated, so each
# new pass draws a fresh set of random crops.
train_dataset = tf.data.Dataset.from_generator(
    lambda: create_images(train_img), output_signature=patch_signature
)
val_dataset = tf.data.Dataset.from_generator(
    lambda: create_images(val_img), output_signature=patch_signature
)
test_dataset = tf.data.Dataset.from_generator(
    lambda: create_images(test_img), output_signature=patch_signature
)

In [None]:
def new_loss(y_true, y_pred):
  """Return the sum of mean absolute error and mean squared error."""
  mae = tf.keras.losses.mean_absolute_error(y_true, y_pred)
  mse = tf.keras.losses.mean_squared_error(y_true, y_pred)
  return mae + mse

def loss_class(y_true, y_pred):
  """Sparse categorical cross-entropy on logits, with `y_true` cast to int64 class ids."""
  labels = tf.cast(y_true, tf.int64)
  return tf.keras.losses.sparse_categorical_crossentropy(
      labels, y_pred, from_logits=True
  )

In [None]:
def _l2():
  """Fresh L2 weight regulariser with the factor used by every regularised layer."""
  return keras.regularizers.L2(0.0001)

def _pointwise_gelu(tensor, filters):
  """1x1 convolution with `filters` output channels followed by a GELU activation."""
  tensor = keras.layers.Conv2D(filters, (1, 1), padding="valid", kernel_regularizer=_l2())(tensor)
  return keras.layers.Activation(activation='gelu')(tensor)

inp = keras.layers.Input(shape=(im_size, im_size, 3))

# Per-pixel feature extraction: only 1x1 kernels, spatial mixing comes from
# the stride-1 average pooling layers.
x = _pointwise_gelu(inp, 32)

x = _pointwise_gelu(x, 64)
x = keras.layers.AveragePooling2D(pool_size=(5, 5), strides=(1, 1), padding='valid')(x)

x = keras.layers.LayerNormalization()(x)

x = _pointwise_gelu(x, 64)
x = keras.layers.AveragePooling2D(pool_size=(3, 3), strides=(1, 1), padding='valid')(x)

#x = keras.layers.LayerNormalization()(x)
x = keras.layers.GlobalMaxPool2D()(x)

# Regression head ending in a single linear output.
x = keras.layers.Flatten()(x)
x = keras.layers.Dense(150, activation='gelu', kernel_regularizer=_l2())(x)
x = keras.layers.Dropout(0.1)(x) # MJ: was 0.3
x = keras.layers.Dense(50, activation='gelu', kernel_regularizer=_l2())(x)
x = keras.layers.Dense(1, activation = 'linear')(x)

model_o_m = keras.Model(inp, x)
adam = keras.optimizers.Adam(learning_rate=0.001, beta_1=0.7, beta_2=0.9)
model_o_m.compile(loss=new_loss, optimizer=adam, metrics=[])
#metrics=['mae', 'mse', 'acc']

In [None]:
# Print the layer-by-layer summary of the compiled model.
model_o_m.summary()

In [None]:
bsize = 30

def _as_batches(dataset, n_samples):
  """Repeat `dataset` endlessly, shuffle with an `n_samples` buffer, batch and prefetch."""
  shuffled = dataset.repeat().shuffle(n_samples)
  return shuffled.batch(bsize).prefetch(buffer_size=tf.data.AUTOTUNE)

train_batches = _as_batches(train_dataset, samples_count(train_img))
val_batches = _as_batches(val_dataset, samples_count(val_img))

In [None]:
# Train the model on the generator-backed batch pipelines.

# Cut the learning rate tenfold after 5 epochs without val_loss improvement.
lr_drop = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5)

# Steps are passed explicitly: one epoch covers as many whole batches as fit
# into one pass of crops over the respective split.
train_steps = samples_count(train_img) // bsize
val_steps = samples_count(val_img) // bsize

history = model_o_m.fit(
    train_batches,
    epochs=20,
    steps_per_epoch=train_steps,
    validation_data=val_batches,
    validation_steps=val_steps,
    callbacks=[lr_drop],
)

In [None]:
# Training vs. validation loss per epoch; the y-axis is clipped to [0, 50].
fig, ax = plt.subplots()
ax.plot(history.history['val_loss'])
ax.plot(history.history['loss'])
ax.set_ylim(0, 50)
ax.set_title('Loss')
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
ax.legend(['val_loss', 'loss'], loc = 'upper right')
plt.show()

In [None]:
print('TRAIN SET')
# Materialise one full pass of random crops over the training photos.
XX = []
YY = []
for xx, yy in train_dataset:
    XX.append(xx)
    YY.append(yy)
XX = np.array(XX)
YY = np.array(YY)


pred_val = model_o_m.predict(XX)

# create_images() yields all crops of one photo back to back, so a
# column-major reshape to (crops, photos) puts each photo in its own column;
# averaging over axis 0 then gives one value per photo.
# Sizes are derived from the data instead of being hard-coded (was 512 / 118),
# otherwise the reshape fails as soon as the dataset size changes.
s1 = im_samples      # number of crops taken from a single photo
s2 = len(train_img)  # number of photos in this split
YY1 = np.reshape(YY, (s1, s2, 1), order='F')
YY2 = np.mean(YY1, axis=0)

pred_val1 = np.reshape(pred_val, (s1, s2), order='F')
pred_val2 = np.mean(pred_val1, axis=0)

print('RMSEC:', np.sqrt(mean_squared_error(YY2[:, 0], pred_val2)))

# Squared Pearson correlation of the per-photo means.
corr,_ = pearsonr(YY2[:,0],pred_val2)
print('R^2 train:', corr**2)

# Crop-level linear fit. pred_val is (N, 1); passing the 1-D column makes
# polyfit return scalar slope/intercept instead of 1-element arrays.
m, b = np.polyfit(YY[:,0], pred_val[:,0], 1)
print('Slope:' , m)
print('Intercept:', b)

fig, ax = plt.subplots()
ax.scatter(YY2[:, 0], pred_val2[:])
# Identity line: perfect predictions would fall on it.
ax.plot([YY2.min(), YY2.max()], [YY2.min(), YY2.max()], 'r-', lw=1)
plt.xlabel('original [μg/ml]')
plt.ylabel('predicted [μg/ml]')
plt.grid()
plt.show()

In [None]:
print('VALIDATION SET')
# Materialise one full pass of random crops over the validation photos.
XX = []
YY = []
for xx, yy in val_dataset:
    XX.append(xx)
    YY.append(yy)
XX = np.array(XX)
YY = np.array(YY)

pred_val = model_o_m.predict(XX)

# create_images() yields all crops of one photo back to back, so a
# column-major reshape to (crops, photos) puts each photo in its own column;
# averaging over axis 0 then gives one value per photo.
# Sizes are derived from the data instead of being hard-coded (was 512 / 40).
s1 = im_samples    # number of crops taken from a single photo
s2 = len(val_img)  # number of photos in this split
YY1 = np.reshape(YY, (s1, s2, 1), order='F')
YY2 = np.mean(YY1, axis=0)

pred_val1 = np.reshape(pred_val, (s1, s2), order='F')
pred_val2 = np.mean(pred_val1, axis=0)

print('RMSECV:', np.sqrt(mean_squared_error(YY2[:, 0], pred_val2)))
# Squared Pearson correlation of the per-photo means.
corr,_ = pearsonr(YY2[:,0],pred_val2)
print('R^2 validation:', corr**2)

# Crop-level linear fit. pred_val is (N, 1); passing the 1-D column makes
# polyfit return scalar slope/intercept instead of 1-element arrays.
m, b = np.polyfit(YY[:,0], pred_val[:,0], 1)
print('Slope:' , m)
print('Intercept:', b)

fig, ax = plt.subplots()
ax.scatter(YY2[:, 0], pred_val2[:])
# Identity line: perfect predictions would fall on it.
ax.plot([YY2.min(), YY2.max()], [YY2.min(), YY2.max()], 'r-', lw=1)
plt.xlabel('original [μg/ml]')
plt.ylabel('predicted [μg/ml]')
plt.grid()
plt.show()

In [None]:
print('TEST SET')
# Materialise one full pass of random crops over the held-out test photos.
XXt = []
YYt = []
for xx, yy in test_dataset:
    XXt.append(xx)
    YYt.append(yy)
XXt = np.array(XXt)
YYt = np.array(YYt)

pred_test= model_o_m.predict(XXt)

# create_images() yields all crops of one photo back to back, so a
# column-major reshape to (crops, photos) puts each photo in its own column;
# averaging over axis 0 then gives one value per photo.
# Sizes are derived from the data instead of being hard-coded (was 512 / 40).
s1 = im_samples     # number of crops taken from a single photo
s3 = len(test_img)  # number of photos in this split
YYt1 = np.reshape(YYt, (s1, s3, 1), order='F')
YYt2 = np.mean(YYt1, axis=0)

pred_test1 = np.reshape(pred_test, (s1, s3), order='F')
pred_test2 = np.mean(pred_test1, axis=0)

print('RMSEP:', np.sqrt(mean_squared_error(YYt2[:, 0], pred_test2)))
# Squared Pearson correlation of the per-photo means.
corr,_ = pearsonr(YYt2[:,0],pred_test2)
print('R^2 test:', corr**2)

# Crop-level linear fit. pred_test is (N, 1); passing the 1-D column makes
# polyfit return scalar slope/intercept instead of 1-element arrays.
m, b = np.polyfit(YYt[:,0], pred_test[:,0], 1)
print('Slope:' , m)
print('Intercept:', b)

fig, ax = plt.subplots()
ax.scatter(YYt2[:, 0], pred_test2)
# Identity line: perfect predictions would fall on it.
ax.plot([YYt2.min(), YYt2.max()], [YYt2.min(), YYt2.max()], 'r-', lw=1)
plt.xlabel('original [μg/ml]')
plt.ylabel('predicted [μg/ml]')
plt.grid()
plt.show()