Required Imports

In [None]:
import IPython.display as ipd
import sys
sys.path.append('./network/')
from branched_network_class import branched_network
import tensorflow as tf
from scipy import signal
import scipy.io.wavfile as wav
import matplotlib as plt 
%pylab inline
from pycochleagram import cochleagram as cgram 
from PIL import Image
from pydub import AudioSegment
import os
import random
from sphfile import SPHFile
import string
import shutil
from heapq import nlargest
import copy

Helper functions for setup

In [2]:
def load_model():
    """Instantiate the network and build the matching logit-to-word key.

    Resets the default TensorFlow graph, constructs a ``branched_network``
    and loads ``./demo_stim/logits_to_word_key.npy``. Entry 242 and every
    entry from 588 onwards are discarded from the key, mirroring the slicing
    that ``run_file`` / ``run_cgram`` apply to the logits.

    Returns:
        (net_object, key): the network object and a list of ``str`` words,
        lower-cased, stripped of punctuation and surrounding whitespace.
    """
    np.seterr(divide='ignore')
    tf.reset_default_graph()
    network = branched_network()

    raw_key = np.load('./demo_stim/logits_to_word_key.npy')
    kept_entries = np.concatenate((raw_key[:242], raw_key[243:588]))

    strip_punct = str.maketrans('', '', string.punctuation)
    cleaned_key = [
        entry.decode('UTF-8').lower().translate(strip_punct).strip()
        for entry in kept_entries
    ]
    return network, cleaned_key

In [3]:
def build_word_bank():
    """Load the vocabulary stored in ``wordbank.txt`` (current directory).

    Every whitespace-separated token of every line is lower-cased and has
    its punctuation and surrounding whitespace removed before being stored.

    Returns:
        set of str: the normalised words (a token made only of punctuation
        contributes the empty string).
    """
    strip_punct = str.maketrans('', '', string.punctuation)
    with open('wordbank.txt', 'r') as handle:
        return {
            token.lower().translate(strip_punct).strip()
            for row in handle
            for token in row.split()
        }

Helper functions for file processing

In [4]:
def process_wav(sr, wav_f, offset): #offset assumes sr of 16000
    """Return the two-second window of ``wav_f`` that starts at ``offset``.

    Args:
        sr: sampling rate in samples per second; the window is ``2 * sr`` long.
        wav_f: 1-D array of samples.
        offset: index of the first sample of the window.

    Returns:
        Array of ``2 * sr`` samples. When the signal is too short it is first
        extended with integer zeros so that the window is always full.
    """
    window = 2 * sr
    shortfall = window + offset - len(wav_f)
    if shortfall > 0:
        padding = np.zeros((shortfall,), dtype = int)
        wav_f = np.concatenate((wav_f, padding))
    return wav_f[offset : offset + window]

In [5]:
def resample(example, new_size):
    """Resize a 2-D array to ``new_size`` with PIL's Lanczos filter.

    Args:
        example: array accepted by ``Image.fromarray``.
        new_size: ``(width, height)`` tuple passed to ``Image.resize``.

    Returns:
        numpy array holding the resized image.
    """
    im = Image.fromarray(example)
    # Image.ANTIALIAS was only an alias of Image.LANCZOS and no longer exists
    # in Pillow >= 10; LANCZOS selects the same filter on old and new versions.
    resized_image = im.resize(new_size, resample=Image.LANCZOS)
    return np.array(resized_image)

def plot_cochleagram(cochleagram, title):
    """Draw a cochleagram as a 256x256 blue heat map without axis ticks.

    Args:
        cochleagram: array with 256*256 elements (it is reshaped here).
        title: text shown above the plot.
    """
    plt.figure(figsize=(6,3))
    image = cochleagram.reshape(256,256)
    # fignum=False draws into the figure created above instead of a new one
    plt.matshow(image, origin='lower', cmap=plt.cm.Blues, fignum=False, aspect='auto')
    plt.yticks([])
    plt.xticks([])
    plt.title(title)
    
def play_wav(wav_f, sr, title):
    """Print ``title`` followed by a colon, then show an audio player.

    Args:
        wav_f: audio samples handed to ``IPython.display.Audio``.
        sr: sampling rate of ``wav_f``.
        title: string label printed above the player.
    """
    heading = title + ':'
    print(heading)
    player = ipd.Audio(wav_f, rate=sr)
    ipd.display(player)

In [6]:
def generate_cochleagram(wav_f, sr, title):
    """Turn a waveform into the flattened 256x256 input the network expects.

    Args:
        wav_f: audio samples passed to ``cgram.cochleagram``.
        sr: sampling rate of ``wav_f``.
        title: unused here; kept so existing callers keep working.

    Returns:
        Array of shape ``(1, 65536)`` with values rescaled to [0, 255].
    """
    # Settings forwarded positionally to pycochleagram.
    # NOTE(review): argument meanings follow the local names only - confirm
    # against the pycochleagram signature before changing any of them.
    n_filters = 50
    low_lim, hi_lim = 20, 8000
    sample_factor, pad_factor, downsample = 4, 2, 200
    nonlinearity, fft_mode, ret_mode = 'power', 'auto', 'envs'
    strict = True

    envelopes = cgram.cochleagram(wav_f, sr, n_filters, low_lim, hi_lim,
                                  sample_factor, pad_factor, downsample,
                                  nonlinearity, fft_mode, ret_mode, strict)

    # rescale to [0,255]; a constant input makes np.ptp zero (division by zero)
    peak = np.max(envelopes)
    scaled = 255*(1-((peak-envelopes)/np.ptp(envelopes)))

    # the reshape fails unless the cochleagram holds exactly 211*400 values
    as_grid = np.reshape(scaled, (211,400))
    as_image = resample(as_grid,(256,256))

    # flatten so it can be fed to the network
    return np.reshape(as_image, (1, 256*256))

In [7]:
def check_bank(word, bank):
    """Tell whether ``word`` is in ``bank`` after normalisation.

    The word is lower-cased, stripped of all ``string.punctuation``
    characters and of surrounding whitespace before the membership test.
    """
    strip_punct = str.maketrans('', '', string.punctuation)
    normalised = word.lower().translate(strip_punct).strip()
    return normalised in bank

In [8]:
#extracts speaker designator, title, and offset from a file name
def extract_details(f):
    """Split a name such as ``FCJF0SX127-1600.npy`` into its parts.

    The text before the first ``.`` is used: its first five characters are
    the speaker designator, the rest is ``<title>-<offset>``.

    Returns:
        (speaker designator, title, offset as int)

    Raises:
        ValueError: if the name has no ``.`` or no ``-``, or the offset is
        not an integer.
    """
    stem = f[:f.index(".")]
    speaker, remainder = stem[:5], stem[5:]
    dash = remainder.index("-")
    return speaker, remainder[:dash], int(remainder[dash + 1 :])

def extract_text(f):
    """Split a name such as ``FCJF0SX127.txt`` into designator and title.

    Returns:
        (first five characters, remaining characters) of the text that
        precedes the first ``.``.

    Raises:
        ValueError: if the name contains no ``.``.
    """
    stem = f[:f.index(".")]
    return stem[:5], stem[5:]

In [9]:
def read_txt(path, bank): #reads the transcript of a txt file
    """Read a transcript and locate the words that belong to ``bank``.

    Only the first line of the file is used, and its first two
    space-separated fields are skipped (presumably the start/end sample
    numbers of the transcript - verify against the data).

    Returns:
        (sentence, positions): the remaining fields re-joined with single
        spaces (a trailing newline from the file is kept), and a dict that
        maps each normalised banked word to the list of its 1-based
        positions in the sentence.
    """
    with open(path, encoding = 'us-ascii') as handle:
        first_line = handle.readlines()[0]
    tokens = first_line.split(' ')[2:]

    strip_punct = str.maketrans('', '', string.punctuation)
    positions = dict()
    for place, token in enumerate(tokens, start=1):
        normalised = token.lower().translate(strip_punct).strip()
        if check_bank(normalised, bank):
            positions.setdefault(normalised, []).append(place)
    return ' '.join(tokens), positions

In [10]:
def check_txt(path, bank, mini, maxi): #reads and checks whether banked words are within [mini, maxi]
    """Find banked words located between word positions ``mini`` and ``maxi``.

    Only the first line of the file is used and its first two
    space-separated fields are skipped. Positions are 1-based and the range
    is inclusive on both ends.

    Returns:
        ``False`` when the sentence has fewer than ``mini`` words or no
        banked word lies in the range; otherwise a dict mapping each
        normalised banked word to the list of its 1-based positions.
    """
    with open(path, encoding = 'us-ascii') as handle:
        first_line = handle.readlines()[0]
    tokens = first_line.split(' ')[2:]
    if len(tokens) < mini:
        return False

    strip_punct = str.maketrans('', '', string.punctuation)
    first = min(mini - 1, len(tokens) - 1)
    stop = min(maxi, len(tokens))
    hits = dict()
    for idx in range(first, stop):
        normalised = tokens[idx].lower().translate(strip_punct).strip()
        if check_bank(normalised, bank):
            hits.setdefault(normalised, []).append(idx + 1)
    return hits if hits else False

Helper functions for calculating and displaying results

In [11]:
def run_file(sr, wav_f, title, model):
    """Run a waveform through the network and return the usable word logits.

    The waveform is converted with ``generate_cochleagram`` and fed to
    ``model.word_logits``. Logit 242 and every logit from 588 onwards are
    discarded, so the result lines up with the key built by ``load_model``.

    Returns:
        1-D array of the kept logits for the single input example.
    """
    features = generate_cochleagram(wav_f, sr, title)
    batch_logits = model.session.run(model.word_logits, feed_dict={model.x: features})
    row = batch_logits[0]
    return np.concatenate((row[:242], row[243:588]))

In [12]:
def run_cgram(cgram, model):
    """Run a ready-made cochleagram through the network.

    Args:
        cgram: network input, fed to ``model.x``.
        model: object exposing ``session``, ``word_logits`` and ``x``.

    Returns:
        1-D array of word logits for the first example, without logit 242
        and without the logits from index 588 onwards.
    """
    batch_logits = model.session.run(model.word_logits, feed_dict={model.x: cgram})
    row = batch_logits[0]
    return np.concatenate((row[:242], row[243:588]))

In [13]:
def distrib(logits, n): #gets top n outcomes
    """Return the indices of the ``n`` largest logits, best first.

    ``np.argpartition`` isolates the top ``n`` entries without a full sort;
    only those ``n`` entries are then ordered by descending value.
    """
    top = np.argpartition(logits, -n)[-n:]
    descending = np.argsort(logits[top])[::-1]
    return top[descending]

Functions for moving or converting files

In [15]:
def dr_conv(dr):
    """Convert the SPHERE audio of one TEST dialect region to wav files.

    For every speaker directory below ``./TIMIT/TEST/DR{dr}`` a ``wavs``
    sub-directory is created (if missing) and each file whose name ends in
    ``.wav`` (case-insensitive) is read with ``SPHFile`` and rewritten into
    it under the same name.

    Args:
        dr: dialect region number used to build the ``DR{dr}`` folder name.
    """
    # Point this at "./TIMIT/TRAIN/" to convert the training split instead.
    path = "./TIMIT/TEST/" + f"DR{dr}"
    for subdir in os.listdir(path):
        new_path = os.path.join(path, subdir)
        if not os.path.isdir(new_path):
            continue
        wavpath = os.path.join(new_path, 'wavs')
        # exist_ok lets the conversion be re-run without a FileExistsError
        os.makedirs(wavpath, exist_ok=True)
        for filename in os.listdir(new_path):
            if filename.lower().endswith('.wav'):
                sph = SPHFile(os.path.join(new_path, filename))
                sph.write_wav(os.path.join(wavpath, filename))

In [16]:
#semi-thoroughly tests all wav's in a single folder of timit data
def timit_folder_test(dr, folder):
    """Print the top-5 word predictions for every wav of one speaker folder.

    Files are read from ``./TIMIT/TRAIN/DR{dr}/{folder}/wavs``; the matching
    transcript ``<identifier>.txt`` is read from the speaker folder itself.
    The network is rebuilt on every call and predictions are printed with
    the original (un-normalised) spelling of the word key.

    Args:
        dr: dialect region number.
        folder: name of the speaker directory inside the region.
    """
    or_path = "./TIMIT/TRAIN/" + f"DR{dr}/" + folder
    full_path = or_path + "/wavs"
    print(f'Testing all files in {full_path}\n')
    np.seterr(divide = 'ignore')
    tf.reset_default_graph()
    net_object = branched_network()
    raw_key = np.load('./demo_stim/logits_to_word_key.npy')
    # same entries that run_file keeps from the logits
    word_key = np.concatenate((raw_key[:242], raw_key[243:588]))
    for filename in os.listdir(full_path):
        print(filename)
        title = filename[:filename.index(".")]
        print(f'The file identifier is {title}.')
        audiopath = os.path.join(full_path, filename)
        textpath = os.path.join(or_path, title + '.txt')
        with open(textpath, encoding = 'us-ascii') as t:
            tscript = t.readlines()[0]
        base = ' '.join(tscript.split(' ')[2:])
        print(f'The sentence transcription is: {base}')
        sr, wav_f = wav.read(audiopath)
        # widen the range (e.g. range(6)) to slide the window in 0.1 s steps
        for step in range(1):
            offset = step * (sr // 10)
            print(f'Testing offset of {step / 10} seconds.\n')
            # shared helpers pad to 2*sr, cut the window, run the network
            # and rank the logits instead of repeating that logic inline
            cut = process_wav(sr, wav_f, offset)
            logits = run_file(sr, cut, f'{audiopath}', net_object)
            for rank, ind in enumerate(distrib(logits, 5), start=1):
                pred = word_key[ind].decode('UTF-8')
                print(f"Result #{rank}: {pred}")
            print('\n')

In [17]:
def timit_dr_test(dr):
    """Print the top-5 word predictions for every wav of a dialect region.

    Walks every speaker directory below ``./TIMIT/TRAIN/DR{dr}``, reads the
    audio from its ``wavs`` sub-directory and the transcript
    ``<identifier>.txt`` from the speaker directory. The network is rebuilt
    on every call and predictions are printed with the original
    (un-normalised) spelling of the word key.

    Args:
        dr: dialect region number.
    """
    dr_path = "./TIMIT/TRAIN/" + f"DR{dr}"
    print(f'Testing all files in DR{dr}\n')
    np.seterr(divide = 'ignore')
    tf.reset_default_graph()
    net_object = branched_network()
    raw_key = np.load('./demo_stim/logits_to_word_key.npy')
    # same entries that run_file keeps from the logits
    word_key = np.concatenate((raw_key[:242], raw_key[243:588]))
    for sd in os.listdir(dr_path):
        set_path = os.path.join(dr_path, sd)
        if not os.path.isdir(set_path):
            continue
        print(f'Testing the files in {set_path}\n')
        full_path = os.path.join(set_path, 'wavs')
        for filename in os.listdir(full_path):
            title = filename[:filename.index(".")]
            print(f'The file identifier is {title}.')
            audiopath = os.path.join(full_path, filename)
            textpath = os.path.join(set_path, title + '.txt')
            with open(textpath, encoding = 'us-ascii') as t:
                tscript = t.readlines()[0]
            base = ' '.join(tscript.split(' ')[2:])
            print(f'The sentence transcription is: {base}')
            sr, wav_f = wav.read(audiopath)
            # widen the range (e.g. range(6)) to slide the window in 0.1 s steps
            for step in range(1):
                offset = step * (sr // 10)
                print(f'Testing offset of {step / 10} seconds.\n')
                # shared helpers pad to 2*sr, cut the window, run the network
                # and rank the logits instead of repeating that logic inline
                cut = process_wav(sr, wav_f, offset)
                logits = run_file(sr, cut, f'{audiopath}', net_object)
                for rank, ind in enumerate(distrib(logits, 5), start=1):
                    pred = word_key[ind].decode('UTF-8')
                    print(f"Result #{rank}: {pred}")
                print('\n')

In [18]:
def dialect_test(n):
    """Spot-check the model on ``n`` random speaker folders per dialect.

    For each of the eight ``./TIMIT/TRAIN/DR{i}`` regions, ``n`` directory
    entries are sampled; from each sampled speaker folder one random
    sentence (never SA1/SA2, which all speakers share) is run through the
    network. A trial succeeds when one of the top-5 predicted words occurs
    in the sentence. Sentences without any banked word are skipped.

    Args:
        n: number of entries to sample per region.

    Raises:
        ValueError: from ``random.sample`` if a region has fewer than ``n``
        entries.
    """
    #Build the unchanging parts of the model
    net_object, word_key = load_model()
    word_bank = build_word_bank()
    trans = str.maketrans('', '', string.punctuation)
    shared_sentences = ('SA1.WAV', 'SA2.WAV')

    #Loop through the 8 dialects
    base_dir = "./TIMIT/TRAIN"
    for i in range(1,9):
        dr_path = os.path.join(base_dir, f'DR{i}')
        print(f'Sampling from dialect {i}.\n')

        #Reset success and fail values
        s_count = 0
        f_count = 0

        #sample from n different folders so no speaker is used twice
        sds = random.sample(os.listdir(dr_path), n)
        for sd in sds:
            set_path = os.path.join(dr_path, sd)
            if not os.path.isdir(set_path):
                continue
            full_path = os.path.join(set_path, 'wavs')

            #drop the shared sentences; filtering tolerates folders lacking them
            files = [name for name in os.listdir(full_path)
                     if name not in shared_sentences]

            #select random audio file
            fd = random.sample(files, 1)[0] #change number of looks here
            title = fd[:fd.index(".")]
            print(f'Reading from set {sd}. The file identifier is {title}.')
            audiopath = os.path.join(full_path, fd)
            textpath = os.path.join(set_path, title + '.txt')

            #read transcription
            with open(textpath, encoding = 'us-ascii') as t:
                tscript = t.readlines()[0]
            tscript = tscript.split(' ')[2:]
            word_set = {x.lower().translate(trans).strip() for x in tscript}
            if not any(check_bank(word, word_bank) for word in word_set):
                print('No words in training set.\n')
                continue
            base = ' '.join(tscript)
            print(f'The sentence transcription is: {base}')

            #pad out file if necessary
            sr, wav_f = wav.read(audiopath)
            wav_f = process_wav(sr, wav_f, 0) #no offset

            #generate cochleagram and run through model
            logits = run_file(sr, wav_f, f'{audiopath}', net_object)

            #measure success or failure over the top 5 results
            for ind in distrib(logits, 5):
                # load_model already returns normalised str entries, so no
                # decoding (str has no .decode) or re-normalising is needed
                pred = word_key[ind]
                if pred in word_set:
                    print(f"Success, '{pred}' was correctly identified.\n")
                    s_count += 1
                    break
            else:
                print('Failure.\n')
                f_count += 1
        print(f"\nResult for dialect {i}: {s_count} successes out of {s_count + f_count} attempts.")

In [19]:
def dialect_random_test(n):
    """Test ``n`` randomly chosen sentences per dialect region.

    For each of the eight ``./TIMIT/TRAIN/DR{i}`` regions every sentence of
    every speaker (except SA1/SA2) is pooled. Sentences are drawn without
    replacement until ``n`` of them have been tested or the pool is empty.
    A sentence is only tested when at least two of its words are banked; a
    trial succeeds when one of the top-5 predictions occurs in the sentence.

    Args:
        n: number of sentences to test per region.
    """
    #Build the unchanging parts of the model
    net_object, word_key = load_model()
    word_bank = build_word_bank()
    trans = str.maketrans('', '', string.punctuation)
    shared_sentences = ('SA1.WAV', 'SA2.WAV')

    #Loop through the 8 dialects
    base_dir = "./TIMIT/TRAIN"
    for i in range(1,9):
        dr_path = os.path.join(base_dir, f'DR{i}')
        audio_set = []
        print(f'Sampling from dialect {i}.\n')

        #Reset success and attempt counters
        s_count = 0
        t_count = 0

        #build full set of audio options
        for sd in os.listdir(dr_path):
            set_path = os.path.join(dr_path, sd)
            if not os.path.isdir(set_path):
                continue
            full_path = os.path.join(set_path, 'wavs')
            for f in os.listdir(full_path):
                #filtering (rather than list.remove) tolerates missing SA files
                if f in shared_sentences:
                    continue
                title = f[:f.index(".")]
                audiopath = os.path.join(full_path, f)
                textpath = os.path.join(set_path, title + '.txt')
                audio_set.append((audiopath, textpath))

        #select n valid files
        while t_count < n and len(audio_set) != 0:
            fd = random.choice(audio_set)
            audio_set.remove(fd)
            audiopath, textpath = fd

            #check transcript and display
            with open(textpath, encoding = 'us-ascii') as t:
                tscript = t.readlines()[0]
            tscript = tscript.split(' ')[2:]
            tokens = [x.lower().translate(trans).strip() for x in tscript]
            word_set = set(tokens)
            #repeated words count each time, as in the token-by-token scan
            banked = sum(1 for token in tokens if check_bank(token, word_bank))
            if banked < 2:
                print('Not enough words in training set.\n')
                continue
            base = ' '.join(tscript)
            print(f'The sentence transcription is: {base}')

            #pad out file if necessary
            sr, wav_f = wav.read(audiopath)
            wav_f = process_wav(sr, wav_f, 0) #no offset

            #generate cochleagram and run through model
            logits = run_file(sr, wav_f, f'{audiopath}', net_object)

            #measure success or failure over the top 5 results
            t_count += 1
            for ind in distrib(logits, 5):
                # load_model already returns normalised str entries, so no
                # decoding (str has no .decode) or re-normalising is needed
                pred = word_key[ind]
                if pred in word_set:
                    print(f"Success, '{pred}' was correctly identified.\n")
                    s_count += 1
                    break
            else:
                print('Failure.\n')

        print(f"\nResult for dialect {i}: {s_count} successes out of {t_count} attempts.")

In [20]:
def test_core(dr, c1, c2, c3): #dr is the dialect, c1, c2 are the male core speakers, c3 is the female core speaker
    """Test every sentence of the three core speakers of a TEST region.

    Speaker folders are ``M{c1}``, ``M{c2}`` and ``F{c3}`` below
    ``./TIMIT/TEST/DR{dr}``. For each wav the top-5 predictions are printed;
    the sentence counts as a success when at least one prediction is a
    banked word of its transcript. Sentences without banked words are
    skipped and not counted.

    Args:
        dr: dialect region number.
        c1, c2: identifiers of the male core speakers (without the ``M``).
        c3: identifier of the female core speaker (without the ``F``).
    """
    #Build the unchanging parts of the model
    net_object, word_key = load_model()
    word_bank = build_word_bank()
    trans = str.maketrans('', '', string.punctuation)
    dr_path = os.path.join("./TIMIT/TEST", f"DR{dr}")
    speakers = [f"M{c1}", f"M{c2}", f"F{c3}"]

    #Loop through the 3 core speakers
    for sd in os.listdir(dr_path):
        if sd not in speakers:
            continue
        print(f"Testing core speaker {sd}.")
        base_path = os.path.join(dr_path, sd)
        full_path = os.path.join(base_path, 'wavs')

        #Reset success and attempt counters
        s_count = 0
        t_count = 0

        #Loop through wavs
        for f in os.listdir(full_path):
            title = f[:f.index(".")]
            audiopath = os.path.join(full_path, f)
            textpath = os.path.join(base_path, title + '.txt')
            print(f'The file identifier is {title}.')

            with open(textpath, encoding = 'us-ascii') as t:
                tscript = t.readlines()[0]
            tscript = tscript.split(' ')[2:]
            word_set = set()
            for x in tscript:
                basic = x.lower().translate(trans).strip()
                if check_bank(basic, word_bank):
                    word_set.add(basic)

            base = ' '.join(tscript)
            print(f'The sentence transcription is: {base}')
            if len(word_set) == 0:
                print('No words in training set.\n')
                continue
            print(f'Words in the training set are {word_set}.')

            #pad out file if necessary
            sr, wav_f = wav.read(audiopath)
            wav_f = process_wav(sr, wav_f, 0) #no offset

            #generate cochleagram and run through model
            logits = run_file(sr, wav_f, f'{title}', net_object)

            #interpret the top 5 results; all five are always listed
            flag = False
            for j, ind in enumerate(distrib(logits, 5)):
                # load_model already returns normalised str entries, so no
                # decoding (str has no .decode) or re-normalising is needed
                pred = word_key[ind]
                print(f"Result #{j + 1}: {pred}")
                if pred in word_set:
                    flag = True
                    print(f"Success, '{pred}' was correctly identified.")
            t_count += 1
            if flag:
                print('Success.\n')
                s_count += 1
            else:
                print('Failure.\n')
        print(f"\nResult for speaker {sd}: {s_count} successes out of {t_count} attempts.")

In [21]:
#finds the position distribution of all correctly identified words in files with a certain offset in dr
def get_pos(dr, p_range, offset):
    """Print where in the sentence, and at which rank, hits occur.

    Every cochleagram stored in ``graph_data/dr{dr}/cgrams/{offset}`` is run
    through the network. The best-ranked prediction (among the top
    ``p_range``) that is a banked word of the transcript counts as a hit;
    its rank and the first position of that word in the sentence are
    tallied. The hit ratio and both tallies are printed.

    Args:
        dr: dialect region number.
        p_range: how many top predictions to consider per file.
        offset: sample offset that names the cochleagram sub-directory.
    """
    #Build the unchanging parts of the model
    net_object, word_key = load_model()
    word_bank = build_word_bank()
    trans = str.maketrans('', '', string.punctuation)

    #set necessary paths
    bp = os.path.join("graph_data", f"dr{dr}")
    t_dir = os.path.join(bp, "txts")
    c_dir = os.path.join(bp, "cgrams", f"{offset}")

    #tallies: sentence position -> hits, prediction rank -> hits
    pos = dict()
    certs = dict()
    s_count = 0
    total = 0

    for fd in os.listdir(c_dir):
        sp_des, title, _ = extract_details(fd)
        textpath = os.path.join(t_dir, f"{sp_des + title}.txt")
        _, word_set = read_txt(textpath, word_bank)
        features = np.load(os.path.join(c_dir, fd))
        inds = distrib(run_cgram(features, net_object), p_range)

        total += 1
        for j in range(p_range):
            pred = word_key[inds[j]].lower().translate(trans).strip()
            if pred not in word_set:
                continue
            rank = j + 1
            index = word_set[pred][0]
            certs[rank] = certs.get(rank, 0) + 1
            pos[index] = pos.get(index, 0) + 1
            s_count += 1
            break

    print(f"Ratio: {s_count} / {total}")
    print(f'Positionings: {pos}')
    print(f'Certainties: {certs}\n')

In [22]:
def reform_data(s, dr, offset):
    """Cache wavs, transcripts and cochleagrams of one dialect region.

    Every sentence of ``./TIMIT/{s}/DR{dr}`` that contains at least one
    banked word is processed: its wav and transcript are copied (once) into
    ``graph_data/dr{dr}/wavs`` and ``.../txts`` and the cochleagram of the
    two-second window starting at ``offset`` is saved as
    ``graph_data/dr{dr}/cgrams/{offset}/{speaker}{sentence}-{offset}.npy``.

    Args:
        s: split folder name below ``./TIMIT`` (e.g. ``TRAIN`` or ``TEST``).
        dr: dialect region number.
        offset: first sample of the window handed to ``process_wav``.
    """
    print(f"Reforming dr{dr} with offset {offset}.")
    np.seterr(divide = 'ignore')
    dest = os.path.join("graph_data", f"dr{dr}")
    a_dir = os.path.join(dest, "wavs")
    t_dir = os.path.join(dest, "txts")
    o_dir = os.path.join(dest, "cgrams", f"{offset}")
    # makedirs also creates graph_data/dr{dr} (and cgrams) when they are
    # missing, which plain os.mkdir on the leaf directories cannot do
    for folder in (a_dir, t_dir, o_dir):
        os.makedirs(folder, exist_ok=True)
    dr_path = os.path.join(f"./TIMIT/{s}", f"DR{dr}")
    word_bank = build_word_bank()

    sdc = 0

    for sd in os.listdir(dr_path):
        base_path = os.path.join(dr_path, sd)
        if not os.path.isdir(base_path):
            continue
        sdc += 1
        print(f"Entering subdirectory number {sdc}; {sd}.")
        full_path = os.path.join(base_path, 'wavs')
        for f in os.listdir(full_path):
            frag = f[:f.index(".")]
            title = sd + frag

            audiopath = os.path.join(full_path, f)
            textpath = os.path.join(base_path, frag + '.txt')

            #skip sentences without any banked word
            with open(textpath, encoding = 'us-ascii') as t:
                tscript = t.readlines()[0]
            tscript = tscript.split(' ')[2:]
            if not any(check_bank(x, word_bank) for x in tscript):
                continue

            sr, wav_f = wav.read(audiopath)
            wav_f = process_wav(sr, wav_f, offset)
            c_gram = generate_cochleagram(wav_f, sr, title)

            #wav and transcript are shared by all offsets: copy only once
            fname = os.path.join(a_dir, f"{title}.wav")
            if not os.path.isfile(fname):
                shutil.copy(audiopath, fname)
            tname = os.path.join(t_dir, f"{title}.txt")
            if not os.path.isfile(tname):
                shutil.copy(textpath, tname)
            cname = os.path.join(o_dir, f"{title}-{offset}.npy")
            np.save(cname, c_gram)
                #print(f"Complete")

In [23]:
def result_dist(word_set, word_key, inds):
    """Scan ranked predictions until one of the target words is found.

    Args:
        word_set: container of normalised target words.
        word_key: sequence mapping a logit index to its word.
        inds: logit indices, best prediction first.

    Returns:
        (found, visited): ``found`` is True when a target word was met;
        ``visited`` lists the indices examined, ending with the matching
        index on success or containing all of ``inds`` on failure.
    """
    strip_punct = str.maketrans('', '', string.punctuation)
    visited = []
    for ind in inds:
        visited.append(ind)
        candidate = word_key[ind].lower().translate(strip_punct).strip()
        if candidate in word_set:
            return True, visited
    return False, visited

In [24]:
def merge_dist(results, target, dist):
    """Record rank-based scores of ``dist`` under ``results[target]``.

    With ``l = len(dist)``, the entry at rank ``i`` (0-based) receives the
    score ``(l - i) / (l * (l + 1) / 2)``, so the scores of one call sum to
    one. A score already stored for the same entry is overwritten, not
    accumulated.

    Returns:
        The same ``results`` dict, updated in place.
    """
    scores = results[target] if target in results else dict()
    l = len(dist)
    scale = (l**2 + l) // 2
    for rank, val in enumerate(dist):
        scores[val] = (l - rank) / scale
    results[target] = scores
    return results

In [25]:
#builds a graph for just one dr and just one offset
def build_standard_graph(dr, p_range, offset):
    """Collect, per expected word, the scored predictions of the network.

    Uses the cochleagrams in ``graph_data/dr{dr}/cgrams/{offset}`` whose
    transcript has a banked word at word positions 3 to 5 (``check_txt``).
    For each file the top ``p_range`` predictions are scanned with
    ``result_dist`` and merged into the result with ``merge_dist``:

    * one banked word: it is the target;
    * several banked words and one was predicted: the predicted one is the
      target;
    * several banked words and none predicted: every one is a target.

    Returns:
        dict mapping a target logit index to a dict
        ``{predicted index: score}``.

    Raises:
        ValueError: if a banked word is absent from the model's word key.
    """
    #Build the unchanging parts of the model
    net_object, word_key = load_model()
    word_bank = build_word_bank()

    #set necessary paths
    bp = os.path.join("graph_data", f"dr{dr}")
    t_dir = os.path.join(bp, "txts")
    c_dir = os.path.join(bp, "cgrams", f"{offset}")

    #keep only the files with a banked word in the wanted position range
    cgram_set = []
    for f in os.listdir(c_dir):
        sp_des, title, _ = extract_details(f)
        t_path = os.path.join(t_dir, f"{sp_des + title}.txt")
        s = check_txt(t_path, word_bank, 3, 5)
        if s:
            cgram_set.append((os.path.join(c_dir, f), s))

    results = dict()
    for c_path, word_set in cgram_set:
        logits = run_cgram(np.load(c_path), net_object)
        inds = distrib(logits, p_range)

        #one scan serves all three scenarios
        res, dist = result_dist(word_set, word_key, inds)
        if len(word_set) > 1 and res:
            #the scan stops at the hit, so the last visited index is the match
            targets = [dist[len(dist) - 1]]
        else:
            targets = [word_key.index(key) for key in word_set]
        for target in targets:
            results = merge_dist(results, target, dist)

    return results
        

In [26]:
def binary(results):
    """Build a 0/1 confusion matrix from the best prediction per target.

    Args:
        results: dict mapping a target logit index to a dict
            ``{predicted index: score}``. It is modified in place: each
            value is replaced by the index with the highest score.

    Returns:
        (data, labels): ``data`` is a square int array in which
        ``data[row][col] == 1`` means the word of column ``col`` was
        expected and the word of row ``row`` was the top prediction;
        ``labels`` holds the words of the rows/columns.

    Raises:
        ValueError: if one of the score dicts is empty.
    """
    raw_key = np.load('./demo_stim/logits_to_word_key.npy')
    # Indices in `results` refer to the key with entry 242 removed (see
    # load_model / run_cgram); using the raw key would shift every label
    # from index 242 onwards by one word.
    word_key = np.concatenate((raw_key[:242], raw_key[243:588]))

    #reduce every score dict to its best-scoring index
    for key in results:
        dictA = results[key]
        results[key] = max(dictA, key= dictA.get)

    words = set()
    for key, winner in results.items():
        words.add(key)
        words.add(winner)
    labels = sorted(words)
    position = {label: idx for idx, label in enumerate(labels)}

    #rows results, columns expected
    m = len(labels)
    data = np.zeros((m, m), dtype = int)
    for key, winner in results.items():
        data[position[winner]][position[key]] = 1

    return data, [word_key[label].decode('UTF-8') for label in labels]


In [27]:
def results_to_nway(results, n):
    """Keep only the ``n`` best-scored predictions of every target.

    Args:
        results: dict mapping a target to a dict ``{prediction: score}``.
            It is modified in place.
        n: maximum number of predictions to keep per target.

    Returns:
        The same dict, each value now a list of ``(prediction, score)``
        tuples ordered from highest to lowest score.
    """
    for key in results:
        scores = results[key]
        best = nlargest(n, scores, key = scores.get)
        results[key] = [(cand, scores[cand]) for cand in best]
    return results

In [28]:
def n_way(results, n):
    """Build a confusion matrix from the ``n`` best predictions per target.

    Args:
        results: dict mapping a target logit index to a dict
            ``{predicted index: score}``. It is modified in place by
            ``results_to_nway``.
        n: number of predictions kept per target.

    Returns:
        (data, labels): ``data`` is a square float array whose column for a
        target holds the kept scores normalised to sum to one, one row per
        predicted word; ``labels`` holds the words of the rows/columns.
    """
    raw_key = np.load('./demo_stim/logits_to_word_key.npy')
    # Indices in `results` refer to the key with entry 242 removed (see
    # load_model / run_cgram); using the raw key would shift every label
    # from index 242 onwards by one word.
    word_key = np.concatenate((raw_key[:242], raw_key[243:588]))

    results = results_to_nway(results, n)

    words = set()
    for key, ranked in results.items():
        words.add(key)
        for cand, _ in ranked:
            words.add(cand)
    labels = sorted(words)
    position = {label: idx for idx, label in enumerate(labels)}

    #rows results, columns expected
    m = len(labels)
    data = np.zeros((m, m))
    for key, ranked in results.items():
        xpos = position[key]
        total = sum(score for _, score in ranked)
        for cand, score in ranked:
            data[position[cand]][xpos] = score / total

    return data, [word_key[label].decode('UTF-8') for label in labels]

In [29]:
def show_heatmap(data, x_labels, dr):
    """Show the binary confusion matrix and save it as ``dr{dr}-binary.png``.

    Args:
        data: 2-D array to display.
        x_labels: tick labels, used for both axes.
        dr: dialect region number, used in the title and the file name.
    """
    fig, ax = plt.subplots()
    ax.imshow(data)

    ax.set_xticks(np.arange(data.shape[1]))
    ax.set_xticklabels(x_labels, rotation=-90)
    ax.set_yticks(np.arange(data.shape[0]))
    ax.set_yticklabels(x_labels)
    ax.set_title(f"Binary Results for DR {dr}")

    # Size first, then layout, then save: tight_layout fits the layout to
    # the current figure size, and savefig must run after it for the PNG to
    # contain the adjusted layout.
    fig.set_size_inches(16, 16)
    fig.tight_layout()
    fig.savefig(f"dr{dr}-binary.png")
    plt.show()


In [30]:
def show_nway(data, x_labels, dr, n):
    """Show the n-way confusion matrix and save it as ``dr{dr}-{n}.png``.

    Args:
        data: 2-D array to display.
        x_labels: tick labels, used for both axes.
        dr: dialect region number, used in the title and the file name.
        n: number of predictions per target, used in title and file name.
    """
    fig, ax = plt.subplots()
    im = ax.imshow(data)

    ax.set_xticks(np.arange(data.shape[1]))
    ax.set_xticklabels(x_labels, rotation=-90)
    ax.set_yticks(np.arange(data.shape[0]))
    ax.set_yticklabels(x_labels)
    ax.set_title(f"{n}-Way Results for DR {dr}")
    cbar = ax.figure.colorbar(im, ax=ax)
    cbar.ax.set_ylabel("", rotation=-90, va="bottom")

    # resize before tight_layout so the layout is fitted to the final size
    fig.set_size_inches(12, 12)
    fig.tight_layout()
    fig.savefig(f"dr{dr}-{n}.png")
    plt.show()

In [31]:
def binary_heatmap(dr, p_range, offset):
    """Compute and display the binary confusion heat map of one region.

    Chains ``build_standard_graph`` -> ``binary`` -> ``show_heatmap``.
    """
    matrix, words = binary(build_standard_graph(dr, p_range, offset))
    show_heatmap(matrix, words, dr)

def n_way_heatmap(dr, p_range, offset, n):
    """Compute and display the n-way confusion heat map of one region.

    Chains ``build_standard_graph`` -> ``n_way`` -> ``show_nway``.
    """
    matrix, words = n_way(build_standard_graph(dr, p_range, offset), n)
    show_nway(matrix, words, dr, n)

Helper Functions for Random Test

In [78]:
def random_sel(word_bank, n): #1 <= n <= 587
    """Pick ``n`` random words and map each to its index in ``word_bank``.

    Args:
        word_bank: sequence of words (it must support ``.index``).
        n: number of words to draw without replacement.

    Returns:
        dict ``{word: index of its first occurrence in word_bank}``.
    """
    chosen = random.sample(word_bank, n)
    return {word: word_bank.index(word) for word in chosen}
#write random n-way to reorganize alphabetically and shift non-keys to end

def random_nway(results, n):
    """Turn raw results into a normalised prediction matrix with text labels.

    ``results`` is first reduced by ``results_to_nway(results, n)``. After that
    step it is treated as a mapping ``expected_index -> sequence of
    (predicted_index, score)`` pairs.

    Returns:
        ``(data, xlabels, ylabels)`` where

        * ``xlabels`` are the expected words, sorted by their decoded text;
        * ``ylabels`` are ``xlabels`` followed by every predicted word that was
          never an expected word, those extras also sorted by decoded text;
        * ``data[y][x]`` is the score of prediction ``y`` for expected word ``x``
          divided by the sum of the scores listed for ``x`` (rows are
          predictions, columns are expected words).
    """
    word_key = np.load('./demo_stim/logits_to_word_key.npy')

    def as_text(index):
        # Entries of the key file are bytes; decode for sorting and display.
        return word_key[index].decode('UTF-8')

    results = results_to_nway(results, n)

    expected = set(results)
    extras = set()
    for predictions in results.values():
        for prediction in predictions:
            if prediction[0] not in expected:
                extras.add(prediction[0])

    x_indices = sorted(expected, key=as_text)
    y_indices = x_indices + sorted(extras, key=as_text)

    #rows results, columns expected
    data = np.zeros((len(y_indices), len(x_indices)))

    for target, predictions in results.items():
        column = x_indices.index(target)
        total = sum(prediction[1] for prediction in predictions)
        for prediction in predictions:
            row = y_indices.index(prediction[0])
            data[row][column] = prediction[1] / total

    xlabels = [as_text(index) for index in x_indices]
    ylabels = [as_text(index) for index in y_indices]
    return data, xlabels, ylabels

def show_random(data, xlabels, ylabels, dr, n):
    """Draw the heatmap for a random-test result, without tick labels.

    Args:
        data: array rendered with ``imshow``.
        xlabels, ylabels: accepted for symmetry with ``random_nway``'s return
            value but currently unused -- tick labelling is disabled here.
        dr: dialect-region identifier used in the title and the file name.
        n: the "n" of the n-way comparison, used in the title and the file name.

    Side effects:
        Writes ``dr{dr}-{n}.png`` to the working directory and shows the figure.
    """
    figure, axes = plt.subplots()
    image = axes.imshow(data)

    # Tick positions/labels are deliberately left at matplotlib's defaults.
    axes.set_title(f"{n}-Way Results for DR {dr}")

    colorbar = axes.figure.colorbar(image, ax=axes)
    colorbar.ax.set_ylabel("", rotation=-90, va="bottom")

    figure.tight_layout()
    figure.set_size_inches(12, 12)
    figure.savefig(f"dr{dr}-{n}.png")
    plt.show()

In [216]:
def random_test(n, drs, p_range, graph = True):
    """Run the network over the stored cochleagrams of each dialect region.

    For every dialect region in ``drs`` this:

    1. scans ``graph_data/dr{dr}/txts`` and keeps the transcripts for which
       ``check_txt`` returns a truthy word set;
    2. for each kept transcript, evaluates every
       ``graph_data/dr{dr}/cgrams/<sub>/<info>-<sub>.npy`` file that exists and
       merges successful distributions into the region's results;
    3. if no evaluation succeeded, falls back to merging the last computed
       distribution under the first word of the word set whose positions match
       the list selected by how many cochleagrams were evaluated.

    Args:
        n: number of words to sample from the model's word key.
        drs: iterable of dialect-region identifiers.
        p_range: forwarded to ``distrib``.
        graph: when true, also plot each region's results as a 5-way heatmap.

    Returns:
        ``(ds, word_key)``: the list of per-region result dicts (same order as
        ``drs``) and the word key returned by ``load_model``.
    """
    #Build the unchanging parts of the model
    net_object, word_key = load_model()

    word_bank = random_sel(word_key, n)
    ds = []

    # Positions accepted by the fallback, indexed by (cochleagrams evaluated - 1).
    # NOTE(review): presumably these are word positions within the clip -- confirm
    # against check_txt. More than three evaluated files raises IndexError, as before.
    cases = [[3,4], [3,4,5], [3,4,5,6,7]]

    for dr in drs:
        bp = os.path.join("graph_data", f"dr{dr}")
        t_dir = os.path.join(bp, "txts")
        c_dir = os.path.join(bp, "cgrams")

        results = dict()
        txt_set = []

        for f in os.listdir(t_dir):
            sp_des, title = extract_text(f)
            info = sp_des + title
            t_path = os.path.join(t_dir, f"{info}.txt")
            s = check_txt(t_path, word_bank, 3, 7)
            if s:
                txt_set.append((info, s))

        for info, word_set in txt_set:
            case = -1
            success = False
            dist = None
            for sd in os.listdir(c_dir):
                sd_path = os.path.join(c_dir, sd)
                if os.path.isdir(sd_path):
                    c_path = os.path.join(sd_path, f"{info}-{sd}.npy")
                    if os.path.isfile(c_path):
                        case += 1
                        cgram = np.load(c_path)
                        logits = run_cgram(cgram, net_object)
                        inds = distrib(logits, p_range)

                        res, dist = result_dist(word_set, word_key, inds)
                        if res:
                            success = True
                            target = dist[len(dist) - 1]
                            results = merge_dist(results, target, dist)
            if case < 0:
                # No cochleagram exists for this transcript, so there is no
                # distribution to merge. Previously this fell through to
                # cases[-1] with a `dist` left over from an earlier transcript
                # (or a NameError on the very first one).
                continue
            if not success:
                for key in word_set:
                    for pos in cases[case]:
                        if pos in word_set[key]:
                            target = word_key.index(key)
                            results = merge_dist(results, target, dist)
                            break
        ds.append(results)
        if graph: #this being set to true messes up return
            # Plot from a deep copy so the results appended to `ds` are not
            # affected by whatever random_nway does to its argument.
            data, xlabels, ylabels = random_nway(copy.deepcopy(results), 5)
            show_random(data, xlabels, ylabels, dr, 5)
    return ds, word_key

Functions to look at results data

(Helpers)

In [217]:
def check_self(word, dic):
    """Return ``dic[word]`` when ``word`` is a key of ``dic``, otherwise ``0``."""
    return dic[word] if word in dic else 0

In [121]:
def zero_words(r, wb):
    """Collect the words that never showed up in their own inputs.

    Args:
        r: mapping ``word -> {predicted_word: score}``.
        wb: lookup (indexed by the keys of ``r``) giving each word's display form.

    Returns:
        ``wb[word]`` for every ``word`` in ``r`` whose own score, as reported by
        ``check_self``, equals 0. Also prints the number of words in ``r``.
    """
    print(f"{len(r)} total words.\n")
    return [wb[word] for word in r if check_self(word, r[word]) == 0]

def display_zero(zeros, dr):
    """Print the zero-score words of one dialect as a comma-separated sentence.

    Args:
        zeros: list of words to display; may be empty.
        dr: dialect identifier shown in the message.

    An empty ``zeros`` list prints the same message with an empty word list
    instead of raising ``IndexError``.
    """
    if zeros:
        wstr = ', '.join(f'{word}' for word in zeros) + '.'
    else:
        wstr = ''
    print(f"{len(zeros)} words with 0 score in dialect {dr}, words are '{wstr}'\n")

def worst_words(r, wb, n): #retrieves n-worst non-zero results
    """Return the ``n`` lowest non-zero self-scores, worst first.

    Args:
        r: mapping ``word -> {predicted_word: score}``.
        wb: lookup (indexed by the keys of ``r``) giving each word's display form.
        n: maximum number of entries to return; ``n <= 0`` yields ``[]``.

    Returns:
        List of ``(wb[word], score)`` tuples sorted by ascending score, at most
        ``n`` long. Words whose own score is 0 are excluded. Ties keep the
        iteration order of ``r``.
    """
    if n <= 0:
        # The previous implementation indexed an empty list here.
        return []
    scored = []
    for key in r:
        val = check_self(key, r[key])
        if val != 0:
            scored.append((key, val))
    # One stable sort replaces re-sorting the running list on every insertion.
    lowest = sorted(scored, key = lambda x: x[1])[:n]
    return [(wb[key], val) for key, val in lowest]

def display_worst(worst, dr, n):
    """Print up to ``n`` entries of ``worst``, one per line.

    Args:
        worst: sequence of ``(word, score)`` pairs, already ordered worst to best.
        dr: unused; kept for interface compatibility.
        n: number of entries requested. If ``worst`` holds fewer, only the
            available ones are printed (previously this raised ``IndexError``).
    """
    print(f"{n} worst-predicted words displayed in order from worst to best.")
    for i in range(min(n, len(worst))):
        w = worst[i][0]
        score = worst[i][1]
        print(f"Word number {i}: {w}. Score: {score}.")
    print("\n")

In [132]:
def shared_zeros(arr):
    """Count how many times each entry occurs across all lists in ``arr``.

    Returns a plain dict ``entry -> count``, keyed in first-seen order.
    """
    counts = dict()
    for zero_list in arr:
        for entry in zero_list:
            counts[entry] = counts.get(entry, 0) + 1
    return counts

def format_sz(sz):
    """Print the shared-zero counts from most to least frequent.

    ``sz`` maps a word to its count. Entries are sorted ascending by count and
    the whole list is then reversed, so ties appear in reverse insertion order.
    """
    ranked = sorted(sz.items(), key = lambda item: item[1])
    ranked.reverse()
    print("Shared zeroes from greatest to least frequency:")
    for word, count in ranked:
        print(f"{word}: {count}")
    print('\n')

def shared_worsts(arr):
    """Count how often each word occurs across all worst-lists in ``arr``.

    Each inner list holds tuples whose first element is the word; only that
    first element is counted. Returns a plain dict ``word -> count``.
    """
    counts = dict()
    for worst_list in arr:
        for entry in worst_list:
            word = entry[0]
            counts[word] = counts.get(word, 0) + 1
    return counts

def format_sw(sw):
    """Print the shared-worst counts from most to least frequent.

    ``sw`` maps a word to its count. Entries are sorted ascending by count and
    the whole list is then reversed, so ties appear in reverse insertion order.
    """
    ranked = sorted(sw.items(), key = lambda item: item[1])
    ranked.reverse()
    print("Shared worsts from greatest to least frequency:")
    for word, count in ranked:
        print(f"{word}: {count}")
    print('\n')
            
def comb_shared(r):
    """Average each word's self-score over the dialects in which it appears.

    Args:
        r: iterable of per-dialect mappings ``word -> {predicted_word: score}``.

    Returns:
        List of ``(word, mean_self_score)`` tuples sorted by ascending mean. The
        mean is taken only over the dialects that contain the word.
    """
    # word -> [running sum of self-scores, number of dialects seen]
    tallies = dict()
    for dialect in r:
        for word in dialect:
            score = check_self(word, dialect[word])
            if word in tallies:
                tallies[word][1] += 1
                tallies[word][0] += score
            else:
                tallies[word] = [score, 1]
    averages = [(word, tally[0] / tally[1]) for word, tally in tallies.items()]
    averages.sort(key = lambda pair: pair[1])
    return averages

def disp_shared(combo, n, wb):
    """Print the first ``n`` entries of ``combo`` (fewer if ``combo`` is shorter).

    Args:
        combo: sequence of ``(word, score)`` pairs, lowest score first.
        n: maximum number of entries to print.
        wb: lookup turning each ``word`` into its display form.
    """
    print(f"Lowest {n} results overall.")
    shown = min(n, len(combo))
    for position in range(shown):
        entry = combo[position]
        print(f"Word number {position+1}: {wb[entry[0]]}. Score: {entry[1]}.")

Gathers data on specific words

In [None]:
# Run the random test over dialect regions 1-8 without plotting, then report
# per-dialect and cross-dialect weak spots.
drs = list(range(1, 9))
r, wb = random_test(587, drs, 5, False)
n = 10  # how many worst-scoring words to list per dialect
zeros = []
worsts = []
#print(wb)
for i, res in zip(drs, r):
    # Words that never predicted themselves in this dialect.
    zero = zero_words(res, wb)
    zeros.append(zero)
    display_zero(zero, i)
    # Lowest non-zero self-scores in this dialect.
    worst = worst_words(res, wb, n)
    worsts.append(worst)
    display_worst(worst, i, n)
# How often each word recurs across the per-dialect lists.
sz = shared_zeros(zeros)
format_sz(sz)
sw = shared_worsts(worsts)
format_sw(sw)
disp_shared(comb_shared(r), 50, wb)

In [164]:
def swaps(r):
    """Find pairs of words that were each predicted for the other.

    Args:
        r: iterable of per-dialect mappings ``word -> {predicted_word: score}``.

    Returns:
        dict mapping ``(word, pred)`` to a list of
        ``(score of pred under word, score of word under pred, dialect number)``
        tuples, with dialects numbered from 1 in iteration order. Both
        orderings of a pair are recorded as separate keys.
    """
    found = dict()
    for dialect_no, dr in enumerate(r, start=1):
        for word, predictions in dr.items():
            for pred in predictions:
                # Need a different word that has its own entry listing `word` back.
                if pred not in dr or word == pred or word not in dr[pred]:
                    continue
                record = (predictions[pred], dr[pred][word], dialect_no)
                found.setdefault((word, pred), []).append(record)
    return found

def proc_swaps(swaps, wb):
    """Print every swapped pair with the strengths and dialect of each occurrence.

    Args:
        swaps: mapping ``(word, pred) -> [(strength1, strength2, dialect), ...]``
            in the form returned by ``swaps``.
        wb: lookup turning each word of a pair into its display form.
    """
    for pair, records in swaps.items():
        first, second = pair[0], pair[1]
        count = len(records)
        print(f"{count} swap(s) for pair '{wb[first]}' and '{wb[second]}'.")
        for number, record in enumerate(records, start=1):
            print(f"For swap number {number} the strengths were {record[0]} and {record[1]}, in dr {record[2]}.")

In [177]:
def full_success_graph(r):
    """Plot each word's self-score in all 8 dialects as an 8 x 587 heatmap.

    Args:
        r: sequence of at least 8 per-dialect mappings
            ``word_index -> {predicted_index: score}``.

    Cells for words missing from a dialect are NaN. The figure is saved as
    ``full success.png`` and shown.
    """
    # Start from all-NaN and fill in only the words present in each dialect.
    data = np.full((8, 587), float("NaN"))
    for row in range(8):
        dialect = r[row]
        for word in range(587):
            if word in dialect:
                data[row][word] = check_self(word, dialect[word])

    figure, axes = plt.subplots()
    image = axes.imshow(data)

    axes.set_title("Total Results")
    colorbar = axes.figure.colorbar(image, ax=axes)
    colorbar.ax.set_ylabel("", rotation=-90, va="bottom")
    figure.tight_layout()
    figure.set_size_inches(12, 12)
    figure.savefig("full success.png")
    plt.show()

In [192]:
def failure_graph(r, f_thresh, n_thresh, wb):
    """Bar-chart the words that fail in many dialects.

    A word "fails" in a dialect when its self-score (via ``check_self``) is at
    most ``f_thresh``. Words failing in at least ``n_thresh`` of the first 8
    dialects of ``r`` are plotted, most failures first.

    Args:
        r: sequence of at least 8 per-dialect mappings
            ``word -> {predicted_word: score}``.
        f_thresh: highest self-score still counted as a failure.
        n_thresh: minimum number of failing dialects for a word to be plotted.
        wb: lookup turning each word into its tick label.

    The figure is saved as ``failure.png`` and shown.
    """
    failures = dict()
    for row in range(8):
        dialect = r[row]
        for word in dialect:
            if check_self(word, dialect[word]) <= f_thresh:
                failures[word] = failures.get(word, 0) + 1

    frequent = [(word, count) for word, count in failures.items() if count >= n_thresh]
    # Ascending stable sort, then reverse: most failures first.
    frequent.sort(key = lambda pair: pair[1])
    frequent.reverse()

    x = list(range(len(frequent)))
    heights = [pair[1] for pair in frequent]
    x_labels = [wb[pair[0]] for pair in frequent]

    figure, axes = plt.subplots()
    plt.bar(x, heights)
    axes.set_ylabel("No. of Failures")
    axes.set_xticks(np.arange(len(x_labels)))
    axes.set_xticklabels(x_labels, rotation=-90)
    axes.set_title("Failure Graph")
    figure.tight_layout()
    figure.set_size_inches(12, 12)
    figure.savefig("failure.png")
    plt.show()

In [223]:
def av_dialect_score(r):
    """Compute and print the mean self-score of each of the first 8 dialects.

    Args:
        r: sequence of at least 8 per-dialect mappings
            ``word -> {predicted_word: score}``.

    Returns:
        List of 8 means, one per dialect, in order.

    Raises:
        ZeroDivisionError: if a dialect mapping is empty.
    """
    averages = []
    for index in range(8):
        dialect = r[index]
        score_sum = 0
        word_count = 0
        for word, predictions in dialect.items():
            score_sum += check_self(word, predictions)
            word_count += 1
        mean = score_sum / word_count
        print(f"Score for dr {index+1}: {mean}.")
        averages.append(mean)
    return averages

def graph_di_score(s):
    """Bar-chart per-dialect scores relative to the best-scoring dialect.

    Args:
        s: sequence of scores, one per dialect, in the order of the eight names
            listed below (at most 8 entries).

    Each bar is ``score / max(s)``. The figure is saved as ``dialects.png`` and
    shown.
    """
    dialects = ['New England', 'Northern', 'North Midland', 'South Midland', 'Southern', 'NYC', 'Western', 'Army Brat']
    count = len(s)
    best = max(s)
    x = list(range(count))
    heights = []
    x_labels = []
    for index in x:
        heights.append(s[index] / best)
        x_labels.append(dialects[index])

    figure, axes = plt.subplots()
    plt.bar(x, heights)
    axes.set_ylabel("Relative Success")
    axes.set_xticks(np.arange(len(x_labels)))
    axes.set_xticklabels(x_labels, rotation=-90)
    axes.set_title("Dialectical Results")
    figure.tight_layout()
    figure.set_size_inches(12, 12)
    figure.savefig("dialects.png")
    plt.show()

In [212]:
# Re-run the random test over dialect regions 1-8 with plotting disabled.
drs = list(range(1, 9))
r, wb = random_test(587, drs, 5, False)

In [None]:
# Print each dialect's mean self-score, then plot the scores relative to the best one.
graph_di_score(av_dialect_score(r))