<a href="https://colab.research.google.com/github/nikhilnair31/MIS284N-UD-YT-Shorts-Generator/blob/main/MIS284N_Unstructured_Data_Test_3B_Reddit_YT_Shorts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Setup

### Install and Import

In [None]:
!pip install praw openai whisper elevenlabs replicate
!pip install opencv-python-headless moviepy
!pip install google-auth google-auth-oauthlib
!pip install --upgrade google-auth

In [None]:
import re
import os
import sys
import json
import time
import numpy as np
import pandas as pd
from datetime import datetime

from google.colab import drive
from IPython.display import Image, display

### Filter Warnings

In [None]:
import warnings

warnings.filterwarnings("ignore", category=UserWarning, message="It appears that you are using PRAW in an asynchronous environment.")
warnings.filterwarnings("ignore", category=DeprecationWarning)

### Load Drive and Env Vars

In [None]:
# Mount Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Specify the path to the shell script file
script_path = "/content/drive/MyDrive/Colab Notebooks/UD/Others/env_vars.sh"

# Read the shell script file
with open(script_path) as file:
    script_content = file.readlines()

# Extract environment variables and assign them to os.environ
for line in script_content:
    if line.startswith("export "):
        key, value = line[len("export "):].strip().split("=")
        os.environ[key] = value.strip('"')

In [None]:
os.environ['REDDIT_USER_AGENT']

'Test UT'

# Reddit Shorts Generation

## Reddit Scrape

In [None]:
import praw

In [None]:
# Create a Reddit API instance
reddit = praw.Reddit(
    client_id=os.environ['REDDIT_CLIENT_ID'],
    client_secret=os.environ['REDDIT_CLIENT_SECRET'],
    user_agent=os.environ['REDDIT_USER_AGENT']
)

In [None]:
total_media_to_retrieve = 35
results = []
params = {'limit': 7}  # Specify the number of items per request

while len(results) < total_media_to_retrieve:
    remaining_to_retrieve = total_media_to_retrieve - len(results)
    current_limit = min(remaining_to_retrieve, params['limit'])

    batch = list(reddit.subreddit("shortscarystories").top(limit=current_limit, params=params, time_filter="week"))

    if not batch:
        break

    results.extend(batch)

    params = {'limit': params['limit'], 'after': batch[-1].fullname}

It is strongly recommended to use Async PRAW: https://asyncpraw.readthedocs.io.
See https://praw.readthedocs.io/en/latest/getting_started/multiple_instances.html#discord-bots-and-asynchronous-environments for more info.

It is strongly recommended to use Async PRAW: https://asyncpraw.readthedocs.io.
See https://praw.readthedocs.io/en/latest/getting_started/multiple_instances.html#discord-bots-and-asynchronous-environments for more info.

It is strongly recommended to use Async PRAW: https://asyncpraw.readthedocs.io.
See https://praw.readthedocs.io/en/latest/getting_started/multiple_instances.html#discord-bots-and-asynchronous-environments for more info.

It is strongly recommended to use Async PRAW: https://asyncpraw.readthedocs.io.
See https://praw.readthedocs.io/en/latest/getting_started/multiple_instances.html#discord-bots-and-asynchronous-environments for more info.

It is strongly recommended to use Async PRAW: https://asyncpraw.readthedocs.io.
See https://praw.readthedocs.io/en/l

In [None]:
# Create a dictionary to store post data
posts_dict = {}

# Iterate through the top posts
for i, post in enumerate(results):
    posts_dict[i] = {
        "Title": post.title,
        "Content": post.selftext,
        # "URL": post.url,
        "Timestamp": datetime.utcfromtimestamp(post.created_utc).strftime('%Y-%m-%d %H:%M:%S UTC'),
        "Upvotes": post.ups,
        "Total Comments": post.num_comments,
    }

In [None]:
# Create a DataFrame from the dictionary
test_df = pd.DataFrame(posts_dict).T
# Convert "Timestamp" column to datetime
test_df["Timestamp"] = pd.to_datetime(test_df["Timestamp"])
# Check post content length
test_df["Post Length"] = test_df['Content'].apply(len)

test_df.head()

Unnamed: 0,Title,Content,Timestamp,Upvotes,Total Comments,Post Length
0,I deserved the divorce. But no one deserves wh...,"Alimony bleeds me dry every paycheck, but that...",2023-08-25 18:01:30+00:00,1086,62,2562
1,How to cheat Death,*Cheating Death is easy— People do it unconsci...,2023-08-28 03:13:08+00:00,1088,37,2474
2,My girlfriend says I’m becoming the man of her...,The day I bought DreamSync was the first time ...,2023-08-28 15:55:50+00:00,788,36,2685
3,“Lucky” Survivor,I was a “lucky” survivor.\n\nIt was a sunny da...,2023-08-26 03:48:54+00:00,576,29,2691
4,I really couldn’t ask for a better wife.,I have been blessed most of my life. I got a ...,2023-08-28 13:53:52+00:00,510,17,2400


In [None]:
min_test_df = test_df.sort_values(by=['Post Length'], ascending=[True])[:7]
min_test_df

Unnamed: 0,Title,Content,Timestamp,Upvotes,Total Comments,Post Length
21,"I'm never telling anyone about my son's ""quirks""","He's different, but not worse. I love him with...",2023-08-29 15:24:36+00:00,133,6,610
32,I Must put Food on the Table,After the outbreak many jobs that were outlawe...,2023-08-30 00:16:27+00:00,74,2,765
25,License to Kill,"*One...*\n\nA methed up woman, screaming somet...",2023-08-25 19:27:04+00:00,112,9,1177
30,“The machines have a religion now”,"In a distant future, humanity’s insatiable que...",2023-08-25 03:08:31+00:00,79,7,1232
27,Just a little while,Do you ever just wish you could disappear for ...,2023-08-26 14:22:10+00:00,110,14,1255
14,She’ll Only Play Once,"Her face is pale, and she is crying. Good. I l...",2023-08-27 16:10:52+00:00,235,13,1527
24,I Love My Uncle,I love my uncle!!!\n\nMy uncle Abel is the bes...,2023-08-29 19:48:05+00:00,112,8,1627


## Seed Content

In [None]:
def preprocess_text(text):
    # Remove newlines and extra spaces
    text = re.sub(r'\n', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()

    # Remove asterisks (*) and backslashes (\)
    text = re.sub(r'[\*\\]', '', text)

    # Remove digits at the beginning of lines
    text = re.sub(r'^\d+\s+', '', text, flags=re.MULTILINE)

    return text

pd.set_option('display.max_colwidth', None)
seed_content = str(min_test_df['Content'].iloc[6])
preprocessed_content = preprocess_text(seed_content)
print(f'original text\n{seed_content}\n\nprocessed text\n{preprocessed_content}')

original text
I love my uncle!!!

My uncle Abel is the bestest uncle in the whole wide world! He’s never working like Mommy always is. He always has time for me. We get to play together every day! He’ll push me on the swings and play with my dolls and color with me. 

He tells great stories too! Uncle Abel is always telling me about what he and Daddy used to do when they were little. Like how they used to slide down the stairs. I wish I could do that. But the stairs hadn’t had carpet for a very long time. I didn’t even know they had carpet until Uncle Abel told me! I want to slide down them too but Mommy really really doesn’t like me playing on them. 

Uncle Abel is a great listener too! He always wants to hear about my day. Or how school was. Or about the stray cat that sometimes let me pet him. Or me and Mommy’s visit with Daddy. 

Uncle Abel sometimes asks me if Daddy looks sad. Daddy always look sad. But that’s cause he doesn’t like not being home with us. Mommy says Daddy made a b

## Generating Title and Description

In [None]:
import openai

In [None]:
openai.api_key = os.environ['OPENAI_API_KEY']

In [None]:
def generate_title(prompt, script):
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "system", "content": prompt},
                  {"role": "user", "content": script}],
        temperature=1,
        max_tokens=256,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0
    )

    return response.choices[0].message['content']

In [None]:
shorts_title = generate_title("Generate a very short title for a YouTube Short based on the script. Make it similar to the examples below. Examples: - Scary things hidden in normal photos Part#28 - This is the scariest video on the internet... - Scary video😳 #shorts", preprocessed_content)
shorts_description = generate_title("Generate a very short description for a YouTube Short", preprocessed_content)

shorts_title = shorts_title.replace("\"","")
shorts_description = shorts_description.replace("\"","")

print(f'Title: {shorts_title}\nDescription: {shorts_description}\n')

Title: Uncle Abel's Surprise for Daddy #shorts
Description: Uncle Abel's Heartwarming Welcome Home Surprise for Daddy



In [None]:
folder_title = shorts_title.replace("\"","").split()[0]
folder_title

'Uncle'

## Image Prompts

In [None]:
def generate_image_prompts(gpt_type, prompt, script):
    response = openai.ChatCompletion.create(
        model=gpt_type,
        messages=[{"role": "system", "content": prompt},
                  {"role": "user", "content": script}],
        temperature=1,
        max_tokens=1024,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0
    )

    image_prompts_str = response.choices[0].message['content']
    image_prompts_str = re.sub(r'[^\w\s]', '', image_prompts_str)
    image_prompts_str = re.sub(r'[^a-zA-Z\s]', '', image_prompts_str)
    image_prompts = image_prompts_str.lower().split('\n')
    image_prompts = list(map(lambda prompt: f"{prompt.strip()}", image_prompts))

    return image_prompts

In [None]:
gpt_type = "gpt-3.5-turbo"
gpt_prompt = """
  Generate 6 short one liner prompts for DALLE-2/Midjourney/StableDiffusion prompt image generation. The prompt I give will be the script for a video.
  Prompts should be very short but descriptive with no NSFW content.
"""
image_prompts = generate_image_prompts(gpt_type, gpt_prompt, preprocessed_content)
image_prompts

['a heartwarming tale of a special bond between a child and her beloved uncle',
 'unleash your inner child with uncle abel as they embark on daily adventures',
 'discover the magic of family through the eyes of a little girl and her caring uncle',
 'when a childs innocent perspective reveals the true meaning of happiness',
 'experience the joy of reconnecting through a handmade card and a longawaited homecoming',
 'join an extraordinary uncle who goes above and beyond to create priceless memories for his niece']

In [None]:
gpt_type = "gpt-3.5-turbo"
gpt_prompt = """
  Generate 1 short one liner prompts for DALLE-2/Midjourney/StableDiffusion prompt thumbnail generation. The prompt I give will be the script for a video.
  Prompts should be very short but descriptive with no NSFW content.
"""
thumbnail_prompts = generate_image_prompts(gpt_type, gpt_prompt, preprocessed_content)
thumbnail_prompts

['uncle abels love a heartwarming story of family reunion and a special card for daddy']

## Image Generation

In [None]:
import replicate

In [None]:
client = replicate.Client(api_token=os.environ["STABLEDIFFUSION_API_KEY"])

In [None]:
def generate_stablediffusion_image(image_prompt):
    output = client.run(
        "stability-ai/stable-diffusion:ac732df83cea7fff18b8472768c88ad041fa750ff7682a21affe81863cbe77e4",
        input = {
            "prompt": image_prompt,
            "height": 1024,
            "width": 576,
            "num_outputs": 1,
            "num_inference_steps": 50,
            "guidance_scale": 7.5,
            "scheduler": 'DPMSolverMultistep',
            "seed": 42
        }
    )
    return output[0]

In [None]:
thumbnail_image = generate_stablediffusion_image(thumbnail_prompts[0])
thumbnail_image

'https://pbxt.replicate.delivery/HjhFGQucOMJONtIgtnkdRi58bGEA1R8RJYf67ubOe2S8ipfiA/out-0.png'

In [None]:
generated_images = [generate_stablediffusion_image(prompt) for prompt in image_prompts]
generated_images

In [None]:
import urllib.request

# Set the directory path to save the images
directory_path = f"/content/drive/MyDrive/Colab Notebooks/UD/{folder_title}/img"

# Create the directory if it does not exist
if not os.path.exists(directory_path):
    os.makedirs(directory_path)

# Save thumbnail image to the folder
thumbnail_file_path = f"{directory_path}/image_0.jpg"
urllib.request.urlretrieve(thumbnail_image, thumbnail_file_path)
print(f"Image 0 saved successfully at {thumbnail_file_path}.")

# Save video images to the folder
for i, image_url in enumerate(generated_images):
    file_path = f"{directory_path}/image_{i+1}.jpg"
    urllib.request.urlretrieve(image_url, file_path)
    print(f"Image {i+1} saved successfully at {file_path}.")

## Audio Generation

In [None]:
from elevenlabs import set_api_key, generate, play

In [None]:
set_api_key(os.environ["ELEVENLABS_API_KEY"])

In [None]:
# Generate audio using 'elevenlabs' package
audio = generate(
    text=preprocessed_content,
    voice="Nicole",
    model="eleven_monolingual_v1"
)

# Set the directory path to save the images
directory_path = f"/content/drive/MyDrive/Colab Notebooks/UD/{folder_title}/aud"

# Create the directory if it does not exist
if not os.path.exists(directory_path):
    os.makedirs(directory_path)

audio_file_path = os.path.join(directory_path, f"audio_0.wav")

# Save the generated audio to the temporary directory
with open(audio_file_path, 'wb') as f:
    f.write(audio)

## Generating SRT (To Be Fixed)

In [None]:
preprocessed_content

"He's different, but not worse. I love him with my whole heart and I can tell he loves me back. Sure, he has weird food preferences, but who doesn't? I'm still teaching him to talk properly, even though he's 10, because no one would understand what he has to say - not because he's stupid, rather because he's a genius. And he's a very fast runner (I'm a proud mom if you can't tell). It's not that I'm ashamed of his differences - I just don't want him to be attacked by the mindless masses. The world is a cruel place. It's my job as a mom to make it better. He's a good person at heart - just not infected."

In [None]:
for i, (text, start, end) in enumerate(preprocessed_content):
    print(f'{text} - {start} - {end}')

ValueError: ignored

In [None]:
import whisper

def create_srt_from_mp3(srt_path):
    with open(srt_path, 'w') as srt_file:
        for i, (text, start, end) in enumerate(preprocessed_content):
            duration = end - start

            # If the duration is less than or equal to 1 second, consider it as a one-word caption
            if duration.total_seconds() <= 1:
                srt_file.write(f"{i + 1}\n")
                srt_file.write(f"{start.strftime('%H:%M:%S,%f')[:-3]} --> {end.strftime('%H:%M:%S,%f')[:-3]}\n")
                srt_file.write(f"{text}\n\n")

    print(f"SRT file created!")

srt_path = directory_path+'/captions_0.srt'

create_srt_from_mp3(srt_path)

ValueError: ignored

## Compositing

In [None]:
from moviepy.editor import ImageClip, concatenate_videoclips
from moviepy.editor import AudioFileClip

# Paths to images and audio
images_folder = f'/content/drive/MyDrive/Colab Notebooks/UD/{folder_title}/img/'
audio_file_path = f'/content/drive/MyDrive/Colab Notebooks/UD/{folder_title}/aud/audio_0.wav'

# Get image files from the folder. Sort image files by name. Create a list of image paths
image_files = [f for f in os.listdir(images_folder) if f.endswith('.jpg')]
image_files.sort()
image_paths = [os.path.join(images_folder, f) for f in image_files]

# Load audio clip
audio_clip = AudioFileClip(audio_file_path)

# Calculate duration per image
duration_per_image = audio_clip.duration / len(image_paths)

# Create video clips for each image
video_clips = []
for image_path in image_paths:
    img_clip = ImageClip(image_path, duration=duration_per_image).resize((1080, 1920))
    video_clips.append(img_clip)

# Concatenate video clips. Set audio for the concatenated video. Set the FPS for the final video
concatenated_clips = concatenate_videoclips(video_clips, method="compose")
final_clip = concatenated_clips.set_audio(audio_clip)
final_clip.fps = 24

# Set the directory path to save the images. Create the directory if it does not exist
directory_path = f"/content/drive/MyDrive/Colab Notebooks/UD/{folder_title}/output"
if not os.path.exists(directory_path):
    os.makedirs(directory_path)

# Export the video
formatted_timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_video_path = directory_path+f'/output_{formatted_timestamp}_reddit.mp4'
final_clip.write_videofile(output_video_path, codec='libx264', threads=4)

Moviepy - Building video /content/drive/MyDrive/Colab Notebooks/UD/My/output/output_20230830213527_reddit.mp4.
MoviePy - Writing audio in output_20230830213527_redditTEMP_MPY_wvf_snd.mp3




MoviePy - Done.
Moviepy - Writing video /content/drive/MyDrive/Colab Notebooks/UD/My/output/output_20230830213527_reddit.mp4





Moviepy - Done !
Moviepy - video ready /content/drive/MyDrive/Colab Notebooks/UD/My/output/output_20230830213527_reddit.mp4


In [None]:
import cv2
from moviepy.editor import VideoFileClip

def pipeline(frame, word):
    global curr_frame

    if curr_frame < frames_per_word and len(word) > 0:
        text = word[0]
    elif curr_frame >= frames_per_word:
        if len(word) > 1:
            text = word.pop(0)
        else:
            text = word[0]

        curr_frame = 0

    curr_frame += 1

    cv2.resize(frame, (1080, 1920))

    height, width, _ = frame.shape

    (text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_ITALIC, 4, 50)

    text_x = (width - text_width) // 2
    text_y = (height + text_height) // 2

    cv2.putText(frame, text, (text_x, text_y), cv2.FONT_ITALIC, 4, (0, 0, 0), 50, cv2.LINE_AA)
    cv2.putText(frame, text, (text_x, text_y), cv2.FONT_ITALIC, 4, (255, 255, 255), 10, cv2.LINE_AA)

    return frame

curr_frame = 0
words = [word.replace(",", "").replace(".", "") for word in preprocessed_content.upper().split()]
total_frames = int(cv2.VideoCapture(output_video_path).get(cv2.CAP_PROP_FRAME_COUNT))
frames_per_word = int(total_frames / len(words))

input_video_path = output_video_path
new_output_video_path = directory_path+f"/output_{formatted_timestamp}_caption_reddit.mp4"

video = VideoFileClip(input_video_path)
out_video = video.fl_image(lambda frame: pipeline(frame, words))
out_video.write_videofile(new_output_video_path, audio=True, preset='ultrafast')

Moviepy - Building video /content/drive/MyDrive/Colab Notebooks/UD/My/output/output_20230830213527_caption_reddit.mp4.
MoviePy - Writing audio in output_20230830213527_caption_redditTEMP_MPY_wvf_snd.mp3




MoviePy - Done.
Moviepy - Writing video /content/drive/MyDrive/Colab Notebooks/UD/My/output/output_20230830213527_caption_reddit.mp4






Moviepy - Done !
Moviepy - video ready /content/drive/MyDrive/Colab Notebooks/UD/My/output/output_20230830213527_caption_reddit.mp4


## Uploading

In [None]:
import google.auth
import google_auth_oauthlib.flow
import googleapiclient.discovery
import googleapiclient.errors
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

In [None]:
# Load authorized user info from file
with open('/content/drive/MyDrive/Colab Notebooks/UD/Others/token.json', 'r') as file:
    authorized_user_info = json.load(file)

creds = Credentials.from_authorized_user_info(authorized_user_info, ['https://www.googleapis.com/auth/youtube.upload'])
creds.refresh(Request())
credentials = creds

youtube = googleapiclient.discovery.build('youtube', 'v3', credentials=credentials)

In [None]:
# Define the function to upload the video file
def upload_short(video_title, video_description, video_file_path, thumbnail_path):
    request = youtube.videos().insert(
        part="snippet,status",
        body={
            "snippet": {
                "title": video_title,
                "description": video_description,
                "defaultLanguage": "en",
                "defaultAudioLanguage": "en",
                "channelId": 'UCQIUg_sLpoqGfRiOJ4L2-TA',
                "thumbnails": {
                    "default": {
                        "url": thumbnail_path
                    }
                }
            },
            "status": {
                "privacyStatus": "public",
                "madeForKids": False,
                "selfDeclaredMadeForKids": False,
                "embeddable": True,
                "license": "youtube",
                "publicStatsViewable": True
            },
        },
        media_body=googleapiclient.http.MediaFileUpload(video_file_path),
    )
    response = request.execute()
    return response

In [None]:
# Call the upload function with your desired parameters
response = upload_short(shorts_title, shorts_description, new_output_video_path, thumbnail_file_path)
response

{'kind': 'youtube#video',
 'etag': 'lU2JI60tCchJ7R1XvF_XF9W4Tis',
 'id': 'd3wD5E6rO3Y',
 'snippet': {'publishedAt': '2023-08-30T22:02:49Z',
  'channelId': 'UCQIUg_sLpoqGfRiOJ4L2-TA',
  'title': "My Extraordinary Son: A Mother's Love #shorts",
  'description': 'Meet My Extraordinary Son: A Story of Love, Neurodiversity, and Embracing Differences',
  'thumbnails': {'default': {'url': 'https://i.ytimg.com/vi/d3wD5E6rO3Y/default.jpg',
    'width': 120,
    'height': 90},
   'medium': {'url': 'https://i.ytimg.com/vi/d3wD5E6rO3Y/mqdefault.jpg',
    'width': 320,
    'height': 180},
   'high': {'url': 'https://i.ytimg.com/vi/d3wD5E6rO3Y/hqdefault.jpg',
    'width': 480,
    'height': 360}},
  'channelTitle': 'Eerie Shorts',
  'categoryId': '22',
  'liveBroadcastContent': 'none',
  'defaultLanguage': 'en',
  'localized': {'title': "My Extraordinary Son: A Mother's Love #shorts",
   'description': 'Meet My Extraordinary Son: A Story of Love, Neurodiversity, and Embracing Differences'},
  'defau