# Generating data

The purpose of this notebook is to generate and save some common datasets for training/testing. This way, results can easily be compared across different models.

In [47]:
import numpy as np
from itertools import product
import random
import pandas as pd
from pathlib import Path

from sklearn.model_selection import train_test_split

## Directories

In [48]:
code_dir = Path('../Code')
data_dir = code_dir / Path('data')
random_dir = data_dir / Path('random')
sum_strat_dir = data_dir / Path('sum_strat')
uniform_sum_dir = data_dir / Path('uniform_sum')

## Generation functions

Functions copy-pasted from my code.

### Utils functions

In [49]:
def one_hot(n, max_value):
    # One-hots a positive integer n where n <= max_value
    one_hot_n = np.zeros(max_value)
    one_hot_n[n] = 1
    return one_hot_n


def undo_one_hot(v):
    # If an integer is one-hot encoded using the one_hot function above, return the integer n
    return np.argmax(v)


def one_hot_matrix(M, max_value):
    # Given a matrix M of size (n_samples, n_ints) return the matrix one-hotted. The return matrix is
    # of size (n_samples, n_ints, max_value)
    n_samples, seq_length = M.shape
    M_oh = np.array([one_hot(r, max_value) for r in np.array(M).flatten()]).reshape(
        (n_samples, seq_length, max_value))

    # In case this is a target vector, we don't want to include an unnecessary axis
    return np.squeeze(M_oh)


def undo_one_hot_matrix(M, decoder_map):
    # Given a matrix M of size (n_samples, timesteps, vocab_size) coming from one_hot_matrix, return the sequence
    # that was encoded.
    decoded_list = []
    for i in range(M.shape[0]):
        decoded = ''
        sample = M[i]
        for ts in range(sample.shape[0]):
            decoded += decoder_map[undo_one_hot(sample[ts])]
        decoded_list.append(decoded)
    return decoded_list


def char_to_int_map(max_value=9, min_value=0):
    char_to_int = {str(n): n for n in range(min_value, max_value+1)}
    n_terms = max_value - min_value + 1
    char_to_int['+'] = n_terms
    char_to_int['\t'] = n_terms + 1
    char_to_int['\n'] = n_terms + 2
    char_to_int[' '] = n_terms + 3
    return char_to_int


def input_seq_length(n_terms, n_digits):
    # Given an addition sequence with n_terms terms each with n_digits, return how many characters the (non-padded)
    # resulting input string can be (maximum possible length)
    # n_digits for each term, and n_terms - 1 "plus signs", along with an end-of-string character \n and a
    # start-of-string character \t
    return n_terms * n_digits + (n_terms - 1) + 1


def target_seq_length(n_terms, n_digits):
    # Given an addition sequence with n_terms terms each with n_digits, return how many characters the (non-padded)
    # resulting output string can be (maximum possible length)
    # All terms except the final +2 come from simple algebra computing the max number of digits possible.
    # The final +1 comes from the start-of-sequence character \t that is prepended to all target sequences.
    # The inital +1 comes from the \n appended
    return 1 + n_digits + 1 + int(np.floor(np.log10(n_terms))) + 1


def reverse_dict(d):
    return {v: k for k, v in d.items()}


int_to_char = reverse_dict(char_to_int_map())


def decode_sample(x, decoder_map, one_hot=False):
    # Given an array with integer encoding (or optionally one-hot encoding), decode it into
    # a string
    if one_hot:
        x = undo_one_hot(x)
    return ''.join([decoder_map[s] for s in x])


def decode_matrix(X, decoder_map, one_hot=False):
    # Given a matrix with integer encoding (or optionally one-hot encoding), decode it into
    # a list of strings
    if one_hot:
        X = undo_one_hot_matrix(X)
    decoded_strs = [decode_sample(x, decoder_map) for x in X] 
    return decoded_strs

### Helper functions

In [50]:
def _generate_sample(n_terms, n_digits, allow_less_terms=False):
    # Generate a sample of the form "number_1+number_2+...+number_{n_terms}=answer"
    x = []
    if allow_less_terms:
        for _ in range(np.random.randint(2, n_terms + 1)):
            x.append(np.random.randint(10 ** n_digits - 1))
    else:
        for _ in range(n_terms):
            x.append(np.random.randint(10 ** n_digits - 1))

    y = np.sum(x)

    x_str = '+'.join(str(n) for n in x)
    y_str = str(y)
    return x_str.strip(), y_str.strip()


def _generate_sample_from_y(n_terms, n_digits, y):
    # Generates a sample which sums to y (used to uniformly distribute the sums)
    x = []
    while len(x) < n_terms - 1:
        # Don't allow it to pick a number causing sum(x) to exceed y, but also subject
        # to the restriction of n_digits.

        # Also, don't allow it to pick such a small number that it would be impossible
        # for the remaining terms to be chosen to sum to y (for example, if y = 150 and
        # n_terms = 2, n_digits = 2, we can't pick 49, or else you would need 101 to sum
        # to y.
        y_upper_bound = y - np.sum(x)
        n_digits_upper_bound = 10 ** n_digits - 1
        upper_bound = min([y_upper_bound, n_digits_upper_bound])
        lower_bound = (y - np.sum(x) - (10 ** n_digits - 1) * (n_terms - len(x) - 1))
        lower_bound = max([0, lower_bound])

        if upper_bound > 0:
            x.append(np.random.randint(lower_bound, upper_bound + 1))
        else:
            x.append(0)
    x.append(y - np.sum(x))
    random.shuffle(x)

    x_str = '+'.join(str(n) for n in x)
    y_str = str(y)
    return x_str.strip(), y_str.strip()


def _format_sample(x_str, y_str, n_terms, n_digits, int_encoder=None, reverse=False):
    # Format a sample of the form "number_1+number_2+...+number_{n_terms}=answer".
    # Each number_i has n_digits digits
    # If a dictionary is passed for int_encoder then use the it to convert characters to integers (so for instance
    # convert '3' to 3 or '+' to 12)

    if reverse:
        x_str = x_str[::-1]

    # Prepend an end-of-sequence character \n and for the target append a start-of-sequence character \t
    x_str = x_str + '\n'
    y_str = '\t' + y_str + '\n'
    
    # Pad x so that is always has the same length.
    max_input_digits = input_seq_length(n_terms, n_digits)
    x_str = x_str.ljust(max_input_digits)
    max_target_digits = target_seq_length(n_terms, n_digits)
    y_str = y_str.ljust(max_target_digits)

    if int_encoder is not None:
        assert isinstance(int_encoder, dict), 'int_encoder must be a dictionary mapping characters to integers'
        x_list = [int_encoder[c] for c in x_str]
        y_list = [int_encoder[c] for c in y_str]

    return x_list, y_list


def _generate_samples(n_samples, n_terms=2, n_digits=2, int_encoder=None, one_hot=False, reverse=False, allow_less_terms=False):
    # Generate n_samples examples of addition problems as defined in _generate_sample above
    X = []
    y = []
    for _ in range(n_samples):
        x_str, y_str = _generate_sample(n_terms, n_digits, allow_less_terms=allow_less_terms)
        x_sample, y_sample = _format_sample(x_str, y_str, n_terms, n_digits, int_encoder, reverse)
        X.append(x_sample)
        y.append(y_sample)

    X = np.array(X)
    y = np.array(y)

    if one_hot:
        X = one_hot_matrix(X, len(int_encoder))
        y = one_hot_matrix(y, len(int_encoder))

    return X, y


def _generate_uniform_samples(n_samples, n_terms=2, n_digits=2, int_encoder=None, one_hot=False, reverse=False):
    # Generate samples uniformly w.r.t. the sum
    max_sum = (10**n_digits - 1) * n_terms
    possible_sums = range(max_sum + 1)

    X = []
    y = []
    for _ in range(n_samples):
        x_str, y_str = _generate_sample_from_y(n_terms, n_digits, np.random.choice(possible_sums))
        x_sample, y_sample = _format_sample(x_str, y_str, n_terms, n_digits, int_encoder, reverse)
        assert len(x_sample) == 6, f'x_str = {x_str}, x_sample = {x_sample}'
        X.append(x_sample)
        y.append(y_sample)

    X = np.array(X)
    y = np.array(y)

    if one_hot:
        X = one_hot_matrix(X, len(int_encoder))
        y = one_hot_matrix(y, len(int_encoder))

    return X, y

In [51]:
def generate_samples(n_samples, n_terms=2, n_digits=2, int_encoder=None, one_hot=False, reverse=False,
                     allow_less_terms=False, uniform=False):
    # Generate n_samples series with conditions n_terms and n_digits
    if uniform:
        X, y = _generate_uniform_samples(n_samples, n_terms, n_digits, int_encoder, one_hot, reverse)
    else:
        X, y = _generate_samples(n_samples, n_terms, n_digits, int_encoder, one_hot, reverse, allow_less_terms)
    return np.array(X), np.array(y)


def generate_all_samples(n_terms=2, n_digits=2, int_encoder=None, one_hot=False, reverse=False):
    # Generate ALL possible integer addition problems with conditions n_terms and n_digits
    X = []
    y = []

    x_all = range(10 ** n_digits)
    x_cartesian = list(product(x_all, repeat=n_terms))
    for x in x_cartesian:
        x_str = '+'.join([str(a) for a in x])
        y_str = str(sum(x))
        x_str = x_str.strip()
        y_str = y_str.strip()
        x_sample, y_sample = _format_sample(x_str, y_str, n_terms, n_digits, int_encoder, reverse)
        X.append(x_sample)
        y.append(y_sample)

    assert len(X) == 10 ** (n_digits * n_terms), "You didn't generate all possible problems..."

    X = np.array(X)
    y = np.array(y)

    if one_hot:
        X = one_hot_matrix(X, len(int_encoder))
        y = one_hot_matrix(y, len(int_encoder))

    return X, y

## All samples

### Helper functions

In [52]:
def create_df(X, dataset, one_hot=False):
    assert dataset.lower() in ['train', 'test', 'validation'], 'Dataset must be one of "train", "test", "validation"'
    # Note that y is not needed, since we can infer the sum from X
    
    # Split into columns for each term
    string_col = decode_matrix(X, int_to_char, one_hot=one_hot)
    df = pd.DataFrame({'string': string_col})
    summands_df = df['string'].str.split('+', expand=True)
    summands_df.columns = [f'term_{i}' for i in range(summands_df.shape[1])]
    summands_df[summands_df.columns[-1]] = summands_df[summands_df.columns[-1]].str.replace('\n', '')
    df = df.join(summands_df)
    
    # Clean up the columns
    cols = list(df.columns)
    cols.remove('string')
    for c in cols:
        df[c] = df[c].str.strip('\n')
        df[c] = df[c].str.strip()
        df[c] = df[c].astype(int)
        
    df['sum'] = sum([df[c] for c in cols])
    
    if dataset.lower() == 'train':
        df['set'] = 'Train'
    elif dataset.lower() == 'test':
        df['set'] = 'Test'
    elif dataset.lower() == 'validation':
        df['set'] = 'Validation'
    
    return df

In [53]:
def save_full_df(X_train, X_test, path, fname):
    df_train = create_df(X_train, dataset='train')
    df_test = create_df(X_test, dataset='test')
    df = pd.concat([df_train, df_test])
    if not isinstance(path, Path):
        path = Path(path)
    if not isinstance(fname, Path):
        fname = Path(fname)
    df.to_csv(path / fname, index=False)

In [54]:
def train_test_from_idx(X, y, train_idx, test_idx=None, save=True, path=None, return_arrays=False):
    if test_idx is None:
        test_idx = list(set(range(X.shape[0])) - set(train_idx))
    
    X_train = X[train_idx]
    y_train = y[train_idx]
    
    X_test = X[test_idx]
    y_test = y[test_idx]
    
    if save and (path is not None):
        np.save(path / Path('X_train.npy'), X_train)
        np.save(path / Path('X_test.npy'), X_test)
        np.save(path / Path('y_train.npy'), y_train)
        np.save(path / Path('y_test.npy'), y_test)
    
    if return_arrays:
        return X_train, X_test, y_train, y_test

### 2 terms, 2 digits

In [55]:
n_terms = 3
n_digits = 2

model_dir = Path(f'{n_terms}term_{n_digits}digs')

random_dir = random_dir / model_dir
uniform_sum_dir = uniform_sum_dir / model_dir
sum_strat_dir = sum_strat_dir / model_dir

In [56]:
X, y = generate_all_samples(n_terms, n_digits, char_to_int_map(), one_hot=False)

In [57]:
X.shape

(1000000, 9)

In [58]:
y.shape

(1000000, 5)

In [59]:
df = create_df(X, dataset='train')

In [60]:
df.head()

Unnamed: 0,string,term_0,term_1,term_2,sum,set
0,0+0+0\n,0,0,0,0,Train
1,0+0+1\n,0,0,1,1,Train
2,0+0+2\n,0,0,2,2,Train
3,0+0+3\n,0,0,3,3,Train
4,0+0+4\n,0,0,4,4,Train


## Split randomly

Randomly split the data into training and testing (70% train, 30% test).

In [61]:
X_2_2_train, X_2_2_test, y_2_2_train, y_2_2_test = train_test_split(X, y, test_size=0.3)

In [62]:
np.save(random_dir / Path('X_train.npy'), X_2_2_train)
np.save(random_dir / Path('X_test.npy'), X_2_2_test)
np.save(random_dir / Path('y_train.npy'), y_2_2_train)
np.save(random_dir / Path('y_test.npy'), y_2_2_test)

In [63]:
save_full_df(X_2_2_train, X_2_2_test, random_dir, 'df.csv')

## Stratify by sum

Stratify the data by the sum, so that a fixed percentage of series summing to each value is kept.

In [70]:
df_sample = df.sample(frac=0.7, weights='sum', random_state=1)

In [71]:
df_sample.head()

Unnamed: 0,string,term_0,term_1,term_2,sum,set
501666,50+16+66\n,50,16,66,132,Train
778720,77+87+20\n,77,87,20,184,Train
358,0+3+58\n,0,3,58,61,Train
382484,38+24+84\n,38,24,84,146,Train
201255,20+12+55\n,20,12,55,87,Train


In [72]:
train_idx = np.array(df_sample.index)
test_idx = np.array(list(set(df.index) - set(train_idx)))

In [73]:
X_train, X_test, _, _ = train_test_from_idx(X, y, train_idx, test_idx, path=sum_strat_dir, return_arrays=True)

In [74]:
save_full_df(X_train, X_test, sum_strat_dir, 'df.csv')

## Uniform by sum

Sample (with replacement) so that each sum appears an equal number of times. From there, randomly sample some data for the test set.

In [75]:
df['sum'].value_counts().max()

7500

In [76]:
df_over = df.groupby('sum').sample(100, replace=True)

In [77]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

In [78]:
save_full_df(X_train, X_test, uniform_sum_dir, 'df.csv')

In [79]:
np.save(uniform_sum_dir / Path('X_train.npy'), X_train)
np.save(uniform_sum_dir / Path('X_test.npy'), X_test)
np.save(uniform_sum_dir / Path('y_train.npy'), y_train)
np.save(uniform_sum_dir / Path('y_test.npy'), y_test)