In [48]:
import pandas as pd
import numpy as np
import random
import transformers
import torch
import dotenv
import os
import matplotlib.pyplot as plt
import re
import string

dotenv.load_dotenv()

True

In [49]:
model_id = "meta-llama/Meta-Llama-3-8B-Instruct"

pipeline = transformers.pipeline(
    "text-generation",
    model=model_id,
    model_kwargs={"torch_dtype": torch.bfloat16},
    device_map="auto",
    token=os.getenv('HF_TOKEN')
)

terminators = [
    pipeline.tokenizer.eos_token_id,
    pipeline.tokenizer.convert_tokens_to_ids("<|eot_id|>")
]

Loading checkpoint shards: 100%|██████████| 4/4 [00:46<00:00, 11.74s/it]
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


# Rule 110 Cellular Automaton

In [50]:
def make_cellular_problem_set(size, steps, num_problems, boundary='wrap'):
    if steps < 3:
        raise ValueError("Need 3 steps to have a solution and 2 intermediates")
    def int_to_binary_list(n, min_length=8):
        binary = bin(n)[2:]  # Convert to binary string and remove '0b' prefix
        binary_list = [int(b) for b in binary.zfill(min_length)]  # Pad with zeros if necessary
        return binary_list
    
    dict_110 = {
        (0, 0, 0): 0,
        (0, 0, 1): 1,
        (0, 1, 0): 1,
        (0, 1, 1): 1,
        (1, 0, 0): 0,
        (1, 0, 1): 1,
        (1, 1, 0): 1,
        (1, 1, 1): 0
    }

    def rule_110(prev):
        next_state = []
        for i in range(len(prev)):
            left = prev[(i-1) % len(prev)] if boundary == 'wrap' or (i > 0 and i < len(prev)-1) else boundary
            center = prev[i]
            right = prev[(i+1) % len(prev)] if boundary == 'wrap' or (i > 0 and i < len(prev)-1) else boundary
            pattern = (left, center, right)
            next_state.append(dict_110[pattern])
        return next_state
    
    def make_rule_110_problem(initial_state, steps):
        current_state = initial_state
        states = [current_state]
        for _ in range(steps):
            current_state = rule_110(current_state)
            states.append(current_state)
        return (''.join(str(x) for x in initial_state),
            ''.join(str(x) for x in states[-1]),
            ''.join(str(x) for x in states[1]),
            ''.join(str(x) for x in states[-2]))

    return pd.DataFrame(
        [make_rule_110_problem(int_to_binary_list(((i+1)*33581)%(2**(size))), steps) for i in range(num_problems)]
        , columns=['problem', 'correct_solution', 'intermediate_1', 'intermediate_2'])

In [51]:
make_cellular_problem_set(10, 3, 1000, boundary=0)

Unnamed: 0,problem,correct_solution,intermediate_1,intermediate_2
0,1100101101,1001000011,1101111111,1111000001
1,1001011010,1010000110,1011111110,1110000010
2,110000111,110110001,110001101,110011111
3,10110100,10001100,11111100,10000100
4,1111100001,1011101101,1000100011,1001100111
...,...,...,...,...
995,1100010100,1111101100,1100111100,1101100100
996,1001000001,1001001101,1011000011,1111000111
997,101101110,100011010,111111010,100001110
998,10011011,10100011,10111111,11100001


# SAT

In [73]:
from pysat.formula import CNF
from pysat.solvers import Glucose3

def solve_nsat(clauses):
    # Create a CNF formula
    cnf = CNF()
    for clause in clauses:
        cnf.append(clause)

    # Create a SAT solver
    with Glucose3(bootstrap_with=cnf) as solver:
        # Check if the formula is satisfiable
        if solver.solve():
            return solver.get_model()
        else:
            return None

In [80]:
def make_nsat_problem_set(vars_per_clause, num_clauses, num_problems):
    def make_nsat_problem(vars_per_clause, num_clauses):
        text_variables = [string.ascii_lowercase[i] for i in range(vars_per_clause)]
        text_problem = []
        pysat_problem = []
        for _ in range(num_clauses):
            clause = random.sample(range(vars_per_clause), 3)
            signs = [random.choice([-1, 1]) for _ in range(3)]
            pysat_clause = [signs[i]*(var+1) for i, var in enumerate(clause)]
            pysat_problem.append(pysat_clause)
            pysat_solution = solve_nsat(pysat_problem)
            if pysat_solution is None:
                text_solution = None
            else:
                text_solution_letters = [f"{'¬' if var <0 else ''}{text_variables[abs(var)-1]}" for i, var in enumerate(pysat_solution)]
                text_solution = f"{' ^ '.join(text_solution_letters)}"

            text_clause = [f"{'¬' if signs[i] == -1 else ''}{text_variables[var]}" for i, var in enumerate(clause)]
            text_problem.append(f"({' v '.join(text_clause)})")
        return ' ^ '.join(text_problem), text_solution, None, None

    return pd.DataFrame(
        [make_nsat_problem(vars_per_clause, num_clauses) for _ in range(num_problems)],
        columns=['problem', 'correct_solution', 'intermediate_1', 'intermediate_2'])

In [81]:
make_nsat_problem_set(3, 20, 1000)

Unnamed: 0,problem,correct_solution,intermediate_1,intermediate_2
0,(c v b v ¬a) ^ (a v ¬c v ¬b) ^ (b v ¬a v ¬c) ^...,,,
1,(¬c v ¬a v ¬b) ^ (a v ¬b v ¬c) ^ (¬b v ¬a v c)...,¬a ^ ¬b ^ ¬c,,
2,(a v c v b) ^ (a v c v ¬b) ^ (b v ¬a v c) ^ (b...,a ^ ¬b ^ c,,
3,(¬c v b v a) ^ (b v ¬a v ¬c) ^ (a v c v b) ^ (...,a ^ ¬b ^ ¬c,,
4,(¬b v c v a) ^ (c v ¬a v b) ^ (¬c v ¬b v a) ^ ...,a ^ b ^ ¬c,,
...,...,...,...,...
995,(¬b v ¬c v ¬a) ^ (c v ¬a v ¬b) ^ (¬a v ¬b v ¬c...,,,
996,(¬c v ¬a v ¬b) ^ (¬c v a v b) ^ (¬b v c v ¬a) ...,,,
997,(¬a v c v ¬b) ^ (b v ¬c v ¬a) ^ (¬c v b v ¬a) ...,¬a ^ b ^ c,,
998,(¬b v a v c) ^ (b v ¬c v a) ^ (¬b v ¬c v a) ^ ...,a ^ b ^ ¬c,,


# Dot Product

In [94]:
def make_dot_product_problem_set(vec_len, vec_mag, num_problems):
    if vec_len < 2:
        raise ValueError("Need vectors of length 2 or greater to have two intermediates")
    def make_dot_product_problem(vec_len):
        a = np.random.randint(-vec_mag, vec_mag, vec_len)
        b = np.random.randint(-vec_mag, vec_mag, vec_len)
        return (f"[{', '.join([str(x) for x in a])}] ⋅ [{', '.join([str(x) for x in b])}]"
        , np.dot(a, b)
        , a[0]*b[0],
        a[-1]*b[-1])

    return pd.DataFrame(
        [make_dot_product_problem(vec_len) for _ in range(num_problems)],
        columns=['problem', 'correct_solution', 'intermediate_1', 'intermediate_2'])

In [97]:
dot_problems = make_dot_product_problem_set(3, 10, 1000)
dot_problems

Unnamed: 0,problem,correct_solution,intermediate_1,intermediate_2
0,"[-5, 5, 7] ⋅ [9, 1, 8]",16,-45,56
1,"[5, 3, 4] ⋅ [3, 6, 8]",65,15,32
2,"[4, -1, 0] ⋅ [-8, -8, 5]",-24,-32,0
3,"[9, -2, 0] ⋅ [-8, 6, -7]",-84,-72,0
4,"[-3, 8, -2] ⋅ [-9, -1, 1]",17,27,-2
...,...,...,...,...
995,"[-8, 1, 9] ⋅ [3, 9, -2]",-33,-24,-18
996,"[0, 5, 3] ⋅ [-3, 9, 8]",69,0,24
997,"[-8, -4, -1] ⋅ [9, -5, 8]",-60,-72,-8
998,"[4, 0, -8] ⋅ [-1, 4, 6]",-52,-4,-48


In [98]:
def solve_problem_cot(problem, sys_prompt, cot_prompt):
    messages = [
    {"role": "system", "content": sys_prompt + ' ' + cot_prompt},
    {"role": "user", "content": problem},
    ]

    outputs = pipeline(
    messages,
    max_new_tokens=256,
    eos_token_id=terminators,
    do_sample=True,
    temperature=0.6,
    top_p=0.9,
    pad_token_id=pipeline.tokenizer.eos_token_id
    )

    return outputs[0]['generated_text'][-1]['content']

In [101]:
for i, row in dot_problems.iterrows():
    print(solve_problem_cot(row['problem'], "What is the dot product of these two vectors?", "Show your work."))

  if eos_token_id is not None and torch.isin(elements=eos_token_id, test_elements=pad_token_id).any():


KeyboardInterrupt: 