In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

from keras.models import Model
from keras.layers import *
from keras import layers, constraints, callbacks
from keras.preprocessing import sequence
from keras import optimizers, utils
from keras import backend as K

from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql import Row

import numpy as np

import tensorflow as tf

import random
import nltk

Using TensorFlow backend.


In [2]:
import os
from pathlib import Path
# Local path (in the current working directory) where the fastText binary is expected.
fasttext_file = os.path.join(os.getcwd(), 'crawl-300d-2M-subword.bin')
# Copy the file from the GCS bucket only when no local copy exists. The `!` line
# is an IPython shell escape, so this cell only runs inside IPython/Jupyter.
if not Path(fasttext_file).exists():
  !gsutil cp gs://dsx_embeddings/crawl-300d-2M-subword.bin .

In [3]:
from gensim.models import FastText
from gensim.models.keyedvectors import Vocab

class FTLoader(FastText):
  """FastText loader whose dictionary reader tolerates undecodable bytes.

  Vocabulary words are decoded with ``errors='ignore'``, so bytes that are
  invalid in the requested encoding are dropped instead of aborting the load.
  Construction is inherited unchanged from ``FastText``.
  """

  def _load_dict(self, file_handle, encoding='utf8'):
    """Load a previously saved dictionary from disk, stored in Facebook's native fasttext format.

    Parameters
    ----------
    file_handle : file-like object
        The opened file handle to the persisted dictionary.
    encoding : str
        Specifies the encoding.

    Raises
    ------
    NotImplementedError
        If the file declares one or more labels (supervised model).
    EOFError
        If the file ends in the middle of a vocabulary word.
    AssertionError
        If the number of loaded words differs from the count in the header.

    """
    vocab_size, nwords, nlabels = self.struct_unpack(file_handle, '@3i')
    # Vocab stored by [Dictionary::save](https://github.com/facebookresearch/fastText/blob/master/src/dictionary.cc)
    if nlabels > 0:
      raise NotImplementedError("Supervised fastText models are not supported")

    self.struct_unpack(file_handle, '@1q')  # number of tokens
    if self.new_format:
      pruneidx_size, = self.struct_unpack(file_handle, '@q')

    # The vocabulary entries are read for both file formats; only the
    # prune-index fields around them are specific to the new format.
    for i in range(vocab_size):
      word_bytes = bytearray()
      char_byte = file_handle.read(1)
      # Read the NUL-terminated vocab word one byte at a time.
      while char_byte != b'\x00':
        if not char_byte:
          # read() returns b'' at end of file; without this check the
          # loop would never terminate on a truncated file.
          raise EOFError("Unexpected end of file while reading vocabulary word {}".format(i))
        word_bytes += char_byte
        char_byte = file_handle.read(1)
      word = word_bytes.decode(encoding, errors='ignore')
      count, _ = self.struct_unpack(file_handle, '@qb')

      self.wv.vocab[word] = Vocab(index=i, count=count)
      self.wv.index2word.append(word)

    assert len(self.wv.vocab) == nwords, (
        'mismatch between final vocab size ({} words), '
        'and expected number of words ({} words)'.format(len(self.wv.vocab), nwords))

    if self.new_format:
      # Skip over the prune-index pairs that follow the vocabulary.
      for j in range(pruneidx_size):
        self.struct_unpack(file_handle, '@2i')

  def load_binary_data(self, encoding='utf8'):
    """Load data from a binary file created by Facebook's native FastText.

    Reads, in order, the model parameters, the dictionary and the vectors
    from ``self.file_name``.

    Parameters
    ----------
    encoding : str, optional
        Specifies the encoding used to decode vocabulary words.

    """

    # TODO use smart_open again when https://github.com/RaRe-Technologies/smart_open/issues/207 will be fixed
    with open(self.file_name, 'rb') as f:
      self._load_model_params(f)
      # Forward the caller's encoding instead of a hard-coded value.
      self._load_dict(f, encoding=encoding)
      self._load_vectors(f)
            
# Load the fastText binary at `fasttext_file` through the FTLoader subclass defined above.
fasttext_model = FTLoader.load_fasttext_format(fasttext_file)

In [80]:
# Tag the two phrase corpora with label 0 and label 1, stack them, and persist
# the union as the combined dataset read by the following cells.
(
    spark.read.load('gs://project_joyce_scribo/phrases')
    .withColumn('label', F.lit(0))
    .union(
        spark.read.load('gs://project_joyce/phrases')
        .withColumn('label', F.lit(1))
    )
    .repartition(50)
    .write.mode('overwrite')
    .parquet('gs://project_joyce/combined_tokens')
)

In [4]:
# Collect every distinct lower-cased token that occurs in any phrase.
tokens = (
    spark.read.load('gs://project_joyce/combined_tokens')
    .withColumn('token', F.explode(F.col('phrase')))
    # Lower-casing goes through Python's str.lower, the same call that
    # index_tokens applies when it looks tokens up.
    .withColumn('token', F.udf(lambda raw: raw.lower(), StringType())(F.col('token')))
    .select(['token'])
    .distinct()
    .collect()
)

In [5]:
# Pair each collected token with its fastText vector; tokens whose lookup
# raises KeyError are left out.
word_and_vectors = []
for row in tokens:
    word = row.token
    try:
        vector = fasttext_model.wv[word]
    except KeyError:
        continue
    word_and_vectors.append((word, vector))

In [8]:
# Row 0 of the matrix is an all-zero vector, so real words are numbered from 1
# in sorted order.
sorted_vectors = sorted(word_and_vectors, key=lambda pair: pair[0])
word_index = {word: position for position, (word, _) in enumerate(sorted_vectors, start=1)}
zero_row = np.zeros(sorted_vectors[0][1].size)
word_matrix = np.vstack([zero_row] + [vector for _, vector in sorted_vectors])

In [10]:
# Broadcast the word -> row-index mapping so index_tokens can read it via `.value`.
word_index_bc = sc.broadcast(word_index)

# Length (in tokens) of the longest phrase in the combined dataset. collect()
# returns a list of Row objects, so take row 0 / column 0 to bind a plain
# integer rather than a one-field Row.
max_length = spark.read.load('gs://project_joyce/combined_tokens').agg(F.max(F.size(F.col('phrase')))).collect()[0][0]
def index_tokens(tokens):
  """Map a token sequence to a fixed-length vector of vocabulary indices.

  Each token is lower-cased and looked up in the broadcast word index.
  Tokens that are not in the index, and positions past the end of the
  sequence, hold 0. Sequences longer than the vector are truncated to its
  length instead of raising IndexError.

  Parameters
  ----------
  tokens : list of str
      Tokens of one phrase.

  Returns
  -------
  DenseVector
      Vector with ``max_length`` entries.
  """
  # Local import kept from the original; this function is also used as a Spark UDF.
  import numpy as np
  vector = np.zeros(max_length)
  lookup = word_index_bc.value
  # vector.size is the usable length whether max_length is an int or a 1-tuple.
  for position, token in enumerate(tokens[:vector.size]):
    vector[position] = lookup.get(token.lower(), 0)

  return DenseVector(vector)

# Convert every phrase to its index vector and persist label / indices / source.
# `indexed` receives whatever the final write call returns, not a DataFrame.
indexed = (
    spark.read.load('gs://project_joyce/combined_tokens')
    .repartition(50)
    .withColumn('indices', F.udf(index_tokens, VectorUDT())(F.col('phrase')))
    .select(['label', 'indices', 'source'])
    .write.mode('overwrite')
    .parquet('gs://project_joyce/indexed')
)

In [11]:
# Split at the level of distinct `source` values (90% / 10%) so that all rows
# sharing a source end up on the same side of the split.
train_works, test_works = (
    spark.read.load('gs://project_joyce/indexed')
    .drop_duplicates(['source'])
    .select(['source'])
    .randomSplit([0.9, 0.1])
)
# Join back to the indexed data to recover the full rows for each side.
train_set = train_works.join(spark.read.load('gs://project_joyce/indexed'), on='source')
test_set = test_works.join(spark.read.load('gs://project_joyce/indexed'), on='source')
train_set_size = train_set.count()

In [73]:
# Row-level 90% / 10% split that ignores `source`. Running this cell rebinds
# train_set, test_set and train_set_size.
indexed_rows = spark.read.load('gs://project_joyce/indexed')
train_set, test_set = indexed_rows.randomSplit([0.9, 0.1])
train_set_size = train_set.count()

In [12]:
latent_dim = 256  # units of the first recurrent layer; the second uses half of this
batch_size = 4096  # mini-batch size passed to train_model.fit

In [13]:
def combine_vectors(row_iter):
  """Collapse the rows of one partition into a label array and an index matrix.

  Used with ``mapPartitions``: the returned tuple is consumed as an iterable,
  so a non-empty partition contributes two consecutive elements (labels,
  then matrix) to the collected result.

  Parameters
  ----------
  row_iter : iterable
      Rows exposing ``label`` and ``indices`` attributes.

  Returns
  -------
  tuple
      ``(labels, indices)``, where ``labels`` is a 1-D array and ``indices``
      a 2-D array with one row per input row. An empty tuple is returned for
      an empty partition, because ``np.hstack`` / ``np.vstack`` raise
      ValueError on empty lists; it contributes nothing to the collected
      result, so the labels/matrix alternation stays intact.
  """
  labels = []
  indices = []
  for row in row_iter:
    labels.append(row.label)
    indices.append(row.indices)
  if not labels:
    return ()
  return (np.hstack(labels), np.vstack(indices))

In [14]:
# Every partition contributes a labels array followed by an index matrix, so
# the collected list alternates: labels at even positions, matrices at odd ones.
train = train_set.repartition(200).rdd.mapPartitions(combine_vectors).collect()
train_matrix = np.vstack(train[1::2])
train_labels = np.hstack(train[0::2])

In [15]:
# Same layout as the training data: labels at even positions of the collected
# list, index matrices at odd positions.
valid = test_set.repartition(200).rdd.mapPartitions(combine_vectors).collect()
valid_matrix = np.vstack(valid[1::2])
valid_labels = np.hstack(valid[0::2])

In [75]:
K.clear_session()

# Variable-length sequences of word indices.
inp = Input((None, ))
# Initialise the frozen embedding from word_matrix (row 0 is the all-zero row
# for index 0). Without `weights` the layer keeps its random initialisation,
# and trainable=False means it would never be updated.
encoder_embedding = Embedding(word_matrix.shape[0], word_matrix.shape[1],
                              weights=[word_matrix], trainable=False)(inp)
# Two stacked GRU layers (the `lstm` names are kept as they were) followed by a
# 1-D convolution over the sequence.
lstm = CuDNNGRU(latent_dim, return_sequences=True)(SpatialDropout1D(0.5)(encoder_embedding))
lstm_2 = CuDNNGRU(int(latent_dim / 2), return_sequences=True)(lstm)
conv = Conv1D(64, kernel_size=3)(lstm_2)

# Max over the sequence axis gives a fixed-size feature vector.
pools = GlobalMaxPooling1D()(conv)

dense_1 = Dense(128, activation='relu')(pools)
dense_2 = Dense(64, activation='relu')(dense_1)
# Single sigmoid unit: one score in [0, 1] per input sequence.
output = Dense(1, activation='sigmoid', use_bias=False)(dense_2)
train_model = Model(inputs=[inp], outputs=[output])

In [None]:
# Compile for binary classification.
train_model.compile(
    optimizer='rmsprop',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

# Stop after 5 epochs without val_loss improvement; checkpoint only on improvement.
early_stop = callbacks.EarlyStopping(
    monitor='val_loss', min_delta=0, patience=5, verbose=0, mode='auto')
save_model = callbacks.ModelCheckpoint(
    'model_3.h5', period=1, save_best_only=True, verbose=1)

# Train on the in-memory arrays, validating against the held-out set each epoch.
with tf.device('/gpu:0'):
    train_model.fit(
        train_matrix,
        train_labels,
        batch_size=batch_size,
        epochs=1000,
        shuffle=True,
        validation_data=(valid_matrix, valid_labels),
        callbacks=[early_stop, save_model],
    )

Train on 1544842 samples, validate on 268735 samples
Epoch 1/1000


In [56]:
#train_model.load_weights('model_3.h5')

def adjusted_tokenize(text):
    """Tokenize ``text`` with ``nltk.word_tokenize`` and normalise quote tokens.

    The tokens `` and '' are replaced by a plain double-quote character;
    every other token is returned unchanged.

    The punkt data is downloaded into the current directory only when NLTK
    cannot already locate it, so repeated calls do not hit the network.

    Parameters
    ----------
    text : str
        Raw text to tokenize.

    Returns
    -------
    list of str
    """
    import nltk
    try:
        nltk.data.find('tokenizers/punkt')
    except LookupError:
        nltk.download('punkt', '.')
    quote_tokens = ("''", '``')
    return ['"' if token in quote_tokens else token
            for token in nltk.word_tokenize(text)]
  
def custom_example(model, phrase):
  """Score a single raw-text phrase with ``model``.

  The phrase is tokenized, converted to an index vector, reshaped into a
  one-row batch and passed to ``model.predict``, whose result is returned.
  """
  token_list = adjusted_tokenize(phrase)
  single_row_batch = index_tokens(token_list).toArray().reshape(1, -1)
  return model.predict(single_row_batch)

In [57]:
def random_spanner(model, full_text, low=50, high=80, per_word=5):
  """Score every token of ``full_text`` by averaging predictions over random spans.

  For each token position, ``per_word`` spans starting at that position are
  drawn with a random length in ``[low, high]`` (clipped at the end of the
  text) and scored by ``model``. A token's score is the mean prediction,
  floored at 0, of all spans that cover it.

  Parameters
  ----------
  model : object with a ``predict(array, batch_size=...)`` method
  full_text : str
      Raw text; tokenized with ``adjusted_tokenize``.
  low, high : int
      Inclusive bounds for the random span length.
  per_word : int
      Number of spans drawn per starting position.

  Returns
  -------
  (list of str, list of float)
      The tokens and one score per token, in token order. Both lists always
      have the same length; a token covered by no span scores 0.0.
  """
  from collections import defaultdict

  tokenized = adjusted_tokenize(full_text)
  starts = []
  vectors = []
  lengths = []
  for start in range(len(tokenized)):
    for _ in range(per_word):
      # Slicing clips the span at the end of the text; record the length that
      # was actually scored so no score is attributed past the last token.
      span = tokenized[start : start + random.randint(low, high)]
      vectors.append(index_tokens(span).toArray())
      starts.append(start)
      lengths.append(len(span))

  if not vectors:
    # Empty text (or per_word == 0): nothing to predict, and np.vstack
    # would raise on an empty list.
    return tokenized, [0.0] * len(tokenized)

  predictions = model.predict(np.vstack(vectors), batch_size=2048)

  word_scores = defaultdict(list)
  for start, length, prediction in zip(starts, lengths, predictions):
    score = max(float(np.ravel(prediction)[0]), 0.0)
    for offset in range(length):
      word_scores[start + offset].append(score)

  # Index explicitly by token position rather than relying on dict ordering.
  final_scores = [
      float(np.mean(word_scores[position])) if word_scores[position] else 0.0
      for position in range(len(tokenized))
  ]
  return tokenized, final_scores

In [58]:
def color_hex(r, g, b):
  """Return the six-digit lowercase hex code (without '#') for an RGB colour.

  Each channel is truncated to an integer and clamped to 0..255, so the
  result is always exactly six hex digits.
  """
  def _channel(value):
    # Clamp after truncation; values outside 0..255 would otherwise yield
    # strings such as '100' or '-1' and break the colour code.
    return min(255, max(0, int(value)))

  return '%02x%02x%02x' % (_channel(r), _channel(g), _channel(b))

def color_quality_text(model, text):
  """Display ``text`` in the notebook with each token coloured by its score.

  Tokens and per-token scores come from ``random_spanner``. A score of 0 is
  rendered pure red and a score of 1 pure green, with a linear blend between.
  Tokens are HTML-escaped so characters such as ``<``, ``&`` or ``"`` are
  shown literally instead of being interpreted as markup.

  Parameters
  ----------
  model : model passed through to ``random_spanner``
  text : str
      Raw text to score and render.
  """
  import html
  from IPython.display import display, HTML

  tokens, scores = random_spanner(model, text)
  pieces = []
  for token, score in zip(tokens, scores):
    colour = color_hex((1 - score) * 255, score * 255, 0)
    pieces.append("<i style=\"color:#" + colour + "\">" + html.escape(token) + " </i>")
  display(HTML("".join(pieces)))

In [59]:
# Render the whole file colour-coded. Lines are read with their own line
# endings and then joined with an additional '\n' between them.
with open('housekeeping_marilynne.txt', 'r') as text_file:
  file_lines = text_file.readlines()
color_quality_text(train_model, '\n'.join(file_lines))


[nltk_data] Downloading package punkt to ....
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [72]:
# Passage variant with the sentence "Whether the genius ... I never could decide".
# Adjacent literals are concatenated, each ending in an explicit space, so no
# two words are fused at a line break (backslash continuations inside a single
# literal had produced "tohave" and ",and").
color_quality_text(train_model, (
    'where it grew exactly as the nap stands out on folded plush . Every tree bore bright fruit , '
    'and showy birds nested in the boughs , and every fruit and bird was plumb with the warp in the earth . Oversized beasts , '
    'spotted and striped , could be seen running unimpeded up the right side and unhastened down the left . Whether the genius of '
    'this painting was ignorance or fancy I never could decide . One spring my grandfather quit his subterraneous house , walked to '
    'the railroad , and took a train west . He told the ticket agent that he wanted to go to the mountains , and the man arranged to '
    'have him put off here '
))

[nltk_data] Downloading package punkt to ....
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [71]:
# Passage variant with the reworded sentence "I could never decide whether ...".
# Adjacent literals are concatenated, each ending in an explicit space, so no
# two words are fused at a line break (backslash continuations inside a single
# literal had produced "tothe", "tohave" and ",and").
color_quality_text(train_model, (
    'where it grew exactly as the nap stands out on folded plush . Every tree bore bright fruit , '
    'and showy birds nested in the boughs , and every fruit and bird was plumb with the warp in the earth . Oversized beasts , '
    'spotted and striped , could be seen running unimpeded up the right side and unhastened down the left . I could never decide '
    'whether the genius of this painting was ignorance or fancy but one spring, my grandfather left his subterraneous house , walked to '
    'the railroad , and took a train west . He told the ticket agent that he wanted to go to the mountains , and the man arranged to '
    'have him put off here '
))

[nltk_data] Downloading package punkt to ....
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [53]:
# Render the whole file colour-coded. Lines are read with their own line
# endings and then joined with an additional '\n' between them.
with open('twilight_meyers.txt', 'r') as text_file:
  file_lines = text_file.readlines()
color_quality_text(train_model, '\n'.join(file_lines))


[nltk_data] Downloading package punkt to ....
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [50]:
# Render the whole file colour-coded. Lines are read with their own line
# endings and then joined with an additional '\n' between them.
with open('the_firm_grisham.txt', 'r') as text_file:
  file_lines = text_file.readlines()
color_quality_text(train_model, '\n'.join(file_lines))


[nltk_data] Downloading package punkt to ....
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [51]:
# Render the whole file colour-coded. Lines are read with their own line
# endings and then joined with an additional '\n' between them.
with open('murder_orient_agatha_christie.txt', 'r') as text_file:
  file_lines = text_file.readlines()
color_quality_text(train_model, '\n'.join(file_lines))


[nltk_data] Downloading package punkt to ....
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [52]:
# Render the whole file colour-coded. Lines are read with their own line
# endings and then joined with an additional '\n' between them.
with open('the_bear_faulkner.txt', 'r') as text_file:
  file_lines = text_file.readlines()
color_quality_text(train_model, '\n'.join(file_lines))


[nltk_data] Downloading package punkt to ....
[nltk_data]   Unzipping tokenizers/punkt.zip.
