In [6]:
!pip install elevenlabs python-dotenv h5py -q

In [10]:
from dotenv import load_dotenv
from elevenlabs.client import ElevenLabs
from elevenlabs.play import play
import os
import pandas as pd
import IPython
import string
import re
import shutil
import subprocess
import librosa
import numpy as np
import h5py
from collections import Counter

dot = load_dotenv()

In [2]:
# Prepare Medicine Data
raw = pd.read_csv("./medicine.csv")["med_name"]
raw

0       (Bofalogn)Paracetamol Infusion
1                25%+Cevit+B6+B12+20cc
2                        25%+Poly+20cc
3                                3-way
4                50%+Cevit+B6+B12+20cc
                     ...              
1694                        ေခါင္းစႅပ္
1695                          ေဆးထုိးခ
1696                        ေရေႏႅးအိတ္
1697                     ေျခေထာက္ခဲဆႅဲ
1698                          ျကက္ဆူဆီ
Name: med_name, Length: 1699, dtype: object

In [3]:
# Clean Medicine Data
clean_data = []
raw_data = []
punctuation = ''.join([c for c in string.punctuation if c != '-'])
exclude = ('inj', 'tab', 'mg', 'ml', 'syp', 'mgtab', 'g', 'l', 'cc', 'dt', 'iv')
med_one_words = ['C', 'E', 'D', 'A', 'B']
for data in raw:
    word = ""
    for c in data:
        if c in string.ascii_letters + '-' + string.digits:
            word += c
        elif c in punctuation + string.whitespace:
            word += " "

    if word != "":
        words = word.strip().split(' ')

        _words = []
        for word in words:
            if word.lower() not in exclude and (len(word) > 1 or word in med_one_words):
                w_ = re.sub(r'\b\d+[a-zA-Z]+\b', '', word)
                if not w_.isdigit():
                    _words.append(w_)
        
        cleaned = " ".join(_words).strip()
        
        if cleaned != '':
            clean_data.append(cleaned)
            raw_data.append(data)



In [4]:
# Compare with raw data
pd.DataFrame(zip(clean_data, raw_data))

Unnamed: 0,0,1
0,Bofalogn Paracetamol Infusion,(Bofalogn)Paracetamol Infusion
1,Cevit B6 B12,25%+Cevit+B6+B12+20cc
2,Poly,25%+Poly+20cc
3,3-way,3-way
4,Cevit B6 B12,50%+Cevit+B6+B12+20cc
...,...,...
1670,ZyQ,ZyQ
1671,Zyrova,Zyrova 10
1672,Zyrova,Zyrova 5
1673,Zytee RB Solution,Zytee RB Solution


In [39]:
# Final Clean
clean_data = list(set(clean_data))
clean_data.sort()
pd.DataFrame(clean_data)

with open('medicines_p.txt', 'w') as f:
    for medicine in clean_data:
        f.write(medicine + " ")


with open('medicines.txt', 'w') as f:
    for medicine in clean_data:
        f.write(medicine + "\n")

with open("medicines_p.txt", 'r') as f:
    dictionary_raw_list = f.read().split(' ')

with open("medicines_freq.txt", 'w') as f:
    for word, count in Counter(dictionary_raw_list).items():
        if word != '' and not word.isnumeric() and not word.startswith('-'):
            f.write(f"{word} {count}\n")

with open("medicines_dict.txt", 'w') as f:
    for word, count in Counter(dictionary_raw_list).items():
        if word != '' and not word.isnumeric() and not word.startswith('-'):
            f.write(f"{word}\n")

## Generating Voice Audio with ElevenLabs Agents

In [12]:
# Agents
elevenlabs = ElevenLabs(
  api_key=os.getenv("ELEVENLABS_API_KEY"),
)

voice_ids = {
    "Default": "JBFqnCBsd6RMkjVDRZzb",
    "AsianMan1": "K8elrI3roCHJugSjT3np",
    "Matilda": "XrExE9yKIg1WjnnlVkGX",
    "Serena": "pMsXgVXv3BLzUgSXRplE",
    "Daniel": "onwK4e9ZLuTAKqWW03F9",
    "Will": "bIHbv24MWmeRgasZH58o",
    "Laura": "FGY2WhTYpPnrIDTdsKH5",
    "Roger": "CwhRBWXzGAHq8TQ4Fs17",
    "Clara": "Qggl4b0xRMiqOwhPtVWT",
    "Arabella": "Z3R5wn05IrDiVCyEkUrK",
    # "Brittney": "pjcYQlDFKMbcOUp6F5GD",
    # "Ralf Eisent": "A9evEp8yGjv4c3WsIKuY",
    # "Trinity": "2qfp6zPuviqeCOZIE9RZ",
    # "Julia": "tOuLUAIdXShmWH7PEUrU",
    # "Liam": "TX3LPaxmHKxFdv7VOQHJ",
}

current_voice_id = "AsianMan1"
models = ["eleven_multilingual_v2", "eleven_flash_v2"]
current_model = "eleven_flash_v2"

def TTS(text, voice_id):
    audio_stream = elevenlabs.text_to_speech.convert(
        text=text,
        voice_id=voice_id,
        model_id=current_model,
        output_format="mp3_44100_128",
    )
    
    result = bytearray()
    
    for chunk in audio_stream:
        result.extend(chunk)
    
    return bytes(result)


In [13]:
# # Voice Testing

# audio = TTS('Celecoxid CELEC', voice_ids[current_voice_id])

# with open('a.mp3', 'wb') as f:
#     f.write(audio)

# IPython.display.Audio('a.mp3')

In [14]:
# Voice Generation with one voice agent for all dataset

# voices_dir = os.path.join(os.getcwd(), 'voices', 'en')
# agent_dir = os.path.join(voices_dir, current_voice_id)


# for text in clean_data:
    
#     if not os.path.exists(agent_dir):
#         os.makedirs(agent_dir)
    
#     audio_path = os.path.join(agent_dir, f'{text}.mp3')
    
#     if not os.path.exists(audio_path):
#         audio = TTS(text, voice_ids[current_voice_id])
    
#         with open(audio_path, 'wb') as f:
#             f.write(audio)

In [15]:
# Voice Generation for one text for all agent

# text = "Paracetamol"
# voices_dir = os.path.join(os.getcwd(), 'voices', 'en')

# for voice_id in voice_ids.keys():
#     agent_dir = os.path.join(voices_dir, voice_id)

#     if not os.path.exists(agent_dir):
#         os.makedirs(agent_dir)
    
#     audio_path = os.path.join(agent_dir, f'{text}.mp3')
    
#     if not os.path.exists(audio_path):
#         audio = TTS(text, voice_ids[voice_id])
    
#         with open(audio_path, 'wb') as f:
#             f.write(audio)

In [16]:
# # Create ASR dataset for one word

# voices_dir = os.path.join(os.getcwd(), 'voices', 'en')
# dataset_path = os.path.join(voices_dir, 'dataset')
# data_path = os.path.join(dataset_path, 'audio')
# metadata_path = os.path.join(dataset_path, 'metadata.csv')
# meta_data_write_method = 'w'

# text = "Paracetamol"

# if os.path.exists(metadata_path) and meta_data_write_method != 'w':
#     metadata_file = open(metadata_path,  meta_data_write_method)
# else:
#     metadata_file = open(metadata_path, 'w')
#     metadata_file.write('file,text\n')

# if not os.path.exists(dataset_path):
#     os.makedirs(dataset_path)

# if not os.path.exists(data_path):
#     os.makedirs(data_path)

# for voice_id in voice_ids.keys():
#     agent_dir = os.path.join(voices_dir, voice_id)

#     if not os.path.exists(agent_dir):
#         os.makedirs(agent_dir)
    
#     audio_path = os.path.join(agent_dir, f'{text}.mp3')
#     target_path = os.path.join(data_path, f"{text}.mp3")
#     target_path_wav = os.path.join(data_path, f"{text}_{voice_id}.wav")
    
#     if os.path.exists(audio_path) and not os.path.exists(target_path):
#         shutil.copy(audio_path, target_path)
#         try:
#             subprocess.run(["ffmpeg", "-i", target_path, target_path_wav], 
#                            stdout=subprocess.DEVNULL, 
#                            stderr=subprocess.DEVNULL, 
#                            check=True) # convert to wav
#             os.remove(target_path) # remove mp3
#             metadata_file.write(f"{text}_{voice_id}.wav,{text}\n") # csv
#         except subprocess.CalledProcessError as e:
#             print("Error:", e.stderr)


# metadata_file.close()
# pd.read_csv(metadata_path)

In [17]:
# # Create ASR dataset

# voices_dir = os.path.join(os.getcwd(), 'voices', 'en')
# dataset_path = os.path.join(voices_dir, 'dataset')
# data_path = os.path.join(dataset_path, 'audio')
# metadata_path = os.path.join(dataset_path, 'metadata.csv')
# meta_data_write_method = 'w'


# start = 0
# end = 1483


# metadata_file = open(metadata_path, 'w')
# metadata_file.write('file,text\n')

# for i in range(start, end+1):

#     text = clean_data[i]
   
#     if not os.path.exists(dataset_path):
#         os.makedirs(dataset_path)
   
#     if not os.path.exists(data_path):
#         os.makedirs(data_path)
   
#     for voice_id in voice_ids.keys():
#         agent_dir = os.path.join(voices_dir, voice_id)
   
#         if not os.path.exists(agent_dir):
#             os.makedirs(agent_dir)
       
#         audio_path = os.path.join(agent_dir, f'{text}.mp3')
#         target_path = os.path.join(data_path, f"{text}.mp3")
#         target_path_wav = os.path.join(data_path, f"{text}_{voice_id}.wav")

#         if not os.path.exists(audio_path):
#             generate_voice_for_all_agent(text)


#         if os.path.exists(target_path_wav):
#             metadata_file.write(f"{text}_{voice_id}.wav,{text}\n") # csv
       
#         elif os.path.exists(audio_path) and not os.path.exists(target_path_wav):
#             shutil.copy(audio_path, target_path)
#             try:
#                 subprocess.run(["ffmpeg", "-i", target_path, target_path_wav],
#                                stdout=subprocess.DEVNULL,
#                                stderr=subprocess.DEVNULL,
#                                check=True) # convert to wav
#                 os.remove(target_path) # remove mp3
#                 metadata_file.write(f"{text}_{voice_id}.wav,{text}\n") # csv
#             except subprocess.CalledProcessError as e:
#                 print("Error:", e.stderr)

# metadata_file.close()
# pd.read_csv(metadata_path)

## Saving Sample Dataset

In [20]:
# Format Dataset 

dataset_path = os.path.join(os.getcwd(), "voices", "en", "dataset")
audio_path = os.path.join(dataset_path, "data")
metadata_path = os.path.join(dataset_path, "metadata.csv")
metadata = pd.read_csv(metadata_path)

dataset = []
whisper_sampling_rate = 16000 # 16kHz
total_duration = np.float32(0)
agent_involvement = {}

for filename, text in zip(metadata.file, metadata.text):
    audio_file_path = os.path.join(audio_path, filename)
    if os.path.exists(audio_file_path):
        samples, sample_rate = librosa.load(audio_file_path, sr=whisper_sampling_rate)
        total_duration += librosa.get_duration(path=audio_file_path)
        audio = { 'audio': { 'filename': filename, 'array': samples, 'sampling_rate': sample_rate },
                  'sentence': text
                }
        dataset.append(audio)
        
        agent = filename.split('_')[-1]
        if agent in agent_involvement.keys():
            agent_involvement[agent] += 1
        else:
            agent_involvement[agent] = 1
    else:
        print(audio_file_path, 'is not exits')


pd.DataFrame(dataset[0])
# print(count)

Unnamed: 0,audio,sentence
filename,3-way_Default.wav,3-way
array,"[-2.5507216e-10, -1.1945796e-10, -1.7387639e-1...",3-way
sampling_rate,16000,3-way


In [21]:
print("Total Dataset: ", len(dataset))
print("Total Duration (second)s: ", total_duration)
print("No. of Agents: ", len(agent_involvement.keys()))
print("No. of VoiceLines: ", len(dataset)//len(agent_involvement.keys()))
pd.DataFrame(agent_involvement.items())

Total Dataset:  14669
Total Duration (second)s:  15853.571
No. of Agents:  10
No. of VoiceLines:  1466


Unnamed: 0,0,1
0,Default.wav,1467
1,Matilda.wav,1467
2,Serena.wav,1467
3,Daniel.wav,1467
4,Will.wav,1467
5,Laura.wav,1467
6,Roger.wav,1467
7,Clara.wav,1467
8,Liam.wav,1467
9,AsianMan1.wav,1466


In [22]:
# Format Dataset for HDF5

# Find max length of audio sample
max_len = max(len(d['audio']['array']) for d in dataset)

# Pad all audio arrays to same length
def pad(x, max_len):
    return np.pad(x, (0, max_len - len(x)), mode='constant')

padded_samples = np.array([pad(d['audio']['array'], max_len) for d in dataset])
filenames = np.array([d['audio']['filename'] for d in dataset], dtype="S")  # bytes
sampling_rates = np.array([d['audio']['sampling_rate'] for d in dataset])
sentences = np.array([d['sentence'] for d in dataset], dtype="S")

In [24]:
# Save as HDF5
h5filename = "medicines_whisper_en_dataset_v2.h5"

with h5py.File(h5filename, "w") as f:
    f.create_dataset("audio/samples", data=padded_samples)
    f.create_dataset("audio/sampling_rate", data=sampling_rates)
    f.create_dataset("audio/total_duration", data=total_duration, dtype='float32')
    f.create_dataset("sentence", data=sentences)
