In [0]:
import numpy as np
import matplotlib.pyplot as plt
import os

In [0]:
# Mount Google Drive into the Colab runtime under /content/drive so that the
# project folder selected in the next cell is reachable.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# Switch the working directory to the project folder on the mounted Drive.
# The default data_folder="data/Final2" and results_folder="results" used by
# the functions below are relative paths, so they are resolved from here.
# NOTE(review): this path is specific to one Drive layout; adjust it when
# running the notebook from another account.
# %cd "./drive/My Drive/AI/NLP2 (private)"
%cd "/content/drive/My Drive/Education/Master Data Science/Master/Semester 1/NLP2/NLP2_2 (shared)"
!ls

In [0]:
def read_file(fname):
  """Read a text file into a NumPy array with one string per line.

  Only the line terminator is removed from each line; any other leading or
  trailing whitespace is kept so that strict target/prediction comparison
  stays meaningful.

  Args:
    fname: path of the text file to read.

  Returns:
    1-D np.ndarray of str, one entry per line (an empty array for an empty
    file).
  """
  with open(fname, "r") as f:
    # rstrip("\n") rather than line[:-1]: the final line of a file may lack a
    # trailing newline, in which case slicing would chop off a real character.
    lines = [line.rstrip("\n") for line in f]
  return np.array(lines)

def get_acc(tgt, prd):
  """Accuracy as the fraction of predictions strictly equal to their target.

  Args:
    tgt: 1-D sequence of target strings, one per sample.
    prd: 1-D sequence of predicted strings, aligned with tgt.

  Returns:
    Number of exact matches divided by the number of samples, as a float.

  Raises:
    ValueError: if tgt and prd do not have the same shape.
    ZeroDivisionError: if both inputs are empty.
  """
  tgt = np.asarray(tgt)
  prd = np.asarray(prd)
  # Without this check NumPy may broadcast (e.g. a length-1 target against N
  # predictions) and silently produce a meaningless "accuracy".
  if tgt.shape != prd.shape:
    raise ValueError(
        "target and prediction must have the same shape, got "
        f"{tgt.shape} and {prd.shape}")
  return np.count_nonzero(prd == tgt) / float(tgt.size)

In [0]:
def main_results(model, run, data_folder, results_folder):
  """Accuracy of one model run on the main ("All") test set.

  Args:
    model: model folder name inside results_folder.
    run: run folder name inside the model folder.
    data_folder: root folder holding the target files.
    results_folder: root folder holding the prediction files.

  Returns:
    Dict with a single key "acc" (kept as a dict so more metrics can be
    added later).
  """
  tgt_path = "/".join([data_folder, "All", "test5k_all.tgt"])
  prd_path = "/".join([results_folder, model, run, "All", "test5k_all.prd"])
  targets = read_file(tgt_path)
  predictions = read_file(prd_path)
  return {"acc": get_acc(targets, predictions)}


def systematicity_results(idx, model, run, data_folder, results_folder):
  """Accuracies of one model run on a systematicity test.

  WARNING: the exc and inc file names were mistakenly swapped during data
  generation; the accuracies are therefore stored in swapped order so that
  they line up with the labels in "help".

  Args:
    idx: systematicity test index; only test 1 is evaluated.
    model: model folder name inside results_folder.
    run: run folder name inside the model folder.
    data_folder: root folder holding the target files.
    results_folder: root folder holding the prediction files.

  Returns:
    Dict with "help" (labels) and, for idx == 1, "acc" (one accuracy per
    label). For any other idx only "help" is present.
  """
  results = {"help": ["exc012", "inc012"]}
  if idx != 1:
    return results

  tgt_dir = data_folder + "/Systematicity/Test1/"
  prd_dir = results_folder + "/" + model + "/" + run + "/Systematicity/Test1/"
  file_tags = ("inc012", "exc012")

  # read all targets first, then all predictions
  targets = [read_file(tgt_dir + "syst1_test_" + tag + ".tgt") for tag in file_tags]
  predictions = [read_file(prd_dir + "syst1_test_" + tag + ".prd") for tag in file_tags]

  # file order is (inc, exc), which undoes the naming swap w.r.t. "help"
  results["acc"] = [get_acc(t, p) for t, p in zip(targets, predictions)]
  return results


def productivity_results(idx, model, run, data_folder, results_folder):
  """Accuracies of one model run on a productivity test.

  Tests 1-2 are evaluated per number of questions (1-3), tests 3-5 per number
  of "when"s (0-4).

  Args:
    idx: productivity test index, 1-5.
    model: model folder name inside results_folder.
    run: run folder name inside the model folder.
    data_folder: root folder holding the target files.
    results_folder: root folder holding the prediction files.

  Returns:
    Dict with "help" (one label per test file) and "acc" (one accuracy per
    test file, in the same order).

  Raises:
    ValueError: if idx is not one of 1-5 (previously this surfaced as an
      UnboundLocalError on the return statement).
  """
  if idx in (1, 2):  # questions
    help_labels = ["1tsk", "2tsk", "3tsks"]
    subfolder, unit, counts = "Tasks", "tasks", range(1, 4)
  elif idx in (3, 4, 5):  # whens
    help_labels = ["0whn", "1whn", "2whn", "3whn", "4whn"]
    subfolder, unit, counts = "Whens", "whens", range(5)
  else:
    raise ValueError(f"unknown productivity test index {idx!r}; expected 1-5")

  acc_list = []
  for i in counts:
    # targets are shared by all tests of a group, predictions are per test
    tgt = read_file(data_folder + f"/Productivity/{subfolder}/prod_test_{i}{unit}.tgt")
    prd = read_file(results_folder + "/" + model + "/" + run + f"/Productivity/{subfolder}/Test{idx}/prod_test_{i}{unit}.prd")
    acc_list.append(get_acc(tgt, prd))
  return {"help": help_labels, "acc": acc_list}


def get_results(test="main", idx=1, model="lstms2s", run="run_0", data_folder="data/Final2", results_folder="results"):
  """Collect evaluation results for one model run.

  Args:
    test: "main", "systematicity" or "productivity" for task specific
      results, or "all" for every test at once.
    idx: test index, used for "systematicity" and "productivity" only.
    model: model folder name inside results_folder.
    run: run folder name inside the model folder.
    data_folder: root folder holding the target files.
    results_folder: root folder holding the prediction files.

  Returns:
    For a single test, the dict produced by the matching *_results function.
    For "all", a dict with keys "main", "systematicity" ({1: ...}) and
    "productivity" ({1: ..., ..., 5: ...}).

  Raises:
    ValueError: if test is not one of the supported names (previously this
      surfaced as an UnboundLocalError on the return statement).
  """
  if test == "main":
    return main_results(model, run, data_folder, results_folder)
  if test == "systematicity":
    return systematicity_results(idx, model, run, data_folder, results_folder)
  if test == "productivity":
    return productivity_results(idx, model, run, data_folder, results_folder)
  if test == "all":
    # systematicity only has 1 performed test, edit the following if this changes
    results = {"main": main_results(model, run, data_folder, results_folder),
               "systematicity": {1: systematicity_results(1, model, run, data_folder, results_folder)},
               "productivity": {}}
    for index in range(1, 6):
      results["productivity"][index] = productivity_results(index, model, run, data_folder, results_folder)
    return results
  raise ValueError(
      f"unknown test {test!r}; expected 'main', 'systematicity', 'productivity' or 'all'")

In [0]:
def mean_acc(results_acc):
  """Average the "acc" entries of several runs.

  Args:
    results_acc: list of dicts, each with an "acc" entry that is either a
      single number or a list of numbers (one per sub-test).

  Returns:
    Dict with "acc" (mean over runs) and "std" (standard deviation over
    runs). Both are a single number when there is exactly one position,
    otherwise a list with one value per position.
  """
  per_position = []  # per_position[i] collects the i-th accuracy of every run
  for result in results_acc:
    acc = result["acc"]
    # Wrap scalars in a local list; the caller's dict must not be modified.
    values = acc if isinstance(acc, list) else [acc]
    for i, value in enumerate(values):
      if len(per_position) < i+1:
        per_position.append([value])
      else:
        per_position[i].append(value)
  mean = [np.mean(vals) for vals in per_position]
  std = [np.std(vals) for vals in per_position]
  # a single position is reported as a scalar instead of a 1-element list
  mean = mean[0] if len(mean) == 1 else mean
  std = std[0] if len(std) == 1 else std
  return {"acc": mean, "std": std}

def avg_results(results):
  """Average the "all"-style results of multiple runs.

  Args:
    results: indexable collection (0 .. len-1) of per-run result dicts with
      keys "main", "systematicity" and "productivity".

  Returns:
    Dict of the same layout where every leaf holds the mean ("acc") and the
    standard deviation ("std") over the runs.
  """
  n_runs = len(results)

  def _over_runs(select):
    # pick the same sub-result from every run and reduce it across runs
    return mean_acc([select(results[r]) for r in range(n_runs)])

  return {
      "main": _over_runs(lambda res: res["main"]),
      "systematicity": {1: _over_runs(lambda res: res["systematicity"][1])},
      "productivity": {j: _over_runs(lambda res, j=j: res["productivity"][j])
                       for j in range(1, 6)},
  }

In [0]:
# Load every test result for runs run_0 .. run_2 of each model; each entry is
# the nested dict returned by get_results(test="all").
lstm_all = [get_results(test="all", model="lstms2s", run=f"run_{i}") for i in range(3)]
gru_all = [get_results(test="all", model="grus2s", run=f"run_{i}") for i in range(3)]
random_all = [get_results(test="all", model="random", run=f"run_{i}") for i in range(3)]

In [0]:
# Reduce the three runs of each model to mean accuracy and standard deviation.
# NOTE(review): `random` is the averaged baseline dict here; the name would
# clash with the stdlib module if that were ever imported in this notebook.
lstm = avg_results(lstm_all)
gru = avg_results(gru_all)
random = avg_results(random_all)

In [0]:
# Shared plot settings; the cells below reuse these names.
barwidth = 0.4              # width of a single LSTM/GRU bar
fontsize = 12               # font size of titles and axis labels
gru_label = "GRU"
lstm_label = "LSTM"
rand_label = "Random"
rlw = 0.01                  # height of the thin bar that marks the random baseline
randcolor = "#421613"       # colour of the random baseline
x_tasks = np.arange(1, 4)   # x positions for 1-3 questions
x_whens = np.arange(5)      # x positions for 0-4 whens
x_syst = ["Excluded", "Included"]
anchor = [1.02, 1.03]       # bbox_to_anchor of the legends
el = 1                      # elinewidth of the error bars
cs = 3                      # capsize of the error bars

# Textual description of the training setup behind each panel.
print("Systematicity 1: trained on samples excluding person0-2, doing0-2, location0-2 from questions")
print("\nProductivity 1: trained on samples with 1 or 2 questions")
print("Productivity 2: trained on samples with 1 or 3 questions")
print("\nProductivity 3: trained on samples with 0-2 whens")
print("Productivity 4: trained on samples with 0-3 whens")
print("Productivity 5: trained on samples with 0, 2 or 4 whens\n")

# One 2x3 figure; this part fills panel 1 with the systematicity test.
plt.figure(figsize=(10,6))

plt.subplot(2, 3, 1)
plt.title("Systematicity test 1", fontsize=fontsize)
# LSTM: negative width with align="edge" places each bar to the left of its tick.
plt.bar(x_syst, lstm["systematicity"][1]["acc"], label=lstm_label, tick_label=x_syst, width=-barwidth, align="edge")
# Error bars are positioned at the bar centres; lw=0 hides the connecting line.
plt.errorbar([-barwidth/2,1-barwidth/2], lstm["systematicity"][1]["acc"], yerr=lstm["systematicity"][1]["std"], lw=0, capsize=cs, elinewidth=el, color="black")

# GRU: positive width places each bar to the right of its tick.
plt.bar(x_syst, gru["systematicity"][1]["acc"], label=gru_label, tick_label=x_syst, width=barwidth, align="edge")
plt.errorbar([barwidth/2,1+barwidth/2], gru["systematicity"][1]["acc"], yerr=gru["systematicity"][1]["std"], lw=0, capsize=cs ,elinewidth=el, color="black")

# Random baseline: a thin bar of height rlw whose top edge sits at the baseline
# accuracy, spanning both model bars. Only the first one carries the legend label.
plt.bar(x_syst[0], rlw, label=rand_label, width=2*barwidth, bottom=random["systematicity"][1]["acc"][0]-rlw, color=randcolor)
plt.bar(x_syst[1], rlw, width=2*barwidth, bottom=random["systematicity"][1]["acc"][1]-rlw, color=randcolor)
plt.legend(loc="upper right", bbox_to_anchor=anchor)
plt.ylabel("Accuracy", fontsize=fontsize)

# Panels 2-3: productivity tests 1-2, accuracy per number of questions.
for i in range(1, 3):
  prod_lstm = lstm["productivity"][i]
  prod_gru = gru["productivity"][i]
  prod_rand = random["productivity"][i]["acc"]

  plt.subplot(2, 3, i+1)
  plt.title(f"Productivity test {i}", fontsize=fontsize)

  # LSTM bars go left of the tick (negative width), GRU bars right of it;
  # each error bar is centred on its bar.
  for res, name, width in ((prod_lstm, lstm_label, -barwidth), (prod_gru, gru_label, barwidth)):
    plt.bar(x_tasks, res["acc"], label=name, tick_label=x_tasks, width=width, align="edge")
    plt.errorbar(x_tasks+width/2, res["acc"], yerr=res["std"], lw=0, capsize=cs, elinewidth=el, color="black")

  # Random baseline as thin bars; only the first one gets a legend entry.
  for k, x_pos in enumerate(x_tasks):
    legend_entry = {"label": rand_label} if k == 0 else {}
    plt.bar(x_pos, rlw, width=2*barwidth, bottom=prod_rand[k]-rlw, color=randcolor, **legend_entry)

  plt.legend(loc="upper right", bbox_to_anchor=anchor)
  plt.xlabel("Questions", fontsize=fontsize)
  plt.ylabel("Accuracy", fontsize=fontsize)

# Panels 4-6: productivity tests 3-5, accuracy per number of whens.
for i in range(3, 6):
  prod_lstm = lstm["productivity"][i]
  prod_gru = gru["productivity"][i]
  prod_rand = random["productivity"][i]["acc"]

  plt.subplot(2, 3, i+1)
  plt.title(f"Productivity test {i}", fontsize=fontsize)

  # LSTM bars go left of the tick (negative width), GRU bars right of it;
  # each error bar is centred on its bar.
  for res, name, width in ((prod_lstm, lstm_label, -barwidth), (prod_gru, gru_label, barwidth)):
    plt.bar(x_whens, res["acc"], label=name, tick_label=x_whens, width=width, align="edge")
    plt.errorbar(x_whens+width/2, res["acc"], yerr=res["std"], lw=0, capsize=cs, elinewidth=el, color="black")

  # Random baseline as thin bars; only the first one gets a legend entry.
  for k, x_pos in enumerate(x_whens):
    legend_entry = {"label": rand_label} if k == 0 else {}
    plt.bar(x_pos, rlw, width=2*barwidth, bottom=prod_rand[k]-rlw, color=randcolor, **legend_entry)

  plt.legend(loc="upper right", bbox_to_anchor=anchor)
  plt.xlabel("Whens", fontsize=fontsize)
  plt.ylabel("Accuracy", fontsize=fontsize)

# Finish the 2x3 figure: fix the spacing, write it to disk and display it.
plt.tight_layout()
plt.savefig("test_barplots.pdf")
plt.show()

In [0]:
def print_info(test, idx):
  """Print the column header line for a block of results, if it has one.

  A header is printed for every systematicity test and for the first
  productivity test of each group (idx 1: questions, idx 3: whens); nothing
  is printed otherwise.

  Args:
    test: test name, e.g. "systematicity" or "productivity".
    idx: test index within that test family.
  """
  if test == "systematicity":
    header = "{:12} {:<14}{:<14}".format("","excluded","included")
  elif test == "productivity" and idx == 1:
    header = "{:12} {:<14}{:<14}{:<14}".format("","1 question","2 questions","3 questions")
  elif test == "productivity" and idx == 3:
    header = "{:12} {:<14}{:<14}{:<14}{:<14}{:<14}".format("","0 whens","1 when","2 whens","3 whens","4 whens")
  else:
    return
  print(header)

  
def print_results(models, test):
  """Print mean accuracy and standard deviation of every model for one test.

  Args:
    models: dict mapping a model name to its averaged results. For any test
      other than "main" the dict must contain a "random" entry, which
      determines how many sub-tests are printed.
    test: "main", "systematicity" or "productivity".
  """
  print(f"> {test}")

  if test == "main":
    for name in models:
      summary = models[name]["main"]
      print("    {:<6} :".format(name), round(summary["acc"],3), round(summary["std"],3))
    print()
    return

  n_tests = len(models["random"][test])
  for i in range(1, n_tests+1):
    print_info(test, i)
    for position, name in enumerate(models):
      entry = models[name][test][i]
      # one fixed-width "acc std" cell per sub-test
      cells = "".join(
          "{:<14}".format(str(round(entry["acc"][j],3)) + " " + str(round(entry["std"][j],3)))
          for j in range(len(entry["acc"])))
      # only the first model row carries the test index
      if position == 0:
        prefix = "{})  {:<6} :".format(i, name)
      else:
        prefix = "    {:<6} :".format(name)
      print(prefix, cells)
    print()

In [0]:
# Tabulate the averaged results; the "random" key is required by print_results.
models = {"lstm":lstm, "gru":gru, "random":random}

print_results(models, "main")
print_results(models, "systematicity")
print_results(models, "productivity")

In [0]:
# Line-plot version of productivity tests 3-5: solid lines for the LSTM, dashed
# lines in the matching colour for the GRU.
print("zoom-in of the bottom three bar plots a few cells ago")
plt.title("Productivity tests 3-5", fontsize=fontsize)
plots = []
for i in range(3, 6):
  p = plt.plot(x_whens, lstm["productivity"][i]["acc"], label=lstm_label+ " " + str(i))
  plots.append(p)  # kept so the GRU line of the same test can reuse the colour
# NOTE(review): only the random baseline of test 3 is drawn — presumably it is
# representative for tests 4 and 5 as well; confirm.
plt.plot(x_whens, random["productivity"][3]["acc"],  label=rand_label, color=randcolor)
for i in range(3, 6):
  plt.plot(x_whens, gru["productivity"][i]["acc"], "--",  label=gru_label+ " " + str(i), color=plots[i-3][0].get_color())

plt.legend(ncol=2)
plt.xticks(x_whens, np.arange(5))
plt.xlabel("Whens", fontsize=fontsize)
plt.ylabel("Accuracy", fontsize=fontsize)

# Zoom: cap the y axis at 0.55 and start the x axis at 1 when.
plt.ylim(None, 0.55)
plt.xlim(1, None)
plt.show()

In [0]:
def remove_tasks(split):
  """Split a tokenised sample into its sentence and its leading questions.

  A sample starts with zero or more (question word, argument) token pairs,
  where the question word is "who", "what" or "where"; everything after the
  last such pair is the sentence.

  Args:
    split: sequence of tokens of one sample.

  Returns:
    Tuple (sentence, tasks): sentence is an np.ndarray with the remaining
    tokens, tasks an np.ndarray with one [question word, argument] row per
    question (an empty array when there are none).
  """
  www = ["who", "what", "where"]
  i = 0
  tasks = []
  # The bound check keeps samples that are empty or consist only of question
  # pairs from indexing past the end (this used to raise an IndexError).
  while i + 1 < len(split) and split[i] in www:
    tasks.append([split[i], split[i+1]])
    i += 2
  return np.array(split[i:]), np.array(tasks)

In [0]:
def get_fnames(test="main", idx=1, model="lstms2s", run="run_0", data_folder="data/Final2", results_folder="results"):
  """Return the source, target and prediction file paths of a test.

  Args:
    test: "main", "systematicity" or "productivity".
    idx: test index; currently not used.
    model: model folder name inside results_folder.
    run: run folder name inside the model folder.
    data_folder: root folder holding the source and target files.
    results_folder: root folder holding the prediction files.

  Returns:
    Tuple (fsrc, ftgt, fprd). For "main" each element is a single path; for
    "systematicity" each element is a list of two paths in the order
    (inc012, exc012). For "productivity", which is not implemented yet, a
    notice is printed and None is returned.

  Raises:
    ValueError: if test is not one of the supported names (previously this
      surfaced as an UnboundLocalError on the return statement).
  """
  if test == "main":
    fsrc = f"{data_folder}/All/test5k_all.src"
    ftgt = f"{data_folder}/All/test5k_all.tgt"
    fprd = f"{results_folder}/{model}/{run}/All/test5k_all.prd"
  elif test == "systematicity":
    variants = ["inc", "exc"]
    fsrc = [f"{data_folder}/Systematicity/Test1/syst1_test_{c}012.src" for c in variants]
    ftgt = [f"{data_folder}/Systematicity/Test1/syst1_test_{c}012.tgt" for c in variants]
    fprd = [f"{results_folder}/{model}/{run}/Systematicity/Test1/syst1_test_{c}012.prd" for c in variants]
  elif test == "productivity":
    print("Working on it!")
    return
  else:
    raise ValueError(
        f"unknown test {test!r}; expected 'main', 'systematicity' or 'productivity'")
  return fsrc, ftgt, fprd

In [0]:
# Split the systematicity results by the kind of word asked about in the
# questions (person0-2, doing0-2, location0-2).
# Result: syst_splits[model][kind] = {"acc": mean over runs, "std": std over runs}.
syst_splits = {}
for model in ["lstms2s", "grus2s", "random"]:
  correct_split = {"person":[], "doing":[], "location":[]}  # one accuracy per run
  for run in ["run_0", "run_1", "run_2"]:
    # results["fs"] holds the file names per kind ("src", "tgt", "prd")
    results = {"fs":{}}
    results["fs"]["src"], results["fs"]["tgt"], results["fs"]["prd"] = get_fnames(test="systematicity", model=model, run=run)
    # read every file; results[kind] is a list of two arrays in get_fnames order (inc, exc)
    for f in results["fs"]:
      results[f] = [read_file(fname) for fname in results["fs"][f]]
    # per-sample exact-match flags, one boolean array per file
    results["correct"] = [results["tgt"][i] == results["prd"][i] for i in range(len(results["src"]))]
    # Index 1 selects the files named "exc"; given the inc/exc naming swap noted in
    # systematicity_results, this is presumably the set whose questions include the
    # 0-2 words — TODO confirm.
    syst_inc = {"src":results["src"][1], "tgt":results["tgt"][1], "prd":results["prd"][1], "correct":results["correct"][1]}
    # indicator vectors: 1 if the sample has a question about that kind of 0-2 word
    person, doing, location = np.zeros(len(syst_inc["src"])), np.zeros(len(syst_inc["src"])), np.zeros(len(syst_inc["src"]))
    for i, sample in enumerate(syst_inc["src"]):
      _, tasks = remove_tasks(sample.split())
      for q, w in tasks:
        if w in ["person0","person1","person2"]:
          person[i] = 1
        elif w in ["doing0", "doing1", "doing2"]:
          doing[i] = 1
        elif w in ["location0", "location1", "location2"]:
          location[i] = 1
    # Accuracy restricted to the flagged samples. The person/location lines divide
    # inside the sum and the doing line outside; mathematically these are the same.
    # NOTE(review): if no sample is flagged for a kind, the division is by zero.
    correct_split["person"].append(sum(syst_inc["correct"]*person/sum(person)))
    correct_split["doing"].append(sum(syst_inc["correct"]*doing)/sum(doing))
    correct_split["location"].append(sum(syst_inc["correct"]*location/sum(location)))
  # collapse the per-run accuracies into mean and standard deviation
  for key in correct_split:
    correct_split[key] = {"acc": np.mean(correct_split[key]), "std": np.std(correct_split[key])}
  syst_splits[model] = correct_split

In [0]:
# Plot the per-kind systematicity accuracies of each model with error bars.
plt.figure(figsize=(4.1,3.1))
for model in ["lstms2s", "grus2s", "random"]:
  # x tick labels are built from the split keys, e.g. "person0-2"
  x = [split+"0-2" for split in syst_splits[model]]
  acc = [syst_splits[model][split]["acc"] for split in syst_splits[model]]
  std = [syst_splits[model][split]["std"] for split in syst_splits[model]]
  if model == "lstms2s":
    plt.errorbar(x, acc, std, capsize=4, label=lstm_label)
  elif model == "grus2s":
    plt.errorbar(x, acc, std, capsize=4, label=gru_label)
  elif model == "random":
    plt.errorbar(x, acc, std, capsize=4, label=rand_label, color=randcolor)
  
plt.title("Systematicity test split", fontsize=fontsize)
plt.legend(loc="lower left", bbox_to_anchor=[-0.02,-0.02])
plt.xlim(-0.3,2.3)

plt.xlabel("Included in questions", fontsize=fontsize)
plt.ylabel("Accuracy", fontsize=fontsize)
plt.tight_layout()
plt.savefig("syst_split_plot.pdf")
plt.show()

# lines are used to make the accuracy differences easier to recognize
# location is harder due to the data structure

In [0]:
def get_mask(split):
  """Label every token of a sentence with the index of its subsentence.

  "when" tokens are marked with -1 and open a new subsentence (the running
  index goes up by one). Every other token gets the current running index.
  Once the current index has been assigned to exactly 5 tokens, the running
  index steps back down, unless the next token is another "when".

  Args:
    split: sequence of sentence tokens.

  Returns:
    Float np.ndarray with one mask value per token.
  """
  n_tokens = len(split)
  mask = np.full(n_tokens, -2.0)
  level = 0
  for pos, token in enumerate(split):
    if token == "when":
      mask[pos] = -1
      level += 1
    else:
      mask[pos] = level
    # a full subsentence directly followed by "when" keeps its level
    next_is_when = pos < n_tokens-1 and split[pos+1] == "when"
    while np.count_nonzero(mask == level) == 5 and not next_is_when:
      level -= 1
  return mask

def mark_www(split):
  """Tag every token with the question word it can answer.

  Tokens starting with "pe" map to "who", "do" to "what" and "lo" to
  "where"; all other tokens map to the empty string.

  Args:
    split: sequence of sentence tokens.

  Returns:
    np.ndarray with one tag per token.
  """
  prefix_to_question = {"pe": "who", "do": "what", "lo": "where"}
  return np.array([prefix_to_question.get(token[:2], "") for token in split])

def get_maxdist(data):
  """Compute the maximum search distance of every sample.

  For each question (q, w) of a sample, every subsentence containing the
  word w is inspected, and the distance between w and the token of the asked
  kind q within that subsentence is measured. The largest such distance over
  all questions is the sample's search distance (0 if nothing is found).

  Args:
    data: iterable of sample strings (questions followed by the sentence).

  Returns:
    np.ndarray with one maximum search distance per sample.
  """
  # return list of maximum search distances for all data
  # the larger the search distance, naturally you'd expect the sample to be harder
  max_dist = []
  # for each sample
  for seq in data:
    sentence, tasks = remove_tasks(seq.split())
    ids = np.arange(len(sentence))
    mask = get_mask(sentence) #mask subsentences
    www = mark_www(sentence)  #mask who what where positions

    max_d = 0
    # for each question
    for q, w in tasks:
      # select all subsentences from which to look for answers
      # (mask values at every position where the asked word occurs)
      sub_ids = mask[sentence==w]
      # loop through those subsentences
      for s in sub_ids:
        s_ids = ids[mask==s]  #track indices in sentence
        s_www = www[mask==s]  #track www relations in subsentences
        s_sent = sentence[mask==s] #filter subsentence from sentence
        w1_idx = s_ids[s_www==q]  # index first part of answer
        w2_idx = s_ids[s_sent==w] # index second part of answer
        # NOTE(review): only the first element is used; this assumes each
        # subsentence holds one token of the asked kind and one occurrence of
        # w — an empty match would raise an IndexError here. TODO confirm.
        s_dist = np.abs(w1_idx - w2_idx)[0]
        if s_dist > max_d:
          max_d = s_dist
    max_dist.append(max_d)
  return np.array(max_dist)

In [0]:
# investigate accuracies over search distances
# first determine search distance for each sample
# then count number of samples for each search distance
# then count how many samples were correct for each search distance per model

src_main = read_file("data/Final2/All/test5k_all.src")
tgt_main = read_file("data/Final2/All/test5k_all.tgt")

dist = get_maxdist(src_main)
# only distances that occur at least once are kept, so later divisions by the
# sample count cannot hit zero
dist_range = [i for i in range(np.max(dist)+1) if sum(dist==i) > 0]
dist_count = []  # number of samples per entry of dist_range
for i in dist_range:
  count = sum(dist==i)
  dist_count.append(count)

# dist_splits[model] = {"acc": mean over runs, "std": std over runs}, both with
# one value per entry of dist_range
dist_splits = {}
for model in ["lstms2s", "grus2s", "random"]:
  model_accs = []  # one list of per-distance accuracies per run
  for run in ["run_0", "run_1", "run_2"]:
    prd_main = read_file(f"results/{model}/{run}/All/test5k_all.prd")
    correct_main = tgt_main==prd_main  # per-sample exact-match flags

    dist_acc = []
    for idx, i in enumerate(dist_range):
      count = dist_count[idx]
      # fraction of correct samples among those with search distance i
      dist_acc.append(sum(correct_main[dist==i])/count)
    model_accs.append(dist_acc)
  # axis 0 runs over the runs
  dist_splits[model] = {"acc": np.mean(model_accs, 0), "std": np.std(model_accs, 0)}

In [0]:
# Accuracy per search distance for every model (left axis), with the number of
# samples per distance as gray bars on a logarithmic right axis.
plt.figure(figsize=(4.1,3.1))

plt.title("Accuracy as function of maximum search distance", fontsize=fontsize)

for model in dist_splits:
  if model == "lstms2s":
    # p = plt.plot(dist_range, dist_splits[model]["acc"], label=lstm_label)
    p = plt.errorbar(dist_range, dist_splits[model]["acc"], dist_splits[model]["std"], label=lstm_label)
  elif model == "grus2s":
    # p = plt.plot(dist_range, dist_splits[model]["acc"], label=gru_label)
    p = plt.errorbar(dist_range, dist_splits[model]["acc"], dist_splits[model]["std"], label=gru_label)
  if model == "random":
    # p = plt.plot(dist_range, dist_splits[model]["acc"], label=rand_label, color=randcolor)
    p = plt.errorbar(dist_range, dist_splits[model]["acc"], dist_splits[model]["std"], label=rand_label, color=randcolor)
  # mark the data points in the colour of the line that was just drawn
  plt.scatter(dist_range, dist_splits[model]["acc"], color=p[0].get_color(), s=13)

# Zero-height dummy bar: adds a "Samples" entry to this axes' legend, since the
# real sample bars are drawn on the twin axis below.
plt.bar(0,0, color="gray", alpha=0.3, label="Samples")
plt.legend()
plt.xlabel("Search distance", fontsize=fontsize)
plt.ylabel("Accuracy", fontsize=fontsize)

# second y axis for the sample counts
ax = plt.twinx()
plt.bar(dist_range, dist_count, alpha=0.3, color="gray")
plt.ylabel("Samples", fontsize=fontsize)
plt.yscale("log")
# ax.yaxis.label.set_color("#888888")

plt.tight_layout()
plt.savefig("dist_plot.pdf")
plt.show()