In [146]:
import numpy as np
from sklearn.model_selection import train_test_split

In [147]:
punc = '''!()-[]{};:'"\,<>./?@#$%^&*_~'''

def remove_punc(l):
    split_l = l.split()
    new_l = []
    for word in split_l:
        no_punc = ''
        for char in word:
            if char not in punc:
                no_punc += char
        new_l.append(no_punc.lower())
    return new_l

def poem_to_list(txt_file):
    '''train test split + split lines from input'''
    saved_lines = []
    with open(txt_file, 'r') as f:
        lines = f.readlines()
        for l in lines:
            # check that line has data
            if len(l.strip()) > 0:
                # split and remove punctuation
                split_l = remove_punc(l)
                saved_lines.append(split_l)
    return saved_lines

In [148]:
poe_lines = poem_to_list('input/edgar_allan_poe.txt')
frost_lines = poem_to_list('input/robert_frost.txt')

poe_labels = ['poe' for line in poe_lines]
frost_labels = ['frost' for line in frost_lines]

X = poe_lines + frost_lines
y = poe_labels + frost_labels

In [149]:
X_train, X_test, y_train, y_test = train_test_split(
...     X, y, test_size=0.2)

In [150]:
print(X_train[509], y_train[509])

['of', 'which', 'it', 'doth', 'now', 'know'] poe


In [151]:
def build_word_mapping(train, test):
    word2int = {}
    word_freq = {}
    idx = 0

    # create unique int index for each word in train
    for line in train:
        for word in line:
            if word not in word2int:
                word2int[word] = idx
                word_freq[word] = 1
                idx += 1
            else:
                word_freq[word] += 1
    
    # use same index for all words in train not in test
    idx += 1
    for line in test:
        for word in line:
            if word not in word2int:
                word2int[word] = idx
                word_freq[word] = 1
            else:
                word_freq[word] += 1

    return word2int, word_freq

word2int, word_freq = build_word_mapping(X_train, X_test)

In [152]:
def words_to_ints(X_data):
    new_X = []
    for line in X_data:
        int_line = [word2int[word] for word in line]
        new_X.append(int_line)
    
    return new_X

X_train_int = words_to_ints(X_train)
X_test_int = words_to_ints(X_test)

In [153]:
def build_markov_model(X_train_data):

    # initialize state transition matrix
    # size = num of unique words in train + extra token for non-train words
    A_shape = max(word2int.values()) + 1
    A = np.ones((A_shape, A_shape))

    # initialize initial state distribution, vector of len A_shape
    pi = np.ones((A_shape, 1))

    # loop through all words and add to both transition matrix & initial dist.
    for line in X_train_data:
        for idx in range(len(line) - 1):
            curr_word = line[idx]
            next_word = line[idx + 1]
            A[curr_word, next_word] += 1

            if idx == 0:
                pi[curr_word] += 1

    # get probabilities for initial state by dividing counts by total num seqs
    # Denom needs N (num sequences in dataset, aka len(X_train)??) + M (possible words aka A_shape)
    pi /= (len(X_train_data) + A_shape)

    # divide each count by total appearances of first word
    # (in the transition matrix)
    # the -2 weirdness in the loop accounts for the "token" word, i.e. a bucket for words only in the test set
    int2word = {v: k for k, v in word2int.items()}
    for r in range(A_shape - 2):
        word = int2word[r]
        A[r] /= (word_freq[word] + A_shape)
        
    # convert to log probabilities
    pi = np.log(pi)
    A = np.log(A)

    return pi, A

In [154]:
# separate out X_train_int for frost v poe? to build separate MMs?

poe_train_data = []
frost_train_data = []

for idx, line in enumerate(X_train_int):
    if y_train[idx] == 'poe':
        poe_train_data.append(line)
    elif y_train[idx] == 'frost':
        frost_train_data.append(line)
    else:
        print('wtf')
        break

In [155]:
poe_pi, poe_A = build_markov_model(poe_train_data)
frost_pi, frost_A = build_markov_model(frost_train_data)

print(poe_A.shape, frost_A.shape)

(2639, 2639) (2639, 2639)


In [156]:
def prob_from_mm(line, pi, A):
    '''take line of ints from X_test_ints plus the markov model A/pi matrices
    return a probability'''

    prob_sum = 0
    
    for idx in range(len(line) - 1):
        curr_word = line[idx]
        next_word = line[idx + 1]
        if idx == 0:
            prob_sum += pi[curr_word]
        else:
            prob_sum += A[curr_word, next_word]
    
    return prob_sum[0]

In [157]:
def get_priors(y_train, author):
    auth_count = sum(y == author for y in y_train)
    return np.log(auth_count / len(y_train))

def calc_accuracy_with_priors(X_data, labels, poe_prior, frost_prior):
    num_correct = 0.
    for idx, line in enumerate(X_data):
        if len(line) > 1:
            poe_prob = prob_from_mm(line, poe_pi, poe_A)
            frost_prob = prob_from_mm(line, frost_pi, frost_A)

            guess = 'poe' if poe_prob > frost_prob else 'frost'
            answer = labels[idx]

            if guess == answer:
                num_correct += 1.
    
    return num_correct / float(len(labels))


In [158]:
poe_prior = get_priors(y_train, 'poe')
frost_prior = get_priors(y_train, 'frost')

print(calc_accuracy_with_priors(X_train_int, y_train, poe_prior, frost_prior))
print(calc_accuracy_with_priors(X_test_int, y_test, poe_prior, frost_prior))

0.9912942542077772
0.8167053364269141


In [159]:
def get_confusion_matrix(X_data, labels):
    # matrix shape:
    ###         guessP  guessF
    # actualP   [    ]  [    ]
    # actualF   [    ]  [    ]

    cm = np.zeros((2,2))

    for idx, line in enumerate(X_data):
        # prevent case where it's only one word in the line.
        if len(line) > 1:
            answer = 0 if labels[idx] == 'poe' else 1
            poe_prob = prob_from_mm(line, poe_pi, poe_A) + poe_prior
            frost_prob = prob_from_mm(line, frost_pi, frost_A) + frost_prior
            guess = 0 if poe_prob > frost_prob else 1
            
            cm[answer, guess] += 1
    
    return cm

In [160]:
cm_train = get_confusion_matrix(X_train_int, y_train)
cm_test = get_confusion_matrix(X_test_int, y_test)

print(cm_train)
print(cm_test)

[[ 566.   21.]
 [   0. 1129.]]
[[ 69.  61.]
 [  5. 296.]]


In [161]:
def f1_score_from_cm(cm):
    precision = cm[1,1] / (cm[1,1] + cm[0,1])
    recall = cm[1,1] / (cm[1,1] + cm[1,0])

    return 2 * ((precision * recall) / (precision + recall))

print(f'F1 score for train: {f1_score_from_cm(cm_train)}')
print(f'F1 score for test: {f1_score_from_cm(cm_test)}')


F1 score for train: 0.9907854322071084
F1 score for test: 0.8996960486322189
