<a href="https://colab.research.google.com/github/omerhac/arc_challenge/blob/master/preprocess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import tensorflow as tf
import json
from google.cloud import storage
from matplotlib import pyplot as plt
from matplotlib import colors
from tensorflow.keras.utils import plot_model
from tensorflow.keras.layers import Conv2D, Lambda, Dense, Flatten, MaxPool2D, Input, BatchNormalization, Conv2DTranspose, UpSampling2D, Reshape
from sklearn.preprocessing import OneHotEncoder

# Data loading


In [None]:

def load_data(gcs_path="gs://kds-d3cfb3d523ca35d2517017a78110126404d01fdea69417ce49950459"):
  """
  Loads all the data into training_tasks, eval_tasks and test_tasks.

  Args:
  gcs_path --> root location that holds the "training", "evaluation" and "test"
  folders of task json files. Defaults to the bucket this notebook was written against.

  Returns:
  training_tasks, eval_tasks, test_tasks --> lists of parsed task jsons.
  The order of tasks inside each list is not guaranteed, since
  tf.data.Dataset.list_files shuffles the file names by default.
  """

  def load_tasks(folder):
    """
    Read and parse every file directly under <gcs_path>/<folder>.
    """
    filenames = tf.io.gfile.glob(gcs_path + "/" + folder + "/*")
    dataset = tf.data.Dataset.list_files(filenames)  # dataset of file names
    dataset = dataset.map(tf.io.read_file)  # file name --> raw json string

    # iterate as numpy (bytes) values and parse each json
    return [json.loads(raw_json) for raw_json in dataset.as_numpy_iterator()]

  training_tasks = load_tasks("training")
  eval_tasks = load_tasks("evaluation")
  test_tasks = load_tasks("test")

  return training_tasks, eval_tasks, test_tasks


def load_data_from_jsons():
  """
  Load tasks from jsons to lists of tasks.

  Reads "training_tasks.json", "eval_tasks.json" and "test_tasks.json" from the
  current working directory.

  Returns:
  training_tasks, eval_tasks, test_tasks --> lists of tasks
  """

  def read_tasks(path):
    # JSON is UTF-8 by specification; don't rely on the platform default encoding
    with open(path, 'r', encoding='utf-8') as f:
      return json.load(f)

  training_tasks = read_tasks("training_tasks.json")
  eval_tasks = read_tasks("eval_tasks.json")
  test_tasks = read_tasks("test_tasks.json")

  return training_tasks, eval_tasks, test_tasks

# Utils

In [1]:
def plot_board(board, ax, title=""):
  """
  Draw a single board on the given matplotlib axis.

  Args:
  board --> 2D array of cell values; each value 0..9 maps to one fixed color
  ax --> matplotlib axis to draw on
  title --> title shown above the board
  """
  palette = ['#000000', '#0074D9','#FF4136','#2ECC40','#FFDC00',
             '#AAAAAA', '#F012BE', '#FF851B', '#7FDBFF', '#870C25']
  color_map = colors.ListedColormap(palette)
  value_range = colors.Normalize(vmin=0, vmax=9)

  ax.imshow(board, cmap=color_map, norm=value_range)

  # put the ticks on cell borders so the grid lines separate the cells
  ax.grid(True, which='both', color='lightgrey', linewidth=0.5)
  row_borders = [row - 0.5 for row in range(1 + board.shape[0])]
  ax.set_yticks(row_borders)
  col_borders = [col - 0.5 for col in range(1 + board.shape[1])]
  ax.set_xticks(col_borders)

  # the ticks only carry the grid, hide their labels
  ax.set_xticklabels([])
  ax.set_yticklabels([])
  ax.set_title(title)

def plot_one(task, ax, i, train_or_test, input_or_output):
  """
  Plot one board of a task on a given axis.

  Args:
  task --> task dictionary
  ax --> matplotlib axis to draw on
  i --> index of the example inside task[train_or_test]
  train_or_test --> 'train' or 'test'
  input_or_output --> 'input' or 'output'
  """
  # boards are stored as nested lists in the task json; plot_board needs .shape
  board = np.array(task[train_or_test][i][input_or_output])

  # the drawing itself is shared with plot_board instead of being duplicated here
  plot_board(board, ax, title=train_or_test + ' ' + input_or_output)
    

def plot_task(task):
    """
    Plots all the train pairs and then all the test pairs of a specified task,
    using same color scheme as the ARC app.

    Each figure has the inputs on the top row and the outputs on the bottom row.
    A test example without an 'output' key gets an empty bottom cell.
    """
    for split in ('train', 'test'):
        examples = task[split]
        num_examples = len(examples)
        if num_examples == 0:
            continue  # plt.subplots cannot build a grid with zero columns

        # squeeze=False keeps axs 2D even for a single example,
        # so axs[row, col] indexing is always valid
        fig, axs = plt.subplots(2, num_examples, figsize=(3*num_examples, 3*2), squeeze=False)
        for i, example in enumerate(examples):
            plot_one(task, axs[0, i], i, split, 'input')
            if 'output' in example:
                plot_one(task, axs[1, i], i, split, 'output')
            else:
                axs[1, i].axis('off')  # nothing to show for this example
        plt.tight_layout()
        plt.show()
  

def plot_board_pairs(board_pairs, labels):
  """
  Plots the board pairs (for siamese networks) with their label as a title.

  Args:
  board_pairs --> sequence of (anchor_board, other_board) pairs, one pair per row
  labels --> sequence of labels, labels[i] is the title of the second board of pair i
  """

  if len(board_pairs) == 0:
    return  # plt.subplots cannot build a grid with zero rows

  # squeeze=False keeps axs 2D even when there is a single pair
  fig, axs = plt.subplots(len(board_pairs), 2, figsize=(8, 3 * len(board_pairs)), squeeze=False)

  for i, pair in enumerate(board_pairs):
    # plot a pair on a given row
    plot_board(pair[0], axs[i, 0], title="anchor")
    plot_board(pair[1], axs[i, 1], title=labels[i])

  plt.tight_layout()
  plt.show()


def plot_decoder_boards(board_pairs):
  """
  Plots board pairs side by side without titles, one pair per row.

  Args:
  board_pairs --> sequence of (left_board, right_board) pairs
  """

  if len(board_pairs) == 0:
    return  # plt.subplots cannot build a grid with zero rows

  # squeeze=False keeps axs 2D even when there is a single pair
  fig, axs = plt.subplots(len(board_pairs), 2, figsize=(8, 3 * len(board_pairs)), squeeze=False)

  for i, pair in enumerate(board_pairs):
    # plot a pair on a given row
    plot_board(pair[0], axs[i, 0])
    plot_board(pair[1], axs[i, 1])

  plt.tight_layout()
  plt.show()


def display_training_curves(hist, metric='accuracy', with_val=False):
  """
  Display learning curves for a keras history dict.

  Args:
  hist --> history dict, indexed by metric name ('loss', metric and, if with_val, their 'val_' versions)
  metric --> name of the metric plotted on the left subplot
  with_val --> boolean, with/without validation curves
  """
  plt.figure(figsize=(18,6))

  # metric plots
  plt.subplot(1,2,1)
  plt.plot(hist[metric])

  if with_val:
    plt.plot(hist['val_' + metric])
    plt.legend(['Train', 'Validation'])

  else:
    plt.legend(['Train'])

  # label the plot with the metric that is actually drawn
  # (the default still reads 'Model accuracy' / 'Accuracy')
  plt.title('Model ' + metric)
  plt.xlabel('EPOCH')
  plt.ylabel(metric[:1].upper() + metric[1:])

  # loss plots
  plt.subplot(1,2,2)
  plt.plot(hist['loss'])

  if with_val:
    plt.plot(hist['val_loss'])
    plt.legend(['Train loss', 'Val loss'])

  else:
    plt.legend(['Train loss'])

  plt.title('Model loss')
  plt.xlabel('EPOCH')
  plt.ylabel('Loss')
  plt.show()

# Data generating functions

In [None]:
def get_dataset_shapes(dataset):
  """
  Collect the shapes of every board in a dataset.

  Args:
  dataset --> iterable of task dictionaries

  Returns:
  shape_0, shape_1 --> two parallel lists with the first and second dimension of each board
  """

  heights, widths = [], []

  for task in dataset:
    # all the boards of the task, as returned by get_task_boards with its defaults
    for board in get_task_boards(task):
      heights.append(board.shape[0])
      widths.append(board.shape[1])

  return heights, widths

def get_task_boards(task, threshold_shape=(40, 40), pad=None, test=False, divide_sets=False):
  """
  Get the training / testing boards of every example in a specific task.

  Args:
  task --> task dictionary with 'train' and 'test' lists of examples
  threshold_shape --> only boards strictly smaller than this shape in both
  dimensions are returned (a board equal to the threshold is dropped).
  pad --> functional padding function to pad boards with (up to threshold_shape).
  Called as pad(board, output_shape=threshold_shape). None means no padding.
  test --> bool, whether a test task or not. Test tasks have no 'output' in their
  test examples, so no test output boards are read.
  divide_sets --> bool, whether to divide boards sets to training - input / output, test - input / output.

  Returns:
  if divide_sets: training_input_boards, training_output_boards, test_input_boards, test_output_boards
  else: one list with the four lists concatenated in that order.
  Inputs and outputs are filtered independently, so the divided lists are not
  guaranteed to stay aligned example by example.
  """

  def fits(board):
    # strictly smaller than the threshold in both dimensions
    return (board.shape[0] < threshold_shape[0]) and (board.shape[1] < threshold_shape[1])

  def prepare(board):
    # check for padding func
    return pad(board, output_shape=threshold_shape) if pad else board

  training_input_boards, training_output_boards, test_input_boards, test_output_boards = [], [], [], []

  # train boards
  for example in task['train']:
    input_board = np.array(example['input'])
    output_board = np.array(example['output'])

    if fits(input_board):
      training_input_boards.append(prepare(input_board))

    if fits(output_board):
      training_output_boards.append(prepare(output_board))

  # test boards
  for example in task['test']:
    input_board = np.array(example['input'])

    if fits(input_board):
      test_input_boards.append(prepare(input_board))

    # only touch the output when the example has one; previously a stale board
    # from the train loop was inspected here (NameError if there was no train example)
    if not test:
      output_board = np.array(example['output'])
      if fits(output_board):
        test_output_boards.append(prepare(output_board))

  # whether to divide boards:
  if divide_sets:
    return training_input_boards, training_output_boards, test_input_boards, test_output_boards
  else:
    return training_input_boards + training_output_boards + test_input_boards + test_output_boards

def pad(mat, output_shape, padder=0):
  """
  Place a matrix in the upper left corner of a bigger board filled with padder.

  Args:
  mat - np.array matrix of rank 2
  output_shape - tuple, shape of the returned board
  padder - int, value of every cell not covered by mat

  Returns:
  a new float board of shape output_shape
  """

  # start from a board where every cell holds the padding value
  canvas = np.zeros(shape=output_shape)
  canvas = canvas + padder

  n_rows, n_cols = mat.shape[0], mat.shape[1]

  # overwrite the upper left corner with the matrix
  canvas[:n_rows, :n_cols] = mat
  return canvas

def random_pad(mat, output_shape, padder=0):
  """
  Pad a matrix with padder up to output_shape. Insert the matrix at a random location.

  The location is drawn with np.random, so seed np.random for reproducible results.

  Args:
  mat - np.array matrix of rank 2
  output_shape - tuple
  padder - int

  Raises:
  ValueError - if mat does not fit inside output_shape
  """

  # get input shape
  input_rows = mat.shape[0]
  input_cols = mat.shape[1]

  if input_rows > output_shape[0] or input_cols > output_shape[1]:
    raise ValueError(
        "matrix of shape {} does not fit in output shape {}".format(mat.shape, output_shape))

  output_board = np.zeros(shape=output_shape) + padder # create output board and pad it

  # insert mat at a random location
  # randint excludes its upper bound, so + 1 makes the last valid offset reachable
  # (and lets a matrix that exactly fills the board through, with offset 0)
  start_row = np.random.randint(output_shape[0] - input_rows + 1)
  start_col = np.random.randint(output_shape[1] - input_cols + 1)
  # insert
  output_board[start_row:start_row+input_rows, start_col:start_col+input_cols] = mat

  return output_board

def get_all_boards(training_tasks, eval_tasks, test_tasks):
  """
  Gather the padded boards of all the training, eval and test tasks in one list.

  For every task the boards are taken three times: once padded with pad
  and twice padded with random_pad (augmentation).
  All boards are padded to BOARD_SIZE.

  Returns:
  a list with the training boards, then the eval boards, then the test boards
  """

  padding_passes = (pad, random_pad, random_pad) # one plain pass + two augmented passes
  task_groups = (
      (training_tasks, False),
      (eval_tasks, False),
      (test_tasks, True), # only the test tasks are read with test=True
  )

  all_boards = []
  for tasks, is_test in task_groups:
    for task in tasks:
      for padding_func in padding_passes:
        all_boards.extend(
            get_task_boards(task, threshold_shape=BOARD_SIZE, pad=padding_func, test=is_test))

  return all_boards



# Data augmentation


In [None]:
def get_rotated_views(board):
  """
  Build the four rotations of a board.

  Returns:
  a list of length 4: the original board followed by the board rotated
  90, 180 and 270 deg counter clockwise.
  """

  views = [board]
  while len(views) < 4:
    # every view is the previous one turned by another 90 deg
    views.append(np.rot90(views[-1]))

  return views

def get_rotated_data_pairs(rotated_views):
  """
  Creates all the possible pairs out of the rotated views and labels them. If a1..a4 are rotated views (A.C.W) of a1, returns labels according to:
   - (a1, a1) = 1 --> same board
   - (a1, a2) = 2 --> 90 a.c.w rotate
   - (a1, a3) = 3 --> 180 rotate
   - (a1, a4) = 4 --> 270 a.c.w rotate
   - ...
   - (a3, a4) = 2 --> 90 a.c.w rotate

  Every view is paired with itself and with each view that comes after it,
  so 4 views give 10 pairs. Pairs are ordered by anchor, then by the second view.

  Args:
  rotated_views = a sequence of rotated views of the board (usually 4), ordered a.c.w

  Returns:
  pairs: a list of tuples of boards
  labels: a list of labels matching each pair
  """

  pairs, labels = [], []

  # iterating instead of recursing on `rotated_views == []` lets any sequence
  # (list, tuple, ...) through, and avoids re-copying the result lists on every level
  for anchor_index, anchor in enumerate(rotated_views):
    # the label counts how many views apart the pair is, starting at 1 for the same view
    for label, view in enumerate(rotated_views[anchor_index:], start=1):
      pairs.append((anchor, view))
      labels.append(label)

  return pairs, labels

def get_binary_board(board):
  """
  Turn a board into a 0 / 1 mask: 1 where the cell is not 0, 0 where it is 0.

  Returns:
  int32 array with the same shape as board
  """

  occupied = board != 0
  return occupied.astype(np.int32)


# Dataset creating functions

In [None]:
def model_shape_board(board):
  """
  Give the board the shape the model consumes: [*BOARD_SIZE, 1]
  (BOARD_SIZE plus a trailing axis of length 1).
  """
  model_shape = (*BOARD_SIZE, 1)
  return board.reshape(model_shape)

def plotting_shape_board(board):
  """
  Give the board the shape used for plotting: [*BOARD_SIZE].
  """
  plotting_shape = tuple(BOARD_SIZE)
  return board.reshape(plotting_shape)

def get_all_pairs_reshaped(board, all_boards):
  """
  Creates a list of pairs of data for the board and reshapes the boards to shape [*BOARD_SIZE, 1] for nn digestion. For all the possible rotates and a false match

  Args:
  board - np.array
  all_boards - all the other boards on the dataset not including this one.
  The false match is drawn at random from this list, so if it does contain
  board the "false" pair can end up holding the same board twice.

  Returns:
  pairs - a list of 11 tuples: the 10 pairs get_rotated_data_pairs builds from
  the 4 rotated views, followed by 1 false match (board, random other board)
  labels - a list of 11 labels: the rotation labels followed by 0 for the false match
  """
  # NOTE(review): the rotated views are reshaped to BOARD_SIZE; for a non square
  # BOARD_SIZE the 90 / 270 deg views have swapped dimensions - confirm boards are square.

  rotated_views = get_rotated_views(board)
  rotated_pairs, rotated_labels = get_rotated_data_pairs(rotated_views)
  rotated_pairs = [(model_shape_board(first), model_shape_board(second)) for first, second in rotated_pairs] # reshape boards

  # create false match pair
  different_board = all_boards[np.random.randint(len(all_boards))] # generate random example
  false_pair = (model_shape_board(board), model_shape_board(different_board)) # a tuple, like the rotated pairs
  false_label = 0

  return rotated_pairs + [false_pair], rotated_labels + [false_label]

def get_dataset_from_lists(pair_list, label_list, for_encoder=False):
  """
  Creates an x,y dataset from a list of pairs and a list of labels. One hot encode labels

  Args:
  pair list: a list of pairs of images
  label list: a list of ints - labels
  for_encoder: bool - determines if the dataset is for the encoder

  Returns:
  if for_encoder is False: x, y
    x - [x1, x2] a list of two numpy arrays, each holding the stacked inputs of one of the siamese twins
    y - one hot labels, float array of shape [n_samples, n_distinct_labels].
        Columns follow the sorted distinct labels found in label_list.
  if for_encoder is True: only x1 (the first board of every pair)
  """

  # prepare x
  pairs_stacked = np.stack(pair_list)
  x1 = pairs_stacked[:, 0] # get one column of inputs
  x2 = pairs_stacked[:, 1] # get the second one

  # the encoder only needs one column of images, no labels
  if for_encoder:
    return x1

  # prepare y: dense one hot encoding. Done with numpy because the
  # OneHotEncoder(sparse=False) argument is not accepted by current scikit-learn.
  labels = np.asarray(label_list).reshape(-1)
  classes, class_index = np.unique(labels, return_inverse=True) # classes are sorted
  y = np.eye(len(classes))[class_index.reshape(-1)]

  return [x1, x2], y
  

def normalize_boards(boards):
  """
  Zero-mean a list of boards.

  The average cell value over all the boards together is subtracted from
  every board. Only the mean is removed, the boards are not rescaled.

  Args:
  boards - a list of np.array boards

  Returns:
  a list of new boards, same order and shapes as the input. Empty list for empty input.
  """

  if len(boards) == 0:
    return [] # nothing to average over

  total = 0
  n_cells = 0

  # check what is the average of the boards.
  # cells are counted from the boards themselves, so boards do not have to be of BOARD_SIZE
  for board in boards:
    total += board.sum()
    n_cells += board.size

  average = total / n_cells

  # normalize boards
  norm_boards = [board - average for board in boards]

  return norm_boards