In [None]:
from sketch.examples.prompt_machine import *
# Switch the "VERBOSE" entry of the prompt-machine settings off for this notebook.
# NOTE(review): PM_SETTINGS is presumably brought in by the star import above — confirm.
PM_SETTINGS["VERBOSE"] = False

In [None]:
# Smoke test: build a prompt from the "{{ head }}{{ body }}" template at temperature 0
# and call it once with an all-spaces head followed by the hippo question.
# NOTE(review): the two arguments are passed positionally; presumably they map to
# `head` and `body` in that order — confirm against asyncGPT3Prompt.
prompt = asyncGPT3Prompt("whitespace_iterations", """{{ head }}{{ body }}""", temperature=0.0)
await prompt("      ", "What is a joke that invovles a hippo?")

In [None]:
import random
import itertools
from itertools import product



async def get_n_samples(N, whitespace_char_N, samples=1, body="What is a joke that invovles a hippo?", request_horizontal_limit=10, temperature=0.0, seed="testing-potato"):
    """Collect completions of `body` prefixed by distinct whitespace-only headers.

    Each pass generates headers of exactly `whitespace_char_N` characters drawn
    from space, tab and newline, sends every header together with `body`
    through the prompt, and accumulates (header, completion) pairs until at
    least `N` pairs exist or all 3**whitespace_char_N headers have been used.

    Args:
        N: target number of pairs per pass. Batches are not trimmed, so a pass
            can end with more than `N` pairs.
        whitespace_char_N: length of every generated header.
        samples: number of passes to run.
        body: text placed after the header in the prompt.
        request_horizontal_limit: number of headers requested concurrently per
            batch in the random-sampling branch.
        temperature: forwarded to the prompt constructor.
        seed: value passed to `random.seed` at the start of every pass, so each
            pass starts from the same random state.

    Returns:
        A list with one entry per pass; each entry is a list of
        (header, completion) tuples, where completion is whatever awaiting the
        prompt returned.

    Raises:
        AssertionError: if a batch ends up with no headers to request.
    """
    prompt = asyncGPT3Prompt("whitespace_iterations", """{{ head }}{{ body }}""", temperature=temperature)

    def get_random_whitespace_sequence(whitespace_char_N):
        """Return a random string of `whitespace_char_N` spaces, tabs and newlines."""
        results = []
        for _ in range(0, whitespace_char_N):
            r = random.random()
            results.append(" " if r < 0.33333 else ("\t" if r < 0.66667 else "\n"))
        return "".join(results)

    def get_batch(whitespace_char_N, seen=None):
        """Pick unseen headers and return an awaitable for their completions.

        `seen` holds the headers already used in this pass; the random branch
        adds the newly chosen headers to it. Awaiting the returned value gives
        a list of (header, completion) tuples.
        """
        seen = seen or set()
        if (3**(whitespace_char_N) - len(seen)) < request_horizontal_limit:
            # Fewer unseen headers remain than one batch would hold: enumerate
            # every possible header and take whatever has not been used yet.
            all_possible = set(["".join(x) for x in product([" ", "\t", "\n"], repeat=whitespace_char_N)])
            headers = list(all_possible - seen)
        else:
            # Rejection-sample random headers until the batch is full.
            headers = set()
            while len(headers) < request_horizontal_limit:
                header = get_random_whitespace_sequence(whitespace_char_N)
                if header not in seen:
                    headers.add(header)
                    seen.add(header)
            headers = list(headers)
        print(len(headers))

        assert len(headers) > 0, "Something went wrong"

        async def get_wrapped(header):
            # Keep the header next to its completion so callers can pair them up.
            return (header, await prompt(head=header, body=body))

        # Issue all requests of the batch concurrently; the caller awaits the result.
        results = asyncio.gather(*[get_wrapped(header) for header in headers])
        return results

    samp = []
    for i in range(samples):
        random.seed(seed)
        results = []
        while (len(results) < N) and (len(results) < 3**whitespace_char_N):
            print(whitespace_char_N)
            new_results = await get_batch(whitespace_char_N, seen=set([x[0] for x in results]))
            for header, result in new_results:
                # `results` grows by one per iteration, so its current length is
                # already the running index of this item. Adding the loop
                # position on top of it made the counter advance by two.
                print(f"==== {len(results):05d}/~{N:05d}\nHeader:{repr(header)}\nResult:\n{repr(result)}\n====")
                results.append((header, result))
        samp.append(results)
    return samp


In [None]:
# Run the experiment once per header length: a target of 40 samples and a single pass each.
# Every entry appended to `results` is the list returned by get_n_samples (one inner list
# per pass), which is why the cells that follow read `run[0]`.
results = []
for i in [0, 1, 2, 3, 4, 8, 12, 16, 32, 64, 128, 256, 512, 1024, 2048]:
    results.append(await get_n_samples(40, i, samples=1))

In [None]:
# One summary line per run: header/output uniqueness, mean header length, and how many
# completions contain the "cross the road" joke. Only the first pass (run[0]) is inspected.
for run in results:
    unique_inputs = {header for header, _ in run[0]}
    unique_outputs = {result for _, result in run[0]}
    avg_whitespace = sum(len(header) for header, output in run[0]) / len(run[0])
    hippo_cross = len([output for header, output in run[0] if "Why did the hippo cross the road?" in output])
    empty_headers = len([header for header, output in run[0] if header == ""])
    print(dict(
        avg_whitespace=avg_whitespace,
        hippo_cross=hippo_cross,
        empty_prompt=empty_headers,
        total=len(run[0]),
        unique_inputs=len(unique_inputs),
        unique_outputs=len(unique_outputs),
    ))

In [None]:
from collections import defaultdict


def get_unique_counts(rr, input_filter = lambda *_: True, output_filter = lambda *_: True):
    """Count the distinct headers and distinct stripped outputs in `rr`.

    Args:
        rr: iterable of (header, output) pairs; it is traversed twice.
        input_filter: predicate on a header; only accepted headers are counted.
        output_filter: predicate on the raw (unstripped) output; accepted
            outputs are stripped before being de-duplicated.

    Returns:
        Tuple (number of distinct headers, number of distinct stripped outputs).
    """
    kept_headers = set()
    for header, _ in rr:
        if input_filter(header):
            kept_headers.add(header)

    kept_outputs = set()
    for _, result in rr:
        if output_filter(result):
            kept_outputs.add(result.strip())

    return (len(kept_headers), len(kept_outputs))

def get_unique_without_hippo(rr, input_filter = lambda*_: True, output_filter= lambda *_: True):
    """Return get_unique_counts(rr, ...) with both filters passed through unchanged.

    Despite the name, nothing is excluded here by default: any "hippo"
    filtering has to be supplied by the caller through `output_filter`.
    """
    counts = get_unique_counts(
        rr,
        input_filter=input_filter,
        output_filter=output_filter,
    )
    return counts

def get_unique_output_without_hippo(rr, input_filter = lambda*_: True, output_filter= lambda *_: True):
    """Return only the second element (the output count) of get_unique_without_hippo(rr, ...)."""
    counts = get_unique_without_hippo(
        rr,
        input_filter=input_filter,
        output_filter=output_filter,
    )
    return counts[1]


def get_stacks(x, y, levels=20):
    """Rasterise (x, y) points into rows of "*" / " " cells for a text bar chart.

    Row `k` (0 <= k < levels) marks a column with "*" when its y value is
    strictly greater than (k / levels) * max(y), and with " " otherwise.
    The baseline is taken to be 0 and `x` is expected to be already sorted.

    Args:
        x: column keys, one per point.
        y: values, paired with `x` by position.
        levels: number of rows to produce.

    Returns:
        defaultdict mapping row index -> {x value: "*" or " "}.
    """
    peak = max(y)
    stacks = defaultdict(dict)
    for x_val, y_val in zip(x, y):
        for level in range(levels):
            reaches_level = y_val > (level / levels) * peak
            stacks[level][x_val] = "*" if reaches_level else " "
    return stacks

def print_stacks(xvals, yvals, title="", x_axis_label=""):
    """Print a text bar chart of `yvals` against `xvals`.

    Rows come from get_stacks and are printed from the highest level down,
    each prefixed with the y value at which that level starts. Below the bars
    comes a row of x values; the optional title (framed by "=" rules) and
    x-axis label are centred on the width of that row.

    Args:
        xvals: x value of each column, printed as the bottom label row.
        yvals: bar height of each column.
        title: optional heading printed above the chart.
        x_axis_label: optional caption printed under the x values.
    """
    label_row = " "*10 + "  ".join([f"{xval:>07.2f}" for xval in xvals])
    chart_width = len(label_row)

    if title:
        rule = "=" * chart_width
        print(rule)
        print(" " * ((chart_width - len(title)) // 2) + title)
        print(rule)

    stacks = get_stacks(xvals, yvals)
    peak = max(yvals)
    level_count = len(stacks)
    for level in reversed(stacks):
        # Each column is seven marks wide with one space of padding on either side.
        cells = "".join(f" {mark * 7} " for mark in stacks[level].values())
        print(f"{level*peak/level_count:>9.4f}" + cells)

    print(label_row)
    if x_axis_label:
        print(" " * ((chart_width - len(x_axis_label)) // 2) + x_axis_label)


def output_filter(output):
    """Accept only completions that do not contain the "cross the road" joke."""
    return "Why did the hippo cross the road?" not in output


# Mean header length of each run's first pass; used as the column labels of both charts.
xvals = [sum(len(header) for header, _ in run[0]) / len(run[0]) for run in results]

# Chart 1: number of distinct stripped outputs per run that pass output_filter.
yvals = [get_unique_output_without_hippo(run[0], output_filter=output_filter) for run in results]
print_stacks(xvals, yvals, title="Number of unique output.strip() continuations generated (T = 0 K)", x_axis_label="Number of leading space whitespace characters {`\\n`, `\\t`, ` `}")

# Chart 2: distinct outputs divided by distinct headers for each run.
yvals = [
    n_outputs / n_inputs
    for n_inputs, n_outputs in (
        get_unique_without_hippo(run[0], output_filter=output_filter) for run in results
    )
]
print_stacks(xvals, yvals, title="Ratio of output.strip() in this size that are novel (unique out / unique in) (T = 0 K)", x_axis_label="Number of leading space whitespace characters {`\\n`, `\\t`, ` `}")

In [None]:
# For every run, list the stripped, filter-passing completions that no earlier
# (shorter-header) run has produced yet. `seen` carries the responses already printed.
seen = set()
for run in results:
    responses = {x.strip() for x in {y[1] for y in run[0]} if output_filter(x)}
    # Previously `seen` was only ever written to, so every run re-printed responses
    # that had already appeared; subtract it so "new" really means not seen before.
    new_responses = responses - seen
    # First number: header length of this run (taken from its first pair).
    print(len(run[0][0][0]), len(new_responses))
    # Sorted so the listing is stable between executions (set order is not).
    print("\n----\n".join(sorted(new_responses)))
    seen |= new_responses

In [None]:
# Export the saved notebook to HTML and rename the output to
# `random_whitespace_sampling_2_{UTC timestamp}.html`; commit that file to the repo.
# The command below does not pass `--execute`, so it converts the notebook as saved.
# To re-run all cells during the export, use instead:
#   jupyter nbconvert --to html --execute random_whitespace_sampling_2.ipynb
! jupyter nbconvert --to html random_whitespace_sampling_2.ipynb && mv random_whitespace_sampling_2.html random_whitespace_sampling_2_$(date -u +"%Y-%m-%dT%H:%M:%SZ").html