In [1]:
import pandas as pd
import pandavro as pdx
import json

df_history = pdx.read_avro("../data/history.avro")
with open("../data/dictionary.json") as fp:
    dictionary = json.load(fp)
df_timestamp = pd.read_json("../data/metadata.json", lines=True)

keys = [elem for elem in dictionary]
values = [dictionary[idx] for idx in keys]
df_dict = pd.DataFrame({"document" : keys, "pos" : values})

df_history = df_history.merge(df_dict, on="document")

In [2]:
df = df_history.groupby("id")

In [3]:
df_history

Unnamed: 0,document,id,timestamp,pos
0,@moondol_209,#cfbda49fbe26a2668b00428ca561d9a7,2019-02-18T00:00:00,195872
1,@moondol_209,#d2882e2d92af7b9603df8d3496478a6a,2019-01-12T17:00:00,195872
2,@moondol_209,#50001d770ccdc6c4ee2739328ae60051,2019-01-12T13:00:00,195872
3,@moondol_209,#a30676fa0756df1ee7ebe956db493d71,2019-02-06T15:00:00,195872
4,@moondol_209,#20c9d2f2f557f943bdaaf41d45ed644b,2019-01-27T20:00:00,195872
5,@moondol_209,#7785ece1084786c70b00942b0c9a0cb6,2019-01-12T19:00:00,195872
6,@moondol_209,#8b6b67701c2ad8f8e604a0c9cd1b5209,2019-01-12T22:00:00,195872
7,@moondol_209,#e8bc8a922e7596221a9289f31ce80db9,2019-02-21T21:00:00,195872
8,@moondol_209,#a1684b1d8d625fe3ccaf463d96164465,2019-02-25T02:00:00,195872
9,@moondol_209,#5548c6d5aca91bdca91c348238429956,2019-01-12T15:00:00,195872


In [4]:
df_history["date"] = pd.to_datetime(df_history["timestamp"], format="%Y-%m-%dT%H:%M:%S")

In [5]:
df_train = df_history[(df_history['date'] < '2019-02-22')]
df_eval = df_history[(df_history['date'] >= '2019-02-22')]

In [15]:
import dask.dataframe as dd
import multiprocessing

In [86]:
dask_frame = dd.from_pandas(df_history, npartitions=4 * multiprocessing.cpu_count())

In [87]:
def pad_array(x, flag):
    if len(x) == 15:
        return x
    else:
        
        if len(x) == 0:
            print(flag)
        
        to_pad = [x[0]] * (15 - len(x))
        return to_pad + x
        

def top_n(frame, columns):
    
    df_train = frame[(frame['date'] < '2019-02-22')]
    df_train = df_train.sort_values(by=columns)
    train_array = list(df_train["pos"].values)
    train_length = len(train_array)
    
    df_eval = frame[(frame['date'] >= '2019-02-22')]
    df_eval = df_eval.sort_values(by=columns)
    eval_array = list(df_eval["pos"].values)
    eval_length = len(eval_array)
    
    if train_length == 0:
        train_array = list(eval_array)
        return np.array(pad_array(train_array[-15:], 1)), np.array(eval_array)
    
    if eval_length == 0:
        if train_length >= 20:
            train_array = train_array[-20:-5]
            eval_array = train_array[-5:]
        elif train_length < 20 and train_length > 15:
            x = train_length - 15
            train_array = train_array[-train_length:-x]
            eval_array = train_array[-x:]
        else:
            if train_length != 1:
                train_array = train_array[-train_length:-1]
            eval_array = train_array[-1:]
        return np.array(pad_array(train_array[-15:], 2)), np.array(eval_array)
    
    return np.array(pad_array(train_array[-15:], 3)), np.array(eval_array)
        
result = dask_frame\
    .groupby("id", group_keys=False)\
    .apply(top_n, columns=["timestamp", "document"])

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


In [88]:
res = result.compute()

KeyboardInterrupt: 

In [35]:
array.append

['#cfbda49fbe26a2668b00428ca561d9a7',
 '#d2882e2d92af7b9603df8d3496478a6a',
 '#50001d770ccdc6c4ee2739328ae60051',
 '#a30676fa0756df1ee7ebe956db493d71',
 '#20c9d2f2f557f943bdaaf41d45ed644b',
 '#7785ece1084786c70b00942b0c9a0cb6',
 '#8b6b67701c2ad8f8e604a0c9cd1b5209',
 '#e8bc8a922e7596221a9289f31ce80db9',
 '#a1684b1d8d625fe3ccaf463d96164465',
 '#5548c6d5aca91bdca91c348238429956',
 '#7582bc49de59e49a9e61ecd49b9e1472',
 '#6a93ec48118d358f8b8706a12c21f6c5',
 '#8b13555c98e6b4c8dd43ab6b6e3471e7',
 '#3751ab29d941647c17baa97e29d21c79',
 '#7b03a728bd87fd1309b7ac38500e5c6f',
 '#9ef170cacaa30f62d6de34a5ad42a37e',
 '#f3a503659b064ab6de10d3ac8252bef4',
 '#1ae1485d8000a41c624082e4147e6360',
 '#5dffdbe473c7da70a28c78de4709f180',
 '#6418104e7a926a5cbc7060f3fd5ab86f',
 '#d759a20b9b17cb6735fc1a00f1efc927',
 '#7c497ac727c8a922233e671c94d405ba',
 '#f823fafcf1de1be97b5cea892bbc8805',
 '#bd1687ad67c9e9d81927bbc9527629ee',
 '#37c3ae4cbd77682f5df5f6bb30875d43',
 '#f12dcc2fafe3a75251deee425d45e588',
 '#1ab9d154c

In [19]:
temp = pd.DataFrame({
    "id" : [[1, 2, 3]]
})

In [38]:
temp = dd.from_pandas(temp, npartitions=1)


In [40]:
temp.compute()

Unnamed: 0,id
0,"[1, 2, 3]"


In [28]:
df_train = dask_frame[(dask_frame['date'] < '2019-02-22')]
df_eval = dask_frame[(dask_frame['date'] >= '2019-02-22')]

In [31]:
type(df_train.compute())

pandas.core.frame.DataFrame

In [80]:
array = [1]
array[-1:-1]

[]

In [49]:
import numpy as np

df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
                   'y': [1., 2., 3., 4., 5.]})
ddf = dd.from_pandas(df, npartitions=2)

def myadd(row, a, b=1):
    return np.array(row), np.array(row) + a

res = ddf.apply(myadd, axis=1, args=(2,), b=1.5) 

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=(None, 'object'))



In [63]:
res = res.compute()

In [67]:
res[0][1]

array([3., 3.])

In [59]:
def pad_array(x):
    if len(x) == 15:
        return x
    else:
        to_pad = [x[0]] * (15 - len(x))
        return to_pad + x
    
len(pad_array(array))

15

In [None]:
temp = pd.D