### Global imports

In [8]:
import sys
import os
import subprocess
import struct
import skimage.transform, skimage.color
import numpy as np

### Local imports

In [9]:
# `!pwd` is IPython shell-capture syntax: the command output comes back as a
# list of lines, so the first element is the notebook's working directory.
project_path = !pwd
project_path = project_path[0]

# Add the extractor's `info_shipout` folder to the module search path so the
# import that follows can be resolved (presumably this is where slice_pb2
# lives -- TODO confirm). The membership test keeps re-runs of this cell from
# appending the same entry twice. Note that no directory change happens here.
lib_path = os.path.abspath(os.path.join(project_path, 'h264-extractor', 'openh264', 'info_shipout'))
if lib_path not in sys.path:
    sys.path.append(lib_path)

from slice_pb2 import Slice

In [43]:
# Name of the sub-folder (created next to the video) that holds generated files
CACHE_FOLDER_NAME = 'tmp'

class VideoHandler():
    """Wraps a video file and drives the extraction of its YUV frames and coding data.

    The path of the extractor binary is stored on the class, so it is shared by
    every instance once it has been set.
    """
    h264_extractor_filename = None

    def __init__(self, video_filename, h264_extractor_filename=None):
        """Video class constructor

        Args:
            video_filename (str): absolute path to the video file
            h264_extractor_filename (str, optional): path to the extractor
                binary; when given it is registered on the class.

        Raises:
            FileNotFoundError: raises error if the video file (or the given
                extractor binary) cannot be found
        """
        if not os.path.exists(video_filename):
            raise FileNotFoundError(f'cannot locate the video file "{video_filename}"')
        # Original file properties
        self.filename = video_filename
        self.path = os.path.dirname(video_filename)
        # splitext only splits on the last dot, so "my.clip.mp4" keeps the name
        # "my.clip", and a file without extension yields '' instead of failing.
        self.name, extension = os.path.splitext(os.path.basename(video_filename))
        self.extension = extension[1:]  # stored without the leading dot

        # Elaborated files properties
        self.h264_filename = None
        self.yuv_filename = None
        self.coded_data_filename = None

        if h264_extractor_filename is not None:
            VideoHandler.set_h264_extractor_filename(h264_extractor_filename)

    @staticmethod
    def set_h264_extractor_filename(h264_extractor_filename):
        """Register the extractor binary used by all VideoHandler instances.

        Args:
            h264_extractor_filename (str): path to the extractor binary

        Raises:
            FileNotFoundError: if the binary does not exist
        """
        if not os.path.exists(h264_extractor_filename):
            raise FileNotFoundError(f'cannot locate the binary file "{h264_extractor_filename}", was it built?')
        VideoHandler.h264_extractor_filename = h264_extractor_filename

    def _ensure_cache_folder(self):
        """Create the cache folder next to the video if needed and return its path."""
        cache_folder = os.path.join(self.path, CACHE_FOLDER_NAME)
        os.makedirs(cache_folder, exist_ok=True)
        return cache_folder

    def _compute_h264(self):
        """Extract the raw H.264 stream of the video into the cache folder with ffmpeg.

        Returns:
            VideoHandler: self, with `h264_filename` set

        Raises:
            ValueError: if the extractor binary path has not been set
            subprocess.CalledProcessError: if ffmpeg exits with an error
            FileNotFoundError: if ffmpeg did not produce the output file
        """
        if VideoHandler.h264_extractor_filename is None:
            raise ValueError('h264 extractor binary path is not set')
        h264_filename = os.path.join(self._ensure_cache_folder(), self.name + '.h264')

        # extract h264 from the mp4 file using ffmpeg (stream copy, audio dropped)
        subprocess.run(
            ['ffmpeg', '-y', '-i', self.filename, '-vcodec', 'copy', '-an',
             '-bsf:v', 'h264_mp4toannexb', h264_filename],
            check=True
        )

        if not os.path.exists(h264_filename):
            raise FileNotFoundError(f'cannot locate the h264 file "{h264_filename}", it probably hasn\'t been generated')
        self.h264_filename = h264_filename
        return self

    def extract_yuv_and_codes(self):
        """Run the extractor to produce the YUV frames and the coded data file.

        Returns:
            VideoHandler: self, with `yuv_filename` and `coded_data_filename` set

        Raises:
            ValueError: if the extractor binary path has not been set
            subprocess.CalledProcessError: if ffmpeg or the extractor fails
            FileNotFoundError: if an expected output file is missing
        """
        self._compute_h264()

        # compute the filenames
        cache_folder = self._ensure_cache_folder()
        self.yuv_filename = os.path.join(cache_folder, self.name + '.yuv') # YUV frames
        self.coded_data_filename = os.path.join(cache_folder, self.name + '.msg') # encoding parameters

        # run the extractor to get the yuv and coded data files
        subprocess.run(
            [self.h264_extractor_filename, self.h264_filename, '--yuv_out', self.yuv_filename, '--info_out', self.coded_data_filename, '--n_threads', '0'],
            # for now, only setting threads to 0 is allowed; using other values can result in
            # unexpected behaviors
            check=True
        )

        # remove the h264 file since it's not needed anymore; this is best
        # effort, so a failure is reported as a warning rather than aborting
        try:
            os.remove(self.h264_filename)
        except OSError:
            print(f'WARNING: could not remove the h264 file "{self.h264_filename}"')

        # raise exception if the files haven't been generated
        if not os.path.exists(self.yuv_filename) or not os.path.exists(self.coded_data_filename):
            raise FileNotFoundError('cannot locate the yuv and/or coded data files, they probably haven\'t been generated')
        return self

    def clean_cache(self):
        """Delete every file in the cache folder and then the folder itself (no-op if absent)."""
        cache_folder = os.path.join(self.path, CACHE_FOLDER_NAME)
        if os.path.exists(cache_folder):
            for cached_file in os.listdir(cache_folder):
                os.remove(os.path.join(cache_folder, cached_file))
            os.rmdir(cache_folder)

# Extract encoding parameters

In [44]:
# Working directory of the notebook, captured through the IPython `!pwd` shell call
project_path = !pwd
project_path = project_path[0]

# Paths of the extractor binary and of the sample video, both under the project folder
bin_path = os.path.abspath(os.path.join(project_path, 'h264-extractor', 'bin'))
h264_ext_bin = os.path.join(bin_path, 'h264dec_ext_info')
video_filename = os.path.join(project_path, 'bunny.mp4')

# Register the extractor binary and generate the YUV and coded-data files for the video
video = VideoHandler(video_filename, h264_ext_bin)
video.extract_yuv_and_codes()

ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --enable-libvpx --enab

------------------------------------------------------


<__main__.VideoHandler at 0x7f1d4428beb0>

### Read YUV and encoding parameters

In [4]:



def get_ep_file_iterator(filename):
    """Yield the raw payload of each length-prefixed record stored in a file.

    Each record is a 4-byte little-endian unsigned length followed by that
    many payload bytes.

    Args:
        filename (str): path to the file holding the records

    Yields:
        bytes: payload of the next record

    Raises:
        EOFError: if the file ends in the middle of a length header or payload
    """
    with open(filename, 'rb') as file:
        while True:
            length_bytes = file.read(4)
            if not length_bytes:
                # clean end of file: no more records
                return
            if len(length_bytes) < 4:
                raise EOFError(f'truncated record header in "{filename}"')
            # Interpret data as little-endian unsigned int to convert from C layer to Python
            length = struct.unpack('<I', length_bytes)[0]
            payload = file.read(length)
            if len(payload) < length:
                # do not hand a partial record to the caller
                raise EOFError(
                    f'truncated record in "{filename}": expected {length} bytes, got {len(payload)}')
            yield payload

def get_slice_iterator(filename):
    """Yield one parsed ``Slice`` message per record read from ``filename``.

    Records are obtained from ``get_ep_file_iterator`` and each payload is
    decoded into a fresh ``Slice`` with ``ParseFromString``.
    """
    for payload in get_ep_file_iterator(filename):
        message = Slice()
        message.ParseFromString(payload)
        yield message


def rgb_from_yuv(width, height, frame_num, yuv_filename):
    """Reads an RGB frame from a YUV file.

    The file is read as consecutive frames made of a full-resolution Y plane
    followed by U and V planes of half the width and half the height.

    Args:
        width (int): width of the source yuv video (assumed even)
        height (int): height of the source yuv video (assumed even)
        frame_num (int): index of the desired frame
        yuv_filename (str): filename of the yuv file

    Returns:
        RGB frame as returned by ``skimage.color.yuv2rgb``

    Raises:
        ValueError: if the requested frame is not fully contained in the file
    """

    # In YUV420 format, each pixel of the Y (luma) component is represented by 1 byte, while the U and V (chroma) components are subsampled, so each of them is represented by 0.25 bytes. Hence, the total size is (width * height * 1.5).
    luma_size = width * height
    chroma_size = luma_size // 4
    frame_size = luma_size * 3 // 2
    planes_size = luma_size + 2 * chroma_size

    # Color space conversion constants
    U_MAX = 0.436
    V_MAX = 0.615

    with open(yuv_filename, 'rb') as file:
        # Read the frame at the specified frame number
        file.seek(frame_num * frame_size)
        raw = file.read(planes_size)

    if len(raw) < planes_size:
        # fail with an explicit message instead of an opaque reshape error
        raise ValueError(
            f'frame {frame_num} is not fully contained in "{yuv_filename}": '
            f'expected {planes_size} bytes, got {len(raw)}')

    planes = np.frombuffer(raw, dtype=np.uint8)
    y = planes[:luma_size].reshape((height, width))
    u = planes[luma_size:luma_size + chroma_size].reshape((height // 2, width // 2))
    v = planes[luma_size + chroma_size:planes_size].reshape((height // 2, width // 2))

    # Rescale subsampled chroma components to the same size as the luma component
    # (third positional argument of rescale is the interpolation order)
    y = skimage.img_as_float32(y)
    u = skimage.transform.rescale(u, 2.0, 1, anti_aliasing=False)
    v = skimage.transform.rescale(v, 2.0, 1, anti_aliasing=False)

    # Color space conversion: shift the chroma values to be centred on zero
    # (assumes rescale returned values in [0, 1] -- TODO confirm)
    u = (u * 2 * U_MAX) - U_MAX
    v = (v * 2 * V_MAX) - V_MAX

    # Convert to RGB
    yuv = np.dstack([y, u, v])
    rgb = skimage.color.yuv2rgb(yuv)
    return rgb

### Extract GOPs and GOP features

In [6]:
from slice_pb2 import SliceType

def extract_gop(slice_iterator, gop_length: int, width: int = 0, height: int = 0) -> list:
    """Collect one group of pictures (GOP) from an iterator of slices.

    Slices are skipped until the first Intra (I) slice, which starts the GOP.
    Following slices are appended until ``gop_length`` entries are collected
    or another I slice is met. That terminating I slice is consumed from the
    iterator and is not part of the returned GOP.

    Args:
        slice_iterator: iterator yielding slice messages exposing ``type``
        gop_length (int): maximum number of slices in the GOP
        width (int): currently unused
        height (int): currently unused

    Returns:
        list: ``(slice, frame_index)`` tuples, where ``frame_index`` counts the
        slices consumed by this call, starting from 0

    Raises:
        ValueError: if the iterator holds no I slice, or if it is exhausted
            before ``gop_length`` slices have been collected
    """
    # TODO: how to crop?
    frame_index = 0
    try:
        current = next(slice_iterator)
        # The first slice is expected to be of type Intra. Find next I slice to start gop
        while current.type != SliceType.I:
            current = next(slice_iterator)
            frame_index += 1
    except StopIteration:
        # also covers an empty iterator, which previously leaked StopIteration
        raise ValueError('No Intra slice found') from None

    gop = [(current, frame_index)]

    while len(gop) < gop_length:
        try:
            current = next(slice_iterator)
        except StopIteration:
            raise ValueError(f'Unable to reach desired GOP length of {gop_length}, actual gop length is {len(gop)}') from None
        frame_index += 1
        if current.type == SliceType.I:
            # GOP is over
            break
        gop.append((current, frame_index))

    return gop

def extract_gop_features(gop: list, yuv_filename: str = None):
    """Gather per-GOP features from the slices and their decoded frames.

    Args:
        gop (list): ``(slice, frame_number)`` tuples
        yuv_filename (str): YUV file the frames are decoded from

    Returns:
        dict: with keys ``i_frame`` (decoded frame of the last I slice seen),
        ``diff_frames`` (each decoded frame minus the current I frame),
        ``frame_types``, ``mb_types`` and ``luma_qps`` (the last two are
        flattened over all macroblocks of all slices)
    """
    i_frame = None
    diff_frames = []
    frame_types = []
    mb_types = []
    luma_qps = []

    for coded_slice, frame_number in gop:
        frame_types.append(coded_slice.type)
        decoded = rgb_from_yuv(coded_slice.width, coded_slice.height, frame_number, yuv_filename)
        if coded_slice.type == SliceType.I:
            i_frame = decoded
        # for an I slice this is the frame minus itself (zeros)
        diff_frames.append(decoded - i_frame) # abs()?
        for macroblock in coded_slice.mbs:
            mb_types.append(macroblock.type)
            luma_qps.append(macroblock.luma_qp)

    return {
        'i_frame': i_frame,
        'diff_frames': diff_frames,
        'frame_types': frame_types,
        'mb_types': mb_types,
        'luma_qps': luma_qps,
    }

# Demo: pull the first GOP out of the coded-data file and print its features.
# NOTE(review): `ep_filename` and `yuv_filename` are not assigned in the visible
# cells; they must be defined before this cell runs -- confirm where they come from.
slice_iterator = get_slice_iterator(ep_filename)
GOP_LENGTH = 5

gop = extract_gop(slice_iterator, GOP_LENGTH)
print(f'GOP length: {len(gop)}')

features = extract_gop_features(gop, yuv_filename)

for key, value in features.items():
    print(f'{key}: {value}')

GOP length: 5
i_frame: [[[0.15799489 0.21106124 0.15001661]
  [0.16191645 0.21498281 0.15393818]
  [0.16583802 0.21890438 0.15785975]
  ...
  [0.3687092  0.63626048 0.84792309]
  [0.3804739  0.64802518 0.85968779]
  [0.42361116 0.69116243 0.90282504]]

 [[0.193289   0.24635536 0.18531073]
  [0.18936743 0.24243379 0.18138916]
  [0.1815243  0.23459065 0.17354602]
  ...
  [0.37263077 0.64018205 0.85184465]
  [0.40400331 0.67155459 0.8832172 ]
  [0.443219   0.71077028 0.92243289]]

 [[0.27172037 0.32478672 0.2637421 ]
  [0.26387723 0.31694359 0.25589896]
  [0.25211253 0.30517888 0.24413425]
  ...
  [0.42753272 0.695084   0.90674661]
  [0.45106213 0.71861341 0.93027602]
  [0.46282684 0.73037812 0.94204073]]

 ...

 [[0.37444936 0.47650206 0.2106951 ]
  [0.37444936 0.47650206 0.2106951 ]
  [0.37837093 0.48042363 0.21461667]
  ...
  [0.37265025 0.46168546 0.18850711]
  [0.36941597 0.45673905 0.18805997]
  [0.3654944  0.45281749 0.1841384 ]]

 [[0.37444936 0.47650206 0.2106951 ]
  [0.37444936 

In [None]:
def intra_preprocessing(frame):
    """Placeholder for the intra-frame preprocessing step (not implemented).

    Tagged 'vit1' in the original notes (presumably the model branch that will
    consume the result -- TODO confirm). Currently returns None.
    """
    return None

def diff_preprocessing(frame):
    """Placeholder for the difference-frame preprocessing step (not implemented).

    Tagged 'vit1' in the original notes (presumably the model branch that will
    consume the result -- TODO confirm). Currently returns None.
    """
    return None

def frame_types_preprocessing(frame_types):
    """Placeholder for the frame-type preprocessing step (not implemented).

    Tagged 'embedding' in the original notes. Currently returns None.
    """
    return None

def mb_types_preprocessing(mb_types):
    """Placeholder for the macroblock-type preprocessing step (not implemented).

    Tagged 'embedding' and 'vit2' in the original notes. Currently returns None.
    """
    return None

def luma_qps_preprocessing(luma_qps):
    """Placeholder for the luma-QP preprocessing step (not implemented).

    Tagged 'vit2' in the original notes. Currently returns None.
    """
    return None

In [None]:
# Plotting setup shared by the visualization cells
import matplotlib.pyplot as plt
plt.rcParams['figure.dpi'] = 120  # figure resolution for every plot created afterwards
from matplotlib.patches import Rectangle
from matplotlib.lines import Line2D
import numpy as np

# Show the decoded frame with its index in the title.
# NOTE(review): `rgb_frame` and `FRAME_NUM` are not assigned in the visible
# cells; they must be defined before this cell runs -- confirm where they come from.
plt.imshow(rgb_frame)
plt.title('Frame {} decoded'.format(FRAME_NUM))

In [None]:
# now, visualize the encoding parameters

# first we need to get the size of the macroblocks from the enums

import re
from slice_pb2 import MacroblockType

# Build a lookup from macroblock type name to its partition size. The size is
# taken from a "<W>x<H>" fragment in the enum name; names without such a
# fragment fall back to the full 16x16 macroblock.
mb_size_regex = re.compile('([0-9]{1,2})x([0-9]{1,2})')
mb_size_dict = {}
for type_name, _ in MacroblockType.items():
    size_match = mb_size_regex.search(type_name)
    if size_match is None:
        mb_size_dict[type_name] = (16, 16)
        continue
    mb_size_dict[type_name] = (int(size_match.group(1)), int(size_match.group(2)))

# show some entries
for key in list(mb_size_dict)[:10]:
    print(f'{key}={mb_size_dict[key]}')

In [None]:
# now, begin the visualization
# Draw the decoded frame, then overlay one translucent rectangle per macroblock partition.
# NOTE(review): `slice` and `rgb_frame` are not assigned in the visible cells;
# `slice` is expected to be a parsed slice message bound by an earlier cell
# (otherwise the name resolves to the builtin) -- confirm.
plt.imshow(rgb_frame)
ax = plt.gca()

mb_alpha = 0.5
line_color = 'black'
line_width = 0.2
mb_labels = ['INTRA', 'DIRECT_SKIP', 'SKIP', 'DIRECT', 'INTER']

# one colour per label, taken from the 'Set1' colormap, with the alpha channel overridden
mb_color_cycle = plt.get_cmap('Set1')
mb_colors = np.asarray([mb_color_cycle(i) for i in range(5)])
mb_colors[:, 3] = mb_alpha

# partition sizes that can be drawn; anything else is rejected below
supported_mb_sizes = ((16, 16), (8, 16), (16, 8), (8, 8), (4, 4))

for mb in slice.mbs:
    mb_type = MacroblockType.Name(mb.type)
    mb_size = mb_size_dict[mb_type]

    # pick the label (and therefore the colour) from the type name
    if 'INTRA' in mb_type:
        mb_label_index = 0
    elif 'SKIP' in mb_type:
        mb_label_index = 1 if 'DIRECT' in mb_type else 2
    elif 'DIRECT' in mb_type:
        mb_label_index = 3
    else:
        mb_label_index = 4
    color = mb_colors[mb_label_index]

    # anchor corner of the macroblock in pixels (mb.x / mb.y are scaled by the 16-pixel macroblock size)
    mb_x = mb.x * 16
    mb_y = mb.y * 16

    if mb_size not in supported_mb_sizes:
        raise ValueError(f'unsupported macroblock size {mb_size}')

    # tile the 16x16 macroblock with partitions of the given size, column by column
    part_width, part_height = mb_size
    for column in range(16 // part_width):
        for row in range(16 // part_height):
            ax.add_patch(Rectangle(
                (mb_x + column * part_width, mb_y + row * part_height), part_width, part_height,
                facecolor=color, edgecolor=line_color, linewidth=line_width))

# generate the legend
custom_legends = [Line2D([0], [0], color=legend_color, lw=3) for legend_color in mb_colors]
plt.legend(custom_legends, mb_labels, bbox_to_anchor=(1.1, 1.05))
plt.title('Macroblock Partition and Type')