# Main Prototype
This notebook contains our main prototype. Here we aggregate the data from the different modules through our custom Home Assistant instance and process it with our LLM.

In [1]:
# Llama
import ollama
from time import sleep
import os
from pathlib import Path
# Home Assistant API
from homeassistant_api import Client
from datetime import datetime, timedelta

# Matplotlib
import matplotlib.pyplot as plt
from IPython.display import Audio
import speech_recognition as sr
import whisper

from pydub import AudioSegment
from pydub.playback import play
import simpleaudio as sa
from multiprocessing import Process
from threading import Thread
import multiprocess as mp
import emoji

## Text To Speech Model

In [2]:
import torch
from TTS.api import TTS

# Pick the compute device: use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the English Tacotron2-DDC text-to-speech model (🐸TTS) and move it to the selected device.
tts = TTS("tts_models/en/ljspeech/tacotron2-DDC").to(device)
# Alternative model kept for reference (multilingual XTTS v2):
#tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)

 > tts_models/en/ljspeech/tacotron2-DDC is already downloaded.
 > vocoder_models/en/ljspeech/hifigan_v2 is already downloaded.
 > Using model: Tacotron2
 > Setting up Audio Processor...
 | > sample_rate:22050
 | > resample:False
 | > num_mels:80
 | > log_func:np.log
 | > min_level_db:-100
 | > frame_shift_ms:None
 | > frame_length_ms:None
 | > ref_level_db:20
 | > fft_size:1024
 | > power:1.5
 | > preemphasis:0.0
 | > griffin_lim_iters:60
 | > signal_norm:False
 | > symmetric_norm:True
 | > mel_fmin:0
 | > mel_fmax:8000.0
 | > pitch_fmin:1.0
 | > pitch_fmax:640.0
 | > spec_gain:1.0
 | > stft_pad_mode:reflect
 | > max_norm:4.0
 | > clip_norm:True
 | > do_trim_silence:True
 | > trim_db:60
 | > do_sound_norm:False
 | > do_amp_to_db_linear:True
 | > do_amp_to_db_mel:True
 | > do_rms_norm:False
 | > db_level:None
 | > stats_path:None
 | > base:2.718281828459045
 | > hop_length:256
 | > win_length:1024
 > Model's reduction rate `r` is set to: 1
 > Vocoder Model: hifigan
 > Setting up Audio P

In [3]:
# Load Whisper's "base.en" speech-to-text model; it is used further down to transcribe the recorded audio.
stt = whisper.load_model("base.en")

## Data from Home Assistant

In [4]:
ha_ip_addr = 'localhost'

# Home Assistant entity ids describing the current user.
user_entities_idx = {"name":"sensor.user_recognition",
              "age": "sensor.age_sensor",
              "emotion": "sensor.emotion_sensor"}

# Home Assistant entity ids of the environmental sensors.
sensor_entities_idx = {"co2":"sensor.psoc6_micropython_sensornode_working_space_co2_ppm",
                       "temp": "sensor.psoc6_micropython_sensornode_working_space_temperature"}

# Filled below: short key -> list of state strings, in the order returned by Home Assistant.
user_data = {}
sensor_data = {}

# How far back (in minutes) the entity histories are requested.
history_minutes = 5

# Placeholder states Home Assistant reports when an entity has no usable value.
_IGNORED_STATES = {'unknown', 'unavailable'}

# SECURITY: the long-lived access token must not live in the notebook. It is read
# from the environment instead; any token that was previously committed should be
# revoked in Home Assistant and replaced.
ha_token = os.environ.get('HOMEASSISTANT_TOKEN')
if not ha_token:
    raise RuntimeError(
        "Set the HOMEASSISTANT_TOKEN environment variable to a Home Assistant "
        "long-lived access token before running this cell."
    )


def _fetch_recent_states(client, entity_id, minutes):
    """Return the usable states of one entity over the last ``minutes`` minutes.

    Args:
        client: An open ``homeassistant_api.Client``.
        entity_id: Full Home Assistant entity id, e.g. ``"sensor.age_sensor"``.
        minutes: Size of the history window, counted back from now.

    Returns:
        A list of state strings in the order Home Assistant returned them, with
        placeholder states (``unknown`` / ``unavailable``) removed. The list is
        empty when no history was returned, so every requested key always exists.
    """
    entity = client.get_entity(entity_id=entity_id)
    start = datetime.now() - timedelta(minutes=minutes)

    states = []
    # Only one entity is requested, so at most one history entry is expected;
    # if several are returned the last one wins, as before.
    for entry in client.get_entity_histories(entities=[entity], start_timestamp=start):
        states = [s.state for s in entry.states if s.state.lower() not in _IGNORED_STATES]
    return states


with Client(f'http://{ha_ip_addr}:8123/api', ha_token) as client:
    for entity_key, entity_id in user_entities_idx.items():
        user_data[entity_key] = _fetch_recent_states(client, entity_id, history_minutes)

    for entity_key, entity_id in sensor_entities_idx.items():
        sensor_data[entity_key] = _fetch_recent_states(client, entity_id, history_minutes)

In [5]:
# Take the last entry of each user history list as the current value.
name, emotion, age = (user_data[field][-1] for field in ('name', 'emotion', 'age'))

In [6]:
# Display the values picked for the current user.
name, emotion, age

('Marco', 'sad', 'young man')

In [7]:
# Display the raw sensor histories collected from Home Assistant (values are state strings).
sensor_data

{'co2': ['473', '470', '469', '467', '468', '467', '468', '469'],
 'temp': ['23.70',
  '23.71',
  '23.70',
  '23.66',
  '23.60',
  '23.58',
  '23.53',
  '23.48',
  '23.46',
  '23.43',
  '23.41',
  '23.40',
  '23.37',
  '23.33',
  '23.29',
  '23.27',
  '23.23',
  '23.21',
  '23.17',
  '23.15',
  '23.14',
  '23.11',
  '23.08',
  '23.09',
  '23.05',
  '23.07',
  '23.08',
  '23.09',
  '23.14',
  '23.13',
  '23.15',
  '23.19',
  '23.20',
  '23.23',
  '23.25',
  '23.27',
  '23.28',
  '23.31',
  '23.32',
  '23.31',
  '23.30',
  '23.28',
  '23.30',
  '23.33',
  '23.36',
  '23.37',
  '23.40',
  '23.38',
  '23.36',
  '23.35']}

## Main LLM function

In [8]:
# User-context preamble: tells the model the user's name, age and emotion.
# llm() prepends it to the request when no sensor data is required.
# NOTE: this is an f-string, so name/age/emotion are captured when this cell runs;
# re-run the cell after refreshing the Home Assistant data.
# NOTE: with backslash line continuations the leading indentation of every
# continued line becomes part of the string.
preprompt = f" \
    Preprompt: \
    This preprompt is just to give you context for a user, you focus on answering the prompt\n\
    You work for a user.\
    Make sure you say hello properly at the beggining of your reply, the name of the user you are serving is {name}.\
    Also make sure you address him/her properly as you talk to them.\
    Take this information about the user into account:\
    - User's age: {age}\n\
    - Users's emotion: {emotion}\n\
    Prompt:\
"

# Sensor-context preamble: injects the last temperature and CO2 entries.
# llm() prepends it to the request when the routing step answers "Positive".
sensoring_preprompt = f"\
    Preprompt: \n\
    Use this information to answer the users request:\n\
    The temperature is {sensor_data['temp'][-1]} degrees\n\
    CO2 levels are {sensor_data['co2'][-1]}ppm. Lower the better for air quality.\n\
    Prompt:\
"

In [9]:
def llm(prompt):
    """Answer ``prompt`` with llama2, adding sensor or user context as needed.

    The model is called twice:

    1. A routing call asks whether the request needs external sensor data
       (answer contains "Positive") or not.
    2. The actual request is sent, prefixed with ``sensoring_preprompt`` when
       sensor data is needed, otherwise with the user-context ``preprompt``.

    Args:
        prompt: The user's request as plain text.

    Returns:
        The text of the model's reply to the second call.
    """
    # The user's request must be appended to the routing instructions; otherwise
    # the model classifies an empty prompt and the routing decision is arbitrary.
    routing_prompt = "\
                    Analyse the following prompt, just answer Positive if you require external sensor data \n\
                    You can have access to CO2 (in ppm) or Temperature (in C degrees) and Negative if you\n\
                    dont need it \n\
                    - How cold is here -> Positive \n\
                    - How is the air quality here -> Positive\n\
                    - Tell me something I don't know -> Negative\n\
                    - How do you reverse a binary tree -> Negative\n\
                    - Tell me a joke -> Negative\n\
                    Prompt:\n\
                    " + prompt
    response = ollama.generate(model='llama2', prompt=routing_prompt)

    if "Positive" in response['response']:
        full_content = sensoring_preprompt + prompt
        # Show the sensor-augmented prompt that is sent to the model.
        print(full_content)
    else:
        full_content = preprompt + prompt

    response = ollama.generate(model='llama2', prompt=full_content)
    return response['response']

## Speech to Text

In [11]:
r = sr.Recognizer()

with sr.Microphone(device_index=2) as source:
    # Synthesize the spoken question and play it back in the notebook.
    tts.tts_to_file(text=f"What do you need, {name}?", speaker_wav="file.mp3", file_path="tts/need.wav")
    # XTTS variant kept for reference:
    #tts.tts_to_file(text="What do you need?", language='en', speaker_wav="file.mp3", file_path=f"tts/need.wav")
    display(Audio("tts/need.wav", autoplay=True))

    # Pause before listening (presumably so the question can finish playing).
    sleep(3)
    print("Say something!")
    r.adjust_for_ambient_noise(source)
    audio = r.listen(source, timeout=2)

# Persist the captured audio so it can be transcribed from disk.
with open("recorded_audio.wav", "wb") as wav_out:
    wav_out.write(audio.get_wav_data())
    print("Audio saved as recorded_audio.wav")

 > Text splitted to sentences.
['What do you need, Marco?']
 > Processing time: 0.871654748916626
 > Real-time factor: 0.39295034375228166


Say something!
Audio saved as recorded_audio.wav


In [12]:
# Transcribe the recording with Whisper and use the recognized text as the LLM prompt.
result = stt.transcribe("recorded_audio.wav")
prompt = result["text"]
# Display the transcription.
prompt



' Please, how are you? Tell me a little yoke, please.'

## LLM Inference

In [13]:
import re

# We do a bit of cleaning before the reader takes the model response.
def clean_markdown(text):
    """Strip Markdown markup and emoji so the text can be read aloud.

    Steps, in order:

    1. List markers at the start of a line (``*``, ``-`` or ``1.`` followed by
       whitespace) become `` - ``. This must happen first: it needs the line
       structure, and a leading ``*`` would otherwise be consumed by the
       italic pattern together with the next bullet.
    2. Bold/italic markers (``**x**``, ``*x*``, ``__x__``, ``_x_``) are removed.
    3. All whitespace runs, including newlines, collapse to a single space.
    4. Emoji are removed and the result is stripped.

    Args:
        text: Raw model output, possibly containing Markdown and emoji.

    Returns:
        The cleaned, single-line text.
    """
    # Rewrite list markers while line boundaries still exist. A marker must be
    # followed by whitespace so "**bold**", "-5" or "3.5" at the start of a
    # line are left alone.
    text = re.sub(r'(?m)^[ \t]*[*-][ \t]+', ' - ', text)
    text = re.sub(r'(?m)^[ \t]*\d+\.[ \t]+', ' - ', text)

    # Remove bold and italic markdown
    text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)
    text = re.sub(r'\*([^*]+)\*', r'\1', text)
    text = re.sub(r'__([^_]+)__', r'\1', text)
    text = re.sub(r'_([^_]+)_', r'\1', text)

    # Collapse newlines and repeated spaces into single spaces
    text = re.sub(r'\s+', ' ', text)
    text = emoji.replace_emoji(text, replace='')

    return text.strip()

In [14]:
# Ask the LLM to answer the transcribed prompt, then strip markdown and emoji from the reply.
text = clean_markdown(llm(prompt))

## Text to Speech

In [15]:
def generate_tts():
    """Synthesize one WAV file per '.'-separated phrase of the global ``text``.

    Phrase ``i`` ends up in ``tts/output_{i}.wav``. Each file is first written
    under a temporary name and only then moved to its final name, so a consumer
    that polls for ``tts/output_{i}.wav`` never opens a half-written file.

    NOTE(review): a reply ending in '.' produces an empty last phrase; confirm
    how ``tts.tts_to_file`` handles empty text.
    """
    for i, phrase in enumerate(text.split('.')):
        final_path = f"tts/output_{i}.wav"
        partial_path = f"tts/partial_{i}.wav"
        tts.tts_to_file(text=phrase, speaker_wav="file.mp3", file_path=partial_path)
        #tts.tts_to_file(text=phrase, language='en', speaker_wav="file.mp3", file_path=f"tts/output_{i}.wav")
        # Publish the finished file in a single rename step.
        os.replace(partial_path, final_path)

In [16]:
# Run synthesis in a background thread; the playback cell below polls for the generated files.
Thread(target=generate_tts).start()

 > Text splitted to sentences.
["Hello Marco! adjusts glasses It's a pleasure to be speaking with you today"]


In [None]:
def play_audio(file_path):
    """Play an audio file to completion, then delete it.

    The file is deleted after successful playback and also when playback is
    interrupted with Ctrl-C / kernel interrupt. The path that is deleted is
    always ``file_path`` itself, so the function does not depend on any
    loop variable of the caller.

    Args:
        file_path: Path of the audio file to play and remove.

    Raises:
        KeyboardInterrupt: Re-raised after cleanup when playback is interrupted.
    """
    clip_path = Path(file_path)
    try:
        audio = AudioSegment.from_file(file_path)
        play_obj = sa.play_buffer(
            audio.raw_data,
            num_channels=audio.channels,
            bytes_per_sample=audio.sample_width,
            sample_rate=audio.frame_rate,
        )
        # Block until the clip has finished playing.
        play_obj.wait_done()
    except KeyboardInterrupt:
        clip_path.unlink()
        # Bare raise keeps the original traceback.
        raise
    clip_path.unlink()


# Play the phrases in order, waiting for each file to appear before playing it.
for i in range(len(text.split('.'))):
    wav_path = f"tts/output_{i}.wav"
    while not os.path.exists(wav_path):
        sleep(1)
    play_audio(wav_path)

 > Processing time: 1.92641282081604
 > Real-time factor: 0.3537425274732985
 > Text splitted to sentences.
["I hope you're doing well despite feeling a bit down"]
 > Processing time: 1.4605727195739746
 > Real-time factor: 0.36885684060159135
 > Text splitted to sentences.
["nodding sympathetically Would you like to talk about what's on your mind?", "Sometimes it helps to share feelings with someone and get things off one's chest"]
 > Processing time: 2.8054957389831543
 > Real-time factor: 0.2789958013628344
 > Text splitted to sentences.
['offering a supportive smile']
 > Processing time: 0.6169333457946777
 > Real-time factor: 0.2541737719501615
