In [0]:
!pip install -q gpt_2_simple
!mkdir /content/results
!mkdir /content/fake
import gpt_2_simple as gpt2
from datetime import datetime
from google.colab import files
import json
import numpy as np
import random
from os import listdir
from os.path import isfile, join, splitext
import tensorflow as tf
import nltk
import nltk.sentiment.vader as vader
import shutil

In [0]:
nltk.download('vader_lexicon')

In [0]:
gpt2.download_gpt2()

In [0]:
gpt2.mount_gdrive()
gpt2.copy_checkpoint_from_gdrive()

In [0]:
sid = vader.SentimentIntensityAnalyzer()

In [0]:
def sample_random(sess, prefix, temp=0.7, top_k=0):
  return "".join(gpt2.generate(sess, 
                               return_as_list=True,
                               length=100, 
                               temperature=temp, 
                               prefix=prefix, 
                               top_k=top_k, 
                               include_prefix=False)
                )[len(prefix):]

def sample_sentiment(sess, prefix, sentiment_vec, max_iter=15, thresh=0):
  """ 
  Sentiment based rejection sampling
  returns the best sample
  Sample either hundred times or when we do better than threshold distance
  note thresh is between [0,2]. It's an early cutoff
  """
  best_score = 2
  best_result = None
  for i in range(max_iter):
    sample = sample_random(sess, prefix, random.random())
    if not sample:
      continue
    ss = sid.polarity_scores(sample)
    curr_sent = np.array([ss["compound"], ss["neg"], ss["neu"], ss["pos"]])
    dist = np.linalg.norm(sentiment_vec-curr_sent)
    if (dist < thresh):
      return sample
    if dist < best_score:
      best_result = sample
      best_score = dist
  return best_result

In [0]:
def test_single(sid, train_file, test_instance, save_dest):
  """
  Takes the single testing JSON file as a dict.
  Runs sampling on all 3, saves results to dest dir, each training example to
  its own file
  """
  tf.reset_default_graph()
  sess = gpt2.start_tf_sess()
#   shutil.copyfile('drive/My Drive/checkpoint/run1/checkpoint', 'checkpoint/run1/checkpoint')
  
  gpt2.finetune(sess,
                dataset=train_file,
                steps=2,
                print_every=5,
                sample_every=1000,
                sample_length=1,
                restore_from='fresh',
                save_every=30,
                run_name="fake/"
                ) # don't want to save
  print('finetune done')
  results = []
  for i in range(1,4):
    inp = test_instance["test{}_input".format(i)]
    tru = test_instance["test{}_true".format(i)]
    id_ = test_instance["test{}_id".format(i)]
    inp = "".join(inp.split("\n")[-4:])
    # Generate random sample
    samp_rand = sample_random(sess, inp)
    # Generate sentiment sample
    ss = sid.polarity_scores(tru)
    sentiment_vec = np.array([ss["compound"], ss["neg"], ss["neu"], ss["pos"]])
    samp_sent = sample_sentiment(sess, inp, sentiment_vec)
    sample = {"id": id_,
              "prefix": inp,
              "truth": tru,
              "random": samp_rand,
              "sentiment": samp_sent
             }
    print("Sample: {}".format(sample))
    results.append(sample)
  with open(save_dest, "w") as f:
    json.dump(results, f)
  print('finished test single')

def run_tests(test_dir, train_dir):
  # Load model from checkpoint
  files = sorted(listdir(train_dir))[3::4] #change to your number
  for f in files:
    fname = splitext(f)[0]
    train = join(train_dir, f)
    test = join(test_dir, fname+".json")
    if isfile(train) and isfile(test):
      with open(test, "r") as test_file:
        test_instance = json.load(test_file)
        print(f'Testing {fname}')
        test_single(sid, train, test_instance, "/content/results/{}_sample.json".format(f))

In [0]:
run_tests('/content/drive/My Drive/test_cases/', '/content/drive/My Drive/training_cases/')