<a href="https://colab.research.google.com/github/mimikuo365/Bike-Rider-Facilitating-System/blob/master/utils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
from skimage import io
import tifffile
import numpy as np
from PIL import Image
import os
import matplotlib.pyplot as plt
import matplotlib.patches as patches

In [None]:
# Extract the wanted band from the image and keep it as a single-channel array
def get_wanted_image_band(selected_band, img):
  """Return the selected band of ``img`` with a trailing channel axis of size 1.

  Args:
    selected_band: Index used on the third axis of ``img``.
    img: Array indexed as ``img[:, :, selected_band]``.

  Returns:
    A new array of shape ``(rows, cols, 1)`` holding a copy of the band.
  """
  band = np.array(img[:, :, selected_band])
  n_rows, n_cols = band.shape[0], band.shape[1]
  # Keep an explicit single-channel axis on the result.
  return band.reshape((n_rows, n_cols, 1))

In [None]:
# Outline the corresponding tile with a red rectangle
def draw_rect(ax, xy, tile_height, tile_width):
  """Add an unfilled red rectangle to ``ax``.

  Args:
    ax: Object exposing ``add_patch`` (a matplotlib axes in this file).
    xy: Anchor point passed straight to ``patches.Rectangle``.
    tile_height: Rectangle height.
    tile_width: Rectangle width.
  """
  outline = patches.Rectangle(
      xy,
      tile_width,
      tile_height,
      linewidth=1,
      edgecolor='r',
      facecolor='none',
  )
  ax.add_patch(outline)

In [None]:
def get_fontsize(tiles_in_band):
  """Pick an annotation font size that shrinks as the tile count grows.

  Args:
    tiles_in_band: Total number of tiles drawn on one image.

  Returns:
    7 for up to 16 tiles, 5 for up to 64, 2 for up to 256, and 1 for
    anything denser.
  """
  if tiles_in_band <= 16:
    return 7
  if tiles_in_band <= 64:
    return 5
  if tiles_in_band <= 256:
    return 2
  # Denser grids used to fall through and return None, which makes
  # matplotlib use its (much larger) default font size. Use the smallest
  # size instead so labels keep shrinking with tile count.
  return 1

In [None]:
def add_text(ax, color, setting, values):
  """Write a tile's values as centered text on ``ax``.

  The label is the integer average alone when current and average are equal;
  otherwise it is ``"<current>-<average>"`` followed on a new line by the
  integer threshold in parentheses.

  Args:
    ax: Object exposing ``annotate`` (a matplotlib axes in this file).
    color: Text color passed to ``annotate``.
    setting: Dict read for 'tile_height', 'tile_width', 'height_split' and
      'width_split'.
    values: Dict read for 'coordinate', 'current', 'average' and 'threshold'.
  """
  left, top = values['coordinate']
  tile_height, tile_width = setting['tile_height'], setting['tile_width']
  center = (left + tile_width / 2.0, top + tile_height / 2.0)

  delta = values['current'] - values['average']
  current_txt = str(int(values['current']))
  average_txt = str(int(values['average']))
  threshold_txt = str(int(values['threshold']))

  if delta == 0:
    label = average_txt
  else:
    label = f'{current_txt}-{average_txt}\n({threshold_txt})'

  total_tiles = setting['height_split'] * setting['width_split']
  ax.annotate(
      label,
      center,
      fontsize=get_fontsize(total_tiles),
      color=color,
      ha='center',
      va='center',
  )

In [None]:
def add_annotation(is_swapped, ax, setting, values):
  """Annotate one tile: red text plus an outline if swapped, yellow text otherwise.

  Args:
    is_swapped: Truthy when the tile was swapped.
    ax: Axes forwarded to ``draw_rect`` and ``add_text``.
    setting: Dict read here for 'tile_height' and 'tile_width', then forwarded.
    values: Dict read here for 'coordinate', then forwarded.
  """
  if is_swapped:
    draw_rect(ax, values['coordinate'], setting['tile_height'], setting['tile_width'])
  text_color = 'r' if is_swapped else 'y'
  add_text(ax, text_color, setting, values)

In [None]:
def check_folder(path):
  """Ensure that directory ``path`` exists, creating parents as needed.

  Args:
    path: Directory path to create if it is missing.

  Raises:
    FileExistsError: If ``path`` exists but is not a directory.
  """
  # exist_ok=True avoids the check-then-create race of testing
  # os.path.exists() first: another process creating the folder in between
  # no longer makes this call fail.
  os.makedirs(path, exist_ok=True)

In [None]:
def calculate_threshold(ratio_ls, origin_threshold_ls):
  """Scale each original threshold by its paired ratio.

  Args:
    ratio_ls: Iterable of multipliers.
    origin_threshold_ls: Iterable of thresholds, paired positionally.

  Returns:
    A list of ``ratio * threshold`` products, as long as the shorter input.
  """
  return [
      ratio * base_threshold
      for ratio, base_threshold in zip(ratio_ls, origin_threshold_ls)
  ]

In [None]:
def smooth_cases(setting, case_ls):
  """Smooth the training part of ``case_ls`` with a centered moving average.

  The training part is the first
  ``int(len(case_ls) * train_val_ratio) + lstm_weeks`` entries. Each is
  replaced by the mean of a window reaching ``int(smoothing_length / 2)``
  entries to either side, clipped to the training range. Later entries are
  appended unchanged.

  Args:
    setting: Dict read for 'perform_smooth', 'train_val_ratio', 'lstm_weeks'
      and 'smoothing_length'.
    case_ls: Sequence of case values supporting slicing.

  Returns:
    ``case_ls`` itself when 'perform_smooth' is falsy, otherwise a new
    numpy array.
  """
  if not setting['perform_smooth']:
    return case_ls

  train_num = int(len(case_ls) * setting['train_val_ratio']) + setting['lstm_weeks']
  half_window = int(setting['smoothing_length'] / 2)
  last_train_idx = train_num - 1

  smoothed = [
      np.average(
          case_ls[max(idx - half_window, 0) : min(idx + half_window, last_train_idx) + 1]
      )
      for idx, _ in enumerate(case_ls[:train_num])
  ]

  # Entries after the training range are kept as they are.
  smoothed.extend(case_ls[train_num:])
  return np.array(smoothed)

In [None]:
def get_correlation(prediction, real):
  """Return the correlation coefficient between two arrays.

  Both inputs are flattened first. The flattened arrays are also printed.

  Args:
    prediction: Array exposing ``flatten()``.
    real: Array exposing ``flatten()``.

  Returns:
    The off-diagonal entry of ``np.corrcoef(prediction, real)``.
  """
  flat_pred = prediction.flatten()
  flat_real = real.flatten()
  corr_matrix = np.corrcoef(flat_pred, flat_real)
  print(flat_pred)
  print(flat_real)
  return corr_matrix[0, 1]

In [None]:
def calculate_group_value(tile_ls, setting):
  """Average tiles over rectangular groups of the tile grid.

  ``tile_ls`` is read as a grid with ``setting['width_split']`` columns,
  stored row by row. The grid is walked in steps of 'height_group' rows and
  'width_group' columns, and each group's tiles are averaged.

  Args:
    tile_ls: Indexable sequence of tile values.
    setting: Dict read for 'height_split', 'width_split', 'height_group' and
      'width_group'.

  Returns:
    A list of group averages, ordered by group row then group column.
  """
  n_cols = setting['width_split']
  group_h, group_w = setting['height_group'], setting['width_group']

  group_ls = []
  for top in range(0, setting['height_split'], group_h):
    for left in range(0, n_cols, group_w):
      members = [
          tile_ls[r * n_cols + c]
          for r in range(top, top + group_h)
          for c in range(left, left + group_w)
      ]
      group_ls.append(np.average(members))
  return group_ls

In [None]:
from operator import xor
def split_train_test(setting, input, output):
  """Min-max scale inputs and outputs, then split them into train and test.

  Both scalers are fitted on the full data before splitting. The test slices
  start ``lstm_weeks`` rows before the end of the training slice, so the two
  overlap by that many rows.

  Args:
    setting: Dict read for 'train_val_ratio' and 'lstm_weeks'.
    input: Feature data; converted with ``np.array`` before scaling.
    output: Target data passed to ``MinMaxScaler.fit_transform``.

  Returns:
    ``(X_train, X_test, y_train, y_test, scaler)`` where ``scaler`` is the
    scaler fitted on ``output``.
  """
  lstm_weeks = setting['lstm_weeks']
  train_num = int(len(output) * setting['train_val_ratio']) + lstm_weeks

  X = MinMaxScaler(feature_range=(0, 1)).fit_transform(np.array(input))

  # Only the output scaler is returned to the caller.
  y_scaler = MinMaxScaler(feature_range=(0, 1))
  y = y_scaler.fit_transform(output)

  test_start = train_num - lstm_weeks
  return X[:train_num, :], X[test_start:, :], y[:train_num], y[test_start:], y_scaler

In [None]:
def get_avg_img(data_dic, train_num, save_path):
  """Average the 'good' images among the first ``train_num`` entries.

  The average is cast to uint8, drawn with a gray colormap and saved to
  ``save_path``. Each contributing epiweek and the final image shape are
  printed.

  Args:
    data_dic: Dict read for 'image', 'class' and 'epiweek'; each value is
      sliced with ``[:train_num]``.
    train_num: Number of leading entries to consider.
    save_path: Destination passed to ``plt.savefig``.

  Returns:
    The uint8 average image.

  Raises:
    ValueError: If none of the considered entries has class 'good'.
  """
  img_ls = data_dic['image'][:train_num]
  class_ls = data_dic['class'][:train_num]
  name_ls = data_dic['epiweek'][:train_num]

  counter = 0
  all_img = None
  for img, category, name in zip(img_ls, class_ls, name_ls):
    if category != 'good':
      continue
    print(name)
    # Accumulate in float64 so the running sum is not limited by the image dtype.
    img_f = img.astype('float64')
    all_img = img_f if all_img is None else all_img + img_f
    counter += 1

  # Without this guard the code below fails with an opaque
  # "'NoneType' object has no attribute 'astype'" error.
  if all_img is None:
    raise ValueError(
        "no image with class 'good' among the first %d entries" % train_num)

  avg_img = (all_img / counter).astype('uint8')

  plt.imshow(avg_img, cmap='gray')
  plt.savefig(save_path, dpi=300)
  print(avg_img.shape)

  return avg_img

In [None]:
def get_row_col_pair(cur_img, setting):
  """List the top-left corner of every full tile as ``[column, row]``.

  Tiles that would run past the image edge are skipped.

  Args:
    cur_img: Object whose ``shape`` unpacks to ``(height, width)``.
    setting: Dict read for 'tile_height' and 'tile_width'.

  Returns:
    A numpy array of ``[c, r]`` pairs, ordered row by row.
  """
  height, width = cur_img.shape
  tile_height, tile_width = setting['tile_height'], setting['tile_width']

  # Column first, then row, in each pair.
  corners = [
      [c, r]
      for r in range(0, height - tile_height + 1, tile_height)
      for c in range(0, width - tile_width + 1, tile_width)
  ]
  return np.array(corners)

In [None]:
# Split images into tiles by indicating the window size for rows and columns
def split_img_to_tiles(cur_img, setting):
  """Reduce an image to the mean value of each non-overlapping tile.

  Tiles that would run past the image edge are skipped.

  Args:
    cur_img: 2-D array; its ``shape`` unpacks to ``(height, width)``.
    setting: Dict read for 'tile_height' and 'tile_width'.

  Returns:
    A numpy array of tile means rounded to 3 decimals, ordered row by row.
  """
  height, width = cur_img.shape
  tile_height, tile_width = setting['tile_height'], setting['tile_width']

  tile_means = [
      round(np.average(cur_img[r : r + tile_height, c : c + tile_width]), 3)
      for r in range(0, height - tile_height + 1, tile_height)
      for c in range(0, width - tile_width + 1, tile_width)
  ]
  return np.array(tile_means)

In [None]:
def make_prediction(option, title, max_y, min_y, y_train_pred, y_train, y_test_pred, y_test):
  """Plot predicted against actual values for the train and test sets.

  With ``option == "combine"`` train and test are joined into one predicted
  curve and one actual curve, with a red vertical line at the train/test
  boundary. Any other option draws four separate curves, with the test
  curves placed after the train curves on the x axis.

  Args:
    option: "combine" or anything else.
    title: Plot title.
    max_y: Upper y-axis limit.
    min_y: Lower y-axis limit.
    y_train_pred: Predicted training values.
    y_train: Actual training values.
    y_test_pred: Predicted test values.
    y_test: Actual test values.
  """
  plt.ylim(min_y, max_y)

  if option != "combine":
    n_train = len(y_train)
    train_x = list(range(0, n_train, 1))
    test_x = list(range(n_train, n_train + len(y_test), 1))

    plt.plot(train_x, y_train_pred, label='train predicted', color=(1, 0, 0, 0.5))
    plt.plot(train_x, y_train, label='train actual', color=(0, 1, 0, 0.5))
    plt.plot(test_x, y_test_pred, label='test predicted', color=(1, 0, 0, 1))
    plt.plot(test_x, y_test, label='test actual', color=(0, 1, 0, 1))
  else:
    predicted = np.concatenate((y_train_pred, y_test_pred), axis=None)
    actual = np.concatenate((y_train, y_test), axis=None)
    plt.plot(predicted, label='predicted', color='c')
    plt.plot(actual, label='actual', color='b')
    # Vertical marker where the training data ends.
    boundary = len(y_train)
    plt.plot([boundary, boundary], [min_y, max_y], color="r")

  plt.legend(loc="upper left")
  plt.title(title)
  plt.show()

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
from keras import Sequential
from keras.layers import LSTM, Dense
from sklearn.metrics import  mean_absolute_error
from sklearn.preprocessing import MinMaxScaler

def prepare_time_series(data, n_in=1, n_out=1, dropnan=True):
  """Frame a series as a supervised-learning table of lagged and future values.

  Each output row holds the values at ``t-n_in .. t-1`` followed by the
  values at ``t .. t+n_out-1``.

  Args:
    data: Anything ``pd.DataFrame`` accepts: a flat list, a list of rows, or
      a 1-D/2-D array.
    n_in: Number of lagged steps to include.
    n_out: Number of steps from ``t`` onward to include.
    dropnan: Drop rows left incomplete by the shifting.

  Returns:
    A 2-D numpy array with ``(n_in + n_out) * n_vars`` columns.
  """
  df = pd.DataFrame(data)
  # Take the variable count from the frame itself. Assuming 1 for every
  # list broke on a list of rows (column-name length mismatch), and
  # data.shape[1] raised IndexError on 1-D arrays.
  n_vars = df.shape[1]
  cols, names = [], []

  # input sequence (t-n, ... t-1)
  for lag in range(n_in, 0, -1):
    cols.append(df.shift(lag))
    names += [f'var{j+1}(t-{lag})' for j in range(n_vars)]

  # forecast sequence (t, t+1, ... t+n)
  for step in range(n_out):
    cols.append(df.shift(-step))
    if step == 0:
      names += [f'var({j+1})' for j in range(n_vars)]
    else:
      names += [f'var{j+1}(t+{step})' for j in range(n_vars)]

  agg = pd.concat(cols, axis=1)
  agg.columns = names

  # drop rows with NaN values
  if dropnan:
    agg.dropna(inplace=True)

  return np.array(agg)

def removeLastWeek(data, n_features):
  """Drop the last ``n_features`` columns of a 2-D array.

  Args:
    data: 2-D array.
    n_features: Number of trailing columns to remove.

  Returns:
    ``data`` without its last ``n_features`` columns.
  """
  keep_columns = slice(None, -n_features)
  return data[:, keep_columns]

In [None]:
def run_tile_transformation_on_all(data_dic, setting):
  """Apply ``split_img_to_tiles`` to every image in ``data_dic['image']``.

  Args:
    data_dic: Dict whose 'image' entry is an iterable of images.
    setting: Tile settings forwarded to ``split_img_to_tiles``.

  Returns:
    A numpy array stacking the per-image tile values.
  """
  return np.array([split_img_to_tiles(img, setting) for img in data_dic['image']])

In [None]:
def get_pixels_per_tile(img, height_split, width_split):
  """Compute the tile size for a given number of splits per axis.

  Args:
    img: Object whose ``shape`` unpacks to ``(height, width)``.
    height_split: Number of tiles along the height.
    width_split: Number of tiles along the width.

  Returns:
    ``(tile_height, tile_width)``, each truncated to an int.
  """
  n_rows, n_cols = img.shape
  return int(n_rows / height_split), int(n_cols / width_split)

In [None]:
def load_csv_to_dict(filename):
  """Load a CSV index and the image each row points to.

  The CSV must provide the columns 'img_loc', 'epiweek', 'Cases' and
  'class'. Each row's 'img_loc' is read with ``io.imread``.

  Args:
    filename: Path passed to ``pd.read_csv``.

  Returns:
    A dict with numpy arrays under 'epiweek', 'image', 'cases' and 'class',
    aligned by row.
  """
  df = pd.read_csv(filename)

  epiweeks, images, cases, classes = [], [], [], []
  for _, row in df.iterrows():
    images.append(io.imread(row['img_loc']))
    epiweeks.append(row['epiweek'])
    cases.append(row['Cases'])
    classes.append(row['class'])

  return {
      'epiweek': np.array(epiweeks),
      'image': np.array(images),
      'cases': np.array(cases),
      'class': np.array(classes),
  }

In [None]:
def eval_model(X_train, X_test, y_train, y_test, scaler):
  """Train a two-layer LSTM, plot its loss and predictions, and print metrics.

  The network is LSTM(60) -> LSTM(30) -> Dense(1), trained with MAE loss and
  the Adam optimizer for 100 epochs, using the test set as validation data.
  Targets and predictions are mapped back through
  ``scaler.inverse_transform`` before plotting and scoring.

  Args:
    X_train: Training inputs; axes 1 and 2 give the LSTM input shape.
    X_test: Test inputs.
    y_train: Scaled training targets.
    y_test: Scaled test targets.
    scaler: Fitted scaler exposing ``inverse_transform``.
  """
  tf.random.set_seed(1)
  lookback, features_num = X_train.shape[1], X_train.shape[2]

  # design network
  model = Sequential()
  model.add(LSTM(60, input_shape=(lookback, features_num), return_sequences=True))
  model.add(LSTM(30))
  model.add(Dense(1))
  model.compile(loss='mae', optimizer='adam')

  fit_history = model.fit(
      X_train,
      y_train,
      epochs=100,
      verbose=0,
      batch_size=72,
      validation_data=(X_test, y_test),
  )

  # plot history
  for metric, label in (('loss', 'train'), ('val_loss', 'test')):
    plt.plot(fit_history.history[metric], label=label)
  plt.title('Loss')
  plt.legend()
  plt.show()

  # Map the targets back through the scaler.
  y_train_real = scaler.inverse_transform(y_train)
  y_test_real = scaler.inverse_transform(y_test)

  # Pad the y-axis limits by 5 on each side.
  max_y = max(max(y_train_real), max(y_test_real)) + 5
  min_y = min(min(y_train_real), min(y_test_real)) - 5

  # make a prediction
  y_train_pred = scaler.inverse_transform(model.predict(X_train))
  y_test_pred = scaler.inverse_transform(model.predict(X_test))

  make_prediction("combine", 'Prediction', max_y, min_y,
                  y_train_pred, y_train_real, y_test_pred, y_test_real)

  mae = mean_absolute_error(y_test_real, y_test_pred)
  corr = get_correlation(y_test_pred, y_test_real)
  print('MAE: %.2f' % mae)
  print('Correlation: %.2f' % corr)

In [None]:
def tile_swapping(tile_info, img_info=None, setting=None):
  """Replace tiles that deviate too far from the average tile value.

  A tile is swapped for its average when ``|current - average|`` exceeds
  that tile's threshold. The input ``tile_info['current']`` is not modified.

  Args:
    tile_info: Dict with equally indexed sequences under 'threshold',
      'average' and 'current'; 'current' must support ``copy()``.
    img_info: Optional image metadata. When given, the swapped tiles are
      rendered through ``select_mask_region``.
    setting: Settings forwarded to ``select_mask_region``; only used when
      ``img_info`` is given.

  Returns:
    ``(updated_tiles, ratio)``: a copy of the current tiles with swapped
    entries replaced, and the fraction of tiles swapped (0.0 if there are
    no tiles).
  """
  updated_tiles = tile_info['current'].copy()
  swapped_index = []
  tile_num = len(tile_info['threshold'])

  for i in range(tile_num):
    avg_tile_value = tile_info['average'][i]
    diff = np.absolute(tile_info['current'][i] - avg_tile_value)
    if diff > tile_info['threshold'][i]:
      swapped_index.append(i)
      updated_tiles[i] = avg_tile_value

  # An empty tile list has nothing swapped; avoid dividing by zero.
  ratio = len(swapped_index) / tile_num if tile_num else 0.0

  if img_info is not None:
    select_mask_region(img_info, tile_info, swapped_index, setting)

  return updated_tiles, ratio

In [None]:
def tile_swapping_and_grouping(tile_ls, data_dic, threshold_ls, avg_img_tiles, setting):
  """Swap outlier tiles in every image, classify each image, and group tiles.

  For each image, ``tile_swapping`` is run and its swap ratio is stored in
  ``data_dic['swap ratio']``. The image is labelled in
  ``data_dic['predicted_class']`` as 'bad' (ratio > 0.5), 'normal'
  (ratio > 0.1) or 'good'. Both lists are reset on entry. The swapped tiles
  are then averaged per group with ``calculate_group_value``. Each epiweek
  and the mean swap ratio are printed.

  Args:
    tile_ls: Iterable of per-image tile values.
    data_dic: Dict read for 'image' and 'epiweek'; mutated as described.
    threshold_ls: Per-tile thresholds shared by all images.
    avg_img_tiles: Per-tile values of the average image.
    setting: Dict read for 'tile_swap_result_folder' and
      'run_all_img_and_save_result', and forwarded to the helpers.

  Returns:
    A numpy array with one row of group averages per image.
  """
  data_dic['swap ratio'] = []
  data_dic['predicted_class'] = []
  grouped_rows, ratios = [], []

  for idx, cur_img_tiles in enumerate(tile_ls):
    img = data_dic['image'][idx]
    epiweek = data_dic['epiweek'][idx]
    print(epiweek)

    tile_info = {
        'threshold': threshold_ls,
        'average': avg_img_tiles,
        'current': cur_img_tiles,
    }
    img_info = {
        'image': img,
        'epiweek': str(epiweek),
        'path': setting['tile_swap_result_folder'],
    }

    # swapping; image info is only passed when result images should be saved
    if setting['run_all_img_and_save_result']:
      swap_args = (tile_info, img_info, setting)
    else:
      swap_args = (tile_info,)
    updated_img_tiles, ratio = tile_swapping(*swap_args)

    # save prediction results
    if ratio > 0.5:
      predicted = 'bad'
    elif ratio > 0.1:
      predicted = 'normal'
    else:
      predicted = 'good'
    data_dic['swap ratio'].append(ratio)
    data_dic['predicted_class'].append(predicted)

    # grouping
    grouped_rows.append(calculate_group_value(updated_img_tiles, setting))
    ratios.append(ratio)

  print('Swap ratio:', round(np.average(ratios), 3))
  return np.array(grouped_rows)

In [None]:
def select_mask_region(img_info, tile_info, swapped_index, setting):
  """Draw an image with every tile annotated and save it as a PNG.

  Swapped tiles get the swapped annotation from ``add_annotation``. The
  title is the epiweek followed by the number of swapped tiles in
  parentheses. The file is saved as ``<epiweek>.png`` under
  ``img_info['path']``, and its location is printed.

  Args:
    img_info: Dict read for 'image', 'epiweek' (a string) and 'path'.
    tile_info: Dict with per-tile 'current', 'average' and 'threshold'.
    swapped_index: Collection of indices of the swapped tiles.
    setting: Dict read for 'height_split', 'width_split' and
      'row_col_pairs', and forwarded to ``add_annotation``.
  """
  fig, ax = plt.subplots(constrained_layout=True)
  ax.axis('off')
  plt.imshow(img_info['image'], cmap='gray')

  total_tiles = setting['height_split'] * setting['width_split']
  for tile_idx in range(total_tiles):
    values = {
      'coordinate': setting['row_col_pairs'][tile_idx],
      'current': tile_info['current'][tile_idx],
      'average': tile_info['average'][tile_idx],
      'threshold': tile_info['threshold'][tile_idx]
    }
    add_annotation(tile_idx in swapped_index, ax, setting, values)

  epiweek = img_info['epiweek']
  plt.title(epiweek + ' (' + str(len(swapped_index)) + ')')
  location = os.path.join(img_info['path'], epiweek + '.png')
  print(location)
  plt.savefig(location, dpi=300)
  plt.close()

In [None]:
def prepare_XY_for_lstm(preprocessed_tile_ls, case_ls, setting):
  """Turn tile features and case counts into LSTM-ready train/test arrays.

  The data is scaled and split with ``split_train_test``. Each feature
  matrix is expanded with ``prepare_time_series`` into windows of
  ``lstm_weeks`` lagged steps plus the current step. The current step's
  features are dropped and the result is reshaped to
  ``(samples, lstm_weeks, n_features)``. The first ``lstm_weeks`` targets
  are dropped so targets line up with the windows.

  Args:
    preprocessed_tile_ls: Per-image feature rows.
    case_ls: Array of case counts supporting ``reshape``.
    setting: Dict read for 'lstm_weeks' and forwarded to ``split_train_test``.

  Returns:
    ``(X_train, X_test, y_train, y_test, scaler)`` ready for the LSTM.
  """
  weeks = setting['lstm_weeks']

  # Targets become a single column, e.g. (156,) -> (156, 1).
  X_train, X_test, y_train, y_test, scaler = split_train_test(
      setting, preprocessed_tile_ls, case_ls.reshape(-1, 1))
  n_features = X_train.shape[1]

  def to_lstm_windows(features):
    # Lagged windows plus the current week, e.g. (134, 256) -> (124, 2816).
    series = prepare_time_series(features, n_in=weeks)
    # Drop the current week's features, e.g. -> (124, 2560).
    lagged_only = removeLastWeek(series, n_features)
    # One sample per window, e.g. -> (124, 10, 256).
    return lagged_only.reshape((-1, weeks, n_features))

  final_X_train = to_lstm_windows(X_train)
  final_X_test = to_lstm_windows(X_test)

  # The first `weeks` targets have no complete window of history.
  final_y_train = y_train[weeks:]
  final_y_test = y_test[weeks:]

  return final_X_train, final_X_test, final_y_train, final_y_test, scaler