In [1]:
# imports
import os

# Star import stays ahead of the explicit imports so pd/np/dd/delayed below
# always bind to the real libraries even if downsample re-exports these names.
# NOTE(review): presumably supplies get_stop_words, preprocess_table,
# tokenize_strings, build_inv_index, probe and get_lrids — confirm.
from downsample import *
import pandas as pd
from dask import delayed
import dask.dataframe as dd
import numpy as np

In [41]:
# paths: dataset root plus the two input tables (A = tracks, B = songs)
datapath = '../../datasets/'
apath, bpath = [os.path.join(datapath, fname) for fname in ('tracks.csv', 'songs.csv')]

In [49]:
# load datasets lazily with dask; every CSV_BLOCKSIZE-byte chunk becomes one partition
CSV_BLOCKSIZE = 25000000
A = dd.read_csv(apath, blocksize=CSV_BLOCKSIZE, encoding='latin-1')
B = dd.read_csv(bpath, blocksize=CSV_BLOCKSIZE, encoding='latin-1')
# NOTE(review): later cells key on an 'id' column. The earlier pandas version of
# this cell synthesized it with range(len(...)); confirm both CSVs already contain one.


In [50]:
# sanity check: dd.read_csv must yield a lazy dask DataFrame, not an eager pandas one
assert isinstance(A, dd.DataFrame), type(A)
type(A)

dask.dataframe.core.DataFrame

In [52]:
# partition count dask chose for A; one tokenize task is built per partition below
n_partitions_A = A.npartitions
n_partitions_A

3

In [53]:
# stopwords: base list read from disk plus extra noise words specific to these
# titles (e.g. 'mix', 'album', 'version')
spath = os.path.join(datapath, 'stopwords')
EXTRA_STOPWORDS = ['and', 'in', 'the', 'my', 'me', 'to', 'you', 'i', 'andre', 'from',
                   'a', 'of', 'version', 'love', 'live', 'la', 'mix', 'album', 'dont']
# set() removes duplicates; sorted() gives a reproducible order, whereas set
# iteration order for strings is not guaranteed stable across interpreter runs.
stopwords = sorted(set(get_stop_words(spath)) | set(EXTRA_STOPWORDS))

In [54]:
# preprocessing, tokenizing: ltable (A)
# A.to_delayed() yields one Delayed per partition that evaluates to that
# partition's pandas DataFrame. This replaces the Python-2-only
# xrange/get_partition loop with the same per-partition task chain.
ltokens = [
    delayed(tokenize_strings)(delayed(preprocess_table)(part, 'id'), stopwords=stopwords)
    for part in A.to_delayed()
]
# inverted index over all ltable tokens (still lazy; computed on demand)
invindex = delayed(build_inv_index)(ltokens)
    

In [55]:
# sample the rtable (B); the fixed seed keeps the sample reproducible across runs
SAMPLE_FRAC = 0.01
SAMPLE_SEED = 0
Bsample = B.sample(frac=SAMPLE_FRAC, random_state=SAMPLE_SEED)

In [64]:
# Exploratory API lookup of dask's generic reduction signature. No later visible
# cell uses `reduction`, so this cell can be dropped from the final notebook.
help(Bsample.reduction)

Help on method reduction in module dask.dataframe.core:

reduction(self, chunk, aggregate=None, combine=None, meta='__no_default__', token=None, split_every=None, chunk_kwargs=None, aggregate_kwargs=None, combine_kwargs=None, **kwargs) method of dask.dataframe.core.DataFrame instance
    Generic row-wise reductions.
    
    Parameters
    ----------
    chunk : callable
        Function to operate on each partition. Should return a
        ``pandas.DataFrame``, ``pandas.Series``, or a scalar.
    aggregate : callable, optional
        Function to operate on the concatenated result of ``chunk``. If not
        specified, defaults to ``chunk``. Used to do the final aggregation
        in a tree reduction.
    
        The input to ``aggregate`` depends on the output of ``chunk``.
        If the output of ``chunk`` is a:
    
        - scalar: Input is a Series, with one row per partition.
        - Series: Input is a DataFrame, with one row per partition. Columns
          are the rows 

In [56]:
# preprocessing, tokenizing, probe: rtable (Bsample)
# Each sampled partition is tokenized with the same pipeline as the ltable and
# probed against the ltable inverted index. to_delayed() replaces the
# Python-2-only xrange/get_partition loop.
# NOTE(review): the literal 1 passed to probe() is defined by downsample —
# consider naming it in the config cell.
probe_results = [
    delayed(probe)(
        delayed(tokenize_strings)(delayed(preprocess_table)(part, 'id'), stopwords=stopwords),
        invindex,
        1,
    )
    for part in Bsample.to_delayed()
]


In [57]:
# get lrids
# get lrids: get_lrids yields a (left ids, right ids) pair, so nout=2 makes the
# lazy result indexable/unpackable as two Delayed values
get_lrids_lazy = delayed(get_lrids, nout=2)
lrids = get_lrids_lazy(probe_results)

In [58]:
def _filter_by_ids(tbl, col, ids):
    """Keep only the rows of ``tbl`` whose ``col`` value is in ``ids``."""
    if isinstance(tbl, pd.DataFrame):
        return tbl[tbl[col].isin(ids)]
    # Non-pandas (dask) input: apply the same row filter partition by partition.
    # meta=tbl keeps the original column schema for the lazy result.
    return tbl.map_partitions(lambda part: part[part[col].isin(ids)], meta=tbl)


def postprocess(lrids, A, B, lid, rid):
    """Restrict both tables to the rows whose ids survived probing.

    Parameters
    ----------
    lrids : sequence of length 2
        ``lrids[0]`` holds the ids to keep from ``A``; ``lrids[1]`` the ids
        to keep from ``B``.
    A, B : pandas.DataFrame or dask DataFrame
        Left and right tables. pandas inputs are filtered eagerly; any other
        input is filtered lazily with ``map_partitions``.
    lid, rid : str
        Name of the id column in ``A`` and ``B`` respectively.

    Returns
    -------
    tuple
        ``(s_ltbl, s_rtbl)``: the filtered left and right tables, each of
        the same kind (pandas or dask) as its input.
    """
    s_ltbl = _filter_by_ids(A, lid, lrids[0])
    s_rtbl = _filter_by_ids(B, rid, lrids[1])
    return (s_ltbl, s_rtbl)

In [59]:
# Run postprocess lazily; nout=2 splits its (s_ltbl, s_rtbl) return value into
# two Delayed objects, each wrapped back into a single-partition dask DataFrame.
# NOTE(review): passing the dask frames A/Bsample into delayed should materialize
# each as one pandas DataFrame before postprocess runs (so its pandas branch is
# taken) — likely the main cost of the compute below; confirm.
results = delayed(postprocess, nout=2)(lrids, A, Bsample, 'id', 'id')
ltbl_delayed, rtbl_delayed = results
s_ltbl = dd.from_delayed(ltbl_delayed, meta=A)
s_rtbl = dd.from_delayed(rtbl_delayed, meta=Bsample)

In [68]:
# Exploratory API lookup for dd.from_delayed (Delayed partitions plus a `meta`
# template), the call used to build s_ltbl / s_rtbl. Not needed for the pipeline.
help(dd.from_delayed)

Help on function from_delayed in module dask.dataframe.io.io:

from_delayed(dfs, meta=None, divisions=None, prefix='from-delayed', metadata=None)
    Create Dask DataFrame from many Dask Delayed objects
    
    Parameters
    ----------
    dfs : list of Delayed
        An iterable of ``dask.delayed.Delayed`` objects, such as come from
        ``dask.delayed`` These comprise the individual partitions of the
        resulting dataframe.
    meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
        An empty ``pd.DataFrame`` or ``pd.Series`` that matches the dtypes and
        column names of the output. This metadata is necessary for many
        algorithms in dask dataframe to work.  For ease of use, some
        alternative inputs are also available. Instead of a ``DataFrame``, a
        ``dict`` of ``{name: dtype}`` or iterable of ``(name, dtype)`` can be
        provided. Instead of a series, a tuple of ``(name, dtype)`` can be
        used. If not provided, dask will 

In [61]:
from dask.diagnostics import Profiler, ResourceProfiler, ProgressBar

In [62]:
# materialize the filtered ltable; `prof` records task timings for the
# visualization cell, ProgressBar reports progress while it runs
with Profiler() as prof:
    with ProgressBar():
        sa = s_ltbl.compute()

[###########                             ] | 28% Completed |  2.1s

  result = (True, func(*args, **kwds))


[########################################] | 100% Completed | 57.1s


In [63]:
# plot the task profile recorded by `prof` during the s_ltbl compute cell
prof.visualize()