In [1]:
import pandas as pd
import dask.bag as db
import dask.dataframe as dd
import json
import datetime

  import pandas.util.testing as tm


In [2]:
posts = (db.read_text('posts/file*.jsonl')
           .map(json.loads))

In [3]:
def flatten(x):
    y = {}
    y['group'] = x['group']
    y['date'] = x['date']
    y['text'] = x['text']
    y['likes'] = x['likes']['count']
    y['marked_as_ads'] = bool(x['marked_as_ads'])
    return y

posts_flattened = posts.map(flatten)

In [4]:
freq = posts.pluck('group').frequencies()

In [5]:
%%time
freq_comp = freq.compute()

CPU times: user 47.4 ms, sys: 37.6 ms, total: 85.1 ms
Wall time: 10.1 s


In [8]:
pd.Series(dict(freq_comp))

bbc                     54898
chaosss_now              7414
currenttime             20975
echomsk                 61233
eshkin_krot              3099
ia_panorama              3794
lentaru                124867
mbkhmedia               11937
mediazzzona             23136
meduzaproject           59690
novgaz                  53261
novostnaya_lenta_vk      2658
oldlentach              29002
orangeeast              20568
rublacklist              9547
satyrabezsortyra        41551
svobodaradio            51015
takiedela_ru            14678
tj                      73473
true_lentach            21404
tsargradtv             102997
tv.jihad                 2709
tvrain                 119969
dtype: int64

In [9]:
min_date = posts.pluck('date').min()

In [10]:
min_date_comp = datetime.datetime.fromtimestamp(min_date.compute())
min_date_comp

datetime.datetime(2010, 8, 27, 19, 27, 22)

In [11]:
def mean(x):
    x = list(x)
    return sum(x) / len(x)


means = (posts_flattened
         .groupby(lambda x: x['group'])
         .map(lambda x: (x[0], mean(y['marked_as_ads'] for y in x[1]))))

In [12]:
%%time
means.compute()

CPU times: user 45.6 s, sys: 3.82 s, total: 49.4 s
Wall time: 51.7 s


[('orangeeast', 0.0028199144301828084),
 ('mbkhmedia', 8.377314233056883e-05),
 ('rublacklist', 0.0),
 ('tsargradtv', 3.88360826043477e-05),
 ('satyrabezsortyra', 7.220042838920844e-05),
 ('tv.jihad', 0.0),
 ('takiedela_ru', 0.0),
 ('lentaru', 2.4025563199243997e-05),
 ('currenttime', 0.0),
 ('tj', 0.0011841084480013065),
 ('eshkin_krot', 0.0),
 ('echomsk', 0.0005552561527281041),
 ('svobodaradio', 1.9602077820248946e-05),
 ('ia_panorama', 0.0),
 ('tvrain', 6.66838933391126e-05),
 ('novostnaya_lenta_vk', 0.0018811136192626034),
 ('bbc', 0.0002550183977558381),
 ('true_lentach', 0.0023827321995888617),
 ('chaosss_now', 0.03371998920960345),
 ('mediazzzona', 0.0),
 ('oldlentach', 0.010585476863664576),
 ('novgaz', 9.387732111676462e-05),
 ('meduzaproject', 0.0)]

In [13]:
ddf = (posts_flattened.to_dataframe()
                      .astype({'group': 'category', 
                               'text': 'string'})
                      .repartition(npartitions=12))

In [14]:
ddf_per = ddf.persist()

In [15]:
means = ddf.groupby('group').marked_as_ads.mean()

In [17]:
means = ddf_per.groupby('group').marked_as_ads.mean()

In [18]:
%%time
means_comp = means.compute(scheduler='threads')

CPU times: user 73.7 ms, sys: 10.4 ms, total: 84.1 ms
Wall time: 62.5 ms


In [19]:
df = ddf.compute()

In [60]:
%%time
means = df.groupby('group').marked_as_ads.mean()

CPU times: user 3.97 ms, sys: 2.26 ms, total: 6.23 ms
Wall time: 5.07 ms


In [81]:
lens = (ddf_per['text'].astype('object')
                       .str.split()
                       .apply(len).mean())

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


In [83]:
lens.compute()

40.25878109435694

In [22]:
import os
os.mkdir('data')

In [23]:
df.to_csv('data/texts.csv')

In [2]:
df = pd.read_csv('data/texts.csv')

In [6]:
df.group.value_counts().index

Index(['lentaru', 'tvrain', 'tsargradtv', 'tj', 'echomsk', 'meduzaproject',
       'bbc', 'novgaz', 'svobodaradio', 'satyrabezsortyra', 'oldlentach',
       'mediazzzona', 'true_lentach', 'currenttime', 'orangeeast',
       'takiedela_ru', 'mbkhmedia', 'rublacklist', 'chaosss_now',
       'ia_panorama', 'eshkin_krot', 'tv.jihad', 'novostnaya_lenta_vk'],
      dtype='object')

In [11]:
df.group.unique()

array(['bbc', 'chaosss_now', 'currenttime', 'echomsk', 'eshkin_krot',
       'ia_panorama', 'lentaru', 'mbkhmedia', 'mediazzzona',
       'meduzaproject', 'novgaz', 'novostnaya_lenta_vk', 'oldlentach',
       'orangeeast', 'rublacklist', 'satyrabezsortyra', 'svobodaradio',
       'takiedela_ru', 'tj', 'true_lentach', 'tsargradtv', 'tv.jihad',
       'tvrain'], dtype=object)

In [None]:
groups = ['bbc', 'chaosss_now', 'currenttime', 'echomsk', 'eshkin_krot',
       'ia_panorama', 'lentaru', 'mbkhmedia', 'mediazzzona',
       'meduzaproject', 'novgaz', 'novostnaya_lenta_vk', 'oldlentach',
       'orangeeast', 'rublacklist', 'satyrabezsortyra', 'svobodaradio',
       'takiedela_ru', 'tj', 'true_lentach', 'tsargradtv', 'tv.jihad',
       'tvrain']