# Functions

### Gen Niche List

In [2]:
import pandas as pd
from pytrends.request import TrendReq
# need to modify this to recursivly parse the outputs down the json tree


def gen_niche_list():
    pytrends = TrendReq(hl='en-US')
    json_response = pytrends.categories()
    data = json_response['children']
    prefix = 'metadata_'
    df = pd.json_normalize(
        data, 'children', ['name', 'id'], record_prefix=prefix)
    metadata_cols = [col for col in df.columns if col.startswith(prefix)]
    df = pd.concat(
        [df[['name', 'id']], df[metadata_cols].apply(pd.Series)], axis=1)
    df = df.explode("metadata_children").reset_index(drop=True)
    df = pd.concat([df.drop(["metadata_children"], axis=1),
                   df["metadata_children"].apply(pd.Series)], axis=1)

    # Remove completely NaN columns
    df = df.dropna(axis=1, how='all')

    # Column cleaning operations

    columns = df.columns.tolist()

    # Rename first "name" column to "parent_category"
    columns[0] = 'parent_cat_desc'
    columns[1] = 'parent_cat_id'

    df.columns = columns

    # Column dtype setting

    df['id'] = df['id'].astype('Int64')

    # Explode the final childrens column

    df = df.explode("children").reset_index(drop=True)
    df = pd.concat([df.drop(["children"], axis=1),
                   df["children"].apply(pd.Series)], axis=1)

    # Remove completely NaN columns
    df = df.dropna(axis=1, how='all')

    # Column cleaning operations part 2 lol

    columns_2 = df.columns.tolist()

    # Rename first "name" column to "parent_category"
    columns_2[4] = 'sub_cat_level_1'
    columns_2[5] = 'sub_cat_level_1_id'
    columns_2[6] = 'sub_cat_level_2'
    columns_2[7] = 'sub_cat_level_2_id'

    df.columns = columns_2

    # Explode the final childrens column

    df = df.explode("children").reset_index(drop=True)
    df = pd.concat([df.drop(["children"], axis=1),
                   df["children"].apply(pd.Series)], axis=1)

    # Remove completely NaN columns
    df = df.dropna(axis=1, how='all')

    # Column cleaning operations part 2 lol

    columns_3 = df.columns.tolist()

    # Rename first "name" column to "parent_category"
    columns_3[8] = 'sub_cat_level_3'
    columns_3[9] = 'sub_cat_level_3_id'

    df.columns = columns_3

    df = df.explode("children").reset_index(drop=True)
    df = pd.concat([df.drop(["children"], axis=1),
                   df["children"].apply(pd.Series)], axis=1)

    # Remove completely NaN columns
    df = df.dropna(axis=1, how='all')

    columns_4 = df.columns.tolist()

    # Rename first "name" column to "parent_category"
    columns_4[10] = 'sub_cat_level_4'
    columns_4[11] = 'sub_cat_level_4_id'

    df.columns = columns_4

    df['sub_cat_level_2_id'] = df['sub_cat_level_2_id'].astype('Int64')
    df['sub_cat_level_3_id'] = df['sub_cat_level_3_id'].astype('Int64')
    df['sub_cat_level_4_id'] = df['sub_cat_level_4_id'].astype('Int64')

    df = df.fillna(pd.NA)

    # Define a list of all subcategory columns
    subcategory_columns = ['sub_cat_level_1', 'sub_cat_level_2',
                           'sub_cat_level_3', 'sub_cat_level_4', 'parent_cat_desc']
    subcategory_columns_id = ['sub_cat_level_1_id', 'sub_cat_level_2_id',
                              'sub_cat_level_3_id', 'sub_cat_level_4_id', 'parent_cat_id']

    # Coalesce the subcategory columns into a single column containing only the first non-null value
    df['niche_desc'] = df[subcategory_columns].apply(
        lambda x: x.dropna().iloc[0], axis=1)
    df['niche_id'] = df[subcategory_columns_id].apply(
        lambda x: x.dropna().iloc[0], axis=1)

    df = df.drop(columns=['sub_cat_level_1', 'sub_cat_level_2',
                 'sub_cat_level_3', 'sub_cat_level_4', 'parent_cat_desc'])
    df = df.drop(columns=['sub_cat_level_1_id', 'sub_cat_level_2_id',
                 'sub_cat_level_3_id', 'sub_cat_level_4_id', 'parent_cat_id'])
    df = df.drop(columns=['metadata_name', 'metadata_id'])

    return df


In [None]:
# could replace top news with top tweets might be better content for the base source to summerize/ mutate


# Gen Trending Idea

In [10]:
import pandas as pd
from GoogleNews import GoogleNews
from newspaper import Article, Config, ArticleException


def fetch_top_stories(keyword, n):
    # initialize GoogleNews object
    googlenews = GoogleNews()
    googlenews.set_lang('en')

    # search for top news related to the keyword
    googlenews.search(keyword)

    # get URLs of top n stories
    links = googlenews.get_links()[:n]

    # initialize list to store dataframes
    df_list = []

    # loop through URLs
    for url in links:
        # initialize Article object with custom configuration
        config = Config()
        config.browser_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
        article = Article(url, config=config)

        try:
            article.download()
            article.parse()

            # check if summary is available
            if article.summary:
                summary = article.summary
            else:
                summary = None

            # create dataframe with metadata and content
            df = pd.DataFrame({
                'title': [article.title],
                'authors': [article.authors],
                'publish_date': [article.publish_date],
                'summary': [summary],
                'content': [article.text],
                'error': [None]
            })

        except ArticleException as e:
            print(f"Article {url} failed with error: {e}")
            # create dataframe with error message
            df = pd.DataFrame({
                'title': [None],
                'authors': [None],
                'publish_date': [None],
                'summary': [None],
                'content': [None],
                'error': [f"Article {url} failed with error: {e}"]
            })

        # add dataframe to list
        df_list.append(df)

    # concatenate dataframes
    result_df = pd.concat(df_list, ignore_index=True)

    return result_df


In [3]:
#sudo code

#Function
# input: a search term string
# output: a list of top trending youtube videos URLs related to that search term

#Function
# input: a list youtube urls
# output: a data frame containing the following columns [url, is_clippable, peak_video_moments]

#Function
# input: a url of a youtube video
# output: the file path which had the downloaded youtube video was written to as an .mp4

#Function
# input: file path to a mp4 file
# output: the file path of the clipped video which was written as an .mp4


import pandas as pd
from pytube import YouTube
import moviepy.editor as mp
import re

# Function to get top trending youtube videos URLs related to a search term
def get_trending_videos(search_term):
    # Use YouTube API to get search results for the given term
    # Extract the video URLs from the search results
    # Sort the URLs based on the view count and return the top URLs
    # Return the list of top URLs
    return top_urls

# Function to get data frame containing [url, is_clippable, peak_video_moments] for a list of youtube urls
def get_video_details(urls):
    video_details = []
    for url in urls:
        try:
            # Use pytube to get the video object
            video = YouTube(url)
            # Check if the video is clippable (duration less than 10 minutes)
            is_clippable = True if video.length <= 600 else False
            # Use moviepy to get peak moments of the video
            clip = mp.VideoFileClip(video.filename)
            peak_video_moments = clip.highlights()
            # Append the details to the list
            video_details.append([url, is_clippable, peak_video_moments])
        except:
            print(f"Error occurred while processing URL: {url}")
    # Convert the list to a pandas data frame
    video_df = pd.DataFrame(video_details, columns=['url', 'is_clippable', 'peak_video_moments'])
    return video_df

# Function to download a youtube video and return the file path of the downloaded .mp4 file
def download_video(url):
    try:
        # Use pytube to get the video object and download the video
        video = YouTube(url)
        video_stream = video.streams.get_highest_resolution()
        video_stream.download()
        # Return the file path of the downloaded .mp4 file
        return video.filename
    except:
        print(f"Error occurred while downloading video from URL: {url}")

# Function to clip a video and return the file path of the clipped .mp4 file
def clip_video(video_path):
    try:
        # Use moviepy to clip the video and save it to a new file
        clip = mp.VideoFileClip(video_path)
        clip = clip.subclip(0, 30) # clip the first 30 seconds of the video
        clip_path = re.sub(".mp4", "_clip.mp4", video_path)
        clip.write_videofile(clip_path)
        # Return the file path of the clipped .mp4 file
        return clip_path
    except:
        print(f"Error occurred while clipping video: {video_path}")




# Text

In [4]:
def build_short_text_prompt(title, content):
    """
    Builds a string using the given title and content.
    """
    import re

    # Replace all line breaks, carriage returns, and other non-alphanumeric characters with a space
    cleaned_content = re.sub(r'[\n\r\W]+', ' ', content)

    # Remove any leading or trailing whitespace
    cleaned_content = cleaned_content.strip()

    output_string = f"Find the 3 most interesting fact from the following article and condense them into a bulleted list containing as less than or equal to 9 words each:\n {cleaned_content} \n"

    return output_string


In [12]:
import openai
import os

openai.api_key = os.environ['open_ai_key']


def generate_interesting_fact(prompt):
    """
    Generates a text completion for the given prompt using the OpenAI API.

    Args:
        prompt (str): The prompt to generate a completion for.

    Returns:
        str: The generated text completion.
    """

    response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": f"{prompt}"},
    ]
    )

    # Retrieve the generated completion
    response_str = response.choices[0].message.content.strip()

    return response_str


# Video

In [6]:
import os
import requests

def download_video_from_pexels(search_term, niche_name_clean):
    # Make a request to the Pexels API to get videos based on the keyword
    pexels_private_key = os.environ['pexels_api_key']
    url = f'https://api.pexels.com/videos/search?query={search_term}&per_page=1'
    headers = {'Authorization': 'k7D5HBnwo54elZXMc73DE4OiPUbYspcXwgMyNLWWpMyJHXe1AhTL0wcT'}
    # headers = {'Authorization': pexels_private_key}
    response = requests.get(url, headers=headers)

    # Get the URL of the video with the highest quality
    video_url = response.json()['videos'][0]['video_files'][0]['link']

    # Download the video
    response = requests.get(video_url)

    # Save the video to a file in the current working directory
    filepath = f'data/video_example/{niche_name_clean}.mp4'
    with open(filepath, 'wb') as f:
        f.write(response.content)
    
    return filepath


In [8]:
test_path = download_video_from_pexels('k - pop', 'anime_&_manga')

### Video & Audio | Download

In [13]:
# add search block here then download

import requests

# retruns first video id just for demo, could make more advanced search in the future


def get_video_id(resource, api_key, expires, hmac, project_id, user_id, keywords, content_type):

    url = f"https://api.videoblocks.com{resource}?APIKEY={api_key}&EXPIRES={expires}&HMAC={hmac}&project_id={project_id}&user_id={user_id}&keywords={keywords}&content_type={content_type}"
    payload = {}
    headers = {}
    response = requests.request("GET", url, headers=headers, data=payload)

    if response.status_code == 200:
        data = response.json()
        return data["results"][0]["id"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None


In [12]:
def fetch_video_url(resource, api_key, expires, hmac, project_id, user_id):

    url = f"https://api.videoblocks.com{resource}?APIKEY={api_key}&EXPIRES={expires}&HMAC={hmac}&project_id={project_id}&user_id={user_id}"
    payload = {}
    headers = {}
    response = requests.request("GET", url, headers=headers, data=payload)

    if response.status_code == 200:
        data = response.json()
        return data["MOV"]["_1080p"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None


In [13]:
import os
import requests


def download_video(url, filename):
    # Create the data/video_example directory if it doesn't exist
    os.makedirs('data/video_example', exist_ok=True)

    # Send an HTTP request to the URL
    response = requests.get(url)
    download_path = 'data/video_example/' + filename
    # Open a file for writing in the data/video_example directory
    with open(download_path, 'wb') as f:
        # Write the content of the response to the file
        f.write(response.content)

    return download_path


# Audio

In [16]:
from TikTokApi import TikTokApi
from moviepy.editor import AudioFileClip

def get_tiktok_trending_audio(search_term, niche_name_clean, output_len=10):
    # set up TikTok API client
    api = TikTokApi()

    # search for audio clips that match a keyword input
    results = api.search_music(search_term, count=10)

    # parse the API response and extract the first result's unique ID
    if results:
        music_id = results[0]['id']
    else:
        print(f"No results found for keyword '{search_term}'")
        return None

    # download the audio clip using its unique ID
    try:
        music_data = api.get_music_object(music_id)
        music_url = music_data['playUrl']
        music_file = api.get_bytes(music_url)
    except Exception as e:
        print(f"Error downloading music: {e}")
        return None

    # extract the first output_len seconds of the audio clip
    music = AudioFileClip(music_file)
    music_extract = music.subclip(0, output_len)

    # save the extracted audio clip to disk
    output_file = f"{niche_name_clean}.mp3"
    music_extract.write_audiofile(output_file)

    return output_file

In [17]:
test_path = get_tiktok_trending_audio(
    search_term='Anime & Manga', 
    niche_name_clean='anime_&_manga', 
    output_len=15
    )

RuntimeError: This event loop is already running

### Audio | download

# need to add section for play.ht api

In [14]:
# add search block here then download

import requests

# retruns first video id just for demo, could make more advanced search in the future


def get_audio_id(resource, api_key, expires, hmac, project_id, user_id, keywords, content_type):

    url = f"https://api.audioblocks.com{resource}?APIKEY={api_key}&EXPIRES={expires}&HMAC={hmac}&project_id={project_id}&user_id={user_id}&keywords={keywords}&content_type={content_type}"
    payload = {}
    headers = {}
    response = requests.request("GET", url, headers=headers, data=payload)

    if response.status_code == 200:
        data = response.json()
        return data["results"][0]["id"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None


In [15]:
def fetch_audio_url(resource, api_key, expires, hmac, project_id, user_id):

    url = f"https://api.audioblocks.com{resource}?APIKEY={api_key}&EXPIRES={expires}&HMAC={hmac}&project_id={project_id}&user_id={user_id}"
    payload = {}
    headers = {}
    response = requests.request("GET", url, headers=headers, data=payload)

    if response.status_code == 200:
        data = response.json()
        return data['WAV']
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None


In [16]:
import os
import requests


def download_audio(url, filename):
    # Create the data/video_example directory if it doesn't exist
    os.makedirs('data/video_example', exist_ok=True)

    # Send an HTTP request to the URL
    response = requests.get(url)
    download_path = 'data/video_example/' + filename
    # Open a file for writing in the data/video_example directory
    with open(download_path, 'wb') as f:
        # Write the content of the response to the file
        f.write(response.content)

    return download_path


### Video | Composite

## video and audio text

In [17]:
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip

# !cat /etc/ImageMagick-6/policy.xml | sed 's/none/read,write/g'> /etc/ImageMagick-6/policy.xml
ImageMagickBinary = {
    'magick': '/usr/bin/magick',
    'convert': '/usr/bin/convert'
}


def create_composite(video_path, audio_path, text, font, fontsize, color, pos, output_path):
    # Load the video file
    video_clip = VideoFileClip(video_path)

    # Load the audio file
    audio_clip = AudioFileClip(audio_path)

    # Create the text clip
    text_clip = TextClip(text, fontsize=fontsize, font=font, color=color)

    # setting position of text in the center and duration will be 10 seconds 
    text_clip = text_clip.set_pos('center').set_duration(video_clip.duration)

    audio_clip = audio_clip.set_duration(video_clip.duration)

    # Combine the video clip, audio clip and text clip
    final_clip = CompositeVideoClip([video_clip, text_clip]).set_audio(audio_clip)

    # Write the video clip with the new text overlay and audio to a file
    final_clip.write_videofile(
        filename=output_path,
        audio_codec='aac',
        fps=48,
    )


## video and image, text

In [15]:
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip

# !cat /etc/ImageMagick-6/policy.xml | sed 's/none/read,write/g'> /etc/ImageMagick-6/policy.xml
ImageMagickBinary = {
'magick': '/usr/bin/magick',
'convert': '/usr/bin/convert'
}


def create_composite(video_path, image_path, text, font, fontsize, color, pos, output_path):
    # Load the video file
    video_clip = VideoFileClip(video_path)

    # Create the text clip
    text_clip = TextClip(text, fontsize=fontsize, font=font, color=color)

    # setting position of text in the center and duration will be 10 seconds 
    text_clip = text_clip.set_pos('center').set_duration(video_clip.duration)

    # Load the image and video clips
    image_clip = ImageClip(image_path).set_duration(video_clip.duration)

    # Add the video clip in the bottom right corner of the image
    video_clip = video_clip.resize(height=140)
    video_clip = video_clip.set_position(('right', 'bottom'))

    # Combine the clips and write the output file
    final_clip = CompositeVideoClip([image_clip, text_clip, video_clip])

    # Write the video clip with the new text overlay and audio to a file
    final_clip.write_videofile(
        filename=output_path,
        audio_codec='aac',
        fps=48,
    )

## video and video, text

In [26]:
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip, AudioFileClip

# !cat /etc/ImageMagick-6/policy.xml | sed 's/none/read,write/g'> /etc/ImageMagick-6/policy.xml
ImageMagickBinary = {
    'magick': '/usr/bin/magick',
    'convert': '/usr/bin/convert'
}


def create_composite(video_path, background_video_path, text, font, fontsize, color, pos, output_path):
    # Load the video files
    video_clip = VideoFileClip(video_path)
    background_clip = VideoFileClip(background_video_path)

    # Create the text clip
    text_clip = TextClip(text, fontsize=fontsize, font=font, color=color)

    # Set position and duration of text to match the video clips
    text_clip = text_clip.set_pos('center').set_duration(video_clip.duration)

    # Trim the background clip to match the duration of the video clip
    background_clip = background_clip.subclip(0, video_clip.duration)

    # Resize the background clip to match the video clip dimensions
    background_clip = background_clip.resize(video_clip.size)

    # Add the video clip in the bottom right corner of the background clip
    video_clip = video_clip.resize(height=140)
    video_clip = video_clip.set_position(('right', 'bottom'))

    # Combine the clips and write the output file
    final_clip = CompositeVideoClip([background_clip, text_clip, video_clip])

    # Write the video clip with the new text overlay and audio to a file
    final_clip.write_videofile(
        filename=output_path,
        audio_codec='aac',
        fps=48,
    )



In [27]:
create_composite(
    video_path = 'data/video_example/digi_avatar_example_v2.mp4', 
    background_video_path = 'data/video_example/anime_&_manga.mp4',
    text =  shorts_text_candidate, 
    font = "DejaVu-Serif-Bold", 
    fontsize = 15, 
    color = "white", 
    pos = None, 
    output_path = 'data/video_example/output_test_digitar_v2.mp4',
    )


Moviepy - Building video data/video_example/output_test_digitar_v2.mp4.
MoviePy - Writing audio in output_test_digitar_v2TEMP_MPY_wvf_snd.mp4


                                                                    

MoviePy - Done.
Moviepy - Writing video data/video_example/output_test_digitar_v2.mp4



                                                              

Moviepy - Done !
Moviepy - video ready data/video_example/output_test_digitar_v2.mp4


# Flow draft

In [17]:
niche_df = gen_niche_list()
# niche_name = niche_df['niche_desc'][0]
niche_name = "K pop"


top_stories_df = fetch_top_stories(keyword=niche_name, n=5)
title = top_stories_df['title'][0]
content = top_stories_df['content'][0]

In [18]:
shorts_prompt = build_short_text_prompt(
    title=title,
    content=content
)


In [19]:
shorts_text_candidate = generate_interesting_fact(
    prompt=shorts_prompt[:1000],
)


In [21]:
print(shorts_text_candidate)


- Aespa is a fourth gen K-pop group
- They will perform at an impressive location
- They are the first in their generation to do so


### Execute video grab

In [22]:
# required modules
import time
import hmac
import hashlib
import os

# Provided by Storyblocks
public_key = os.environ['storyblocks_api_key_public_test']
private_key = os.environ['storyblocks_api_key_private_test']
baseUrl = "https://api.videoblocks.com"
expires = str(int(time.time()))
resource = "/api/v2/videos/search"
hmacBuilder = hmac.new(bytearray(private_key + expires, 'utf-8'),
                       resource.encode('utf-8'), hashlib.sha256)
hmacHex = hmacBuilder.hexdigest()
project_id = "auto_shorts_demo"
user_id = "auto_shorts_bot_process"
keywords = niche_name
content_type = "footage"


video_id = get_video_id(
    resource=resource,
    api_key=public_key,
    expires=expires,
    hmac=hmacHex,
    project_id=project_id,
    user_id=user_id,
    keywords=keywords,
    content_type=content_type,
)


In [23]:
# required modules
import time
import hmac
import hashlib
import os

# Provided by Storyblocks
public_key = os.environ['storyblocks_api_key_public_test']
private_key = os.environ['storyblocks_api_key_private_test']
baseUrl = "https://api.videoblocks.com"
expires = str(int(time.time()))
resource = f"/api/v2/videos/stock-item/download/{video_id}"
hmacBuilder = hmac.new(bytearray(private_key + expires, 'utf-8'),
                       resource.encode('utf-8'), hashlib.sha256)
hmacHex = hmacBuilder.hexdigest()
project_id = "auto_shorts_demo"
user_id = "auto_shorts_bot_process"


video_url = fetch_video_url(
    resource=resource,
    api_key=public_key,
    expires=expires,
    hmac=hmacHex,
    project_id=project_id,
    user_id=user_id,
)


In [25]:
niche_clean = niche_name.replace(' ', '_').lower()

video_path = download_video(
    url=video_url,
    filename=f"{niche_clean}_{video_id}.mp4"
)


In [26]:
video_path

'data/video_example/arts_&_entertainment_5143986.mp4'

### Find Audio

In [27]:
# required modules
import time
import hmac
import hashlib
import os

# https://documentation.storyblocks.com/?_ga=2.209332763.846714940.1678072328-561796075.1674608327&_gac=1.252807931.1677039592.CjwKCAiA9NGfBhBvEiwAq5vSy4yIwO38LysWrZvL0h1E3otnsyv6YqT7i4aTe8HTwFTtCfnA08AymxoCTcsQAvD_BwE

# Provided by Storyblocks
public_key = os.environ['storyblocks_api_key_public_test']
private_key = os.environ['storyblocks_api_key_private_test']
baseUrl = "https://api.audioblocks.com"
expires = str(int(time.time()))
resource = "/api/v2/audio/search"
hmacBuilder = hmac.new(bytearray(private_key + expires, 'utf-8'),
                       resource.encode('utf-8'), hashlib.sha256)
hmacHex = hmacBuilder.hexdigest()
project_id = "auto_shorts_demo"
user_id = "auto_shorts_bot_process"
keywords = niche_name
content_type = "music"


audio_id = get_audio_id(
    resource=resource,
    api_key=public_key,
    expires=expires,
    hmac=hmacHex,
    project_id=project_id,
    user_id=user_id,
    keywords=keywords,
    content_type=content_type,
)


In [28]:
audio_id

179496

In [29]:
#fetch audio urll

# required modules
import time
import hmac
import hashlib
import os

# Provided by Storyblocks
public_key = os.environ['storyblocks_api_key_public_test']
private_key = os.environ['storyblocks_api_key_private_test']
baseUrl = "https://api.audioblocks.com"
expires = str(int(time.time()))
resource = f"/api/v2/audio/stock-item/download/{audio_id}"
hmacBuilder = hmac.new(bytearray(private_key + expires, 'utf-8'),
                       resource.encode('utf-8'), hashlib.sha256)
hmacHex = hmacBuilder.hexdigest()
project_id = "auto_shorts_demo"
user_id = "auto_shorts_bot_process"


audio_url = fetch_audio_url(
    resource=resource,
    api_key=public_key,
    expires=expires,
    hmac=hmacHex,
    project_id=project_id,
    user_id=user_id,
)


In [30]:
audio_url

'https://dm0qx8t0i9gc9.cloudfront.net/content/audio/HNxwBHlArk43bm5tw/audioblocks-robert-frost-iii_bright-happy-fun-jolly-v2_educational-full-mix-108-bpm_Svq1vspuw.wav?type=download&origin=AUDIOBLOCKS&timestamp_ms=1678075763429&publicKey=test_be5c923b8fca02fcc6a733ad883a8561ecfc2fff5b9c4365816124def58&apiVersion=2.0&endUserId=277b79e281cfc09f7571bb900689777fa2cd579d&projectId=auto_shorts_demo&stockItemId=179496&format=WAV&response-content-disposition=attachment&Expires=1678077563&Signature=Zz8DnuvZkVRq9tcK1zcEix5DCMUIr~EskaBdmT6ufRAGNwjAVEDJ6d0csVZBHOch3R11EJZecTlzj5TTIwPHYTkwAsI8Fgv-9h9GoQZeuLIJcft16u-6fuEsWo5QYeCSvK7oLYlQZGqwI4mEYl-0NbO-hI1rBDfcxQnQTBS5sc0artg1W1-wlO2SatgsP6dMStzrfI-MAfgRiLJcwfTVJwIVdQkZtc9PwI0t4ua7L-vqU1DYtAebtiBkwpKpG~R907rqn~pOr-Q9CcYhbb55PrPmBvFf6rOnjflFxIfn-elwix3u0s8BVmufneHSZ7MguonS3wzxcNvkexMU5Z9e7w__&Key-Pair-Id=APKAIAQ2JPZIIO2PHR7Q'

In [31]:
# need to make the directory this all gets written to temporary and named base don the niche
niche_clean = niche_name.replace(' ', '_').lower()

audio_path = download_audio(
    url=audio_url,
    filename=f"{niche_clean}_{audio_id}.wav"
)


### Composite

In [42]:
create_composite(
    video_path = video_path, 
    audio_path = audio_path,
    text =  f"{title} \n\n {shorts_text_candidate}", 
    font = "DejaVu-Serif-Bold", 
    fontsize = 35, 
    color = "white", 
    pos = None, 
    output_path = 'data/video_example/output_edit_example.mp4',
    )


Moviepy - Building video data/video_example/output_edit_example.mp4.
MoviePy - Writing audio in output_edit_exampleTEMP_MPY_wvf_snd.mp4


                                                                    

MoviePy - Done.
Moviepy - Writing video data/video_example/output_edit_example.mp4



                                                              

Moviepy - Done !
Moviepy - video ready data/video_example/output_edit_example.mp4


# Generate affliate link

In [None]:
import hashlib
import hmac
import urllib.parse
import requests

def generate_affiliate_link(input_str):
    # Enter your Amazon API credentials
    AWS_ACCESS_KEY = 'YOUR_AWS_ACCESS_KEY'
    AWS_SECRET_KEY = 'YOUR_AWS_SECRET_KEY'
    AWS_ASSOCIATE_TAG = 'YOUR_AWS_ASSOCIATE_TAG'

    # Construct the API request
    params = {
        'Service': 'AWSECommerceService',
        'Operation': 'ItemSearch',
        'Keywords': input_str,
        'AssociateTag': AWS_ASSOCIATE_TAG,
        'ResponseGroup': 'ItemIds',
        'SearchIndex': 'All'
    }

    # Construct the request URL
    base_url = 'https://webservices.amazon.com/onca/xml'
    query_string = urllib.parse.urlencode(sorted(params.items()))
    request_url = f'{base_url}?{query_string}'

    # Generate the signature for the request
    message = f'GET\nwebservices.amazon.com\n/onca/xml\n{query_string}'.encode('utf-8')
    signature = hmac.new(AWS_SECRET_KEY.encode('utf-8'), message, hashlib.sha256).hexdigest()
    request_url += f'&Signature={urllib.parse.quote(signature)}'

    # Make the API request and parse the response
    response = requests.get(request_url)
    response_text = response.text

    # Extract the item ID from the response
    item_id = response_text.split('<ItemId>')[1].split('</ItemId>')[0]

    # Construct the referral link using the item ID
    referral_link = f'https://www.amazon.com/dp/{item_id}/?tag={AWS_ASSOCIATE_TAG}'

    return referral_link


# upload to all syndicated platforms

### youtube

In [4]:
# https://developers.google.com/youtube/v3/guides/uploading_a_video

# #!/usr/bin/python

import httplib
import httplib2
import os
import random
import sys
import time

from apiclient.discovery import build
from apiclient.errors import HttpError
from apiclient.http import MediaFileUpload
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import argparser, run_flow


# Explicitly tell the underlying HTTP transport library not to retry, since
# we are handling retry logic ourselves.
httplib2.RETRIES = 1

# Maximum number of times to retry before giving up.
MAX_RETRIES = 10

# Always retry when these exceptions are raised.
RETRIABLE_EXCEPTIONS = (httplib2.HttpLib2Error, IOError, httplib.NotConnected,
  httplib.IncompleteRead, httplib.ImproperConnectionState,
  httplib.CannotSendRequest, httplib.CannotSendHeader,
  httplib.ResponseNotReady, httplib.BadStatusLine)

# Always retry when an apiclient.errors.HttpError with one of these status
# codes is raised.
RETRIABLE_STATUS_CODES = [500, 502, 503, 504]

# The CLIENT_SECRETS_FILE variable specifies the name of a file that contains
# the OAuth 2.0 information for this application, including its client_id and
# client_secret. You can acquire an OAuth 2.0 client ID and client secret from
# the Google API Console at
# https://console.cloud.google.com/.
# Please ensure that you have enabled the YouTube Data API for your project.
# For more information about using OAuth2 to access the YouTube Data API, see:
#   https://developers.google.com/youtube/v3/guides/authentication
# For more information about the client_secrets.json file format, see:
#   https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
CLIENT_SECRETS_FILE = "client_secrets.json"

# This OAuth 2.0 access scope allows an application to upload files to the
# authenticated user's YouTube channel, but doesn't allow other types of access.
YOUTUBE_UPLOAD_SCOPE = "https://www.googleapis.com/auth/youtube.upload"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

# This variable defines a message to display if the CLIENT_SECRETS_FILE is
# missing.
MISSING_CLIENT_SECRETS_MESSAGE = """
WARNING: Please configure OAuth 2.0

To make this sample run you will need to populate the client_secrets.json file
found at:

   %s

with information from the API Console
https://console.cloud.google.com/

For more information about the client_secrets.json file format, please visit:
https://developers.google.com/api-client-library/python/guide/aaa_client_secrets
""" % os.path.abspath(os.path.join(os.path.dirname(__file__),
                                   CLIENT_SECRETS_FILE))

VALID_PRIVACY_STATUSES = ("public", "private", "unlisted")


def get_authenticated_service(args):
  flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE,
    scope=YOUTUBE_UPLOAD_SCOPE,
    message=MISSING_CLIENT_SECRETS_MESSAGE)

  storage = Storage("%s-oauth2.json" % sys.argv[0])
  credentials = storage.get()

  if credentials is None or credentials.invalid:
    credentials = run_flow(flow, storage, args)

  return build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION,
    http=credentials.authorize(httplib2.Http()))

def initialize_upload(youtube, options):
  tags = None
  if options.keywords:
    tags = options.keywords.split(",")

  body=dict(
    snippet=dict(
      title=options.title,
      description=options.description,
      tags=tags,
      categoryId=options.category
    ),
    status=dict(
      privacyStatus=options.privacyStatus
    )
  )

  # Call the API's videos.insert method to create and upload the video.
  insert_request = youtube.videos().insert(
    part=",".join(body.keys()),
    body=body,
    # The chunksize parameter specifies the size of each chunk of data, in
    # bytes, that will be uploaded at a time. Set a higher value for
    # reliable connections as fewer chunks lead to faster uploads. Set a lower
    # value for better recovery on less reliable connections.
    #
    # Setting "chunksize" equal to -1 in the code below means that the entire
    # file will be uploaded in a single HTTP request. (If the upload fails,
    # it will still be retried where it left off.) This is usually a best
    # practice, but if you're using Python older than 2.6 or if you're
    # running on App Engine, you should set the chunksize to something like
    # 1024 * 1024 (1 megabyte).
    media_body=MediaFileUpload(options.file, chunksize=-1, resumable=True)
  )

  resumable_upload(insert_request)

# This method implements an exponential backoff strategy to resume a
# failed upload.
def resumable_upload(insert_request):
  response = None
  error = None
  retry = 0
  while response is None:
    try:
      print "Uploading file..."
      status, response = insert_request.next_chunk()
      if response is not None:
        if 'id' in response:
          print "Video id '%s' was successfully uploaded." % response['id']
        else:
          exit("The upload failed with an unexpected response: %s" % response)
    except HttpError, e:
      if e.resp.status in RETRIABLE_STATUS_CODES:
        error = "A retriable HTTP error %d occurred:\n%s" % (e.resp.status,
                                                             e.content)
      else:
        raise
    except RETRIABLE_EXCEPTIONS, e:
      error = "A retriable error occurred: %s" % e

    if error is not None:
      print error
      retry += 1
      if retry > MAX_RETRIES:
        exit("No longer attempting to retry.")

      max_sleep = 2 ** retry
      sleep_seconds = random.random() * max_sleep
      print "Sleeping %f seconds and then retrying..." % sleep_seconds
      time.sleep(sleep_seconds)

if __name__ == '__main__':
  argparser.add_argument("--file", required=True, help="Video file to upload")
  argparser.add_argument("--title", help="Video title", default="Test Title")
  argparser.add_argument("--description", help="Video description",
    default="Test Description")
  argparser.add_argument("--category", default="22",
    help="Numeric video category. " +
      "See https://developers.google.com/youtube/v3/docs/videoCategories/list")
  argparser.add_argument("--keywords", help="Video keywords, comma separated",
    default="")
  argparser.add_argument("--privacyStatus", choices=VALID_PRIVACY_STATUSES,
    default=VALID_PRIVACY_STATUSES[0], help="Video privacy status.")
  args = argparser.parse_args()

  if not os.path.exists(args.file):
    exit("Please specify a valid file using the --file= parameter.")

  youtube = get_authenticated_service(args)
  try:
    initialize_upload(youtube, args)
  except HttpError, e:
    print "An HTTP error %d occurred:\n%s" % (e.resp.status, e.content)


SyntaxError: Missing parentheses in call to 'print'. Did you mean print("Uploading file...")? (1807360858.py, line 133)

# Affliant links api

In [None]:
# https://docs.affluent.io/reference/post_v1-actions-1

import requests

url = "https://server-api.affluent.io/v1/actions/"

payload = {
    "currency": "USD",
    "firstDayOfWeek": 1,
    "limit": 100,
    "offset": 0,
    "includeSkus": "false"
}
headers = {
    "accept": "application/stream+json",
    "content-type": "application/json"
}

response = requests.post(url, json=payload, headers=headers)

print(response.text)

# scraps

In [None]:
import openai
import os

openai.api_key = os.environ['open_ai_key']


def generate_bullet_summary(prompt, max_tokens_param):
    """
    Generates a text completion for the given prompt using the OpenAI API.

    Args:
        prompt (str): The prompt to generate a completion for.

    Returns:
        str: The generated text completion.
    """
    # Set up the API parameters
    model_engine = "text-davinci-003"  # Replace with your desired language model
    max_tokens = max_tokens_param  # Maximum number of tokens to generate
    n = 1  # Number of completions to generate
    stop_sequence = None  # Sequence to stop generation at

    # Generate the completion using the OpenAI API
    response = openai.Completion.create(
        model=model_engine,
        prompt=prompt,
        max_tokens=max_tokens,
        n=n,
        stop=stop_sequence,
        temperature=0.5,
    )

    # Retrieve the generated completion
    response_str = response.choices[0].text.strip()

    return response_str
