Install dependencies

In [None]:
!pip install requests bs4 whisper openai-whisper langchain langchain_openai

In [None]:
from pathlib import Path

# One working folder per pipeline stage. They are created up front so that the
# later cells can list and write into them straight away.
directory_list = ["audio", "transcripts", "text", "summary"]

for directory in directory_list:
    stage_dir = Path("/content") / directory
    # exist_ok makes re-running this cell harmless.
    stage_dir.mkdir(parents=True, exist_ok=True)

Get a list of meetings to download. Update the `meetings` list to add or remove a meeting type to download.

In [None]:
import requests, os
from bs4 import BeautifulSoup

# Landing page that is scraped for links to individual meetings.
url = "https://townhallstreams.com/towns/sunapee_nh"

# Meeting types to download. The entries are used for substring matching both
# against the link text on the landing page and against the inline scripts of
# each meeting page, which is presumably why every type appears once with
# spaces and once with underscores -- keep the two groups in sync when editing.
meetings = [
    # space-separated spelling
    "Zoning Board Meeting",
    "Selectboard Meeting",
    "Planning Board Meeting",
] + [
    # underscore-separated spelling
    "Zoning_Board_Meeting",
    "Selectboard_Meeting",
    "Planning_Board_Meeting",
]

# The meeting links on the main page are .php with an id. This redirects
# to the actual video url. From there we can extract the m3u8 stream
def _get_meeting_urls_from_root_page(url):
    """Collect the hrefs of all links on the town page that name a wanted meeting.

    Args:
        url: Address of the town's landing page.

    Returns:
        A list of ``href`` values, in page order, for every anchor whose text
        contains one of the entries of the module-level ``meetings`` list.

    Raises:
        requests.RequestException: If the page cannot be fetched, the request
            times out, or the server answers with an HTTP error status.
    """
    # NOTE(review): verify=False turns off TLS certificate verification. It is
    # kept to preserve the existing behaviour; drop it if the site's
    # certificate validates correctly.
    response = requests.get(url, verify=False, timeout=30)
    # Fail loudly rather than parsing an error page and silently finding no meetings.
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")

    # href=True skips anchors without an href attribute, which would otherwise
    # raise KeyError on link["href"].
    return [
        link["href"]
        for link in soup.find_all("a", href=True)
        if any(meeting in link.text for meeting in meetings)
    ]

def _get_video_url_from(meeting_url):
    """Resolve a meeting page to the URL of its ``.m3u8`` stream.

    Args:
        meeting_url: Site-relative path of the meeting page, as taken from an
            ``href`` on the town's landing page.

    Returns:
        The first ``https://...m3u8`` URL found inside an inline
        ``text/javascript`` script that mentions one of the ``meetings``
        names, or ``None`` when no such script or URL exists.

    Raises:
        requests.RequestException: If the page cannot be fetched or the
            request times out.
    """
    import re

    # NOTE(review): verify=False turns off TLS certificate verification. It is
    # kept to preserve the existing behaviour; drop it if the site's
    # certificate validates correctly.
    response = requests.get(
        f"https://townhallstreams.com{meeting_url}", verify=False, timeout=30
    )
    soup = BeautifulSoup(response.content, "html.parser")

    for script in soup.find_all("script", attrs={"type": "text/javascript"}):
        # .string is None for empty scripts and for tags with several child
        # nodes; testing `meeting in None` would raise TypeError.
        script_text = script.string
        if not script_text:
            continue
        if not any(meeting in script_text for meeting in meetings):
            continue
        # Match one complete URL. Separate find() calls for "https://" and
        # ".m3u8" could pair the start of one URL with the end of another, or
        # return a nonsense slice when ".m3u8" is absent (find() returns -1).
        match = re.search(r"https://[^\s\"'<>]+?\.m3u8", script_text)
        if match:
            return match.group(0)
    return None

def get_list_of_video_urls(url):
    """Return the stream URL of every wanted meeting linked from ``url``.

    Each meeting link found on the landing page is resolved in turn; meetings
    for which no stream URL could be extracted (``None``) are left out.
    """
    resolved = (
        _get_video_url_from(meeting_url)
        for meeting_url in _get_meeting_urls_from_root_page(url)
    )
    return [video_url for video_url in resolved if video_url is not None]


video_urls = get_list_of_video_urls(url)

Convert video files to `.mp3` and download.

In [None]:
import shlex, subprocess, os

def _without_suffix(name, suffix):
    """Return ``name`` with one trailing ``suffix`` removed, if present.

    ``str.strip(".mp4")`` is not a substitute: it removes any of the
    characters ".", "m", "p", "4" from both ends, so a name such as
    "Meeting_2024.mp4" would lose the final "4" of the year as well.
    """
    if suffix and name.endswith(suffix):
        return name[: -len(suffix)]
    return name


audio_files = [file for file in os.listdir("/content/audio") if file.endswith(".mp3")]
downloaded_stems = [_without_suffix(audio_file, ".mp3") for audio_file in audio_files]

# A stream counts as already downloaded when the name of an existing .mp3
# (without extension) occurs in its URL. Empty stems are ignored because the
# empty string is a substring of every URL.
files_to_skip = [
    video_url
    for video_url in video_urls
    if any(stem and stem in video_url for stem in downloaded_stems)
]

# download any missing videos
for video_url in video_urls:
    if video_url in files_to_skip:
        print(f"Skipping {video_url}")
        continue

    print(f"Downloading {video_url}")
    # The second-to-last path segment of the stream URL is used as the file name.
    recording_name = _without_suffix(video_url.split("/")[-2], ".mp4")
    target_path = f"/content/audio/{recording_name}.mp3"
    # Pass the arguments as a list so the URL is never re-parsed by shlex.
    # -n tells ffmpeg not to overwrite an existing output file.
    completed = subprocess.run(["ffmpeg", "-n", "-i", video_url, target_path])
    if completed.returncode != 0:
        print(f"ffmpeg exited with code {completed.returncode} for {video_url}")

Transcribe the `.mp3` files. The output is a `.json` file that includes metadata about the transcription, such as the time series of text.

In [None]:
import whisper
import json

audio_files = [file for file in os.listdir("/content/audio") if file.endswith(".mp3")]
transcript_files = [file for file in os.listdir("/content/transcripts") if file.endswith(".json")]
model = whisper.load_model('medium.en')

# remove audio files that already have a transcript
for transcript_file in transcript_files:
    # Slice off the exact extension. str.strip('.json') would remove any of the
    # characters ".", "j", "s", "o", "n" from both ends of the name instead.
    audio_name = f"{transcript_file[: -len('.json')]}.mp3"
    # A transcript whose audio file is gone must not abort the cell, which is
    # what an unconditional list.remove() (ValueError) would do.
    if audio_name in audio_files:
        print(f"Removing {transcript_file}")
        audio_files.remove(audio_name)

for audio_file in audio_files:
    print(f"Transribing {audio_file}...")
    result = model.transcribe(f"/content/audio/{str(audio_file)}", language='en', verbose=True)
    # Same suffix handling as above, so the transcript name matches the audio
    # name exactly and is recognised on the next run.
    with open(f"/content/transcripts/{audio_file[: -len('.mp3')]}.json", "w") as file:
        json.dump(result, file, indent=4)

Extract the text from the transcript `.json` file.

In [18]:
import json

transcript_files = [file for file in os.listdir("/content/transcripts") if file.endswith(".json")]

for transcript_file in transcript_files:
    # Read and parse the transcript first, inside a context manager: the input
    # handle is closed again, and a corrupt JSON file no longer leaves an empty
    # .txt file behind.
    with open(f"/content/transcripts/{transcript_file}") as source:
        transcript = json.load(source)

    # Slice off the exact extension. str.strip('.json') would remove any of the
    # characters ".", "j", "s", "o", "n" from both ends of the name instead.
    text_name = f"{transcript_file[: -len('.json')]}.txt"
    with open(f"/content/text/{text_name}", "w") as f:
        f.write(transcript["text"])

Create meeting summary. This requires an OpenAI API key.


In [None]:
import os

from langchain.prompts import PromptTemplate
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI


llm = ChatOpenAI(model_name="gpt-3.5-turbo", max_tokens=2000)
# NOTE(review): summary_chain is not used by the code visible in this
# notebook; `chain` below is the one that is invoked. Kept in case another
# cell relies on it.
summary_chain = load_summarize_chain(llm=llm, chain_type='map_reduce')

# Map step: applied to every chunk of the transcript. {text} appears exactly
# once so that each chunk is sent to the model once rather than twice.
prompt_template = """You are an AI assistant tasked with creating a detailed summary from the provided meeting transcription,
                  indicated with {text}.\n\n\n
                  First, analyze the entire transcription to identify all cases discussed.
"""

PROMPT = PromptTemplate(template=prompt_template, input_variables=["text"])

# Combine step: receives the outputs of the map step as {text}.
refine_template = ("""For each case, provide detailed notes from the {text}. Focus on capturing
                   all the relevant details discussed during the meeting related to the individual cases, formatted
                   for easy readability. Include the case numbers, parcel ids, applicant names, special exceptions, variances, and article numbers. Avoid truncation or
                   summary but instead strive for as much detail as possible, articulated in grammatically correct English.
"""
)
# Only {text} is declared: the template contains no {existing_answer}
# placeholder, and that variable belongs to the "refine" chain type, not to
# the map_reduce chain built below.
refine_prompt = PromptTemplate(
    input_variables=["text"],
    template=refine_template,
)
chain = load_summarize_chain(
    llm,
    chain_type="map_reduce",
    verbose=False,
    map_prompt=PROMPT,
    combine_prompt=refine_prompt
)
# Chunks of 3000 characters; consecutive chunks share 1000 characters.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=3000, chunk_overlap=1000)

# Absolute paths, matching the folders created at the top of the notebook and
# used by the other cells, so the cell does not depend on the working directory.
text_files = [file for file in os.listdir("/content/text") if file.endswith(".txt")]
summary_files = [file for file in os.listdir("/content/summary") if file.endswith("_Summary.txt")]

# Drop the transcripts that already have a summary.
for summary_file in summary_files:
    # Slice off the exact suffix. str.strip('_Summary.txt') would remove any of
    # those characters from both ends of the name instead.
    text_name = f"{summary_file[: -len('_Summary.txt')]}.txt"
    # A summary without a matching text file must not abort the cell
    # (an unconditional list.remove() raises ValueError).
    if text_name in text_files:
        print(f"Removing {summary_file}")
        text_files.remove(text_name)

for file in text_files:
    print(f"Processing {file}")
    with open(f"/content/text/{file}", "r") as f:
        raw_text = f.read()

    docs = text_splitter.create_documents([raw_text])
    if not docs:
        # An empty transcript yields no chunks, so there is nothing to summarise.
        print(f"Skipping {file}: transcript is empty")
        continue

    output = chain.invoke({"input_documents": docs})
    print(output["output_text"])

    # file.split('.txt') returns a list, whose repr would end up in the file
    # name; slice off the extension instead.
    summary_name = f"{file[: -len('.txt')]}_Summary.txt"
    with open(f"/content/summary/{summary_name}", "w") as f:
        f.write(output["output_text"])