In [None]:
# Mount Google Drive at /content/drive; the data files loaded later in this
# notebook are read from paths under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
import numpy as np
import pickle

In [None]:
from tqdm import tqdm
# Register tqdm with pandas so that DataFrame.progress_apply (used further
# down) is available.
tqdm.pandas()

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
# Load the two tables used by the rest of the notebook (both stored under HDF key 'df').
# Later cells read 'embedding', 'case_id' and 'type' from dev_papers and
# 'CaseID' and 'embedding' from dev_cases_data.
dev_papers = pd.read_hdf('/content/drive/MyDrive/master_thesis/baselines/tf-idf_baseline_ga/data/embeddings/dev_papers.h5', 'df')
dev_cases_data = pd.read_hdf('/content/drive/MyDrive/master_thesis/baselines/tf-idf_baseline_ga/data/embeddings/dev_cases_data.h5', 'df')

In [None]:
def get_case_embedding(df, case_id):
  """Return the 'embedding' value of the first row of `df` whose 'CaseID' equals `case_id`.

  Raises IndexError when no row matches.
  """
  matching_rows = df[df['CaseID'] == case_id]
  first_match = matching_rows.iloc[0]
  return first_match['embedding']

In [None]:
def get_similarity_with_case_descriptions(df, embedding, case_id):
  """Cosine similarity between `embedding` and the embedding stored for `case_id` in `df`.

  The case embedding is looked up with get_case_embedding; the first entry of
  the flattened cosine_similarity result is returned.
  NOTE(review): presumably both embeddings are single-row 2-D vectors, so the
  result is a 1x1 matrix -- confirm against the stored data.
  """
  reference_embedding = get_case_embedding(df, case_id)
  similarity_matrix = cosine_similarity(reference_embedding, embedding)
  return similarity_matrix.flatten()[0]

In [None]:
# For every paper row, score its embedding against the embedding of the case it
# belongs to (looked up by case_id in dev_cases_data) and store the result as a
# new column. progress_apply shows a tqdm progress bar while iterating rows.
dev_papers['case_embedding_similarity'] = dev_papers.progress_apply(lambda x: get_similarity_with_case_descriptions(dev_cases_data, x.embedding, x.case_id), axis=1)

In [None]:
def get_optimal_threshold(precision, recall, thresholds):
    """Return the threshold whose (precision, recall) pair maximises the F1 score.

    Args:
        precision: precision values, one per threshold. A trailing extra element
            (as returned by sklearn's precision_recall_curve) is ignored.
        recall: recall values aligned with `precision`.
        thresholds: candidate decision thresholds.

    Returns:
        The element of `thresholds` with the highest F1 score (first one on ties).

    Raises:
        ValueError: if `thresholds` is empty.
    """
    precision = np.asarray(precision, dtype=float)
    recall = np.asarray(recall, dtype=float)
    thresholds = np.asarray(thresholds)
    if thresholds.size == 0:
        raise ValueError('thresholds must contain at least one value')

    # Only keep the points that actually have a threshold; otherwise the
    # selected index could fall one past the end of `thresholds`.
    n_points = len(thresholds)
    precision = precision[:n_points]
    recall = recall[:n_points]

    # precision + recall == 0 would give 0/0 = NaN, and np.argmax treats NaN as
    # the maximum. Define the F-score as 0 for those points instead.
    denominator = precision + recall
    fscore = np.divide(
        2 * precision * recall,
        denominator,
        out=np.zeros_like(denominator),
        where=denominator > 0,
    )

    # locate the index of the largest f score
    ix = int(np.argmax(fscore))
    optimal_threshold = thresholds[ix]
    print('Best Threshold=%f, F-Score=%.3f' % (optimal_threshold, fscore[ix]))
    return optimal_threshold

In [None]:
def convert_y_to_label(y):
  """Map each truthy item of `y` to 'associated_paper' and each falsy item to 'irrelevant_paper'.

  Returns a new list with one label per input item, in input order.
  """
  return ['associated_paper' if flag else 'irrelevant_paper' for flag in y]

In [None]:
def create_checkpoint_wrapper(n_bits, n_iter, n_pop, r_cross, r_mut, path):
  """Build a callback that pickles the current search state to `path`.

  The hyperparameters given here are captured once; every call of the returned
  function adds/overwrites the run-specific entries in the same dict and dumps
  the whole dict with pickle.
  """
  checkpoint = dict(
      n_bits=n_bits,
      n_iter=n_iter,
      n_pop=n_pop,
      r_cross=r_cross,
      r_mut=r_mut,
  )

  def create_checkpoint(best, best_score, current_iter, pop):
    """Record the latest run state in the shared dict and write it to disk."""
    checkpoint['best_mask'] = best
    checkpoint['best_score'] = best_score
    checkpoint['pop'] = pop
    checkpoint['current_iter'] = current_iter

    print('Creating checkpoint!')
    print(checkpoint)
    with open(path, 'wb') as out_file:
      pickle.dump(checkpoint, out_file)
    print('Checkpoint created!')

  return create_checkpoint

def get_checkpoint(n_bits, n_iter, n_pop, r_cross, r_mut, path):
  """Load the search state pickled at `path`, or build a fresh one.

  If `path` exists, the unpickled object is returned unchanged, so the
  hyperparameter arguments are ignored in that case. Otherwise a new state is
  created with a random population of `n_pop` bitstrings of length `n_bits`.

  Returns:
    dict with keys 'n_bits', 'n_iter', 'n_pop', 'r_cross', 'r_mut',
    'best_mask', 'best_score', 'current_iter' and 'pop' (for a fresh state).
  """
  try:
    # NOTE: pickle.load executes arbitrary code from the file; only point
    # `path` at checkpoints written by this notebook.
    with open(path, 'rb') as fp:
      state = pickle.load(fp)
      print('Checkpoint loaded!')
      print(state)
      return state
  except FileNotFoundError:
    # Use np.random.randint explicitly instead of a bare `randint`, which is
    # only imported in a later cell; this is the same global NumPy generator.
    pop = [np.random.randint(0, 2, n_bits).tolist() for _ in range(n_pop)]
    return {
      'n_bits': n_bits,
      'n_iter': n_iter,
      'n_pop': n_pop,
      'r_cross': r_cross,
      'r_mut': r_mut,
      'best_mask': pop[0],
      # sentinel lower than any real score so the first evaluation wins
      'best_score': -1000,
      'current_iter': 0,
      'pop': pop
    }

In [None]:
def save_best_mask(mask, score, path):
  """Write `mask` (as a NumPy array) and `score` to `path` in .npz format.

  The archive contains two entries named 'mask' and 'score'.
  """
  mask_array = np.array(mask)
  with open(path, 'wb') as out_file:
    np.savez(out_file, mask=mask_array, score=score)
    print('Best mask saved!')

In [None]:
# Module-level search hyperparameters.
# NOTE: run_ga() does not read these names; it works with its own values.
n_iter = 100          # total number of generations
n_bits = 100          # number of bits in each candidate bitstring
n_pop = 10            # population size
r_cross = 0.9         # crossover rate
r_mut = 5.0 / n_bits  # mutation rate

In [None]:
# genetic algorithm search for the binary embedding mask that maximises the objective
from numpy.random import randint
from numpy.random import rand
 
# objective function
def objective(mask):
  """Score a candidate mask: macro F1 of the thresholded, masked similarities.

  Steps (all operating on the global `dev_papers` / `dev_cases_data` frames):
    1. multiply every paper embedding element-wise by `mask`,
    2. recompute the similarity to the paper's case embedding,
    3. pick the threshold with the best F-score on the precision/recall curve,
    4. label rows with similarity >= threshold as associated and compute macro F1.

  Side effects: (re)writes the columns 'embedding_masked',
  'case_embedding_similarity_masked' and 'pred_is_associated_masked' on
  `dev_papers`.

  NOTE(review): `x.multiply(mask)` presumably relies on the embeddings being
  scipy sparse matrices whose width equals len(mask) -- confirm.
  """
  # Local import: the notebook only imports individual names from
  # sklearn.metrics, so the `metrics` module itself is otherwise undefined here.
  from sklearn import metrics

  is_associated = dev_papers['type'] == 'associated_paper'

  dev_papers['embedding_masked'] = dev_papers['embedding'].apply(lambda x: x.multiply(mask))
  dev_papers['case_embedding_similarity_masked'] = dev_papers.apply(
      lambda x: get_similarity_with_case_descriptions(dev_cases_data, x.embedding_masked, x.case_id),
      axis=1)

  precision, recall, thresholds = metrics.precision_recall_curve(
      is_associated, dev_papers['case_embedding_similarity_masked'])
  optimal_threshold = get_optimal_threshold(precision, recall, thresholds)
  print(f"optimal threshold {optimal_threshold}")

  dev_papers['pred_is_associated_masked'] = dev_papers['case_embedding_similarity_masked'] >= optimal_threshold

  y_target = convert_y_to_label(is_associated)
  y_pred = convert_y_to_label(dev_papers['pred_is_associated_masked'])
  return metrics.f1_score(y_target, y_pred, average='macro')
 
# tournament selection
def selection(pop, scores, k=3):
	"""Tournament selection: return the best-scoring of `k` randomly drawn individuals.

	One index is drawn first, then `k-1` challengers; a challenger replaces the
	current pick only when its score is strictly higher. Draws are made with
	replacement, so the same individual can be drawn more than once.
	"""
	population_size = len(pop)
	# initial contender
	winner_ix = randint(population_size)
	# remaining contenders of the tournament
	challenger_indices = randint(0, population_size, k-1)
	for challenger_ix in challenger_indices:
		if scores[challenger_ix] > scores[winner_ix]:
			winner_ix = challenger_ix
	return pop[winner_ix]
 
# crossover two parents to create two children
def crossover(p1, p2, r_cross):
	"""One-point crossover of two parent bitstrings.

	With probability `r_cross` a cut point is drawn and the tails of the parents
	are swapped; otherwise the children are plain copies of the parents.

	Args:
		p1, p2: parent bitstrings (lists of equal length).
		r_cross: crossover probability in [0, 1].

	Returns:
		[c1, c2] -- two new lists; the parents are never modified.
	"""
	# children are copies of parents by default
	c1, c2 = p1.copy(), p2.copy()
	# rand() is drawn unconditionally so the random stream is consumed the same
	# way regardless of length; strings shorter than 2 have no interior cut point.
	if rand() < r_cross and len(p1) > 1:
		# randint's upper bound is exclusive, so this draws a cut point from
		# 1 .. len(p1)-1: every position that leaves both segments non-empty.
		pt = randint(1, len(p1))
		# perform crossover
		c1 = p1[:pt] + p2[pt:]
		c2 = p2[:pt] + p1[pt:]
	return [c1, c2]
 
# mutation operator
def mutation(bitstring, r_mut):
	"""Flip each bit of `bitstring` in place, independently, with probability `r_mut`.

	One random number is drawn per position. Nothing is returned.
	"""
	for position, bit in enumerate(bitstring):
		# decide whether this position mutates
		if rand() < r_mut:
			# invert 0 <-> 1
			bitstring[position] = 1 - bit
 
# genetic algorithm
def genetic_algorithm(objective, checkpoint, make_checkpoint, best_mask_path):
	"""Run (or resume) the genetic search and return the best bitstring found.

	Args:
		objective: callable taking one bitstring and returning a score to maximise.
		checkpoint: dict with keys 'n_iter', 'r_cross', 'r_mut', 'pop',
			'best_mask', 'best_score' and 'current_iter'. The loop starts at
			'current_iter', which is how an interrupted run is resumed.
		make_checkpoint: callable(best, best_score, current_iter, pop) that is
			invoked after every generation.
		best_mask_path: path passed to save_best_mask after every generation.

	Returns:
		(best, best_eval): the best bitstring seen and its score.
	"""
	n_iter = checkpoint['n_iter']
	r_cross = checkpoint['r_cross']
	r_mut = checkpoint['r_mut']
	pop = checkpoint['pop']
	best = checkpoint['best_mask']
	best_eval = checkpoint['best_score']

	n_pop = len(pop)
	# enumerate generations
	for gen in range(checkpoint['current_iter'], n_iter):
		print(f"GEN: {gen}")
		# evaluate all candidates in the population
		scores = [objective(p) for p in pop]
		# check for new best solution
		for candidate, score in zip(pop, scores):
			if score > best_eval:
				best, best_eval = candidate, score
				print(">%d, new best f(%s)" % (gen, score))
		# select parents
		selected = [selection(pop, scores) for _ in range(n_pop)]
		# create the next generation
		children = list()
		for i in range(0, n_pop, 2):
			# get selected parents in pairs; for an odd population the last
			# parent is paired with the first one instead of indexing past the end
			p1, p2 = selected[i], selected[(i + 1) % n_pop]
			# crossover and mutation
			for c in crossover(p1, p2, r_cross):
				# mutation (in place, on the freshly created child)
				mutation(c, r_mut)
				# store for next generation
				children.append(c)
		# replace population; the slice only drops a child when n_pop is odd,
		# keeping the population size constant across generations
		pop = children[:n_pop]
		# save best mask
		save_best_mask(best, best_eval, best_mask_path)
		# save checkpoint (gen + 1 is the generation to resume from)
		make_checkpoint(best, best_eval, gen + 1, pop)
	return best, best_eval
 

In [None]:
def run_ga(n_iter=100, n_bits=100, n_pop=10, r_cross=0.9, r_mut=None,
           checkpoint_path='checkpoint.pkl', best_mask_path='best_mask.npz'):
  """Run the genetic-algorithm mask search, resuming from a checkpoint if one exists.

  Calling run_ga() with no arguments uses the values that were previously
  hard-coded in this function.

  Args:
    n_iter: total number of generations.
    n_bits: number of bits in each candidate mask.
    n_pop: population size.
    r_cross: crossover rate.
    r_mut: mutation rate; defaults to 5.0 / n_bits when None.
    checkpoint_path: pickle file used to load and save the search state.
    best_mask_path: .npz file the best mask is written to after each generation.

  NOTE(review): the default paths are relative to the working directory, not
  the mounted Drive folder -- confirm they survive a runtime restart if
  resuming is expected to work across sessions.
  NOTE: when a checkpoint file already exists, its stored hyperparameters are
  used by the search and the ones passed here only affect later checkpoints.
  """
  if r_mut is None:
    r_mut = 5.0 / float(n_bits)
  checkpoint_state = get_checkpoint(n_bits, n_iter, n_pop, r_cross, r_mut, checkpoint_path)
  make_checkpoint = create_checkpoint_wrapper(n_bits, n_iter, n_pop, r_cross, r_mut, checkpoint_path)
  # perform the genetic algorithm search
  best, score = genetic_algorithm(objective, checkpoint_state, make_checkpoint, best_mask_path)
  print('Done!')
  print(best, score)

In [None]:
# Start the search. Progress is checkpointed to 'checkpoint.pkl' after every
# generation, so re-running this cell resumes from the last saved generation.
run_ga()