In [4]:
import numpy as np
import pandas as pd
import itertools
from matplotlib import pyplot as plt

# Quadratic equations


In [5]:
import numpy as np
import itertools

In [6]:
def get_par(root):
    """Render the linear factor that vanishes at ``root``.

    Returns ``'x'`` for a zero root, ``'(x-r)'`` for a positive root and
    ``'(x+|r|)'`` for a negative root.
    """
    if root > 0:
        return f'(x-{root})'
    if root == 0:
        return 'x'
    # Negative root: print the magnitude after an explicit plus sign.
    magnitude = np.abs(root)
    return f'(x+{magnitude})'
        
def get_coef(roots, power):
    """Coefficient of ``x**power`` in the monic polynomial ``prod(x - r)``.

    By Vieta's formulas the coefficient equals ``(-1)**k * e_k(roots)`` where
    ``k = len(roots) - power`` and ``e_k`` is the sum of all products of ``k``
    distinct roots.

    Args:
        roots: sequence of polynomial roots.
        power: exponent of ``x`` whose coefficient is requested
            (``0 <= power <= len(roots)``).

    Returns:
        The coefficient as a numpy scalar (a float for the leading term,
        because the product of an empty tuple is ``1.0``).
    """
    picked = len(roots) - power
    s = np.sum([np.prod(cfs) for cfs in itertools.combinations(roots, picked)])
    # The sign depends on how many roots are multiplied, not on the power of x.
    # Both agree for even degree but differ for odd degree.
    return pow(-1, picked) * s

def get_monome(coef, power, dim=2):
    """Format a single term ``coef*x^power`` of a polynomial as text.

    Args:
        coef: numeric coefficient of the term.
        power: exponent of ``x``.
        dim: degree of the whole polynomial; the term with ``power == dim``
            is the leading one and gets no explicit ``+`` sign.

    Returns:
        ``''`` for a zero coefficient, otherwise a signed term such as
        ``'x^2'``, ``'+4*x'``, ``'-x'`` or ``'-6396'``.
    """
    if coef == 0:
        return ''

    # Sign and coefficient part of the term.
    if coef == -1:
        # A unit coefficient is implicit unless the term is the constant.
        head = '-' if power > 0 else '-1'
    elif coef == 1:
        if power == 0:
            head = '+1'
        else:
            head = '+' if power < dim else ''
    else:
        # Negative numbers carry their own sign; the leading term needs none.
        sign = '' if (coef < 0 or power == dim) else '+'
        star = '*' if power > 0 else ''
        head = f'{sign}{coef}{star}'

    # Variable part of the term.
    if power == 1:
        tail = 'x'
    elif power > 1:
        tail = f'x^{power}'
    else:
        tail = ''

    return head + tail

In [7]:
# Roots are drawn uniformly from (-max_root, max_root).
max_root = 100
# Number of roots drawn, i.e. the degree of the generated polynomial.
dim = 2
# Number of decimals passed to round() when the scaling factor is drawn.
precision=0
# Upper bound of the uniform range the scaling factor is drawn from.
shuffle_limit = 10

# roots = np.random.uniform(-max_root, max_root, dim).round(precision)
# Integer roots: astype(int) drops the fractional part of each uniform draw.
roots = np.random.uniform(-max_root, max_root, dim).astype(int)

In [8]:
# parentheses (factored) form, factors ordered by ascending root
pars = [get_par(r) for r in sorted(roots)]
par_eq = '*'.join(pars) + '=0'

# canonic form: coefficients ordered from the leading term down to the constant
coeffs = [int(get_coef(roots, n)) for n in range(len(roots)+1)][::-1]
canonic_eq = ''.join([get_monome(get_coef(roots, n), n) for n in range(len(roots)+1)][::-1]) + '=0'

# discriminant
D = int(coeffs[1]**2 - 4*coeffs[0]*coeffs[2])
sqd = int(np.sqrt(D))
# Print |b| inside the square, as the generator functions below do:
# with a negative b the text would otherwise read "-4^2", i.e. -(4^2).
discr_eq = f'D={np.abs(coeffs[1])}^2-4*{coeffs[0]}*{coeffs[2]}={D}={sqd}^2'
solution_eq = f'x=({-coeffs[1]}-{sqd})/{int(2*coeffs[0])}={min(roots)};x=({-coeffs[1]}+{sqd})/{int(2*coeffs[0])}={max(roots)}'

# shuffled form: the canonic equation multiplied by a random non-zero factor
shuffle_coeff = np.random.choice([-1, 1]) * round(np.random.uniform(1, shuffle_limit), precision)
base_coeffs = np.array(coeffs) * shuffle_coeff

if precision == 0:
    base_coeffs = base_coeffs.astype(int)
base_eq = ''.join([get_monome(coef, n) for coef, n in zip(base_coeffs, range(len(roots)+1)[::-1])]) + '=0'

In [9]:
def gen_equation_solution(max_root = 100,
                        dim = 2,
                        shuffle_limit = 10,
                        with_D = True,
                        with_canonic = True,
                        precision = 0):
    """Generate a random quadratic equation with integer roots and its worked solution.

    Args:
        max_root: roots are drawn uniformly from ``(-max_root, max_root)`` and
            truncated to integers.
        dim: number of roots to draw; the discriminant steps below assume 2.
        shuffle_limit: upper bound of the uniform range the scaling factor is
            drawn from (the lower bound is 1, the sign is random).
        with_D: include the discriminant and the root formulas in the solution.
        with_canonic: include the canonic (monic) equation in the solution and
            use its coefficients for the discriminant; otherwise the scaled
            coefficients are used.
        precision: number of decimals the scaling factor is rounded to. This
            used to be read from a notebook-level global; it is now an explicit
            argument with the same value (0) so the function also works on a
            fresh kernel.

    Returns:
        ``(base_eq, solution, answer)``: the scaled equation, the solution
        steps joined with ``';'``, and the unique roots joined with ``','``.
    """
    roots = np.random.uniform(-max_root, max_root, dim).astype(int)
    powers = range(len(roots)+1)[::-1]

    # canonic form, coefficients ordered from the leading term to the constant
    canonic_coeffs = [int(get_coef(roots, n)) for n in range(len(roots)+1)][::-1]
    canonic_eq = ''.join(get_monome(coef, n) for coef, n in zip(canonic_coeffs, powers)) + '=0'

    # shuffled form: canonic equation multiplied by a random non-zero factor
    shuffle_coeff = np.random.choice([-1, 1]) * round(np.random.uniform(1, shuffle_limit), precision)
    base_coeffs = (np.array(canonic_coeffs) * shuffle_coeff).astype(int)
    base_eq = ''.join(get_monome(coef, n) for coef, n in zip(base_coeffs, powers)) + '=0'

    # discriminant is computed for whichever equation the solution starts from
    coeffs = canonic_coeffs if with_canonic else base_coeffs

    D = int(coeffs[1]**2 - 4*coeffs[0]*coeffs[2])
    sqd = int(np.sqrt(D))
    discr_eq = f'D={np.abs(coeffs[1])}^2-4*{coeffs[0]}*{coeffs[2]}={D}={sqd}^2'

    two_a = int(2*coeffs[0])
    smaller, larger = min(roots), max(roots)
    # (-b - sqrt(D)) / 2a is the smaller root only when a > 0. With a negative
    # leading coefficient the division flips the order, so swap the labels.
    minus_root, plus_root = (smaller, larger) if two_a > 0 else (larger, smaller)
    solution_eq = f'x=({-coeffs[1]}-{sqd})/{two_a}={minus_root};x=({-coeffs[1]}+{sqd})/{two_a}={plus_root}'

    answer = ','.join(np.unique(roots).astype(str))

    solution_equations = []
    if with_canonic:
        solution_equations.append(canonic_eq)
    if with_D:
        solution_equations.append(discr_eq)
        solution_equations.append(solution_eq)

    solution = ';'.join(solution_equations)
    return base_eq, solution, answer

In [10]:
# Display the canonic-form coefficients, ordered from the leading term to the constant.
coeffs

[1, 4, -6396]

In [11]:
# Show the sampled roots followed by every textual form derived from them, one per line.
print(roots, par_eq, solution_eq, discr_eq, canonic_eq, base_eq, sep='\n')

[-82  78]
(x+82)*(x-78)=0
x=(-4-160)/2=-82;x=(-4+160)/2=78
D=4^2-4*1*-6396=25600=160^2
x^2+4*x-6396=0
-8*x^2-32*x+51168=0


In [12]:
# Draw one sample and glue problem, solution steps and answer into a single ';'-separated record.
eq, sol, ans = gen_equation_solution(with_D=True, with_canonic=True)
total = f'{eq};{sol};{ans}'

In [13]:
def gen_no_roots_equation(max_root=100, with_canonic=True, with_D=True, shuffle_limit=10, precision=0):
    """Generate a random quadratic equation with no real roots and its worked solution.

    The canonic equation is ``x^2 + b*x + c = 0`` with ``c > b^2/4``, so the
    discriminant ``b^2 - 4c`` is strictly negative.

    Args:
        max_root: ``b`` is drawn from ``[0, 2*max_root)`` and ``c`` is drawn
            below ``max_root**2``.
        with_canonic: include the canonic equation in the solution and use its
            coefficients for the discriminant; otherwise the scaled
            coefficients are used.
        with_D: include the discriminant step in the solution.
        shuffle_limit: upper bound of the uniform range the scaling factor is
            drawn from. Previously read from a notebook-level global.
        precision: number of decimals the scaling factor is rounded to.
            Previously read from a notebook-level global.

    Returns:
        ``(base_eq, solution, answer)``: the scaled equation, the solution
        steps joined with ``';'``, and an empty answer string.
    """
    b = np.random.randint(max_root*2)
    # Smallest integer c with b^2 - 4c < 0. Passing the float b**2/4 as the
    # lower bound allowed c values with D >= 0 (equations that do have roots).
    c_low = b**2 // 4 + 1
    # Keep the sampling range non-empty even for tiny max_root.
    c = np.random.randint(c_low, max(max_root**2, c_low + 1))
    canonic_coeffs = [1, b, c]
    # Exponents paired with the coefficients: leading term first.
    powers = range(len(canonic_coeffs))[::-1]

    canonic_eq = ''.join(get_monome(coef, power) for coef, power in zip(canonic_coeffs, powers)) + '=0'

    # shuffled form: canonic equation multiplied by a random non-zero factor
    shuffle_coeff = np.random.choice([-1, 1]) * round(np.random.uniform(1, shuffle_limit), precision)
    base_coeffs = np.array(canonic_coeffs) * shuffle_coeff
    if precision == 0:
        base_coeffs = base_coeffs.astype(int)
    # Uses this equation's own exponents rather than the length of a global `roots`.
    base_eq = ''.join(get_monome(coef, power) for coef, power in zip(base_coeffs, powers)) + '=0'

    # discriminant
    coeffs = canonic_coeffs if with_canonic else base_coeffs
    D = int(coeffs[1]**2 - 4*coeffs[0]*coeffs[2])
    discr_eq = f'D={np.abs(coeffs[1])}^2-4*{coeffs[0]}*{coeffs[2]}={D}'
    # No real roots: the root-formula step and the answer stay empty. The empty
    # step is still appended, so the solution ends with a trailing ';'.
    solution_eq = ''
    answer = ''

    solution_equations = []
    if with_canonic:
        solution_equations.append(canonic_eq)
    if with_D:
        solution_equations.append(discr_eq)
        solution_equations.append(solution_eq)
    solution = ';'.join(solution_equations)
    return base_eq, solution, answer

In [14]:
import torch

class equation_generator:
    """Endless batch generator of tokenised quadratic-equation problems.

    Each call to ``next()`` returns ``(src, tgt, src_mask, tgt_mask)``. ``src``
    and ``tgt`` are integer tensors of shape ``(batch_size, seq_len * N)``,
    where every sample is laid out in ``N`` slots of ``seq_len`` tokens: the
    problem, the solution steps and the answer. ``tgt`` is ``src`` shifted
    left by one position. Both masks are always ``None``.

    NOTE(review): ``power``, ``max_root`` and ``shuffle_limit`` are stored but
    are not forwarded to ``gen_equation_solution`` / ``gen_no_roots_equation``,
    which therefore run with their own defaults.
    """

    def __init__(self, power=2, max_root=1000, shuffle_limit=10, batch_size=32, seq_len=30, prob_no_roots=0.2, with_canonic=True, with_D=True, add_eos=False):
        self.batch_size = batch_size
        self.power = power
        self.max_root = max_root
        self.shuffle_limit = shuffle_limit
        self.set_dict()
        self.seq_len = seq_len
        self.src_mask = self.tgt_mask = None
        # Probability that a sample is an equation without real roots.
        self.prob_no_roots = prob_no_roots

        self.with_canonic = with_canonic
        self.with_D = with_D

        self.add_eos = add_eos
        # Plain-text record of every generated sample (grows with each batch).
        self.generated = []

    def set_dict(self):
        """Build the character-to-token table (ids 2..23) and the EOS id."""
        digits = [str(i) for i in range(10)]
        symbols = ['+', '-', '*', '/', '^', '=', '(', ')', 'x', 'D', ';', ',']
        self.c2token = dict(zip(digits + symbols, range(2, 24)))
        # 22 is also the id of ';', the separator the solution is split on.
        self.eos = 22

    def _num_slots(self):
        """Number of ``seq_len``-wide slots one sample occupies."""
        # problem + answer, plus one slot per solution step: the canonic
        # equation (optional) and the discriminant with the two root formulas.
        # The old code used 2 slots whenever with_D was False, so the canonic
        # step and the answer were written to the same slot.
        slots = 2
        if self.with_canonic:
            slots += 1
        if self.with_D:
            slots += 3
        return slots

    def __next__(self):
        seq_len = self.seq_len
        N = self._num_slots()

        src = np.zeros([self.batch_size, seq_len*N]).astype(int)
        tgt = np.zeros([self.batch_size, seq_len*N]).astype(int)
        for i in range(self.batch_size):
            if np.random.uniform(0, 1) < self.prob_no_roots:
                eq, sol, ans = gen_no_roots_equation(with_canonic=self.with_canonic, with_D=self.with_D)
            else:
                eq, sol, ans = gen_equation_solution(with_canonic=self.with_canonic, with_D=self.with_D)
            self.generated.append({'problem': eq, 'steps': sol, 'answer': ans})
            eq_tokens = [self.c2token[c] for c in eq]
            step_tokens = [[self.c2token[c] for c in step] for step in sol.split(';')]
            ans_tokens = [self.c2token[c] for c in ans]

            # Slot 0: the problem. The target drops the first token (shift by one).
            src[i, :len(eq_tokens)] = eq_tokens
            tgt[i, :len(eq_tokens)-1] = eq_tokens[1:]

            if self.add_eos:
                src[i, len(eq_tokens)] = self.eos
                tgt[i, len(eq_tokens)-1] = self.eos

            # Slots 1..: one solution step per slot.
            for j, tokens in enumerate(step_tokens):
                start = seq_len*(j+1)
                src[i, start:start + len(tokens)] = tokens
                tgt[i, start-1:start + len(tokens)-1] = tokens

                if self.add_eos:
                    src[i, start + len(tokens)] = self.eos
                    tgt[i, start + len(tokens)-1] = self.eos

            # Last slot: the answer.
            ans_start = seq_len*(N-1)
            src[i, ans_start:ans_start+len(ans_tokens)] = ans_tokens
            tgt[i, ans_start-1:ans_start+len(ans_tokens)-1] = ans_tokens

            if self.add_eos:
                src[i, ans_start+len(ans_tokens)] = self.eos
                tgt[i, ans_start+len(ans_tokens)-1] = self.eos

        return torch.tensor(src), torch.tensor(tgt), self.src_mask, self.tgt_mask

In [15]:
# Generator emitting the canonic and discriminant steps, with an EOS token after every segment.
gen = equation_generator(with_canonic=True, with_D=True, add_eos=True)

In [18]:

def generate_data(generator, task_name, path='../data', train_size=10_000, val_size=1_000, test_size=2_000, batch_size=32):
    """Sample a deduplicated dataset from ``generator`` and save train/val/test splits.

    Three times the requested number of samples is drawn, duplicate inputs are
    dropped, the remainder is shuffled and cut into consecutive train, val and
    test slices. Six files named ``{task_name}_{split}_{X|y}.npy`` are written
    to ``path``, which is created if it does not exist.

    Args:
        generator: iterator whose ``next()`` yields ``(X, y, _, _)`` with ``X``
            and ``y`` being 2-D torch tensors of one batch.
        task_name: prefix of the output file names.
        path: output directory.
        train_size, val_size, test_size: number of rows per split.
        batch_size: batch size assumed when computing how many batches to draw.

    Returns:
        The shuffled indices of the unique rows (numpy array).
    """
    import os
    import warnings

    total_size = train_size + test_size + val_size
    # Oversample by 3x so enough rows survive deduplication. Always draw at
    # least one round of batches so vstack never receives an empty list.
    num_batches = max(1, total_size // batch_size) * 3

    Xs, ys = [], []
    for _ in range(num_batches):
        X, y, _, _ = next(generator)
        Xs.append(X)
        ys.append(y)

    Xs = torch.vstack(Xs)
    ys = torch.vstack(ys)

    # Deduplicate on the inputs so no sample can appear in two splits.
    _, inds = np.unique(Xs.cpu().numpy(), axis=0, return_index=True)
    inds = np.random.permutation(inds)

    if len(inds) < total_size:
        warnings.warn(
            f'only {len(inds)} unique samples were generated, fewer than the '
            f'{total_size} requested; the last splits will be short or empty'
        )

    Xs = Xs[inds][:total_size]
    ys = ys[inds][:total_size]

    if path:
        os.makedirs(path, exist_ok=True)

    val_end = train_size + val_size
    bounds = {
        'train': (0, train_size),
        'val': (train_size, val_end),
        'test': (val_end, val_end + test_size),
    }
    for split, (start, stop) in bounds.items():
        np.save(f'{path}/{task_name}_{split}_X.npy', Xs[start:stop].cpu().numpy())
        np.save(f'{path}/{task_name}_{split}_y.npy', ys[start:stop].cpu().numpy())
    return inds

In [20]:
# Fresh generator without EOS tokens (add_eos keeps its default of False).
gen = equation_generator(with_canonic=True, with_D=True)
# Writes the sqeq_cd train/val/test .npy files to ../data; the returned index array is the cell output.
generate_data(gen, task_name='sqeq_cd', path='../data', train_size=100_000, val_size=10_000, test_size=20_000, batch_size=32)

array([ 76901, 239565,  53367, ..., 368242, 111061,  44081])