In [None]:
!pip install transformers
from transformers import pipeline

size="800m"
model_sizes = ["70m",'160m','410m','1b','1.4b','2.8b','6.9b','12b']
model_name_template = "EleutherAI/pythia-{size}-deduped"
model_name = model_name_template.format(size=size)
generator = pipeline('text-generation', model=model_name,device=0) #device=0 = GPU; device=-1 = CPU



Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Downloading (…)lve/main/config.json:   0%|          | 0.00/569 [00:00<?, ?B/s]

Downloading (…)"pytorch_model.bin";:   0%|          | 0.00/2.09G [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/394 [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/2.11M [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/99.0 [00:00<?, ?B/s]

Basic GA

In [None]:
from tensorflow.python.ops import gen_batch_ops
import random

#make a random initial population of letters -- e.g. one genome is like "ADGCB"
def create_population(size=20,glength=7,gen_set='ABCDEFGH'):
  """Build a random starting population.

  Returns a list of `size` strings, each made of `glength` symbols drawn
  independently (with repetition) from `gen_set` via random.choice.
  """
  return [
    "".join(random.choice(gen_set) for _ in range(glength))
    for _ in range(size)
  ]


#make simple "crossover" prompt composed of random genomes from pop
def create_crossover_prompt(examples):
  """Turn the parent genomes into a prompt: one genome per line.

  Every genome is followed by a newline, including the last one, so the
  prompt ends on a fresh line. An empty `examples` gives an empty prompt.
  """
  lines = [candidate+"\n" for candidate in examples]
  return "".join(lines)

#do "crossover" by calling model
def do_crossover(pop,examples=5,temp=1.0,chosen_examples=None,prompt_generator=create_crossover_prompt):
  """Ask the language model for a continuation of a prompt built from parent genomes.

  Args:
    pop: population to sample parents from; only used when `chosen_examples`
      is None.
    examples: number of parents sampled (without replacement) from `pop`.
    temp: currently unused -- the `temperature` argument below is commented out.
    chosen_examples: explicit parents (list or numpy array); skips sampling.
    prompt_generator: callable that maps the parents to the prompt string.

  Returns:
    The 'generated_text' field of the first result returned by the
    module-level `generator` pipeline.
  """
  # `is None`, not `== None`: comparing a numpy array with `==` is elementwise,
  # and using that result in an `if` raises ValueError for multi-element arrays.
  if chosen_examples is None:
    chosen_examples = np.random.choice(pop,examples,replace=False)

  prompt = prompt_generator(chosen_examples)
  #prompt = create_directed_crossover_prompt(pop,examples=examples)

  #print(prompt)
  model_output = generator(prompt,
                         do_sample=True,
                         max_length=150,
                         #temperature=temp,
                         top_p=0.8,
                         top_k=30,
                         batch_size=1, #JOELNOTE -- does this mess anything up?
                         #penalty_alpha=0.4, top_k=10,
                         return_full_text=False)
  return model_output[0]['generated_text']

#process output of model into potential offspring
def process_output(output,take_offspring=3,verbose=True,genome_length=None,alphabet=None):
  """Filter raw model output down to well-formed offspring genomes.

  The output is split on newlines and the text after the final newline is
  discarded. Of the remaining lines, the first `take_offspring` are stripped
  of spaces and kept only if they have exactly `genome_length` symbols, all
  of which occur in `alphabet`.

  Args:
    output: text returned by the model.
    take_offspring: maximum number of leading lines to examine.
    verbose: print each examined line and the reason for any rejection.
    genome_length: required genome length; when None the module-level
      `glength` is used.
    alphabet: allowed symbols; when None the module-level `gen_set` is used.

  Returns:
    Accepted genomes without duplicates, in order of first appearance.

  Raises:
    NameError: if a fallback global is needed but not defined. Previously a
      bare `except` swallowed this and every genome was reported as "failed".
  """
  if genome_length is None:
    genome_length = glength
  if alphabet is None:
    alphabet = gen_set

  candidates = []
  genomes = output.split("\n")[:-1]
  for genome in genomes[:take_offspring]:
    genome = genome.replace(' ','')
    if verbose: print(genome)
    if len(genome)!=genome_length:
      if verbose: print(genome,'wrong length')
      continue
    if any(k not in alphabet for k in genome):
      if verbose: print(genome,'bad element')
      continue
    candidates.append(genome)
  # dict.fromkeys de-duplicates like set() did, but keeps a deterministic order
  return list(dict.fromkeys(candidates))


# Crossover explorations

In [None]:
import pandas as pd
import numpy as np
import random
# Module-level result lists (calc_crossover_stats below works on its own
# local lists of the same names).
offspring = []
offspring_valid = []

n_examples = 3
# how many leading lines of each model output calc_crossover_stats keeps
number_to_take = 3

# All 6-bit binary strings in numeric order: "000000", "000001", ..., "111111".
binary_digits = [format(n, "06b") for n in range(0,64)]

def is_valid(candidate,target_len=6):
  """Return True when `candidate` has exactly `target_len` items and each one is '0' or '1'."""
  if len(candidate)==target_len:
    return all(k in ('0','1') for k in candidate)
  return False

def hamming_distance(chaine1, chaine2):
    """Count the positions at which the two sequences differ.

    Positions are paired with zip(), so anything beyond the length of the
    shorter sequence is not compared.
    """
    mismatches = 0
    for c1, c2 in zip(chaine1, chaine2):
        if c1 != c2:
            mismatches += 1
    return mismatches
  
def calc_crossover_stats(n_examples,inputs=binary_digits,trials=20):
  """Run repeated crossover trials on one fixed set of parents and tally the results.

  `n_examples` parents are drawn once from `inputs` with np.random.choice
  (its default samples with replacement, so a parent can repeat). For each of
  `trials` rounds the parents are shuffled in place, the model is queried via
  do_crossover, and the first `number_to_take` (module-level) output lines
  are inspected.

  Returns:
    (valid, duplicates, total_cnt, generated, offspring_valid, offspring):
      valid           -- inspected lines that pass is_valid
      duplicates      -- valid lines identical to one of the parents
      total_cnt       -- inspected lines in total
      generated       -- set of distinct valid lines that are not parents
      offspring_valid -- the same non-parent valid lines, repeats included
      offspring       -- every inspected line, recorded once per trial
  """
  chosen_examples=list(np.random.choice(inputs,n_examples))

  generated = set()
  offspring = []
  offspring_valid = []
  duplicates = 0
  valid = 0
  total_cnt = 0
  for _ in range(trials):
    random.shuffle(chosen_examples)
    out = do_crossover([],chosen_examples=chosen_examples,prompt_generator=create_crossover_prompt)
    res = out.split("\n")[:number_to_take]

    for ind in res:
      total_cnt+=1
      if not is_valid(ind):
        continue
      valid+=1
      if ind in chosen_examples:
        duplicates+=1
      else:
        generated.add(ind)
        offspring_valid.append(ind)

    print(res)
    # Record this trial's raw lines exactly once; the earlier code extended
    # `offspring` twice per trial, doubling every entry.
    offspring+=res
  return valid,duplicates,total_cnt,generated,offspring_valid,offspring


# sweep across # parents
RUN_PARENT_SWEEP = False  # set to True to (re)run the sweep below
if RUN_PARENT_SWEEP:
  res_dict = {}
  for k in range(1,6):
    print(k)
    res_dict[k] = [calc_crossover_stats(k) for _ in range(5)]

# heritability experiments
# The two neighbourhoods are plain constants and are defined unconditionally:
# the GA cell's create_population reads zeros_neighborhood, so hiding the
# definition behind a disabled branch left that name undefined on a fresh run.
ones_neighborhood=['111110','111101','111011','110111','101111','011111']
zeros_neighborhood=['000001','000010','000100','001000','010000','100000']
RUN_HERITABILITY = False  # set to True to (re)run the experiment below
if RUN_HERITABILITY:
  ones_stats_runs = [calc_crossover_stats(3,ones_neighborhood) for _ in range(10)]
  zeros_stats_runs = [calc_crossover_stats(3,zeros_neighborhood) for _ in range(10)]


In [None]:
# model sweep experiments

import pandas as pd
import numpy as np
import random
# Module-level result lists; calc_crossover_stats in this cell fills its own
# local lists with the same names.
offspring = []
offspring_valid = []

n_examples = 3
# number of leading output lines calc_crossover_stats looks at per trial
number_to_take = 3

# The 64 zero-padded 6-bit strings, from "000000" up to "111111".
binary_digits = [bin(n)[2:].zfill(6) for n in range(0,64)]

def is_valid(candidate,target_len=9):
  """Check that `candidate` has `target_len` items and consists only of '0' and '1'."""
  right_length = len(candidate)==target_len
  return right_length and not any(k not in ('0','1') for k in candidate)

def hamming_distance(chaine1, chaine2):
    """Number of aligned positions where the two sequences disagree.

    Only the overlap is compared: zip() stops at the end of the shorter input.
    """
    differing = [1 for c1, c2 in zip(chaine1, chaine2) if c1 != c2]
    return len(differing)
  
def calc_crossover_stats(n_examples,inputs=binary_digits,trials=20):
  """Query the model repeatedly with one fixed parent set and collect statistics.

  Parents: `n_examples` items drawn once from `inputs` by np.random.choice
  (sampling with replacement by default, so repeats are possible). Each of
  the `trials` rounds shuffles the parents in place, calls do_crossover, and
  examines the first `number_to_take` (module-level) lines of the output.

  Returns:
    A 6-tuple (valid, duplicates, total_cnt, generated, offspring_valid, offspring):
      valid           -- count of examined lines accepted by is_valid
      duplicates      -- count of valid lines equal to a parent
      total_cnt       -- count of all examined lines
      generated       -- set of distinct valid non-parent lines
      offspring_valid -- list of valid non-parent lines, repeats kept
      offspring       -- list of all examined lines, one entry per line per trial
  """
  chosen_examples=list(np.random.choice(inputs,n_examples))

  generated = set()
  offspring = []
  offspring_valid = []
  duplicates = 0
  valid = 0
  total_cnt = 0
  for _ in range(trials):
    random.shuffle(chosen_examples)
    out = do_crossover([],chosen_examples=chosen_examples,prompt_generator=create_crossover_prompt)
    res = out.split("\n")[:number_to_take]

    for ind in res:
      total_cnt+=1
      if not is_valid(ind):
        continue
      valid+=1
      if ind in chosen_examples:
        duplicates+=1
      else:
        generated.add(ind)
        offspring_valid.append(ind)

    print(res)
    # Extend once per trial. The code used to do this twice, so each raw line
    # was stored two times in the returned `offspring` list.
    offspring+=res
  return valid,duplicates,total_cnt,generated,offspring_valid,offspring

In [None]:
import pickle
# Persist the parent-sweep results. The `with` block closes (and therefore
# flushes) the file even if pickling raises.
# NOTE(review): res_dict is only assigned inside the parent-sweep branch of an
# earlier cell, which is switched off by default -- run that sweep first.
with open("variation_new3.pkl","wb") as out:
  pickle.dump(res_dict,out)

In [None]:
# Full list of sizes, kept for reference; the assignment right below replaces it.
# NOTE(review): this list says '800m' where the list in the first cell says '1b'.
model_sizes = ["70m",'160m','410m','800m','1.4b','2.8b','6.9b','12b']
model_sizes = ['1b']
# Parent pool for the sweep: 100 random strings of nine '0'/'1' characters.
examples = [
    "".join(random.choice(["0","1"]) for _ in range(9))
    for _ in range(100)
]

import gc
import torch

for model_size in model_sizes:
  # Drop the previously loaded pipeline, if there is one, before loading the
  # next model. Only the "name not defined yet" case is ignored; a bare
  # `except` would also have hidden KeyboardInterrupt and real errors.
  try:
    del generator
  except NameError:
    pass
  # Collect unreachable objects first so that empty_cache() can hand their
  # cached GPU blocks back.
  gc.collect()
  torch.cuda.empty_cache()

  model_name_template = "EleutherAI/pythia-{size}-deduped"
  model_name = model_name_template.format(size=model_size)
  generator = pipeline('text-generation', model=model_name,device=0) #

  scale_runs = [calc_crossover_stats(3,examples) for _ in range(10)]


  out = open("scale_crossover_{model}.pkl".format(model=model_size),"wb")
  pickle.dump(scale_runs,out)
  # Push the bytes to disk right away. The handle is deliberately left open
  # and bound to `out`, because the next cell dumps to `out` again.
  out.flush()

In [None]:
import pickle
# Dumps scale_runs to whatever file object `out` currently refers to; neither
# name is defined in this cell. NOTE(review): when cells run top to bottom,
# `out` is the handle opened in the model-sweep loop of the previous cell,
# which already received one dump of scale_runs -- confirm this second dump
# is still wanted.
pickle.dump(scale_runs,out)

In [None]:
# Bundle the two heritability runs and save them.
# NOTE(review): ones_stats_runs / zeros_stats_runs are only assigned inside
# the heritability branch of an earlier cell, which is switched off by default.
results = {'ones':ones_stats_runs,'zeros':zeros_stats_runs}
import pickle
# `with` closes (and flushes) the file even if pickling raises.
with open("heritable.pkl","wb") as out:
  pickle.dump(results,out)

In [None]:
# Save the evolution results. The `with` block closes (and flushes) the file
# even if pickling raises.
# NOTE(review): `res` is assigned at module level only in the last cell of
# the notebook, so that cell has to run before this one.
with open("evolution.pkl","wb") as out:
  pickle.dump(res,out)

## Simple genetic algorithm core loop

In [None]:
# simple GA

# Default `size` of create_population() below. do_evolution() has its own
# pop_size parameter and does not read this value.
pop_size = 20
def fitness(x):
  """Fitness of a genome: the number of "1" entries reported by x.count."""
  ones = x.count("1")
  return ones

def rand_fitness(x):
  """Control fitness: ignores the genome and returns a fresh random.random() value."""
  score = random.random()
  return score

#make an initial population by sampling seed genomes -- by default the six
#6-bit strings that contain exactly one "1" (e.g. "000100")
def create_population(size=pop_size,glength=6,gen_set='10',seeds=None):
  """Create the starting population for the GA and print it.

  Args:
    size: number of individuals to create.
    glength: accepted for call compatibility; not used by this function.
    gen_set: accepted for call compatibility; not used by this function.
    seeds: genomes to sample from (with repetition). When None, the six
      one-bit strings listed below are used. They are spelled out here
      because the notebook's `zeros_neighborhood` list is only assigned
      inside a disabled experiment branch, so it is undefined on a fresh run.

  Returns:
    A list of `size` genome strings, each chosen with random.choice.
  """
  if seeds is None:
    seeds = ['000001','000010','000100','001000','010000','100000']
  pop = [random.choice(seeds) for _ in range(size)]
  print(pop)
  return pop

#process output of model into potential offspring
def process_output(output,take_offspring=2,verbose=True):
  """Pick valid offspring genomes out of raw model output.

  The output is split on newlines and the text after the final newline is
  discarded. The first `take_offspring` remaining lines are stripped of
  spaces and kept when is_valid() accepts them. Duplicates are kept (the
  de-duplicating return that used to be here was commented out).

  Args:
    output: text returned by the model.
    take_offspring: maximum number of leading lines to examine.
    verbose: print every examined line, and any validation error.

  Returns:
    List of accepted genomes in output order.
  """
  candidates = []
  genomes = output.split("\n")[:-1]
  for genome in genomes[:take_offspring]:
    genome = genome.replace(' ','')
    if verbose: print(genome)
    try:
      if is_valid(genome):
        candidates.append(genome)
    # `except Exception` instead of a bare `except`, so that Ctrl-C
    # (KeyboardInterrupt) and SystemExit are no longer swallowed here.
    except Exception as err:
      if verbose: print(genome,"failed",repr(err))
  return candidates


def do_evolution(fitness,pop_size=10,gens=10,glength=6,gen_set='10'):
  """Run the simple GA in which the language model acts as the crossover operator.

  Args:
    fitness: callable mapping a genome to a comparable score.
    pop_size: number of individuals kept in every generation.
    gens: number of generations to run.
    glength: forwarded to create_population. This used to be read from an
      undefined global; the default mirrors create_population's own default.
    gen_set: forwarded to create_population (same remark as for glength).

  Returns:
    (avg_fit_chart, max_fit_chart, pops): the per-generation average and
    maximum fitness measured at the start of each generation, and the list
    of populations (the initial one plus one per generation).

  NOTE(review): offspring creation loops until process_output has delivered
  enough genomes; there is no cap on the number of model calls.
  """
  #create pop
  pop = create_population(size=pop_size,glength=glength,gen_set=gen_set)
  #eval fit
  fit = list(map(fitness,pop))

  avg_fit_chart = []
  max_fit_chart = []
  pops = [pop]
  for generations in range(gens):
    #calc stats
    avg_fit = sum(fit)/pop_size
    max_fit = max(fit)
    # first individual that reaches the maximum fitness
    best_indiv = pop[fit.index(max_fit)]

    avg_fit_chart.append(avg_fit)
    max_fit_chart.append(max_fit)

    print('gen ',generations, len(pop))
    print('avg fit ', avg_fit, 'max fit', max_fit)
    print(best_indiv)

    #create offspring, starting with a copy of the best individual
    #(elitism of 1 -- note the tournament below may still drop it)
    off_pop = [best_indiv[:]]

    while len(off_pop) < pop_size:
      out = do_crossover(pop,examples=3)
      off_pop += process_output(out)
    off_pop = off_pop[:pop_size]

    #merged pop is new offspring plus old pop; fitness is re-evaluated for all
    merged_pop = off_pop + pop
    fit = list(map(fitness,merged_pop))

    next_pop = []

    # do tournament selection to get back down to pop-size
    # The index range follows the real size of merged_pop instead of assuming
    # it is exactly 2*pop_size.
    last_index = len(merged_pop)-1
    while len(next_pop) < pop_size:
      c1i = random.randint(0,last_index)
      c2i = random.randint(0,last_index)
      if fit[c1i] > fit[c2i]:
        print(fit[c1i],merged_pop[c1i])
        next_pop.append(merged_pop[c1i][:])
      else:
        # ties go to the second contestant
        next_pop.append(merged_pop[c2i][:])

    pop = next_pop
    fit = list(map(fitness,next_pop))
    print(next_pop)
    pops.append(pop)
  return avg_fit_chart,max_fit_chart,pops

In [None]:
# Ten independent GA runs (population 30) for each fitness function: the real
# one ('regular') first, then the random control ('random').
res = {}
for label, fit_fn in (('regular', fitness), ('random', rand_fitness)):
  res[label] = [do_evolution(fit_fn,pop_size=30) for _ in range(10)]
