In [1]:
import json
import pandas as pd

HOME = os.environ["HOME"]
sys.path.append(HOME)
from ds_config import *

from pyspark.sql import HiveContext
from pyspark.sql import Row

import numpy as np

In [4]:
# Get URL prefixes
sqlc = HiveContext(sc)

content_servers = sqlc.sql("SELECT * FROM contentservers") \
    .map(lambda x: x.asDict()) \
    .collect()

urls_dict = { }
for x in content_servers:
    if x["httpreaduri"]:
        urls_dict[x['serverid']] = x['httpreaduri']

sc_urls = sc.broadcast(urls_dict)

del content_servers

In [5]:
def get_url(clip):
    try:
        url = []

        if "PLAY TYPE" not in clip["data"]:
            return []
        if clip["data"]["PLAY TYPE"].lower() not in ["punt", "fg"]:
            return []
        if "angles" not in clip["angles"] or clip["angles"] is None:
            return []

        angle = {}


        angles = [
            angle.asDict() 
            for angle in clip["angles"] 
            if angle.an == "Wide"
        ]
        if len(angles) == 0:
            return []
        else:
            angle = angles[0]

        good_angle = "mf" in angle and angle["mf"] is not None and len(angle["mf"]) > 0
        good_angle *= "si" in angle

        if not good_angle:
            return []

        media_file = [mf.asDict() for mf in angle["mf"]][0]
        return [(
            clip["data"]["PLAY TYPE"].lower(), 
            sc_urls.value[angle["si"]] + media_file["fn"],
            clip["team_id"]
        )]
    except:
        return []
    

In [42]:
# Uncomment to draw a random sample
urls_rdd = sc.textFile("s3n://ds-fulla/mongo/20150323/monolith/coredata/clips") \
    .sample(False, .01) \
    .map(json.loads) \
    .flatMap(get_urls) \
    .cache()

In [43]:
# Repartition and save to s3
urls_rdd.repartition(25).saveAsTextFile("s3n://hudl-hadoop/GLaDOS/fg_punt_locations_3")

In [65]:
# Collect URL's and create a DataFrame
df = pd.DataFrame(
    sc.textFile("s3n://hudl-hadoop/GLaDOS/fg_punt_locations_3")
    .map(lambda x: x.replace("u'", "'").replace("'", '"'))
    .map(json.loads)
    .collect()
)

In [66]:
df.head()

Unnamed: 0,label,url
0,punt,http://vh.hudl.com/4227/20608/29754/289817/TI/...
1,fg,http://ve.hudl.com/3844/9631/16294/164142/FS/1...
2,punt,http://ve.hudl.com/8254/20098/28843/264408/42/...
3,punt,http://vf.hudl.com/8605/20854/30206/277677/SA/...
4,punt,http://vd.hudl.com/2863/6920/19553/191011/89/0...


In [71]:
gb = df.groupby("label").label

In [73]:
gb.count()

label
fg       6259
punt    28449
Name: label, dtype: int64

In [74]:
fg = df[df.label == "fg"]
punt = df[df.label == "punt"]

In [97]:
train_fg = set(list(np.random.choice(fg.index, replace=False, size=1000)))
train_punt = set(list(np.random.choice(punt.index, replace=False, size=1000)))

In [98]:
train_fg_df = fg[fg.index.map(lambda x: x in train_fg)]
test_fg_df = fg[~fg.index.map(lambda x: x in train_fg)]

In [99]:
train_punt_df = punt[punt.index.map(lambda x: x in train_punt)]
test_punt_df = punt[~punt.index.map(lambda x: x in train_punt)]

In [108]:
(df.label == "fg").mean()

0.18033306442318775

In [105]:
train_fg_df.to_csv("fg_train.csv", index=False)
test_fg_df.to_csv("fg_test.csv", index=False)
train_punt_df.to_csv("punt_train.csv", index=False)
test_punt_df.to_csv("punt_test.csv", index=False)