In [None]:
import multiprocessing as mp
import time
from collections import OrderedDict

import dask.dataframe as dd
import fastparquet as fq
import numpy as np
import pandas as pd
from dask import delayed, compute
from dask.distributed import LocalCluster, Client, wait

In [None]:
# Path of the parquet file to analyse, asked for interactively.
# A prompt is shown so the reader knows what the cell is waiting for, and
# surrounding whitespace (easily picked up when pasting) is stripped so it
# cannot end up inside the path.
to_read = input('Path to the parquet file: ').strip()

In [None]:
# Open the parquet file named by `to_read`, then materialise it as a pandas frame.
parquet_file = fq.ParquetFile(to_read)
data = parquet_file.to_pandas()

In [None]:
# Display the column labels of the loaded frame.
data.columns

In [None]:
# Distinct values of the `program` column, sorted, wrapped in a Series so the
# `.str` accessor can be used on them.
programs = pd.Series(sorted(data.program.unique()))

In [None]:
# Show every program name that begins with 'GOOD MORNING'.
mask = programs.str.startswith('GOOD MORNING')
programs[mask]

In [None]:
# Filter specification for the benchmark query: each key is a column of `data`
# and each value is what that column must equal. The ordered mapping fixes the
# order in which the filters are applied.
to_match = OrderedDict([
    ('year', '2017'),
    ('month', '02'),
    ('program', 'GOOD MORNING AMERICA'),
    ('demo', 'WM2554'),
    ('time', '07:00-09:00'),
    ('mon', 1),
    ('tue', 1),
    ('wed', 1),
    ('thur', 1),
    ('fri', 1),
])

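The following section compares the query runtime with and without progressively reducing the frame. Building a chain of `.loc` calls (re-slicing the frame after every filter) slows things down severely; the best time was obtained without reduction, i.e. by accumulating a single boolean mask and slicing once at the end.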

In [None]:
%%time
frame, mask = data, np.ones(len(data)).astype(bool)
for k, v in to_match.items():
    t1 = time.time()
    #mask &= frame.loc[mask, k] == v
    mask &= data[k] == v
    frame = frame.loc[mask]
    t2 = time.time()
    #print(k, t2-t1)
frame.imp.sum()

In [None]:
%%time
mask = np.ones(len(data)).astype(bool)
for k, v in to_match.items():
    t1 = time.time()
    #mask &= data.loc[mask, k] == v
    mask &= data[k] == v
    t2 = time.time()
    #print(k, t2-t1)
frame = data.loc[mask]
frame.imp.sum()

Time to daskify! The same query is repeated below with Dask on a local cluster.

In [None]:
# Create a Dask LocalCluster with its default settings (no arguments are passed).
cluster = LocalCluster()

In [None]:
# Connect a distributed Client to the cluster created in the previous cell.
client = Client(cluster)

In [None]:
# Wrap the in-memory pandas frame as a Dask DataFrame, requesting one partition
# per CPU reported by multiprocessing.
n_partitions = mp.cpu_count()
df = dd.from_pandas(data, npartitions=n_partitions)

In [None]:
%%time
ddata = client.persist(df)

In [None]:
# Render the task graph behind the persisted collection.
ddata.visualize()

In [None]:
# One lazy boolean mask per filter in `to_match`, each built by comparing the
# whole column of the Dask frame with the wanted value.
masks = [
    ddata[column] == wanted
    for column, wanted in to_match.items()
]

In [None]:
# AND all per-column masks together: start from the first one and fold in the rest.
mask = masks[0]
remaining = masks[1:]
for term in remaining:
    mask &= term

In [None]:
# Sum of `imp` over the rows selected by the combined mask. This only builds the
# lazy Dask result; it is evaluated by the .compute() calls in the next cells.
matched_imp = ddata.loc[mask, 'imp']
y = matched_imp.sum()

In [None]:
%%timeit
# Benchmark evaluating the lazy sum `y`.
yy = y.compute()

In [None]:
# Evaluate the lazy sum once more and display its value.
y.compute()

Manual sanity check and `.loc` investigation

In [None]:
%%timeit
m = data.loc[:, 'year'] == '2017'
m &= data.loc[m, 'month'] == '02'
m &= data.loc[m, 'demo'] == to_match['demo']
m &= data.loc[m, 'program'] == to_match['program']
m &= data.loc[m, 'time'] == '07:00-09:00'
data[m].imp.sum()

In [None]:
%%timeit
m = data.year == '2017'
m &= data.month == '02'
m &= data.demo == to_match['demo']
m &= data.program == to_match['program']
m &= data.time == '07:00-09:00'
data[m].imp.sum()