## `libraries, files and models`

In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
import os
import numpy as np
import re
from datetime import datetime
import librosa
import soundfile as sf
import noisereduce as nr
from scipy.signal import butter, lfilter
from pyannote.audio import Pipeline
import torchaudio
import torch
from speechbrain.inference import SpeakerRecognition
from speechbrain.utils.fetching import LocalStrategy
from transformers import pipeline as hf_pipeline
from tkinter import Tk
from tkinter.filedialog import askopenfilename
from huggingface_hub import InferenceClient 

In [3]:
BASE_DIR = os.path.dirname(os.path.abspath(__file__)) if "__file__" in globals() else os.getcwd()
DB_PATH = os.path.join(BASE_DIR, "speaker_database.npy")
ENROLL_DIR = os.path.join(BASE_DIR, "processed_audio_enrollment")
TEST_DIR   = os.path.join(BASE_DIR, "processed_audio_test")
PDF_DIR = os.path.join(BASE_DIR, "docs")
os.makedirs(PDF_DIR, exist_ok=True)

In [4]:
BASE_DIR

'c:\\Users\\hp\\Downloads\\RAG\\Meeting_to_pdf'

In [5]:
HF_TOKEN = "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"

DIARIZATION_MODEL = "pyannote/speaker-diarization-3.1"
ASR_MODEL = "openai/whisper-small"
SPEAKER_DB_PATH = DB_PATH

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
TARGET_SR = 16000

In [6]:
client = InferenceClient(
    model="mistralai/Mistral-7B-Instruct-v0.2",
    token= HF_TOKEN
)

In [7]:
# =========================
# LOAD MODELS
# =========================
print("Loading diarization pipeline...")
diarization_pipeline = Pipeline.from_pretrained(
    DIARIZATION_MODEL,
    token=HF_TOKEN,
    revision="main"
).to(torch.device(DEVICE))

print("Loading ASR model...")
asr_pipeline = hf_pipeline(
    task="automatic-speech-recognition",
    model=ASR_MODEL,
    ignore_warning=True,
    device=0 if DEVICE == "cuda" else -1,
    generate_kwargs={
        "task": "transcribe",
        "language": "en"
    }
)

print("Loading speaker recognition model...")
speaker_verification = SpeakerRecognition.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="pretrained_models/spkrec",
    local_strategy=LocalStrategy.COPY_SKIP_CACHE
)

print("\n ====Models are loaded successfully====")

Loading diarization pipeline...
Loading ASR model...


Device set to use cpu


Loading speaker recognition model...

 ====Models are loaded successfully====


In [8]:
# Create Speaker Database

if os.path.exists(DB_PATH):
    speaker_db = np.load(DB_PATH, allow_pickle=True).item()
else:
    speaker_db = {}

## `Preprocessing`

In [9]:
def bandpass_filter(audio, sr, low=80, high=7500):
    b, a = butter(4, [low/(sr/2), high/(sr/2)], btype="band")
    return lfilter(b, a, audio)


In [10]:
# Remove Silence Parts

# def apply_vad(audio, sr, pad_ms=150):
#     model, utils = torch.hub.load(
#         "snakers4/silero-vad",
#         "silero_vad",
#         force_reload=False
#     )
#     get_speech_timestamps = utils[0]

#     speech_timestamps = get_speech_timestamps(audio, model, sampling_rate=sr)

#     pad = int(sr * pad_ms / 1000)
#     segments = []

#     for seg in speech_timestamps:
#         start = max(0, seg["start"] - pad)
#         end = min(len(audio), seg["end"] + pad)
#         segments.append(audio[start:end])

#     return np.concatenate(segments) if segments else audio

def apply_vad(audio, sr):
    model, utils = torch.hub.load(
        repo_or_dir="snakers4/silero-vad",
        model="silero_vad",
        force_reload=False
    )
    get_speech_timestamps = utils[0]

    speech_timestamps = get_speech_timestamps(audio, model, sampling_rate=sr)

    speech_audio = []
    for seg in speech_timestamps:
        speech_audio.append(audio[seg["start"]:seg["end"]])

    return np.concatenate(speech_audio)

In [11]:
def preprocess_audio(
    input_wav,
    output_dir,
    suffix="_preprocessed",
    target_sr=16000
):
    os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(input_wav))[0]
    output_wav = os.path.join(
        output_dir,
        f"{base_name}{suffix}.wav"
    )

    audio, sr = librosa.load(input_wav, sr=target_sr, mono=True)

    audio = librosa.util.normalize(audio)
    audio = nr.reduce_noise(y=audio, sr=sr, prop_decrease=0.8)
    audio = bandpass_filter(audio, sr)
    audio = apply_vad(audio, sr)
    audio = librosa.util.normalize(audio)

    sf.write(output_wav, audio, sr)
    return output_wav


## `Upload files`

In [12]:
# Upload WAV Files

def select_audio_file():
    root = Tk()
    root.update()

    file_path = askopenfilename(
        title="Select an audio file",
        filetypes=[("WAV files", "*.wav")]
    )

    root.destroy()
    return file_path

## `Adding Speakers to the DB`

In [13]:
# Extract the Embeddings

def extract_embedding(wav_path):
    waveform, sr = torchaudio.load(wav_path)

    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    if sr != 16000:
        waveform = torchaudio.functional.resample(waveform, sr, 16000)

    with torch.no_grad():
        emb = speaker_verification.encode_batch(waveform.to(DEVICE))

    return emb.squeeze().cpu().numpy()

In [14]:
def add_speaker(audio_path):
    global speaker_db

    speaker_name = os.path.splitext(os.path.basename(audio_path))[0]

    processed_path = preprocess_audio(
        input_wav=audio_path,
        output_dir=ENROLL_DIR,
        suffix="_enroll"
    )

    embedding = extract_embedding(processed_path)

    speaker_db[speaker_name] = embedding
    np.save(DB_PATH, speaker_db)

    print(f"[✓] Speaker '{speaker_name}' added")


In [15]:
def add_speaker_from_dialog():
    audio_path = select_audio_file()

    if not audio_path:
        print("[!] No file selected")
        return

    add_speaker(audio_path)

In [18]:
add_speaker_from_dialog()

Using cache found in C:\Users\hp/.cache\torch\hub\snakers4_silero-vad_master


[✓] Speaker 'Zira' added


## `Handling the Audio of the Meeting`

In [19]:
def process_query_audio(audio_path):
    processed_path = preprocess_audio(
        input_wav=audio_path,
        output_dir=TEST_DIR,
        suffix="_query"
    )

    print("[✓] Audio processed successfully")
    return processed_path

In [20]:
# =========================
# LOAD AUDIO
# =========================
def upload_process_audio():
    audio_path = select_audio_file()

    if not audio_path:
        print("[!] No file selected")
        return None

    return process_query_audio(audio_path)

def generate_transcription(DEVICE, SPEAKER_DB_PATH, ASR_MODEL, DIARIZATION_MODEL, HF_TOKEN, AUDIO_PATH, TARGET_SR= 16000):
    # =========================
    # LOAD AUDIO
    # =========================
    waveform, sample_rate = torchaudio.load(AUDIO_PATH)
    
    if sample_rate != TARGET_SR:
        waveform = torchaudio.functional.resample(
            waveform, sample_rate, TARGET_SR
        )
        sample_rate = TARGET_SR
    
    # =========================
    # HELPER FUNCTIONS
    # =========================
    def extract_segment(waveform, start, end, sr):
        start_sample = int(start * sr)
        end_sample = int(end * sr)
        return waveform[:, start_sample:end_sample]
    
    def identify_speaker(segment, sample_rate):
        min_duration_sec = 0.5
        if segment.shape[1] < int(min_duration_sec * sample_rate):
            return "Unknown", 0.0
    
        with torch.no_grad():
            emb = speaker_verification.encode_batch(segment.to(DEVICE))
            emb = emb.squeeze()
    
        best_speaker = "Unknown"
        best_score = 0.0
    
        for name, ref_emb in speaker_db.items():
            score = torch.nn.functional.cosine_similarity(
                emb, torch.tensor(ref_emb).to(emb.device), dim=0
            )
            if score > best_score:
                best_score = score
                best_speaker = name
    
        if best_score < 0.3:
            best_speaker = "Unknown"
    
        return best_speaker, float(best_score)
    
    
    # =========================
    # RUN DIARIZATION
    # =========================
    print("\nRunning diarization...")
    diarization = diarization_pipeline({
        "waveform": waveform,
        "sample_rate": sample_rate
    })
    
    print("\n--- FINAL SPEAKER-ATTRIBUTED TRANSCRIPT ---\n")

    transcript_lines = []
    
    for turn, _, _ in diarization.speaker_diarization.itertracks(yield_label=True):
        start, end = turn.start, turn.end

        segment = extract_segment(waveform, start, end, sample_rate)
        speaker, _ = identify_speaker(segment, sample_rate)

        segment_np = segment[0].cpu().numpy()
        text = asr_pipeline(segment_np, chunk_length_s=0)["text"].strip()

        transcript_lines.append(
            f"{speaker}:\n{text}\n"
        )

    full_transcript = "\n".join(transcript_lines)

    return full_transcript

In [None]:
AUDIO_PATH = upload_process_audio()

if AUDIO_PATH is None:
    print("[!] No audio to process")
else:
    meeting_example= generate_transcription(
        DEVICE,
        SPEAKER_DB_PATH,
        ASR_MODEL,
        DIARIZATION_MODEL,
        HF_TOKEN,
        AUDIO_PATH
    )

Using cache found in C:\Users\hp/.cache\torch\hub\snakers4_silero-vad_master


[✓] Audio processed successfully

Running diarization...


In [28]:
print(meeting_example)

David2:
Thank you for calling Union Mobile. My name is Ray. How can I assist you today?

Zira2:
Hi, I'm having some issues with my phone service. I've been experiencing dropped calls, poor reception, and data connectivity problems.

David2:
Sorry to hear that, Alyssa. Can you please verify your identity for me? I'll do my best to help you resolve the issue.

Zira2:
Sure, my account pin is 1234.

David2:
Thank you, Alyssa. I'm unable to verify your identity with the PU provided. Can you please try again?

Zira2:
Okay, let me check again. My account pin is 5678.

David2:
I apologize, but that doesn't seem to be working either. Can you please try one more time for me?

Zira2:
Alright, my account pin is 9,012.

David2:
Thank you, Alyssa. I'm now able to verify your identity. I see that you've had some issues with your phone service. Can you tell me a little bit more about what's been happening?

Zira2:
Yeah, like I said, I've been experiencing dropped calls, poor reception, and data connec

## `Generate the Report and Save it in PDF`

In [30]:
def generate_report(points):
    prompt = f"""
You are a meeting report generator AI.

Using the following extracted points:

{points}

Create a professional structured meeting report using this format:

Meeting Title:
Date:
Participants:

Summary:

Key Discussion Points:
- …

Decisions Made:
- …

Action Items:
- Person:
  Task:
  Deadline:

Risks & Concerns:
- …

Next Meeting:
- Date:
"""

    response = client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        max_tokens=500,
        temperature=0.3
    )

    return response.choices[0].message["content"]

In [32]:
report_text = generate_report(meeting_example)

In [33]:
report_text

" Meeting Title: Union Mobile Customer Support Call with Zira2\nDate: [Current Date]\nParticipants: David2 (Union Mobile Representative), Zira2 (Union Mobile Customer)\n\nSummary:\nDuring the call, Zira2 reported experiencing issues with her phone service, including dropped calls, poor reception, and data connectivity problems. David2 attempted to verify Zira2's identity using her account pin but was unable to do so. After successfully verifying her identity, David2 gathered more information about the issues she was experiencing and discovered potential network issues in Zira2's area. He escalated the issue to the engineering team and offered her a complimentary one-month subscription to Union Mobile's premium data plan as a gesture of goodwill.\n\nKey Discussion Points:\n- Zira2 reported experiencing dropped calls, poor reception, and data connectivity problems with her Union Mobile service.\n- David2 was unable to verify Zira2's identity using the provided account pin.\n- After succe

In [34]:
def extract_meeting_title(report_text, fallback="Meeting_Report"):
    """
    Extracts the Meeting Title from the generated report.
    Returns a filename-safe version.
    """
    match = re.search(r"Meeting Title:\s*(.+)", report_text)

    if not match:
        return fallback

    title = match.group(1).strip()

    # Sanitize for file system
    title = re.sub(r'[\\/*?:"<>|]', "", title)  # remove illegal chars
    title = re.sub(r"\s+", "_", title)          # spaces → underscores

    return title

In [49]:
save_report_as_pdf(report_text)

[✓] PDF saved to C:\Users\hp\Desktop\Meeting assistant\docs\2025-12-18_Union_Mobile_Customer_Support_Call_with_Zira2.pdf


In [57]:
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, ListFlowable, ListItem
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.pagesizes import A4
from reportlab.lib.enums import TA_LEFT
from reportlab.lib.units import inch
import re

# Save the Output in PDF

def save_report_as_pdf(report_text):
    
    pdf_title = extract_meeting_title(report_text)

    date_str = datetime.now().strftime("%Y-%m-%d")

    output_path = os.path.join(PDF_DIR, f"{date_str}_{pdf_title}.pdf")
    
    doc = SimpleDocTemplate(
        output_path,
        pagesize=A4,
        rightMargin=40,
        leftMargin=40,
        topMargin=40,
        bottomMargin=40
    )

    styles = getSampleStyleSheet()

    header_style = ParagraphStyle(
        "Header",
        parent=styles["Normal"],
        fontSize=12,
        fontName="Helvetica-Bold",
        spaceBefore=16,
        spaceAfter=8,
        alignment=TA_LEFT
    )

    body_style = ParagraphStyle(
        "Body",
        parent=styles["Normal"],
        fontSize=10,
        spaceAfter=10,
        leading=14
    )

    story = []

    lines = report_text.split("\n")

    bullet_buffer = []

    def flush_bullets():
        nonlocal bullet_buffer
        if bullet_buffer:
            story.append(
                ListFlowable(
                    [
                        ListItem(
                            Paragraph(item, body_style),
                            leftIndent=18
                        )
                        for item in bullet_buffer
                    ],
                    bulletType="bullet"
                )
            )
            bullet_buffer = []

    for line in lines:
        line = line.strip()

        if not line:
            flush_bullets()
            story.append(Spacer(1, 0.15 * inch))
            continue

        # Section headers
        if re.match(
            r"^(Meeting Title|Date|Participants|Summary|Key Discussion Points|Decisions Made|Action Items|Risks & Concerns|Next Meeting):",
            line
        ):
            flush_bullets()
            header = line.replace(":", "")
            story.append(Paragraph(header, header_style))
            continue

        # Bullet points
        if line.startswith("-"):
            bullet_buffer.append(line[1:].strip())
            continue

        # Normal paragraph
        flush_bullets()
        story.append(Paragraph(line, body_style))

    flush_bullets()
    doc.build(story)

    print(f"[✓] Formatted PDF saved to {output_path}")


In [56]:
save_report_as_pdf(report_text)

[✓] Formatted PDF saved to C:\Users\hp\Desktop\Meeting assistant\docs\2025-12-18_Union_Mobile_Customer_Support_Call_with_Zira2.pdf


In [1]:
import sys
sys.executable


'c:\\Users\\hp\\anaconda3\\envs\\audio_rec\\python.exe'

In [2]:
import torchaudio

ModuleNotFoundError: No module named 'torchaudio'

In [1]:
import torch

ModuleNotFoundError: No module named 'torch'

In [None]:
import torchaudio

ModuleNotFoundError: No module named 'torchaudio'