# Parquet Pushdown Filters

Filters can be applied to parquet files to reduce the volume of data loaded. In particular, parquet objects support partition filters and regular row filtering. Spark DAGs, if properly constructed, can push down some of the filters to the parquet object reader. Below you will find a number of test cases where this works correctly, and a number of scenarios where filter pushdown does not apply.

In [16]:
import datafaucet as dfc

In [17]:
engine = dfc.engine('spark')
spark = engine.context

### Create a sample dataframe

In [19]:
df = dfc.range(10000).cols.create('g').randchoice([0,1,2,3])
df.cols.groupby('g').agg('count').data.grid()

Unnamed: 0,g,id
0,0,2520
1,1,2544
2,3,2432
3,2,2504


### Save data as a parquet object

In [20]:
df.repartition('g').save('local', 'groups.parquet');

In [21]:
dfc.list('data/save/groups.parquet').data.grid()

Unnamed: 0,name,type
0,g=2,DIRECTORY
1,g=1,DIRECTORY
2,g=3,DIRECTORY
3,g=0,DIRECTORY
4,_SUCCESS,FILE
5,._SUCCESS.crc,FILE
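
The directory names above follow the `column=value` convention, which is how the partition value can be recovered from the path alone, without opening any data file. The following is a minimal sketch of that idea in plain Python, not Spark's actual partition discovery code; the function name `partition_values` and the `part-00000.parquet` file name are assumptions made for illustration.

```python
def partition_values(path):
    """Extract `column=value` pairs from the directories of a file path."""
    values = {}
    for part in path.split('/'):
        name, sep, value = part.partition('=')
        if sep and name:            # keep only 'column=value' segments
            values[name] = value    # kept as a string in this sketch
    return values

# hypothetical data file inside one of the directories listed above
print(partition_values('data/save/groups.parquet/g=2/part-00000.parquet'))
# prints: {'g': '2'}
```

Because the value lives in the path, a filter on `g` can be decided per directory before any file is read.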


### Read data parquet objects

In [22]:
df = dfc.load('data/save/groups.parquet')

#### Debugging the physical query plan

Next, we debug the query plan, which can be printed with the dataframe method `.explain()`.

In [25]:
df.explain()

To keep things simple, let's focus only on the parquet file reader. The function `explainSource(obj)` below parses the `FileScan` line of the plan and prints only the reader parameters relevant to row filter and partition filter pushdown.

In [26]:
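def explainSource(obj):
    """Return the FileScan parameters found in a dataframe's query plan."""
    # parameters, in the order in which they appear in the FileScan line
    params = [
        'Batched',
        'Format',
        'Location',
        'PartitionCount',
        'PartitionFilters',
        'PushedFilters',
        'ReadSchema']

    for s in obj._jdf.queryExecution().simpleString().split('\n'):
        if 'FileScan' in s:
            # (partial) parse of the FileScan string
            res = {}
            # skip everything before the first parameter
            _, _, rest = s.partition(f'{params[0]}:')
            # each value is the text between its label and the next label
            for key, next_key in zip(params, params[1:]):
                value, _, rest = rest.partition(f'{next_key}:')
                res[key] = value[1:-2]  # drop leading space and trailing ', '
            # the last value runs to the end of the line
            res[params[-1]] = rest[1:]

            # hide location data, not relevant here
            del res['Location']

            return dfc.yaml.YamlDict(res)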
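
The same parsing idea can be tried without a Spark session. The following is a rough alternative sketch that uses a regular expression instead of repeated `partition` calls; `parse_filescan` is a hypothetical name, and the `sample` line is a hand-written illustration of the layout, not captured output.

```python
import re

KEYS = ['Batched', 'Format', 'Location', 'PartitionCount',
        'PartitionFilters', 'PushedFilters', 'ReadSchema']

def parse_filescan(line, keys=KEYS):
    """Map each 'Key: value' pair of a FileScan line to a dict entry."""
    pattern = re.compile(r'\b(' + '|'.join(keys) + r'): ')
    matches = list(pattern.finditer(line))
    res = {}
    # a value spans from the end of its label to the start of the next label
    for m, nxt in zip(matches, matches[1:] + [None]):
        end = nxt.start() if nxt else len(line)
        res[m.group(1)] = line[m.end():end].rstrip(', ')
    return res

# illustrative line only, written by hand for this sketch
sample = ('FileScan parquet [id#0L,g#1] Batched: true, Format: Parquet, '
          'Location: InMemoryFileIndex[file:/tmp/groups.parquet], '
          'PartitionCount: 4, PartitionFilters: [], '
          'PushedFilters: [IsNotNull(id), GreaterThan(id,100)], '
          'ReadSchema: struct<id:bigint>')

parsed = parse_filescan(sample)
print(parsed['PartitionCount'], parsed['PushedFilters'])
```

Unlike `explainSource`, this variant does not depend on the parameters appearing in a fixed order, but it still assumes that no value contains one of the labels followed by `: `.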

### Testing Pushdown

This first test does not filter anything. However, as you can see, the partition variable `g` is materialized in directories and does not appear in `ReadSchema`, which only includes the columns that are not partitions.

In [30]:
### No pushdown on the physical plan
explainSource(df)

Batched: 'true'
Format: Parquet
PartitionCount: '4'
PartitionFilters: '[]'
PushedFilters: '[]'
ReadSchema: struct<id:bigint>

Counting rows per partition does not require any data column, so the next query effectively just counts data-less rows: `ReadSchema` is empty.

In [31]:
### Pushdown only column selection
res = df.groupby('g').count()
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '4'
PartitionFilters: '[]'
PushedFilters: '[]'
ReadSchema: struct<>

Filtering on a column that is not a partition triggers a columnar filter during the read, reported under `PushedFilters`.

In [32]:
# push down row filter only but take all partitions
res = df.filter('id>100')
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '4'
PartitionFilters: '[]'
PushedFilters: '[IsNotNull(id), GreaterThan(id,100)]'
ReadSchema: struct<id:bigint>
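
One reason a pushed filter can reduce I/O is that parquet files keep min/max statistics for each row group, so a reader can skip row groups that cannot contain a match. The following is a toy model of that idea in plain Python, not the actual reader implementation; the row group size of 100 is an arbitrary assumption.

```python
def row_groups(ids, size):
    """Split ids into chunks and record min/max statistics for each chunk."""
    groups = []
    for start in range(0, len(ids), size):
        chunk = ids[start:start + size]
        groups.append({'min': min(chunk), 'max': max(chunk), 'rows': chunk})
    return groups

def scan_greater_than(groups, threshold):
    """Return the matching rows and the number of row groups actually read."""
    read, rows = 0, []
    for group in groups:
        if group['max'] <= threshold:   # no row can satisfy id > threshold
            continue                    # skip the whole row group
        read += 1
        rows.extend(r for r in group['rows'] if r > threshold)
    return rows, read

groups = row_groups(list(range(1000)), size=100)  # hypothetical row group size
rows, read = scan_greater_than(groups, 100)
print(len(rows), read)  # prints: 899 9
```

In this model one of the ten row groups is never read. How much is skipped in practice depends on how the values are distributed across row groups.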

Filters can be combined. In the example below, a partition filter and a row (columnar) filter are part of the same filter statement.

In [14]:
# pushdown partition filters and row (columnar) filters
res = df.filter('id>100 and g=1').groupby('g').count()
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '1'
PartitionFilters: '[isnotnull(g#92), (g#92 = 1)]'
PushedFilters: '[IsNotNull(id), GreaterThan(id,100)]'
ReadSchema: struct<id:bigint>
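
To make the difference between the two kinds of filter concrete, here is a toy model in plain Python. It is only a sketch of the idea, not how Spark implements it: the partition filter decides which directories are opened at all, and the row filter is then applied to the rows of the remaining directories. The tiny dataset and the `scan` helper are made up for illustration.

```python
# toy dataset: partition value -> ids stored in that directory
partitions = {g: [i for i in range(20) if i % 4 == g] for g in range(4)}

def scan(partitions, partition_filter, row_filter):
    """Prune directories first, then filter the rows of the remaining ones."""
    selected = [g for g in partitions if partition_filter(g)]
    rows = [(i, g) for g in selected for i in partitions[g] if row_filter(i)]
    return rows, len(selected)

# counterpart of filter('id>10 and g=1') on the toy dataset
rows, partition_count = scan(partitions, lambda g: g == 1, lambda i: i > 10)
print(partition_count, rows)  # prints: 1 [(13, 1), (17, 1)]
```

The returned `partition_count` plays the role of `PartitionCount` in the output above: only one of the four directories is visited.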

Filters can be combined with logical operators.

In [36]:
# pushdown partition filters and row (columnar) filters
res = df.filter('id>100 and (g=2 or g=3)').groupby('g').count()
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '2'
PartitionFilters: '[((g#265 = 2) || (g#265 = 3))]'
PushedFilters: '[IsNotNull(id), GreaterThan(id,100)]'
ReadSchema: struct<id:bigint>

Partition filters can also be greater-than and less-than predicates

In [37]:
# pushdown partition filters and row (columnar) filters
res = df.filter('id>100 and g>1').groupby('g').count()
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '2'
PartitionFilters: '[isnotnull(g#265), (g#265 > 1)]'
PushedFilters: '[IsNotNull(id), GreaterThan(id,100)]'
ReadSchema: struct<id:bigint>

Filters can be stacked and cascaded: filters added by successive calls are `AND`ed together.

In [38]:
# pushdown partition filters and row (columnar) filters can be added up
res = df.filter('id>100 and g>1').filter('id<500 and g=2').groupby('g').count()
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '1'
PartitionFilters: '[isnotnull(g#265), (g#265 > 1), (g#265 = 2)]'
PushedFilters: '[IsNotNull(id), GreaterThan(id,100), LessThan(id,500)]'
ReadSchema: struct<id:bigint>
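
The equivalence between cascaded filters and a single `AND` can be checked on the partition values alone. A small sketch in plain Python, with hypothetical helper names standing in for the two `.filter()` calls:

```python
def combine(*predicates):
    """AND together any number of single-argument predicates."""
    return lambda value: all(p(value) for p in predicates)

def greater_than_one(g):    # stands in for the g>1 part of the first filter
    return g > 1

def equals_two(g):          # stands in for the g=2 part of the second filter
    return g == 2

g_values = [0, 1, 2, 3]

# cascaded: apply one filter, then the other on the result
step1 = [g for g in g_values if greater_than_one(g)]
cascaded = [g for g in step1 if equals_two(g)]

# combined: a single filter with both conditions AND'ed
both = combine(greater_than_one, equals_two)
combined = [g for g in g_values if both(g)]

print(cascaded, combined)  # prints: [2] [2]
```

Either way a single partition survives, which is consistent with `PartitionCount: '1'` above.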

### When pushdown filters are NOT applied.

#### Avoid caching and actions on read data
Avoid `cache()`, `count()`, or other actions on the data before filtering, as they act as a "wall" that prevents filters from being pushed down to the parquet reader. Registering the dataframe as a temporary table, on the other hand, is OK. Be aware that these operations could be hidden in your function call stack, so always make sure that the filters are as close as possible to the read operation.

#### Spark will only read the same data once per session
Once a parquet file has been read and cached without filters, any subsequent read of the same file in the same session will fail to push down the filters, because Spark reuses the data it has already loaded.

In [39]:
df = dfc.load('data/save/groups.parquet')
df.cache()

DataFrame[id: bigint, g: int]

In [40]:
# pushdown partition filters and row (columnar) filters are ignored after cache, count, and the like
res = df.filter('id>100 and g=1').groupby('g').count()
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '4'
PartitionFilters: '[]'
PushedFilters: '[]'
ReadSchema: struct<id:bigint>

In [41]:
# re-read will not push down the filters ...
df = dfc.load('data/save/groups.parquet')

In [42]:
# pushdown partition filters and row (columnar) filters are ignored after cache, count, and the like
res = df.filter('id>100 and g=1').groupby('g').count()
explainSource(res)

Batched: 'true'
Format: Parquet
PartitionCount: '4'
PartitionFilters: '[]'
PushedFilters: '[]'
ReadSchema: struct<id:bigint>
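
If the cached data is no longer needed, dropping it before reading again should let the filters be pushed down once more. The following is a minimal sketch, assuming `spark` is the `SparkSession` obtained from `engine.context` and that reading with plain `spark.read.parquet` is acceptable in place of `dfc.load`; `read_filtered` is a hypothetical helper name.

```python
def read_filtered(spark, path, condition):
    """Read a parquet path and filter it, dropping cached data first."""
    # clearCache() removes ALL cached tables and dataframes of this session,
    # so use it only when nothing else relies on the cache
    spark.catalog.clearCache()
    # keep the filter right next to the read operation
    return spark.read.parquet(path).filter(condition)
```

For example, `explainSource(read_filtered(spark, 'data/save/groups.parquet', 'id>100 and g=1'))` would be expected to show non-empty `PartitionFilters` and `PushedFilters` again (the path may need adjusting to your setup). When a reference to the cached dataframe is still at hand, calling `unpersist()` on it is a narrower alternative to clearing the whole cache.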