### COURSE WORK

The purpose of this work is to create a tool that speeds up the processing of military video reports before they are published online. The tool anonymizes faces, alters voices, and adds a watermark.
We have accumulated a backlog of typical videos that are waiting for processing and anonymization. The videos are usually very similar to each other: one person or a group shows something and reads a short text. Some contain dynamic shots, but most are largely static. A complicating factor is that almost all of the videos have very poor image quality.

Libraries used
cv2 and dlib are used for processing images and video; in this work dlib also provides the correlation trackers that follow each face between detections. The face_recognition and mtcnn libraries detect faces in the video frames. librosa and soundfile are used for audio manipulation, and moviepy.editor handles the video file and its audio track. numpy is used for array arithmetic and tqdm for the progress bar.

All work can be divided into two parts - processing of the video itself (pixelation of faces and watermarking) and sound processing.

Video processing. First we use a detector to find faces in the frame. There is a choice between two detectors, face_recognition and MTCNN. The first works quite fast and is suitable for videos of good quality, but it detects faces less reliably than MTCNN, which we use for videos with a poor image or with faces that are partially covered or in shadow. The price of MTCNN's higher accuracy is a longer processing time.

To make sure that all faces are found, we can specify the expected number of faces in the video; the detector then runs on every frame until that number of faces has been found.
We can also force the detector to run every N frames, or whenever the picture has changed substantially compared to the previous frame (for example, a sudden scene cut or something new appearing in the frame). To measure that change we use the Mean Squared Error (MSE), a popular method in image processing and computer vision for quantifying the difference between two images. Here it determines how much the content has changed between two consecutive frames of the video.
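
The triggering rules described above can be sketched as a single predicate. This is a minimal illustration rather than the notebook's exact code: the function name `should_redetect` and its parameter names are hypothetical, and the MSE value is assumed to be computed elsewhere and passed in. A setting of 0 disables the corresponding rule.

```python
def should_redetect(frame_index, tracked_faces, frame_mse,
                    every_n_frames=0, expected_faces=0, mse_threshold=0.0):
    """Return True if the face detector should run on this frame.

    All names here are illustrative; a setting of 0 disables that rule.
    frame_mse is the MSE against the previous frame, or None for the first frame.
    """
    # The very first frame always needs a detection pass
    if frame_index == 0:
        return True
    # Periodic refresh every N frames
    if every_n_frames > 0 and frame_index % every_n_frames == 0:
        return True
    # Keep detecting until the expected number of faces is tracked
    if expected_faces > 0 and tracked_faces < expected_faces:
        return True
    # Re-detect after a large change in the picture (e.g. a scene cut)
    if mse_threshold > 0 and frame_mse is not None and frame_mse > mse_threshold:
        return True
    return False


# Example: frame 7, two faces expected but only one tracked so far
print(should_redetect(7, tracked_faces=1, frame_mse=3.2, expected_faces=2))
```

Keeping the decision in one place makes it easy to see that the rules are combined with a logical OR: any single rule is enough to trigger a new detection pass.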

After the faces have been detected, we initialize a tracker for each of them. In every frame we pixelate each tracked face region and, if required, add a watermark.
After processing the entire video, we save it in the temp folder.
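
To illustrate what pixelation does to a region, here is a minimal NumPy sketch that replaces each tile of a small grayscale array with its average value. The helper name `pixelate_block_mean` and the toy 4x4 input are illustrative assumptions; the notebook's own function works on colour frames and a bounding box.

```python
import numpy as np


def pixelate_block_mean(image, block):
    """Return a copy of a 2-D image where each block x block tile holds its mean."""
    out = image.astype(float).copy()
    height, width = out.shape
    for y in range(0, height, block):
        for x in range(0, width, block):
            tile = out[y:y + block, x:x + block]  # a view; edge tiles may be smaller
            tile[...] = tile.mean()               # overwrite the tile in place
    return out


toy = np.array([[0, 2, 10, 10],
                [4, 6, 10, 10],
                [1, 1, 8, 0],
                [1, 1, 0, 0]])
print(pixelate_block_mean(toy, 2))
```

A larger tile size discards more detail and therefore anonymizes more strongly, at the cost of a coarser-looking result.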

OpenCV does not handle audio, and we need not only to keep the sound but also to alter the voices in it to anonymize them. We therefore use moviepy.editor to extract the audio track from the original video, shift its pitch with librosa's pitch_shift effect, and then attach the shifted audio to the processed video file.
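
For intuition about how strong a given pitch shift is: in equal temperament with 12 steps per octave, shifting by n semitones multiplies every frequency by 2^(n/12). The sketch below computes that factor for a shift of four semitones down. The helper name `semitone_ratio` and the 200 Hz example frequency are illustrative assumptions, not values measured from the videos.

```python
def semitone_ratio(n_steps, bins_per_octave=12):
    """Frequency multiplier for a pitch shift of n_steps (negative = lower)."""
    return 2.0 ** (n_steps / bins_per_octave)


ratio = semitone_ratio(-4)
print(f"ratio: {ratio:.4f}")             # about 0.79
print(f"200 Hz -> {200 * ratio:.1f} Hz")
```

A shift of 12 steps in either direction would move the voice by a full octave, so a few semitones is a comparatively mild change.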

In [11]:
# Import necessary libraries
import cv2
import numpy as np
from matplotlib import pyplot as plt
import dlib
import face_recognition
from tqdm import tqdm
from mtcnn.mtcnn import MTCNN
import librosa
import soundfile as sf
from moviepy.editor import VideoFileClip, AudioFileClip

# Parameters for figure size
plt.rcParams['figure.figsize'] = [15, 10]

In [2]:
# Define constants
VIDEO_SOURCE = 'data/video_1.mp4' # the path to the original video file to process
WATERMARK_SOURCE = 'data/watermark.png'  # the path to watermark png file (set None for no watermark)
DETECTOR = 'mtcnn_detector'  # Detector type (face_recognition, mtcnn_detector)
THRESHOLD = 0  # Mean Squared Error above which faces are re-detected (set to 0 to disable this feature)
MAX_FRAMES = 0  # Number of frames to wait before re-detecting faces (set to 0 to disable this feature)
MIN_FACES = 2  # Minimum number of faces to track; detection runs on every frame until this many are found (set to 0 to disable this feature)

In [3]:
# Initialize MTCNN for face detection
if DETECTOR == 'mtcnn_detector':
    mtcnn_detector = MTCNN()

In [4]:
# Open video file
video = cv2.VideoCapture(VIDEO_SOURCE)

# Obtain properties of the video
frame_rate = video.get(cv2.CAP_PROP_FPS)
frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

# Create a VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('temp/output.avi', fourcc, frame_rate, (frame_width, frame_height))

In [5]:
# Check if the WATERMARK_SOURCE is not None to proceed with adding a watermark to the video
if WATERMARK_SOURCE:
    # Load the watermark image with the alpha channel (-1 indicates loading the image with alpha channel)
    watermark = cv2.imread(WATERMARK_SOURCE, -1)
    # Resize the watermark image to a desirable size; here we are resizing it to 100x100 pixels
    watermark = cv2.resize(watermark, (100, 100))
    
    # Get the dimensions of the watermark image
    (wH, wW) = watermark.shape[:2]
    # Split the watermark image into four channels: Blue, Green, Red, and Alpha (transparency)
    # using the cv2.split() function
    (wB, wG, wR, wA) = cv2.split(watermark)
    # Merge back the Blue, Green, and Red channels excluding the Alpha channel
    watermark_rgb = cv2.merge([wB, wG, wR])
    # Create a binary mask of the alpha channel by thresholding it
    _, alpha_mask = cv2.threshold(wA, 0, 255, cv2.THRESH_BINARY)
    # Convert this grayscale alpha_mask to a three-channel image 
    watermark_mask = cv2.cvtColor(alpha_mask, cv2.COLOR_GRAY2BGR)
    # Obtain the inverse of the watermark_mask
    # This will be used to black-out the area where the watermark will be placed in the input images
    watermark_mask_inv = cv2.bitwise_not(watermark_mask)
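
The cell above thresholds the alpha channel into a binary mask, so semi-transparent watermark pixels become fully opaque. Where a softer overlay is wanted, one possible alternative (not what this notebook does) is per-pixel alpha blending. The sketch below uses NumPy only; the function name `alpha_blend` and the toy arrays are hypothetical.

```python
import numpy as np


def alpha_blend(roi, overlay_bgr, alpha):
    """Blend overlay_bgr onto roi using an 8-bit alpha channel (0..255)."""
    # Scale alpha to 0..1 and add a channel axis so it broadcasts over B, G, R
    weight = (alpha.astype(float) / 255.0)[..., np.newaxis]
    blended = weight * overlay_bgr.astype(float) + (1.0 - weight) * roi.astype(float)
    return np.round(blended).astype(np.uint8)


# Toy 1x2 image: left pixel fully transparent, right pixel fully opaque
roi = np.full((1, 2, 3), 100, dtype=np.uint8)
overlay = np.full((1, 2, 3), 200, dtype=np.uint8)
alpha = np.array([[0, 255]], dtype=np.uint8)
print(alpha_blend(roi, overlay, alpha))
```

With a binary mask the two approaches give the same result; they differ only where the alpha channel holds intermediate values.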

In [6]:
# Function to calculate Mean Squared Error between two images
def mse(imageA, imageB):
    # Convert to float first so the subtraction cannot wrap around in uint8,
    # then take the per-value difference between the two images
    diff = imageA.astype("float") - imageB.astype("float")

    # Average the squared differences over every pixel and channel;
    # this is the Mean Squared Error (MSE)
    return np.mean(diff ** 2)
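
A small worked example may help when choosing a value for THRESHOLD. For 8-bit images the MSE is 0 for identical frames and at most 255² = 65025 for completely opposite ones. The sketch below re-declares an equivalent helper (named `frame_mse` here, an illustrative name) so that it runs on its own; the synthetic frames are made up for the demonstration.

```python
import numpy as np


def frame_mse(a, b):
    """Mean squared difference over all pixels and channels."""
    diff = a.astype(float) - b.astype(float)
    return float(np.mean(diff ** 2))


black = np.zeros((4, 4, 3), dtype=np.uint8)
white = np.full((4, 4, 3), 255, dtype=np.uint8)
slightly_brighter = black + 10  # every value differs by 10

print(frame_mse(black, black))              # 0.0
print(frame_mse(black, slightly_brighter))  # 100.0
print(frame_mse(black, white))              # 65025.0
```

The conversion to float matters: subtracting two uint8 arrays directly would wrap around instead of producing negative differences.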

In [7]:
# Function to pixelate region in an image
def pixelate_region(image, top_left, bottom_right, pix_size):
    # Clamp the coordinates to the image bounds: detectors and trackers can
    # report boxes that extend past the frame (including negative values),
    # and negative indices would be interpreted relative to the end of the array
    height, width = image.shape[:2]
    x1, y1 = max(0, top_left[0]), max(0, top_left[1])
    x2, y2 = min(width, bottom_right[0]), min(height, bottom_right[1])

    # Break the region into small square blocks (each up to pix_size x pix_size)
    for y in range(y1, y2, pix_size):
        for x in range(x1, x2, pix_size):
            # Slicing stops at the image border, so blocks there may be smaller than pix_size
            block = image[y:y + pix_size, x:x + pix_size]
            # Replace every pixel of the block with the block's average colour;
            # block is a view, so this modifies the image in place
            block[...] = block.mean(axis=(0, 1))

    # Return the pixelated image
    return image

In [8]:
# Function to detect faces and initialize trackers
def detect_faces_and_initialize_trackers(detector, frame, previous_frame=None, threshold=0):
    # OpenCV delivers frames in BGR order, while both detectors expect RGB
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Use MTCNN detector if selected
    if detector == 'mtcnn_detector':
        # Perform face detection using MTCNN
        face_locations = mtcnn_detector.detect_faces(rgb_frame)
        # MTCNN reports each bounding box as [x, y, width, height]
        rects = [dlib.rectangle(x, y, x + w, y + h)
                 for (x, y, w, h) in (face['box'] for face in face_locations)]

    # Use Face Recognition detector if it is selected
    elif detector == 'face_recognition':
        # Use the slower CNN model on the initial frame (initial_frame is the global
        # flag set by the main loop) or if the Mean Squared Error with the previous
        # frame exceeds the threshold
        if initial_frame or (previous_frame is not None and threshold > 0 and mse(frame, previous_frame) > threshold):
            face_locations = face_recognition.face_locations(rgb_frame, model="cnn")
        else:
            # Use the default (HOG-based) face_recognition face detector
            face_locations = face_recognition.face_locations(rgb_frame)
        # face_recognition reports each bounding box as (top, right, bottom, left)
        rects = [dlib.rectangle(left, top, right, bottom)
                 for (top, right, bottom, left) in face_locations]

    else:
        raise ValueError(f"Unknown detector: {detector}")

    # Create one correlation tracker per detected face and start it on that face
    trackers = []
    for rect in rects:
        tracker = dlib.correlation_tracker()
        tracker.start_track(frame, rect)
        trackers.append(tracker)

    # Return the face locations and initialized trackers
    return face_locations, trackers
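
The two detectors report boxes in different layouts: MTCNN as [x, y, width, height] and face_recognition as (top, right, bottom, left). A small pair of converters to one common (left, top, right, bottom) layout, clipped to the frame in case a box near the edge extends past it, might look like the sketch below. The function names and the sample numbers are hypothetical.

```python
def clamp_box(left, top, right, bottom, frame_width, frame_height):
    """Clip a box to the frame so later slicing never sees negative indices."""
    return (max(0, left), max(0, top),
            min(frame_width, right), min(frame_height, bottom))


def from_mtcnn(box, frame_width, frame_height):
    """Convert an MTCNN [x, y, w, h] box to clamped (left, top, right, bottom)."""
    x, y, w, h = box
    return clamp_box(x, y, x + w, y + h, frame_width, frame_height)


def from_face_recognition(location, frame_width, frame_height):
    """Convert a face_recognition (top, right, bottom, left) tuple likewise."""
    top, right, bottom, left = location
    return clamp_box(left, top, right, bottom, frame_width, frame_height)


print(from_mtcnn([-5, 10, 50, 60], 640, 480))               # (0, 10, 45, 70)
print(from_face_recognition((10, 700, 70, 600), 640, 480))  # (600, 10, 640, 70)
```

Normalizing the layout once, right after detection, means the tracking and pixelation steps do not need to know which detector produced a box.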

In [9]:
# Initialize variables
initial_frame = True # Boolean flag to indicate if the current frame is the initial frame
previous_frame = None # Stores the previous frame of the video
iteration = 0 # Counter to keep track of the current frame number
trackers = [] # List to store tracker objects for each detected face

# Decide whether face detection should be performed on the current frame.
# Detection runs if any of the following conditions is true:
# 1. It's the initial frame.
# 2. MAX_FRAMES is enabled and the current frame number is a multiple of it.
# 3. MIN_FACES is enabled and fewer than MIN_FACES faces are being tracked.
# 4. THRESHOLD is enabled and the Mean Squared Error between the current and previous frame exceeds it.
def tracker_condition():
    return (initial_frame or
            (MAX_FRAMES > 0 and iteration % MAX_FRAMES == 0) or
            (MIN_FACES > 0 and len(trackers) < MIN_FACES) or
            (previous_frame is not None and THRESHOLD > 0 and mse(frame, previous_frame) > THRESHOLD))

# Iterate over frames and process each one
for _ in tqdm(range(total_frames)):
    # Read a frame from the video.
    ok, frame = video.read()
    
    # If video frame is not read successfully, then we reach the end, break the loop.
    if not ok:
        break

    # If the condition defined in tracker_condition is True, detect faces and update trackers
    if tracker_condition():
        face_locations, trackers = detect_faces_and_initialize_trackers(DETECTOR, frame, previous_frame, THRESHOLD)
        initial_frame = False

    # Store current frame as the previous frame for the next iteration
    previous_frame = frame.copy()
    
    # Update each tracker and pixelate each detected face in the current frame
    for tracker in trackers:
        tracker.update(frame)
        
        # Obtain the position of the tracked object
        pos = tracker.get_position()
        
        # Convert the position data to integer format
        x = int(pos.left())
        y = int(pos.top())
        w = int(pos.width())
        h = int(pos.height())

        # Pixelate the face region in the frame
        frame = pixelate_region(frame, (x, y), (x + w, y + h), 8)

    # If WATERMARK_SOURCE is not None, apply the watermark to the frame
    if WATERMARK_SOURCE:
        # Define the region of interest for the watermark image in the original image
        roi = frame[0:wH, 0:wW]
    
        # Black-out the area behind the logo in our original ROI
        img1_bg = cv2.bitwise_and(roi, watermark_mask_inv)
    
        # Mask out the watermark from its image
        img2_fg = cv2.bitwise_and(watermark_rgb, watermark_mask)
    
        # Merge these two to create the final watermark
        dst = cv2.add(img1_bg, img2_fg)
    
        # And place the result back into the original image roi
        frame[0:wH, 0:wW] = dst

    # Increment the frame counter
    iteration += 1
    
    # Write the frame to the output video file.
    out.write(frame)

# When everything is done, release the captures
out.release()
video.release()

100%|██████████| 268/268 [00:06<00:00, 41.13it/s]


In [10]:
# Manipulate audio in the original video to produce a pitch-shifted version

# Extract and modify audio from original video
# Load the source video file using moviepy's VideoFileClip
clip = VideoFileClip(VIDEO_SOURCE)
# Save the audio from the source video to a temporary MP3 file using moviepy's write_audiofile
clip.audio.write_audiofile("temp/temp_audio.mp3")
# Use librosa to load the temporary audio file, extracting the audio time series and sampling rate
# sr=None keeps the file's native sampling rate (librosa otherwise resamples to 22050 Hz)
y, sr = librosa.load('temp/temp_audio.mp3', sr=None)
# Shift the pitch of the audio using librosa. -4 semitones lowers the pitch by two whole tones
y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=-4, bins_per_octave=12)  # shift pitch
# Write the modified audio to a WAV file using the soundfile library. WAV was chosen as it is lossless 
# and widely accepted by most audio and video applications
sf.write('temp/temp_shifted_audio.wav', y_shifted, sr)

# Combine the modified audio with the original video
# Load the modified audio file
shifted_audio = AudioFileClip('temp/temp_shifted_audio.wav')
# Load the processed video file (written by OpenCV, so it has no audio track)
videoclip = VideoFileClip('temp/output.avi')
# Replace the original video's audio with the modified audio
videoclip.audio = shifted_audio
# Write the final result to an output video file
videoclip.write_videofile("result/output.mp4")

MoviePy - Writing audio in temp/temp_audio.mp3
MoviePy - Done.
Moviepy - Building video result/output.mp4.
MoviePy - Writing audio in outputTEMP_MPY_wvf_snd.mp3
MoviePy - Done.
Moviepy - Writing video result/output.mp4
Moviepy - Done !
Moviepy - video ready result/output.mp4
