In [1]:
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

#if not tf.config.list_physical_devices('GPU'):
#    print("No GPU was detected. LSTMs and CNNs can be very slow without a GPU.")
#    if IS_COLAB:
#        print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")

# Common imports
import numpy as np
import os

from psutil import virtual_memory

from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout, Embedding, Input
from keras.preprocessing.text import Tokenizer

from tensorflow.keras.optimizers import Adam

import re
from keras.utils.np_utils import to_categorical

#from gensim.models import Word2Vec

# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

Using TensorFlow backend.


In [2]:
%tensorflow_version 2.x
# Runtime probe cell: reports the GPU, the machine's RAM and the distribution
# strategy. `strategy` is only used by the final print in this cell and `AUTO`
# is not referenced anywhere else in the visible part of this notebook.

# Report whether TensorFlow exposes a GPU under the name '/device:GPU:0'.
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
    print('GPU device not found')
else:
    print('Found GPU at: {}'.format(device_name))

# Capture the output lines of the `nvidia-smi` shell command and join them into one string;
# the word 'failed' in that output is taken to mean that no GPU is attached.
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
    print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
    print('and then re-execute this cell.')
else:
    print(gpu_info)

# NOTE(review): this is psutil's *total* memory in units of 1e9 bytes, although the
# message below calls it "available" RAM.
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

# 20 GB is the threshold this cell uses to tell a high-RAM runtime from a standard one.
if ram_gb < 20:
    print('To enable a high-RAM runtime, select the Runtime > "Change runtime type"')
    print('menu, and then select High-RAM in the Runtime shape dropdown. Then, ')
    print('re-execute this cell.')
else:
    print('You are using a high-RAM runtime!')

try: # detect TPUs
    # detect and init the TPU
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)

    # instantiate a distribution strategy
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except ValueError: # detect GPUs
    # Reached when the TPU set-up above raises ValueError; fall back to the default strategy.
    #strategy = tf.distribute.MirroredStrategy() # for GPU or multi-GPU machines
    strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
    #strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() # for clusters of multi-GPU machines

AUTO = tf.data.experimental.AUTOTUNE
print("Number of accelerators: ", strategy.num_replicas_in_sync)

Found GPU at: /device:GPU:0
Wed Jul 15 19:37:40 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.51.05    Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   42C    P0    34W / 250W |    353MiB / 16280MiB |      0%      Default |
|                               |                      |                 ERR! |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------

In [3]:
!tar -xzf RomanticsTexts.tar.gz

# Text preprocessing

In [4]:
# Text cleaning functions:
def deRomanNumeral(words): #note: doesn't remove roman numeral I, as that is used often...
    """Return a copy of ``words`` with Roman-numeral tokens removed.

    Args:
        words: iterable of word strings. The pattern only contains lower-case
            letters, so upper-case numerals are not recognised.

    Returns:
        A new list holding, in order, every word that does not match the
        numeral pattern.

    The pattern is unchanged from the original code: its last group only
    accepts ``ix``, ``iv``, ``ii``, ``iii``, ``vii`` and ``viii``, so a lone
    ``i`` (the pronoun) and numerals without such a units part are kept.
    """
    pattern = '^(cm?|cd?|d?c?c?c?)(xc?|xl?|l?x?x?x?)(ix|iv|v?i?ii)$'
    # Keep a word only when it is NOT a numeral. The previous test
    # (`is not None`) was inverted and returned just the numerals.
    return [w for w in words if re.search(pattern, w) is None]

# Text cleaning functions:
def checkRomanNumeral(word): #note: doesn't remove roman numeral I, as that is used often...
    """Test whether ``word`` looks like a lower-case Roman numeral.

    Args:
        word: a single token; only lower-case letters can match.

    Returns:
        The ``re.Match`` object when the whole token matches the numeral
        pattern, otherwise ``None``. The token ``'i'`` always yields ``None``
        so that the pronoun is never treated as a numeral.
    """
    numeral_pattern = '^(cm?|cd?|d?c?c?c?)(xc?|xl?|l?x?x?x?)(m|d|c|x|v|l|ix|iv|v?i?i?i)$'
    if word != 'i':
        return re.search(numeral_pattern, word)
    # The pronoun "i" is deliberately reported as "not a numeral".
    return None

In [5]:
def newlinepartition(text):
    """Extract the body of a single entry, dropping short chunks and note sections.

    The entry is split on runs of three consecutive newlines.

    Args:
        text: raw text of one entry.

    Returns:
        * one chunk only: ``None`` (and "assumed empty" is printed);
        * two chunks with a first chunk shorter than 200 characters
          (presumably a title - verify against the corpus): the second chunk;
        * two chunks with a longer first chunk: ``None``;
        * more than two chunks: the retained chunks joined by a blank line.
          Chunks shorter than 100 characters and chunks starting with
          ``FOOTNOTES``, ``LINENOTES`` or ``NOTE`` are dropped.
    """
    septext = text.split("\n\n\n")

    if len(septext) < 2:
        print("assumed empty")
        return None

    if len(septext) == 2:
        if len(septext[0]) < 200:
            return septext[1]
        # NOTE(review): the original fell off the end of the function here and
        # silently returned None; that behaviour is kept but made explicit.
        # Confirm that dropping such entries is intended.
        return None

    newtext = []
    for tt in septext:
        if len(tt) < 100:
            continue
        if tt.startswith('FOOTNOTES'):
            print("removed footnotes")
        elif tt.startswith('LINENOTES'):
            print("removed linenotes")
        elif tt.startswith('NOTE'):
            # The old test `tt[:9]=='NOTE'` compared a 9-character slice with a
            # 4-character string and so could never match a chunk this long.
            print("removed note")
        else:
            newtext.append(tt)
    if len(newtext) > 1:
        print('check this text')
    return "\n\n".join(newtext)
    
def _normalise_and_strip_headings(text):
    """First cleaning stage shared by cleanup_text_A and cleanup_text_B.

    Collapses blank lines, turns every newline into its own space-separated
    token, deletes punctuation and digits, lower-cases, drops empty tokens and
    Roman numerals (via checkRomanNumeral), then removes "part" / "book" /
    "scene" heading lines. Returns the tokens re-joined with single spaces.
    """
    words = text.replace("\n\n","\n")
    words = words.replace("\n"," \n ")
    words = words.replace("-"," ")
    # Pad sentence-ending punctuation so it becomes a separate token.
    for mark in ".!?":
        words = words.replace(mark, " " + mark + " ")
    words = words.split(" ")
    table = str.maketrans('', '', '…”_-.,;!:?*/()[]{}0123456789"')
    stripped = [w.translate(table).lower() for w in words]
    stripped = [w for w in stripped if not w=='' and not checkRomanNumeral(w)]
    cleanedtext = " ".join(stripped)
    # Numerals other than "i" were filtered above, so a numbered heading is now
    # either a bare keyword on its own line or the keyword followed by "i".
    for heading in ("part", "book", "scene"):
        cleanedtext = cleanedtext.replace("\n " + heading + " \n", "\n")
        cleanedtext = cleanedtext.replace(heading + " i \n", "")
    cleanedtext = cleanedtext.replace("\n \n","\n")
    return cleanedtext


def _strip_verse_marker(cleanedtext):
    """Replace "verse st/nd/rd/th" marker lines with a single line break."""
    return re.sub("(\n |)verse (st|nd|rd|th) \n","\n", cleanedtext)


def _strip_leading_numeral_i(cleanedtext):
    """Remove leading spaces and at most one leading "i" token.

    The original code used ``lstrip("i ")``, which strips any run of the
    *characters* "i" and " " and therefore also ate the first letter of
    words such as "in" or "it" at the start of a text.
    """
    cleanedtext = cleanedtext.lstrip(" ")
    if cleanedtext.startswith("i "):
        cleanedtext = cleanedtext[2:].lstrip(" ")
    return cleanedtext


def cleanup_text_A(text): # Keats # Coleridge
    """Clean one work: shared normalisation plus note and editor-text removal.

    Args:
        text: raw text of a single work.

    Returns:
        A lower-case string of space-separated tokens in which line breaks
        survive as newline tokens. Everything from the first "note(s)" /
        "footnote(s)" line onwards is discarded.
    """
    cleanedtext = _normalise_and_strip_headings(text)
    # Cut the text at the first line consisting only of a notes keyword.
    for marker in ("\n note \n", "\n notes \n", "\n footnote \n", "\n footnotes \n"):
        cleanedtext = cleanedtext.replace(marker, 'TERMINATE')
    cleanedtext = cleanedtext.split('TERMINATE')[0]
    cleanedtext = _strip_verse_marker(cleanedtext)
    cleanedtext = _strip_leading_numeral_i(cleanedtext)
    cleanedtext = cleanedtext.split(" ed \n")[-1] # remove editors notes in wordsworth (may be dangerous)
    cleanedtext = cleanedtext.replace("composed published ","")
    return cleanedtext.lstrip("\n")


def cleanup_text_B(text):
    """Clean one work using only the shared normalisation steps.

    Same as cleanup_text_A but without the notes cut-off and the
    editor-text removal.

    Args:
        text: raw text of a single work.

    Returns:
        A lower-case string of space-separated tokens in which line breaks
        survive as newline tokens.
    """
    cleanedtext = _normalise_and_strip_headings(text)
    cleanedtext = _strip_verse_marker(cleanedtext)
    cleanedtext = _strip_leading_numeral_i(cleanedtext)
    return cleanedtext.lstrip("\n")
    
def split_into_works(maintext,iStart,iEnd, type='A'):
    """Clean the entries ``maintext[iStart:iEnd]`` and return them as a list.

    Args:
        maintext: indexable sequence of raw entry strings.
        iStart: index of the first entry to process (inclusive).
        iEnd: index one past the last entry to process.
        type: cleanup variant; 'A' or '-' selects cleanup_text_A and 'B'
            selects cleanup_text_B. With any other value nothing is appended.

    Returns:
        List of cleaned texts. Entries for which newlinepartition returns
        None are skipped. Each processed index is printed as progress output.
    """
    cleaned_works = []
    for idx in range(iStart, iEnd):
        print(idx)
        body = newlinepartition(maintext[idx])
        if body is None:
            continue
        if type in ('A', '-'):
            cleaned_works.append(cleanup_text_A(body))
        elif type == 'B':
            cleaned_works.append(cleanup_text_B(body))
    return cleaned_works

def split_works_into_words(works):
    """Split every work on single spaces.

    Args:
        works: iterable of strings.

    Returns:
        One list of tokens per work. Consecutive spaces produce empty-string
        tokens because the split is on the literal " " character.
    """
    tokenised = []
    for work in works:
        tokenised.append(work.split(" "))
    return tokenised

## Keats Embedding

In [8]:
# Per-file extraction settings, read by read_and_process_file:
#   'zones'   - index of the "***"-delimited section of the file that holds the
#               works (the special value 'asmain' means: use the sections themselves)
#   'len'     - [start, end) range of entries in that section passed on to split_into_works
#   'cleanup' - cleanup variant handed to split_into_works ('A', '-' or 'B')
filedictionary = {'Keats_1.txt': {'zones': 2, 'len': [8, 22], 'cleanup': 'A'},
                  'Keats_2.txt':  {'zones': 2, 'len': [5, 37],'cleanup': 'A'},
                  'Keats_3.txt': {'zones': 2, 'len': [6, 10], 'cleanup': 'A'},
                 }

In [11]:
# All "Keats*.txt" files in the working directory. The list is sorted because
# os.listdir() returns entries in arbitrary order; without it the order of the
# works, and of the training windows built from them, could change between runs.
keatsfilelist = sorted(file for file in os.listdir()
                       if file.endswith('.txt') and file.startswith('Keats'))

In [12]:
def read_and_process_file(idx,filelist=keatsfilelist):
    """Read one source file and return its cleaned works.

    Args:
        idx: position of the file in ``filelist``.
        filelist: list of file names; each name must be a key of
            ``filedictionary``.

    Returns:
        List of cleaned work strings as produced by split_into_works.

    Raises:
        KeyError: if the file has no entry in ``filedictionary``.
        OSError: if the file cannot be opened.
    """
    filename = filelist[idx]
    # The context manager closes the handle even if read() raises.
    # NOTE(review): no encoding is given, so the platform default is used, as before.
    with open(filename, 'rt') as file:
        text = file.read()
    adict = filedictionary[filename]
    # Split the file into sections on the "***" markers
    # (presumably the Project Gutenberg header/footer delimiters - confirm).
    Zones = text.split("***")
    lens = adict['len']
    if adict['zones'] == 'asmain':
        maintext = Zones
    else:
        # Within the selected section, entries are separated by five newlines.
        maintext = Zones[adict['zones']].split("\n\n\n\n\n")
    return split_into_works(maintext,lens[0],lens[1],type=adict['cleanup'])

In [None]:
# Gather the cleaned works of every Keats file into one flat list, in file-list order.
workslist = []
for file_idx in range(len(keatsfilelist)):
    workslist.extend(read_and_process_file(file_idx, keatsfilelist))

In [59]:
# Swap every line break for an ordinary word tag so that line breaks are kept as tokens.
def convert_newline(text, tagword = "newln"):
    """Return ``text`` with each newline character replaced by ``tagword``."""
    return tagword.join(text.split('\n'))

def convert_newlines(texts, tagword = "newln"):
    """Apply convert_newline to every string in ``texts`` and return a new list."""
    return [convert_newline(text, tagword=tagword) for text in texts]

# Word-level tokenizer fitted on the works after their line breaks were replaced by the "newln" tag.
tokenizer = Tokenizer(char_level = False)
workslist_t = convert_newlines(workslist)
tokenizer.fit_on_texts(workslist_t)
total_words = len(tokenizer.word_index)+1 # +1: one extra slot for index 0 (generate_poem treats only indices > 0 as words)
# One list of integer word indices per work.
token_texts = tokenizer.texts_to_sequences(workslist_t)

In [157]:
# Number of tokens in each training window.
SEQ_LENGTH = 30

def gen_sequences_text(token_text,step=1,lseq=SEQ_LENGTH):
    """Slide a window of ``lseq`` tokens over one tokenised text.

    Args:
        token_text: sequence of token ids for a single work.
        step: distance between the starts of consecutive windows.
        lseq: window length.

    Returns:
        ``(x, y)`` where ``x`` is a list of windows and ``y`` the list of
        tokens that immediately follow each window. Both are empty when the
        text has ``lseq`` tokens or fewer.
    """
    starts = range(0, len(token_text) - lseq, step)
    x = [token_text[i:i+lseq] for i in starts]
    y = [token_text[i+lseq] for i in starts]
    return x, y

def gen_sequences(token_texts, step = 1,lseq=SEQ_LENGTH):
    """Build the training arrays from several tokenised texts.

    Windows never span two texts.

    Args:
        token_texts: iterable of token-id sequences, one per work.
        step: distance between the starts of consecutive windows.
        lseq: window length.

    Returns:
        ``(x, y)`` as NumPy arrays: ``x`` has shape ``(n_windows, lseq)`` and
        ``y`` has shape ``(n_windows,)`` holding the integer id of the next
        token (sparse targets, not one-hot).
    """
    x, y = [], []
    for text in token_texts:
        xt, yt = gen_sequences_text(text,step,lseq)
        x += xt
        y += yt
    if not x:
        # np.array([]) would be a 1-D float array; return correctly shaped
        # integer arrays so callers can rely on x.shape[1] == lseq.
        return np.empty((0, lseq), dtype=int), np.empty((0,), dtype=int)
    # use sparse representation (y was previously converted to an array twice)
    return np.array(x), np.array(y)


In [158]:
# Training set: every SEQ_LENGTH-token window (x) paired with the id of the token that follows it (y).
x, y = gen_sequences(token_texts)

In [None]:
# Reload a previously saved model from "keats_embedded_gen.h5".
# NOTE(review): the next cell rebinds `keatsEmbedded` to a freshly built, untrained model,
# so running the notebook top to bottom discards what is loaded here; run either this cell
# or the model-building and training cells, not both.
keatsEmbedded = load_model("keats_embedded_gen.h5")

In [161]:
# Model hyper-parameters: units per LSTM layer and the size of the learned word vectors.
nLSTM = 256
dEmbedding = 100

# Next-word model: token ids -> embedding -> two stacked LSTMs, each followed by dropout ->
# softmax over the `total_words` vocabulary entries. Input(shape=(None,)) leaves the
# sequence length unspecified.
keatsEmbedded = Sequential([Input(shape = (None,)),
                            Embedding(total_words, dEmbedding),
                            LSTM(nLSTM, return_sequences=True),
                            Dropout(0.3),
                            LSTM(nLSTM),
                            Dropout(0.3),
                            Dense(total_words,activation='softmax')
                            ])
# The sparse loss consumes the integer token ids in `y` directly, so no one-hot encoding is needed.
keatsEmbedded.compile(loss='sparse_categorical_crossentropy',optimizer='rmsprop',metrics=['acc'])

In [None]:
# Training run on the full (x, y) set.
# NOTE(review): no validation data and no callbacks are passed to fit(), so all 1000 epochs
# always run and nothing here measures over-fitting.
epochs = 1000
batch_size = 256

history = keatsEmbedded.fit(x, y, epochs = epochs, batch_size = batch_size, shuffle = True)

In [197]:
# Temperature re-weighting: a temp above 1 flattens a distribution, a temp below 1 sharpens it.
def reweight_distribution(orig, temp=1.,eps=1e-8):
    """Raise each probability to the power ``1/temp`` and renormalise.

    Args:
        orig: array of probabilities.
        temp: temperature; 1 leaves the distribution (almost) unchanged.
        eps: small constant added before taking the logarithm so that zero
            entries do not produce ``-inf``.

    Returns:
        Array of the same shape whose entries sum to 1.
    """
    log_probs = np.log(np.abs(orig) + eps)
    scaled = np.exp(log_probs / temp)
    total = np.sum(scaled)
    return scaled / total

def sample(preds, temp = 1.0):
    """Draw one index at random from the temperature-adjusted distribution.

    Args:
        preds: probabilities, one per index.
        temp: temperature passed on to reweight_distribution.

    Returns:
        The sampled index (NumPy integer), drawn with NumPy's global random state.
    """
    weights = reweight_distribution(np.asarray(preds).astype('float64'), temp)
    # A single multinomial trial gives a one-hot row; argmax recovers its position.
    draw = np.random.multinomial(1, weights, 1)
    return np.argmax(draw)

# start poem at least SEQ_LENGTH out from seed, which seems to minimize the connection to the seed
# even with a fairly low temp, the results are pretty random
def generate_poem(seed, model, temp=0.5, stoplen=100, trainlen = SEQ_LENGTH, tagword = "newln",printseed = False):
    """Generate text word by word from ``model``, starting from ``seed``.

    Args:
        seed: a string or sequence of word strings (tokenised with the
            notebook-level ``tokenizer``), or a sequence of token ids.
        model: object whose ``predict`` returns, for a ``(1, n)`` array of
            token ids, one probability per vocabulary index.
        temp: sampling temperature passed to ``sample``.
        stoplen: generation stops at the first line break after more than
            this many words have been recorded.
        trainlen: number of most recent tokens fed to the model at each step.
        tagword: vocabulary word that stands for a line break.
        printseed: if true, print the seed words before generating.

    Returns:
        The generated text; it starts with a space and each word is followed
        by a space. Recording begins only after the first line break sampled
        once more than ``trainlen`` tokens have been generated.

    Raises:
        ValueError: if the seed yields no tokens.

    Note:
        The loop ends only when a line break is sampled, so a model that never
        predicts ``tagword`` keeps this function running indefinitely.
    """
    outputtext = ' '
    # isinstance also accepts str subclasses such as numpy.str_.
    if isinstance(seed[0], str):
        token_list_total = list(tokenizer.texts_to_sequences([seed])[0])
    else:
        token_list_total = list(seed)
    if not token_list_total:
        raise ValueError("seed does not contain any known tokens")
    if printseed:
        print('begin poem from seed: ' + " ".join([tokenizer.index_word[i] for i in token_list_total] ))

    startrecord = False
    i, k = 0, 0  # i: words recorded so far, k: tokens generated so far
    while True:
        # reshape(1, -1) instead of (1, trainlen): a seed shorter than trainlen
        # used to raise here. Assumes the model accepts variable-length input.
        token_list = np.reshape(token_list_total[-trainlen:], (1, -1))

        probs = model.predict(token_list,verbose=0)[0]
        yind = sample(probs,temp = temp)
        # Index 0 has no entry in index_word, so it is rendered as an empty word.
        nextword = tokenizer.index_word[yind] if yind>0 else ''

        k+=1
        if nextword == tagword:
            nextword = "\n"

        if startrecord:
            outputtext += nextword + ' '
            i+=1

        if nextword == '\n' and k > trainlen:
            startrecord = True
            if i > stoplen:
                break

        # Plain list append; the previous np.concatenate copied the whole history every step.
        token_list_total.append(yind)

    return outputtext

In [230]:
# Generate a poem seeded with a randomly chosen training window and a low sampling temperature.
print(generate_poem(x[np.random.randint(len(x))], keatsEmbedded, temp=0.3))

 there stood a dote from an hollow noise 
 about the gentle air she seem'd in ease 
 sweet days and horrid on born moment stood 
 o think all that a sudden fear 
 its poor snake all high began and share 
 the convuls'd began with rich same ease 
 before by faint his little blue while far 
 each incense coolness with all a trembling plains 
 which every hast did out a pleasant tongue 
 divine of summer tongue whose lucid snowy 
 they gazed from each flowers for heaven again 
 a same eyed splendour as the last green 
 


In [189]:
# Check passages of a generated poem that seem "too good", to make sure they are not
# simply quotations from the training texts. (Note: this hasn't come up yet.)
def come_original(phrase, workslist):
    """Print where ``phrase`` occurs verbatim in the source works.

    For every work that contains ``phrase``, up to 500 characters starting at
    its first occurrence are printed. If no work contains it, "not found" is
    printed.

    Args:
        phrase: text to look for (exact substring match).
        workslist: iterable of work strings to search.

    Returns:
        None; the result is reported through printed output only.
    """
    print("checking")
    notfound = True
    for work in workslist:
        k = work.find(phrase)
        # str.find returns -1 when absent; 0 is a valid hit at the very start
        # of a work, which the previous `k > 0` test ignored.
        if k >= 0:
            notfound = False
            print(work[k: k + 500])
    if notfound:
        # Previously printed "checking" again, which made a miss look like a pending search.
        print("not found")

checking


In [196]:
# Persist the model to the file that the load_model cell earlier in the notebook reads back.
keatsEmbedded.save("keats_embedded_gen.h5")