In [1]:
import dask.dataframe as dd
import simplejson as json

import dask
from dask.distributed import Client
import pandas as pd
from collections import Counter

In [2]:
INPUT_MASK = '../events/{event_count}-{nfiles}/year=*/month=*/day=*/hour=*/*/part*.parquet'
OUTPUT_MASK = '../aggs_dask/{event_count}-{nfiles}/*.json'


In [3]:
dask.set_options(get=dask.get)

<dask.context.set_options at 0x116eb9150>

In [4]:
client = Client()



--------------------
counter_chunk
1
--------------------
--------------------
counter_chunk
1
counter_chunk
1
--------------------
counter_chunk
1
                 customer                       url                  ts  \
referrer                                                                  
http://bing.com/    a.com  http://a.com/articles/16 2017-09-17 08:00:00   

                          referrer  counts  
referrer                                    
http://bing.com/  http://bing.com/       1                     customer                       url                  ts  \
referrer                                                                    
http://google.com/    a.com  http://a.com/articles/19 2017-09-17 06:00:00   

                              referrer  counts  
referrer                                        
http://google.com/  http://google.com/       1  

                 customer                       url                  ts  \
referrer                             

Function:  execute_task
args:      ((<built-in function apply>, <function _agg_finalize at 0x1169bcd70>, [(<function _concat at 0x116977aa0>, [                                                       count-session_id-6d6c48f47a6d276a1bd7fe2a2248b673  \
customer url                      ts                                                                       
a.com    http://a.com/articles/17 2017-09-17 01:00:00                                                  1   

                                                       count_unique-session_id-c06b832f28152d5e12d83925f0c07826  \
customer url                      ts                                                                              
a.com    http://a.com/articles/17 2017-09-17 01:00:00                                                  1          

                                                      counter-referrer-95243ccd29300c6455d4161a8174d006  
customer url                      ts                                            

--------------------
counter_chunk
--------------------
[(u'http://bing.com/', 1)]
--------------------
--------------------
counter_chunk
counter_chunk
counter_chunk
[(u'http://google.com/', 1)]
[(u'http://google.com/', 1)]
[(u'http://bing.com/', 1)]
--------------------
--------------------
counter_chunk
--------------------
counter_chunk
counter_chunk
[(u'http://google.com/', 1), (u'http://bing.com/', 2)]
--------------------
[(u'http://facebook.com/', 1)]
[(u'http://facebook.com/', 1)]
counter_chunk
[(u'http://google.com/', 1)]
--------------------
counter_agg
('chunk', [(u'http://google.com/', 1)])
('total', Counter({u'http://google.com/': 1}))
--------------------
counter_agg
('chunk', [(u'http://bing.com/', 1)])
('total', Counter({u'http://bing.com/': 1}))
--------------------
counter_agg
('chunk', [(u'http://bing.com/', 2)])
('total', Counter({u'http://bing.com/': 2}))
--------------------
counter_agg
('chunk', [(u'http://bing.com/', 1)])
('total', Counter({u'http://bing.com/':

In [83]:
def read_data(read_path):
    """Reads the original Parquet data.
    :returns: DataFrame
    """
    df = dd.read_parquet(read_path).drop('hour', axis=1)
    return df


In [178]:
def counter_chunk(ser):
    sdf = ser.value_counts().to_frame('counts').reset_index()
    sdf = sdf.set_index(sdf.referrer)
    res = sdf.counts.to_dict().items()
    return res


In [179]:
def counter_agg(chunks):
    total = Counter()
    for chunk in chunks:
        if not isinstance(chunk[0], tuple):
            chunk = [chunk]
        current = Counter(dict(chunk))
        total = total + current
    return json.dumps(dict(total))


In [180]:
EVENT_COUNT = 10
NFILES = 24


In [181]:
read_path = INPUT_MASK.format(event_count=EVENT_COUNT, nfiles=NFILES)
write_path = OUTPUT_MASK.format(event_count=EVENT_COUNT, nfiles=NFILES)

In [182]:
df = read_data(read_path)

In [183]:
df.head(2, npartitions=8)

Unnamed: 0,url,referrer,session_id,ts,customer
0,http://a.com/articles/17,http://bing.com/,yyy,2017-09-17 01:03:00,a.com
0,http://a.com/articles/19,http://google.com/,yyy,2017-09-17 06:19:00,a.com


In [184]:
df['ts'] = df['ts'].dt.floor('1H')

In [185]:
gb = df.groupby(['customer', 'url', 'ts'])

In [186]:
counter = dd.Aggregation(
    'counter',
    lambda s: counter_chunk(s),
    lambda s: s.apply(counter_agg),
)


In [187]:
count_unique = dd.Aggregation(
    'count_unique',
    lambda s: s.nunique(),
    lambda s: s.nunique()
)


In [188]:
ag = gb.agg({
    'session_id': [count_unique, 'count'],
    'referrer': counter}
)


--------------------
counter_chunk
[('foo', 2)]
--------------------
counter_agg
('chunk', [('foo', 2)])
('total', Counter({'foo': 2}))


In [190]:
ag.head(8)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,referrer,session_id,session_id
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,counter,count_unique,count
customer,url,ts,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2
a.com,http://a.com/articles/13,2017-09-17 12:00:00,"{""http://google.com/"": 1}",1,2
a.com,http://a.com/articles/16,2017-09-17 08:00:00,"{""http://bing.com/"": 1}",1,1
a.com,http://a.com/articles/16,2017-09-17 12:00:00,"{""http://bing.com/"": 2}",1,1
a.com,http://a.com/articles/17,2017-09-17 01:00:00,"{""http://bing.com/"": 1}",1,1
a.com,http://a.com/articles/17,2017-09-17 23:00:00,"{""http://facebook.com/"": 1}",1,1
a.com,http://a.com/articles/19,2017-09-17 06:00:00,"{""http://google.com/"": 1}",1,1
a.com,http://a.com/articles/2,2017-09-17 19:00:00,"{""http://google.com/"": 1}",1,1
a.com,http://a.com/articles/20,2017-09-17 10:00:00,"{""http://google.com/"": 1}",1,1
