In [1]:
import sys
from googleapiclient.discovery import build
import re
from isodate import parse_duration
import subprocess
import random
import re
import os
from moviepy.editor import VideoFileClip, AudioFileClip
from spleeter.separator import Separator
import shutil 
import cv2
import torch
from transformers import  AutoProcessor, AutoModelForVision2Seq, AutoTokenizer, AutoModelForCausalLM, TextStreamer, GenerationConfig
import csv
import uuid
import pandas as pd
from PIL import Image
import numpy as np
import torchvision.transforms as T
# Kosmos-2 checkpoint used for image captioning; the model is moved to the GPU right after loading.
ckpt = "microsoft/kosmos-2-patch14-224"
model = AutoModelForVision2Seq.from_pretrained(ckpt).to("cuda")
# Processor loaded from the same checkpoint; generate_predictions uses it to prepare inputs and decode outputs.
processor = AutoProcessor.from_pretrained(ckpt)


sys.path.append(r'D:\video_extraction\inaSpeechSegmenter')
from inaSpeechSegmenter import Segmenter
from inaSpeechSegmenter.export_funcs import seg2csv, seg2textgrid

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


YouTube link scraper

In [1]:
# Credentials are read from the environment so they never end up saved in the notebook.
# - YOUTUBE_API_KEY: YouTube Data API v3 developer key (used by fetch_videos).
# - HF_TOKEN: Hugging Face access token (used when loading the Llama-2 checkpoint).
# Both fall back to an empty string when the variable is not set.
api_key = os.environ.get('YOUTUBE_API_KEY', '')
hg_key = os.environ.get('HF_TOKEN', '')

In [3]:
def fetch_videos(api_key, search_term, limit, min_duration=240, max_duration=3600):
    """
    Search YouTube and return watch URLs of videos whose length is within bounds.

    Parameters:
    - api_key: YouTube Data API v3 developer key.
    - search_term: Query string passed to the search endpoint.
    - limit: Maximum number of URLs to return.
    - min_duration: Shortest accepted video length in seconds (default 240 = 4 minutes).
    - max_duration: Longest accepted video length in seconds (default 3600 = 60 minutes).

    Returns:
    - A list of at most `limit` URLs of the form
      "https://www.youtube.com/watch?v=<id>". The list is shorter when the
      search results run out before `limit` matching videos are found.
    """
    # Initialize the YouTube API client
    youtube = build('youtube', 'v3', developerKey=api_key)

    # The API returns at most 50 results per request. A full page is always
    # requested because many results are discarded by the duration filter
    # below; asking only for the number still missing would need more
    # search calls to reach `limit`.
    page_size = 50

    video_urls = []
    page_token = None
    while len(video_urls) < limit:
        # Search for videos matching the term with pagination
        search_response = youtube.search().list(
            q=search_term,
            part='id,snippet',
            maxResults=page_size,
            type='video',
            pageToken=page_token
        ).execute()

        # Skip any result that does not carry a video id instead of raising KeyError.
        video_ids = [
            item['id']['videoId']
            for item in search_response.get('items', [])
            if 'videoId' in item.get('id', {})
        ]

        if not video_ids:
            break  # Exit if no more videos are found

        # The search endpoint does not report durations, so fetch the
        # content details of this page's videos in a single request.
        videos_response = youtube.videos().list(
            part='contentDetails',
            id=','.join(video_ids)
        ).execute()

        for item in videos_response.get('items', []):
            duration = parse_duration(item['contentDetails']['duration']).total_seconds()
            if min_duration <= duration <= max_duration:
                video_urls.append(f"https://www.youtube.com/watch?v={item['id']}")
                if len(video_urls) >= limit:
                    break

        page_token = search_response.get('nextPageToken')
        if not page_token:
            break  # Exit the loop if there are no more pages to fetch

    return video_urls

30-second video clip downloader

In [4]:
def sanitize_filename(title):
    """
    Make a video title safe to use as a file name.

    Every character that is not a word character, hyphen, underscore, dot or
    space is replaced with an underscore. Spaces themselves are kept.
    """
    unsafe_characters = re.compile(r'[^\w\-_\. ]')
    return unsafe_characters.sub('_', title)

In [5]:
def download_video_and_extract_random_clips(url, output_dir, number_of_clips=10, clip_length=30):
    """
    Download a video with yt-dlp and cut randomly positioned clips out of it.

    Parameters:
    - url: URL of the video to download.
    - output_dir: Directory for the download and the clips (created if missing).
    - number_of_clips: How many clips to extract.
    - clip_length: Length of each clip in seconds.

    Returns:
    - A list with the paths of the extracted clips, or None when any step
      fails (the error is printed rather than raised).

    The full-length download is removed before returning, on failure as well
    as on success, so a failed run does not leave the source video behind.
    """
    downloaded_filename = None
    try:
        # Ensure the output directory exists
        os.makedirs(output_dir, exist_ok=True)

        # Define preferences
        format_preference = "22"  # yt-dlp format code

        # Fetch the video title for naming
        command_get_title = ["yt-dlp", "--get-title", url]
        result_title = subprocess.run(command_get_title, capture_output=True, text=True, check=True, encoding='utf-8')

        title = result_title.stdout.strip()
        safe_title = sanitize_filename(title)
        output_template = os.path.join(output_dir, f"{safe_title}.%(ext)s")

        # Download the video
        print("Downloading video...")
        command_download = ["yt-dlp", "-f", format_preference, "-o", output_template, url]
        subprocess.run(command_download, capture_output=True, text=True, check=True)

        # The download is expected to be an .mp4 file; this is verified just below.
        downloaded_filename = os.path.normpath(output_template.replace('%(ext)s', 'mp4'))

        print(f"Downloaded filename: {downloaded_filename}")

        if not os.path.exists(downloaded_filename):
            raise FileNotFoundError(f"Expected downloaded file not found: {downloaded_filename}")

        # Get video duration. check=True surfaces an ffprobe failure with its
        # stderr instead of a confusing float('') error on the next line.
        command_duration = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", downloaded_filename]
        result_duration = subprocess.run(command_duration, capture_output=True, text=True, check=True)
        duration_seconds = float(result_duration.stdout.strip())

        # Latest start that still leaves room for a full clip; 0 for short videos.
        latest_start = max(int(duration_seconds - clip_length), 0)

        clip_paths = []
        for i in range(number_of_clips):
            start_time = random.randint(0, latest_start)
            clip_filename = os.path.join(output_dir, f"{safe_title}_clip_{i}.mp4")

            # Extract one clip. "-y" overwrites a clip left over from an earlier
            # run instead of stopping on ffmpeg's overwrite confirmation.
            command_extract = ["ffmpeg", "-y", "-ss", str(start_time), "-t", str(clip_length), "-i", downloaded_filename, "-c:v", "libx264", "-c:a", "aac", clip_filename]
            subprocess.run(command_extract, check=True, capture_output=True, text=True)

            clip_paths.append(clip_filename)

        # Return the list of clip filenames
        return clip_paths

    except subprocess.CalledProcessError as e:
        print(f"An error occurred during subprocess execution: {e.stderr}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None
    finally:
        # Delete the full-length download whether or not clip extraction succeeded.
        if downloaded_filename is not None and os.path.exists(downloaded_filename):
            try:
                os.remove(downloaded_filename)
            except OSError as e:
                print(f"Could not remove downloaded video {downloaded_filename}: {e}")

Music extraction (separate the accompaniment from the vocals)

In [6]:
def process_video_and_get_path(video_path, output_directory):
    """
    Replace a video's audio track with its music-only (accompaniment) stem.

    The audio is extracted to a WAV file, split with spleeter's 2-stem model,
    and the resulting accompaniment is muxed back onto the video, which is
    written next to the original as "<name>_edited<ext>". On success the
    original video file is deleted.

    Parameters:
    - video_path: The path to the video file.
    - output_directory: The directory where the separated audio files will be
      temporarily stored (created if missing).

    Returns:
    - Path to the edited video.

    Raises:
    - ValueError: if the video has no audio track.

    Temporary audio files are removed and open clips are closed even when a
    step fails; the original video is only deleted after the edited one has
    been written.
    """
    # Ensure output_directory exists
    os.makedirs(output_directory, exist_ok=True)

    audio_output_path = os.path.join(output_directory, 'extracted_audio.wav')
    stems_directory = os.path.join(output_directory, 'extracted_audio')
    accompaniment_file_path = os.path.join(stems_directory, 'accompaniment.wav')

    # splitext only looks at the final path component, so dots in directory
    # names (or a missing extension) cannot corrupt the output path.
    root, extension = os.path.splitext(video_path)
    edited_video_path = f"{root}_edited{extension}"

    video_clip = None
    accompaniment_audio_clip = None
    video_with_new_audio = None
    try:
        # Step 1: Extract the audio track
        video = VideoFileClip(video_path)
        try:
            if video.audio is None:
                raise ValueError(f"Video has no audio track: {video_path}")
            video.audio.write_audiofile(audio_output_path)
        finally:
            video.close()  # Close the clip to release the file handle

        # Step 2: Separate the audio into vocals and accompaniment
        separator = Separator('spleeter:2stems')
        separator.separate_to_file(audio_output_path, output_directory)

        # Step 3: Replace audio in the video
        video_clip = VideoFileClip(video_path)
        accompaniment_audio_clip = AudioFileClip(accompaniment_file_path)
        video_with_new_audio = video_clip.set_audio(accompaniment_audio_clip)
        video_with_new_audio.write_videofile(edited_video_path, codec='libx264', audio_codec='aac')
    finally:
        # Step 4: Release file handles, then clean up temporary files and directories
        for clip in (video_clip, accompaniment_audio_clip, video_with_new_audio):
            if clip is not None:
                clip.close()
        if os.path.exists(audio_output_path):
            os.remove(audio_output_path)
        shutil.rmtree(stems_directory, ignore_errors=True)

    # Only reached when the edited video was written successfully.
    os.remove(video_path)
    return edited_video_path

Detect whether a clip is at least ~80% music (25 s of a 30 s clip)

In [7]:
def check_and_delete_clip_if_not_enough_music(media, min_music_seconds=25, segmenter=None):
    """
    Keep a clip only if it contains enough music; otherwise delete it.

    The clip is segmented and the durations of all segments labelled 'music'
    are added up. The total is computed directly from the segmentation, so no
    intermediate CSV file is written to the working directory.

    Parameters:
    - media: Path to the clip to analyse.
    - min_music_seconds: Minimum total music duration (seconds) required to
      keep the clip. Defaults to 25.
    - segmenter: Optional callable that takes the media path and returns an
      iterable of (label, start, stop) tuples. When omitted a new Segmenter()
      is created; pass one in to reuse it across many clips.

    Returns:
    - `media` when the clip contains enough music.
    - None when the clip was deleted, could not be deleted, or the
      segmentation could not be interpreted.
    """
    if segmenter is None:
        segmenter = Segmenter()
    segmentation = segmenter(media)

    try:
        total_music_duration = sum(
            float(stop) - float(start)
            for label, start, stop in segmentation
            if label == 'music'
        )
    except (TypeError, ValueError) as e:
        print(f"Error converting start/stop times to float: {e}")
        return None

    if total_music_duration < min_music_seconds:
        try:
            os.remove(media)
            print(f"Deleted {media} due to insufficient music duration.")
            return None
        except OSError as e:
            print(f"Error deleting file {media}: {e}")
            return None
    else:
        print(f"{media} contains enough music. It will not be deleted.")
        return media

In [8]:
def capture_screenshots(video_path, times=None, save_directory=None):
    """
    Captures screenshots from a video at specified times and returns a dictionary with custom keys for each path.

    Parameters:
    - video_path: Path to the video file.
    - times: List of times in seconds at which to capture the screenshots.
      Defaults to [5, 10, 15, 20, 25].
    - save_directory: Directory the screenshots are written to (created if
      missing). Defaults to "D:\\video_extraction\\video\\720p".

    Returns:
    - A dictionary with keys like 'path1', 'path2', etc., pointing to the file
      paths of the captured screenshots. The number in the key is the 1-based
      position of the time in `times`; times whose frame could not be read or
      saved are left out. The dictionary is empty if the video cannot be opened.
    """
    if times is None:
        times = [5, 10, 15, 20, 25]
    if save_directory is None:
        save_directory = "D:\\video_extraction\\video\\720p"

    # Initialize a dictionary to hold the paths of the screenshots with custom keys.
    screenshots_paths = {}

    # Load the video.
    video = cv2.VideoCapture(video_path)
    try:
        # Check if video opened successfully.
        if not video.isOpened():
            print("Error: Could not open video.")
            return screenshots_paths

        os.makedirs(save_directory, exist_ok=True)

        # Get video FPS (frames per second) to calculate the frame number.
        fps = video.get(cv2.CAP_PROP_FPS)

        for index, timestamp in enumerate(times, start=1):
            # Seek to the frame that corresponds to the requested time and read it.
            video.set(cv2.CAP_PROP_POS_FRAMES, int(timestamp * fps))
            success, frame = video.read()
            if not success:
                print(f"Error: Could not capture screenshot at {timestamp}s")
                continue

            file_path = os.path.join(save_directory, f"screenshot_{timestamp}s.jpg")

            # imwrite reports failure through its return value; only record
            # paths of files that were actually written.
            if not cv2.imwrite(file_path, frame):
                print(f"Error: Could not save screenshot at {timestamp}s")
                continue

            screenshots_paths[f"path{index}"] = file_path
    finally:
        # Release the video capture object on every exit path.
        video.release()

    # Return the dictionary of screenshot paths.
    return screenshots_paths


Image captioning

In [9]:
def generate_predictions(image_path, text_input="Detailed"):
    """
    Caption an image with the notebook's Kosmos-2 `model` and `processor`.

    Parameters:
    - image_path: Path to the image file.
    - text_input: "Brief" or "Detailed" (case-insensitive) to select one of the
      two built-in prompts; any other string is used as the prompt itself,
      prefixed with "<grounding>".

    Returns:
    - The first element of the processor's post-processed generation, with
      the detailed-prompt prefix ("Describe this image in detail: ") removed.
    """
    # Close the image file as soon as the RGB copy has been made.
    with Image.open(image_path) as source_image:
        image_input = source_image.convert("RGB")

    if text_input.lower() == "brief":
        text_input = "<grounding>An image of"
    elif text_input.lower() == "detailed":
        text_input = "<grounding>Describe this image in detail:"
    else:
        text_input = f"<grounding>{text_input}"

    # Put the inputs on whatever device the model lives on.
    inputs = processor(text=text_input, images=image_input, return_tensors="pt").to(model.device)

    generated_ids = model.generate(
        pixel_values=inputs["pixel_values"],
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        image_embeds=None,
        image_embeds_position_mask=inputs["image_embeds_position_mask"],
        use_cache=True,
        max_new_tokens=128,
    )

    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

    processed_text = processor.post_process_generation(generated_text)

    # The generated text starts with the prompt; strip the detailed prompt so only the caption remains.
    actual_description = processed_text[0].replace("Describe this image in detail: ", "")

    return actual_description



Llama 2: prompt template and text generation

In [10]:
class PromptTemplate:
    """
    Builds Llama-2 chat-style prompt strings from an optional system prompt
    and an alternating history of user messages and model replies.

    Each instance owns its own history lists, so adding a message to one
    template never affects another.
    """

    system_prompt = None
    # Declared for readers only; the lists are created per instance in __init__.
    user_messages: list
    model_replies: list

    def __init__(self, system_prompt=None):
        """Create an empty conversation with the given system prompt."""
        self.system_prompt = system_prompt
        self.user_messages = []
        self.model_replies = []

    def add_user_message(self, message: str, return_prompt=True):
        """Append a user message; return the rebuilt prompt when `return_prompt` is true."""
        self.user_messages.append(message)
        if return_prompt:
            return self.build_prompt()

    def add_model_reply(self, reply: str, includes_history=True, return_reply=True):
        """
        Append the model's reply to the latest user message.

        When `includes_history` is true the current prompt text is removed
        from `reply` before storing it. Raises ValueError, without modifying
        the history, if there is no unanswered user message for this reply.
        """
        reply_ = reply.replace(self.build_prompt(), "") if includes_history else reply
        # Validate before appending so a rejected reply does not corrupt the history.
        if len(self.user_messages) != len(self.model_replies) + 1:
            raise ValueError(
                "Number of user messages does not equal number of system replies."
            )
        self.model_replies.append(reply_)
        if return_reply:
            return reply_

    def get_user_messages(self, strip=True):
        """Return the user messages, whitespace-stripped when `strip` is true."""
        return [x.strip() for x in self.user_messages] if strip else self.user_messages

    def get_model_replies(self, strip=True):
        """Return the model replies, whitespace-stripped when `strip` is true."""
        return [x.strip() for x in self.model_replies] if strip else self.model_replies

    def clear_chat_history(self):
        """Remove all user messages and model replies; the system prompt is kept."""
        self.user_messages.clear()
        self.model_replies.clear()

    def build_prompt(self):
        """
        Return the prompt string for the current conversation state.

        With an empty history the result is the system prompt wrapped in
        [INST]/<<SYS>> markers and closed with "</s>". Otherwise there must be
        exactly one more user message than model replies (ValueError if not),
        and the result ends with the pending user message.
        """
        if self.user_messages == [] and self.model_replies == []:
            return f"<s>[INST] <<SYS>>\n{self.system_prompt}\n<</SYS>> [/INST]</s>"

        elif len(self.user_messages) != len(self.model_replies) + 1:
            raise ValueError(
                "Error: Expected len(user_messages) = len(model_replies) + 1. Add a new user message!"
            )

        if self.system_prompt is not None:
            SYS = f"[INST] <<SYS>>\n{self.system_prompt}\n<</SYS>>"
        else:
            SYS = ""

        CONVO = ""
        SYS = "<s>" + SYS
        # Completed exchanges: every user message except the last, paired with its reply.
        for i in range(len(self.user_messages) - 1):
            user_message, model_reply = self.user_messages[i], self.model_replies[i]
            conversation_ = f"{user_message} [/INST] {model_reply} </s>"
            # The first exchange relies on the [INST] opened by the system block.
            if i != 0:
                conversation_ = "[INST] " + conversation_
            CONVO += conversation_

        CONVO += f"[INST] {self.user_messages[-1]} [/INST]"

        return SYS + CONVO

In [11]:
# Preferred torch device: the first CUDA GPU when available, otherwise the CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu" 
# Llama-2 7B chat weights loaded in bfloat16; device_map="auto" leaves weight placement to the loader.
llama_model = AutoModelForCausalLM.from_pretrained('meta-llama/Llama-2-7b-chat-hf', token=hg_key, torch_dtype=torch.bfloat16, device_map="auto")
llama_tokenizer = AutoTokenizer.from_pretrained('meta-llama/Llama-2-7b-chat-hf', token=hg_key)
# Reports the preferred device computed above (not read back from the model).
print(f"Model running on {device}")

Loading checkpoint shards: 100%|██████████| 2/2 [00:10<00:00,  5.46s/it]


Model running on cuda:0


In [12]:
def generate_response_with_llama(context_prompt, llama_tokenizer, llama_model):
    """
    Generate a reply to `context_prompt` with a Llama-2 chat model.

    Parameters:
    - context_prompt: Text to send to the model. It is passed to
      PromptTemplate as the system prompt with no user messages.
    - llama_tokenizer: Tokenizer matching `llama_model`.
    - llama_model: Causal language model used for generation.

    Returns:
    - The decoded text after the last "[/INST]" marker, stripped of
      surrounding whitespace. New tokens are also streamed to stdout while
      they are generated.
    """
    # Initialize the prompt generator with the given context
    prompt = PromptTemplate(context_prompt)
    true_prompt = prompt.build_prompt()

    config = GenerationConfig(
        max_new_tokens=1024,
        do_sample= True,
        top_k= 10,
        num_return_sequences= 1,
        # NOTE(review): return_full_text looks like a text-generation pipeline
        # option; confirm that GenerationConfig / generate() honours it.
        return_full_text= False,
        temperature= 0.1,
    )

    text_stream = TextStreamer(llama_tokenizer, skip_prompt=True)
    # Place the prompt on the device of the model that was passed in rather
    # than relying on a notebook-level `device` variable.
    encoded_input = llama_tokenizer.encode(true_prompt, return_tensors='pt', add_special_tokens=False).to(llama_model.device)
    results = llama_model.generate(encoded_input, generation_config=config, streamer=text_stream)
    decoded_output = llama_tokenizer.decode(results[0], skip_special_tokens=True)
    # The decoded sequence contains the prompt as well; keep only what follows the final [/INST].
    response = decoded_output.split("[/INST]")[-1].strip()
    return response

In [13]:
def save_to_csv(base_path, file_name, path, caption):
    """
    Add one (video path, caption) row to a CSV file, creating it if needed.

    :param base_path: Directory that holds the CSV file (created if missing).
    :param file_name: Name of the CSV file inside ``base_path``.
    :param path: Video path written to the first column.
    :param caption: Caption written to the second column.

    A header row ('Video Path', 'Caption') is written first whenever the
    file is new or still empty.
    """
    os.makedirs(base_path, exist_ok=True)
    target = os.path.join(base_path, file_name)

    # A missing or zero-length file has no header yet.
    needs_header = not os.path.isfile(target) or os.path.getsize(target) == 0

    rows = [[path, caption]]
    if needs_header:
        rows.insert(0, ['Video Path', 'Caption'])

    with open(target, mode='a', newline='', encoding='utf-8') as handle:
        csv.writer(handle).writerows(rows)

COMBINED

In [None]:
def scraping(api_key):
    """
    Run the full pipeline: search YouTube, cut clips, keep the musical ones,
    caption five screenshots per clip and store a one-sentence summary.

    Parameters:
    - api_key: YouTube Data API v3 developer key.

    For every kept clip a row (clip path, summary) is appended to
    'videos_and_captions.csv'. Videos whose download fails and clips for
    which no screenshot could be captured are skipped instead of aborting
    the whole run.
    """
    video_urls = fetch_videos(api_key,"no commentary walkthrough", 50)
    for url in video_urls:
        clip_file_path = download_video_and_extract_random_clips(url, output_dir="D:\\video_extraction\\video\\480p")
        if not clip_file_path:
            # The downloader returns None when any step failed.
            continue

        for clip in clip_file_path:
            file_path = check_and_delete_clip_if_not_enough_music(clip)
            if file_path is None:
                continue

            screenshot_paths = capture_screenshots(file_path)
            captions = []
            for screenshot in screenshot_paths.values():
                captions.append(generate_predictions(screenshot))
                os.remove(screenshot)

            if not captions:
                print(f"No screenshots could be captured for {file_path}; skipping.")
                continue

            # With the usual five screenshots this is the same text as
            # joining captions[0]..captions[4] with commas.
            joined_captions = ",".join(captions)
            context_prompt=f"""=============context================
        {joined_captions}
        =========================================
        Take the 5 context and make a sentence no more than 20 words to summarise the context. Just give me the sentence. Don't give me anything else."""
            generated_text = generate_response_with_llama(context_prompt, llama_tokenizer, llama_model)
            # The first two characters of the reply are dropped before saving.
            # NOTE(review): presumably a stray leading token in the model output — confirm.
            save_to_csv('D:\\video_extraction\\video\\360p', 'videos_and_captions.csv', file_path, generated_text[2:])



In [None]:
#scraping(api_key)

ultimatevocalremovergui

In [21]:
def _build_description_prompt(context, instructions):
    """Assemble one description prompt: context line, instructions, then the shared example."""
    pad = " " * 12
    example = (
        "A small rural village. The atmosphere is peaceful, with citizens doing their daily tasks. "
        "However, there is an underlying tension in the air, like something is about to go down. "
        "The overall emotion of the video is one of suspense and intrigue."
    )
    return (
        f"context={context}. \n"
        f"{pad}{instructions} \n"
        f"{pad}example prompts:\n"
        f"{pad}{example}\n"
        f"{pad}"
    )


def scraping(clip_file_path):
    """
    Caption screenshots of local clips and stream three alternative
    descriptions of each clip from the Llama model.

    Parameters:
    - clip_file_path: Either a single clip path (str or path-like) or an
      iterable of clip paths. A single path is processed exactly once; None
      entries are skipped.

    Nothing is returned; captions, the clip path and the generated
    descriptions are printed/streamed to the cell output.

    Note: this definition reuses the name of the pipeline function defined
    earlier in the notebook and replaces it once this cell has run.
    """
    # A bare string must be wrapped, otherwise the loop would walk over its characters.
    if isinstance(clip_file_path, (str, os.PathLike)):
        clip_paths = [clip_file_path]
    else:
        clip_paths = clip_file_path

    separator = '--------------------------------------------------'

    # Three phrasings of the same request, tried side by side for comparison.
    instruction_variants = [
        "imagine you are writing a description of a video. You are capturing the emotions caused by the environment and atmosphere. You should write a 30 word paragraph that takes the 5 context and summarise all of them into that one paragraph. Talk about the environment, the vibes and the emotions. Immediately starts describing and do not mention the source. I will give you example prompts, follow their formatting but not the content. Give me just the paragraph and nothing else.",
        "imagine you are writing a description of a scene. You are capturing the emotions of the scene caused by the environment and atmosphere. You should write a 30 word paragraph that is going to be used in a website. Take the 5 context and summarise all of them into that one paragraph. I will give you example prompts, follow their formatting but not the content. Give me just the paragraph and nothing else.",
        "imagine you are writing a description of a video. You are capturing the emotions caused by the environment and atmosphere. You should write a 30 word paragraph that takes the 5 context and summarise all of them into that one paragraph. Talk about the environment, followed by the atmosphere and the emotions. Immediately starts describing and do not mention the source. I will give you example prompts, follow their formatting but not the content. Give me just the paragraph and nothing else.",
    ]

    for file_path in clip_paths:
        if file_path is None:
            continue

        screenshot_paths = capture_screenshots(file_path)
        captions = []
        for screenshot in screenshot_paths.values():
            captions.append(generate_predictions(screenshot))
            os.remove(screenshot)

        if not captions:
            print(f"No screenshots could be captured for {file_path}; skipping.")
            continue

        for number, caption in enumerate(captions, start=1):
            print(f'caption {number}{caption}')
            print(separator)

        # With the usual five screenshots this equals captions[0]..captions[4] joined by commas.
        context = ",".join(captions)

        print(file_path)
        for instructions in instruction_variants:
            print(separator)
            # The reply is streamed to stdout by generate_response_with_llama.
            generate_response_with_llama(
                _build_description_prompt(context, instructions), llama_tokenizer, llama_model
            )

In [22]:
# Run the caption-and-describe experiment on one previously extracted clip.
scraping(r"D:\video_extraction\video\360p\assassins creed iv blackflag gameplay walkthrough part 1 no commentary pc games_clip.mp4")

caption 1In the image, a man is walking through a village, with a palm tree and a building in the background. He is wearing a black shirt and a backpack. There are several other people in the scene, some of whom are carrying backpacks. Some of the people are closer to the foreground, while others are further away. The village appears to be a mix of houses and palm trees.
--------------------------------------------------
caption 2In the image, a man is riding a horse through a village, while several people are walking around. They are all carrying various items, including a handbag and a backpack. There are also two flags visible in the scene, one on the left side and the other on the right side.
--------------------------------------------------
caption 3In the image, two men are standing in front of a building, engaged in a conversation. They are dressed in historical clothing, and one of them is holding a shield. In the background, there is a flag flying, and a few other people can 

In [20]:
# Experiment: merge an image caption and a game description into a single
# text prompt for music generation.
caption = 'ice and water'
text_prompt = 'a chaotic cooking game where you and your friends work together to prepare and serve meals in wacky kitchens under pressure.'
print(caption)
# The prompt lists both contexts, then the task and three example prompts that show the desired format.
prompt = f"""=============context================
{caption},
{text_prompt},
=========================================

Take the following 2 context and merge them to create a textual prompt for music generation. Your prompt should be a single line. Do not give prompts that suggest increasing intensity.
The prompt should contain the atmosphere of the song, where the song would fit environment wise and chord progression you have come up with. I have given you some example prompts, format your prompt similarly to them but do not copy their content.
Example prompts: This is a live performance of a classical music piece. There is an orchestra performing the piece with a violin lead playing the main melody. The atmosphere is sentimental and heart-touching. This piece could be playing in the background at a classy restaurant.
The song is an instrumental. The song is in medium tempo with a classical guitar playing a lilting melody in accompaniment style. The song is emotional and romantic. The song is a romantic instrumental song.
This is a new age piece. There is a flute playing the main melody with a lot of staccato notes. The rhythmic background consists of a medium tempo electronic drum beat with percussive elements all over the spectrum. There is a playful atmosphere to the piece.

"""
# The reply is streamed to the cell output while it is generated.
generated_text1 = generate_response_with_llama(prompt, llama_tokenizer, llama_model) 

ice and water
t Here is a merged prompt for the two contexts you provided:

"This is a chaotic cooking game where you and your friends work together to prepare and serve meals in wacky kitchens under pressure. The atmosphere is fast-paced and energetic, with a playful and unpredictable vibe. The chord progression is upbeat and lively, with a mix of major and minor chords to create a sense of tension and release. Imagine a funky, jazz-inspired melody on a solo instrument, perhaps a saxophone or trumpet, over the top of the chord progression. The song could be playing in the background of a bustling kitchen, adding to the excitement and energy of the cooking competition."</s>


In [1]:
import subprocess

def download_youtube_video(youtube_url):
    """
    Download a video with yt-dlp using its default settings.

    Prints "Download successful." when yt-dlp exits cleanly and an error
    message when it exits with a non-zero status. Nothing is returned.
    """
    yt_dlp_args = ['yt-dlp', youtube_url]
    try:
        subprocess.run(yt_dlp_args, check=True)
    except subprocess.CalledProcessError as e:
        print(f"An error occurred: {e}")
    else:
        print("Download successful.")

In [18]:
# Download a single YouTube clip URL with yt-dlp's default settings.
download_youtube_video('https://www.youtube.com/clip/UgkxOk4ZKyt0HKV4GCyqKZJJC1X8HnK20UDW')

Download successful.
