In [1]:
# Mount Google Drive into the Colab filesystem; the recordings and the
# transcription output used later in this notebook live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Run settings, exposed as Colab form fields through the #@param annotations.
num_speakers = 2 #@param {type:"integer"}
language = 'English' #@param ['any', 'English']
model_size = 'large' #@param ['tiny', 'base', 'small', 'medium', 'large']
# Checkpoint name: the chosen size, with an '.en' suffix appended when English
# is selected and the size is anything other than 'large'.
model_name = model_size + ('.en' if language == 'English' and model_size != 'large' else '')

In [3]:
!pip install -q git+https://github.com/openai/whisper.git > /dev/null
!pip install -q git+https://github.com/pyannote/pyannote-audio > /dev/null

import whisper
import datetime

import subprocess

import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
# Speaker-embedding model used to tell speakers apart during diarization.
# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook does not require a CUDA runtime just to build the model.
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))

from pyannote.audio import Audio
from pyannote.core import Segment

import wave
import contextlib

from sklearn.cluster import AgglomerativeClustering
import numpy as np

In [4]:
# Drive folder that holds the call recordings.
# NOTE(review): the processing loop further down rebinds `path` to each
# individual audio file, so this value does not survive that cell.
path = '/content/drive/MyDrive/call_logs/'

In [5]:
import contextlib
import datetime
import os
import subprocess
import tempfile
import wave

import numpy as np
import pandas as pd

In [None]:
# Load the Whisper checkpoint that the processing loop below calls through
# `model.transcribe`; without this line `model` is undefined on a fresh kernel.
# `model_name` (built in the settings cell) is used instead of `model_size` so
# that the English-only '.en' checkpoint is picked when the settings ask for it.
model = whisper.load_model(model_name)

def segment_embedding(segment, audio, path, duration):
    """Return the speaker embedding for one transcript segment.

    The segment's "end" is capped at `duration` before the span is cropped
    out of the file at `path` with `audio.crop`. The cropped waveform is then
    given a leading axis and passed to the module-level `embedding_model`.

    Args:
        segment: mapping with "start" and "end" entries.
        audio: object providing `crop(path, clip)` returning a
            (waveform, sample_rate) pair.
        path: location of the audio file to crop from.
        duration: upper bound applied to the segment's end.

    Returns:
        Whatever `embedding_model` returns for the cropped waveform.
    """
    span = Segment(segment["start"], min(duration, segment["end"]))
    cropped, _ = audio.crop(path, span)  # the sample rate is not needed here
    return embedding_model(cropped[None])

def time(secs):
    """Format a number of seconds as a timedelta string such as '0:01:05'.

    The value is rounded to whole seconds before formatting.
    """
    whole_seconds = round(secs)
    elapsed = datetime.timedelta(seconds=whole_seconds)
    return str(elapsed)

# Batch job: for every call recording in `directory`, transcribe it with the
# loaded Whisper `model`, attribute each transcript segment to a speaker by
# clustering speaker embeddings, and write one CSV per recording.
directory = '/content/drive/MyDrive/call_logs/'
transcriptions_dir = '/content/drive/MyDrive/transcriptions'
audio_extensions = ('.wav', '.mp3', '.mp4', '.m4a', '.flac')
min_file_size = 100 * 1024  # bytes; smaller files are skipped
embedding_size = 192  # number of columns in the per-segment embedding matrix

# Create the transcriptions folder if it doesn't exist
os.makedirs(transcriptions_dir, exist_ok=True)

# Converted audio is written to a scratch directory instead of `directory`:
# a temporary audio.wav placed next to the recordings could overwrite (and
# then delete) a real recording with that name, and would itself be treated
# as a recording if an interrupted run left it behind. The scratch directory
# is removed when this block exits, including when an error is raised.
with tempfile.TemporaryDirectory() as scratch_dir:
    converted_wav = os.path.join(scratch_dir, 'audio.wav')

    # os.listdir returns entries in arbitrary order; sort so runs are reproducible.
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(audio_extensions):
            continue
        path = os.path.join(directory, filename)
        if os.path.getsize(path) < min_file_size:
            continue

        # Resume support: skip recordings that already have a transcription.
        # This replaces slicing the directory listing by a fixed offset, which
        # skipped an unpredictable set of files. Delete a CSV to redo its file.
        output_file = os.path.join(transcriptions_dir, f'{os.path.splitext(filename)[0]}_transcription.csv')
        if os.path.exists(output_file):
            continue

        # Convert to WAV if necessary (the duration is read with `wave` below)
        if not filename.endswith('.wav'):
            returncode = subprocess.call(['ffmpeg', '-i', path, converted_wav, '-y'])
            if returncode != 0:
                # Do not go on with a missing or stale WAV file.
                print(f'Skipping {filename}: ffmpeg exited with code {returncode}')
                continue
            path = converted_wav

        # Transcribe the audio
        result = model.transcribe(path)
        segments = result["segments"]
        with contextlib.closing(wave.open(path, 'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            duration = frames / float(rate)

        # One embedding row per transcript segment
        audio = Audio()
        embeddings = np.zeros(shape=(len(segments), embedding_size))
        for i, segment in enumerate(segments):
            embeddings[i] = segment_embedding(segment, audio, path, duration)
        embeddings = np.nan_to_num(embeddings)

        # Cluster the segments into speakers. The clustering cannot produce
        # more clusters than there are segments and needs at least two
        # samples, so recordings with very few segments are labelled directly.
        n_clusters = min(num_speakers, len(segments))
        if n_clusters < 2:
            labels = np.zeros(len(segments), dtype=int)
        else:
            clustering = AgglomerativeClustering(n_clusters).fit(embeddings)
            labels = clustering.labels_
        for segment, label in zip(segments, labels):
            segment["speaker"] = 'SPEAKER ' + str(label + 1)

        # Write the transcription to a CSV file: consecutive segments by the
        # same speaker are merged into one row, stamped with the first
        # segment's start time.
        transcript_data = []
        for i, segment in enumerate(segments):
            if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
                transcript_data.append([segment["speaker"], time(segment["start"]), segment["text"]])
            else:
                transcript_data[-1][2] += ' ' + segment["text"]

        df = pd.DataFrame(transcript_data, columns=['Speaker', 'Time', 'Text'])
        df.to_csv(output_file, index=False)

In [None]:
# df = pd.DataFrame(data)
# # Save the DataFrame to an Excel file
# excel_file = '/content/drive/MyDrive/Call_Recordings_Analysis_9900.xlsx'
# df.to_excel(excel_file, index=False)
# print(len(df))
# print("Analysis complete and saved to:", excel_file)

195
Analysis complete and saved to: /content/drive/MyDrive/Call_Recordings_Analysis_9900.xlsx


In [None]:
# if path[-3:] != 'wav':
#   subprocess.call(['ffmpeg', '-i', path, 'audio.wav', '-y'])
#   path = 'audio.wav'

# model = whisper.load_model(model_size)
# result = model.transcribe(path)
# segments = result["segments"]
# with contextlib.closing(wave.open(path,'r')) as f:
#   frames = f.getnframes()
#   rate = f.getframerate()
#   duration = frames / float(rate)
# audio = Audio()

# def segment_embedding(segment):
#   start = segment["start"]
#   # Whisper overshoots the end timestamp in the last segment
#   end = min(duration, segment["end"])
#   clip = Segment(start, end)
#   waveform, sample_rate = audio.crop(path, clip)
#   return embedding_model(waveform[None])

# embeddings = np.zeros(shape=(len(segments), 192))
# for i, segment in enumerate(segments):
#   embeddings[i] = segment_embedding(segment)

# embeddings = np.nan_to_num(embeddings)

# clustering = AgglomerativeClustering(num_speakers).fit(embeddings)
# labels = clustering.labels_
# for i in range(len(segments)):
#   segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)

# def time(secs):
#   return datetime.timedelta(seconds=round(secs))

# f = open("transcript.txt", "w")

# for (i, segment) in enumerate(segments):
#   if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
#     f.write("\n" + segment["speaker"] + ' ' + str(time(segment["start"])) + '\n')
#   f.write(segment["text"][1:] + ' ')
# f.close()