In [None]:
# Path to the truth.txt label file of the PAN15 English test set.
# NOTE(review): `fn` is not referenced by any other cell shown in this
# notebook -- confirm it is still needed.
fn = "/data/pan15-authorship-verification-test-dataset2-english-2015-04-19/truth.txt"


In [56]:
# Helper functions for preprocessing raw text before feeding it into a Neural Net
import string
import numpy as np

# Character vocabulary: each character of string.printable is assigned the
# integer equal to its position in ALPHABET; int2char is the reverse lookup.
ALPHABET = string.printable

char2int = {symbol: index for index, symbol in enumerate(ALPHABET)}
int2char = dict(enumerate(ALPHABET))


def c2i(char):
    """Return the integer id stored for ``char`` in ``char2int``.

    Raises:
        KeyError: if ``char`` is not a key of ``char2int``.
    """
    index = char2int[char]
    return index

def i2c(num):
    """Return the character stored for integer id ``num`` in ``int2char``.

    Raises:
        KeyError: if ``num`` is not a key of ``int2char``.
    """
    symbol = int2char[num]
    return symbol


def vectorize_text(text):
    """Encode a text as a flat array of scaled character ids.

    Characters that are not in ALPHABET are dropped; every remaining
    character is mapped to its id with ``c2i`` and the ids are divided by
    ``len(ALPHABET)``.

    NOTE: a second ``vectorize_text`` is defined further down in this same
    cell and rebinds the name, so calls made after the cell has run reach
    that later definition, not this one.

    Args:
        text: the raw text to encode.

    Returns:
        A numpy array with one float per kept character.
    """
    kept = ''.join(ch for ch in text if ch in ALPHABET)
    codes = np.array([c2i(ch) for ch in kept])
    return codes / float(len(ALPHABET))

def vectorize_text(text, seq_length=100, constant_length=1200):
    """Encode a text as overlapping fixed-length windows of character ids.

    Characters that are not in ALPHABET are dropped, the remainder is cut to
    at most ``constant_length`` characters, and one window of ``seq_length``
    characters is taken at every start offset in
    ``range(0, nchars - seq_length)``.

    Args:
        text: the raw text to encode.
        seq_length: number of characters per window.
        constant_length: maximum number of (filtered) characters used. A
            message is printed when fewer are available, because the result
            then contains fewer windows than for a full-length text.

    Returns:
        A list of windows, each a list of ``seq_length`` ints produced by
        ``c2i``. With the defaults and a long enough text this is
        1100 windows of 100 ids.
    """
    text = ''.join(ch for ch in text if ch in ALPHABET)
    # Check the filtered length: that is what determines the window count.
    if len(text) < constant_length:
        print("text too short {} < {}".format(len(text), constant_length))
    # Honour the parameter instead of a hard-coded 1200.
    text = text[:constant_length]
    nchars = len(text)
    # The stop offset is kept as in the original: the last full window
    # (starting at nchars - seq_length) is intentionally not emitted, so the
    # output shape stays what the rest of the notebook expects.
    return [[c2i(ch) for ch in text[start:start + seq_length]]
            for start in range(nchars - seq_length)]

In [57]:
# Sanity check of the windowed encoding: 2000 chars are cut to 1200, which
# yields 1100 windows of 100 ids. `vectorize_text_seq` is not defined anywhere
# in this notebook, so the windowed `vectorize_text` is called instead.
print(np.array(vectorize_text("a"*2000)).shape)

(1100, 100)


In [58]:
import os

class TextPair:
    """One verification problem: a known text and an unknown text.

    Attributes are plain values stored as given:
        author: identifier of the problem (the folder name in ``get_texts``).
        known: content of the known document.
        unknown: content of the document in question.
        max_length: stored only; no code shown in this notebook reads it.
    """

    def __init__(self, author, known, unknown, max_length=1200):
        self.author, self.known, self.unknown = author, known, unknown
        self.max_length = max_length

def get_string(filename):
    """Read ``filename`` as UTF-8 text and return its whole content."""
    with open(filename, encoding="utf8") as handle:
        return handle.read()

def get_texts(directory):
    """Load one TextPair per problem folder found in ``directory``.

    Every entry of ``directory`` whose name starts with "EN" is treated as a
    problem folder containing ``known01.txt`` and ``unknown.txt``.

    The folder names are sorted: ``os.listdir`` returns entries in arbitrary
    order, while ``get_data`` combines these pairs with labels read from
    ``truth.txt``, so the order must be deterministic.

    Args:
        directory: path of the dataset root.

    Returns:
        A list of TextPair objects ordered by folder name.
    """
    authors = sorted(x for x in os.listdir(directory) if x.startswith("EN"))
    tps = []
    for author in authors:
        known = os.path.join(directory, author, "known01.txt")
        unknown = os.path.join(directory, author, "unknown.txt")
        tps.append(TextPair(author, get_string(known), get_string(unknown)))
    return tps

def get_data(directory):
    """Load a dataset folder as (pairs, labels) arrays.

    Args:
        directory: dataset root containing the problem folders and
            ``truth.txt``.

    Returns:
        pairs: ``np.array`` built from ``[known_vector, unknown_vector]`` for
            every problem, in the order returned by ``get_texts``.
        y: ``np.array`` of ints, 1 where the truth file says "Y" and 0
            otherwise, in the same order as ``pairs``.

    Raises:
        ValueError: if a problem folder has no entry in ``truth.txt``.
    """
    # read all texts into known, unknown pairs
    tps = get_texts(directory)

    # Read the labels keyed by problem id rather than by line position, so
    # they stay attached to the right text pair whatever order the folders
    # were listed in.
    # Assumes the first column of truth.txt is the problem folder name
    # (e.g. "EN001 Y") -- TODO confirm against the dataset.
    truthfile = os.path.join(directory, "truth.txt")
    label_by_problem = {}
    # utf-8-sig so that a leading byte-order mark cannot corrupt the first id
    with open(truthfile, encoding="utf-8-sig") as f:
        for line in f:
            fields = line.split()
            if len(fields) < 2:
                continue  # blank or malformed line
            label_by_problem[fields[0]] = 1 if fields[1] == "Y" else 0

    missing = [tp.author for tp in tps if tp.author not in label_by_problem]
    if missing:
        raise ValueError("no label in {} for: {}".format(truthfile, ", ".join(missing)))
    y = np.array([label_by_problem[tp.author] for tp in tps])

    # transform texts into vectors
    knownX = [vectorize_text(tp.known) for tp in tps]
    unknownX = [vectorize_text(tp.unknown) for tp in tps]

    # create pairs
    pairs = np.array([[known, unknown] for known, unknown in zip(knownX, unknownX)])
    print(pairs.shape)
    return pairs, y


In [36]:
# Load the PAN15 English authorship-verification data: training split first,
# then test split. get_data prints the shape of each pairs array.
pan15train = "/data/pan15-authorship-verification-training-dataset-english-2015-04-19/"
tr_pairs, tr_y = get_data(pan15train)

pan15test = "/data/pan15-authorship-verification-test-dataset2-english-2015-04-19/"
te_pairs, te_y = get_data(pan15test)

(100, 2, 1100, 100)
(500, 2, 1100, 100)


(100, 2, 1100, 100)

In [59]:
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
np.random.seed(1337)  # for reproducibility

import random
from keras.datasets import mnist
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Input, Lambda, LSTM
from keras.optimizers import RMSprop
from keras import backend as K


def euclidean_distance(vects):
    """Euclidean distance between the two tensors in ``vects``.

    Args:
        vects: a pair ``(x, y)`` of backend tensors of the same shape.

    Returns:
        A tensor holding, for each row, the square root of the squared
        differences summed over axis 1 (that axis is kept with size 1).
    """
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    # Clamp before the square root: the gradient of sqrt is unbounded at 0,
    # so two identical embeddings would otherwise produce NaN updates.
    return K.sqrt(K.maximum(sum_square, K.epsilon()))


def eucl_dist_output_shape(shapes):
    """Output shape of the distance layer: ``(first_dim_of_first_input, 1)``.

    Args:
        shapes: a pair of input shapes; only the first entry of the first
            shape is used.
    """
    first_shape, _ = shapes
    leading_dim = first_shape[0]
    return (leading_dim, 1)


def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    Where y_true is 1 the squared prediction is penalised; where y_true is 0
    the squared shortfall of the prediction below the margin is penalised.
    The mean over all entries is returned.
    '''
    margin = 1
    same_term = y_true * K.square(y_pred)
    different_term = (1 - y_true) * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(same_term + different_term)


def create_pairs(x, digit_indices):
    '''Build alternating positive / negative pairs for 10 classes.

    For every class and every position, one pair joins two consecutive
    samples of that class (label 1) and one pair joins the sample with the
    sample at the same position of a randomly chosen other class (label 0).

    Args:
        x: indexable collection of samples.
        digit_indices: for each of the 10 classes, the indices into ``x`` of
            the samples belonging to that class.

    Returns:
        (pairs, labels) as numpy arrays; labels alternate 1, 0.
    '''
    num_classes = 10
    pairs, labels = [], []
    # one less than the smallest class so that position + 1 is always valid
    per_class = min(len(digit_indices[cls]) for cls in range(num_classes)) - 1
    for cls in range(num_classes):
        for pos in range(per_class):
            anchor = digit_indices[cls][pos]
            positive = digit_indices[cls][pos + 1]
            pairs.append([x[anchor], x[positive]])

            # an offset in 1..9 guarantees a class different from `cls`
            other_cls = (cls + random.randrange(1, num_classes)) % num_classes
            negative = digit_indices[other_cls][pos]
            pairs.append([x[anchor], x[negative]])

            labels.extend([1, 0])
    return np.array(pairs), np.array(labels)


def create_base_network(input_dim):
    '''Base network to be shared (eq. to feature extraction).

    Two stacked 256-unit LSTM layers, both marked non-trainable, followed by
    Dropout(0.3) and a softmax Dense layer with one unit per ALPHABET symbol.

    Args:
        input_dim: input shape passed to the first LSTM layer.

    Returns:
        The (uncompiled) Sequential model.
    '''
    seq = Sequential()
    lower_lstm = LSTM(256, input_shape=(input_dim), return_sequences=True)
    upper_lstm = LSTM(256)
    for frozen_layer in (lower_lstm, upper_lstm):
        # the recurrent layers are frozen before being added to the model
        frozen_layer.trainable = False
        seq.add(frozen_layer)
    seq.add(Dropout(0.3))
    seq.add(Dense(len(ALPHABET), activation='softmax'))
    return seq


def get_basic_model(seq_length=100, features=1, freeze_feature_layers=False):
    """Build and compile a stacked-LSTM model with a softmax output.

    Args:
        seq_length: number of time steps in the input.
        features: number of features per time step.
        freeze_feature_layers: when true, the two LSTM layers and the Dropout
            layer are marked non-trainable.

    Returns:
        A Sequential model compiled with categorical cross-entropy and the
        'adam' optimizer.
    """
    feature_layers = [
        LSTM(256, input_shape=(seq_length, features), return_sequences=True),
        LSTM(256),
        Dropout(0.4),
    ]
    classification_layers = [Dense(len(ALPHABET), activation='softmax')]

    if freeze_feature_layers:
        for feature_layer in feature_layers:
            feature_layer.trainable = False

    model = Sequential()
    for layer in feature_layers + classification_layers:
        model.add(layer)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    return model

def compute_accuracy(predictions, labels):
    '''Compute classification accuracy with a fixed threshold on distances.

    A pair is predicted "same" (label 1) when its distance is below 0.5 and
    "different" (label 0) otherwise; the result is the fraction of pairs
    whose prediction equals the label.

    The previous version returned the mean label among the pairs predicted
    "same", which ignores every pair predicted "different" and is NaN when
    no distance falls below the threshold.

    Args:
        predictions: array of distances, any shape (it is flattened).
        labels: array of 0/1 labels with one entry per prediction.

    Returns:
        The accuracy as a float in [0, 1].
    '''
    predicted_same = np.asarray(predictions).ravel() < 0.5
    return np.mean(predicted_same == np.asarray(labels).ravel())

In [70]:
# network definition
# input_dim is derived here from the loaded training pairs so that this cell
# does not depend on a cell further down having been run first.
input_dim = (tr_pairs.shape[-2], tr_pairs.shape[-1])
print(input_dim)
base_network = create_base_network(input_dim)
# base_network.load_weights("../charlm/server_weights/generic_wonderland.h5")

(1100, 100)


In [3]:
# Shape of one encoded text: the last two axes of the pairs array
# (the data-loading cell printed (100, 2, 1100, 100) for the training set).
input_dim = (tr_pairs.shape[-2], tr_pairs.shape[-1])

# network definition
base_network = create_base_network(input_dim)
# Loading pretrained weights is currently disabled:
# base_network.load_weights("../charlm/server_weights/generic_wonderland.h5")

# One input per side of the pair (known text, unknown text).
input_a = Input(shape=(input_dim))
input_b = Input(shape=(input_dim))

# because we re-use the same instance `base_network`,
# the weights of the network
# will be shared across the two branches
processed_a = base_network(input_a)
processed_b = base_network(input_b)

# The model output is the distance between the two branch outputs.
distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([processed_a, processed_b])

# NOTE(review): `input=` / `output=` look like the older Keras keyword
# spellings (newer releases use `inputs=` / `outputs=`) -- confirm against
# the installed Keras version.
model = Model(input=[input_a, input_b], output=distance)


NameError: name 'tr_pairs' is not defined

In [65]:
# Inspect the layers of the siamese model: the two inputs, the shared
# Sequential base network and the distance Lambda layer.
model.layers

[<keras.engine.topology.InputLayer at 0x16c9e6e10>,
 <keras.engine.topology.InputLayer at 0x174300c50>,
 <keras.models.Sequential at 0x16c9e6dd8>,
 <keras.layers.core.Lambda at 0x174565748>]

In [72]:
# Train the siamese model on the training pairs with the contrastive loss,
# holding out 20% of the samples for validation.
rms = RMSprop()
model.compile(optimizer=rms, loss=contrastive_loss)

known_tr, unknown_tr = tr_pairs[:, 0], tr_pairs[:, 1]
model.fit([known_tr, unknown_tr], tr_y,
          batch_size=256,
          nb_epoch=5,
          validation_split=0.2)



Train on 80 samples, validate on 20 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x17d2b5710>

In [2]:
# Final accuracy of the trained model, reported per split.

# training split
pred = model.predict([tr_pairs[:, 0], tr_pairs[:, 1]])
tr_acc = compute_accuracy(pred, tr_y)
print('* Accuracy on training set: %0.2f%%' % (100 * tr_acc))

# test split
pred = model.predict([te_pairs[:, 0], te_pairs[:, 1]])
te_acc = compute_accuracy(pred, te_y)
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))

NameError: name 'model' is not defined