### Necessary Packages

These are the necessary packages for voice recognition (i.e. vosk and some other things) as well as interpreting the audio

In [32]:
# relevant packages
from vosk import Model, KaldiRecognizer, SetLogLevel
import sys
import os
import wave
import subprocess
import json
from scipy import signal
import matplotlib.pyplot as plt
import math as math
import numpy as np

SetLogLevel(0)


### Step 1

Develop functions to convert signal from Vox to wav

In [33]:
# some relevant functions...

# table of  quantizer step size
StepSizeTable = [16, 17, 19, 21, 23, 25, 28, 31, 34, 37, 41,
                 45, 50, 55, 60, 66, 73, 80, 88, 97, 107, 118, 130, 143, 157, 173,
                 190, 209, 230, 253, 279, 307, 337, 371, 408, 449, 494, 544, 598, 658,
                 724, 796, 876, 963, 1060, 1166, 1282, 1408, 1552]

# another conversion table
IndexTable = [-1, -1, -1, -1, 2, 4, 6, 8]


def ADPCM_Encode(sample):
    global index
    global predsample

    code = 0

    step_size = StepSizeTable[index]

    # compute diff and record sign and absolut value
    diff = sample - predsample
    if diff < 0:
        code = 8
        diff = -diff

    # quantize the diff into ADPCM code
    # inverse quantize the code into a predicted diff
    tmpstep = step_size
    diffq = step_size >> 3

    if diff >= tmpstep:
        code = code | 0x04
        diff -= tmpstep
        diffq = diffq + step_size

    tmpstep = tmpstep >> 1

    if diff >= tmpstep:
        code = code | 0x02
        diff = diff - tmpstep
        diffq = diffq + (step_size >> 1)

    tmpstep = tmpstep >> 1

    if diff >= tmpstep:
        code = code | 0x01
        diffq = diffq + (step_size >> 2)

    # fixed predictor to get new predicted sample
    if code & 8:
        predsample = predsample - diffq
    else:
        predsample = predsample + diffq

    # check for overflow
    if predsample > 32767:
        predsample = 32767
    elif predsample < -32768:
        predsample = -32768

    # find new stepsize index
    index += IndexTable[code]

    # check for overflow
    if index < 0:
        index = 0

    if index > 48:
        index = 48

    # return new ADPCM code   code & 0x0f == code
    return code & 0x0f


# ADPCM_Decode.
# code: a byte containing a 4-bit ADPCM sample.
# retval : 16-bit ADPCM sample
de_index = 0
de_predsample = 0

def ADPCM_Decode(code):
    global de_index
    global de_predsample

    step_size = StepSizeTable[de_index]

    # inverse code into diff
    diffq = step_size >> 3  # == step/8
    if code & 4:
        diffq += step_size

    if code & 2:
        diffq += step_size >> 1

    if code & 1:
        diffq += step_size >> 2

    # add diff to predicted sample
    if code & 8:
        diffq = -diffq

    de_predsample += diffq

    # check for overflow  clip the values to +/- 2^11 (supposed to be 16 bits)
    if de_predsample > 2047:
        de_predsample = 2047
    elif de_predsample < -2048:
        de_predsample = -2048

    # find new quantizer step size
    de_index += IndexTable[code]

    # check for overflow
    if de_index < 0:
        de_index = 0

    if de_index > 48:
        de_index = 48

    # save predict sample and de_index for next iteration
    # return new decoded sample
    # The original algorithm turned out to be 12bit, need to convert to 16bit
    return de_predsample << 4

def decodeTBS2(list_8bit):
    list_16bit = []
    for i in range(len(list_8bit)):
        byte_i = list_8bit[i]  # 1 bytes = 8bit
        high_4bit = (byte_i & 0xf0) >> 4  # split high 4bit from 8bit
        low_4bit = byte_i & 0x0f  # split low 4bit from 8bit

        # first sample
        sample_0 = high_4bit
        # unsigned to signed
        # 4bit : -2^4 ~ 2^(4-1)-1
        if sample_0 > 7:
            sample_4bit_0 = sample_0 - 16
        else:
            sample_4bit_0 = sample_0

        # second sample
        sample_1 = low_4bit
        # unsigned to signed
        if sample_1 > 7:
            sample_4bit_1 = sample_1 - 16
        else:
            sample_4bit_1 = sample_1

        # now decode
        tmpDeS16_0 = ADPCM_Decode(sample_4bit_0)
        tmpDeS16_1 = ADPCM_Decode(sample_4bit_1)

        list_16bit.extend([tmpDeS16_0, tmpDeS16_1])
        
    # decoded data
    return list_16bit

# make sure that list_16bit is a numpy array!
def toWav(list_16bit, name):
    wav_file = wave.open(name + '.wav', 'wb')

    # configure channel number, quantization size, and sample rate
    wav_file.setnchannels(1)
    wav_file.setsampwidth(2)
    wav_file.setframerate(16000)
    # converts data to binary data and writes it to a file
    wav_file.writeframes(list_16bit.tobytes())
    wav_file.close()

### Step 2

Filter audio signals...

In [34]:
def filtered(list_16bit):
    sos = signal.butter(10, [4000], 'lowpass', fs=16000, output='sos')
    return signal.sosfilt(sos, list_16bit)

def toWav(list_16bit, name):
    wav_file = wave.open(name + '.wav', 'wb')

    # configure channel number, quantization size, and sample rate
    wav_file.setnchannels(1)
    wav_file.setsampwidth(2)
    wav_file.setframerate(16000)
    # converts data to binary data and writes it to a file
    wav_file.writeframes(list_16bit.tobytes())
    wav_file.close()

### Step 3

Convert audio data to text through Vox and compare to preset commands

In [35]:
# convert audio from a wavefile e.g. "example.wav" to 
def wav2str(filename):
    sample_rate=16000
    # this should be the name
    model = Model("modelsmall")
    rec = KaldiRecognizer(model, sample_rate)

    wf = wave.open(filename, "rb")
    if wf.getnchannels() != 1 or wf.getsampwidth() != 2 or wf.getcomptype() != "NONE":
        print ("Audio file must be WAV format mono PCM.")
        exit (1)


    results = []
    subs = []
    while True:
        data = wf.readframes(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    results.append(rec.FinalResult())

    Strings = []
    for i, res in enumerate(results):
        jres = json.loads(res)
        if not 'result' in jres:
            continue
        words = jres['result']
        for j in range(len(words)):
            Strings.append(words[j]['word'])
        
    return Strings

# to compare two lists
def edit_dist(A, B):
    if len(A) <= len(B):                 # convenient notation to organize
        shorter,longer = A,B
    else:
        shorter,longer = B,A

    a = np.zeros((2,len(shorter) + 1), dtype=int) # matrix of values
    
    # get the first row
    for i in range(len(shorter)+1):
        a[0][i] = i                      # 0th row
    
    # get the rest of the rows
    for j in range(1,len(longer)+1):
        a[1][0] = j                          # first column
        for i in range(1,len(shorter)+1):
            a[1][i] = min([a[0][i-1] + (longer[j-1] != shorter[i-1]),
                           a[0][i] + 1,
                           a[1][i-1] + 1])
        a[0] = a[1]                          # push row back
    
    return(a[0][len(shorter)])           # return last value

# a method for comparing two lists of strings
# there are other ways this could be implemented
# this is just one way that worked
def list_compare(L1,L2):
    if not len(L1) and not len(L2):
        return 0
    
    D = np.zeros((len(L1)+1,len(L2)+1))
    for i in range(len(L1)):
        D[i+1,0] = i+1
    for j in range(len(L2)):
        D[0,j+1] = j+1
        
        
    for i in range(1,len(L1)+1):
        for j in range(1,len(L2)+1):
            D[i,j] = min([D[i-1][j-1] + edit_dist(L1[i-1],L2[j-1])/max(len(L1[i-1]),len(L2[j-1])),
                           D[i-1][j] + 1,
                           D[i][j-1] + 1])
    return D[len(L1),len(L2)]/max(len(L1),len(L2))

# returns -1 for error otherwise returns index of input
def Select_Command(Input, Commands, Threshold = 0.33):
    argMax, Max = -1, 0
    for i in range(len(Commands)):
        Similarity = 0
        for j in range(len(Commands[i])):
            Similarity = Similarity + (1-list_compare(Input,Commands[i][j]))
        Similarity = Similarity / len(Commands[i])
        if Similarity > Max and Similarity > Threshold:
            argMax, Max = i, Similarity
    return argMax
    

# list of new commands. Normally we would do this over Thread 
# but I am doing this manually with nothing under my sleeve 
# (first run through) in order to make the test faster

# 'test command one' 
Command_One = [['just', 'one'],
              ['test', 'one'],
              ['test', 'command', 'one'],
              ['test', 'to', 'man', 'one'],
              ['test', 'man', 'one']]
# 'some long sentence'
Command_Two = [['some', 'one', 'sentence'],
              ['some', 'sentence'],
              ['the', 'sentence'],
              ['some', 'long', 'sentence'],
              ['some', 'sentence']]
# 'dog' - corner case short command
Command_Three = [['dog'], ['dog'],
                [], [],
                ['talk']]
# 'test command four' - corner case similar commands
Command_Four = [['test', 'command'],
               ['command', 'for'],
               ['test', 'command', 'for'],
               ['test', 'in', 'for'],
               ['just', 'coming']] # the last one makes absolutely no sense but there yah go

### Actual Implementation

Code below used to generate file...

In [5]:
# read from file, any other way to get 8-bit encoded data works
filename = 'Recordings/command4e'

test = []
with open(filename + '.log', "rb") as file:
    test.extend(file.read())
    
RawPCM = np.array(decodeTBS2(test),dtype=np.int16)[1500:]
# filtering with this method just makes it worse... 
# look into other methods like Riley suggested or accept the error for now...
# This is quite unfortunate so far
Filtered = filtered(RawPCM)

# convert to wav file
toWav(RawPCM,filename)

# read file and translate through vosk
wav2str(filename + '.wav')

FileNotFoundError: [Errno 2] No such file or directory: 'Recordings/command4e.log'

In [39]:
# test first with intended command
Input = [['test', 'command', 'one'], ['some', 'long', 'sentence'], ['dog'], ['test', 'command', 'four']]
Commands = [Command_One,Command_Two,Command_Three,Command_Four]
for i in range(len(Input)):
    index = Select_Command(Input[i],Commands,0.4)
    print(i == index,Commands[index])
# This works

True [['just', 'one'], ['test', 'one'], ['test', 'command', 'one'], ['test', 'to', 'man', 'one'], ['test', 'man', 'one']]
True [['some', 'one', 'sentence'], ['some', 'sentence'], ['the', 'sentence'], ['some', 'long', 'sentence'], ['some', 'sentence']]
False [['test', 'command'], ['command', 'for'], ['test', 'command', 'for'], ['test', 'in', 'for'], ['just', 'coming']]
True [['test', 'command'], ['command', 'for'], ['test', 'command', 'for'], ['test', 'in', 'for'], ['just', 'coming']]


In [38]:
# compare to random commands...
Input = [['cat'], ['another', 'command'], ['some', 'other', 'command'], ['test','command', 'five']]
Commands = [Command_One,Command_Two,Command_Three,Command_Four]
for i in range(len(Input)):
    index = Select_Command(Input[i],Commands,0.4)
    print(index == -1)

True
True
True
False


### Notes

The true commands are recognizable with a similarity threshold of 0.35. The false commands can be removed with a similarity of 0.55. This forces a dilemma, and we may need better methods. For example, with small strings we can force them to repeat if we get an empty list. Next, we should try comparing these commands to new spoken ones (valid and invalid).

### Test real commands

Below is the result of recorded commands vs recorded commands

In [28]:
filename = 'Recordings/one'

test = []
with open(filename + '.log', "rb") as file:
    test.extend(file.read())
    
RawPCM = np.array(decodeTBS2(test),dtype=np.int16)[1500:]

# convert to wav file
toWav(RawPCM,filename)

# read file and translate through vosk
result = wav2str(filename + '.wav')
print(result)

Commands = [Command_One,Command_Two,Command_Three,Command_Four]
index = Select_Command(result,Commands,.4)

return index

# print(index,Commands[index])

# S = 0
# for i in range(5):
#     S = S+(list_compare(result,Commands[index][i]))
# print('similarity: ' + str(1-S/5))

['pest', 'command', 'point']
3 [['test', 'command'], ['command', 'for'], ['test', 'command', 'for'], ['test', 'in', 'for'], ['just', 'coming']]
similarity: 0.470952380952381


In [26]:
filename = 'Recordings/two'

test = []
with open(filename + '.log', "rb") as file:
    test.extend(file.read())
    
RawPCM = np.array(decodeTBS2(test),dtype=np.int16)[1500:]

# convert to wav file
toWav(RawPCM,filename)

# read file and translate through vosk
result = wav2str(filename + '.wav')
print(result)

Commands = [Command_One,Command_Two,Command_Three,Command_Four]
index = Select_Command(result,Commands,.4)

print(index,Commands[index])

S = 0
for i in range(5):
    S = S+(list_compare(result,Commands[index][i]))
print('similarity: ' + str(1-S/5))

['some', 'on', 'sentence']
1 [['some', 'one', 'sentence'], ['some', 'sentence'], ['the', 'sentence'], ['some', 'long', 'sentence'], ['some', 'sentence']]
similarity: 0.6944444444444444


In [31]:
filename = 'Recordings/three'

test = []
with open(filename + '.log', "rb") as file:
    test.extend(file.read())
    
RawPCM = np.array(decodeTBS2(test),dtype=np.int16)[1500:]

# convert to wav file
toWav(RawPCM,filename)

# read file and translate through vosk
result = wav2str(filename + '.wav')
print(result)

Commands = [Command_One,Command_Two,Command_Three,Command_Four]
index = Select_Command(result,Commands)

print(index,Commands[index])

S = 0
for i in range(5):
    S = S+(list_compare(result,Commands[index][i]))
print('similarity: ' + str(1-S/5))

[]
2 [['dog'], ['dog'], [], [], ['talk']]
similarity: 0.4


In [30]:
filename = 'Recordings/four'

test = []
with open(filename + '.log', "rb") as file:
    test.extend(file.read())
    
RawPCM = np.array(decodeTBS2(test),dtype=np.int16)[1500:]

# convert to wav file
toWav(RawPCM,filename)

# read file and translate through vosk
result = wav2str(filename + '.wav')
print(result)

Commands = [Command_One,Command_Two,Command_Three,Command_Four]
index = Select_Command(result,Commands,.4)

print(index,Commands[index])

S = 0
for i in range(5):
    S = S+(list_compare(result,Commands[index][i]))
print('similarity: ' + str(1-S/5))

['test', 'plan', 'for']
3 [['test', 'command'], ['command', 'for'], ['test', 'command', 'for'], ['test', 'in', 'for'], ['just', 'coming']]
similarity: 0.5182539682539682


### Further notes

This did fantastic! However, both corner cases caught this program. For short inputs we should verify that we got an actual input and not \[\]. For similar commands we'll need an improved filter or hardware I believe.