In [1]:
import asyncio
import time
from serial.tools import list_ports
import matplotlib.pyplot as plt
import numpy as np
import serial
from IPython.display import Audio, display, HTML
import librosa
import librosa.display



In [2]:
import tensorflow as tf
from tensorflow import keras

In [3]:
ports = list_ports.comports()
for port in ports:
    print(port)

COM3 - USB Serial Device (COM3)


In [4]:
# import serial.tools.list_ports

# ports = serial.tools.list_ports.comports()
# for port in ports:
#     print(port.device, "-", port.description)

In [5]:
SERIAL_PORT = "COM3"
BAUD_RATE = 38400

def arduino_activate():
    try:
        arduino = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
        time.sleep(2)  # Allow time for connection to establish
    except serial.SerialException as e:
        print(f"Failed to connect to serial port {SERIAL_PORT}: \n{e}")
        return None

    command = input("Type 'x' to Activate Arduino: ")
    if command.lower() == 'x':
        arduino.write(command.encode('utf-8'))
        print("Success Activated!")
        print("Open the Serial Monitor to check if it is working, then close it.")
    return arduino


arduino = arduino_activate()

Type 'x' to Activate Arduino: x
Success Activated!
Open the Serial Monitor to check if it is working, then close it.


In [6]:
def arduino_read(buffer, buffer_size, overlapping, norm=(None, None), max_attempts=10):
    buffer = np.roll(buffer, overlapping)
    num_data = buffer_size - overlapping
    for i in range(num_data):
        decoded_data = ''
        attempts = 0
        while decoded_data == '':
            arduino_data = arduino.readline()
            decoded_data = arduino_data[:len(arduino_data)].decode("utf-8").strip('\r\n')
            attempts += 1
            if attempts >= max_attempts:
                print('Fail to Retrieve Data...')
                break
        if norm[0] is not None:
            decoded_data = normalize(int(decoded_data), 1, -1, norm[0], norm[1])
        try:
            buffer[i+overlapping] = decoded_data
        except ValueError:
            print("Open the Serial Monitor to check if it is working. If it's not, press the reset button and rerun 'arduino_activate' function again.")
    return buffer

def normalize(array, new_max, new_min, old_max, old_min):
    array = (((array - old_min) * (new_max - new_min)) / (old_max - old_min)) + new_min
    return array.astype(np.float16)


# Calibrate the microphone
print("Calibrating: Please Speak to the Microphone")
time.sleep(1)
tuning_data = arduino_read(np.zeros(48000), 48000, 0)
TUNING_MAX, TUNING_MIN = (max(tuning_data), min(tuning_data))
print(f'Normalized signal from the range ({TUNING_MIN}, {TUNING_MAX}) to (-1, 1)')

Calibrating: Please Speak to the Microphone
Normalized signal from the range (-1998.0, 1704.0) to (-1, 1)


In [9]:
# pip install tensorflow


In [10]:
# pip install --upgrade numba


In [11]:
# pip install --upgrade librosa

### Correctly running code

In [9]:
BUFFER_SIZE = 24000
OVERLAPPED = 512

NUM_MFCC = 13
N_FFT = 2048
HOP_LENGTH = 512
SAMPLE_RATE = 16000

EMOTIONS = ['sad','neutral', 'happy', 'angry']
COMMANDS = ['a', 'b', 'c', 'd']

# Manually Calibrate Sensitivity of Recognition
RECOG_MASK = np.array([1, 300, 400, 50])


async def tflite_process_data():
    data = np.zeros(BUFFER_SIZE)
    data = arduino_read(data, BUFFER_SIZE, OVERLAPPED, norm=(TUNING_MAX, TUNING_MIN))
#     data /= np.iinfo(np.int16).max
#     print(data.shape)
    mfcc = librosa.feature.mfcc(y=data, sr=SAMPLE_RATE, n_mfcc=NUM_MFCC, n_fft=N_FFT, hop_length=HOP_LENGTH)
    features = np.array([mfcc.T], dtype=np.float32)

    # Model Input Shape = (None, None, 13)
    interpreter.set_tensor(interpreter.get_input_details()[0]['index'], features)
    interpreter.invoke()
    prediction = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])[0]
    result = np.multiply(prediction, RECOG_MASK)
    emotion = EMOTIONS[np.argmax(result)]
    command = COMMANDS[np.argmax(result)]
    arduino.write(command.encode('utf-8'))
    print(result)
    print(f'Emotion: {emotion}\n')
    
async def tflite_run(rounds=5):
    tasks = []
    start_time = time.time()
    for turn in range(rounds):
        task = asyncio.create_task(tflite_process_data())
        tasks.append(task)
        time.sleep(0.6)
    await asyncio.gather(*tasks)
    display(HTML("<hr>"))
    print(f"Inference time for {turn+1} rounds: {time.time() - start_time} seconds") 


interpreter = tf.lite.Interpreter(model_path="SER.tflite")
interpreter.allocate_tensors()
await tflite_run(rounds=5)

[ 0.33832064 36.36630401 13.53465915 25.33108592]
Emotion: neutral

[2.64457834e-04 7.27865822e-01 3.56483902e-01 4.98209059e+01]
Emotion: angry

[9.84158687e-05 3.19786195e-01 1.54644030e-01 4.99224484e+01]
Emotion: angry

[7.57678691e-03 9.16127209e+00 3.74204740e+00 4.76265281e+01]
Emotion: angry

[6.62807288e-05 2.69259996e-01 5.49055450e-01 4.98831809e+01]
Emotion: angry



Inference time for 5 rounds: 33.853081941604614 seconds


In [16]:
# pip install pyaudio


Defaulting to user installation because normal site-packages is not writeable
Collecting pyaudio
  Downloading PyAudio-0.2.14-cp39-cp39-win_amd64.whl (164 kB)
     -------------------------------------- 164.1/164.1 kB 3.3 MB/s eta 0:00:00
Installing collected packages: pyaudio
Successfully installed pyaudio-0.2.14
Note: you may need to restart the kernel to use updated packages.




# Trials

In [7]:
import speech_recognition as sr
import numpy as np
import asyncio
import librosa
import time
from IPython.display import HTML

In [8]:


# # Constants
# BUFFER_SIZE = 24000
# OVERLAPPED = 512

# NUM_MFCC = 13
# N_FFT = 2048
# HOP_LENGTH = 512
# SAMPLE_RATE = 16000

# EMOTIONS = ['neutral', 'happy', 'surprise', 'unpleasant']
# COMMANDS = ['a', 'b', 'c', 'd']

# # Manually Calibrate Sensitivity of Recognition
# RECOG_MASK = np.array([1, 30000, 40000, 500])

# # Initialize speech recognizer
# recognizer = sr.Recognizer()

# async def tflite_process_data():
#     data = np.zeros(BUFFER_SIZE)
#     data = arduino_read(data, BUFFER_SIZE, OVERLAPPED, norm=(TUNING_MAX, TUNING_MIN))
    
#     # Speech recognition
#     with sr.AudioFile(data) as source:
#         audio_data = recognizer.record(source)
#         try:
#             text = recognizer.recognize_google(audio_data)
#             print(f"Recognized Text: {text}")
#         except sr.UnknownValueError:
#             print("Google Speech Recognition could not understand audio")
#         except sr.RequestError as e:
#             print(f"Could not request results from Google Speech Recognition service; {e}")
    
#     # MFCC feature extraction
#     mfcc = librosa.feature.mfcc(y=data, sr=SAMPLE_RATE, n_mfcc=NUM_MFCC, n_fft=N_FFT, hop_length=HOP_LENGTH)
#     features = np.array([mfcc.T], dtype=np.float32)

#     # Model Inference
#     interpreter.set_tensor(interpreter.get_input_details()[0]['index'], features)
#     interpreter.invoke()
#     prediction = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])[0]
#     result = np.multiply(prediction, RECOG_MASK)
#     emotion = EMOTIONS[np.argmax(result)]
#     command = COMMANDS[np.argmax(result)]
#     arduino.write(command.encode('utf-8'))
#     print(f"Emotion Detected: {emotion}")
    
# async def tflite_run(rounds=10):
#     tasks = []
#     start_time = time.time()
#     for turn in range(rounds):
#         task = asyncio.create_task(tflite_process_data())
#         tasks.append(task)
#         time.sleep(0.6)
#     await asyncio.gather(*tasks)
#     display(HTML("<hr>"))
#     print(f"Inference time for {turn+1} rounds: {time.time() - start_time} seconds") 

# interpreter = tf.lite.Interpreter(model_path="SER_quant.tflite")
# interpreter.allocate_tensors()

# await tflite_run(rounds=10)


In [9]:
# pip install pyaudio



In [127]:
import numpy as np
import librosa
import asyncio
import speech_recognition as sr
import pyaudio
import tensorflow as tf
from IPython.display import HTML

# Constants
BUFFER_SIZE = 24000
OVERLAPPED = 512
NUM_MFCC = 13
N_FFT = 2048
HOP_LENGTH = 512
SAMPLE_RATE = 9600
EMOTIONS = ['sad','neutral', 'happy', 'angry']
COMMANDS = ['a', 'b', 'c', 'd']
RECOG_MASK = np.array([1, 30000, 40000, 500])

# Initialize TensorFlow Lite interpreter
interpreter = tf.lite.Interpreter(model_path="SER_quant.tflite")
interpreter.allocate_tensors()

# Initialize the recognizer and microphone
recognizer = sr.Recognizer()

async def record_and_process_audio():
    p = pyaudio.PyAudio()
    stream = p.open(format=pyaudio.paInt16, channels=1, rate=SAMPLE_RATE, input=True, frames_per_buffer=BUFFER_SIZE)
    
    print("Recording...")
    frames = []
    for _ in range(0, int(SAMPLE_RATE / BUFFER_SIZE*4.5)):  # Record for 5 seconds
        data = stream.read(BUFFER_SIZE)
#         data = np.zeros(BUFFER_SIZE)
#         data = arduino_read(data, BUFFER_SIZE, OVERLAPPED, norm=(TUNING_MAX, TUNING_MIN))
    
        frames.append(data)
    print("Recording stopped.")
    
    stream.stop_stream()
    stream.close()
    p.terminate()
    
    # Convert frames to byte array
    audio_data = b''.join(frames)
    source = sr.AudioData(audio_data, SAMPLE_RATE, 2)  # Corrected this line
    
    data = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
    # Normalize the audio data to range [-1, 1] (as expected by librosa)
#     data /= np.iinfo(np.int16).max
    # Use the audio data for speech recognition
#     with sr.AudioData(audio_data, SAMPLE_RATE, 2) as source:
    
    try:
        text = recognizer.recognize_google(source)
        print(f"Recognized Text: {text}")
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand audio")
    except sr.RequestError as e:
        print(f"Could not request results from Google Speech Recognition service; {e}")
    
    # Convert byte data to NumPy array for MFCC
#     data = np.frombuffer(audio_data, dtype=np.int16)

    print(data.shape)
#     print(data[0])
#     print(data[24000])
#     print(data[48000])

    # MFCC feature extraction
    mfcc = librosa.feature.mfcc(y=data, sr=SAMPLE_RATE, n_mfcc=NUM_MFCC, n_fft=N_FFT, hop_length=HOP_LENGTH)
    features = np.array([mfcc.T], dtype=np.float32)

    # Model Inference
    interpreter.set_tensor(interpreter.get_input_details()[0]['index'], features)
    interpreter.invoke()
    prediction = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])[0]
    result = np.multiply(prediction, RECOG_MASK)
    emotion = EMOTIONS[np.argmax(result)]
    command = COMMANDS[np.argmax(result)]
    arduino.write(command.encode('utf-8'))
    print(result)
    print(f"Emotion Detected: {emotion}\n")
    
    
    
    
async def tflite_run(rounds=1):
    tasks = []
    for _ in range(rounds):
        task = asyncio.create_task(record_and_process_audio())
        tasks.append(task)
        await asyncio.sleep(0.6)  # Wait before starting next recording
    await asyncio.gather(*tasks)
    display(HTML("<hr>"))
    print("Inference completed.")

# Run the asynchronous inference loop
await tflite_run(rounds=1)


Recording...
Recording stopped.
Recognized Text: today I am happy
(24000,)
[3.64670134e-03 4.47887823e+03 3.24245572e+04 1.82217360e+01]
Emotion Detected: surprise



Inference completed.


In [114]:
input_details = interpreter.get_input_details()
expected_shape = input_details[0]['shape']
print("Expected shape:", expected_shape)


Expected shape: [ 1 47 13]


In [131]:
import numpy as np
import librosa
import asyncio
import speech_recognition as sr
import pyaudio
import tensorflow as tf
import time
from IPython.display import HTML

# Constants defined as before

BUFFER_SIZE = 24000
OVERLAPPED = 512

NUM_MFCC = 13
N_FFT = 2048
HOP_LENGTH = 512
SAMPLE_RATE = 16000

EMOTIONS = ['neutral', 'happy', 'surprise', 'unpleasant']
COMMANDS = ['a', 'b', 'c', 'd']

# Manually Calibrate Sensitivity of Recognition
RECOG_MASK = np.array([1, 30000, 40000, 500])



async def tflite_process_data():
    data = np.zeros(BUFFER_SIZE)
    data = arduino_read(data, BUFFER_SIZE, OVERLAPPED, norm=(TUNING_MAX, TUNING_MIN))
#     data = data.astype(np.float32) / np.iinfo(np.int16).max  # Normalize audio data to [-1, 1]

    # Convert the data to a format suitable for speech recognition
#     recognizer = sr.Recognizer()
#     recognized_text = recognize_speech_from_array(data, SAMPLE_RATE)
    
#     with sr.AudioFile(data) as source:  # Assuming data can be treated like an audio file
#         audio = recognizer.record(source)

#     try:
#         # Recognize speech using Google's speech recognition
#         recognized_text = recognizer.recognize_google(audio)
#         print(f"Recognized sentence: {recognized_text}")
#     except sr.UnknownValueError:
#         print("Google Speech Recognition could not understand audio")
#         recognized_text = ""
#     except sr.RequestError as e:
#         print(f"Could not request results from Google Speech Recognition service; {e}")
#         recognized_text = ""

    # Proceed with emotion detection if speech was recognized
#     if recognized_text:
    
    mfcc = librosa.feature.mfcc(y=data, sr=SAMPLE_RATE, n_mfcc=NUM_MFCC, n_fft=N_FFT, hop_length=HOP_LENGTH)
    features = np.array([mfcc.T], dtype=np.float32)

    # Model Inference
    interpreter.set_tensor(interpreter.get_input_details()[0]['index'], features)
    interpreter.invoke()
    prediction = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])[0]
    result = np.multiply(prediction, RECOG_MASK)
    emotion = EMOTIONS[np.argmax(result)]
    command = COMMANDS[np.argmax(result)]
    arduino.write(command.encode('utf-8'))
    print(result)
    print(f'Emotion: {emotion}\n')
    # Send command to Arduino
#     arduino.write(command.encode('utf-8'))
#     else:
#         print("No speech detected, skipping emotion detection.")

# Rest of your code for running and initializing remains unchanged
    
    
async def tflite_run(rounds=2):
    tasks = []
    for _ in range(rounds):
        task = asyncio.create_task(tflite_process_data())
        tasks.append(task)
        await asyncio.sleep(0.6)  # Wait before starting next recording
    await asyncio.gather(*tasks)
    display(HTML("<hr>"))
    print("Inference completed.")

# Run the asynchronous inference loop
await tflite_run(rounds=2)

AttributeError: __enter__

In [129]:
import numpy as np
import io
import soundfile as sf
from speech_recognition import Recognizer, AudioData

def numpy_array_to_audio_file(data, sample_rate=16000):
    """ Convert a NumPy array to an audio file in memory. """
    virtual_file = io.BytesIO()
    sf.write(virtual_file, data, samplerate=sample_rate, format='WAV', subtype='PCM_16')
    virtual_file.seek(0)  # Important: return to the start of the file before reading
    return virtual_file

def recognize_speech_from_array(data, sample_rate=16000):
    """ Recognize speech using the speech_recognition library. """
    recognizer = Recognizer()
    audio_file = numpy_array_to_audio_file(data, sample_rate)
    with AudioData(audio_file, sample_rate, 2) as source:
        # Use recognizer to convert speech to text
        try:
            text = recognizer.recognize_google(source)
            print("Recognized text:", text)
            return text
        except Exception as e:
            print("Error recognizing speech:", str(e))
            return None


In [17]:
import numpy as np
import io
import soundfile as sf
from speech_recognition import Recognizer, AudioData
import serial  # For serial communication with Arduino
import speech_recognition as sr

# Initialize TensorFlow Lite interpreter
interpreter = tf.lite.Interpreter(model_path="SER_quant.tflite")
interpreter.allocate_tensors()

# Initialize the recognizer and microphone
recognizer = sr.Recognizer()

def numpy_array_to_audio_file(data, sample_rate=16000):
    """Convert a NumPy array to an audio file in memory."""
    virtual_file = io.BytesIO()
    sf.write(virtual_file, data, samplerate=sample_rate, format='WAV', subtype='PCM_16')
    virtual_file.seek(0)  # Important: return to the start of the file before reading
    return virtual_file

def recognize_speech_from_array(data, sample_rate=16000):
    """Recognize speech using the speech_recognition library."""
    recognizer = Recognizer()
    audio_file = numpy_array_to_audio_file(data, sample_rate)
    
    source = sr.AudioData(audio_file.read(), sample_rate, 2)
#     with AudioData(audio_file, sample_rate, 2) as source:
    try:
        text = recognizer.recognize_google(source)
        print("Recognized text:", text)
        return text
    except Exception as e:
        print("Error recognizing speech:", str(e))
        return None

# Example setup for reading and processing
# arduino = serial.Serial('COM_PORT', 9600, timeout=1)  # Update 'COM_PORT' with your actual COM port
BUFFER_SIZE = 24000
OVERLAPPED = 512
SAMPLE_RATE = 16000

data = np.zeros(BUFFER_SIZE)
print("Recording started...")
data = arduino_read(data, BUFFER_SIZE, OVERLAPPED, norm=(TUNING_MAX, TUNING_MIN))
if data is not None:
    print("Recording ended.")
    recognized_text = recognize_speech_from_array(data, SAMPLE_RATE)
else:
    print("Recording failed.")


Recording started...
Recording ended.
Error recognizing speech: 


  recognized_text = record_and_process_audio()
