# Imports

In [None]:
!pip install anthropic
!pip install ffmpeg-python
!pip install av
!pip install scenedetect

In [None]:
import os
import pandas as pd
import anthropic
from json import loads,dumps
import time
from glob import glob
import json
import pandas as pd
import seaborn as sns
from sklearn.metrics import accuracy_score,classification_report,confusion_matrix

In [None]:
import cv2
import torch
import random
import ffmpeg
import warnings
import numpy as np
from PIL import Image
from glob import glob
import soundfile as sf
import matplotlib.pyplot as plt
#from pydub import AudioSegment
from scipy.signal import resample
import typing_extensions as typing
from scenedetect import open_video,  VideoStreamCv2, SceneManager
from scenedetect.detectors import ContentDetector

In [None]:
# Maximum number of characters kept from a transcription; longer transcriptions
# are cut to this length and suffixed with '...' in get_transcriptions_and_paths.
limit = 5000 

# Data Loading

In [None]:
# Setting up video directories 
# The first three directories are loaded with the 'Child Directed' label and the
# last three with the 'Not Child Directed' label (see the loading cell below).

true1dir = '/kaggle/input/video-classification-data/Videos/Videos/videos-true' 
true2dir = '/kaggle/input/video-classification-data/Videos/Videos/videos-true-2' 
kids_dir = '/kaggle/input/video-classification-data/Made-For-Kids/Made-For-Kids'

false1dir = '/kaggle/input/video-classification-data/Videos/Videos/videos-false' 
false2dir = '/kaggle/input/video-classification-data/Videos/Videos/videos-false-2' 
non_kids_dir = '/kaggle/input/video-classification-data/Non Made For Kids/Non Made For Kids'

In [None]:
# Functions to extract corresponding transcriptions and ground truths 

def get_transcriptions_and_paths(vids_dir, ground_truths, transcriptions_path, max_chars=None):
    """Collect video folder paths, truncated transcriptions and labels for one data split.

    Every entry of ``vids_dir`` is treated as a video id. The ids are inner-joined
    with the ``'Video Id'`` column of the CSV at ``transcriptions_path``, so only
    videos that have both a folder and a transcription row are returned.

    Args:
        vids_dir: Directory whose entries are named after the video ids.
        ground_truths: Label assigned to every video of this split.
        transcriptions_path: CSV file with ``'Video Id'`` and ``'Transcription'`` columns.
        max_chars: Maximum transcription length; longer texts are cut to this
            length and suffixed with ``'...'``. Defaults to the notebook-level ``limit``.

    Returns:
        A tuple ``(paths, transcriptions, labels)`` of three equally long lists.
    """
    if max_chars is None:
        max_chars = limit  # notebook-level default defined near the top

    video_ids_df = pd.DataFrame({'IDs': os.listdir(vids_dir)})
    transcriptions_df = pd.read_csv(transcriptions_path)

    # Inner join: drops folders without a transcription row and vice versa.
    merged = pd.merge(video_ids_df, transcriptions_df, left_on='IDs', right_on='Video Id')

    ids = list(merged['IDs'])
    paths = [vids_dir + '/' + video_id for video_id in ids]

    transcriptions = []
    for raw in merged['Transcription']:
        # An empty CSV cell is read back as NaN (a float); treat it as "no speech"
        # instead of crashing on .split().
        text = '' if pd.isna(raw) else str(raw)
        # Keep only what precedes the first "chunks" marker of the stored transcript.
        text = text.split("chunks")[0]
        if len(text) > max_chars:
            text = text[:max_chars] + '...'
        transcriptions.append(text)

    return paths, transcriptions, [ground_truths] * len(ids)

In [None]:
# Loading paths, transcriptions, and labels 
# One call per data split: three splits labelled 'Child Directed' followed by
# three labelled 'Not Child Directed', each paired with its transcription CSV.

true1_paths, true1_transcriptions, true1_labels = get_transcriptions_and_paths(true1dir, 'Child Directed', '/kaggle/input/video-classification-data/Translated Transcriptions/Translated Transcriptions/true-1-translated-transcriptions.csv') 
true2_paths, true2_transcriptions, true2_labels = get_transcriptions_and_paths(true2dir, 'Child Directed', '/kaggle/input/video-classification-data/Translated Transcriptions/Translated Transcriptions/true-2-translated-transcriptions.csv') 
kids_paths, kids_transcriptions, kids_labels = get_transcriptions_and_paths(kids_dir, 'Child Directed', '/kaggle/input/video-classification-data/Translated Transcriptions/Translated Transcriptions/made-for-kids_translated_transcriptions.csv') 

false1_paths, false1_transcriptions, false1_labels = get_transcriptions_and_paths(false1dir, 'Not Child Directed', '/kaggle/input/video-classification-data/Translated Transcriptions/Translated Transcriptions/false-1-translated-transcriptions.csv') 
false2_paths, false2_transcriptions, false2_labels = get_transcriptions_and_paths(false2dir, 'Not Child Directed', '/kaggle/input/video-classification-data/Translated Transcriptions/Translated Transcriptions/false-2-translated-transcriptions.csv') 
non_kids_paths, non_kids_transcriptions, non_kids_labels = get_transcriptions_and_paths(non_kids_dir, 'Not Child Directed', '/kaggle/input/video-classification-data/Translated Transcriptions/Translated Transcriptions/non-made-for-kids_translated_transcriptions.csv') 

In [None]:
# Final combined lists of paths, ground-truth labels and transcriptions.
# The three lists are parallel, so every one of them concatenates the splits in
# the same order: false1, true1, false2, true2, non_kids, kids.

paths = [
    *false1_paths,
    *true1_paths,
    *false2_paths,
    *true2_paths,
    *non_kids_paths,
    *kids_paths,
]

primary_labels = [
    *false1_labels,
    *true1_labels,
    *false2_labels,
    *true2_labels,
    *non_kids_labels,
    *kids_labels,
]

transcriptions = [
    *false1_transcriptions,
    *true1_transcriptions,
    *false2_transcriptions,
    *true2_transcriptions,
    *non_kids_transcriptions,
    *kids_transcriptions,
]

# The video id is the last component of the folder path.
video_ids = [folder.rsplit('/', 1)[-1] for folder in paths]

In [None]:
# Sanity check: the four lists are indexed in parallel later on, so the four
# printed lengths are expected to be equal.
print(len(paths), len(primary_labels), len(transcriptions), len(video_ids)) 

**Extracting Images** 

In [None]:
def detect_scenes(video_path, threshold = 30, min_scenes = 6):
    """Detect scenes in a video, lowering the threshold until enough scenes are found.

    The video is re-opened and re-scanned with a ContentDetector on every pass.
    Passes continue until at least ``min_scenes`` scenes are found or the
    threshold has dropped to 0.

    NOTE(review): the threshold is halved *before* each pass, so the value passed
    in is never used as-is. With the default of 30 the passes use 15, 7, 3, 1
    and finally 0. This ordering is kept unchanged so existing results do not
    shift; confirm whether it is intentional.

    Args:
        video_path: Path of the video file handed to ``open_video``.
        threshold: Starting value; the first pass uses ``threshold // 2``.
        min_scenes: Stop as soon as this many scenes have been detected.

    Returns:
        The scene list of the last pass, as returned by
        ``SceneManager.get_scene_list()`` (may hold fewer than ``min_scenes`` entries).
    """
    scene_list = []
    while len(scene_list) < min_scenes and threshold > 0:
        threshold //= 2

        # A fresh video handle and manager are needed for every pass.
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=threshold))

        scene_manager.detect_scenes(video)
        scene_list = scene_manager.get_scene_list()

    return scene_list


def get_top_n_longest_scenes(scene_list, n):
    '''Pick the n longest scenes.

    Each scene is a (start, end) pair. The result is ordered from longest to
    shortest; scenes of equal length keep their original relative order.
    Every returned pair is rebuilt as (start, start + length).
    '''
    ranked = sorted(
        ((scene_start, scene_end - scene_start) for scene_start, scene_end in scene_list),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [(scene_start, scene_start + length) for scene_start, length in ranked[:n]]

def sort_scenes_by_frame(scenes_list):
    '''Return a new list of the scenes ordered by the frame number of their start.'''
    def start_frame(scene):
        return scene[0].get_frames()

    ordered = list(scenes_list)
    ordered.sort(key=start_frame)
    return ordered


def get_num_grids(video_path, max_grids=5):
    '''Get the number of image grids to create for a video.

    One grid is created per started minute of video, capped at ``max_grids``
    and never fewer than one.

    Args:
        video_path: Path of the video file.
        max_grids: Upper bound on the number of grids (default 5).

    Returns:
        The number of grids as an int in ``[1, max_grids]``.

    Raises:
        ValueError: If the reported frame rate is not positive (OpenCV reports
            0 when the file cannot be opened), so no duration can be computed.
    '''
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    finally:
        # Always give the capture handle back, even if reading a property fails.
        cap.release()

    if not fps or fps <= 0:
        raise ValueError(f"Cannot determine duration of {video_path!r}: reported FPS is {fps}")

    duration = round(total_frames / fps, 2)  # seconds

    # duration // 60 counts the completed minutes; +1 covers the started one.
    return int(max(1, min(duration // 60 + 1, max_grids)))

def extract_k_frames_from_scene(video_path, scene, k):
    '''Extract up to k evenly spaced frames from one scene.

    Args:
        video_path: Path of the video file.
        scene: (start, end) pair whose items expose ``get_frames()``.
        k: Number of frame positions to sample. With ``k == 1`` only the first
            sampled position (normally ``start + 1``) is read.

    Returns:
        List of frames as returned by ``cv2.VideoCapture.read``. Positions that
        cannot be read are skipped, so the list may hold fewer than k frames.
    '''
    scene_start = scene[0].get_frames()
    scene_end = scene[1].get_frames()

    # Sample from one frame inside each scene boundary. For very short scenes the
    # range is clamped so that it never runs backwards or before the scene start.
    first_frame = max(scene_start, min(scene_start + 1, scene_end - 1))
    last_frame = max(first_frame, scene_end - 1)

    # k equally spaced frame indices within the scene's range
    frame_indices = np.linspace(first_frame, last_frame, k, dtype=int)

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        for frame_no in frame_indices:
            # Plain int: avoids handing a numpy scalar to the OpenCV binding.
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_no))
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
    finally:
        # Release the capture even if seeking or decoding raises.
        cap.release()

    return frames


def create_image_grid(frames, grid_size=(1000, 1000)):
    '''Arrange 6 frames into a grid of 3 rows x 2 columns and resize it.

    Args:
        frames: Sequence of at least 6 frames in OpenCV's BGR channel order
            (as produced by ``cv2.VideoCapture.read``). Only the first 6 are used.
        grid_size: (width, height) of the returned grid in pixels.

    Returns:
        The grid as a NumPy array in RGB channel order, ready for
        ``Image.fromarray`` and ``plt.imshow``.

    Raises:
        ValueError: If fewer than 6 frames are supplied.
    '''
    if len(frames) < 6:
        raise ValueError(f"create_image_grid needs at least 6 frames, got {len(frames)}")

    # Bring all frames to a common size so they can be concatenated.
    tiles = [cv2.resize(frame, (640, 360)) for frame in frames[:6]]
    rows = [np.concatenate(tiles[i:i+2], axis=1) for i in range(0, 6, 2)]
    image_grid = np.concatenate(rows, axis=0)

    # OpenCV decodes to BGR while PIL and matplotlib expect RGB. Without this
    # conversion red and blue are swapped in every saved or displayed grid.
    image_grid = cv2.cvtColor(image_grid, cv2.COLOR_BGR2RGB)

    return np.array(Image.fromarray(image_grid).resize(grid_size))

In [None]:
def get_images(video_path, n=6):
    '''Build k image grids of n frames each from the longest scenes of a video.

    Steps:
        1. Detect the scenes of the video.
        2. Determine k, the number of grids, from the video duration.
        3. Keep the n * k longest scenes.
        4. Put those scenes back into chronological order.
        5. Take one frame from each of them.
        6. Pack consecutive runs of n frames into k grids.
    '''
    detected = detect_scenes(video_path)
    k = get_num_grids(video_path)
    # Setting k = 1 here would give a single grid of the major scene frames.
    chosen = sort_scenes_by_frame(get_top_n_longest_scenes(detected, n * k))

    # One frame per selected scene, in chronological order.
    frames = [
        frame
        for scene in chosen
        for frame in extract_k_frames_from_scene(video_path, scene, 1)
    ]

    return [
        create_image_grid(frames[grid_no * n:(grid_no + 1) * n], grid_size=(1000, 1000))
        for grid_no in range(k)
    ]

In [None]:
def get_images_2(video_path, n = 6):
    '''Fallback grid builder for videos where too few scenes are detected.

    Instead of one frame per scene, the n * k required frames are spread as
    evenly as possible over all detected scenes. If fewer frames than needed can
    be read, the available frames are repeated cyclically so that every grid
    that is built holds exactly n frames.

    Args:
        video_path: Path of the video file.
        n: Number of frames per grid.

    Returns:
        List of image grids (at most k, where k comes from ``get_num_grids``).
        Empty if no scene was detected or no frame could be read.
    '''
    # Step 1: Detect scenes
    scene_list = detect_scenes(video_path)
    if not scene_list:
        return []  # nothing to sample from

    # Step 2: Get the number of grids (k) and the total number of frames needed
    k = get_num_grids(video_path)
    total_frames_needed = n * k

    # Step 3: Spread the frames over the scenes; the first `extra` scenes get one more
    frames_per_scene, extra = divmod(total_frames_needed, len(scene_list))

    frames = []
    for i, scene in enumerate(scene_list):
        num_frames = frames_per_scene + (1 if i < extra else 0)
        if num_frames == 0:
            # Only reachable when there are more scenes than frames needed; every
            # later scene would get 0 frames too, so stop re-opening the video.
            break
        frames.extend(extract_k_frames_from_scene(video_path, scene, num_frames))

    frames = frames[:total_frames_needed]
    if not frames:
        return []  # every read failed; avoids dividing by zero while padding

    # Step 4: Pad by cycling through the available frames until the count is a
    # multiple of n, so that no partial grid reaches create_image_grid.
    shortfall = -len(frames) % n
    if shortfall:
        available = len(frames)
        frames = frames + [frames[i % available] for i in range(shortfall)]

    # Step 5: Create the grids
    return [
        create_image_grid(frames[start_idx:start_idx + n], grid_size=(1000, 1000))
        for start_idx in range(0, len(frames), n)
    ]

**Defining Model** 

In [None]:
# The Anthropic client created below is constructed without an explicit key, so
# the key is supplied through this environment variable.
# NOTE(review): this assignment overwrites any value already present in the
# environment, and a real key pasted here ends up in the saved notebook.
# Consider loading it from a secret store instead.
os.environ["ANTHROPIC_API_KEY"] = "" # add key here 

In [None]:
# Shared API client used by classify_video_with_images.
# No key is passed here; presumably it is picked up from the ANTHROPIC_API_KEY
# environment variable set in the previous cell (verify against the SDK docs).
client = anthropic.Anthropic() 

In [None]:
import base64
import httpx

# Function to encode the image

def encode_image(image_path):
    """Read an image file and return it base64-encoded with its media type.

    The media type is guessed from the file extension. Anything that is not one
    of the image types in ``supported_types`` (including unknown extensions)
    falls back to ``"image/png"``, which is what this function always returned
    before.

    Args:
        image_path: Path of the image file.

    Returns:
        Tuple ``(media_type, base64_data)`` where ``base64_data`` is a str.
    """
    import mimetypes  # local import keeps this cell self-contained

    supported_types = {"image/jpeg", "image/png", "image/gif", "image/webp"}
    guessed_type, _ = mimetypes.guess_type(image_path)
    image_media_type = guessed_type if guessed_type in supported_types else "image/png"

    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode('utf-8')

    return image_media_type, image_data

In [None]:
def classify_video_with_images(text_input, audio_transcription, image_paths,
                               model="claude-3-5-sonnet-20240620", max_tokens=1024):
    """Send the frame grids, the prompt and the transcription to the model.

    The user message holds, in this order: one image block per path in
    ``image_paths``, the prompt text, and the audio transcription.

    Args:
        text_input: The classification prompt.
        audio_transcription: Transcribed audio of the video. The block is left
            out when this is empty or only whitespace, so that no empty text
            block is sent.
        image_paths: Paths of the image files to attach.
        model: Model identifier passed to the API.
        max_tokens: Maximum number of tokens of the reply.

    Returns:
        The response object returned by ``client.messages.create``.
    """
    content = []
    for image_path in image_paths:
        img_media_type, img_data = encode_image(image_path)
        content.append({
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": img_media_type,
                "data": img_data,
            }
        })

    content.append({"type": "text", "text": text_input})

    # Only attach the transcription when there is something to send.
    if isinstance(audio_transcription, str) and audio_transcription.strip():
        content.append({"type": "text", "text": audio_transcription})

    response = client.messages.create(
        model=model,
        system="You are a content classification assistant that evaluates image frames and audio transcriptions from videos according to strict guidelines.",
        max_tokens=max_tokens,
        messages=[
            {
                "role": "user",
                "content": content,
            }
        ]
    )

    return response

**Prompt** 

In [None]:
# Create the prompt.
# This text is passed as `text_input` to classify_video_with_images. It defines
# what counts as child-directed content and asks for a JSON-only reply with the
# keys 'label', 'justification' and 'language'; the inference loop parses the
# reply with json.loads and reads 'label' and 'language' from it.
# NOTE(review): the text contains the typo "inlcude"; it is left untouched here
# because editing the prompt changes the model input.
prompt = """
    A piece of content is child-directed if it meets any of these criteria:

    Designed for Children:
    The content is explicitly created with children as the intended audience, such as:
      - Educational videos for kids.
      - Child-friendly video games with cartoonish graphics and non-violent gameplay and story-telling
      - Animated movies/clips suitable for family viewing
      - Simple crafts, activities, or demonstrations aimed at children.

    Child-Appealing Elements:
    The content includes features commonly enjoyed by children, such as:
    - Colorful, cartoonish, or animated visuals (e.g., animals, anthropomorphic characters, fantasy creatures).
    - Light-hearted portrayals of themes like mischief or conflict, but without mature visuals, strong language, or adult humor.
    - Whimsical, or playful themes.
    - Non-violent and simplified gameplay or narratives.
    - Content with rhythmic elements (e.g., chants, exclamations, or songs appealing to young viewers).

    Important Considerations:
    - Content can be child-directed even if it also appeals to older audiences.
    - Gaming content is child-directed if it features family-friendly gameplay and cartoonish visuals.
    - Movie/TV clips are child-directed if rated G/PG.
    - Presence of mild peril or conflict is acceptable if presented appropriately.

    Exclusions:
    - Do not label content as "Child Directed" if it features significant:
      - Dark, mature, or violent themes.
      - Romantic subplots or adult humor.
      - Complex dialogue or advanced vocabulary inappropriate for children.
      - Songs that are not nursery rhymes or explicitly created for children.
    - Recipe, DIY, or instructional videos unless simplified for children (e.g., "Cooking for Kids").

    Instructions:
    Labeling:
    If the video meets the child-directed criteria label it as "Child Directed".
    If it does not meet these criteria label it as "Not Child Directed".

    Indicate the spoken language if any.
    Provide a brief justification explaining why the video is considered child-directed or not.

    Make the outputs in JSON format (keys inlcude 'label' (only one of: 'Child Directed', or 'Not Child Directed'), 'justification', and 'language' (if any is spoken)).
  Please say nothing else outside of this json format. 
  """

In [None]:
# Video ids that the inference loop below skips. 'none' is a placeholder entry;
# presumably ids of already-classified videos are added here when resuming a run.
ids_covered = ['none']

**Running Model on Dataset** 

In [None]:
# Parallel result lists: one entry per successfully classified video.
ids = []
labels = []
responses = []
predicted_labels = []
languages = []
# Ids of videos that could not be processed (missing files, frame extraction,
# API or response-parsing errors).
remaining = []

img_dir = '/kaggle/working/Images'
os.makedirs(img_dir, exist_ok=True)

for i in range(len(paths)):

    # Skip videos listed in ids_covered.
    if video_ids[i] in ids_covered:
        continue

    try:
        # Each video folder is expected to hold 'audio.mp3' plus the video file;
        # the first entry left after removing the audio is taken as the video.
        contents_of_ad = os.listdir(paths[i])
        contents_of_ad.remove('audio.mp3')
        video_path = paths[i] + '/' + contents_of_ad[0]
    except Exception as err:
        # `except Exception` (not a bare except) so that interrupting the kernel
        # actually stops the loop instead of being logged as a failure.
        print('path not found ', i, '-', repr(err))
        remaining.append(video_ids[i])
    else:
        try:
            print(i, video_path)

            # Extract multiple image grids representative of the video; fall back
            # to the even-sampling variant when too few scenes are available.
            try:
                images = get_images(video_path)
            except Exception:
                images = get_images_2(video_path)

            if not images:
                raise ValueError('no image grids could be extracted')

            # Save each grid returned by get_images / get_images_2
            image_paths = []
            for idx, img in enumerate(images):
                # Convert NumPy array to PIL image
                image = Image.fromarray(img)

                # Save the image to a file
                image_name = f"{video_ids[i]}_{idx + 1}.png"
                image_path = os.path.join(img_dir, image_name)
                image.save(image_path)
                image_paths.append(image_path)

            # Display the grids
            fig, axes = plt.subplots(1, len(images), figsize=(10, 3))
            if len(images) == 1:
                axes.imshow(images[0])
                axes.axis('off')
            else:
                for ax, img in zip(axes, images):
                    ax.imshow(img)
                    ax.axis('off')

            plt.tight_layout()
            plt.show()
            plt.close(fig)  # do not keep one open figure per video

            audio_transcription = transcriptions[i]

            # Make inference
            classification_response = classify_video_with_images(prompt, audio_transcription, image_paths)
            temp_id = video_ids[i]
            temp_label = primary_labels[i]
            temp_response = classification_response.content[0].text

            # Parse the reply once; a non-JSON reply is reported as a failure below.
            parsed_response = json.loads(temp_response)
            pred_temp = parsed_response.get('label')
            lang_temp = parsed_response.get('language')

            print('Id:', temp_id, '. Primary Label:', temp_label, '\nResponse:', temp_response)

            # Append only after everything succeeded so the lists stay aligned.
            ids.append(temp_id)
            labels.append(temp_label)
            responses.append(temp_response)
            predicted_labels.append(pred_temp)
            languages.append(lang_temp)

        except Exception as err:
            print('failed for ', i, '-', repr(err))
            remaining.append(video_ids[i])

    # Pause between videos (presumably to stay within the API rate limit).
    time.sleep(20)

In [None]:
# Number of videos that were classified successfully
len(ids) 

In [None]:
# At the end, print the ids of the videos that could not be processed
# (collected in `remaining` by the inference loop).

print("Remaining videos with errors:", remaining) 

In [None]:
# Show ground truth next to the model's prediction, one line per classified video.
for true_label, predicted_label, _ in zip(labels, predicted_labels, responses):
    print('True Label: ', true_label, '\tPrediction: ', predicted_label)

In [None]:
# Collect the per-video results in one table (one row per classified video):
# id, ground-truth label, predicted label, reported language and raw response.
new_df = pd.DataFrame({
    'Video Id': ids,
    'Primary Label': labels,
    'Predicted Label': predicted_labels,
    'Languages': languages, 
    'Response': responses 
})

new_df.head() 

In [None]:
# Directory that receives the results CSV. exist_ok=True makes the cell safe to
# re-run without a separate existence check.
results_dir = '/kaggle/working/results'
os.makedirs(results_dir, exist_ok=True)

In [None]:
# Persist the results table without the index column.
new_df.to_csv('/kaggle/working/results/DAVSP_Claude-3.5.csv', index=False)

In [None]:
# Changing to binary lists: 1 = 'Not Child Directed', 0 = anything else
# (so 'Child Directed', but also a missing or unexpected label, maps to 0).

predictions = [int(pred == 'Not Child Directed') for pred in predicted_labels]
ground_truths = [int(label == 'Not Child Directed') for label in labels]

In [None]:
# Obtaining classification report 
from sklearn.metrics import classification_report 

# Label encoding used by the binary lists: 0 = Child Directed, 1 = Not Child Directed.
# Passing labels/target_names makes the report rows self-describing and keeps
# both rows present even when one class never occurs.
report = classification_report(
    ground_truths,
    predictions,
    labels=[0, 1],
    target_names=['Child Directed', 'Not Child Directed'],
    zero_division=0,
)
print(report) 

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

# The binary lists encode 'Not Child Directed' as 1 and everything else as 0,
# and confusion_matrix orders rows/columns by label value, so index 0 is
# 'Child Directed' and index 1 is 'Not Child Directed'. The tick labels must
# follow that order. labels=[0, 1] keeps the matrix 2x2 even if one class is
# missing from the data.
class_names = ['Child Directed', 'Not Child Directed']
cm = confusion_matrix(ground_truths, predictions, labels=[0, 1])

plt.figure(figsize=(6,4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()