In [1]:
from dask_sql import Context
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd

In [2]:
# Start a dask.distributed Client; no scheduler address is passed, so it
# spins up its own local cluster. The warning printed by this cell shows the
# dashboard HTTP server fell back to another port because the default one was
# already taken (an earlier cluster may still be running in this kernel).
client = Client()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 36457 instead


In [3]:
# Input data for every query below. The file name suggests the h2o groupby
# benchmark layout with N=1e8 rows and K=1e2 -- adjust here to switch datasets.
H2O_CSV_PATH = "scripts/output_dir/N_1e8_K_1e2_single.csv"

c = Context()
ddf = dd.read_csv(H2O_CSV_PATH)
# Register the same lazy dataframe with dask-sql so the dataframe-API queries
# and the SQL queries read identical input.
c.create_table("h2oData", ddf)

In [4]:
def check_query(q, sql, sort_by=''):
    sql = sql.sort_values(sort_by)
    sql = sql.reset_index()
    sql = sql.drop('index', axis=1, errors='ignore')
    q = q.sort_values(sort_by)
    q = q.reset_index()
    q = q.drop('index', axis=1, errors='ignore')

    pd.testing.assert_frame_equal(q, sql)

In [5]:
%%time
# q1: sum of v1 per id1.
ddf_q1 = ddf[["id1", "v1"]]
q1 = (
    ddf_q1.groupby("id1", dropna=False, observed=True)
    .agg({"v1": "sum"})
    .compute()
)

CPU times: user 913 ms, sys: 375 ms, total: 1.29 s
Wall time: 8.18 s


In [6]:
%%time 
# SQL equivalent of q1.
sql_q1 = (
    c.sql("SELECT id1, sum(v1) AS v1 FROM h2oData GROUP BY id1")
    .compute()
)

CPU times: user 484 ms, sys: 179 ms, total: 663 ms
Wall time: 3.78 s


In [7]:
check_query(q=q1, sql=sql_q1, sort_by="id1")

In [8]:
%%time
# q2: sum of v1 per (id1, id2).
ddf_q2 = ddf[["id1", "id2", "v1"]]
q2 = (
    ddf_q2
    .groupby(by=["id1", "id2"], dropna=False, observed=True)
    .agg({"v1": "sum"})
    .compute()
)

CPU times: user 613 ms, sys: 204 ms, total: 817 ms
Wall time: 5.04 s


In [9]:
%%time 
# SQL equivalent of q2.
sql_q2 = (
    c.sql("SELECT id1, id2, sum(v1) AS v1 FROM h2oData GROUP BY id1, id2")
    .compute()
)

CPU times: user 643 ms, sys: 208 ms, total: 851 ms
Wall time: 5.6 s


In [10]:
check_query(q=q2, sql=sql_q2, sort_by=["id1", "id2"])

In [11]:
%%time 
# q3: sum of v1 and mean of v3 per id3.
ddf_q3 = ddf[["id3", "v1", "v3"]]
q3 = (
    ddf_q3
    .groupby(by="id3", dropna=False, observed=True)
    .agg({"v1": "sum", "v3": "mean"})
    .compute()
)

CPU times: user 3.02 s, sys: 1.39 s, total: 4.41 s
Wall time: 38.5 s


In [13]:
%%time 
# SQL equivalent of q3.
sql_q3 = (
    c.sql("""SELECT id3, sum(v1) AS v1, avg(v3) AS v3 
                FROM h2oData 
                GROUP BY 
                id3""")
    .compute()
)

CPU times: user 3.81 s, sys: 1.54 s, total: 5.35 s
Wall time: 48.5 s


In [14]:
# Sort by the group key: it is unique per row, whereas the aggregated values
# (v1 sums, v3 means) can tie and leave the two frames in different orders.
check_query(q3, sql_q3, sort_by='id3')

In [15]:
%%time
# q4: mean of v1, v2 and v3 per id4.
ddf_q4 = ddf[["id4", "v1", "v2", "v3"]]
q4 = (
    ddf_q4
    .groupby(by="id4", dropna=False, observed=True)
    .agg({"v1": "mean", "v2": "mean", "v3": "mean"})
    .compute()
)

CPU times: user 735 ms, sys: 0 ns, total: 735 ms
Wall time: 2.54 s


In [16]:
%%time 
# SQL equivalent of q4.
sql_q4 = (
    c.sql("""SELECT id4, avg(v1) AS v1, avg(v2) AS v2, avg(v3) AS v3 
                  FROM h2oData 
                  GROUP BY 
                  id4""")
    .compute()
)

CPU times: user 676 ms, sys: 0 ns, total: 676 ms
Wall time: 2.5 s


In [17]:
# Sort by the group key: it is unique per row, whereas the averaged values
# could tie and leave the two frames in different orders.
check_query(q4, sql_q4, sort_by='id4')

In [18]:
%%time
# q5: sums of v1, v2 and v3 per id6.
ddf_q5 = ddf[["id6", "v1", "v2", "v3"]]
q5 = (
    ddf_q5
    .groupby(by="id6", dropna=False, observed=True)
    .agg({"v1": "sum", "v2": "sum", "v3": "sum"})
    .compute()
)

CPU times: user 1.62 s, sys: 370 ms, total: 1.99 s
Wall time: 7.67 s


In [19]:
%%time 
# SQL equivalent of q5.
sql_q5 = (
    c.sql("""SELECT id6, sum(v1) AS v1, sum(v2) AS v2, sum(v3) AS v3 
                  FROM h2oData 
                  GROUP BY 
                  id6""")
    .compute()
)

CPU times: user 1.65 s, sys: 534 ms, total: 2.18 s
Wall time: 8.88 s


In [20]:
# Sort by the group key: it is unique per row, whereas integer sums of v1/v2
# tie easily across many groups and can leave the frames in different orders.
check_query(q5, sql_q5, sort_by='id6')

In [21]:
%%time
# q7: range max(v1) - min(v2) per id3.
ddf_q7 = ddf[["id3", "v1", "v2"]]
q7 = (
    ddf_q7
    .groupby(by="id3", dropna=False, observed=True)
    .agg({"v1": "max", "v2": "min"})
    .assign(range_v1_v2=lambda agg: agg["v1"] - agg["v2"])[["range_v1_v2"]]
    .compute()
)

CPU times: user 3.13 s, sys: 1.19 s, total: 4.32 s
Wall time: 38.2 s


In [22]:
%%time 
# SQL equivalent of q7.
sql_q7 = (
    c.sql("""SELECT id3, max(v1)-min(v2) AS range_v1_v2 
                  FROM h2oData 
                  GROUP BY 
                  id3""")
    .compute()
)

CPU times: user 3.45 s, sys: 1.45 s, total: 4.9 s
Wall time: 46.4 s


In [23]:
check_query(q=q7, sql=sql_q7, sort_by=["id3", "range_v1_v2"])

In [24]:
%%time
# q8: the two largest non-missing v3 values per id6.
# Only id6 and v3 are needed, so v1/v2 are not carried through the pipeline.
ddf_q8 = ddf[["id6", "v3"]]
q8 = (
    ddf_q8[ddf_q8["v3"].notna()]
    .groupby("id6", dropna=False, observed=True)
    .apply(
        lambda x: x.nlargest(2, columns="v3"),
        # NOTE(review): "Int64" is the nullable dtype; confirm it matches the
        # dtype read_csv infers for id6.
        meta={"id6": "Int64", "v3": "float64"},
    )
    .compute()
    # Name the value column like the SQL query's alias so the result can be
    # compared directly with sql_q8.
    .rename(columns={"v3": "largest2_v3"})
)

CPU times: user 13.9 s, sys: 4.19 s, total: 18.1 s
Wall time: 3min 3s


In [25]:
%%time 
# SQL equivalent of q8: rank v3 within each id6 and keep the top two.
sql_q8 = (
    c.sql("""SELECT id6, v3 AS largest2_v3 
               FROM 
               (SELECT id6, v3, row_number()
               OVER 
               (PARTITION BY id6 ORDER BY v3 DESC) AS order_v3 
               FROM h2oData 
               WHERE v3 IS NOT NULL) 
               sub_query WHERE order_v3 <= 2""")
    .compute()
)
# Give q8's value column the SQL alias name so the two results can be compared.
# NOTE(review): this runs inside this cell's %%time measurement; it would sit
# more naturally in the q8 cell.
q8 = q8.rename(columns={"v3": "largest2_v3"})

CPU times: user 15.3 s, sys: 5.39 s, total: 20.7 s
Wall time: 3min 27s


In [45]:
check_query(q=q8, sql=sql_q8, sort_by=["id6", "largest2_v3"])