<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc" style="margin-top: 1em;"><ul class="toc-item"><li><span><a href="#Write-out-the-video-table" data-toc-modified-id="Write-out-the-video-table-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Write out the video table</a></span></li><li><span><a href="#Write-out-interval-sets" data-toc-modified-id="Write-out-interval-sets-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Write out interval sets</a></span><ul class="toc-item"><li><span><a href="#Commercials" data-toc-modified-id="Commercials-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Commercials</a></span></li><li><span><a href="#Faces" data-toc-modified-id="Faces-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>Faces</a></span></li><li><span><a href="#Identity" data-toc-modified-id="Identity-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>Identity</a></span></li><li><span><a href="#Our-labels" data-toc-modified-id="Our-labels-2.4"><span class="toc-item-num">2.4&nbsp;&nbsp;</span>Our labels</a></span></li></ul></li></ul></div>

In [13]:
import pyspark.sql.functions as func
from pyspark.sql.window import Window
import os
from collections import defaultdict
import random
import pickle
import json
from pytz import timezone
from pathlib import Path
from tqdm import tqdm

from django.db.models import F, ExpressionWrapper, FloatField, IntegerField

from esper.spark_util import *
from esper.major_canonical_shows import MAJOR_CANONICAL_SHOWS

# Output directory for the widget data files written by the cells below.
WIDGET_DATA_DIR = '/app/data/widget-data'
# exist_ok=True creates the directory only when missing, without the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(WIDGET_DATA_DIR, exist_ok=True)

# Write out the video table

In [None]:
def get_video_name(p):
    """Return the final component of path ``p`` cut off at its first '.'."""
    file_name = Path(p).name
    # partition() keeps everything before the first dot (or the whole
    # name when there is no dot at all).
    base, _, _ = file_name.partition('.')
    return base

# pytz timezone handles used by get_date_minute_from_name: timestamps parsed
# from video names are tagged as UTC and then converted to EST.
UTC = timezone('UTC')
# NOTE(review): pytz's 'EST' is presumably a fixed UTC-5 zone with no
# daylight saving -- confirm that is intended rather than 'US/Eastern'.
EST = timezone('EST')
# strftime pattern (YYYY-MM-DD) for the per-video date string.
DATE_FORMAT = '%Y-%m-%d'
def get_date_minute_from_name(p):
    """Derive the EST air date and minute-of-day from a video name.

    ``p`` is split on '_' and its first three fields are read as
    ``<channel>_<YYYYMMDD>_<HHMMSS>``; the timestamp is treated as UTC and
    converted to EST.

    Returns a ``(date_string, minute_of_day)`` tuple, where the date is
    formatted with DATE_FORMAT and the minute is ``hour * 60 + minute`` in EST.
    """
    _channel, date_part, time_part = p.split('_', 3)[:3]
    utc_naive = datetime.datetime.strptime(date_part + time_part, '%Y%m%d%H%M%S')
    est_time = utc_naive.replace(tzinfo=UTC).astimezone(tz=EST)
    # Sanity check that the timezone conversion actually shifted the hour.
    assert utc_naive.hour != est_time.hour
    minute_of_day = est_time.hour * 60 + est_time.minute
    return est_time.strftime(DATE_FORMAT), minute_of_day

def _video_row(v):
    """Turn one Video ``values()`` dict into the tuple stored in videos.json."""
    name = get_video_name(v['path'])
    date, minute = get_date_minute_from_name(name)
    return (
        v['id'],
        name,
        v['show__canonical_show__name'],
        v['channel__name'],
        date,
        minute,
        v['num_frames'],
        v['fps'],
        v['width'],
        v['height'],
    )


# One row per video that is neither a duplicate nor corrupted.
video_data = [
    _video_row(v)
    for v in Video.objects.filter(
        duplicate=False, corrupted=False,
    ).values(
        'id', 'path', 'show__canonical_show__name', 'channel__name', 'num_frames', 'fps', 'width', 'height'
    )
]

# Serialize the whole table as a JSON array of arrays.
VIDEO_DATA_PATH = os.path.join(WIDGET_DATA_DIR, 'videos.json')
with open(VIDEO_DATA_PATH, 'w') as out_file:
    json.dump(video_data, out_file)

print('Done!')

# Write out interval sets

In [14]:
from typing import List, Tuple, Optional, BinaryIO


class IntervalSetMappingWriter(object):
    """Writer for a binary file mapping ids to sets of intervals.

    Every :meth:`write` call appends one record made of unsigned 32-bit
    little-endian integers::

        id | n | start_0 | end_0 | ... | start_{n-1} | end_{n-1}

    The writer can be used as a context manager; the file is closed on exit.
    """

    def __init__(self, path: str):
        """Open ``path`` for binary writing (an existing file is truncated)."""
        self._fp: Optional[BinaryIO] = open(path, 'wb')
        self._path = path

    def __enter__(self) -> 'IntervalSetMappingWriter':
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def __fmt_u32(self, v: int) -> bytes:
        """Encode ``v`` as 4 little-endian bytes.

        Raises OverflowError when ``v`` is negative or does not fit in 32 bits.
        """
        return v.to_bytes(4, byteorder='little')

    def write(self, id_: int, intervals: List[Tuple[int, int]]) -> None:
        """Append the record for ``id_`` with the given ``(start, end)`` pairs.

        The whole record is validated and encoded before anything is written,
        so a bad interval or an out-of-range value never leaves a truncated
        record in the file.

        Raises:
            AssertionError: if the writer is closed or an interval has
                ``end <= start``.
            OverflowError: if any value does not fit in an unsigned 32-bit int.
        """
        assert self._fp is not None
        fmt_u32 = self.__fmt_u32
        chunks = [fmt_u32(id_), fmt_u32(len(intervals))]
        for a, b in intervals:
            assert b > a, 'invalid interval: ({}, {})'.format(a, b)
            chunks.append(fmt_u32(a))
            chunks.append(fmt_u32(b))
        # Single write: either the full record goes out or nothing does.
        self._fp.write(b''.join(chunks))

    def close(self) -> None:
        """Close the underlying file; calling it again is a no-op."""
        if self._fp is not None:
            self._fp.close()
            self._fp = None

    
class IntervalAccumulator(object):
    """Collects (start, end) intervals, merging those that lie close together.

    A new interval is merged into the most recent one unless it starts more
    than ``fuzz`` past that interval's end.
    """

    def __init__(self, fuzz=250):
        # Stays None until the first interval is added.
        self._intervals = None
        self._fuzz = fuzz

    def add(self, start, end):
        """Add ``(start, end)``, extending the last interval when within fuzz."""
        assert start <= end
        if not self._intervals:
            self._intervals = [(start, end)]
            return

        prev_start, prev_end = self._intervals[-1]
        if start > prev_end + self._fuzz:
            # Too far from the previous interval: begin a new one.
            self._intervals.append((start, end))
            return

        if end > prev_end:
            # Close enough to merge; only the end of the last interval grows.
            assert start >= prev_start
            assert prev_start <= end
            self._intervals[-1] = (prev_start, end)

    def get(self):
        """Return the accumulated interval list (None if nothing was added)."""
        return self._intervals
    

class IntervalListMappingWriter(object):
    """Writer for a binary file mapping ids to lists of intervals with payloads.

    Every :meth:`write` call appends one record. The id, count, start and end
    fields are unsigned 32-bit little-endian integers; each payload is an
    unsigned little-endian integer of ``payload_len`` bytes::

        id | n | start_0 | end_0 | payload_0 | ... | payload_{n-1}

    The writer can be used as a context manager; the file is closed on exit.
    """

    def __init__(self, path: str, payload_len: int):
        """Open ``path`` for binary writing (an existing file is truncated).

        ``payload_len`` is the number of bytes used to encode each payload.
        """
        self._fp: Optional[BinaryIO] = open(path, 'wb')
        self._path = path
        self._payload_len = payload_len

    def __enter__(self) -> 'IntervalListMappingWriter':
        return self

    def __exit__(self, type, value, tb) -> None:
        self.close()

    def __fmt_u32(self, v: int) -> bytes:
        """Encode ``v`` as 4 little-endian bytes.

        Raises OverflowError when ``v`` is negative or does not fit in 32 bits.
        """
        return v.to_bytes(4, byteorder='little')

    def __fmt_payload(self, v: int) -> bytes:
        """Encode ``v`` as ``payload_len`` little-endian bytes.

        Raises OverflowError when ``v`` is negative or too large.
        """
        return v.to_bytes(self._payload_len, byteorder='little')

    def write(self, id_: int, intervals: List[Tuple[int, int, int]]) -> None:
        """Append the record for ``id_`` with ``(start, end, payload)`` triples.

        The whole record is validated and encoded before anything is written,
        so a bad interval or an out-of-range value never leaves a truncated
        record in the file.

        Raises:
            AssertionError: if the writer is closed or an interval has
                ``end <= start``.
            OverflowError: if a value does not fit its field.
        """
        assert self._fp is not None
        fmt_u32 = self.__fmt_u32
        fmt_payload = self.__fmt_payload
        chunks = [fmt_u32(id_), fmt_u32(len(intervals))]
        for a, b, c in intervals:
            assert b > a, 'invalid interval: ({}, {})'.format(a, b)
            chunks.append(fmt_u32(a))
            chunks.append(fmt_u32(b))
            chunks.append(fmt_payload(c))
        # Single write: either the full record goes out or nothing does.
        self._fp.write(b''.join(chunks))

    def close(self) -> None:
        """Close the underlying file; calling it again is a no-op."""
        if self._fp is not None:
            self._fp.close()
            self._fp = None


def encode_payload(is_male, is_host, height):
    """Pack the face attributes into a single integer.

    Bit 0 is set when ``is_male`` is truthy, bit 1 when ``is_host`` is
    truthy, and ``height`` occupies the bits from bit 2 upwards.
    """
    male_bit = 1 if is_male else 0
    host_bit = (1 << 1) if is_host else 0
    return male_bit | host_bit | (height << 2)

## Commercials

In [None]:
# Full path of the commercial interval-set file.
COMMERCIAL_INTERVAL_FILE = '/app/data/widget-data/commercials.iset.bin'

# video id -> list of (start_ms, end_ms) commercial intervals.
commercials_by_video_id = defaultdict(list)
for c in Commercial.objects.filter(
    labeler__name='haotian-commercials',
    video__duplicate=False, video__corrupted=False
).annotate(
    # Frame numbers are divided by the video fps and scaled by 1000,
    # then stored as integers.
    start_ms=ExpressionWrapper(F('min_frame') / F('video__fps') * 1000, output_field=IntegerField()),
    end_ms=ExpressionWrapper(F('max_frame') / F('video__fps') * 1000, output_field=IntegerField())
).values('video__id', 'start_ms', 'end_ms'):
    commercials_by_video_id[c['video__id']].append((c['start_ms'], c['end_ms']))

# COMMERCIAL_INTERVAL_FILE is already a complete path, so it is passed as-is;
# joining it with itself would only work because os.path.join discards every
# component that precedes an absolute one.
with IntervalSetMappingWriter(COMMERCIAL_INTERVAL_FILE) as COMM_INTS:
    # Records are written in ascending video id order, intervals sorted.
    for video_id in sorted(commercials_by_video_id):
        COMM_INTS.write(video_id, sorted(commercials_by_video_id[video_id]))
print('Done!')

## Faces

In [15]:
# Load the face/gender frame and keep only rows labeled by 'knn-gender'.
face_genders = get_face_genders()
face_genders = face_genders.where(
    face_genders['labeler_id'] == Labeler.objects.get(name='knn-gender').id
)

# Add start_time / end_time columns: frame number divided by fps.
for time_col, frame_col in (('start_time', 'min_frame'), ('end_time', 'max_frame')):
    face_genders = face_genders.withColumn(
        time_col, face_genders[frame_col] / face_genders['fps'])

In [17]:
FACE_INTERVAL_DIR = '/app/data/widget-data/face'
if not os.path.exists(FACE_INTERVAL_DIR):
    os.makedirs(FACE_INTERVAL_DIR)

# Integer millisecond bounds derived from start_time / end_time.
face_genders_int = face_genders.withColumn(
    'start_ms', (face_genders['start_time'] * 1000).cast('int'))
face_genders_int = face_genders_int.withColumn(
    'end_ms', (face_genders_int['end_time'] * 1000).cast('int'))

# DEBUG
# face_genders_int = face_genders_int.where(face_genders_int.video_id < 10)

with IntervalListMappingWriter(
    os.path.join(WIDGET_DATA_DIR, 'faces.ilist.bin'), 1
) as ALL_FACES:
    # Rows arrive grouped by video and ordered by interval, so each video's
    # faces can be collected as one consecutive run.
    fg_query = (
        face_genders_int
        .select('video_id', 'start_ms', 'end_ms', 'male_probability',
                'host_probability', 'height')
        .sort('video_id', 'start_ms', 'end_ms')
    )

    n_videos_done = 0
    curr_video_id = None
    for fg in fg_query.collect():
        starts_new_video = fg.video_id != curr_video_id
        if starts_new_video and curr_video_id is not None:
            # The previous video's run is finished: report and write it.
            n_videos_done += 1
            if n_videos_done % 1000 == 0:
                print('Processed {} videos'.format(n_videos_done))
            if curr_video_faces:
                ALL_FACES.write(curr_video_id, curr_video_faces)
        if starts_new_video:
            curr_video_id = fg.video_id
            curr_video_faces = []

        face_payload = encode_payload(
            fg.male_probability >= 0.5,
            fg.host_probability >= 0.5,
            min(round(fg.height * 100), 63)  # 6-bits
        )
        curr_video_faces.append((fg.start_ms, fg.end_ms, face_payload))

    # Write the run belonging to the last video, if any rows were seen.
    if curr_video_id is not None and curr_video_faces:
        ALL_FACES.write(curr_video_id, curr_video_faces)
print('Done!')

Processed 1000 videos
Processed 2000 videos
Processed 3000 videos
Processed 4000 videos
Processed 5000 videos
Processed 6000 videos
Processed 7000 videos
Processed 8000 videos
Processed 9000 videos
Processed 10000 videos
Processed 11000 videos
Processed 12000 videos
Processed 13000 videos
Processed 14000 videos
Processed 15000 videos
Processed 16000 videos
Processed 17000 videos
Processed 18000 videos
Processed 19000 videos
Processed 20000 videos
Processed 21000 videos
Processed 22000 videos
Processed 23000 videos
Processed 24000 videos
Processed 25000 videos
Processed 26000 videos
Processed 27000 videos
Processed 28000 videos
Processed 29000 videos
Processed 30000 videos
Processed 31000 videos
Processed 32000 videos
Processed 33000 videos
Processed 34000 videos
Processed 35000 videos
Processed 36000 videos
Processed 37000 videos
Processed 38000 videos
Processed 39000 videos
Processed 40000 videos
Processed 41000 videos
Processed 42000 videos
Processed 43000 videos
Processed 44000 vide

## Identity

In [18]:
# Build the identity frame for the rekognition labelers, with a gender id
# and second/millisecond interval bounds attached to every row.
face_identities = get_face_identities()

# Gender labels restricted to the 'knn-gender' labeler.
face_genders_basic = spark.load('query_facegender')
face_genders_basic = face_genders_basic.where(
    face_genders_basic.labeler_id == Labeler.objects.get(name='knn-gender').id)

# Alternative labeler set (prefix-matched names), currently disabled:
# identity_labeler_qs = (
#     Labeler.objects.filter(name__startswith='face-identity:') |
#     Labeler.objects.filter(name__startswith='face-identity-converted:') |
#     Labeler.objects.filter(name__startswith='face-identity-uncommon:')
# )
# Active labeler set: the two exactly-named rekognition labelers.
identity_labeler_qs = (
    Labeler.objects.filter(name='face-identity-rekognition') |
    Labeler.objects.filter(name='face-identity-rekognition:augmented-l2-dist=0.7')
)
identity_labeler_ids = [x.id for x in identity_labeler_qs]

# Keep only identity rows produced by the selected labelers.
face_identities = face_identities.where(
    face_identities.labeler_id.isin(identity_labeler_ids))

# Left outer join on face_id attaches gender_id while keeping identity rows
# that have no matching gender row. The select re-lists the original columns
# and qualifies face_id, which exists on both sides of the join.
# NOTE(review): the 'face_identities.face_id' string presumably relies on
# get_face_identities() aliasing its frame as 'face_identities' -- verify.
face_identities = face_identities.join(
    face_genders_basic.select('face_id', 'gender_id'), 
    face_identities.face_id == face_genders_basic.face_id, 'left_outer'
).select(
    *[c if c != 'face_id' else 'face_identities.face_id' 
      for c in face_identities.columns],
    face_genders_basic.gender_id
)

# Interval bounds as frame number divided by fps.
face_identities = face_identities.withColumn(
    'start_time', face_identities.min_frame / face_identities.fps)
face_identities = face_identities.withColumn(
    'end_time', face_identities.max_frame / face_identities.fps)

# Rows with probability >= 0.5 only, with integer millisecond bounds added.
face_identities_int = face_identities.where(face_identities.probability >= 0.5)
face_identities_int = face_identities_int.withColumn(
    'start_ms', (face_identities_int.start_time * 1000).cast('int'))
face_identities_int = face_identities_int.withColumn(
    'end_ms', (face_identities_int.end_time * 1000).cast('int'))

In [19]:
MIN_SCREENTIME_THRESHOLD = 600 # 10 minutes

# Number of face rows per identity.
face_identities_counts = face_identities.groupby(
    face_identities.identity_id
).agg(
    func.count(face_identities.face_id).alias('face_count')
)
face_identities_counts.show()

# Keep identities whose face count reaches three times the threshold.
# NOTE(review): the factor 3 presumably converts seconds to sampled faces
# (three per second) -- confirm against the sampling rate.
selected_identity_ids = [
    count_row['identity_id']
    for count_row in face_identities_counts.where(
        face_identities_counts['face_count'] >= MIN_SCREENTIME_THRESHOLD * 3
    ).collect()
]
print('Exporting intervals for {} identities'.format(len(selected_identity_ids)))

+-----------+----------+
|identity_id|face_count|
+-----------+----------+
|       1238|    101525|
|      16861|       493|
|       4935|     28931|
|      29719|       939|
|      28836|      8068|
|        833|     44446|
|      29601|       383|
|       5518|      2846|
|      38868|       450|
|       7993|       330|
|       7982|       284|
|      40383|       253|
|       1591|     10816|
|       2366|        69|
|      41751|      1706|
|      43527|      3347|
|      27484|       996|
|       1088|     38736|
|       9427|       216|
|      99621|        67|
+-----------+----------+
only showing top 20 rows

Exporting intervals for 5572 identities


In [20]:
# Directory holding one interval-list file per exported identity.
IDENTITY_INTERVAL_DIR = '/app/data/widget-data/aws-smoothed-identity'
os.makedirs(IDENTITY_INTERVAL_DIR, exist_ok=True)

# Rows for the selected identities, ordered so that each video's rows are
# consecutive and each identity's intervals within a video are sorted.
fi_query = face_identities_int.where(
    face_identities_int.identity_id.isin(selected_identity_ids)
).select(
    'video_id', 'identity_id', 'start_ms', 'end_ms',
    'host_probability', 'gender_id', 'height'
).sort('video_id', 'identity_id', 'start_ms', 'end_ms')

# identity id -> writer, opened lazily the first time an identity is seen.
# NOTE(review): every writer stays open until the end of the export, so the
# number of simultaneously open files can reach len(selected_identity_ids).
identity_ilist_writers = {}
identity_id_to_name = {i.id : i.name.lower() for i in Identity.objects.all()}
MALE_GENDER_ID = Gender.objects.get(name='M').id


def flush_identity_accumulators(video_id, ilist_accumulators):
    """Write one video's per-identity interval lists to the identity files.

    ``ilist_accumulators`` maps identity id to a list of
    ``(start_ms, end_ms, payload)`` tuples; empty lists are skipped. The
    file for an identity is named after its lower-cased name.
    """
    for identity_id, face_ilist in ilist_accumulators.items():
        if face_ilist:
            if identity_id not in identity_ilist_writers:
                identity_ilist_writers[identity_id] = IntervalListMappingWriter(
                    os.path.join(
                        IDENTITY_INTERVAL_DIR,
                        '{}.ilist.bin'.format(identity_id_to_name[identity_id])
                    ), 1)
            identity_ilist_writers[identity_id].write(video_id, face_ilist)


n_videos_done = 0
curr_video_id = None
try:
    for fi in fi_query.collect():
        if fi.video_id != curr_video_id:
            if curr_video_id is not None:
                # The previous video is complete: report and flush it.
                n_videos_done += 1
                if n_videos_done % 1000 == 0:
                    print('Processed {} videos'.format(n_videos_done))
                flush_identity_accumulators(
                    curr_video_id, curr_ilist_accumulators)

            curr_video_id = fi.video_id
            curr_ilist_accumulators = defaultdict(list)
        curr_ilist_accumulators[fi.identity_id].append(
            (fi.start_ms, fi.end_ms,
             encode_payload(
                 fi.gender_id == MALE_GENDER_ID,
                 fi.host_probability >= 0.5,
                 min(round(fi.height * 100), 63)  # 6-bits
             ))
        )

    # Flush the last video, if any rows were seen.
    if curr_video_id is not None:
        flush_identity_accumulators(curr_video_id, curr_ilist_accumulators)
finally:
    # Close every writer even when the export fails part-way, so no file
    # handles are leaked and buffered data reaches disk.
    for iw in identity_ilist_writers.values():
        iw.close()
    del identity_ilist_writers
print('Done!')

Processed 1000 videos
Processed 2000 videos
Processed 3000 videos
Processed 4000 videos
Processed 5000 videos
Processed 6000 videos
Processed 7000 videos
Processed 8000 videos
Processed 9000 videos
Processed 10000 videos
Processed 11000 videos
Processed 12000 videos
Processed 13000 videos
Processed 14000 videos
Processed 15000 videos
Processed 16000 videos
Processed 17000 videos
Processed 18000 videos
Processed 19000 videos
Processed 20000 videos
Processed 21000 videos
Processed 22000 videos
Processed 23000 videos
Processed 24000 videos
Processed 25000 videos
Processed 26000 videos
Processed 27000 videos
Processed 28000 videos
Processed 29000 videos
Processed 30000 videos
Processed 31000 videos
Processed 32000 videos
Processed 33000 videos
Processed 34000 videos
Processed 35000 videos
Processed 36000 videos
Processed 37000 videos
Processed 38000 videos
Processed 39000 videos
Processed 40000 videos
Processed 41000 videos
Processed 42000 videos
Processed 43000 videos
Processed 44000 vide

## Our labels

In [21]:
# Build the identity frame for the prefix-matched 'face-identity*' labelers,
# with a gender id and second/millisecond interval bounds on every row.
face_identities = get_face_identities()

# Gender labels restricted to the 'knn-gender' labeler.
face_genders_basic = spark.load('query_facegender')
face_genders_basic = face_genders_basic.where(
    face_genders_basic.labeler_id == Labeler.objects.get(name='knn-gender').id)

# Active labeler set: every labeler whose name starts with one of these
# three prefixes.
identity_labeler_qs = (
    Labeler.objects.filter(name__startswith='face-identity:') |
    Labeler.objects.filter(name__startswith='face-identity-converted:') |
    Labeler.objects.filter(name__startswith='face-identity-uncommon:')
)
# Alternative labeler set (exactly-named rekognition labelers), currently
# disabled:
# identity_labeler_qs = (
#     Labeler.objects.filter(name='face-identity-rekognition') |
#     Labeler.objects.filter(name='face-identity-rekognition:augmented-l2-dist=0.7')
# )
identity_labeler_ids = [x.id for x in identity_labeler_qs]

# Keep only identity rows produced by the selected labelers.
face_identities = face_identities.where(
    face_identities.labeler_id.isin(identity_labeler_ids))

# Left outer join on face_id attaches gender_id while keeping identity rows
# that have no matching gender row. The select re-lists the original columns
# and qualifies face_id, which exists on both sides of the join.
# NOTE(review): the 'face_identities.face_id' string presumably relies on
# get_face_identities() aliasing its frame as 'face_identities' -- verify.
face_identities = face_identities.join(
    face_genders_basic.select('face_id', 'gender_id'), 
    face_identities.face_id == face_genders_basic.face_id, 'left_outer'
).select(
    *[c if c != 'face_id' else 'face_identities.face_id' 
      for c in face_identities.columns],
    face_genders_basic.gender_id
)

# Interval bounds as frame number divided by fps.
face_identities = face_identities.withColumn(
    'start_time', face_identities.min_frame / face_identities.fps)
face_identities = face_identities.withColumn(
    'end_time', face_identities.max_frame / face_identities.fps)

# Rows with probability >= 0.5 only, with integer millisecond bounds added.
face_identities_int = face_identities.where(face_identities.probability >= 0.5)
face_identities_int = face_identities_int.withColumn(
    'start_ms', (face_identities_int.start_time * 1000).cast('int'))
face_identities_int = face_identities_int.withColumn(
    'end_ms', (face_identities_int.end_time * 1000).cast('int'))

In [22]:
# Directory holding one interval-list file per exported identity.
IDENTITY_INTERVAL_DIR = '/app/data/widget-data/our-identity'
os.makedirs(IDENTITY_INTERVAL_DIR, exist_ok=True)

# Rows for the selected identities, ordered so that each video's rows are
# consecutive and each identity's intervals within a video are sorted.
# NOTE(review): selected_identity_ids is not recomputed in this section; it
# is whatever an earlier cell left behind -- confirm that reuse is intended.
fi_query = face_identities_int.where(
    face_identities_int.identity_id.isin(selected_identity_ids)
).select(
    'video_id', 'identity_id', 'start_ms', 'end_ms',
    'host_probability', 'gender_id', 'height'
).sort('video_id', 'identity_id', 'start_ms', 'end_ms')

# identity id -> writer, opened lazily the first time an identity is seen.
# NOTE(review): every writer stays open until the end of the export, so the
# number of simultaneously open files can reach len(selected_identity_ids).
identity_ilist_writers = {}
identity_id_to_name = {i.id : i.name.lower() for i in Identity.objects.all()}
MALE_GENDER_ID = Gender.objects.get(name='M').id


def flush_identity_accumulators(video_id, ilist_accumulators):
    """Write one video's per-identity interval lists to the identity files.

    ``ilist_accumulators`` maps identity id to a list of
    ``(start_ms, end_ms, payload)`` tuples; empty lists are skipped. The
    file for an identity is named after its lower-cased name.
    """
    for identity_id, face_ilist in ilist_accumulators.items():
        if face_ilist:
            if identity_id not in identity_ilist_writers:
                identity_ilist_writers[identity_id] = IntervalListMappingWriter(
                    os.path.join(
                        IDENTITY_INTERVAL_DIR,
                        '{}.ilist.bin'.format(identity_id_to_name[identity_id])
                    ), 1)
            identity_ilist_writers[identity_id].write(video_id, face_ilist)


n_videos_done = 0
curr_video_id = None
try:
    for fi in fi_query.collect():
        if fi.video_id != curr_video_id:
            if curr_video_id is not None:
                # The previous video is complete: report and flush it.
                n_videos_done += 1
                if n_videos_done % 1000 == 0:
                    print('Processed {} videos'.format(n_videos_done))
                flush_identity_accumulators(
                    curr_video_id, curr_ilist_accumulators)

            curr_video_id = fi.video_id
            curr_ilist_accumulators = defaultdict(list)
        curr_ilist_accumulators[fi.identity_id].append(
            (fi.start_ms, fi.end_ms,
             encode_payload(
                 fi.gender_id == MALE_GENDER_ID,
                 fi.host_probability >= 0.5,
                 min(round(fi.height * 100), 63)  # 6-bits
             ))
        )

    # Flush the last video, if any rows were seen.
    if curr_video_id is not None:
        flush_identity_accumulators(curr_video_id, curr_ilist_accumulators)
finally:
    # Close every writer even when the export fails part-way, so no file
    # handles are leaked and buffered data reaches disk.
    for iw in identity_ilist_writers.values():
        iw.close()
    del identity_ilist_writers
print('Done!')

Processed 1000 videos
Processed 2000 videos
Processed 3000 videos
Processed 4000 videos
Processed 5000 videos
Processed 6000 videos
Processed 7000 videos
Processed 8000 videos
Processed 9000 videos
Processed 10000 videos
Processed 11000 videos
Processed 12000 videos
Processed 13000 videos
Processed 14000 videos
Processed 15000 videos
Processed 16000 videos
Processed 17000 videos
Processed 18000 videos
Processed 19000 videos
Processed 20000 videos
Processed 21000 videos
Processed 22000 videos
Processed 23000 videos
Processed 24000 videos
Processed 25000 videos
Processed 26000 videos
Processed 27000 videos
Processed 28000 videos
Processed 29000 videos
Processed 30000 videos
Processed 31000 videos
Processed 32000 videos
Processed 33000 videos
Processed 34000 videos
Processed 35000 videos
Processed 36000 videos
Processed 37000 videos
Processed 38000 videos
Processed 39000 videos
Processed 40000 videos
Processed 41000 videos
Processed 42000 videos
Processed 43000 videos
Processed 44000 vide