In [1]:
%reload_ext autoreload
%autoreload 2

In [64]:
import gzip
import json
import os
import pprint
import shutil
from collections import namedtuple, Counter
from itertools import starmap
from pathlib import Path, PurePosixPath

import boto3
import pandas as pd
from toolz import *

In [3]:
def init_dir(adir, clean=False):
    """Ensure ``adir`` exists as a directory, creating missing parents.

    Parameters
    ----------
    adir : pathlib.Path or str
        Directory to create.
    clean : bool, default False
        If True and ``adir`` already exists, delete it and everything in it
        first so it starts empty. Off by default so cached files (e.g. the
        downloads kept in WORK_D) survive notebook re-runs.
    """
    adir = Path(adir)
    if clean and adir.is_dir():
        shutil.rmtree(adir)
    adir.mkdir(parents=True, exist_ok=True)
    print(f"initialized directory {adir}")

In [4]:
# Root of the NBC working data. Set the NBC_DATA_DIR environment variable to
# run the notebook somewhere other than ~/DATA/NBC.
DATA_D = Path(os.environ.get('NBC_DATA_DIR', Path.home() / 'DATA' / 'NBC')).expanduser()
SAMPLES_D = DATA_D / 'samples'  # manifest CSVs globbed by read_mfiles
WORK_D = DATA_D / 'work'        # local cache for downloaded .gz event files
init_dir(WORK_D)

initialized directory /Users/wmcabee/DATA/NBC/work


In [5]:
def read_mfiles(pattern, samples_dir=None):
    """Load and stack every manifest CSV matching ``pattern``.

    Parameters
    ----------
    pattern : str
        Glob pattern relative to ``samples_dir`` (e.g. ``'android*.csv'``).
    samples_dir : pathlib.Path or str, optional
        Directory to search; defaults to ``SAMPLES_D``.

    Returns
    -------
    pandas.DataFrame
        All rows of the matched files, concatenated in sorted-filename order,
        with a fresh 0..n-1 index and the ``name`` column renamed to ``key``.

    Raises
    ------
    ValueError
        If no file matches ``pattern``.
    """
    search_dir = SAMPLES_D if samples_dir is None else Path(samples_dir)
    # glob() order is filesystem-dependent; sort so reruns (and the iloc
    # slicing done on the result) pick the same files every time.
    paths = sorted(search_dir.glob(pattern))
    if not paths:
        raise ValueError(f"no manifest files match {pattern!r} in {search_dir}")
    # Each CSV's index restarts at 0; ignore_index avoids duplicate labels.
    df = pd.concat(map(pd.read_csv, paths), ignore_index=True)
    return df.rename(columns={'name': 'key'})


In [6]:
# S3 bucket that the manifest keys are downloaded from (see download_events_file).
S3_BUCKET_NAME = 'nbc-digital-cloned'

s3 = boto3.resource('s3')
bucket = s3.Bucket(S3_BUCKET_NAME)

In [7]:
def filter_files(df, sample=None):
    """Restrict a manifest frame to the file keys present in ``sample``.

    Parameters
    ----------
    df : pandas.DataFrame
        Manifest with a ``'key'`` column.
    sample : mapping or iterable of str, optional
        File keys to keep. A dict (such as the one ``generate_sample`` returns)
        contributes its keys; any other iterable of keys also works.
        ``None`` keeps every row.

    Returns
    -------
    pandas.DataFrame
        Always a new frame (a copy), so callers can modify it without touching ``df``.
    """
    if sample is None:
        return df.copy()
    keep = set(sample)  # iterating a dict yields its keys
    return df[df['key'].isin(keep)].copy()

In [8]:
def download_events_file(key, outdir, s3_bucket=None):
    """Download one S3 object into ``outdir`` unless it is already cached there.

    Parameters
    ----------
    key : str
        S3 object key, e.g. ``'NBCProd/Android/NBC_20190721000056010325_85540.txt.gz'``.
    outdir : pathlib.Path or str
        Local directory; the file is stored under the key's basename.
    s3_bucket : object, optional
        Anything exposing ``download_file(key, filename)``; defaults to the
        module-level ``bucket``. Injectable for testing.

    Returns
    -------
    tuple of (str, str)
        The S3 key and the local file path, both as strings.
    """
    if s3_bucket is None:
        s3_bucket = bucket
    key = str(key)
    # S3 keys are always '/'-separated. PurePosixPath keeps that true on every
    # OS, whereas Path would rewrite separators on Windows and request a wrong key.
    filename_gz = Path(outdir) / PurePosixPath(key).name
    if not filename_gz.is_file():
        print(f'downloading {key}')
        s3_bucket.download_file(key, str(filename_gz))
    return key, str(filename_gz)

In [35]:
# Event names retained by filter_events; all other events are dropped while streaming.
EVENT_NAMES_TO_KEEP = {
    'Ad End', 'Ad Start',
    'Ad Pod Start', 'Ad Pod End',
    'Video Start', 'Video End',
}

# One kept event: source S3 key, line number within the file (1-based when
# produced by read_events), the first sub-event's type and name, and the raw JSON.
EventRec = namedtuple('EventRec', 'key idx event_type event_name event')


def filter_events(key, idx, event, names_to_keep=None):
    """Wrap ``event`` in an EventRec if it is worth keeping, else return None.

    An event is kept when its first entry in ``event['events']`` has a
    non-empty ``custom_attributes`` dict and an ``event_name`` in
    ``names_to_keep``.

    Parameters
    ----------
    key : str
        S3 key of the file the event came from.
    idx : int
        Line number of the event within that file.
    event : dict
        Parsed JSON object for one line.
    names_to_keep : collection of str, optional
        Event names to keep; defaults to ``EVENT_NAMES_TO_KEEP``. It is looked
        up at call time, so rebinding the global in a later cell takes effect.

    Returns
    -------
    EventRec or None
    """
    if names_to_keep is None:
        names_to_keep = EVENT_NAMES_TO_KEEP
    details = event['events']
    if not details:  # empty list or null
        return None
    detail = details[0]
    event_type = detail['event_type']
    data = detail['data']
    if not data['custom_attributes']:  # empty dict or null
        return None
    event_name = data['event_name']
    if event_name not in names_to_keep:
        return None
    return EventRec(key, idx, event_type, event_name, event)

In [36]:
def read_events(key, filename_gz, custom_filter=None):
    """Stream the kept events of one gzipped JSON-lines file as EventRec records.

    Parameters
    ----------
    key : str
        S3 key of the source file; copied into every record.
    filename_gz : str or pathlib.Path
        Local gzip file with one JSON object per line.
    custom_filter : callable, optional
        ``iterable -> iterable`` applied to the records after ``filter_events``
        has dropped unwanted events.

    Yields
    ------
    EventRec
        One per kept event; ``idx`` is the 1-based line number in the file.
    """
    with gzip.open(filename_gz, 'rb') as f_in:
        print(f'processing file {filename_gz}')
        parsed = (
            (key, idx, json.loads(line))
            for idx, line in enumerate(f_in, start=1)
            # Blank or trailing lines would crash json.loads. Skipping them here
            # keeps the enumerate numbering, so idx still matches the file line.
            if line.strip()
        )
        records = filter(None, starmap(filter_events, parsed))
        if custom_filter:
            records = custom_filter(records)
        yield from records

In [79]:
def default_select_func(event_rec: 'EventRec', event_cnt: int) -> tuple:
    """Select every record and never trigger an early stop.

    Returns ``(True, event_cnt)``. ``print_events`` compares ``at_event``
    against the count *after* incrementing it, so handing back the current
    count means the stop condition never matches.
    """
    return True, event_cnt


def print_events(reader, select_func=default_select_func):
    """Debugging pass-through that counts selected records and can stop early.

    Parameters
    ----------
    reader : iterable of namedtuple
        Records to pass through (normally EventRec).
    select_func : callable
        ``(record, count_so_far) -> (selected, at_event)``. Each selected
        record increments the count, which is printed. When the new count
        equals ``at_event``, the record is pretty-printed and iteration stops.

    Yields
    ------
    Every record read, up to and including the one that triggered the stop.
    """
    event_cnt = 0
    for event_rec in reader:
        selected, at_event = select_func(event_rec, event_cnt)
        if selected:
            event_cnt += 1
            print('event_cnt:', event_cnt)
            if at_event == event_cnt:
                pprint.pprint(event_rec._asdict())
                print('stopping early')
                # Pass the inspected record downstream before stopping; the
                # previous version silently dropped it.
                yield event_rec
                return
        yield event_rec

In [133]:
def parse_customer_id(user_identities):
    """Return the single ``customer_id`` identity of an event, or ``'None'``.

    Parameters
    ----------
    user_identities : list of dict or None
        Entries carrying ``'identity_type'`` and ``'identity'`` keys.

    Returns
    -------
    str
        The customer id, or the string ``'None'`` when there is none (a string,
        matching the placeholder convention used in ``parse_event``).

    Raises
    ------
    ValueError
        If several identities are present and their types are not exactly
        ``{'customer_id', 'other'}``, or if more than one is a ``customer_id``.
    """
    if not user_identities:
        return 'None'

    if len(user_identities) > 1:
        identity_types = {x['identity_type'] for x in user_identities}
        if identity_types != {'customer_id', 'other'}:
            raise ValueError("Test case: unexpected identity_type")

    customer_ids = [x['identity'] for x in user_identities
                    if x['identity_type'] == 'customer_id']
    if len(customer_ids) > 1:
        raise ValueError("Test case: multiple customer ids")
    return customer_ids[0] if customer_ids else 'None'


def parse_event(event_rec: 'EventRec'):
    """Flatten one EventRec into a dict suitable as a DataFrame row.

    Only the first entry of ``event['events']`` is used. Optional custom
    attributes default to the string ``'None'`` (categorical fields) or
    ``None`` (numeric-looking fields).

    Raises
    ------
    KeyError
        If a required field (e.g. ``mpid``, ``ip``, ``MVPD``, ``Platform``,
        ``event_id``, ``session_id``, the ``*_unixtime_ms`` timestamps) is
        missing. Any exception is re-raised after printing the offending event.
    """
    try:
        event = event_rec.event
        detail = event['events'][0]
        event_type = detail['event_type']
        data = detail['data']
        custom_attrs = data['custom_attributes']
        customer_id = parse_customer_id(user_identities=event.get('user_identities'))
        drec = {
            'key': event_rec.key,
            'idx': event_rec.idx,
            'mpid': event['mpid'],
            'event_type': event_type,
            'event_name': data['event_name'],
            'customer_id': customer_id,
            'mvpd': custom_attrs['MVPD'],
            'ip': event['ip'],
            'platform': custom_attrs['Platform'],

            'video_id': custom_attrs.get('Video ID', 'None'),
            'video_type': custom_attrs.get('Video Type', 'None'),
            'video_duration': custom_attrs.get('Video Duration'),
            'show': custom_attrs.get('Show', 'None'),
            'season': custom_attrs.get('Season', 'None'),
            'episode_number': custom_attrs.get('Episode Number', 'None'),
            'episode_title': custom_attrs.get('Episode Title', 'None'),

            'video_end_type': custom_attrs.get('Video End Type', 'None'),

            'event_id': data['event_id'],
            'event_num': data.get('event_num'),
            'session_id': data['session_id'],

            'duration_watched': custom_attrs.get('Duration Watched'),
            'resume': custom_attrs.get('Resume', 'None'),
            'resume_time': custom_attrs.get('Resume Time'),
            'event_start_unixtime_ms': data['event_start_unixtime_ms'],
            'session_start_unixtime_ms': data['session_start_unixtime_ms'],
            'timestamp_unixtime_ms': data['timestamp_unixtime_ms'],

            # Ad end specific fields
            'ad_duration_watched': custom_attrs.get('durationWatched', None),
            'ad_end_type': custom_attrs.get('endType', 'None'),
            'percentage_complete': custom_attrs.get('percentageCompleted', None),

            # Advertising examples
            'campaign_name': custom_attrs.get('campaignName', 'None'),
            'creative_name': custom_attrs.get('creativeName', 'None'),

            # Ad Pod specific fields
            'ad_pod_duration': custom_attrs.get('Ad Pod Duration', None),
            'ad_pod_qty': custom_attrs.get('Ad Pod Quantity', None),
            'ad_pod_type': custom_attrs.get('Ad Pod Type', 'None'),
        }
    except Exception as e:
        print(f"ERROR: problem event: '{e}'", type(e))
        # `event` is unbound if reading event_rec.event is what failed. Fall back
        # to the whole record so this handler cannot raise its own NameError
        # and mask `e`.
        pprint.pprint(getattr(event_rec, 'event', event_rec))
        raise
    return drec

In [134]:
def generate_sample(data=None, n=20, random_state=None):
    """Randomly sample custom events and group their line numbers by file key.

    Parameters
    ----------
    data : pandas.DataFrame, optional
        Parsed events with ``key``, ``idx`` and ``event_type`` columns.
        Defaults to the notebook-global ``DATA``. Passing it explicitly avoids
        depending on a variable that is defined in a later cell.
    n : int, default 20
        Number of ``custom_event`` rows to draw, capped at the number available.
    random_state : int, optional
        Seed forwarded to ``DataFrame.sample`` for a reproducible sample.

    Returns
    -------
    dict
        ``{key: set of idx}``, usable as the ``sample`` argument of ``filter_files``.
    """
    if data is None:
        data = DATA
    custom = data[data.event_type == 'custom_event']
    picked = custom.sample(min(n, len(custom)), random_state=random_state)
    return picked.groupby('key').idx.agg(lambda x: set(x)).to_dict()
# SAMPLE = generate_sample(random_state=0)
# SAMPLE

In [138]:
# Run configuration:
#   MAX_FILES - number of manifest rows to process; None processes every row.
#   sample    - None, or a dict from generate_sample() to keep only sampled file keys.
MAX_FILES = 30
sample = None

mfiles = read_mfiles('android*.csv')  # e.g. key='NBCProd/Android/NBC_20190721000056010325_85540.txt.gz'
mfiles = mfiles.iloc[:MAX_FILES]      # iloc[:None] keeps all rows
mfiles = filter_files(df=mfiles, sample=sample)

# Lazy pipeline: download (or reuse the cached copy in WORK_D) -> stream the
# kept events of each file -> chain all files -> flatten each event to a row.
reader = (download_events_file(key, outdir=WORK_D) for key in mfiles.key)
reader = (read_events(key, filename_gz) for key, filename_gz in reader)
reader = concat(reader)
# Debug hooks: print running counts, or stop at the 10th 'Ad Pod End'.
#reader = print_events(reader)
#reader = print_events(reader, select_func=lambda event_rec, event_cnt: (event_rec.event_name == 'Ad Pod End', 10))

reader = map(parse_event, reader)  # one flat dict per kept event
df = pd.DataFrame.from_records(reader)
print('done')

processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000056010325_85540.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000113773691_56054.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000121543970_93823.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000130825897_67658.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000156556714_67731.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000210900586_87848.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000558444736_07714.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000607923887_77168.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000620973708_69520.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000630105741_78870.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000658167548_79257.txt.gz
processing file /Users/wmcabee/DATA/NBC/work/NBC_20190721000713383767_19683.txt.gz
proc

In [143]:
# Distribution of kept event names. A bare last expression renders as a rich table.
df['event_name'].value_counts()

124577


In [144]:
# Module-level alias read by generate_sample() and the CSV export cell.
# Note: this is the same object as `df`, not a copy.
DATA = df
DATA.head(5).transpose()

Unnamed: 0,0,1,2,3,4
key,NBCProd/Android/NBC_20190721000056010325_85540...,NBCProd/Android/NBC_20190721000056010325_85540...,NBCProd/Android/NBC_20190721000056010325_85540...,NBCProd/Android/NBC_20190721000056010325_85540...,NBCProd/Android/NBC_20190721000056010325_85540...
idx,7,24,25,27,28
mpid,-2460710111339786074,-8808560119265257687,-8808560119265257687,1885070203579443563,1885070203579443563
event_type,custom_event,custom_event,custom_event,custom_event,custom_event
event_name,Video Start,Ad End,Ad Pod End,Ad Pod Start,Ad Start
customer_id,505b4a64-e497-43e0-86fa-79e6ffbaf125,15eccdce-48d7-4d28-84fb-fc43c1facfce,15eccdce-48d7-4d28-84fb-fc43c1facfce,36c942b5-5e75-4911-962d-e959d0da346d,36c942b5-5e75-4911-962d-e959d0da346d
mvpd,Optimum,Unauthenticated,Unauthenticated,Unauthenticated,Unauthenticated
ip,2601:240:cb01:27f2:d834:42ba:57b0:cdef,2607:fb90:929a:b8bd::abd7:7b01,2607:fb90:929a:b8bd::abd7:7b01,2601:409:c100:11e0:1131:b1a6:2a01:d73e,2601:409:c100:11e0:1131:b1a6:2a01:d73e
platform,FireTV,Android,Android,FireTV,FireTV
video_id,3988962,3986005,3986005,3970922,3970922


In [141]:
# Persist the flattened events next to the downloaded source files.
outfile = WORK_D / 'android.csv'
# `index` takes a bool; the default RangeIndex from from_records carries no data.
DATA.to_csv(outfile, index=False)