In [None]:
import os
import subprocess
import re
import json
# Root folder of the project; the four directory names below are joined onto it.
source_dir = "C:\\projects\\local\\video_analysis"
video_dir = 'video'        # input .mp4 files
audio_dir = 'audio'        # .mp3 files extracted from the videos
text_dir = 'text-faster'   # transcripts (.txt and .json)
date_dir = 'date'          # one date guess per transcript

# Locate ffmpeg. The WinGet install location is preferred when it exists (same
# path as before); otherwise fall back to whatever `ffmpeg` is on PATH so the
# notebook also runs on other machines / ffmpeg versions.  If neither is found
# the WinGet path is kept so the failure surfaces when ffmpeg is first invoked.
import shutil

# .get() instead of [] so a missing LOCALAPPDATA (non-Windows) is not a KeyError.
_winget_ffmpeg = os.environ.get('LOCALAPPDATA', '') + "\\Microsoft\\WinGet\\Packages\\Gyan.FFmpeg_Microsoft.Winget.Source_8wekyb3d8bbwe\\ffmpeg-6.0-full_build\\bin\\ffmpeg.exe"
if os.path.isfile(_winget_ffmpeg):
    ffmpeg_path = _winget_ffmpeg
else:
    ffmpeg_path = shutil.which('ffmpeg') or _winget_ffmpeg
import whisper
from faster_whisper import WhisperModel
import torch
# Azure flavour of the OpenAI API, pinned to a specific API version.
os.environ.update({
    "OPENAI_API_TYPE": "azure",
    "OPENAI_API_VERSION": "2023-05-15",
})

# Transcription backend selection: when whisper_model_name is None the
# faster-whisper model is used, otherwise the named openai-whisper model.
whisper_model_name = None  # 'medium.en'
faster_whisper_model_name = 'large-v2'

# Use the first CUDA device when one is available, else the CPU.
if torch.cuda.is_available():
    whisper_device = torch.device("cuda:0")
else:
    whisper_device = torch.device("cpu")

# Announce which backend/model/device combination is active.
print(
    'Using faster-whisper model {} on device {}'.format(faster_whisper_model_name, whisper_device)
    if whisper_model_name is None
    else 'Using whisper model {} on device {}'.format(whisper_model_name, whisper_device)
)

# get settings from files
# The values are stripped: a trailing newline (which most editors add) would
# otherwise end up inside the endpoint URL / API key.
with open(os.path.join(source_dir, "openai-base.txt"), "r") as f:
    os.environ["OPENAI_API_BASE"] = f.read().strip()
with open(os.path.join(source_dir, "openai-key.txt"), "r") as f:
    os.environ["OPENAI_API_KEY"] = f.read().strip()

from langchain.llms import AzureOpenAI
from langchain.chat_models import AzureChatOpenAI
from langchain.docstore.document import Document
from langchain.prompts.chat import PromptTemplate, ChatPromptTemplate
from langchain.chains.question_answering import load_qa_chain
from langchain.chains import (
    StuffDocumentsChain,
    LLMChain,
    ReduceDocumentsChain,
    MapReduceDocumentsChain,
    AnalyzeDocumentChain,
)
# Chat model shared by the date-guessing and question-answering cells.
# temperature=0 is passed to the model for every call made through it.
chat_model = AzureChatOpenAI(deployment_name="gpt-35-turbo-16k", temperature=0)


In [None]:
# move every file with a given extension from one sub-folder of source_dir to
# another, keeping the relative folder layout and optionally changing the extension
def move_by_extension(src, target, extension, newEx):
    """Move files ending in `extension` from `src` to `target`, swapping the suffix for `newEx`.

    Args:
        src: folder name, relative to the module-level `source_dir`, that is
            walked recursively.
        target: folder name, relative to `source_dir`, that receives the files.
            Sub-folders are created as needed and mirror the layout under `src`.
        extension: suffix a filename must end with to be moved (e.g. '.date').
            An empty string matches every file.
        newEx: suffix that replaces `extension` in the moved file's name.

    Raises:
        OSError: propagated from `os.makedirs` / `os.rename` (for instance when
            the destination cannot be created or the rename fails).
    """
    start = os.path.join(source_dir, src)
    dest = os.path.join(source_dir, target)
    print('moving {} from {} to {}'.format(extension, start, dest))
    for root, dirs, files in os.walk(start):
        relpath = os.path.relpath(root, start)
        for file in files:
            if not file.endswith(extension):
                continue
            # Slice by explicit length: file[:-len(extension)] collapses to ''
            # when extension is empty, because -0 == 0.
            stem = file[:len(file) - len(extension)]
            f = os.path.join(root, file)
            # normpath removes the '.' component relpath yields for the top folder.
            t = os.path.normpath(os.path.join(dest, relpath, stem + newEx))
            os.makedirs(os.path.dirname(t), exist_ok=True)
            os.rename(f, t)
# use this to reorg from old structure to new structure
# move_by_extension(video_dir, audio_dir, '.mp3', '.mp3')
# move_by_extension(video_dir, text_dir, '.txt', '.txt')
# move_by_extension(video_dir, date_dir, '.date', '.txt')
# move_by_extension(video_dir, 'ext-folder', '.ext', '.txt')




In [None]:
# find all mp4 files in source_dir (and subdirectories) that do not yet have a mp3 file and convert them with ffmpeg
def process_files(dry_run=False):
    """Extract an mp3 for every mp4 under `video_dir` that has no mp3 under `audio_dir` yet.

    The folder layout below `video_dir` is mirrored below `audio_dir`.

    Args:
        dry_run: when True, only print which files would be created and do not
            run ffmpeg.  Defaults to False (actually convert).
    """
    start = os.path.join(source_dir, video_dir)
    for root, dirs, files in os.walk(start):
        relpath = os.path.relpath(root, start)
        for file in files:
            if not file.endswith(".mp4"):
                continue
            mp4_file = os.path.join(source_dir, video_dir, relpath, file)
            mp3_file = os.path.join(source_dir, audio_dir, relpath, file[:-4] + '.mp3')
            if os.path.isfile(mp3_file):
                # already converted on an earlier run
                continue
            print('want to create {} from {}'.format(mp3_file, mp4_file))
            if dry_run:
                continue
            # ffmpeg does not create missing output folders itself.
            os.makedirs(os.path.dirname(mp3_file), exist_ok=True)
            # -vn: drop video; 44.1 kHz, stereo, 192 kbit/s, mp3 container.
            cmd = [ffmpeg_path, '-i', mp4_file, '-vn', '-ar', '44100', '-ac', '2', '-ab', '192k', '-f', 'mp3', mp3_file]
            print(cmd)
            result = subprocess.run(cmd).returncode
            print("{} - processed from {}".format(result, mp4_file))
            # The existence of the mp3 is what marks a video as done, so a
            # partial file left behind by a failed run must not survive.
            if result != 0 and os.path.isfile(mp3_file):
                os.remove(mp3_file)
process_files()
print('generated MP3s for all MP4s in ' + source_dir)



In [None]:
# find all mp3 files in source_dir (and subdirectories) that do not yet have a txt file and convert them with whisper
def _load_transcription_model():
    """Load the configured speech-to-text model (faster-whisper when `whisper_model_name` is None)."""
    if whisper_model_name is None:
        device_type = whisper_device.type
        # float16 is only usable on CUDA; on CPU faster-whisper rejects it, so
        # fall back to int8 there instead of failing at load time.
        compute_type = "float16" if device_type == "cuda" else "int8"
        print('loading {} faster-whisper model on device {}...'.format(faster_whisper_model_name, whisper_device.type))
        model = WhisperModel(faster_whisper_model_name, device=device_type, compute_type=compute_type)
        print('loaded faster-whisper model - processing file')
    else:
        print('loading {} whisper model on device {}...'.format(whisper_model_name, whisper_device))
        model = whisper.load_model(whisper_model_name, device=whisper_device)
        print('loaded whisper model - processing file')
    return model


def _transcribe_to_segments(model, mp3_file):
    """Transcribe one file and return its segments as a list of dicts.

    Both backends are normalised to the openai-whisper segment layout:
      id, seek, start, end, text, tokens, temperature,
      avg_logprob, compression_ratio, no_speech_prob
    """
    if whisper_model_name is None:
        segments, info = model.transcribe(mp3_file, beam_size=5)
        # Building the list forces evaluation of the segments now, and turns
        # the attribute-style segment objects into JSON-serialisable dicts.
        return [
            {
                'id': segment.id,
                'seek': segment.seek,
                'start': segment.start,
                'end': segment.end,
                'text': segment.text,
                'tokens': segment.tokens,
                'temperature': segment.temperature,
                'avg_logprob': segment.avg_logprob,
                'compression_ratio': segment.compression_ratio,
                'no_speech_prob': segment.no_speech_prob,
            }
            for segment in segments
        ]
    # openai-whisper returns {'text', 'language', 'segments'}; the segments
    # are already dicts with the keys listed above.
    return model.transcribe(mp3_file)['segments']


def process_files():
    """Transcribe every mp3 under `audio_dir` that lacks a .txt or .json under `text_dir`.

    The .txt gets one stripped segment per line; the .json gets the full
    segment records.  The model is loaded lazily, only once the first file
    that needs work is found.
    """
    model = None
    start = os.path.join(source_dir, audio_dir)
    for root, dirs, files in os.walk(start):
        relpath = os.path.relpath(root, start)
        for file in files:
            if not file.endswith(".mp3"):
                continue
            mp3_file = os.path.join(source_dir, audio_dir, relpath, file)
            txt_file = os.path.join(source_dir, text_dir, relpath, file[:-4] + '.txt')
            json_file = os.path.join(source_dir, text_dir, relpath, file[:-4] + '.json')
            if os.path.isfile(txt_file) and os.path.isfile(json_file):
                continue
            print('using {} to make {} and {}'.format(mp3_file, txt_file, json_file))
            if model is None:
                model = _load_transcription_model()

            segments = _transcribe_to_segments(model, mp3_file)

            # if we used text, we'll get one giant line.  Instead, we'll use segments
            print('got {} segments from {}'.format(len(segments), mp3_file))
            os.makedirs(os.path.dirname(txt_file), exist_ok=True)
            with open(txt_file, 'w', encoding='utf-8') as f:
                for segment in segments:
                    f.write(segment['text'].strip() + '\n')
            with open(json_file, 'w', encoding='utf-8') as f:
                f.write(json.dumps(segments))
            print('wrote to {} and {}'.format(txt_file, json_file))
process_files()
print('generated TXTs for all MP3s in ' + source_dir)


In [None]:
# System instructions for the date-guessing model: the reply must be a bare
# YYYY-mm-dd date derived from the filename it is given.
_date_system_message = """You are a helpful assistant that identifies the date a file was created based on information in the filename.
    All dates are between 2019 and 2030, and any dates in the filename are written with the month before the date (ie. american style).
    The resulting date should be formatted as YYYY-mm-dd - ie. 2021-02-15.
    Your response should _only_ contain the date, and nothing else.
    """

# The human turn carries the text to analyse via the {text} placeholder.
chat_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", _date_system_message),
        ("human", "{text}"),
    ]
)

# Pipe the formatted prompt straight into the chat model.
chain = chat_prompt | chat_model

# find all txt files in source_dir (and subdirectories) that do not yet have a date file and ask the LLM to guess the date
def process_files():
    """Ask the LLM for a date for every transcript under `text_dir` without a file under `date_dir`.

    The full transcript path is sent as the prompt text.  A reply is only
    written to disk when, after trimming whitespace, it is exactly YYYY-mm-dd
    and names a real calendar date; anything else is reported and skipped so
    it is retried on the next run.
    """
    # Local import so this cell does not depend on an edit to the imports cell.
    from datetime import datetime

    start = os.path.join(source_dir, text_dir)
    for root, dirs, files in os.walk(start):
        relpath = os.path.relpath(root, start)
        for file in files:
            if not file.endswith(".txt"):
                continue
            txt_file = os.path.join(source_dir, text_dir, relpath, file)
            date_file = os.path.join(source_dir, date_dir, relpath, file[:-4] + '.txt')
            if os.path.isfile(date_file):
                continue
            print('getting date for {} to {}'.format(txt_file, date_file))
            result = chain.invoke({ "text": txt_file})
            # Models often pad replies with whitespace/newlines; trim before validating.
            date = result.content.strip()
            # fullmatch: the whole reply must be the date (a '$' anchor would
            # still accept a trailing newline).
            valid = re.fullmatch(r'\d{4}-\d{2}-\d{2}', date) is not None
            if valid:
                try:
                    # rejects well-formed but impossible dates such as 2021-13-45
                    datetime.strptime(date, '%Y-%m-%d')
                except ValueError:
                    valid = False
            if not valid:
                print('invalid date: {} generated for {}'.format(date, txt_file))
                continue
            os.makedirs(os.path.dirname(date_file), exist_ok=True)
            with open(date_file, 'w') as f:
                f.write(date)
process_files()
print('generated DATEs for all TXTs in ' + source_dir)


In [None]:
# ok, now for the real work.
#  1. given a question and answer-extension, find all files with a txt+date file, but no answer-extension file
#  2. ask the LLM to answer the question based on the file
#  3. write the answer to the answer-extension file

# The question to ask of every transcript, and the name of the folder (relative
# to source_dir) that receives the answers, are both read from small input files.
question_path = os.path.join(source_dir, "question.input")
answer_dir_path = os.path.join(source_dir, "answer-dir.input")

with open(question_path, "r") as question_file:
    question = question_file.read().strip()

with open(answer_dir_path, "r") as answer_dir_file:
    answer_dir = answer_dir_file.read().strip()

# # heavily based on https://github.com/langchain-ai/langchain/blob/master/libs/langchain/langchain/chains/combine_documents/map_reduce.py
# document_prompt = PromptTemplate(
#     input_variables=["page_content"],
#     template="{page_content}"
# )
# document_variable_name = "context"
# # The prompt here should take as an input variable the
# # `document_variable_name`
# prompt = PromptTemplate.from_template(
#     """You are a helpful assistant that summarizes existing content with the goal of answering these questions:
#     {}
#     If the content supplied isn't relevant to the question, you will say so.
#     """.format(question) + "Summarize this content: {context}"
# )
# llm_chain = LLMChain(llm=chat_model, prompt=prompt)
# # chat_chain = chat_prompt | chat_model
# # chat_chain = ChatChain()
# # We now define how to combine these summaries
# reduce_prompt = PromptTemplate.from_template(
#     "Combine these summaries: {context}"
# )
# reduce_llm_chain = LLMChain(llm=chat_model, prompt=reduce_prompt)
# combine_documents_chain = StuffDocumentsChain(
#     llm_chain=reduce_llm_chain,
#     document_prompt=document_prompt,
#     document_variable_name=document_variable_name
# )
# reduce_documents_chain = ReduceDocumentsChain(
#     combine_documents_chain=combine_documents_chain,
# )
# chain = MapReduceDocumentsChain(
#     llm_chain=llm_chain,
#     reduce_documents_chain=reduce_documents_chain,
# )

# based on https://python.langchain.com/docs/use_cases/question_answering/analyze_document
# A map_reduce question-answering chain, wrapped in AnalyzeDocumentChain so it
# can be called with a single document string plus a question.
qa_chain = load_qa_chain(
    chat_model,
    chain_type="map_reduce",
)
qa_document_chain = AnalyzeDocumentChain(
    combine_docs_chain=qa_chain,
)

# find all txt files in source_dir (and subdirectories) that have a text file and a date file, but no answer-dir file
def process_files():
    """Answer `question` for each dated transcript that has no answer file yet.

    Walks `text_dir`; for every .txt with a matching file under `date_dir` and
    none under `answer_dir`, runs the QA chain over the transcript and stores
    the answer at the mirrored path under `answer_dir`.
    """
    transcripts_root = os.path.join(source_dir, text_dir)
    for folder, _subdirs, names in os.walk(transcripts_root):
        rel = os.path.relpath(folder, transcripts_root)
        for name in names:
            if not name.endswith(".txt"):
                continue

            text_file = os.path.join(source_dir, text_dir, rel, name)
            date_file = os.path.join(source_dir, date_dir, rel, name)
            answer_file = os.path.join(source_dir, answer_dir, rel, name)

            # Skip unless both inputs exist and the answer is still missing.
            has_inputs = os.path.isfile(text_file) and os.path.isfile(date_file)
            if not has_inputs or os.path.isfile(answer_file):
                continue

            with open(text_file, "r", encoding='utf-8') as transcript_fh:
                content = transcript_fh.read().strip()
            with open(date_file, "r") as date_fh:
                date = date_fh.read().strip()

            # The date is only used for the progress message.
            print('getting answer for {}: {}'.format(date, text_file))
            answer = qa_document_chain.run(input_document=content, question=question)

            print('writing answer to {}'.format(answer_file))
            os.makedirs(os.path.dirname(answer_file), exist_ok=True)
            with open(answer_file, 'w', encoding='utf-8') as answer_fh:
                answer_fh.write(answer)


process_files()
print('generated {} for all TXTs in {}'.format(answer_dir, os.path.join(source_dir, text_dir)))


In [None]:
# now find and summarize the results
# Re-read the answer folder name so this cell can run without the previous one.
answer_dir_path = os.path.join(source_dir, "answer-dir.input")
with open(answer_dir_path, "r") as answer_dir_file:
    answer_dir = answer_dir_file.read().strip()

# Filled with (heading, answer) tuples by process_files below.
results = list()

# find all txt files in source_dir (and subdirectories) that have an answer and a date file
def process_files():
    """Collect every answer under `answer_dir` that has a matching date file.

    Appends one `('<date> - <relative path>', answer_text)` tuple per answer to
    the module-level `results` list.  The generated top-level summary.txt is
    never treated as an answer.
    """
    start = os.path.join(source_dir, answer_dir)
    for root, dirs, files in os.walk(start):
        relpath = os.path.relpath(root, start)
        for file in files:
            if not file.endswith(".txt"):
                continue
            # os.path.relpath returns '.' (os.curdir), not '', for the start
            # folder itself, so compare against that to skip our own output.
            if file == 'summary.txt' and relpath in ('', os.curdir):
                continue
            answer_file = os.path.join(source_dir, answer_dir, relpath, file)
            date_file = os.path.join(source_dir, date_dir, relpath, file)
            if os.path.isfile(date_file) and os.path.isfile(answer_file):
                with open(date_file, "r") as f:
                    date = f.read().strip()
                with open(answer_file, "r", encoding='utf-8') as f:
                    answer = f.read().strip()
                results.append(('{} - {}'.format(date, os.path.join(relpath, file)), answer))
process_files()

# sort results by date and output
# Each heading starts with the YYYY-mm-dd date, so sorting the heading strings
# orders the entries chronologically (ties broken by relative path).
results.sort(key=lambda x: x[0])
outfile = os.path.join(source_dir, answer_dir, 'summary.txt')
# The answers were read as UTF-8; write the summary as UTF-8 too so characters
# outside the platform default code page do not raise UnicodeEncodeError.
with open(outfile, 'w', encoding='utf-8') as f:
    for result in results:
        f.write('{}\n{}\n\n'.format(result[0], result[1]))
print('wrote {} results to {}'.format(len(results), outfile))
