### Filtered Reading with cuDF and dask-cuDF

https://gist.github.com/ayushdg/f3d96ede8c3bcfb55482148aa01750b3

In [1]:
nyt_parq_path = "./nytaxi.parquet"

In [2]:
# Pull the nytaxi Parquet dataset from a public Google Cloud Storage bucket hosted by Anaconda
import gcsfs

fs = gcsfs.GCSFileSystem()
print("Downloading Data....")
fs.get("gcs://anaconda-public-data/nyc-taxi/nyc.parquet", nyt_parq_path, recursive=True)
print("Done!")

Downloading Data....
Done!


In [3]:
import cupy

# choose which GPU to use
#cupy.cuda.Device(1).use()

In [4]:
### Let's read one file and look at the data
import os
import cudf

df = cudf.read_parquet(os.path.join(nyt_parq_path, "part.0.parquet"))
df.head()

|   | tpep_pickup_datetime | VendorID | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2015-01-01 00:00:00 | 1 | 2015-01-01 00:11:26 | 5 | 4.0 | -73.971436 | 40.760201 | 1 | N | -73.921181 | 40.768269 | 2 | 13.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.0 | 14.5 |
| 1 | 2015-01-01 00:00:00 | 2 | 2015-01-01 00:00:00 | 1 | 1.68 | -73.991547 | 40.750069 | 1 | N | 0.0 | 0.0 | 2 | 10.0 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 10.8 |
| 2 | 2015-01-01 00:00:00 | 2 | 2015-01-01 00:00:00 | 3 | 1.56 | -74.00132 | 40.729057 | 1 | N | -74.010208 | 40.719662 | 1 | 7.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 8.8 |
| 3 | 2015-01-01 00:00:01 | 1 | 2015-01-01 00:03:49 | 1 | 0.8 | -73.860847 | 40.757294 | 1 | N | -73.868111 | 40.752285 | 2 | 5.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.0 | 6.3 |
| 4 | 2015-01-01 00:00:03 | 2 | 2015-01-01 00:21:48 | 2 | 2.57 | -73.969017 | 40.754269 | 1 | N | -73.994133 | 40.7616 | 2 | 14.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 15.8 |


Let's say we want to get the `VendorID` and `passenger_count` for all trips started in the month of January.

In [5]:
from glob import glob
import numpy as np
import pandas as pd

# Exclusive upper bound: trips picked up before February 1, 2015, i.e. in January 2015
date = pd.Timestamp("2015-02-01 00:00:00")
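Because `date` is an exclusive upper bound, selecting one calendar month amounts to a half-open interval `[start, end)`. This avoids guessing the last timestamp of the month (an inclusive bound of 23:59:59 would miss fractional seconds). A minimal standard-library sketch of deriving such bounds for any month; `month_bounds` is a hypothetical helper, not part of cuDF or pandas:

```python
from datetime import datetime
from typing import Tuple


def month_bounds(year: int, month: int) -> Tuple[datetime, datetime]:
    """Return the half-open interval [start, end) covering one calendar month.

    Hypothetical helper for illustration; not part of cuDF or pandas.
    """
    start = datetime(year, month, 1)
    # The exclusive end is the first instant of the following month.
    if month == 12:
        end = datetime(year + 1, 1, 1)
    else:
        end = datetime(year, month + 1, 1)
    return start, end


start, end = month_bounds(2015, 1)
print(start, end)  # 2015-01-01 00:00:00 2015-02-01 00:00:00
```

The same helper yields the December bound (`2015-12-01` up to `2016-01-01`) used in the more complex predicates later on.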


Approach 1: Read specific columns, then filter

In [6]:
%%time

# Without predicate pushdown
df = cudf.read_parquet(nyt_parq_path, columns=["tpep_pickup_datetime", "VendorID", "passenger_count"])
print(f"Rows read: {len(df)}")
df = df[df['tpep_pickup_datetime'] < date]
print(f"Rows after filtering: {len(df)}")

Rows read: 146112989
Rows after filtering: 12748986
CPU times: user 776 ms, sys: 480 ms, total: 1.26 s
Wall time: 1.39 s


Peak Memory Usage: ~8000 MB

---
Approach 2: Filtered reading using predicates. The values in `tpep_pickup_datetime` are not completely sorted, but they are roughly ordered by time. This makes the column a good candidate for statistics-based filtering: a row group can be skipped whenever the requested range lies entirely outside the min/max statistics stored for that row group.

Note: The `filters` argument only skips row groups whose statistics show they cannot match. Row groups that are read can still contain rows outside the requested range, so the DataFrame-level filter is still needed after reading.
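A minimal pure-Python sketch of the idea behind statistics-based skipping. Each toy row group keeps the min/max of one column, as Parquet row-group statistics do, and is read only if its `[min, max]` range could contain a matching value. `may_match` and the toy data are illustrative assumptions, not cuDF's implementation:

```python
from typing import List


def may_match(col_min, col_max, op: str, value) -> bool:
    """Return False only when no value in [col_min, col_max] can satisfy `col op value`."""
    if op == "<":
        return col_min < value
    if op == "<=":
        return col_min <= value
    if op == ">":
        return col_max > value
    if op == ">=":
        return col_max >= value
    if op == "==":
        return col_min <= value <= col_max
    if op == "!=":
        # Only a group in which every value equals `value` can be skipped.
        return not (col_min == col_max == value)
    raise ValueError(f"unsupported operator: {op}")


# Toy column that is roughly (not strictly) ordered, split into three row groups.
row_groups: List[List[int]] = [
    [1, 2, 4, 3],
    [5, 7, 6, 9],
    [10, 12, 11, 15],
]
threshold = 6

# Step 1: skip row groups whose min/max statistics rule out any match.
kept = [rg for rg in row_groups if may_match(min(rg), max(rg), "<", threshold)]
rows_read = [v for rg in kept for v in rg]

# Step 2: a kept group can still hold non-matching rows, so filter again.
rows_after = [v for v in rows_read if v < threshold]

print(len(rows_read), len(rows_after))  # 8 5
```

The second group straddles the threshold, so it must be read in full even though three of its values fail the predicate; this is the same effect behind the small gap between "Rows read" and "Rows after filtering" below.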

In [7]:
%%time

# With predicate pushdown
df = cudf.read_parquet(nyt_parq_path, 
                       columns=["tpep_pickup_datetime", "VendorID", "passenger_count"], 
                       filters=[("tpep_pickup_datetime", "<", date)])
print(f"Rows read: {len(df)}")
df = df[df['tpep_pickup_datetime'] < date]
print(f"Rows after filtering: {len(df)}")

Rows read: 12749062
Rows after filtering: 12748986
CPU times: user 312 ms, sys: 72 ms, total: 384 ms
Wall time: 382 ms


Peak Memory Usage: ~1800 MB

In this case, predicate-based filtering read about 12.7 million rows, roughly 11.5x fewer than the 146 million rows in the full dataset, and cut wall time from 1.39 s to 382 ms. The subsequent DataFrame-level filter removes only 76 rows (12,749,062 read vs. 12,748,986 kept): rows that fall outside the predicate but sit in row groups that had to be read anyway.
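The reduction can be computed directly from the row counts printed by cells [6] and [7]:

```python
# Row counts copied from the outputs of cells [6] and [7].
total_rows = 146_112_989
rows_read_with_filters = 12_749_062
rows_after_filtering = 12_748_986

# How many times fewer rows were materialized with predicate pushdown.
reduction = total_rows / rows_read_with_filters
# Rows read from surviving row groups that still failed the predicate.
extra_rows = rows_read_with_filters - rows_after_filtering

print(f"{reduction:.1f}x fewer rows read")  # 11.5x fewer rows read
print(f"{extra_rows} rows removed by the DataFrame-level filter")  # 76 rows removed by the DataFrame-level filter
```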

#### Using Predicate Filters with dask-cuDF

In [8]:
# Start a local Dask cluster with one worker per visible GPU
from dask_cuda import LocalCUDACluster
from distributed import Client, wait
import dask_cudf

cluster = LocalCUDACluster()
client = Client(cluster)
client



| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:37358 | Workers: 2 |
| Dashboard: http://127.0.0.1:38858/status | Cores: 2 |
|   | Memory: 190.14 GB |


In [9]:
%%time

# Without predicate pushdown
ddf = dask_cudf.read_parquet(nyt_parq_path,
                             split_row_groups=False, 
                             index=False)
ddf = ddf.persist()
wait(ddf)
print(f"Rows read: {len(ddf)}")

ddf = ddf[ddf.tpep_pickup_datetime < date]
ddf = ddf.persist()
wait(ddf)
print(f"Rows after filtering: {len(ddf)}")

Rows read: 146112989
Rows after filtering: 12748986
CPU times: user 1.54 s, sys: 156 ms, total: 1.7 s
Wall time: 20 s


Peak Memory Usage: ~6800 MB/GPU (4 GPUs total)

In [10]:
%%time

# With predicate pushdown

ddf = dask_cudf.read_parquet(nyt_parq_path, 
                             filters=[("tpep_pickup_datetime", "<", pd.Timestamp("2015-02-01 00:00:00", tz="UTC"))], 
                             index=False,
                             split_row_groups=False)
ddf = ddf.persist()
wait(ddf)
print(f"Rows read: {len(ddf)}")

ddf = ddf[ddf.tpep_pickup_datetime < date]
ddf = ddf.persist()
wait(ddf)
print(f"Rows after filtering: {len(ddf)}")

Rows read: 12749062
Rows after filtering: 12748986
CPU times: user 188 ms, sys: 24 ms, total: 212 ms
Wall time: 1.68 s


Peak Memory Usage: ~1400 MB/GPU (4 GPUs total)

---
Writing the dataset to Apache ORC format


In [11]:
nyt_orc_path = "./nytaxi.orc"

In [12]:
ddf = dask_cudf.read_parquet(nyt_parq_path, 
                             index=False,
                             split_row_groups=False)
ddf.to_orc(nyt_orc_path)
print("Done!")

Done!


In [13]:
%%time

# Without predicate pushdown
ddf = dask_cudf.read_orc(os.path.join(nyt_orc_path,"*.orc"))
ddf = ddf.persist()
wait(ddf)
print(f"Rows read: {len(ddf)}")

ddf = ddf[ddf.tpep_pickup_datetime < date]
ddf = ddf.persist()
wait(ddf)
print(f"Rows after filtering: {len(ddf)}")

Rows read: 146112989
Rows after filtering: 12748986
CPU times: user 2.9 s, sys: 316 ms, total: 3.22 s
Wall time: 24.7 s


---
Predicate filtering with Apache ORC files

In [14]:
%%time

# With predicate pushdown
ddf = dask_cudf.read_orc(os.path.join(nyt_orc_path,"*.orc"), 
                         filters=[("tpep_pickup_datetime", "<", pd.Timestamp("2015-02-01 00:00:00", tz="UTC"))]
                        )
ddf = ddf.persist()
wait(ddf)
print(f"Rows read: {len(ddf)}")

ddf = ddf[ddf.tpep_pickup_datetime < date]
ddf = ddf.persist()
wait(ddf)
print(f"Rows after filtering: {len(ddf)}")

Rows read: 12749062
Rows after filtering: 12748986
CPU times: user 396 ms, sys: 116 ms, total: 512 ms
Wall time: 2.74 s


#### More complex predicates

Filters are expressed in disjunctive normal form (DNF).

`filters` accepts either a `List[Tuple]` or a `List[List[Tuple]]`:

- Each tuple `(column, op, value)` is a predicate on a single column.
- Tuples within the same inner list are combined with AND (a conjunction).
- The inner lists are combined with OR (a disjunction). A flat `List[Tuple]` is treated as a single conjunction.
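A minimal pure-Python sketch of these semantics, evaluated on one row at a time and covering only the basic comparison operators. `normalize` and `row_matches` are hypothetical helpers that illustrate how the nested lists combine; they are not cuDF's implementation:

```python
import operator
from typing import Any, Dict, List, Tuple, Union

Predicate = Tuple[str, str, Any]
Filters = Union[List[Predicate], List[List[Predicate]]]

# Subset of comparison operators, mapped to Python functions.
_OPS = {
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "==": operator.eq,
    "!=": operator.ne,
}


def normalize(filters: Filters) -> List[List[Predicate]]:
    """Treat a flat list of tuples as a single AND-group."""
    if filters and isinstance(filters[0], tuple):
        return [list(filters)]
    return [list(group) for group in filters]


def row_matches(row: Dict[str, Any], filters: Filters) -> bool:
    """OR across the inner lists, AND within each inner list."""
    return any(
        all(_OPS[op](row[col], value) for col, op, value in group)
        for group in normalize(filters)
    )


# (x < 5 AND y == "a") OR (x >= 10)
filters = [[("x", "<", 5), ("y", "==", "a")], [("x", ">=", 10)]]
print(row_matches({"x": 3, "y": "a"}, filters))   # True (first group)
print(row_matches({"x": 3, "y": "b"}, filters))   # False
print(row_matches({"x": 12, "y": "b"}, filters))  # True (second group)
# A flat list is one conjunction: x < 5 AND y == "a"
print(row_matches({"x": 3, "y": "b"}, [("x", "<", 5), ("y", "==", "a")]))  # False
```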

For example, selecting all trips in January or December with fewer than 3 passengers can be written as:

```
(tpep_pickup_datetime < 2015-02-01 | tpep_pickup_datetime >= 2015-12-01) & (passenger_count < 3)
```

Distributing the conjunction over the disjunction gives the DNF:

```
((tpep_pickup_datetime < 2015-02-01) & (passenger_count < 3)) | 
((tpep_pickup_datetime >= 2015-12-01) & (passenger_count < 3))
```
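The rewrite relies on the distributive law (A or B) and C = (A and C) or (B and C). A small exhaustive check over all eight truth assignments, treating the three comparisons as independent booleans `a`, `b`, `c` (a simplification: for real timestamps `a` and `b` are never both true, which does not affect the equivalence):

```python
from itertools import product


def original(a: bool, b: bool, c: bool) -> bool:
    # (pickup < 2015-02-01 | pickup >= 2015-12-01) & (passenger_count < 3)
    return (a or b) and c


def dnf(a: bool, b: bool, c: bool) -> bool:
    # ((pickup < 2015-02-01) & (passenger_count < 3)) | ((pickup >= 2015-12-01) & (passenger_count < 3))
    return (a and c) or (b and c)


# Compare both forms on every combination of the three predicates.
assignments = list(product([False, True], repeat=3))
mismatches = [combo for combo in assignments if original(*combo) != dnf(*combo)]
print(len(assignments), mismatches)  # 8 []
```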


This DNF translates to the following `filters` argument in code:

In [15]:
%%time

date1 = pd.Timestamp("2015-02-01 00:00:00", tz="UTC")
date2 = pd.Timestamp("2015-12-01 00:00:00", tz="UTC")
passenger_filter = 3

filters = [
            [("tpep_pickup_datetime", "<" , date1),("passenger_count", "<", passenger_filter)], 
            [("tpep_pickup_datetime", ">=", date2), ("passenger_count", "<", passenger_filter)]
          ]

# With predicate pushdown
ddf = dask_cudf.read_orc(os.path.join(nyt_orc_path,"*.orc"), 
                         filters=filters,
                        )
ddf = ddf.persist()
wait(ddf)
print(f"Rows read: {len(ddf)}")

ddf = ddf[((ddf.tpep_pickup_datetime < date1) | (ddf.tpep_pickup_datetime >= date2)) & (ddf.passenger_count < passenger_filter)]
ddf = ddf.persist()
wait(ddf)
print(f"Rows after filtering: {len(ddf)}")

Rows read: 24253006
Rows after filtering: 20523212
CPU times: user 952 ms, sys: 132 ms, total: 1.08 s
Wall time: 5.12 s
