In [7]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import textwrap
import re
import pynlpir
pynlpir.open()
from sklearn import metrics
from scipy import stats
import scipy as sp

from helpers import *
from generate import *
from diagnostic import *

from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import Ridge

from scipy.stats import pearsonr

# Analysis on linux source code language model

### Import model

In [2]:
model_lstm = torch.load('models/linux_3x512_0d3_lstm_200l_40000E.model').cuda()
model_gru = torch.load('models/linux_3x512_0d3_gru_200l_40000E.model').cuda()
print('Perplexity LSTM:', 2**np.mean([test_model(model_lstm, 'data/linux/test.txt') for _ in range(1)]))
print('Perplexity GRU: ', 2**np.mean([test_model(model_gru, 'data/linux/test.txt') for _ in range(1)]))



Perplexity LSTM: 2.26045721122
Perplexity GRU:  2.30455089006


### Generate text

In [3]:
print('\n============== LSTM MODEL ==============\n')
text, hiddens = generate(model_lstm, '\n\n', 500, 0.8, True)
print(text)

print('\n============== GRU MODEL ===============\n')
text, hiddens = generate(model_gru, '\n\n', 500, 0.8, True)
print(text)



tatic void atomisp_css_hs_enable_attributes(struct sock *sk)
{
	int err;

	switch (osd_req->chanspec) {
	case SYS_CTL_DAC_OP:
		if (status & APD_SOT_SUPPORTED) {
			if (sg->soft_soft_state != ASIC_IC_OPTION) {
				sg_send_msg(as, state->soft_seq, mask, sg_size_entry);
				snd_soc_update_bits(sdc, cycles_state, skb);

				spin_unlock(&dss->set_ssid_lock);
				ssa_port_set_current_dump(support_start, SSP_SRC_UNKNOWN);
				success_ops->soc_ops(sk, SAS_OK);
				msg_set_speed_base(asus->sdma_cpu, SAS


if (dev->irq == NULL) {
		pr_err("%s: failed to read this reset\n");
		return;
	}

	/* hardware colorspace in State */
	if (action) {
		for (i = 0; i < 1000;
}

static void atmel_walk_state_transaction(struct sk_buff *header)
{
	int rc = 0;

	/* Check if the static values */
	if (cpuid_connect < 0 || chpi->base_state == QLA82XX_DIG_REQUEST_ATTR_FLAG_LOAD)
		cs->enable_idx = 1;
	else
		addr |= HIDMAV_IN_TX_DMA_CONFIGURED;
	if (!(mode & MISC_REG_DISPLAY_ENCODE)) {
		if (priv->cma_enabled & (c

### Define Code Hypotheses

In [4]:
def hypothesis_inlinecounter(text):
    hyp = np.concatenate([np.linspace(1, -1, len(x)+1) for x in text.split('\n')])[:-1]
    return hyp

def hypothesis_inside_one(text, single):
    hyp = re.sub('\{}.*?\{}'.format(single, single), lambda m: single+'#'*(len(m.group())-2)+single, text)
    return np.array([1 if x == '#' else -1 for x in hyp])

def hypothesis_inside_two(text, left, right):
    hyp = np.full(len(text), -1)
    inside = False
    for i in range(len(text) - 1):
        if text[i] == left:
            inside = True
        elif text[i] == right:
            inside = False
        if inside:
            hyp[i+1] = 1
    return hyp

hypothesis_inside_quotation = lambda x: hypothesis_inside_one(x, '"')
hypothesis_inside_parantheses = lambda x: hypothesis_inside_two(x, '(', ')')

def hypothesis_comments(text):
    hyp = np.full(len(text), -1)
    in_brac_comment = False
    in_line_comment = False
    for i in range(len(text)):
        if text[i:i+2] == '//':
            in_line_comment = True
        elif text[i] == '\n':
            in_line_comment = False
        elif text[i:i+2] == '/*':
            in_brac_comment = True
        elif text[i:i+2] == '*/':
            in_brac_comment = False
        if in_brac_comment:
            hyp[i:i+3] = 1
        if in_line_comment:
            hyp[i:i+1] = 1
    return hyp

def hypothesis_indentation(text, level):
    hyp = np.full(len(text), -1)
    cur_level = 0
    for i, char in enumerate(text):
        if char == '\n':
            cur_level = 0
        elif char == '\t':
            cur_level += 1
        if cur_level >= level:
            hyp[i] = 1
    return hyp

# plot_colored_text(text, hypothesis_inlinecounter(text), title='Hypothesis: Inline counter', save_file='plots/hyp_inline_counter.png')
# plot_colored_text(text, hypothesis_inside_quotation(text), title='Hypothesis: Inside quotation', save_file='plots/hyp_inside_quotation.png')
# plot_colored_text(text, hypothesis_inside_parantheses(text), title='Hypothesis: Inside parantheses', save_file='plots/hyp_inside_parantheses.png')
# plot_colored_text(text, hypothesis_comments(text), title='Hypothesis: Comments', save_file='plots/hyp_comments.png')
# plot_colored_text(text, hypothesis_indentation(text, 1), title='Hypothesis: Indent level 1', save_file='plots/hyp_indent_1.png')
# plot_colored_text(text, hypothesis_indentation(text, 2), title='Hypothesis: Indent level 2', save_file='plots/hyp_indent_2.png')
# plot_colored_text(text, hypothesis_indentation(text, 3), title='Hypothesis: Indent level 3', save_file='plots/hyp_indent_3.png')

### Validate hypotheses

In [5]:
def full_test(model, hypothesis, name, plot=False, train_len=95, test_len=10,
              ex_name='test'):
    y_true, y_pred = validate_hypothesis(model, LogisticRegression(), hypothesis,
                                         train_len=train_len, test_len=train_len,
                                         save_hyp='plots/hyp_{}.png'.format(ex_name),
                                         save_diag='plots/diag_{}.png'.format(ex_name),
                                         save_resp='plots/resp_{}.png'.format(ex_name))
    metric_pearsonr = lambda a, b: stats.pearsonr(a, b)[0]
    
    print("Hypothesis: {} (normal)".format(name))
    print('acc:      ', metrics.accuracy_score(y_true, y_pred))
    print('prec:     ', metrics.precision_score(y_true, y_pred))
    print('recall:   ', metrics.recall_score(y_true, y_pred))
    print('f1-score: ', metrics.f1_score(y_true, y_pred))
    print('pearsonr: ', metric_pearsonr(y_true, y_pred))
    y_true, y_pred = validate_hypothesis(model, LogisticRegression(class_weight='balanced'),
                                         hypothesis, train_len=train_len, test_len=test_len,
                                         save_hyp='plots/hyp_{}_balanced.png'.format(ex_name),
                                         save_diag='plots/diag_{}_balanced.png'.format(ex_name),
                                         save_resp='plots/resp_{}_balanced.png'.format(ex_name))
    print("Hypothesis: {} (balanced)".format(name))
    print('acc:      ', metrics.accuracy_score(y_true, y_pred))
    print('prec:     ', metrics.precision_score(y_true, y_pred))
    print('recall:   ', metrics.recall_score(y_true, y_pred))
    print('f1-score: ', metrics.f1_score(y_true, y_pred))
    print('pearsonr: ', metric_pearsonr(y_true, y_pred))

In [9]:
def validate_hypothesis(model, diag_classifier, hypothesis, train_len=50,
                        test_len=1, text_len=500, temperature=0.8,
                        save_hyp=None, save_diag=None, save_resp=None):
    # Generate hypothesis data
    def gen_hyp_data(model, N, text_len=500):
        texts, hiddens, hyps = [], [], []
        for i in range(N):
            text, hidden = generate(model, '\n\n', text_len, temperature, True)
            hidden = hidden.reshape(hidden.shape[0], -1)
            hyp = hypothesis(text)
            hiddens.append(hidden)
            hyps.append(hyp)
            texts.append(text)
        return ''.join(texts), np.concatenate(hyps), np.concatenate(hiddens)

    # Generate train and test data
    _, train_hyps, train_hiddens = gen_hyp_data(model, train_len)
    test_texts, test_hyps, test_hiddens = gen_hyp_data(model, test_len)
    print(pearsonr(train_hiddens, train_hyps))
    print(pearsonr(test_hiddens, test_hyps))

    # Train Diagnostic Classifier
    diag_classifier.fit(train_hiddens, train_hyps)
    
    # Predict with Diagnostic Classifier
    pred_hyps = diag_classifier.predict(test_hiddens)
    
    # Find responsible neuron
    resp_neuron = np.argmax(np.abs(diag_classifier.coef_))
    print(resp_neuron)
    
    # Plot results
    if save_hyp:
        plot_colored_text(test_texts[:text_len], test_hyps[:text_len],
                          title='Formed Hypothesis',
                          save_file=save_hyp)
    if save_diag:
        plot_colored_text(test_texts[:text_len], pred_hyps[:text_len],
                          title='Diagnostic Classifier Prediction',
                          save_file=save_diag)
    if save_resp:
        plot_colored_text(test_texts[:text_len], test_hiddens[:text_len, resp_neuron],
                          title='Most Responsible Neuron {}'.format(resp_neuron),
                          save_file=save_resp)
        
    del(train_hyps)
    del(train_hiddens)
    del(test_texts)
    del(test_hiddens)
    gc.collect()
    
    return test_hyps, pred_hyps

In [10]:
for model_name, model in [('linux_lstm', model_lstm), ('linux_gru', model_gru)]:
    print("MODEL: ", model_name)
    y_true, y_pred = validate_hypothesis(model, LinearRegression(), hypothesis_inlinecounter,
                                         train_len=95, test_len=1,
                                         save_hyp='plots/{}_hyp_inlinecounter.png'.format(model_name),
                                         save_diag='plots/{}_diag_inlinecounter.png'.format(model_name),
                                         save_resp='plots/{}_resp_inlinecounter.png'.format(model_name))

MODEL:  linux_lstm


ValueError: operands could not be broadcast together with shapes (47500,3072) (47500,) 

In [None]:
for model_name, model in [('linux_lstm', model_lstm), ('linux_gru', model_gru)]:
    print("MODEL: ", model_name)
    full_test(model, hypothesis_inside_quotation, 'Inside Quotation',
              train_len=95, test_len=10, ex_name='inside_quotation'.format(model_name))
    full_test(model, hypothesis_comments, 'Comments',
              train_len=95, test_len=10, ex_name='{}_inside_comments'.format(model_name))
    full_test(model, lambda x: hypothesis_indentation(x, 1), 'Indentation level 1',
              train_len=95, test_len=10, ex_name='{}_inside_indent_1'.format(model_name))
    full_test(model, lambda x: hypothesis_indentation(x, 2), 'Indentation level 2',
              train_len=95, test_len=10, ex_name='{}_inside_indent_2'.format(model_name))
    full_test(model, lambda x: hypothesis_indentation(x, 3), 'Indentation level 3',
              train_len=95, test_len=10, ex_name='{}_inside_indent_3'.format(model_name))
    full_test(model, hypothesis_inside_parantheses, 'Inside Parantheses',
              train_len=95, test_len=10, ex_name='{}_inside_parantheses'.format(model_name))

# Analysis on Shakespeare model 

### Load Shakespeare language odel

In [None]:
# model = torch.load('models/simple.model').cuda()
# print('Perplexity:', 2**np.mean([test_model(model, 'data/tiny-shakespeare/test.txt') for _ in range(1)]))
model_lstm = torch.load('models/shake_2x128_lstm_3000').cuda()
print('Perplexity:', 2**np.mean([test_model(model_lstm, 'data/tiny-shakespeare/test.txt') for _ in range(1)]))
model_gru = torch.load('models/shake_2x128_gru_3000').cuda()
print('Perplexity:', 2**np.mean([test_model(model_gru, 'data/tiny-shakespeare/test.txt') for _ in range(1)]))

### Generate text

In [None]:
print('\n============== LSTM MODEL ==============\n')
text, hiddens = generate(model_lstm, '\n\n', 500, 0.8, True)
print(text)

print('\n============== GRU MODEL ===============\n')
text, hiddens = generate(model_gru, '\n\n', 500, 0.8, True)
print(text)


### Define language hypotheses

In [None]:
def hypothesis_inlinecounter(text):
    hyp = np.concatenate([np.linspace(1, -1, len(x)+1) for x in text.split('\n')])[:-1]
    return hyp

def hypothesis_capswords(text):
    hyp = np.concatenate([np.full(len(x)+1, 1) if re.sub('[^a-zA-Z]+', '', x).isupper() else np.full(len(x)+1, -1) for x in text.split('\n')])[:-1]
    return hyp

def hypothesis_pos(text, pos_tag):
    hyp = text.replace('1', '0')
    for word, tag in pynlpir.segment(text):
        if tag == pos_tag:
            hyp = hyp.replace(word, '1'*len(word), 1)
        else:
            hyp = hyp.replace(word, '0'*len(word), 1)
    hyp = [1 if x=='1' else -1 for x in re.sub('[^1]', '0', hyp)]
    
    return hyp

def hypothesis_verbs(text):
    return hypothesis_pos(text, 'verb')

def hypothesis_nouns(text):
    return hypothesis_pos(text, 'noun')

In [None]:
# plot_colored_text(text, hypothesis_inlinecounter(text), title='Hypothesis: Inline Counter', save_file='plots/hyp_inlinecounter.png')
# plot_colored_text(text, hypothesis_capswords(text), title='Hypothesis: Capitalized Words', save_file='plots/hyp_capswords.png')
# plot_colored_text(text, hypothesis_verbs(text), title='Hypothesis: Verbs', save_file='plots/hyp_verbs')
# plot_colored_text(text, hypothesis_nouns(text), title='Hypothesis: Nouns', save_file='plots/hyp_nouns')

## Validate hypotheses

In [None]:
# Generate hypothesis data
def gen_hyp_data(model, N, text_len=500):
    texts, hiddens, hyps = [], [], []
    for i in range(N):
        text, hidden = generate(model, '\n\n', text_len, 0.8, True)
        hidden = hidden.reshape(hidden.shape[0], -1)
        hyp = hypothesis_inlinecounter(text)
        hiddens.append(hidden)
        hyps.append(hyp)
        texts.append(text)
    return ''.join(texts), np.concatenate(hyps), np.concatenate(hiddens)

# Generate train and test data
train_texts, train_hyps, train_hiddens = gen_hyp_data(model_gru, 500)
test_texts, test_hyps, test_hiddens = gen_hyp_data(model_gru, 2)

In [None]:
diag_classifier = LinearRegression()
# Train Diagnostic Classifier
diag_classifier.fit(train_hiddens, train_hyps)

In [None]:
np.argsort(np.abs(diag_classifier.coef_))

In [None]:
scipy.stats.pearsonr(x, y)

In [None]:
# Find responsible neuron
resp_neuron = np.argmax(np.abs(diag_classifier.coef_))
print(resp_neuron)

In [None]:
for i in np.argsort(np.abs(diag_classifier.coef_))[-10:]:
    plot_colored_text(train_texts[:500], train_hiddens[:500, i],
                      title='Most Responsible Neuron {}'.format(i),
                      save_file='plots/temp_{}.png'.format(i))


In [None]:
for i in range(200):
    plot_colored_text(train_texts[:500], train_hiddens[:500, i],
                      title='Most Responsible Neuron {}'.format(i),
                      save_file='plots/temp_{}.png'.format(i))


In [None]:
y_true, y_pred = validate_hypothesis(model, LinearRegression(), hypothesis_inlinecounter,
                                     train_len=95, test_len=5,
                                     save_hyp='plots/hyp_inlinecounter_shake.png',
                                     save_diag='plots/diag_inlinecounter_shake.png',
                                     save_resp='plots/resp_inlinecounter_shake.png')

In [None]:
for model_name, model in [('shake_lstm', model_lstm), ('shake_gru', model_gru)]:
    print("MODEL: ", model_name)
    
    full_test(model, hypothesis_capswords, 'Capitalized Words',
              train_len=95, test_len=5, ex_name='{}_capswords'.format(model_name))
    full_test(model, hypothesis_nouns, 'Nouns',
              train_len=95, test_len=5, ex_name='{}_nouns'.format(model_name))
    full_test(model, hypothesis_verbs, 'Verbs',
              train_len=95, test_len=5, ex_name='{}_verbs'.format(model_name))