In [2]:
# Importing the required packages
import numpy as np, pandas as pd, random, re, speech_recognition as sr, string, time
from googletrans import Translator
from pickle import dump
from pydub import AudioSegment as AS
from scipy.stats import t
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from unicodedata import normalize
from yandex_translate import YandexTranslate

# Speech Recogition <=> speech to text
# Defining a function to ignore everything but letters and space
def compromise(text_to_be_modified):
    return(re.sub('[^a-zA-Z" "]', '', text_to_be_modified))

# Defining a function to compute Levenshtein Distance
def levenshtein(source, target):
    if len(source) < len(target):
        return(levenshtein(target, source))
    if len(source) == 0:
        return(0)
    else:
        if len(target) == 0:
            return(1)
        source = np.array(tuple(source))
        target = np.array(tuple(target))
        previous_row = np.arange(target.size + 1)
        for s in source:
            current_row = previous_row + 1
            current_row[1:] = np.minimum(current_row[1:], np.add(previous_row[:-1], target != s))
            current_row[1:] = np.minimum(current_row[1:], current_row[0:-1] + 1)
            previous_row = current_row
        return(previous_row[-1] / len(source))

# Defining a function to calculate word error rate
def wer(recognised, actual):
    recognised = recognised.split()
    actual = actual.split()
    d = np.zeros(((len(recognised) + 1), (len(actual) + 1)), dtype = int)
    for i in range(len(recognised) + 1):
        for j in range(len(actual) + 1):
            if i == 0:
                d[0][j] = j
            elif j == 0:
                d[i][0] = i
    for i in range(1, len(recognised) + 1):
        for j in range(1, len(actual) + 1):
            if recognised[i - 1] == actual[j - 1]:
                d[i][j] = d[i - 1][j - 1]
            else:
                substitution = d[i - 1][j - 1] + 1
                insertion = d[i][j - 1] + 1
                deletion = d[i - 1][j] + 1
                d[i][j] = min(substitution, insertion, deletion)
    return(d[len(recognised)][len(actual)] / len(actual))

# Defining a function to add perturbation to audio files
def perturbating(true_audio_file, false_audio_file):
    sound_1 = AS.from_file(true_audio_file)
    sound_2 = AS.from_file(false_audio_file)
    combined = sound_1.overlay(sound_2)
    combined.export(true_audio_file[:(len(true_audio_file) - 4)] + "+" + false_audio_file[:(len(false_audio_file) - 4)] + ".wav", format = 'wav')   

# Defining a function to read audios for speech recognition
def read(audio_file):
    r = sr.Recognizer()
    with sr.AudioFile(audio_file) as source:
        audio = r.record(source)
    return(audio)

# Defining a function to detect speech by Google
def google_recognise(recorded_audio):
    r = sr.Recognizer()
    try:
        google = r.recognize_google(recorded_audio)
    except (sr.UnknownValueError, sr.RequestError):
        google = ""
    return(google)

# Defining a function to detect speech by Wit
def wit_recognise(recorded_audio):
    r = sr.Recognizer()
    try:
        wit = r.recognize_wit(recorded_audio, 'TRQFH3GKSWAR7RUW3MNZMTETZQXS5OL2')
    except (sr.UnknownValueError, sr.RequestError):
        wit = ""
    return(wit)

# Defining a function to comapre the two methods
def speech_recognition_api_comparison(correct_audio_file, reference, perturbed_audio_files = [], print_outputs = False, ignore_case_punctuation_marks = True):
    audios = [correct_audio_file]
    for perturbation in perturbed_audio_files:
        perturbating(correct_audio_file, perturbation)
        audios.append(correct_audio_file[:(len(correct_audio_file) - 4)] + "+" + perturbation[:(len(perturbation) - 4)] + ".wav")
    read_audios = [read(audio) for audio in audios]
    google_recognised_texts = [google_recognise(records) for records in read_audios]
    wit_recognised_texts = [wit_recognise(records) for records in read_audios]
    if print_outputs == True:
        print("\nActual: ", reference, "\nGoogle: ", google_recognised_texts, "\nWit:\t", wit_recognised_texts, "\n")
    if ignore_case_punctuation_marks == True:
        reference = compromise(reference)
        google = [compromise(texts) for texts in google_recognised_texts]
        wit = [compromise(texts) for texts in wit_recognised_texts]
    google_levenshtein = [levenshtein(recognitions, reference) for recognitions in google_recognised_texts]
    wit_levenshtein = [levenshtein(recognitions, reference) for recognitions in wit_recognised_texts]
    google_wer = [wer(recognitions, reference) for recognitions in google_recognised_texts]
    wit_wer = [wer(recognitions, reference) for recognitions in wit_recognised_texts]
    return(google_levenshtein + wit_levenshtein + google_wer + wit_wer)

# Reading the two databases
audio_files_1 = pd.read_csv('rhys_mcg.csv', header = None)
n_1 = len(audio_files_1)
filename_1 = audio_files_1.iloc[:, 0]
text_1 = audio_files_1.iloc[:, 1]
audio_files_2 = pd.read_csv('speech_to_text_benchmark.csv', header = None)
n_2 = len(audio_files_2)
filename_2 = audio_files_2.iloc[:, 0]
text_2 = audio_files_2.iloc[:, 1]

# Study of first database without perturbation
# Sampling k1 files from n1 files
k_1 = 30
indices_1 = list(range(n_1))
random_indices_1 = random.sample(indices_1, k_1)

# Comparing the APIs for the sampled files
results_1 = np.empty([k_1, 4])
for counter, random_index in enumerate(random_indices_1):
    results_1[counter] = speech_recognition_api_comparison((str(filename_1[random_index]) + ".wav"), text_1[random_index])
    time.sleep(3)

# Test of Significance
average_errors_1 = np.mean(results_1, 0)
sd_errors_1 = np.std(results_1, 0)
t_statistics_1 = (average_errors_1 - 0) / (sd_errors_1 / (k_1 ** 0.5))
decisions_1 = ["Reject" if t_stat > t.isf(0.05, (k_1 - 1)) else "Accept" for t_stat in t_statistics_1]

# Results for the Sampled Files
print("\nRandom Indices: ", random_indices_1)
print("\nAverage Errors: ", np.around(average_errors_1, 4))
print("\nDecisions:\t", decisions_1)

# Study of second database with perturbation
# Perturbations to be used
perturbation = ["doing_the_dishes.wav", "exercise_bike.wav", "pink_noise.wav", "running_tap.wav","white_noise.wav"]

# Sampling k2 files from n2 files
k_2 = 20
indices_2 = list(range(n_2))
random_indices_2 = random.sample(indices_2, k_2)

# Comparing the APIs for the sampled files
results_2 = np.empty([k_2, (4 * (len(perturbation) + 1))])
for counter, random_index in enumerate(random_indices_2):
    results_2[counter] = speech_recognition_api_comparison(filename_2[random_index], text_2[random_index], perturbation)
    time.sleep(3)

# Test of Significance
average_errors_2 = np.mean(results_2, 0)
sd_errors_2 = np.std(results_2, 0)
t_statistics_2 = (average_errors_2 - 0) / (sd_errors_2 / (k_2 ** 0.5))
decisions_2 = ["Reject" if t_stat > t.isf(0.05, (k_2 - 1)) else "Accept" for t_stat in t_statistics_2]

# Results for the Sampled Files
print("\nRandom Indices: ", random_indices_2)
print("\nAverage Errors: \n", average_errors_2)
print("\nDecisions:\t", decisions_2)

# Machine Translation <=> text to text
# Defining a function to translate by Google
def google_translate(english_text):
    return(Translator().translate(english_text, src = 'en', dest = 'fr').text)

# Defining a function to translate by Yandex
def yandex_translate(english_text):
    return(YandexTranslate('trnsl.1.1.20180604T085058Z.6a9b155f56ef80dd.f86700fd9d7791ba171d3bf5640be786c71244a4').translate(english_text, 'en-fr')['text'][0])

# Defining a function to compare the two methods
def machine_translation_api_comparison(english_sentence, french_sentence):
    google = google_translate(english_sentence)
    yandex = yandex_translate(english_sentence)
    google_vector = TfidfVectorizer().fit_transform([google, french_sentence])
    yandex_vector = TfidfVectorizer().fit_transform([yandex, french_sentence])
    google_cosine_similarity = cosine_similarity(google_vector[0,], google_vector[1,])
    yandex_cosine_similarity = cosine_similarity(yandex_vector[0,], yandex_vector[1,])
    return([google_cosine_similarity, yandex_cosine_similarity])

# Defining a function to load a document into memory
def load_doc(filename):
    file = open(filename, mode = 'rt', encoding = 'utf-8')
    text = file.read()
    file.close()
    return(text)

# Defining a function to split a loaded document into sentences
def to_sentences(doc):
    return(doc.strip().split('\n'))

# Defining a function to clean a list of lines
def clean_lines(lines):
    cleaned = list()
    # prepare regex for char filtering
    re_print = re.compile('[^%s]' % re.escape(string.printable))
    # prepare translation table for removing punctuation
    table = str.maketrans('', '', string.punctuation)
    for line in lines:
        # normalize unicode characters
        line = normalize('NFD', line).encode('ascii', 'ignore')
        line = line.decode('UTF-8')
        # tokenize on white space
        line = line.split()
        # convert to lower case
        line = [word.lower() for word in line]
        # remove punctuation from each token
        line = [word.translate(table) for word in line]
        # remove non-printable chars form each token
        line = [re_print.sub('', w) for w in line]
        # remove tokens with numbers in them
        line = [word for word in line if word.isalpha()]
        # store as string
        cleaned.append(' '.join(line))
    return(cleaned)

# Reading the two ccorpora
english_corpus = 'europarl-v7.fr-en.en'
english_document = load_doc(english_corpus)
english_sentences = to_sentences(english_document)
english_sentences = clean_lines(english_sentences)
french_corpus = 'europarl-v7.fr-en.fr'
french_document = load_doc(french_corpus)
french_sentences = to_sentences(french_document)
french_sentences = clean_lines(french_sentences)
n_3 = len(english_sentences)

# Sampling k3 files from n3 files
k_3 = 50
indices_3 = list(range(n_3))
random_indices_3 = random.sample(indices_3, k_3)

# Comparing the APIs for the sampled files
results_3 = np.empty([k_3, 2])
for counter, random_index in enumerate(random_indices_3):
    results_3[counter] = machine_translation_api_comparison(english_sentences[random_index], french_sentences[random_index])
    time.sleep(3)

# Test of Significance
average_cosine_similarity = np.mean(results_3, 0)
sd_cosine_similarity = np.std(results_3, 0)
t_statistics_3 = (average_cosine_similarity - 1) / (sd_cosine_similarity / (k_3 ** 0.5))
decisions_3 = ["Reject" if t_stat < t.isf(0.95, (k_3 - 1)) else "Accept" for t_stat in t_statistics_3]

# Results for the Sampled Files
print("\nRandom Indices: ", random_indices_3)
print("\nAverage Cosine Similarities: ", np.around(average_cosine_similarity, 4))
print("\nDecisions:\t", decisions_3)


Random Indices:  [101, 102, 14, 17, 16, 70, 1, 10, 42, 47, 7, 100, 87, 69, 57, 35, 6, 0, 67, 54, 81, 91, 64, 31, 84, 13, 23, 63, 98, 26]

Average Errors:  [0.3878 0.1916 0.6221 0.4842]

Decisions:	 ['Reject', 'Reject', 'Reject', 'Reject']

Random Indices:  [826, 124, 804, 226, 342, 344, 795, 765, 990, 582, 515, 256, 432, 117, 557, 502, 507, 406, 657, 908]

Average Errors: 
 [0.28063306 0.71946963 0.66862643 0.79289759 0.80104164 0.71013695
 0.12611055 0.69480752 0.62038353 0.72771856 0.72865124 0.70723496
 0.48614193 0.87865842 0.80640339 0.89505037 0.92556319 0.82620421
 0.32446107 0.80368939 0.79274617 0.87794534 0.87112637 0.82986264]

Decisions:	 ['Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject', 'Reject']

Random Indices:  [840653, 1617751, 1650128, 791866, 1531337, 1450964, 190561, 2003370, 62400, 5333