In [157]:
import cv2
import os
import numpy as np

In [158]:
def select_key_frames(video_path, scene_change_threshold=0.2):
    key_frames = []
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Unable to open video file")
        return key_frames
    
    prev_frame = None
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        if prev_frame is not None:
            diff = cv2.absdiff(prev_frame, frame)
            diff_sum = diff.sum() / (frame.shape[0] * frame.shape[1])
            if diff_sum > scene_change_threshold:
                key_frames.append((frame_count, frame))
        prev_frame = frame.copy()
        frame_count += 1
    
    cap.release()
    return key_frames

def save_key_frames(key_frames, output_dir):
    for idx, frame in key_frames:
        cv2.imwrite(f"{output_dir}/key_frame_{idx}.jpg", frame)



In [159]:
video_path = "/Users/sanjiv/CS521/Final/news/3 officers killed 5 wounded during standoff.mp4"
output_dir = "/Users/sanjiv/CS521/Final/news_key_frames"
scene_change_threshold = 50 

key_frames = select_key_frames(video_path, scene_change_threshold)
save_key_frames(key_frames, output_dir)

In [160]:
def generate_canary_input(frame_path, resize_factors, dissimilarity_threshold=0.1):
    frame = cv2.imread(frame_path)
    frame_height, frame_width, _ = frame.shape

    smallest_dissimilarity = float('inf')
    selected_canary = None

    for factor in resize_factors:
        resized_frame = cv2.resize(frame, (frame_width // factor, frame_height // factor))
        smm = calculate_smm(frame, resized_frame)
        smsd = calculate_smsd(frame, resized_frame)

        if smm <= dissimilarity_threshold and smsd <= dissimilarity_threshold:
            if smm + smsd < smallest_dissimilarity:
                smallest_dissimilarity = smm + smsd
                selected_canary = resized_frame

    return selected_canary

def calculate_smm(frame1, frame2):
    smm = abs(frame1.mean() - frame2.mean()) / frame1.mean()
    return smm

def calculate_smsd(frame1, frame2):
    smsd = abs(frame1.std() - frame2.std()) / frame1.std()
    return smsd

def save_canary_input(canary_input, output_dir, frame_name):
    cv2.imwrite(os.path.join(output_dir, frame_name), canary_input)



In [161]:
key_frames_dir = "news_key_frames"
output_dir = "news_canary"
resize_factors = [16, 32, 64, 128, 256]  
dissimilarity_threshold = 0.1  # Adjust as needed

for frame_name in os.listdir(key_frames_dir):
    frame_path = os.path.join(key_frames_dir, frame_name)
    canary_input = generate_canary_input(frame_path, resize_factors, dissimilarity_threshold)
    if canary_input is not None:
        save_canary_input(canary_input, output_dir, frame_name)

In [162]:
def compute_psnr(original_frame, processed_frame, max_pixel_value=255):
    original_frame = np.array(original_frame)
    processed_frame = np.array(processed_frame)
    if original_frame.shape != processed_frame.shape:
        raise ValueError("Original and processed frames must have the same shape.")
    processed_frame = np.clip(processed_frame, 0, max_pixel_value)
    mse = np.mean((original_frame - processed_frame) ** 2)
    if mse == 0:
        psnr_value = float('inf') 
    else:
        psnr_value = 20 * np.log10(max_pixel_value / np.sqrt(mse))
    
    return psnr_value

def apply_approximation(original_frame, approximation_settings):
    original_frame = np.array(original_frame)
    approx_levels = approximation_settings[:2]
    processed_frame = original_frame.copy()
    processed_frame = processed_frame.astype(np.int64)

    for i in range(0, original_frame.shape[0], approx_levels[0]):
        for j in range(0, original_frame.shape[1], approx_levels[1]):
            processed_frame[i:i+approx_levels[0], j:j+approx_levels[1], :] += np.array(approximation_settings[2:], dtype=np.int64)

    processed_frame = processed_frame[:original_frame.shape[0]-approx_levels[0], :original_frame.shape[1]-approx_levels[1], :]
    processed_frame = np.clip(processed_frame, 0, 255).astype(np.uint8)
    
    return processed_frame

import cv2

def search_with_canary_inputs(original_frame, current_setting, canary_input):
    best_setting = current_setting
    current_frame = apply_approximation(original_frame, current_setting)
    canary_input_resized = cv2.resize(canary_input, (current_frame.shape[1], current_frame.shape[0]))
    current_psnr = compute_psnr(current_frame, canary_input_resized)

    while True:
        candidate_settings = generate_candidate_settings(current_setting)
        best_candidate = None
        best_candidate_error = float('inf')
        
        for candidate in candidate_settings:
            candidate_frame = apply_approximation(original_frame, candidate)
            candidate_frame_resized = cv2.resize(candidate_frame, (current_frame.shape[1], current_frame.shape[0]))
            
            candidate_error = compute_psnr(candidate_frame_resized, canary_input_resized)
            
            if candidate_error < best_candidate_error:
                best_candidate = candidate
                best_candidate_error = candidate_error
        
        if best_candidate_error < current_psnr:
            best_setting = best_candidate
            current_psnr = best_candidate_error
            current_setting = best_setting
        else:
            break 
    
    return best_setting, current_psnr

def generate_candidate_settings(current_setting):
    candidate_settings = []
    for i, knob in enumerate(current_setting):
        candidate = list(current_setting)
        candidate[i] += 1
        candidate_settings.append(candidate)
    return candidate_settings

In [163]:
canary_folder = "news_canary"
canary_inputs = []
for filename in os.listdir(canary_folder):
    if filename.endswith(".png") or filename.endswith(".jpg"):
        image_path = os.path.join(canary_folder, filename)
        canary_frame = cv2.imread(image_path)
        canary_inputs.append(canary_frame)

original_folder = "news_key_frames"
original_frames = []
for filename in os.listdir(original_folder):
    if filename.endswith(".png") or filename.endswith(".jpg"):
        image_path = os.path.join(original_folder, filename)
        original_frame = cv2.imread(image_path)
        original_frames.append(original_frame)

best_settings = []
best_psnrs = []
current_setting = [1, 1, 1] 
for original_frame, canary_input in zip(original_frames, canary_inputs):
    best_setting, best_psnr = search_with_canary_inputs(original_frame, current_setting, canary_input)
    best_settings.append(best_setting)
    best_psnrs.append(best_psnr)

for i, (setting, psnr) in enumerate(zip(best_settings, best_psnrs)):
    print(f"Canary Input {i+1}: Best Setting: {setting}, Best PSNR: {psnr}")

In [None]:
def apply_settings_to_video(video_file, list_of_approximation_settings):

    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        raise ValueError("Error opening video file")

    original_video = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        original_video.append(frame)
    
    list_of_processed_videos = []
    for settings in list_of_approximation_settings:
        processed_video = []
        for frame in original_video:
            processed_frame = apply_approximation(frame, settings)
            processed_video.append(processed_frame)
        list_of_processed_videos.append(processed_video)
    
    cap.release()

    return list_of_processed_videos

In [None]:
print((best_settings))
video_file = "/Users/sanjiv/CS521/Final/news_clips/Apocalypse Lukashenko warns Russian nuclear weapons deployed in Belarus_random_clip.mp4"
list_processed_video = apply_settings_to_video(video_file, best_settings)

[[1, 5, 1], [7, 2, 15]]


In [None]:
def compute_video_psnr(processed_video, canary_inputs):
    """
    Compute the PSNR for each processed video with respect to its corresponding canary input.

    Args:
    - processed_video: list of numpy arrays representing the processed videos
    - canary_inputs: list of numpy arrays representing the canary input frames

    Returns:
    - psnr_values: list of PSNR values for each processed video
    """
    psnr_values = []
    for processed_frame, canary_frame in zip(processed_video, canary_inputs):
        processed_frame_resized = cv2.resize(processed_frame, (canary_frame.shape[1], canary_frame.shape[0]))
        psnr = compute_psnr(processed_frame_resized, canary_frame)
        psnr_values.append(psnr)
    return psnr_values

In [None]:
psnr_values_list = []
for processed_video in list_processed_video:
    psnr_values = compute_video_psnr(processed_video, canary_inputs)
    psnr_values_list.append(psnr_values)

best_index = np.argmax([max(psnr_values) for psnr_values in psnr_values_list])
best_processed_video = list_processed_video[best_index]
best_psnr = max(psnr_values_list[best_index])

print(f"Best processed video index: {best_index}")
print(f"Best PSNR: {best_psnr}")

Best processed video index: 0
Best PSNR: 29.064841482021166


In [None]:
def save_best_processed_video_as_mp4(list_of_processed_videos, best_index, output_file, bitrate=1000000, codec='mp4v'):

    best_processed_video = list_of_processed_videos[best_index]
    height, width, _ = best_processed_video[0].shape
    fourcc = cv2.VideoWriter_fourcc(*codec)
    out = cv2.VideoWriter(output_file, fourcc, 30, (width, height), isColor=True)
    out.set(cv2.CAP_PROP_BITRATE, bitrate)

    for frame in best_processed_video:
        out.write(frame)

    out.release()

output_file = "best_processed_video_news.mp4"  
bitrate = 5000  
save_best_processed_video_as_mp4(list_processed_video, best_index, output_file, bitrate=bitrate)