In [17]:
import json
import os
import openai
import whisper
from whisper.utils import format_timestamp
from sentence_transformers import SentenceTransformer, util

In [18]:
# Read the OpenAI key from the environment rather than embedding it in the notebook.
# NOTE: a literal key was previously committed on this line; treat it as leaked and revoke it.
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
    # Fail here, with a clear message, instead of at the first GPT-3 call.
    raise RuntimeError("Set the OPENAI_API_KEY environment variable before running this notebook.")

In [19]:
def create_transcript_str(whisper_out):
    '''Render the transcription segments as one timestamped line per segment.

    Every entry of whisper_out['segments'] becomes a line made of the
    formatted 'start' time in square brackets, a colon, a tab and the
    segment's 'text'. The lines are joined with newlines, so an empty
    segment list yields an empty string.
    '''
    lines = [
        f"[{format_timestamp(seg['start'])}]:\t{seg['text']}"
        for seg in whisper_out['segments']
    ]
    return "\n".join(lines)

def postprocess_points(raw_output):
    '''Turn a dash-bulleted block of text into a list of bullet strings.

    The text is cut at every newline-followed-by-dash sequence, each piece
    is stripped of surrounding whitespace, and pieces that end up empty
    are dropped.
    '''
    stripped_pieces = (piece.strip() for piece in raw_output.split('\n-'))
    return [piece for piece in stripped_pieces if piece != '']

def find_closest_time(time, all_times):
    '''Return the entry of `all_times` with the smallest absolute distance to `time`.

    If several entries are equally close, the one that appears first is returned.
    '''
    def distance(candidate):
        return abs(candidate - time)

    return min(all_times, key=distance)

def _closest_index(target, times):
    '''Return the index of the entry in `times` nearest to `target` (first index on ties).'''
    return min(range(len(times)), key=lambda i: abs(times[i] - target))


def create_transcript_chunks(all_start_times, all_end_times, whisper_out, stride=45, length=60):
    '''Create larger chunks of the segments using a sliding window.

    A window start (`seek`) advances from 0 in steps of `stride` up to the
    integer part of the last end time. For each window, the chunk begins at
    the segment whose start time is nearest to `seek` and finishes at the
    segment whose end time is nearest to `seek + length`.

    Args:
        all_start_times: 'start' value of every segment, in segment order.
        all_end_times: 'end' value of every segment, in segment order.
        whisper_out: dict whose 'segments' list holds dicts with a 'text' key,
            aligned index-for-index with the two time lists.
        stride: distance between consecutive window starts.
        length: window length added to `seek` to locate the chunk end.

    Returns:
        A list of dicts with keys 'start', 'end' and 'text', where 'text' is
        the concatenation of the covered segments' text, stripped. An empty
        list is returned when there are no segments.
    '''
    segments = whisper_out['segments']
    # Nothing to chunk (e.g. no speech detected): avoid indexing all_end_times[-1].
    if not all_start_times or not all_end_times or not segments:
        return []

    transcript_chunks = []
    for seek in range(0, int(all_end_times[-1]), stride):
        start_index = _closest_index(seek, all_start_times)
        # A tie in the end-time lookup can resolve to a segment that lies before
        # the chosen start segment; clamp so a chunk never ends before it starts.
        end_index = max(start_index, _closest_index(seek + length, all_end_times))

        text = "".join(segment['text'] for segment in segments[start_index:end_index + 1]).strip()
        transcript_chunks.append({
            'start': all_start_times[start_index],
            'end': all_end_times[end_index],
            'text': text,
        })

    return transcript_chunks

In [20]:
class LISAPipeline():
    '''Meeting pipeline: transcribe audio, summarise it with GPT-3, and index it for search.

    Calling an instance with an audio path runs transcription, generates the
    meeting minutes and action items, and builds the embedding index used by
    `search`.
    '''

    # Phrase that must occur in the transcript (case-insensitively) for
    # action items to be extracted at all.
    WAKE_PHRASE = 'hey lisa'

    def __init__(self, whisper_model, search_model):
        '''Load both models.

        Args:
            whisper_model: value passed to `whisper.load_model`.
            search_model: value passed to `SentenceTransformer.load`.
        '''
        self.whisper_model = whisper.load_model(whisper_model)
        self.search_model = SentenceTransformer.load(search_model)

    def run_gpt3(self, prompt, max_tokens=256, temperature=0.5, top_p=1, frequency_penalty=0.0, presence_penalty=0.0):
        '''Send `prompt` to the "text-davinci-002" completion engine and return the first choice's text.

        The remaining arguments are forwarded unchanged to `openai.Completion.create`.
        '''
        response = openai.Completion.create(
            engine="text-davinci-002",
            prompt=prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            frequency_penalty=frequency_penalty,
            presence_penalty=presence_penalty
        )
        return response.choices[0].text

    def transcribe(self, audio_path):
        '''Transcribe `audio_path` with the loaded Whisper model (verbose=False) and return its output.'''
        return self.whisper_model.transcribe(audio_path, verbose=False)

    def minutes_of_meeting(self, transcript_str):
        '''Ask GPT-3 for the minutes of the meeting and return them as a list of bullet strings.'''

        mom_prompt = f"""Generate the minutes of the meeting for the following transcript:
        Meeting Transcription:
        {transcript_str}

        Meeting Minutes:
        -"""

        # The prompt already ends with the first "-", so the completion starts
        # mid-bullet; prepend the separator so every bullet has the same shape.
        raw_minutes = '\n-' + self.run_gpt3(mom_prompt, temperature=0.5)
        return postprocess_points(raw_minutes)

    def action_items(self, transcript_str):
        '''Ask GPT-3 for the action items and return them as a list of bullet strings.

        Returns an empty list, without calling GPT-3, when the transcript does
        not contain `WAKE_PHRASE` (compared in lower case).
        '''

        if self.WAKE_PHRASE not in transcript_str.lower():
            return []

        action_prompt = f"""Extract the Action Items / To-Do List from the Transcript.
        Meeting Transcription:
        {transcript_str}

        Action Items:
        -"""

        # Same separator handling as minutes_of_meeting; the leading empty
        # piece it creates is discarded by postprocess_points.
        raw_action_items = '\n-' + self.run_gpt3(action_prompt, temperature=0.4)
        return postprocess_points(raw_action_items)

    def create_index(self, whisper_out, stride=45, length=60):
        '''Create search index by embedding the transcript segments.

        Args:
            whisper_out: transcription output with a 'segments' list whose
                entries have 'start', 'end' and 'text' keys.
            stride: forwarded to `create_transcript_chunks`.
            length: forwarded to `create_transcript_chunks`.

        Returns:
            A `(doc_emb, transcript_chunks)` pair: the search model's encoding
            of every chunk text, and the chunk dicts themselves.
        '''
        segments = whisper_out['segments']
        all_start_times = [segment['start'] for segment in segments]
        all_end_times = [segment['end'] for segment in segments]

        transcript_chunks = create_transcript_chunks(
            all_start_times, all_end_times, whisper_out, stride=stride, length=length
        )

        chunk_texts = [chunk['text'] for chunk in transcript_chunks]
        doc_emb = self.search_model.encode(chunk_texts)

        return doc_emb, transcript_chunks

    def search(self, doc_embeddings, transcript_chunks, query, top_k=3, threshold=16):
        '''Return the best-matching transcript chunks for `query`.

        Chunks are ranked by the dot-product score between the query embedding
        and each chunk embedding. Of the `top_k` highest-scoring chunks, only
        those whose score is strictly greater than `threshold` are kept.

        Returns:
            A list of `(start, end, text)` tuples, best match first; an empty
            list when there are no chunks to search.
        '''
        # Empty index: nothing can match, so skip encoding and scoring.
        if not transcript_chunks:
            return []

        query_embeddings = self.search_model.encode(query)
        scores = util.dot_score(query_embeddings, doc_embeddings)[0].cpu().tolist()

        # Pair each chunk with its score and rank by decreasing score.
        ranked = sorted(zip(transcript_chunks, scores), key=lambda pair: pair[1], reverse=True)

        return [
            (chunk['start'], chunk['end'], chunk['text'])
            for chunk, score in ranked[:top_k]
            if score > threshold
        ]

    def __call__(self, audio_path):
        '''Run the pipeline on an audio file.

        Returns:
            `(minutes, action_items, doc_emb, transcript_chunks)`.
        '''
        whisper_out = self.transcribe(audio_path)
        transcript_str = create_transcript_str(whisper_out)
        minutes = self.minutes_of_meeting(transcript_str)
        action_items = self.action_items(transcript_str)
        doc_emb, transcript_chunks = self.create_index(whisper_out)

        return minutes, action_items, doc_emb, transcript_chunks

In [21]:
# Build the pipeline: `whisper_model` is handed to whisper.load_model and
# `search_model` to SentenceTransformer.load (see LISAPipeline.__init__).
lisa = LISAPipeline(whisper_model="whisper_models/medium.pt", search_model="multi-qa-mpnet-base-dot-v1")

In [23]:
# Run the whole pipeline (transcribe -> minutes -> action items -> search index).
# NOTE(review): 'Recording2.mp3' is written by the moviepy cell at the end of this
# notebook, so that cell has to be executed before this one on a fresh kernel.
minutes, action_items, doc_emb, transcript_chunks = lisa('Recording2.mp3')

Detected language: English


100%|██████████| 418/418 [00:02<00:00, 164.41frames/s]


In [30]:
import numpy as np

In [None]:
# Check that converting the embeddings to nested lists and back reproduces the
# same values. np.array_equal yields one True/False instead of displaying an
# element-wise boolean array of the full embedding shape.
np.array_equal(np.array(doc_emb.tolist()), doc_emb)

: 

In [None]:
# Step-by-step alternative to calling `lisa(...)`: run only the transcription stage.
whisper_out = lisa.transcribe('Recording.m4a')

In [None]:
# Build the timestamped transcript text, then generate the minutes and action items from it.
transcript_str = create_transcript_str(whisper_out)
minutes = lisa.minutes_of_meeting(transcript_str)
action_items = lisa.action_items(transcript_str)

In [None]:
# Chunk the transcript and embed each chunk for semantic search.
doc_emb, transcript_chunks = lisa.create_index(whisper_out)

In [24]:
# Display the generated meeting minutes.
minutes

['Bhavish introduced himself.']

In [25]:
# Display the extracted action items (empty when the wake phrase was not in the transcript).
action_items

[]

In [26]:
# Query the index with the defaults (top_k=3, threshold=16); an empty list
# means none of the top-ranked chunks scored above the threshold.
lisa.search(doc_emb, transcript_chunks, 'explain the issue with deep learning with an example')

[]

In [None]:
import moviepy.editor as mp

In [None]:
# Open the source video so that its audio track can be exported below.
clip = mp.VideoFileClip('Recording.mp4')

In [None]:
# Write the clip's audio track to 'Recording2.mp3', the file the pipeline run cell reads.
clip.audio.write_audiofile("Recording2.mp3")