### A simulation of a server environment in terms of loading models and single requests


In [1]:
# Let's initialze our libraries first
import os, subprocess
import numpy as np
import datetime

import ffmpeg

import torch, torchaudio, spacy
from sklearn.cluster import AgglomerativeClustering



In [None]:
# Then the models
# ----- Denoising
from denoiser import pretrained
from denoiser.dsp import convert_audio
# ------ Voice transcription
from faster_whisper import WhisperModel
# ------ Speaker diarization
from speechbrain.inference.speaker import EncoderClassifier
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# ------ Grammar correction
from happytransformer import HappyTextToText, TTSettings

In [None]:
# Then load some of the models

# ----- Denoising model
denoising_model = pretrained.dns64()

# ----- Voice transcription model
transcription_model = WhisperModel("tiny")

# ----- Dirization model
speech_dairization_model_path = "pretrained_models/spkrec-xvect-voxceleb"
speech_dairization_classifier = EncoderClassifier.from_hparams(
    source=speech_dairization_model_path,
    run_opts={"device": "cpu"}     # or "cuda" if you have a GPU
)

# ----- Text classification model
text_classification_model_path = "self_trained_models/nlpie-distil-clinicalbert-0.66"
text_classification_model = AutoModelForSequenceClassification.from_pretrained(text_classification_model_path)
text_classification_tokenizer = AutoTokenizer.from_pretrained(text_classification_model_path)

# ----- Entity extraction model
entity_extraction_model_path = "pretrained_models/en_core_med7_lg"
entity_extraction_model = spacy.load("entity_extraction_model_path")

# ----- Grammar correction model
grammar_correction_model_name = "vennify/t5-base-grammar-correction"
grammar_correction_model = HappyTextToText("T5",  grammar_correction_model_name)

Step 1 - Audio input
    Let's assume the frontend already chunks the voice input and sends over bits of audio files

In [4]:
audio_file_path = "C:/Users/HP/Desktop/assignments/Final _Year/test_audio/sample_interview(1).mp3"
request_number = 1

Step 2 - Audio preprocessing

In [None]:
# Audio format modification
def verify_format(audio_file_path):
    if not os.path.exists(audio_file_path):
        print(f"Error: File not found at {audio_file_path}")
        return None

    if audio_file_path.lower().endswith(".wav"):
        print(f"File is already a WAV: {audio_file_path}")
        return audio_file_path

    # Construct the output WAV file path
    wav_file = os.path.splitext(audio_file_path)[0] + ".wav"

    # Check if the output directory exists
    output_dir = os.path.dirname(wav_file)
    if output_dir and not os.path.exists(output_dir):
        try:
            os.makedirs(output_dir)
        except OSError as e:
            print(f"Error creating output directory: {e}")
            return None

    try:
        # Use ffmpeg-python for the conversion
        ffmpeg.input(audio_file_path).output(wav_file).run(quiet=True)
        print(f"Successfully converted {audio_file_path} to {wav_file}")
        return wav_file
    
    except ffmpeg.Error as e:
        print(f"Error converting {audio_file_path} to WAV:")
        print(e.stderr.decode())  # Print the ffmpeg error message
        return None
    
    except FileNotFoundError:
        print("Error: ffmpeg not found. Please ensure ffmpeg is installed and in your system's PATH.")
        return None
    
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None

audio_file_path = verify_format( audio_file_path)

Successfully converted C:/Users/HP/Desktop/assignments/Final _Year/test_audio/sample_interview(1).mp3 to C:/Users/HP/Desktop/assignments/Final _Year/test_audio/sample_interview(1).wav


In [None]:
# Denoising audio file
print(f"Denoising audio - {audio_file_path} - request : {request_number}")
wav, sr = torchaudio.load(audio_file_path)
wav = convert_audio(wav, sr, denoising_model.sample_rate, denoising_model.chin)
with torch.no_grad():
    denoised = denoising_model(wav[None])[0]
    torchaudio.save(f'test_audio/denoised_no_{request_number}_{audio_file_path[:-3]}.wav', denoised, denoising_model.sample_rate)

audio_file_path = f'test_audio/denoised_no_{request_number}_{audio_file_path[:-3]}.wav'

Denoising audio - C:/Users/HP/Desktop/assignments/Final _Year/test_audio/sample_interview(1).wav - request : 1


In [None]:
# Step 3 - Audio Transcription
segments, _ = transcription_model.transcribe(audio_file_path)
transcript = ""
time_segments = []
for segment in segments:
    print(segment.text)
    transcript += segment.text[1:] + "\n"
    time_segments.append({"start": segment.start, "end": segment.end, "text": segment.text[1:]})

print(transcript)

 Man, what's that with me?
 Yes.
 Hi.
 Here's, I'll be your doctor today.
 I'm going to just wash my hands really quick.
 Would you perform this stone here?
 Oh, you can.
 Pat's fine.
 Great.
 Well, it's nice to meet you.
 Can you tell me why you're here today?
 I have a terrible headache.
 It looks really bad.
 Is there anything else besides your headache that you want to address here
 today at the clinical point?
 No.
 It's just that.
 Except I am concerned.
 I just recently changed insurance companies and I'm not sure this is going
 to be covered yet.
 What we can do is, well, we're talking and I'm doing your history
 and physical.
 I will have my office secretary look and do the insurance
 plane that you're gas.
 So you don't have to worry about that.
 That sounds good.
 Okay.
 Sounds great.
 Anything else?
 No.
 I just, this is just really bad.
 Okay.
 So what I'd like to do today, and let's take a look at what's causing
 your headache.
 All right.
 I will go over history, physica

In [None]:
# Step 3.5 - Speech dirization
# ─── 1) Define your ASR segments ────────────────────────────────────────────────
# time_segments = time_segments


# ─── 2) Load the full waveform once ────────────────────────────────────────────
audio_path = audio_file_path
waveform, sample_rate = torchaudio.load(audio_path)  # waveform: [channels, samples] ... We're also reloading after denoising

# ─── 3) Extract embeddings for each segment ────────────────────────────────────
# Note: The classifier expects a 1D tensor of audio samples, so we need to slice out the segments
MIN_SEGMENT_SEC = 0.5

embeddings = []
for seg in time_segments:
    # 1) Slice out the raw audio samples
    s = int(seg["start"] * sample_rate)
    e = int(seg["end"]   * sample_rate)
    clip = waveform[:, s:e]            # [channels, length]

    # 2) Convert to mono
    clip = clip.mean(dim=0)            # [length]

    # 3) Zero‑pad to a minimum length
    min_len = int(MIN_SEGMENT_SEC * sample_rate)
    if clip.shape[0] < min_len:
        padding = min_len - clip.shape[0]
        clip = torch.cat([clip, torch.zeros(padding)], dim=0)

    # 4) Now clip is a 1D tensor [time]. encode_batch will accept that.
    with torch.no_grad():
        emb = speech_dairization_classifier.encode_batch(clip)  # → [1, emb_dim]
    embeddings.append(emb[0].cpu().numpy())

embeddings = np.vstack(embeddings)

# ─── 4) Cluster embeddings into speakers ───────────────────────────────────────
n_speakers = 2  # adjust as needed
clustering = AgglomerativeClustering(n_clusters=n_speakers).fit(embeddings)
labels = clustering.labels_                               # array of length n_segments

# Assign speaker labels back into each segment
for seg, label in zip(segments, labels):
    seg["speaker"] = f"SPEAKER {label+1}"

# ─── 5) Merge consecutive segments with the same speaker ───────────────────────
merged = []
for seg in segments:
    if not merged or merged[-1]["speaker"] != seg["speaker"]:
        merged.append({
            "start":   seg["start"],
            "end":     seg["end"],
            "speaker": seg["speaker"],
            "text":    seg["text"].strip()
        })
    else:
        # extend the end and append text
        merged[-1]["end"] = seg["end"]
        merged[-1]["text"] += " " + seg["text"].strip()

# ─── 6) Write out a diarized transcript ────────────────────────────────────────
with open("output_transcripts/diarized_transcript.txt", "w", encoding="utf-8") as f:
    for seg in merged:
        ts = str(datetime.timedelta(seconds=int(seg["start"])))
        print(f"{seg['speaker']} [{ts}]: {seg['text']}\n")
        f.write(f"{seg['speaker']} [{ts}]: {seg['text']}\n")

print("Diarized transcript saved to diarized_transcript.txt")

In [29]:
# Step 4 - Text Classification

def classify_sentence(sentence):
    inputs = text_classification_tokenizer(sentence, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
    outputs = text_classification_model(**inputs)
    logits = outputs.logits
    probabilities = torch.softmax(logits, dim=1).detach().cpu().numpy()[0]
    predicted_class = int(probabilities.argmax())
    return predicted_class, probabilities


for sentence in transcript:
    # Test on a new sentence
    pred_class, probs = classify_sentence(sentence)

    # Print the results
    labels = ["SmallTalk", "Demographics", "Treatment_Plan", "Past_Medical_History", "Active_Symptoms", "Test_Results", "Allergies", "Current_Conditions", "Clinical_Diagnosis", "Family_History"]
    print(f"Predicted class: {pred_class}, {labels[pred_class]}")
    print("Probabilities:", probs)

Predicted class: 0, SmallTalk
Probabilities: [0.13090172 0.09876203 0.10390221 0.09583224 0.10224502 0.11178298
 0.10571308 0.08255803 0.07861784 0.08968489]
Predicted class: 1, Demographics
Probabilities: [0.10985983 0.11362945 0.1015994  0.09502721 0.0999129  0.09536203
 0.08948737 0.10450061 0.09410802 0.09651329]
Predicted class: 5, Test_Results
Probabilities: [0.11080374 0.09703148 0.10954341 0.09616452 0.09705445 0.11478363
 0.09523308 0.0994366  0.08761609 0.09233303]
Predicted class: 2, Treatment_Plan
Probabilities: [0.10566767 0.10061268 0.10676946 0.10046227 0.10353696 0.09325973
 0.09458303 0.10341836 0.09207452 0.09961524]
Predicted class: 7, Current_Conditions
Probabilities: [0.10457605 0.10233615 0.09776613 0.09844018 0.10262644 0.10361329
 0.08058926 0.11140728 0.10380062 0.09484458]
Predicted class: 0, SmallTalk
Probabilities: [0.11987253 0.10713323 0.09753264 0.10259789 0.09405783 0.09491007
 0.09925766 0.11460137 0.08514669 0.08489002]
Predicted class: 5, Test_Results

In [None]:
# step 5 - Context Grouping and input correction

args = TTSettings(num_beams=5, min_length=1)

for sentence in transcript:
    # Add the prefix "grammar: " before each input 
    result = grammar_correction_model.generate_text(f"grammar: {sentence}", args=args)
    print(result.text)

In [None]:
# step 6 - Entity extraction

for i, text in enumerate(transcript, 1):
    print(f"\nExample {i}: {text}")
    doc = entity_extraction_model(text)
    for ent in doc.ents:
        print(f"  {ent.text}: {ent.label_}")