[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aurelio-labs/semantic-chunkers/blob/main/docs/01-video-chunking.ipynb) [![Open nbviewer](https://raw.githubusercontent.com/pinecone-io/examples/master/assets/nbviewer-shield.svg)](https://nbviewer.org/github/aurelio-labs/semantic-chunkers/blob/main/docs/01-video-chunking.ipynb)

# Semantic search over chunked learning videos for ScormAI
**Main idea:** Videos are a sequence of frames with a temporal component, so we try to identify the context between each scene or batch of frames.

This concept is based on this source video: https://youtu.be/hsH9q_N02Gw?si=bQtS__SxPG3T2nXo

**Disclaimer:** This is only a proof of concept. The code is rough and is not intended for formal code review.





In [None]:
"""
Copyright (c) 2024 "Imperator" Radim Tvrdon. All rights reserved.

This software and associated documentation files (the "Software") are the exclusive property of Mughla Chesky.
Unauthorized copying, modification, distribution, or sale of the Software, in whole or in part, is strictly prohibited
without the prior written permission of Imperator

The Software is provided "AS IS", without warranty of any kind, express or implied, including but not limited to the warranties
of merchantability, fitness for a particular purpose, or noninfringement. In no event shall the author or copyright holder
be liable for any claim, damages, or other liability, whether in an action of contract, tort, or otherwise, arising from,
out of, or in connection with the Software or the use or other dealings in the Software.

For permission requests, please contact: radim@resync.cz
"""


Install the dependencies: `semantic-chunkers`, `semantic-router` (which does most of the work here), and OpenCV.

In [None]:
!pip install -qU \
    "semantic-chunkers[stats]" \
    "semantic-router[vision]==0.0.39" \
    opencv-python

Open the source video with `cv2.VideoCapture` and read it frame by frame. The video is hosted on talentwave.cz because that is the only cloud instance available to me for hosting files.

In [None]:
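import cv2

# Open the remote video stream
vidcap = cv2.VideoCapture("https://talentwave.cz/vids/pu_lesson.mp4")

# Read frames until the stream is exhausted (each frame is a BGR NumPy array)
frames = []
success, image = vidcap.read()
while success:
    frames.append(image)
    success, image = vidcap.read()

# Release the capture handle once all frames are in memory
vidcap.release()
len(frames)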

1221

Next, convert the frames to PIL images.

In [None]:
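import cv2
from PIL import Image

# OpenCV returns frames in BGR channel order, while PIL expects RGB, so convert first
image_frames = [
    Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) for frame in frames
]
len(image_frames)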

1221

Now that the frames are loaded, we can use a splitter to create splits based on the similarity of consecutive frames.
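
To make the idea of splitting on frame similarity concrete, here is a minimal pure-Python sketch. It is not the library's implementation: the helper names `cosine_similarity` and `split_by_consecutive_similarity` are hypothetical, the 2-D vectors stand in for real frame embeddings, and the sketch simply assumes that a new split starts whenever the similarity between two neighbouring embeddings falls below a threshold.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def split_by_consecutive_similarity(embeddings, score_threshold=0.5):
    """Group item indices into chunks, opening a new chunk whenever the
    similarity between neighbouring embeddings drops below the threshold."""
    if not embeddings:
        return []
    chunks = [[0]]
    for idx in range(1, len(embeddings)):
        score = cosine_similarity(embeddings[idx - 1], embeddings[idx])
        if score < score_threshold:
            chunks.append([idx])  # scene change: start a new chunk
        else:
            chunks[-1].append(idx)  # same scene: extend the current chunk
    return chunks


# Toy 2-D "frame embeddings": two similar frames, then a scene change
toy_embeddings = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9]]
chunks = split_by_consecutive_similarity(toy_embeddings, score_threshold=0.5)
print(chunks)
```

A higher threshold produces more, shorter chunks; a lower one merges more frames into each chunk.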

First, let's initialise our ViT encoder and run the splitter over the frames.

In [None]:
import torch
from semantic_router.encoders import VitEncoder
from semantic_router.splitters.consecutive_sim import ConsecutiveSimSplitter

device = (
    "mps"
    if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available() else "cpu"
)
print(f"Using '{device}'")

encoder = VitEncoder(device=device)

splitter = ConsecutiveSimSplitter(encoder=encoder, score_threshold=0.5)
splits = splitter(docs=image_frames)

Using 'cuda'


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


With the splits in hand, take the middle frame of each split and base64-encode it as a JPEG so that it can be sent to a vision model.

> Note: currently, we can only use `semantic_chunkers.chunkers.ConsecutiveChunker` for image content

In [None]:
import base64
import io

b64_img_messages = []

for split in splits:
    # Get the middle frame from each split
    middle_frame = split.docs[len(split.docs) // 2]

    # Get image bytes
    frame_bytes = io.BytesIO()
    middle_frame.save(frame_bytes, format="JPEG")

    # Base64-encode the image bytes
    b64_img = base64.b64encode(frame_bytes.getvalue()).decode("utf-8")
    b64_img_messages.append(
        {
            "type": "image_url",
            "image_url": {
                "url": f"data:image/jpeg;base64,{b64_img}"
            }
        }
    )
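
The `image_url` entries built above embed each frame as a base64 data URL. As a small illustration of that format, the sketch below wraps arbitrary bytes in the same structure and decodes them back. The helpers `to_image_message` and `from_image_message` are hypothetical names, and the byte string is a made-up stand-in rather than a real JPEG.

```python
import base64


def to_image_message(image_bytes: bytes, mime_type: str = "image/jpeg") -> dict:
    """Wrap raw image bytes in an `image_url` content part using a base64 data URL."""
    b64 = base64.b64encode(image_bytes).decode("utf-8")
    return {
        "type": "image_url",
        "image_url": {"url": f"data:{mime_type};base64,{b64}"},
    }


def from_image_message(message: dict) -> bytes:
    """Recover the raw bytes from a message built by `to_image_message`."""
    url = message["image_url"]["url"]
    # Everything after the first comma is the base64 payload
    _header, b64 = url.split(",", 1)
    return base64.b64decode(b64)


# Made-up stand-in bytes; a real call would pass the JPEG bytes of a frame
fake_jpeg = b"\xff\xd8\xff\xe0 not a real image"
message = to_image_message(fake_jpeg)
round_trip = from_image_message(message)
print(message["image_url"]["url"][:23])
```

Base64 inflates the payload by roughly a third, so it can be worth checking how many frames end up in a single request.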

In [None]:
!pip install openai

In [None]:
import os
from getpass import getpass

# Never hard-code or print an API key in a notebook; prompt for it at runtime instead
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")


In [None]:
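import os
from openai import OpenAI

# Read the organization and project IDs from the environment instead of hard-coding them
client = OpenAI(
    organization=os.environ.get("OPENAI_ORG_ID"),
    project=os.environ.get("OPENAI_PROJECT_ID"),
)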

In [None]:
import openai
client = openai.Client()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "The following series of images are sampled frames from a video, in chronological order. What's happening in the video?",
                },
                *b64_img_messages,
            ],
        }
    ],
    stream=False,
)
print(response.choices[0].message.content)

NameError: name 'b64_img_messages' is not defined

---

In [None]:
!pip install git+https://github.com/openai/whisper.git
!pip install moviepy
!pip install ffmpeg-python
!pip install requests

In [None]:
import requests
import os

video_url = "https://talentwave.cz/vids/pu_lesson.mp4"

video_filename = os.path.basename(video_url)

response = requests.get(video_url, stream=True)
response.raise_for_status()  # fail early on HTTP errors instead of saving an error page

# Stream the response body to disk in small chunks
with open(video_filename, 'wb') as video_file:
    for chunk in response.iter_content(chunk_size=1024):
        if chunk:
            video_file.write(chunk)

print(f"Video was downloaded and saved as {video_filename}")

Video was downloaded and saved as pu_lesson.mp4


In [None]:
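import os

# moviepy 1.x import path; moviepy 2.x exposes VideoFileClip from the top-level `moviepy` package
from moviepy.editor import VideoFileClip

# Extract the audio track from the downloaded video and save it as MP3
video = VideoFileClip(video_filename)
audio = video.audio
audio_filename = os.path.splitext(video_filename)[0] + "_ext-audio.mp3"
audio.write_audiofile(audio_filename)
video.close()

print(f"Audio was saved as {audio_filename}")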

NameError: name 'video_filename' is not defined

In [None]:
import soundfile as sf
import librosa

In [None]:
def find_audio_files(path, extension=".mp3"):
    audio_files = []
    for root, dirs, files in os.walk(path):
        for f in files:
            if f.endswith(extension):
                audio_files.append(os.path.join(root, f))

    return audio_files

In [None]:
def chunk_audio(filename, segment_length: int, output_dir):
    """Split an audio file into fixed-length segments; segment_length is in seconds."""

    print(f"Chunking audio to {segment_length} second segments...")

    os.makedirs(output_dir, exist_ok=True)

    audio, sr = librosa.load(filename, sr=44100)

    samples_per_segment = segment_length * sr
    # Ceiling division avoids writing an empty trailing segment
    num_segments = -(-len(audio) // samples_per_segment)

    print(f"Chunking {num_segments} chunks...")

    for i in range(num_segments):
        start = i * samples_per_segment
        end = (i + 1) * samples_per_segment
        segment = audio[start:end]
        # Zero-pad the index so that sorted() returns the segments in playback order.
        # Writing MP3 assumes a soundfile build backed by libsndfile >= 1.1.0.
        sf.write(os.path.join(output_dir, f"segment_{i:04d}.mp3"), segment, sr)

    chunked_audio_files = find_audio_files(output_dir)
    return sorted(chunked_audio_files)
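
The arithmetic behind fixed-length chunking can be checked without any audio library. The sketch below uses a hypothetical helper, `segment_bounds`, to compute the start and end sample index of each segment. It assumes ceiling division, so a recording whose length is an exact multiple of the segment length does not produce an empty trailing segment.

```python
import math


def segment_bounds(num_samples: int, segment_length: int, sr: int):
    """Return (start, end) sample indices for consecutive fixed-length segments.

    segment_length is in seconds and sr is the sample rate in Hz.
    """
    samples_per_segment = segment_length * sr
    num_segments = math.ceil(num_samples / samples_per_segment)
    return [
        (i * samples_per_segment, min((i + 1) * samples_per_segment, num_samples))
        for i in range(num_segments)
    ]


# Tiny numbers for readability: 10 samples, 2-second segments at 2 samples per second
bounds = segment_bounds(num_samples=10, segment_length=2, sr=2)
print(bounds)
```

The last segment is simply shorter than the others when the total length is not an exact multiple of the segment length.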

In [None]:
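import openai


def transcribe_audio(audio_files: list, output_file=None, model="whisper-1") -> list:
    """Transcribe each audio file with the OpenAI API and return the texts in order."""

    print("Converting audio to text...")

    # openai>=1.0 client; reads OPENAI_API_KEY from the environment
    client = openai.OpenAI()

    transcripts = []
    for audio_file in audio_files:
        # The context manager closes the file handle after each upload
        with open(audio_file, "rb") as audio:
            response = client.audio.transcriptions.create(model=model, file=audio)
        transcripts.append(response.text)

    if output_file is not None:
        # Save all transcripts to a .txt file, one per line
        with open(output_file, "w", encoding="utf-8") as file:
            for transcript in transcripts:
                file.write(transcript + "\n")

    return transcripts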

In [None]:
import whisper

model = whisper.load_model("small")

option = whisper.DecodingOptions(language='cs', fp16=False)
result = model.transcribe('pu_lesson.mp4', **option.__dict__)
print(result["text"])

ModuleNotFoundError: No module named 'whisper'
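
Besides `text`, the dictionary returned by Whisper's `transcribe` also carries a `segments` list whose entries include `start`, `end` (both in seconds) and `text`. The sketch below formats such segments as timestamped lines, which could later be lined up with the video splits. The helper names are hypothetical, and `example_segments` is a hand-written stand-in, not real transcription output.

```python
def format_timestamp(seconds: float) -> str:
    """Format a number of seconds as MM:SS (fractions are truncated)."""
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes:02d}:{secs:02d}"


def segments_to_lines(segments):
    """Turn Whisper-style segment dicts into '[MM:SS - MM:SS] text' lines."""
    lines = []
    for seg in segments:
        start = format_timestamp(seg["start"])
        end = format_timestamp(seg["end"])
        lines.append(f"[{start} - {end}] {seg['text'].strip()}")
    return lines


# Hand-written stand-in for result["segments"]
example_segments = [
    {"start": 0.0, "end": 4.2, "text": " first sentence"},
    {"start": 64.5, "end": 71.0, "text": " second sentence"},
]
lines = segments_to_lines(example_segments)
for line in lines:
    print(line)
```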

In [None]:
import openai

def prepis_textu(client, input_text):
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": input_text
                    },
                ]
            }
        ],
        stream=False,
    )

    result = response.choices[0].message.content
    print(result)

# Example usage
client = openai.Client()
text_to_rewrite = "In the following text, letters are missing within words. At the same time, the text is difficult to read for the average user. Rewrite this text so that it is fluent and easy to read."
prepis_textu(client, text_to_rewrite)

Absolutely, I'd be happy to help! Please provide the text you would like me to rewrite.
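
The reply above shows that the model received only the instruction and no text to rewrite. One way to address this, sketched here under assumptions, is to send the instruction and the transcript together in one message. `build_rewrite_messages` is a hypothetical helper, and the `transcript` string is a made-up stand-in for the Whisper output in `result["text"]`.

```python
def build_rewrite_messages(instruction: str, text: str) -> list:
    """Build a chat `messages` list carrying both the instruction and the text to rewrite."""
    return [
        {
            "role": "user",
            "content": [
                # Instruction first, then a blank line, then the text itself
                {"type": "text", "text": f"{instruction}\n\n{text}"},
            ],
        }
    ]


instruction = "Rewrite this text so that it is fluent and easy to read."
# Made-up stand-in for result["text"]
transcript = "Ths is a trnscript with mssing lettrs."
messages = build_rewrite_messages(instruction, transcript)
print(messages[0]["content"][0]["text"])
```

The resulting list can be passed as `messages=messages` to `client.chat.completions.create`, in the same way as in `prepis_textu`.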
