In [None]:
import os, sys
import numpy as np
import matplotlib.pyplot as plt
import pickle

import tensorflow as tf
from tensorflow.keras.models import load_model

In [None]:
# Project folder on Google Drive. The drive needs to be mounted before this
# cell is run, otherwise the path below does not exist.
workspace = '/content/drive/MyDrive/Colab Notebooks/Text2Struct/rnn/' 
# Sanity check: prints True only when the workspace folder is reachable.
print(os.path.exists(workspace))

True


In [None]:
## Load data
current_data_dir = workspace + 'current_data/'


def _load_pickle(path):
  """Return the object unpickled from `path`.

  The file is opened in a `with` block so the handle is closed even when
  unpickling raises.
  """
  # NOTE(security): pickle.load can execute arbitrary code. Only load files
  # that were produced by this project.
  with open(path, "rb") as filehandler:
    return pickle.load(filehandler)


# index -> word lookup used to turn token ids back into text
index_word = _load_pickle(current_data_dir + "index_word.pkl")
# token-id sequences (id 0 is filtered out downstream, presumably padding)
x_test = _load_pickle(current_data_dir + "x_test.pkl")
# per-token entity labels (1-unit, 2-target metric)
y_test = _load_pickle(current_data_dir + "y_test.pkl")
# per-token numeral labels (1 marks tokens that belong to the numeral)
nl_test = _load_pickle(current_data_dir + "nl_test.pkl")

x_test.shape, y_test.shape

((176, 50), (176, 50))

In [None]:
## Load model
# Restore the Keras model stored as an HDF5 file in the workspace folder.
model = load_model(workspace + 'current_model.h5')
# Print the layer-by-layer architecture with output shapes and parameter counts.
model.summary()
# Optional: uncomment to draw the architecture as a graph instead of a table.
# tf.keras.utils.plot_model(model)

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (None, None, 128)         156672    
                                                                 
 bidirectional_2 (Bidirectio  (None, 256)              198144    
 nal)                                                            
                                                                 
 repeat_vector_1 (RepeatVect  (None, 50, 256)          0         
 or)                                                             
                                                                 
 bidirectional_3 (Bidirectio  (None, 50, 256)          296448    
 nal)                                                            
                                                                 
 time_distributed_2 (TimeDis  (None, 50, 512)          131584    
 tributed)                                            

In [None]:
def soft_dice(y_true, y_pred,
              axis=(0, 1, 2),  # default reduces three axes (e.g. a 3D volume)
              epsilon=0.00001):
  """
  Mean soft Dice coefficient between targets and predictions.

  For every slice left after reducing over `axis`, computes
      (2 * sum(y_true * y_pred) + epsilon)
      / (sum(y_true ** 2) + sum(y_pred ** 2) + epsilon)
  and returns the mean of those values. This is the coefficient itself, which
  equals 1 when `y_true` and `y_pred` are identical; use `1 - soft_dice(...)`
  if a quantity to minimise is needed.

  Args:
    y_true (Tensorflow tensor): ground-truth values, e.g. one-hot labels.
    y_pred (Tensorflow tensor): predictions, same shape as `y_true`.
    axis (tuple): axes summed over in numerator and denominator. The axes
              NOT listed here (typically the class axis) are averaged at
              the end.
    epsilon (float): small constant added to numerator and denominator to
              avoid division by zero.
  Returns:
    Scalar tensor holding the mean soft Dice coefficient.
  """
  overlap = tf.math.reduce_sum(y_true * y_pred, axis=axis)
  true_sq_sum = tf.math.reduce_sum(y_true ** 2, axis=axis)
  pred_sq_sum = tf.math.reduce_sum(y_pred ** 2, axis=axis)

  per_slice_dice = (2 * overlap + epsilon) / (true_sq_sum + pred_sq_sum + epsilon)
  return tf.math.reduce_mean(per_slice_dice)

In [None]:
# Score one randomly drawn test example on its own.
n = np.random.choice(len(x_test))

# np.newaxis adds a leading axis so the single example is passed as a batch of one.
y_pred = model(x_test[n][np.newaxis, ...])
# One-hot encode the integer labels into 3 classes to match the prediction layout.
y_true = tf.one_hot(y_test[n][np.newaxis, ...], depth=3)

# Sum over batch and sequence axes; the per-class scores are then averaged.
soft_dice(y_true, y_pred, axis=(0, 1))

<tf.Tensor: shape=(), dtype=float32, numpy=0.66962415>

In [None]:
# Same metric, computed over the whole test set in a single batch.
y_pred = model(x_test)
y_true = tf.one_hot(y_test, depth=3)

# Sum over batch and sequence axes; the per-class scores are then averaged.
soft_dice(y_true, y_pred, axis=(0, 1))

<tf.Tensor: shape=(), dtype=float32, numpy=0.81615335>

In [None]:
def contract_numeral(chars):
  """Concatenate numeral tokens into one string.

  The special tokens '[neg]' and '[dot]' are written as '-' and '.';
  every other token is appended unchanged.
  """
  symbol_for = {'[neg]': '-', '[dot]': '.'}
  return ''.join(symbol_for.get(token, token) for token in chars)

In [None]:
def proc_num_in_text(text, nl):
  """
  Replace the numeral's individual tokens in `text` with one contracted token.

  Args:
    text: 1-D sequence of string tokens (array or list).
    nl: numeral labels aligned with `text`; entries equal to 1 mark the
        tokens that spell out the numeral.
  Returns:
    list: the tokens before the first marked position, the numeral built by
    `contract_numeral` from the marked tokens, then the tokens after the last
    marked position. If no token is marked, the tokens are returned unchanged.

  Note: everything between the first and last marked token is replaced, so
  the marked tokens are assumed to form a single contiguous run.
  """
  text = np.asarray(text)
  is_num = np.asarray(nl) == 1
  num_idx = np.flatnonzero(is_num)

  if num_idx.size == 0:
    # No numeral to contract; indexing num_idx[0] below would raise IndexError.
    return list(text)

  first, last = num_idx[0], num_idx[-1]
  numeral = contract_numeral(text[is_num])
  return list(text[:first]) + [numeral] + list(text[last + 1:])

In [None]:
def contract_words(words):
  """Return the tokens in `words` joined by single spaces ('' when empty)."""
  return ' '.join(words)

In [None]:
## Test using the test set (that was not used in training)
n = np.random.choice(len(x_test))   # randomly select an example

# Keep only positions whose token id is non-zero (id 0 is skipped, presumably padding).
not_pad = x_test[n] != 0
seq = x_test[n][not_pad]    # sequence
nl = nl_test[n][not_pad]    # numeral label
el = y_test[n][not_pad]     # ground-truth entity label (1-unit, 2-target metric)

# Predicted entity label: highest-scoring class per token for a batch of one.
el_pred = tf.argmax(model(x_test[n][np.newaxis, ...]), axis=-1)
el_pred = el_pred[0][not_pad].numpy()

# Map token ids back to words and show the sentence with the numeral contracted.
text = np.array([index_word[i] for i in seq])
print('Text #{} is:'.format(n), contract_words(proc_num_in_text(text, nl)))

num = text[nl == 1]
print('Num is:', contract_numeral(num))

# Tokens labelled 1 form the unit, tokens labelled 2 the target metric.
print('---Ground-Truth---')
unit = text[el == 1]
targ = text[el == 2]
print('Unit is:', contract_words(unit))
print('Targ is:', contract_words(targ))

print('---Prediction---')
unit_pred = text[el_pred == 1]
targ_pred = text[el_pred == 2]
print('Unit is:', contract_words(unit_pred))
print('Targ is:', contract_words(targ_pred))

Text #154 is: CI [num] to [num] ; per-protocol risk difference -0.062 , two-sided [num] CI [num]
Num is: -0.062
---Ground-Truth---
Unit is: 
Targ is: per-protocol risk difference
---Prediction---
Unit is: 
Targ is: per-protocol risk


In [None]:
# Display the raw token array of the example above; here the numeral is still
# split into separate tokens (e.g. '[neg]', '[dot]' and single digits).
text

array(['CI', '[num]', 'to', '[num]', ';', 'per-protocol', 'risk',
       'difference', '[neg]', '0', '[dot]', '0', '6', '2', ',',
       'two-sided', '[num]', 'CI', '[num]'], dtype='<U12')