# Optimization 3: Query Optimization


Historically, Dask has focused on flexibility and smart scheduling rather than on query optimization.

- High Level (logical plan): Spark ✅ Dask ❌
- Low Level (physical plan): Spark ❌ Dask ✅

We're now building logical plans back in for Dask Array / DataFrame / Xarray.
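
To make the "low level" side concrete, here is a minimal sketch of a physical plan in the dict-of-tasks style that Dask graphs use. The `get` function is a hypothetical toy evaluator written for this example, not Dask's scheduler, and the keys and values are made up.

```python
from operator import add, mul

# A low-level (physical) plan: keys map to literal values or to
# tasks of the form (function, arg1, arg2, ...).
graph = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),
    "w": (mul, "z", 10),
}


def get(graph, key):
    """Toy recursive evaluator (hypothetical, for illustration only)."""
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Arguments that name another key are computed first.
        resolved = [
            get(graph, a) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        return func(*resolved)
    return task


result = get(graph, "w")
print(result)
```

A scheduler can order and parallelize tasks like these, but each function is opaque to it. It cannot tell that a task only needs one column of its input, which is the kind of information a logical plan keeps around.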

In [3]:
import dask
import dask.dataframe as dd
import dask_expr as dx

dask.config.set({"dataframe.convert-string": True})


df = dd.read_parquet("taxi.parquet")
out = df[df.tips != 0].sum(numeric_only=True)["tips"]

In [2]:
%time out.compute()


CPU times: user 5.25 s, sys: 1.38 s, total: 6.63 s
Wall time: 6.65 s


tips    23528836.72
dtype: float64

In [9]:
df = dx.read_parquet("taxi.parquet")
out = df[df.tips != 0].sum(numeric_only=True)["driver_pay"]

In [10]:
%time out.compute()

CPU times: user 427 ms, sys: 47.2 ms, total: 474 ms
Wall time: 472 ms


89761049.07999997

In [17]:
out

<dask_expr.expr.Scalar: expr=((Filter(frame=ReadParquet(83d11fd), predicate=ReadParquet(83d11fd)['tips'] != 0)).sum(numeric_only=True))['driver_pay']>

In [16]:
out.pprint()

Projection: columns='driver_pay'
  Sum: numeric_only=True
    Filter:
      ReadParquet: path='taxi.parquet' kwargs={'dtype_backend': None}
      NE: right=0
        Projection: columns='tips'
          ReadParquet: path='taxi.parquet' kwargs={'dtype_backend': None}


In [15]:
out.simplify().pprint()

Sum: numeric_only=True
  ReadParquet: path='taxi.parquet' columns=['driver_pay'] filters=[[('tips', '!=', 0)]] kwargs={'dtype_backend': None} _series=True
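
The rewrite above pushes the column selection and the filter down into the Parquet read. A rough sketch of that idea on a toy expression tree follows. The classes and the `simplify` function are hypothetical names made up for this example and are not dask-expr's implementation; the predicate is also flattened to a `(column, op, value)` tuple rather than a nested expression.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Read:
    path: str
    columns: Optional[Tuple[str, ...]] = None  # None means "all columns"
    filters: Tuple[tuple, ...] = ()


@dataclass(frozen=True)
class Filter:
    frame: object
    predicate: tuple  # (column, op, value)


@dataclass(frozen=True)
class Sum:
    frame: object


@dataclass(frozen=True)
class Projection:
    frame: object
    columns: Tuple[str, ...]


def simplify(expr):
    """Rewrite the tree bottom-up, pushing work into the Read node."""
    if isinstance(expr, Read):
        return expr
    child = simplify(expr.frame)
    if isinstance(expr, Filter) and isinstance(child, Read):
        # Predicate pushdown: let the reader skip non-matching rows.
        return replace(child, filters=child.filters + (expr.predicate,))
    if isinstance(expr, Projection):
        if isinstance(child, Sum):
            # A column-wise sum commutes with column selection.
            return simplify(Sum(Projection(child.frame, expr.columns)))
        if isinstance(child, Read):
            # Projection pushdown: only read the requested columns.
            return replace(child, columns=expr.columns)
    return replace(expr, frame=child)


plan = Projection(
    Sum(Filter(Read("taxi.parquet"), ("tips", "!=", 0))),
    ("driver_pay",),
)
optimized = simplify(plan)
print(optimized)
```

The optimized toy plan is a `Sum` over a single `Read` that carries both the column list and the filter, mirroring the shape of the simplified plan printed above.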


NYC Uber/Lyft Rides
===================

The NYC Taxi dataset is a timeless classic.  

Interestingly, there is a new variant.  The NYC Taxi and Limousine Commission requires data from all ride-share services in New York City.  This includes private limousine services, van services, and a new category, "High Volume For Hire Vehicle" services: those that dispatch 10,000 rides per day or more.  This is a special category defined for Uber and Lyft.  

This data is available here:

In [57]:
import coiled

cluster = coiled.Cluster(
    n_workers=15,
    name="uber-lyft-dask-expr-2",
    region="us-east-2",
    shutdown_on_close=False,
    account="dask-engineering",
)

client = cluster.get_client()


In [25]:
%%time

import dask
import pandas as pd
from dask_expr import read_parquet

dask.config.set({"dataframe.convert-string": True})

df = read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
)

df = df.replace(1, 100).fillna({"tips": 100, "driver_pay": 100, "base_passenger_fare": 100})

df["tipped"] = df.tips != 0
df["dp_gt"] = df.driver_pay > 0
df["bpf_lt"] = df.base_passenger_fare < 0

q = df[["tipped", "dp_gt", "bpf_lt", "tips", "base_passenger_fare", "driver_pay"]].sum()
q.compute()

CPU times: user 319 ms, sys: 37.1 ms, total: 356 ms
Wall time: 34.5 s


tipped                 1.245332e+08
dp_gt                  7.477930e+08
bpf_lt                 1.208497e+06
tips                   2.138438e+09
base_passenger_fare    1.581905e+10
driver_pay             1.279459e+10
dtype: float64

In [56]:
import numpy as np


df = read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
)
df = df.replace(1, 100).select_dtypes(include=[np.number]).fillna(100)
#df["tipped"] = df.tips != 0
#df["dp"] = df.driver_pay != 0


q = df[["tips", "driver_pay"]].sum()

len(q.optimize(fuse=True).__dask_graph__())


1442

2023-07-07 22:00:33,259 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


In [53]:
%%time



CPU times: user 126 ms, sys: 10.3 ms, total: 137 ms
Wall time: 5.87 s


tipped        1.245332e+08
dp            7.478095e+08
tips          2.138438e+09
driver_pay    1.279459e+10
dtype: float64