# Error Visualization

- Quantifying Errors
	- From Determined Test Cases,  
For each word, measure the number of *correctly* and *incorrectly* interpreted
- Classifying Errors
	- Misinterpretation of the words  
For each word, log the transcription and what the word was misinterpreted as  
→ calculate the occurrence  
→ most common error for a given word


In [22]:
import os, pathlib
import numpy
import glob
import jiwer
import collections
import helper

# TODO: remove warning, put text preprocessing as helper


In [23]:
class Data:
    """Named collection of reference texts and their transcriptions.

    Both lists are filled in lockstep through ``add_reference_transcription``;
    every text is passed through ``preprocess_text`` before being stored.
    """

    def __init__(self, name):
        self.name = name
        self.reference = []
        self.transcription = []

    def preprocess_text(self, text):
        """Return ``text`` after ``helper.preprocess_text`` has been applied."""
        return helper.preprocess_text(text)

    def get_name(self):
        """Return the dataset name given at construction."""
        return self.name

    def get_reference(self):
        """Return the list of stored (preprocessed) reference texts."""
        return self.reference

    def get_transcription(self):
        """Return the list of stored (preprocessed) transcriptions."""
        return self.transcription

    def add_reference(self, reference):
        """Preprocess ``reference`` and append it to the reference list."""
        cleaned = self.preprocess_text(reference)
        self.reference.append(cleaned)

    def add_transcription(self, transcription):
        """Preprocess ``transcription`` and append it to the transcription list."""
        cleaned = self.preprocess_text(transcription)
        self.transcription.append(cleaned)

    def add_reference_transcription(self, reference, transcription):
        """Store one reference/transcription pair (reference first)."""
        self.add_reference(reference)
        self.add_transcription(transcription)

    def length(self):
        """Return the number of stored pairs; both lists must be equally long."""
        total = len(self.reference)
        assert total == len(self.transcription)
        return total

    def print_reference_transcription(self, i):
        """Print pair ``i``; an index outside the reference list prints nothing."""
        if not (0 <= i < len(self.reference)):
            return
        print("Reference:   \t: ", self.reference[i])
        print("Transcription: \t: ", self.transcription[i])

    def print_head(self):
        """Print the first stored pair."""
        self.print_reference_transcription(i=0)

    def print_tail(self):
        """Print the last stored pair."""
        last_index = self.length() - 1
        self.print_reference_transcription(i=last_index)


In [24]:
def read_librispeech_data(root_dir="LibriSpeech/test-clean/", model_dir="deepspeech"):
    """Load LibriSpeech references together with their stored transcriptions.

    Every ``*.trans.txt`` file found recursively under ``root_dir`` is read.
    Each non-blank line is split on whitespace: the first token is the
    utterance id and the remaining tokens form the reference text. The
    transcription is read with ``helper.read_transcription`` from
    ``<root_dir>/<id parts except the last, joined by "/">/<id>.<model_dir>.transcription.txt``.

    Args:
        root_dir: directory that is searched for ``*.trans.txt`` files.
        model_dir: name component used in the transcription file name.

    Returns:
        A ``Data`` object named ``"librispeech"`` holding all pairs.

    Raises:
        ValueError: if the transcription file of an utterance does not exist.
    """
    data = Data("librispeech")
    pattern = os.path.join(root_dir, "**", "*.trans.txt")

    for filename in glob.iglob(pattern, recursive=True):
        ## the context manager closes the file even when ValueError is raised
        with open(filename) as file:
            for line in file:
                tokens = line.split()
                if not tokens:
                    ## tolerate blank lines instead of failing on tokens[0]
                    continue

                idx = tokens[0]
                reference_text = " ".join(tokens[1:])

                ## idx to file id: drop the last dash-separated component
                fid = "/".join(idx.split("-")[:-1])

                fname = os.path.join(root_dir, fid, idx)
                transcription_path = fname + "." + model_dir + ".transcription.txt"
                if not os.path.exists(transcription_path):
                    raise ValueError("missing transcription: " + transcription_path)

                transcription = helper.read_transcription(transcription_path)
                data.add_reference_transcription(reference_text, transcription)

    return data


In [25]:
# Load the LibriSpeech pairs and print the first and the last
# reference/transcription pair, separated by an empty line.
data = read_librispeech_data()
data.print_head()
print()
data.print_tail()

Reference:   	:  eleven oclock had struck it was a fine clear night they were the only persons on the road and they sauntered leisurely along to avoid paying the price of fatigue for the recreation provided for the toledans in their valley or on the banks of their river
Transcription: 	:  eleven oclock had struck it was a fine clear night there were the only persons on the road and they sauntered leisurely along to avoid paying the price of fatigue for the recreation provided for the toledans in the valley or on the banks of their river

Reference:   	:  then the leader parted from the line
Transcription: 	:  then the leader parted from the line


In [26]:
def read_corpus(corpus_fpath: str):
    """Read a corpus file and return its lines as a list of strings.

    Args:
        corpus_fpath: path of a text file holding one text per line.

    Returns:
        One string per line with the trailing newline removed. A final line
        without a newline terminator is returned in full.
    """
    ## the context manager guarantees the file handle is released
    with open(corpus_fpath) as file:
        ## strip only the line terminator; slicing off the last character
        ## unconditionally would truncate an unterminated final line
        return [line.rstrip("\n") for line in file]

def read_crossasr_data(mode="librispeech", asr_name="deepspeech"):
    """Load a CrossASR corpus together with its stored transcriptions.

    The references are the lines of the corpus file of the selected ``mode``;
    the transcription of the i-th reference is read with
    ``helper.read_transcription`` from
    ``<transcription_dir>/<tts_name>/<asr_name>/<i + index_offset>.txt``.

    Args:
        mode: which corpus to load, ``"librispeech"`` (default) or
            ``"europarl"``.
        asr_name: name of the sub-directory holding the transcriptions.

    Returns:
        A ``Data`` object named ``"crossasr"`` holding all pairs.

    Raises:
        ValueError: if ``mode`` is not one of the supported corpora.
    """
    ## per-corpus settings; europarl transcription files are numbered from 1,
    ## librispeech ones from 0
    configs = {
        "europarl": {
            "corpus_fpath": "CrossASR/europarl-seed2021/corpus/europarl-20000.txt",
            "transcription_dir": "CrossASR/europarl-seed2021/data/transcription",
            "tts_name": "rv",
            "index_offset": 1,
        },
        "librispeech": {
            "corpus_fpath": "CrossASR/librispeech-crossasr/corpus/librispeech-test-clean-corpus.txt",
            "transcription_dir": "CrossASR/librispeech-crossasr/data/transcription",
            "tts_name": "google",
            "index_offset": 0,
        },
    }
    if mode not in configs:
        raise ValueError(
            f"unknown mode {mode!r}; expected one of {sorted(configs)}")
    config = configs[mode]

    data = Data("crossasr")

    transcription_dir = os.path.join(
        config["transcription_dir"], config["tts_name"], asr_name)

    references = read_corpus(config["corpus_fpath"])

    for i, reference in enumerate(references):
        file_number = i + config["index_offset"]
        transcription_path = os.path.join(transcription_dir, f"{file_number}.txt")
        transcription = helper.read_transcription(transcription_path)

        data.add_reference_transcription(reference, transcription)

    return data



In [27]:
# Load the CrossASR pairs and print the first and the last
# reference/transcription pair, separated by an empty line.
data = read_crossasr_data()
data.print_head()
print()
data.print_tail()

Reference:   	:  eleven oclock had struck it was a fine clear night they were the only persons on the road and they sauntered leisurely along to avoid paying the price of fatigue for the recreation provided for the toledans in their valley or on the banks of their river
Transcription: 	:  oclock had struck it was a fine clear night they were the only persons on the road and day sauntered leisurely along to avoid paying the price of fatigue for the recreation provided for the toledans in their valley or on the banks of their river

Reference:   	:  then the leader parted from the line
Transcription: 	:  then the leader parted from the line


In [28]:
from asr_evaluation.asr_evaluation import asr_evaluation

class Analyzer(object):
    """Per-word error statistics for reference/transcription datasets.

    All results are cached in dictionaries keyed by the dataset name returned
    by ``data.get_name()``, so every dataset is analysed at most once per
    ``Analyzer`` instance.
    """

    def __init__(self):
        ## used for caching; every dict is keyed by the dataset name
        self.infos = {}
        self.word_count = {}
        self.word_accuracy = {}
        self.word_accuracy_with_count = {}
        self.common_errors = {}

    def analyze(self, data: "Data"):
        """Collect reference word counts and the confusions of erroneous pairs.

        Args:
            data: dataset providing ``get_name``, ``get_reference`` and
                ``get_transcription``.

        Returns:
            A tuple ``(infos, word_count)``. ``infos`` has one dict per pair
            whose ``jiwer.wer`` is non-zero, with the keys ``"confusion"``
            (the value of ``ASREvaluation.get_confusions()``), ``"reference"``
            and ``"transcription"``. ``word_count`` is a ``Counter`` of the
            whitespace-separated words of all references.
        """
        name = data.get_name()

        ## get from cache if it is already computed before
        if name in self.infos:
            return self.infos[name], self.word_count[name]

        infos = []
        word_count = collections.Counter()

        for reference, transcription in zip(data.get_reference(), data.get_transcription()):

            ## create statistics for word counter
            word_count.update(reference.split())

            ## create statistics for errors
            if jiwer.wer(reference, transcription) != 0:
                evaluation = asr_evaluation.ASREvaluation()
                evaluation.detect_word_error(reference, transcription)
                infos.append({
                    "confusion": evaluation.get_confusions(),
                    "reference": reference,
                    "transcription": transcription,
                })

        ## update the cache
        self.infos[name] = infos
        self.word_count[name] = word_count

        return infos, word_count

    @staticmethod
    def _accumulate_errors(error_count, entries, word_key):
        """Add the ``"count"`` of every confusion entry to ``error_count[entry[word_key]]``."""
        for entry in entries:
            error_count[entry[word_key]] += entry["count"]

    def calculate_word_accuracy(self, data: "Data"):
        """Calculate the accuracy of every reference word, in percent.

        The accuracy of a word is ``100 - 100 * errors / occurrences``, where
        ``errors`` sums the counts of the substitution and deletion entries
        for that reference word and the ratio is rounded to two decimals
        first. Both the plain table and the table with counts are cached.

        Args:
            data: dataset to evaluate.

        Returns:
            A dict ``word -> accuracy`` sorted by descending
            ``(accuracy, word)``.
        """
        name = data.get_name()

        ## use caching if it is already computed before
        if name in self.word_accuracy:
            return self.word_accuracy[name]

        infos, word_count = self.analyze(data)

        ## sum the counts: one reference word may appear in several confusion
        ## entries of the same utterance (e.g. substituted by different words)
        error_count = collections.Counter()
        for info in infos:
            confusion = info["confusion"]
            self._accumulate_errors(error_count, confusion["substitution"], "word_reference")
            self._accumulate_errors(error_count, confusion["deletion"], "word")

        word_accuracy = {}
        word_accuracy_with_count = {}
        for word, occurrences in word_count.items():
            ## a word without any recorded error is always correctly predicted
            errors = error_count.get(word, 0)
            ## outer round() removes float noise such as 43.00000000000001
            accuracy = round(100.0 - 100 * round(errors / occurrences, 2), 2)
            assert 0 <= accuracy <= 100, \
                f"accuracy of {word!r} out of range: {errors} errors / {occurrences} occurrences"
            word_accuracy[word] = accuracy
            word_accuracy_with_count[word] = [accuracy, occurrences]

        ## sort the word accuracy based on the word_accuracy
        word_accuracy = dict(sorted(word_accuracy.items(),
                                    key=lambda item: (item[1], item[0]), reverse=True))

        ## sort the word accuracy with count: lowest accuracy first, then
        ## highest count, then word
        word_accuracy_with_count = dict(sorted(word_accuracy_with_count.items(),
                                               key=lambda item: (item[1][0], -item[1][1], item[0])))

        ## update cache
        self.word_accuracy[name] = word_accuracy
        self.word_accuracy_with_count[name] = word_accuracy_with_count

        return word_accuracy

    def get_word_accuracy(self, dataset_name: str):
        """Return the cached ``word -> accuracy`` dict, or ``None`` if not computed."""
        return self.word_accuracy.get(dataset_name)

    def get_word_accuracy_with_count(self, dataset_name: str):
        """Return the cached ``word -> [accuracy, count]`` dict, or ``None`` if not computed."""
        return self.word_accuracy_with_count.get(dataset_name)

    def print_word_accuracy_with_minimum_count(self, data: "Data", limit=0, minimium_count=10, ascending=True):
        """Print up to ``limit`` words occurring at least ``minimium_count`` times.

        Computes the word accuracy first when it is not cached yet.

        Args:
            data: dataset to report on.
            limit: maximum number of rows; the default ``0`` prints only the
                header.
            minimium_count: minimum number of occurrences in the references.
            ascending: when true, the cached table (stored with the lowest
                accuracy first) is walked from its end, so the rows with the
                highest accuracy are printed first; when false, it is walked
                from its start.
        """
        name = data.get_name()
        if name not in self.word_accuracy_with_count:
            self.calculate_word_accuracy(data)

        rows = list(self.word_accuracy_with_count[name].items())
        if ascending:
            rows.reverse()

        print(f"{'Word':15s} {'Accuracy'} \tCount")

        printed = 0
        for word, (accuracy, count) in rows:
            if printed >= limit:
                ## nothing more can be printed, stop scanning
                break
            if count >= minimium_count:
                print(f"{word:15s} {accuracy:} \t\t{count:}")
                printed += 1

    def save_word_accuracy(self, dataset_name, fpath):
        """Write the cached ``word -> accuracy`` table to ``fpath``.

        Parent directories are created when missing.

        Raises:
            KeyError: if no accuracy has been calculated for ``dataset_name``.
        """
        word_accuracy = self.word_accuracy[dataset_name]
        os.makedirs(pathlib.Path(fpath).parent.absolute(), exist_ok=True)
        with open(fpath, 'w') as file:
            file.write(f"{'Word':15s} \t{'Accuracy'}\n")
            for word, accuracy in word_accuracy.items():
                file.write(f"{word:15s} \t{accuracy}\n")

    def save_word_accuracy_with_count(self, dataset_name, fpath):
        """Write the cached ``word -> [accuracy, count]`` table to ``fpath``.

        Parent directories are created when missing.

        Raises:
            KeyError: if no accuracy has been calculated for ``dataset_name``.
        """
        word_accuracy_with_count = self.word_accuracy_with_count[dataset_name]
        os.makedirs(pathlib.Path(fpath).parent.absolute(), exist_ok=True)
        with open(fpath, 'w') as file:
            file.write(f"{'Word':15s} \t{'Accuracy'} \tCount\n")
            for word, (accuracy, count) in word_accuracy_with_count.items():
                file.write(f"{word:15s} \t{accuracy} \t{count}\n")

    def print_lowest_word_accuracy(self, dataset_name, limit=10):
        """Print the last ``limit`` rows of the cached accuracy table.

        The table is sorted by descending accuracy, so these are the lowest
        accuracies. A ``limit`` larger than the table prints the whole table.

        Raises:
            KeyError: if no accuracy has been calculated for ``dataset_name``.
        """
        rows = list(self.word_accuracy[dataset_name].items())
        print(f"{'Word':15s} {'Accuracy'}")
        ## clamp the start so an oversized limit cannot yield negative indices
        start = max(0, len(rows) - limit)
        for word, accuracy in rows[start:]:
            print(f"{word:15s} {accuracy}")

    def print_highest_word_accuracy(self, dataset_name, limit=10):
        """Print the first ``limit`` rows of the cached accuracy table.

        The table is sorted by descending accuracy, so these are the highest
        accuracies. A ``limit`` larger than the table prints the whole table.

        Raises:
            KeyError: if no accuracy has been calculated for ``dataset_name``.
        """
        rows = list(self.word_accuracy[dataset_name].items())
        print(f"{'Word':15s} {'Accuracy'}")
        ## max() keeps a negative limit from turning into a tail slice
        for word, accuracy in rows[:max(0, limit)]:
            print(f"{word:15s} {accuracy}")

    def get_most_common_errors(self, data: "Data"):
        """Aggregate, per reference word, how often it was substituted by each word.

        Args:
            data: dataset to evaluate.

        Returns:
            A dict ``reference word -> {substituted word: count}``. The inner
            dicts are sorted by descending count and the outer dict by the
            descending count of each word's most frequent substitution.
        """
        name = data.get_name()

        ## get result from cache if it is already computed before
        if name in self.common_errors:
            return self.common_errors[name]

        infos, _ = self.analyze(data)

        totals = {}
        for info in infos:
            for entry in info["confusion"]["substitution"]:
                substitutions = totals.setdefault(
                    entry["word_reference"], collections.Counter())
                substitutions[entry["word_substitution"]] += entry["count"]

        ## sort things inside the common error
        common_errors = {
            word: dict(sorted(substitutions.items(),
                              key=lambda item: item[1], reverse=True))
            for word, substitutions in totals.items()
        }

        ## sort words based on the highest occurrence
        common_errors = dict(sorted(common_errors.items(),
                                    key=lambda item: next(iter(item[1].values())),
                                    reverse=True))

        self.common_errors[name] = common_errors

        return common_errors

    def print_common_error(self, common_errors, limit=2, print_limit=16):
        """Print the most frequent substitutions of the first words.

        Args:
            common_errors: dict as returned by ``get_most_common_errors``.
            limit: maximum number of substitutions printed per word.
            print_limit: maximum number of words printed.
        """
        for shown, (word, common) in enumerate(common_errors.items()):
            if shown >= print_limit:
                break
            print("Word: ", word)
            for substitution, count in list(common.items())[:max(0, limit)]:
                print(f"\t{substitution:10s} count: {count}")

    def save_common_errors(self, common_errors, fpath):
        """Write every word of ``common_errors`` with all its substitutions to ``fpath``.

        Parent directories are created when missing.
        """
        os.makedirs(pathlib.Path(fpath).parent.absolute(), exist_ok=True)
        with open(fpath, 'w') as file:
            for word, common in common_errors.items():
                file.write(f"Word: {word}\n")
                for substitution, count in common.items():
                    file.write(f"\t{substitution:10s} count: {count}\n")
            


In [29]:
# Compute the per-word accuracy of the LibriSpeech data, save both accuracy
# tables under output/librispeech/ and print the extremes of the table.
analyzer = Analyzer()

data = read_librispeech_data()

# fills the analyzer caches for this dataset name
analyzer.calculate_word_accuracy(data)
word_accuracy = analyzer.get_word_accuracy(data.get_name())

fpath = "output/librispeech/word_accuracy.txt"
analyzer.save_word_accuracy(data.get_name(), fpath)

fpath = "output/librispeech/word_accuracy_with_count.txt"
analyzer.save_word_accuracy_with_count(data.get_name(), fpath)


print("=== Lowest Accuracy Rate")
analyzer.print_lowest_word_accuracy(data.get_name())
print()

print("=== Highest Accuracy Rate")
analyzer.print_highest_word_accuracy(data.get_name())
print()


=== Lowest Accuracy Rate
Word            Accuracy
advised         0.0
adona           0.0
admittance      0.0
adherents       0.0
acknowledgement 0.0
accruing        0.0
abstractions    0.0
abolitionists   0.0
abduction       0.0
abbe            0.0

=== Highest Accuracy Rate
Word            Accuracy
zoology         100.0
zion            100.0
zeal            100.0
youth           100.0
yourselves      100.0
yourself        100.0
yours           100.0
younger         100.0
yorkshire       100.0
york            100.0



In [30]:
# Print at most 20 words that occur at least 10 times in the references.
analyzer.print_word_accuracy_with_minimum_count(data, limit=20, minimium_count=10)


Word            Accuracy 	Count
wrong           100.0 		10
simply          100.0 		10
show            100.0 		10
shook           100.0 		10
second          100.0 		10
remember        100.0 		10
really          100.0 		10
ready           100.0 		10
purpose         100.0 		10
probably        100.0 		10
pretty          100.0 		10
ought           100.0 		10
noble           100.0 		10
nearly          100.0 		10
natural         100.0 		10
mistress        100.0 		10
met             100.0 		10
keep            100.0 		10
hung            100.0 		10
ground          100.0 		10


In [31]:
# Aggregate the substitutions of the current dataset, save all of them to
# output/librispeech/ and print the most frequent ones.
print("=== Most common errors")
common_errors = analyzer.get_most_common_errors(data)
fpath = "output/librispeech/common_errors.txt"
analyzer.save_common_errors(common_errors, fpath)
analyzer.print_common_error(common_errors)
print()


=== Most common errors
Word:  in
	and        count: 44
	an         count: 3
Word:  a
	the        count: 34
	of         count: 4
Word:  and
	in         count: 28
	a          count: 3
Word:  this
	the        count: 21
	spilling   count: 1
Word:  an
	and        count: 13
	on         count: 1
Word:  too
	to         count: 10
	two        count: 3
Word:  two
	to         count: 8
	lotto      count: 1
Word:  is
	as         count: 7
	his        count: 4
Word:  the
	a          count: 7
	to         count: 2
Word:  uncas
	once       count: 6
	one        count: 1
Word:  thee
	the        count: 6
	he         count: 4
Word:  of
	a          count: 6
	at         count: 2
Word:  anyone
	one        count: 6
Word:  men
	man        count: 6
	then       count: 1
Word:  boolooroo
	bolero     count: 6
	booleroo   count: 4
Word:  has
	had        count: 5
	as         count: 2



In [32]:
# Same report as for LibriSpeech, now for the CrossASR data; results are
# saved under output/crossasr/.
data = read_crossasr_data()

# The analyzer created earlier is reused: its caches are keyed by the dataset
# name, so the "crossasr" results are stored next to the "librispeech" ones.
analyzer.calculate_word_accuracy(data)
word_accuracy = analyzer.get_word_accuracy(data.get_name())

fpath = "output/crossasr/word_accuracy.txt"
analyzer.save_word_accuracy(data.get_name(), fpath)

fpath = "output/crossasr/word_accuracy_with_count.txt"
analyzer.save_word_accuracy_with_count(data.get_name(), fpath)


print("=== Lowest Accuracy Rate")
analyzer.print_lowest_word_accuracy(data.get_name())
print()

print("=== Highest Accuracy Rate")
analyzer.print_highest_word_accuracy(data.get_name())
print()


=== Lowest Accuracy Rate
Word            Accuracy
adona           0.0
addressing      0.0
acknowledgement 0.0
achievements    0.0
accounts        0.0
accordingly     0.0
accomplishment  0.0
accepting       0.0
abner           0.0
abbe            0.0

=== Highest Accuracy Rate
Word            Accuracy
zoology         100.0
zest            100.0
zeal            100.0
youth           100.0
younger         100.0
yorkshire       100.0
york            100.0
yonder          100.0
yoke            100.0
yielding        100.0



In [33]:
# Aggregate the substitutions of the CrossASR data, save all of them to
# output/crossasr/ and print the most frequent ones.
print("=== Most common errors")

common_errors = analyzer.get_most_common_errors(data)
fpath = "output/crossasr/common_errors.txt"
analyzer.save_common_errors(common_errors, fpath)

analyzer.print_common_error(common_errors)
print()


=== Most common errors
Word:  as
	is         count: 50
	his        count: 2
Word:  and
	in         count: 44
	the        count: 3
Word:  the
	that       count: 22
	there      count: 9
Word:  was
	with       count: 17
	as         count: 4
Word:  who
	he         count: 16
	you        count: 5
Word:  a
	the        count: 14
	of         count: 8
Word:  our
	a          count: 13
	us         count: 4
Word:  too
	to         count: 13
	two        count: 5
Word:  an
	and        count: 13
	in         count: 8
Word:  im
	in         count: 12
	him        count: 3
Word:  are
	and        count: 10
	a          count: 7
Word:  them
	him        count: 10
	then       count: 3
Word:  their
	the        count: 10
	there      count: 2
Word:  you
	he         count: 9
	the        count: 3
Word:  at
	it         count: 9
	that       count: 2
Word:  were
	was        count: 8
	with       count: 4



In [34]:
# Print at most 20 words that occur at least 10 times in the references.
analyzer.print_word_accuracy_with_minimum_count(
    data, limit=20, minimium_count=10)


Word            Accuracy 	Count
talking         100.0 		10
speaking        100.0 		10
simply          100.0 		10
second          100.0 		10
remember        100.0 		10
really          100.0 		10
ready           100.0 		10
purpose         100.0 		10
probably        100.0 		10
pretty          100.0 		10
ought           100.0 		10
noble           100.0 		10
nearly          100.0 		10
natural         100.0 		10
moved           100.0 		10
mistress        100.0 		10
met             100.0 		10
knife           100.0 		10
hung            100.0 		10
hope            100.0 		10


## Insight 

It seems hard to compare the word intersections one by one. 
We will try querying the statistics for each word that has many errors in CrossASR,
then get the corresponding statistics from the LibriSpeech data.

### Combining Word Accuracy from the Two Results


In [35]:
# Load both datasets and compute their word accuracies with a fresh analyzer.
cr_data = read_crossasr_data()   ## crossasr data
ls_data = read_librispeech_data()   ## librispeech data

analyzer = Analyzer()

analyzer.calculate_word_accuracy(cr_data)
analyzer.calculate_word_accuracy(ls_data)


# word -> [accuracy, count] tables of the two datasets
cr_word_acc = analyzer.get_word_accuracy_with_count(cr_data.get_name())
ls_word_acc = analyzer.get_word_accuracy_with_count(ls_data.get_name())

#### For each word in the LibriSpeech word accuracy, report the corresponding accuracy on CrossASR

In [44]:
import pandas as pd

# One row per LibriSpeech word, with the CrossASR accuracy and count next to
# it; a word that is missing from the CrossASR table gets -1 for both.
# The rows are collected first and the frame is built once: DataFrame.append
# was removed in pandas 2.0 and copies the whole frame on every call.
rows = []
for word, (ls_acc, ls_count) in ls_word_acc.items():
    cr_acc, cr_count = cr_word_acc.get(word, (-1, -1))
    rows.append({"word": word,
                 "ls_word_acc": ls_acc,
                 "cr_word_acc": cr_acc,
                 "ls_word_count": ls_count,
                 "cr_word_count": cr_count,
                 })

df = pd.DataFrame(rows, columns=["word", "ls_word_acc",
                  "cr_word_acc", "ls_word_count",  "cr_word_count"])

df


Unnamed: 0,word,ls_word_acc,cr_word_acc,ls_word_count,cr_word_count
0,boolooroo,0.0,0.0,12,12
1,timaeus,0.0,11.0,9,9
2,fitzooth,0.0,0.0,7,7
3,gamewell,0.0,0.0,7,7
4,anyone,0.0,17.0,6,6
...,...,...,...,...,...
8095,yell,100.0,100.0,1,1
8096,younger,100.0,100.0,1,1
8097,yourselves,100.0,0.0,1,1
8098,zion,100.0,0.0,1,1


In [None]:
# Write the combined table to CSV; the frame index is written as the first,
# unnamed column.
df.to_csv("output/combined_word_accuracy.csv")