In [1]:
# specific packages
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor, AutoModelForCTC, Wav2Vec2ProcessorWithLM
from datasets import load_dataset, Audio, load_metric

# General packages
import os
from pathlib import Path
import numpy as np
import glob

In [2]:
MAIN_DIR = Path("/home/user/code/sbb_project")
DATA_DIR = MAIN_DIR.joinpath("data_sbb")
TEST_DATA_DIR = DATA_DIR.joinpath("zeroshot_data")

In [3]:
files = glob.glob(os.path.join(TEST_DATA_DIR, '*.wav'))

In [4]:
ground_truth = "gleis alpha vier vier via gleis beta vier fünf"

In [5]:
model = AutoModelForCTC.from_pretrained("fxtentacle/wav2vec2-xls-r-1b-tevr").to("cuda")

In [6]:
class HajoProcessor(Wav2Vec2ProcessorWithLM):
    @staticmethod
    def get_missing_alphabet_tokens(decoder, tokenizer):
        return []
processor = HajoProcessor.from_pretrained("fxtentacle/wav2vec2-xls-r-1b-tevr")

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [7]:
def predict_single_audio(batch, image=False):    
    audio = batch['audio']['array']
    # resample, if needed
    if batch['audio']['sampling_rate'] != 16000:
        audio = T.Resample(orig_freq=batch['audio']['sampling_rate'], new_freq=16000)(torch.from_numpy(audio)).numpy()
    # normalize
    audio = (audio - audio.mean()) / np.sqrt(audio.var() + 1e-7)
    # ask HF processor to prepare audio for GPU eval
    input_values = processor(audio, return_tensors="pt", sampling_rate=16_000).input_values
    # call model on GPU
    with torch.no_grad():
        logits = model(input_values.to('cuda')).logits.cpu().numpy()[0]
    # ask HF processor to decode logits
    decoded = processor.decode(logits, beam_width=500)
    # return as dictionary
    return { 'groundtruth': text_fix(batch['sentence']), 'prediction': decoded.text }

In [8]:
import os
from functools import lru_cache
from typing import Union

import ffmpeg
import numpy as np
import torch
import torch.nn.functional as F

def exact_div(x, y):
    assert x % y == 0
    return x // y

# hard-coded audio hyperparameters
SAMPLE_RATE = 16000
N_FFT = 400
N_MELS = 80
HOP_LENGTH = 160
CHUNK_LENGTH = 30
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE  # 480000: number of samples in a chunk
N_FRAMES = exact_div(N_SAMPLES, HOP_LENGTH)  # 3000: number of frames in a mel spectrogram input


def load_audio(file: str, sr: int = SAMPLE_RATE):
    """
    Open an audio file and read as mono waveform, resampling as necessary
    Parameters
    ----------
    file: str
        The audio file to open
    sr: int
        The sample rate to resample the audio if necessary
    Returns
    -------
    A NumPy array containing the audio waveform, in float32 dtype.
    """
    try:
        # This launches a subprocess to decode audio while down-mixing and resampling as necessary.
        # Requires the ffmpeg CLI and `ffmpeg-python` package to be installed.
        out, _ = (
            ffmpeg.input(file, threads=0)
            .output("-", format="s16le", acodec="pcm_s16le", ac=1, ar=sr)
            .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e

    return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0


def pad_or_trim(array, length: int = N_SAMPLES, *, axis: int = -1):
    """
    Pad or trim the audio array to N_SAMPLES, as expected by the encoder.
    """
    if torch.is_tensor(array):
        if array.shape[axis] > length:
            array = array.index_select(dim=axis, index=torch.arange(length, device=array.device))

        if array.shape[axis] < length:
            pad_widths = [(0, 0)] * array.ndim
            pad_widths[axis] = (0, length - array.shape[axis])
            array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes])
    else:
        if array.shape[axis] > length:
            array = array.take(indices=range(length), axis=axis)

        if array.shape[axis] < length:
            pad_widths = [(0, 0)] * array.ndim
            pad_widths[axis] = (0, length - array.shape[axis])
            array = np.pad(array, pad_widths)

    return array



In [9]:
def wer(ref, hyp ,debug=True):
    r = ref.split()
    h = hyp.split()
    if (len(h) == 0) or (len(r) == 0):
        return {'WER':9, 'numCor':9, 'numSub':9, 'numIns':9, 'numDel':9, "numCount": len(r)}
    #costs will holds the costs, like in the Levenshtein distance algorithm
    costs = [[0 for inner in range(len(h)+1)] for outer in range(len(r)+1)]
    # backtrace will hold the operations we've done.
    # so we could later backtrace, like the WER algorithm requires us to.
    backtrace = [[0 for inner in range(len(h)+1)] for outer in range(len(r)+1)]
 
    OP_OK = 0
    OP_SUB = 1
    OP_INS = 2
    OP_DEL = 3
    DEL_PENALTY = 1
    INS_PENALTY = 1
    SUB_PENALTY = 1
    
    # First column represents the case where we achieve zero
    # hypothesis words by deleting all reference words.
    for i in range(1, len(r)+1):
        costs[i][0] = DEL_PENALTY*i
        backtrace[i][0] = OP_DEL
    
    # First row represents the case where we achieve the hypothesis
    # by inserting all hypothesis words into a zero-length reference.
    for j in range(1, len(h) + 1):
        costs[0][j] = INS_PENALTY * j
        backtrace[0][j] = OP_INS
    
    # computation
    for i in range(1, len(r)+1):
        for j in range(1, len(h)+1):
            if r[i-1] == h[j-1]:
                costs[i][j] = costs[i-1][j-1]
                backtrace[i][j] = OP_OK
            else:
                substitutionCost = costs[i-1][j-1] + SUB_PENALTY # penalty is always 1
                insertionCost    = costs[i][j-1] + INS_PENALTY   # penalty is always 1
                deletionCost     = costs[i-1][j] + DEL_PENALTY   # penalty is always 1
                 
                costs[i][j] = min(substitutionCost, insertionCost, deletionCost)
                if costs[i][j] == substitutionCost:
                    backtrace[i][j] = OP_SUB
                elif costs[i][j] == insertionCost:
                    backtrace[i][j] = OP_INS
                else:
                    backtrace[i][j] = OP_DEL
                 
    # back trace though the best route:
    i = len(r)
    j = len(h)
    numSub = 0
    numDel = 0
    numIns = 0
    numCor = 0
    if debug:
        print("OP\tREF\tHYP")
        lines = []
    while i > 0 or j > 0:
        if backtrace[i][j] == OP_OK:
            numCor += 1
            i-=1
            j-=1
            if debug:
                lines.append("OK\t" + r[i]+"\t"+h[j])
        elif backtrace[i][j] == OP_SUB:
            numSub +=1
            i-=1
            j-=1
            if debug:
                lines.append("SUB\t" + r[i]+"\t"+h[j])
        elif backtrace[i][j] == OP_INS:
            numIns += 1
            j-=1
            if debug:
                lines.append("INS\t" + "****" + "\t" + h[j])
        elif backtrace[i][j] == OP_DEL:
            numDel += 1
            i-=1
            if debug:
                lines.append("DEL\t" + r[i]+"\t"+"****")
    if debug:
        lines = reversed(lines)
        for line in lines:
            print(line)
        print("#cor " + str(numCor))
        print("#sub " + str(numSub))
        print("#del " + str(numDel))
        print("#ins " + str(numIns))
    # return (numSub + numDel + numIns) / (float) (len(r))
    wer_result = round( (numSub + numDel + numIns) / (float) (len(r)), 3)
    return {'WER':wer_result, 'numCor':numCor, 'numSub':numSub, 'numIns':numIns, 'numDel':numDel, "numCount": len(r)}

In [10]:
preds = list()
for file in files:
    audio = load_audio(file)
    #audio = pad_or_trim(audio)
    audio = (audio - audio.mean()) / np.sqrt(audio.var() + 1e-7)
    input_values = processor(audio, return_tensors="pt", sampling_rate=16_000).input_values
    with torch.no_grad():
        logits = model(input_values.to('cuda')).logits.cpu().numpy()[0]
    decoded = processor.decode(logits, beam_width=500)
    preds.append(decoded.text)

In [11]:
print(preds)

['was er für', 'was auf vier greis der vierte', 'verfiel vier reiste vier fünf', 'kreis aus vier vier reiste vier fünf', 'bis auf vier später vier', 'gleich auf vier vier vier gleis später vier fünf', '', 'gleich vier die räder vier fünf', 'auf ihr gleich viel', 'bereits vier graf', 'bis auf vier gleise vierte', 'er ist']


In [12]:
wers = list()
for pred in preds:
    word_error = wer(pred, ground_truth, debug=False)
    wers.append(word_error["WER"])

In [13]:
print("WER for all sentences: \n", wers)
print("Average WER: {} \n Minimum WER: {} \n Maximum WER: {}".format(np.mean(wers), np.min(wers), np.max(wers)))

WER for all sentences: 
 [3.0, 1.333, 1.2, 0.714, 1.4, 0.444, 9, 1.0, 2.25, 2.667, 1.6, 4.5]
Average WER: 2.425666666666667 
 Minimum WER: 0.444 
 Maximum WER: 9.0
