In [0]:
import os, pkgutil, numpy as np, re, shutil

from tensorflow.keras.layers import Embedding, LSTM, Dense
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.callbacks import ModelCheckpoint

from collections import OrderedDict
from io import BytesIO
from tokenize import tokenize, STRING, INDENT, DEDENT

DEBUG = False

In [None]:
def read_file(path, verbose=False):
    """
    Read data from a file and return its contents in a string.
    :param path: str, path to file's location
    :param verbose: bool, whether to print error message
    :return: str, file's content or empty string if file not found.
    """
    try:
        with open(path, 'r') as file:
            return file.read()
    except FileNotFoundError:
        if verbose:
            print('INCORRECT FILE PATH:', path)
        return ''


def write_file(data, path, append=False):
    """
    Write information provided into file. overwrites all existing data and creates new file if necessary.
    :param data: str, information to write to file
    :param path: path to data's destination
    :param append: bool, whether to append or overwrite file
    :return: None
    """
    mode = 'w'
    if append:
        mode = 'a'

    with open(path, mode) as file:
        file.write(data)


def copy_file(source_path, dest_path):
    """
    Copies the content of a source file to either another arbitrary file path or to an index in the buffer.
    :param source_path: str, path to the source file
    :param dest_path: str, path to files destination
    :return: bool, success or failure
    """
    data = read_file(source_path)
    if data:
        write_file(data, dest_path)
        return True

    return False


def get_dir_length(path):
    """
    Gets number of files in buffer.
    :return: int, number of files in buffer directory
    """
    return len([0 for name in os.listdir(path) if os.path.isfile(name)])


def get_importable_modules():
    """
    get a list of all importable modules in current venv.
    :return: list, list of strs, each of which is the name of an importable module
    """
    modules = []
    for pkg in pkgutil.iter_modules():
        modules.append(pkg.name)

    return modules


In [None]:
class PyTokenizer:
    def __init__(self, max_vocab_len):
        """
        Create a tokenizer for python scripts.
        :param max_vocab_len: int, maximum size of vocabulary length. Actual length may be less
        """
        self.max_vocab_len = max_vocab_len
        self.word_idx = {}
        self.idx_word = {}

    def fit_on_data(self, data):
        """
        Create token index from data.
        :param data: str, corpus to create tokenizer on
        :return: None
        """
        tokens = PyTokenizer.py_tokenize(data)
        word_counts = OrderedDict()
        for t in tokens:
            if t in word_counts:
                word_counts[t] += 1
            else:
                word_counts[t] = 1

        wcounts = list(word_counts.items())
        wcounts.sort(key=lambda x: x[1], reverse=True)
        wcounts.insert(0, ['OOV', None])

        if len(wcounts) > self.max_vocab_len:
            wcounts = wcounts[:self.max_vocab_len]

        self.word_idx = dict(zip([wc[0] for wc in wcounts], list(range(len(wcounts)))))
        self.idx_word = dict(zip(list(range(1, len(wcounts) + 1)), [wc[0] for wc in wcounts]))

    def text_to_sequence(self, text):
        """
        Convert string to sequence of token indices.
        :param text: str, text to tokenize
        :return: list, list of token indices
        """
        tokens = PyTokenizer.py_tokenize(text)
        return [self.word_idx.get(t, 1) for t in tokens]

    def sequence_to_text(self, seq):
        """
        Convert list of token indices to python string.
        :param seq: list, list of integer indices
        :return: str, joined token list
        """
        string_tokens = [self.idx_word.get(i, 'OOV') for i in seq]
        print(string_tokens, '\n\n')
        return PyTokenizer.py_untokenize(string_tokens)

    @property
    def real_vocab_len(self):
        """
        Get actual length of vocabulary
        :return: int, actual vocab length
        """
        return len(self.word_idx)

    @staticmethod
    def py_tokenize(data):
        """
        Convert py string into tokens.
        :param data: str, python script
        :return: list, list of string tokens.
        """
        token_generator = tokenize(BytesIO(data.encode('utf-8')).readline)
        tokens = []
        print_next = False
        i = 0
        while True:
            try:
                token_type, val, start, end, line = next(token_generator)
            except Exception:
                break

            if DEBUG and ("No data provided for" in val or (print_next and i < 40)):
                print('In function py_tokenize. TOKEN_TYPE:', token_type, 'VALUE:', val, 'START_POS:', start,
                      'END_POS:', end, 'FULL_LINE:', line[:-1])
                print_next = True
                i += 1

            if token_type == STRING:
                if val[0] != '"' and val[0] != "'":
                    str_contents = val[2:-1].split(' ')
                else:
                    str_contents = val[1:-1].split(' ')
                str_contents = [t for t in str_contents if t]
                tokens.extend(["'", *str_contents, "'"])
            elif token_type == INDENT:
                tokens.append('INDENT')
            elif token_type == DEDENT:
                tokens.append('DEDENT')
            elif val == 'utf-8':
                continue
            else:
                tokens.append(val)

        return tokens

    @staticmethod
    def py_untokenize(tokens):
        """
        Convert list of string tokens to single python script string
        :param tokens: list, list of strings
        :return: str, joined tokens
        """
        joined_tokens = ''
        indent = 0
        cont_str = False
        str_buffer = ''
        start_line = False
        num_lines = 1
        for i, t in enumerate(tokens):
            if start_line and t != 'INDENT' and t != 'DEDENT':
                joined_tokens += ' ' * indent
                start_line = False
            elif t == 'INDENT':
                indent += 4
                continue
            elif t == 'DEDENT':
                indent = max(0, indent - 4)
                continue

            if cont_str:
                if t == '"' or t == "'":
                    joined_tokens += str_buffer + t + ' '
                    str_buffer = ''
                    cont_str = False
                elif t == 'EOF':
                    if DEBUG:
                        print('ERROR: OPEN STRING WHEN EOF REACHED @ token', i, '@ line', num_lines,
                              'in Function: py_untokenize')
                    joined_tokens += str_buffer[0] * 2 + '\nEOF\n'
                    last_10 = joined_tokens[-10:]
                    str_buffer = ''
                    cont_str = False
                else:
                    str_buffer += t + ' '
            elif t == "'" or t == '"':
                str_buffer += "'"
                cont_str = True
            elif t == '\n':
                start_line = True
                joined_tokens += '\n'
            elif t == 'EOF':
                num_lines += 1
                continue
            else:
                joined_tokens += t + ' '

        return joined_tokens

In [None]:
class PreProcessor:
    def __init__(self, buffer_dir, clean_file, tokenizer):
        """
        Create processor.
        :param buffer_dir: str, path to buffer directory
        :param clean_file: str, path to cleaned data file
        :param tokenizer: Tokenizer, tokenizer object
        """
        self.buffer_dir = buffer_dir
        self.clean_file = clean_file
        self.tokenizer = tokenizer

    def get_training_data(self, sample_len=50, step=1, one_hot_input=False):
        """
        get training data in form necessary for model training.
        :param sample_len: int, length of the samples to generate
        :param step: step to travel training sequence with
        :param one_hot_input: bool, whether to convert input to one hot vectors or not
        :return x: np.array, training inputs w/ dim (#samples, sample_len)
        :return y: np.array, training labels w/ dim (#samples, vocab_len)
        """
        if self.tokenizer.real_vocab_len == 0:
            data = read_file(self.clean_file)
            self.tokenizer.fit_on_data(data)

        data = read_file(self.clean_file, True)
        tokens = self.tokenizer.text_to_sequence(data)

        statements = []
        next_statements = []
        for i in range(0, len(tokens) - sample_len, step):
            statements.append(tokens[i:i + sample_len])
            next_statements.append(tokens[i + sample_len])

        if one_hot_input:
            raise NotImplementedError("Need to implement this")
        else:
            x = np.array(statements, dtype=np.int)

        y = np.zeros((len(statements), self.tokenizer.real_vocab_len), dtype=np.int)
        for i, next_statement in enumerate(next_statements):  # one hots y matrix
            y[i, next_statement] = 1

        if DEBUG:
            print('x shape:', x.shape, 'y shape:', y.shape)

        return x, y

    # DATA CLEANSING

    def clean_buffer(self, empty=True, append=True):
        """
        clean all files in buffer and add to cleaned data file and empty buffer if necessary.
        :param empty: bool, whether to empty buffer or not.
        :param append: bool, whether to append cleaned data into clean file or overwrite it.
        :return: None
        """
        write_file('', self.clean_file, append)  # clear old file if necessary
        for i, file in enumerate(os.listdir(self.buffer_dir)):
            path = os.path.join(self.buffer_dir, file)
            data = read_file(path)
            if not data:
                os.remove(path)
                continue
            try:
                clean_data = self.process_text(data)
                write_file(clean_data, self.clean_file, True)
            except Exception:
                if DEBUG:
                    print('Error found tokenizing', path)

        if empty:
            shutil.rmtree(self.buffer_dir)
            os.mkdir(self.buffer_dir)

    @staticmethod
    def process_text(data):
        """
        Clean a text string of python code.
        :param data: str, python code
        :return: str, cleaned python code
        """
        # delete comments
        def comment_subber(match_obj):
            string = match_obj.group(0)
            if string.startswith("'''") or string.startswith('"""') or string.startswith('#'):
                return ''
            return string

        comment_pattern = '""".*?"""|\'\'\'.*?\'\'\'|"(\\[\s\S]|[^"])*"|\'(\\[\s\S]|[^\'])*\'|#[\s\S]*'
        data = re.compile(comment_pattern, re.DOTALL).sub(comment_subber, data)

        # remove imports
        data = re.sub('(\n|^)(import|from).*', '', data)

        # add special EOF token
        data = re.sub('[\n\s]*EOF[\n\s]*', '', data)  # deletes any old EOF tokens
        data += '\nEOF\n'

        # remove unnecessary newlines
        data = PreProcessor.remove_newlines(data)

        return data

    @staticmethod
    def remove_newlines(data):
        """
        Remove unnecessary newlines from python string
        :param data: str, python code
        :return: str, cleaned python code
        """
        data = re.sub(r'\n[\n\s]*\n', '\n', data)
        while data[0] == '\n':  # check for newline @ file start
            data = data[1:]

        return data

In [None]:
class Model:
    def __init__(self, vocab_len, sample_len, **hyper_params):
        self.vocab_len = vocab_len
        self.sample_len = sample_len

        self.model = Sequential()
        self.model.add(Embedding(vocab_len, 512))
        self.model.add(LSTM(300, return_sequences=True))
        self.model.add(LSTM(300, return_sequences=True))
        self.model.add(LSTM(300, return_sequences=True))
        self.model.add(LSTM(128))
        self.model.add(Dense(vocab_len, activation='softmax'))

        optimizer = RMSprop(lr=1e-5)
        self.model.compile(loss='categorical_crossentropy',
                           optimizer=optimizer,
                           metrics=['accuracy'])

    def train(self, x, y, epochs=1, mini_batch_size=128):
        """
        train model on given data
        :param x: np.array, training inputs, dims (#samples, sample_len)
        :param y: np.array, training labels, dims (#samples, vocab_len)
        :param epochs: int, number of iterations to train on data
        :param mini_batch_size: int, size of mini batches
        :return: None
        """
        assert x.shape[1] == self.sample_len, 'Incorrect sample length. Given: {}, Expecting: {}'.format(
            x.shape[1], self.sample_len)

        checkpoint = ModelCheckpoint("best_model",
                                     monitor='loss',
                                     verbose=1,
                                     save_best_only=True,
                                     mode='auto',
                                     period=1)

        self.model.fit(x, y, mini_batch_size, epochs, verbose=1, callbacks=[checkpoint])

    def generate_script(self, seed, temp=0.5, **stop):
        """
        generate a script of certain length or until a token idx is reached.
        :param seed: np.array, input into model to generate sample from, dims (1, sample_len)
        :param temp: float, softmax temperature, amount of entropy to include in sample
        :param stop: kwargs, either len (int), number of new tokens to generate; or token (int) idx of token to stop at
        :return: list, full sequence generated as list of token indices (includes seed)
        """
        assert seed.shape[1] == self.sample_len, 'Incorrect sample length. Given: {}, Expecting: {}'.format(
            seed.shape[1], self.sample_len)
        generated_sequence = list(seed[0])

        while True:
            pred = self.model.predict(seed, verbose=0)[0]
            next_idx = self.sample_next_token(pred, temp)
            generated_sequence.append(next_idx)
            if stop.get('len', -1) == len(generated_sequence) - self.sample_len or stop.get('token', -1) == next_idx:
                break
            seed[0] = Model.np_shift(seed[0], -1)
            seed[0, -1] = next_idx

        return generated_sequence

    @staticmethod
    def sample_next_token(seed, temp):
        """
        sample next token given model's output sequence.
        :param seed: np.array, output from model used to pick next token
        :param temp: float, amount of randomness to use when sampling next token
        :return: int, index of sampled token
        """
        preds = np.asarray(seed).astype('float64')
        preds = np.log(preds) / temp
        exp_preds = np.exp(preds)
        probs = np.random.multinomial(1, exp_preds / np.sum(exp_preds), 1)
        return np.argmax(probs)

    @staticmethod
    def np_shift(xs, n):
        if n >= 0:
            return np.concatenate((np.full(n, np.nan), xs[:-n]))
        else:
            return np.concatenate((xs[-n:], np.full(-n, np.nan)))


In [None]:
t = PyTokenizer(5000)
p = PreProcessor('', 'clean.py', t)
x, y = p.get_training_data(50)

m = Model(t.real_vocab_len, 50)
m.train(x, y, 20, 64)

In [None]:
!zip -r ./best_model.zip ./best_model/

In [None]:
from tensorflow.keras.models import load_model


def generate_seed(tokenizer, sample_len):
    """
    Get seed from cleaned file.
    :param tokenizer: PyTokenizer, tokenizer to use.
    :param sample_len: int, length of seed to generate
    :return: np.array, model seed shape (1, sample_len)
    """
    tokens = []
    with open('clean.py', 'r') as file:
        while len(tokens) < sample_len:
            tokens.extend(tokenizer.text_to_sequence(file.readline()))
            print(len(tokens))

    if len(tokens) != sample_len:
        tokens = tokens[:sample_len]

    return np.asarray(tokens, dtype=np.int).reshape((1, sample_len))

m.model = load_model('best_model')
print(m.sample_len)
script = m.generate_script(generate_seed(t, m.sample_len), token=t.word_idx['EOF'])
print(t.sequence_to_text(script))

In [None]:
m.model = load_model('best_model')
m.train(x, y, 20, 64)

In [0]:
m.model = load_model('best_model')
m.train(x, y, 20, 64)

In [0]:
class PyTokenizer:
    def __init__(self, max_vocab_len):
        """
        Create a tokenizer for python scripts.
        :param max_vocab_len: int, maximum size of vocabulary length. Actual length may be less
        """
        self.max_vocab_len = max_vocab_len
        self.word_idx = {}
        self.idx_word = {}

    def fit_on_data(self, data):
        """
        Create token index from data.
        :param data: str, corpus to create tokenizer on
        :return: None
        """
        tokens = PyTokenizer.py_tokenize(data)
        word_counts = OrderedDict()
        for t in tokens:
            if t in word_counts:
                word_counts[t] += 1
            else:
                word_counts[t] = 1

        wcounts = list(word_counts.items())
        wcounts.sort(key=lambda x: x[1], reverse=True)
        wcounts.insert(0, ['OOV', None])

        if len(wcounts) > self.max_vocab_len:
            wcounts = wcounts[:self.max_vocab_len]

        self.word_idx = dict(zip([wc[0] for wc in wcounts], list(range(len(wcounts)))))
        self.idx_word = dict(zip(list(range(1, len(wcounts) + 1)), [wc[0] for wc in wcounts]))

    def text_to_sequence(self, text):
        """
        Convert string to sequence of token indices.
        :param text: str, text to tokenize
        :return: list, list of token indices
        """
        tokens = PyTokenizer.py_tokenize(text)
        return [self.word_idx.get(t, 1) for t in tokens]

    def sequence_to_text(self, seq):
        """
        Convert list of token indices to python string.
        :param seq: list, list of integer indices
        :return: str, joined token list
        """
        string_tokens = [self.idx_word.get(i, 'OOV') for i in seq]
        print(string_tokens, '\n\n')
        return PyTokenizer.py_untokenize(string_tokens)

    @property
    def real_vocab_len(self):
        """
        Get actual length of vocabulary
        :return: int, actual vocab length
        """
        return len(self.word_idx)

    @staticmethod
    def py_tokenize(data):
        """
        Convert py string into tokens.
        :param data: str, python script
        :return: list, list of string tokens.
        """
        token_generator = tokenize(BytesIO(data.encode('utf-8')).readline)
        tokens = []
        print_next = False
        i = 0
        while True:
            try:
                token_type, val, start, end, line = next(token_generator)
            except Exception:
                break

            if DEBUG and ("No data provided for" in val or (print_next and i < 40)):
                print('In function py_tokenize. TOKEN_TYPE:', token_type, 'VALUE:', val, 'START_POS:', start,
                      'END_POS:', end, 'FULL_LINE:', line[:-1])
                print_next = True
                i += 1

            if token_type == STRING:
                if val[0] != '"' and val[0] != "'":
                    str_contents = val[2:-1].split(' ')
                else:
                    str_contents = val[1:-1].split(' ')
                str_contents = [t for t in str_contents if t]
                tokens.extend(["'", *str_contents, "'"])
            elif token_type == INDENT:
                tokens.append('INDENT')
            elif token_type == DEDENT:
                tokens.append('DEDENT')
            elif val == 'utf-8':
                continue
            else:
                tokens.append(val)

        return tokens

    @staticmethod
    def py_untokenize(tokens):
        """
        Convert list of string tokens to single python script string
        :param tokens: list, list of strings
        :return: str, joined tokens
        """
        joined_tokens = ''
        indent = 0
        cont_str = False
        str_buffer = ''
        start_line = False
        num_lines = 1
        for i, t in enumerate(tokens):
            if start_line and t != 'INDENT' and t != 'DEDENT':
                joined_tokens += ' ' * indent
                start_line = False
            elif t == 'INDENT':
                indent += 4
                continue
            elif t == 'DEDENT':
                indent = max(0, indent - 4)
                continue

            if cont_str:
                if t == '"' or t == "'":
                    joined_tokens += str_buffer + t + ' '
                    str_buffer = ''
                    cont_str = False
                elif t == 'EOF':
                    if DEBUG:
                        print('ERROR: OPEN STRING WHEN EOF REACHED @ token', i, '@ line', num_lines,
                              'in Function: py_untokenize')
                    joined_tokens += str_buffer[0] * 2 + '\nEOF\n'
                    last_10 = joined_tokens[-10:]
                    str_buffer = ''
                    cont_str = False
                else:
                    str_buffer += t + ' '
            elif t == "'" or t == '"':
                str_buffer += "'"
                cont_str = True
            elif t == '\n':
                start_line = True
                joined_tokens += '\n'
            elif t == 'EOF':
                num_lines += 1
                continue
            else:
                joined_tokens += t + ' '

        return joined_tokens

In [0]:
class PreProcessor:
    def __init__(self, buffer_dir, clean_file, tokenizer):
        """
        Create processor.
        :param buffer_dir: str, path to buffer directory
        :param clean_file: str, path to cleaned data file
        :param tokenizer: Tokenizer, tokenizer object
        """
        self.buffer_dir = buffer_dir
        self.clean_file = clean_file
        self.tokenizer = tokenizer

    def get_training_data(self, sample_len=50, step=1, one_hot_input=False):
        """
        get training data in form necessary for model training.
        :param sample_len: int, length of the samples to generate
        :param step: step to travel training sequence with
        :param one_hot_input: bool, whether to convert input to one hot vectors or not
        :return x: np.array, training inputs w/ dim (#samples, sample_len)
        :return y: np.array, training labels w/ dim (#samples, vocab_len)
        """
        if self.tokenizer.real_vocab_len == 0:
            data = read_file(self.clean_file)
            self.tokenizer.fit_on_data(data)

        data = read_file(self.clean_file, True)
        tokens = self.tokenizer.text_to_sequence(data)

        statements = []
        next_statements = []
        for i in range(0, len(tokens) - sample_len, step):
            statements.append(tokens[i:i + sample_len])
            next_statements.append(tokens[i + sample_len])

        if one_hot_input:
            raise NotImplementedError("Need to implement this")
        else:
            x = np.array(statements, dtype=np.int)

        y = np.zeros((len(statements), self.tokenizer.real_vocab_len), dtype=np.int)
        for i, next_statement in enumerate(next_statements):  # one hots y matrix
            y[i, next_statement] = 1

        if DEBUG:
            print('x shape:', x.shape, 'y shape:', y.shape)

        return x, y

    # DATA CLEANSING

    def clean_buffer(self, empty=True, append=True):
        """
        clean all files in buffer and add to cleaned data file and empty buffer if necessary.
        :param empty: bool, whether to empty buffer or not.
        :param append: bool, whether to append cleaned data into clean file or overwrite it.
        :return: None
        """
        write_file('', self.clean_file, append)  # clear old file if necessary
        for i, file in enumerate(os.listdir(self.buffer_dir)):
            path = os.path.join(self.buffer_dir, file)
            data = read_file(path)
            if not data:
                os.remove(path)
                continue
            try:
                clean_data = self.process_text(data)
                write_file(clean_data, self.clean_file, True)
            except Exception:
                if DEBUG:
                    print('Error found tokenizing', path)

        if empty:
            shutil.rmtree(self.buffer_dir)
            os.mkdir(self.buffer_dir)

    @staticmethod
    def process_text(data):
        """
        Clean a text string of python code.
        :param data: str, python code
        :return: str, cleaned python code
        """
        # delete comments
        def comment_subber(match_obj):
            string = match_obj.group(0)
            if string.startswith("'''") or string.startswith('"""') or string.startswith('#'):
                return ''
            return string

        comment_pattern = '""".*?"""|\'\'\'.*?\'\'\'|"(\\[\s\S]|[^"])*"|\'(\\[\s\S]|[^\'])*\'|#[\s\S]*'
        data = re.compile(comment_pattern, re.DOTALL).sub(comment_subber, data)

        # remove imports
        data = re.sub('(\n|^)(import|from).*', '', data)

        # add special EOF token
        data = re.sub('[\n\s]*EOF[\n\s]*', '', data)  # deletes any old EOF tokens
        data += '\nEOF\n'

        # remove unnecessary newlines
        data = PreProcessor.remove_newlines(data)

        return data

    @staticmethod
    def remove_newlines(data):
        """
        Remove unnecessary newlines from python string
        :param data: str, python code
        :return: str, cleaned python code
        """
        data = re.sub(r'\n[\n\s]*\n', '\n', data)
        while data[0] == '\n':  # check for newline @ file start
            data = data[1:]

        return data

In [0]:
class Model:
    def __init__(self, vocab_len, sample_len, **hyper_params):
        self.vocab_len = vocab_len
        self.sample_len = sample_len

        self.model = Sequential()
        self.model.add(Embedding(vocab_len, 512))
        self.model.add(LSTM(300, return_sequences=True))
        self.model.add(LSTM(300, return_sequences=True))
        self.model.add(LSTM(300, return_sequences=True))
        self.model.add(LSTM(128))
        self.model.add(Dense(vocab_len, activation='softmax'))

        optimizer = RMSprop(lr=1e-5)
        self.model.compile(loss='categorical_crossentropy',
                           optimizer=optimizer,
                           metrics=['accuracy'])

    def train(self, x, y, epochs=1, mini_batch_size=128):
        """
        train model on given data
        :param x: np.array, training inputs, dims (#samples, sample_len)
        :param y: np.array, training labels, dims (#samples, vocab_len)
        :param epochs: int, number of iterations to train on data
        :param mini_batch_size: int, size of mini batches
        :return: None
        """
        assert x.shape[1] == self.sample_len, 'Incorrect sample length. Given: {}, Expecting: {}'.format(
            x.shape[1], self.sample_len)

        checkpoint = ModelCheckpoint("best_model",
                                     monitor='loss',
                                     verbose=1,
                                     save_best_only=True,
                                     mode='auto',
                                     period=1)

        self.model.fit(x, y, mini_batch_size, epochs, verbose=1, callbacks=[checkpoint])

    def generate_script(self, seed, temp=0.5, **stop):
        """
        generate a script of certain length or until a token idx is reached.
        :param seed: np.array, input into model to generate sample from, dims (1, sample_len)
        :param temp: float, softmax temperature, amount of entropy to include in sample
        :param stop: kwargs, either len (int), number of new tokens to generate; or token (int) idx of token to stop at
        :return: list, full sequence generated as list of token indices (includes seed)
        """
        assert seed.shape[1] == self.sample_len, 'Incorrect sample length. Given: {}, Expecting: {}'.format(
            seed.shape[1], self.sample_len)
        generated_sequence = list(seed[0])

        while True:
            pred = self.model.predict(seed, verbose=0)[0]
            next_idx = self.sample_next_token(pred, temp)
            generated_sequence.append(next_idx)
            if stop.get('len', -1) == len(generated_sequence) - self.sample_len or stop.get('token', -1) == next_idx:
                break
            seed[0] = Model.np_shift(seed[0], -1)
            seed[0, -1] = next_idx

        return generated_sequence

    @staticmethod
    def sample_next_token(seed, temp):
        """
        sample next token given model's output sequence.
        :param seed: np.array, output from model used to pick next token
        :param temp: float, amount of randomness to use when sampling next token
        :return: int, index of sampled token
        """
        preds = np.asarray(seed).astype('float64')
        preds = np.log(preds) / temp
        exp_preds = np.exp(preds)
        probs = np.random.multinomial(1, exp_preds / np.sum(exp_preds), 1)
        return np.argmax(probs)

    @staticmethod
    def np_shift(xs, n):
        if n >= 0:
            return np.concatenate((np.full(n, np.nan), xs[:-n]))
        else:
            return np.concatenate((xs[-n:], np.full(-n, np.nan)))


In [8]:
t = PyTokenizer(5000)
p = PreProcessor('', 'clean.py', t)
x, y = p.get_training_data(50)

m = Model(t.real_vocab_len, 50)
m.train(x, y, 20, 64)

Epoch 1/20
Epoch 00001: loss improved from inf to 6.92105, saving model to best_model
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: best_model/assets
Epoch 2/20
Epoch 00002: loss improved from 6.92105 to 6.19113, saving model to best_model
INFO:tensorflow:Assets written to: best_model/assets
Epoch 3/20
Epoch 00003: loss improved from 6.19113 to 5.64619, saving model to best_model
INFO:tensorflow:Assets written to: best_model/assets
Epoch 4/20
Epoch 00004: loss improved from 5.64619 to 5.23476, saving model to best_model
INFO:tensorflow:Assets written to: best_model/assets
Epoch 5/20
Epoch 00005: loss improved from 5.23476 to 4.94722, saving model to best_model
INFO:tensorflow:Assets written to: best_model/assets
Epoch 6/20
Epoch 00006: loss improved from 4.94722 to 4.76749, saving model to best_model
INFO:tensorflow:Assets written to: best_model/assets
Epoch 7/20
Epoch 00007: loss improved from 4.76749 to 4.66832, sa

KeyboardInterrupt: ignored

In [10]:
!zip -r ./best_model.zip ./best_model/

updating: best_model/ (stored 0%)
updating: best_model/variables/ (stored 0%)
updating: best_model/variables/variables.index (deflated 68%)
updating: best_model/variables/variables.data-00000-of-00001 (deflated 6%)
updating: best_model/saved_model.pb (deflated 90%)
updating: best_model/assets/ (stored 0%)


In [17]:
from tensorflow.keras.models import load_model


def generate_seed(tokenizer, sample_len):
    """
    Get seed from cleaned file.
    :param tokenizer: PyTokenizer, tokenizer to use.
    :param sample_len: int, length of seed to generate
    :return: np.array, model seed shape (1, sample_len)
    """
    tokens = []
    with open('clean.py', 'r') as file:
        while len(tokens) < sample_len:
            tokens.extend(tokenizer.text_to_sequence(file.readline()))
            print(len(tokens))

    if len(tokens) != sample_len:
        tokens = tokens[:sample_len]

    return np.asarray(tokens, dtype=np.int).reshape((1, sample_len))

m.model = load_model('best_model')
print(m.sample_len)
script = m.generate_script(generate_seed(t, m.sample_len), token=t.word_idx['EOF'])
print(t.sequence_to_text(script))

50
11
19
26
38
45
54
y loss = sample_weight_mode , add_metaclass = abc ( OOV _compile_weighted_metrics converted weighted_metrics , h5py ( . OOV _compile_weighted_metrics : if 3 , 'OOV : filter_sk_params ) Loss = fpath = reduction ' OOV _compile_weighted_metrics : kernel_size ) kwargs ( . OOV : = filter_sk_params ) 
'. ( OOV 
 ' ', 
 OOV ' OOV OOV ', , 
 
 ) OOV , OOV 
 OOV 
 ( OOV OOV OOV : OOV : OOV 
 OOV OOV ( ( OOV OOV ' 
'OOV . ' , ( OOV ) ( , OOV ( , in OOV '= , . 
 ** ) OOV = ' ( : ( ( 
: OOV ) , 'OOV ) ( kwargs 
 ' OOV 
OOV = OOV args OOV ( : OOV ( ( ( = ( ( OOV = OOV def OOV 
) OOV ) OOV OOV OOV OOV ', 
 ( OOV OOV , self ( ' ( OOV , , , OOV OOV 'OOV 
 OOV 
 OOV ) OOV : ' OOV '' OOV 
        ( ( OOV ) OOV = None ) , , 
            ) OOV OOV ( , ) OOV [ = def = OOV , . ( = OOV OOV 
            ( OOV OOV 
            , 
            
            ) ( ) = 
            
            
            return OOV OOV ( OOV OOV OOV ) ', . ] ' 
            ) '. = , OOV 
 
 OOV OOV : 
 
 
 ' 
 

In [0]:
m.model = load_model('best_model')
m.train(x, y, 20, 64)

Epoch 1/20