# Glue PySpark Job Sample with Push Down Predicate Feature & Partitioning Best Practices

This notebook shows how using a push-down predicate together with a partitioned data set can reduce the time and resources needed to process data when AWS Glue is used as a serverless PySpark executor. To run this sample notebook, either use a Glue Development Endpoint with a SageMaker notebook, or follow the blog post [here](https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/) to run a local Jupyter notebook pre-configured with the Glue libraries. The timings quoted below were measured in a container with 2 GB of memory available.

The push-down predicate feature is tested against the same data, partitioned in two different ways:
1. Date
2. Year/Month/Day

In [1]:
from pyspark import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *

glueContext = GlueContext(SparkContext.getOrCreate())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
10,,pyspark,idle,,,✔


SparkSession available as 'spark'.


## 1. Date-partitioned Data Set

Inspect the schema of the date-partitioned data set

In [7]:
spark.sql("describe devtest.date_partitioned_data").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|                bsid|   bigint|   null|
|               dtsid|   bigint|   null|
|                msid|   bigint|   null|
|                 msv|   bigint|   null|
|                   t|   bigint|   null|
|                   c|   bigint|   null|
|                   b|   bigint|   null|
|           file_date|   string|   null|
|          reading_dt|   string|   null|
|        reading_date|   string|   null|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|        reading_date|   string|   null|
+--------------------+---------+-------+

First, load the full data set to inspect the data.

In [8]:
full_data = glueContext.create_dynamic_frame.from_catalog(database = "devtest", table_name = "date_partitioned_data",
                                                                    transformation_ctx = "datasource_catalog")

In [54]:
full_data.toDF().show()
print('Total records: ', full_data.count())

+--------+-----+------+-------+---+---+---+----------+--------------------+------------+----+-----+---+
|    bsid|dtsid|  msid|    msv|  t|  c|  b| file_date|          reading_dt|reading_date|year|month|day|
+--------+-----+------+-------+---+---+---+----------+--------------------+------------+----+-----+---+
|90020195|   13|   576|   1905|  0|  0|  0|2019-06-01|2019-05-31 13:40:...|  2019-05-31|2019|    5| 31|
|90020195|   20|   794|   1136|  0|  0|  1|2019-06-03|2019-06-02 17:02:...|  2019-06-02|2019|    6|  2|
|90020195|    3|   105|   1206|  0|  0|  0|2019-06-01|2019-05-31 02:27:...|  2019-05-31|2019|    5| 31|
|90020195|    8|   282|      0|  0|  0|  0|2019-06-02|2000-01-01 00:00:...|  2000-01-01|2000|    1|  1|
|82000086|    0|802203|5862267|  0|  0|  0|2019-06-03|2019-06-02 13:00:...|  2019-06-02|2019|    6|  2|
|90020195|    2|    47|   2457|  0|  0|  0|2019-06-02|2019-06-01 01:25:...|  2019-06-01|2019|    6|  1|
|82000086|    0|802203|5863484|  0|  0|  0|2019-06-04|2019-06-03

Loading the full data set takes more than 2 minutes, and converting it from a DynamicFrame to a DataFrame takes another 3-5 minutes.

Next, use "push_down_predicate" so that the Glue job reads only one month (May 2019) of data, selected with a between clause.

In [10]:
exact_month_data = glueContext.create_dynamic_frame.from_catalog(database = "devtest", table_name = "date_partitioned_data", 
                                                                    push_down_predicate = "(reading_date between '2019-05-01' and '2019-05-31')", transformation_ctx = "datasource_catalog")

In [11]:
exact_month_data.toDF().show()
print('Total records: ', exact_month_data.count())

+--------+-----+------+-------+---+---+---+----------+--------------------+------------+
|    bsid|dtsid|  msid|    msv|  t|  c|  b| file_date|          reading_dt|reading_date|
+--------+-----+------+-------+---+---+---+----------+--------------------+------------+
|90020195|    8|   294|   2466|  0|  0|  1|2019-06-01|2019-05-31 04:28:...|  2019-05-31|
|90020195|    1|    64|   2133|  0|  0|  0|2019-06-01|2019-05-31 23:29:...|  2019-05-31|
|90020195|    5|   201|   1173|  0|  0|  0|2019-06-01|2019-05-31 01:32:...|  2019-05-31|
|90020195|    4|   134|   1640|  0|  0|  0|2019-06-01|2019-05-31 23:04:...|  2019-05-31|
|90020195|   15|   616|   2201|  0|  0|  1|2019-06-01|2019-05-31 12:16:...|  2019-05-31|
|82000086|    0|802203|5860144|  0|  0|  0|2019-06-01|2019-05-31 20:00:...|  2019-05-31|
|90020195|    9|   395|   4121|  0|  0|  1|2019-06-01|2019-05-31 08:19:...|  2019-05-31|
|90020195|   24|   994|   3151|  0|  0|  1|2019-06-01|2019-05-31 23:28:...|  2019-05-31|
|90020195|    3|   14

The time required to load the data drops from around 150 seconds to around 30 seconds.
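
The between predicate above hard-codes the first and last day of the month. A minimal sketch of building that string for any month follows. The helper name `month_predicate` is hypothetical and not part of the Glue API, and it assumes the partition column holds yyyy-MM-dd strings, as "reading_date" does here.

```python
import calendar
from datetime import date


def month_predicate(column: str, year: int, month: int) -> str:
    """Build a push-down predicate covering one calendar month.

    Hypothetical helper (not part of awsglue). Assumes `column` is a
    partition key that stores dates as 'yyyy-MM-dd' strings.
    """
    # monthrange returns (weekday of the first day, number of days in the month)
    last_day = calendar.monthrange(year, month)[1]
    start = date(year, month, 1).isoformat()
    end = date(year, month, last_day).isoformat()
    return f"({column} between '{start}' and '{end}')"


predicate = month_predicate("reading_date", 2019, 5)
print(predicate)
```

The resulting string can be passed as the "push_down_predicate" argument in the same way as the literal used in the cell above.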

Next, narrow the predicate further to a single date.

In [12]:
exact_date_data = glueContext.create_dynamic_frame.from_catalog(database = "devtest", table_name = "date_partitioned_data",
                                                                push_down_predicate = "(reading_date == '2019-05-29')", transformation_ctx = "datasource_catalog")

In [13]:
exact_date_data.toDF().show()
print('Total records: ', exact_date_data.count())

+--------+-----+----+----+---+---+---+----------+--------------------+------------+
|    bsid|dtsid|msid| msv|  t|  c|  b| file_date|          reading_dt|reading_date|
+--------+-----+----+----+---+---+---+----------+--------------------+------------+
|90020195|   24| 962|  25|  0|  0|  1|2019-06-03|2019-05-29 19:45:...|  2019-05-29|
|90020195|   16| 674|2800|  0|  0|  0|2019-06-03|2019-05-29 12:25:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-02|2019-05-29 16:45:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-01|2019-05-29 16:45:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-04|2019-05-29 16:45:...|  2019-05-29|
|90020195|   24| 962|  25|  0|  0|  1|2019-06-04|2019-05-29 19:45:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-03|2019-05-29 16:45:...|  2019-05-29|
|90020195|   16| 674|2800|  0|  0|  0|2019-06-04|2019-05-29 12:25:...|  2019-05-29|
|90020195|    3| 168|4113|  0|  0|  0|2019-06-04|2019-05-29 04:01:...|  2019

Loading the data for a single date takes just 5-6 seconds.

In short, if "push_down_predicate" is not used, a Glue job on partitioned data still reads every partition and wastes resources. It is also important to choose a partition key that matches how the ETL process is expected to select ranges of data. To illustrate, the next section transforms the DataFrame with PySpark into a year/month/day partitioning scheme.

## 2. Year/Month/Day-partitioned Data Set

PySpark has native functions for date / timestamp columns that make it easy to extract specific date or time fields into new columns.

As shown below, when "reading_date" holds a date or timestamp value, **year("reading_date")** extracts the year; the month and day columns are created the same way.

If your data does not arrive as a date / timestamp data type, you can convert it with the "to_date" / "to_timestamp" PySpark functions.

For example: **to_timestamp('201905290812', 'yyyyMMddHHmm')**
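
To make the format pattern concrete, here is a plain-Python analogue. It is an illustration outside Spark, not the notebook's code: the Spark pattern 'yyyyMMddHHmm' corresponds to '%Y%m%d%H%M' in `datetime.strptime`, and the extracted fields are the same ones that year, month and dayofmonth return in PySpark.

```python
from datetime import datetime

# Plain-Python analogue of to_timestamp('201905290812', 'yyyyMMddHHmm').
# '%Y%m%d%H%M' is the strptime equivalent of Spark's 'yyyyMMddHHmm'.
raw = "201905290812"
parsed = datetime.strptime(raw, "%Y%m%d%H%M")

# The same fields that year(), month() and dayofmonth() extract in PySpark.
partition_values = {"Year": parsed.year, "Month": parsed.month, "Day": parsed.day}
print(parsed.isoformat(), partition_values)
```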

In [55]:
from pyspark.sql.functions import year, month, dayofmonth

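# Derive the partition columns from reading_date
repartitioned_full_data = full_data.toDF().withColumn("Year", year("reading_date")).withColumn("Month", month("reading_date")).withColumn("Day", dayofmonth("reading_date"))
# Co-locate rows that share the same Year/Month/Day values
repartitioned_full_data = repartitioned_full_data.repartition("Year", "Month", "Day")
# Spark partition count after the shuffle; 200 is the spark.sql.shuffle.partitions default, not the number of distinct dates
print('partitions added: ', repartitioned_full_data.rdd.getNumPartitions())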

partitions added:  200

Next, write the data back to S3 with the new partitioning scheme. To write partitioned data with Glue, specify the partition keys in "connection_options" when writing the DynamicFrame, in the format **"partitionKeys":["ColumnName"]**.

In [None]:
from awsglue.dynamicframe import DynamicFrame

ymd_partitioned_dyf = DynamicFrame.fromDF(repartitioned_full_data, glueContext, "ymd_partitioned_dyf")

datasink_s3 = glueContext.write_dynamic_frame.from_options(frame = ymd_partitioned_dyf, 
                                                           connection_type = "s3", 
                                                           connection_options = {"path": "s3://glue-devtest-bucket/ymd_partitioned_data", 
                                                                                 "partitionKeys":["Year", "Month", "Day"]}, format = "parquet", transformation_ctx = "datasink_s3")

Now examine the schema of the new data set. The Partition Information section now lists year, month and day as the partition keys instead of reading_date.

In [16]:
spark.sql("describe devtest.ymd_partitioned_data").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|                bsid|   bigint|   null|
|               dtsid|   bigint|   null|
|                msid|   bigint|   null|
|                 msv|   bigint|   null|
|                   t|   bigint|   null|
|                   c|   bigint|   null|
|                   b|   bigint|   null|
|           file_date|   string|   null|
|          reading_dt|   string|   null|
|        reading_date|   string|   null|
|                year|   string|   null|
|               month|   string|   null|
|                 day|   string|   null|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|                year|   string|   null|
|               month|   string|   null|
|                 day|   string|   null|
+--------------------+---------+-------+
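
The partition keys listed above map to directory levels in S3: when "partitionKeys" is set, Glue writes the files under Hive-style key=value prefixes, one level per key in the order given. The sketch below only illustrates that layout. `partition_prefix` is a hypothetical helper, the key names are taken from the write call for illustration, and the object names under each prefix are generated by the writer.

```python
def partition_prefix(base_path: str, keys: list, row: dict) -> str:
    """Return the Hive-style prefix a row would land under (illustrative only)."""
    # One "key=value" path segment per partition key, in the order given
    parts = [f"{key}={row[key]}" for key in keys]
    return "/".join([base_path.rstrip("/")] + parts) + "/"


row = {"Year": 2019, "Month": 5, "Day": 29}
prefix = partition_prefix(
    "s3://glue-devtest-bucket/ymd_partitioned_data", ["Year", "Month", "Day"], row
)
print(prefix)
```

A push-down predicate on these keys lets Glue skip whole prefixes rather than reading files and discarding rows afterwards.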

A quick experiment is to read one month of data from the newly partitioned data set. Instead of the between clause used previously, we can now specify exact year and month values in the "push_down_predicate" parameter.

In [19]:
exact_month_data = glueContext.create_dynamic_frame.from_catalog(database = "devtest", table_name = "ymd_partitioned_data", 
                                                                 push_down_predicate = "(year == '2019' and month == '5')", transformation_ctx = "datasource_catalog")

Although the new partitioning scheme adds columns, loading the data is about a second faster.

In [20]:
exact_month_data.toDF().show()
print('Total records: ', exact_month_data.count())

+--------+-----+------+-------+---+---+---+----------+--------------------+------------+----+-----+---+
|    bsid|dtsid|  msid|    msv|  t|  c|  b| file_date|          reading_dt|reading_date|year|month|day|
+--------+-----+------+-------+---+---+---+----------+--------------------+------------+----+-----+---+
|90020195|   11|   464|   1250|  0|  0|  1|2019-06-01|2019-05-31 09:48:...|  2019-05-31|2019|    5| 31|
|90020195|    9|   409|   3731|  0|  0|  1|2019-06-01|2019-05-31 06:24:...|  2019-05-31|2019|    5| 31|
|90020195|   13|   579|   3267|  0|  0|  1|2019-06-01|2019-05-31 11:16:...|  2019-05-31|2019|    5| 31|
|90020195|   13|   560|   2974|  0|  0|  1|2019-06-01|2019-05-31 13:11:...|  2019-05-31|2019|    5| 31|
|90020195|    4|   142|   3014|  0|  0|  0|2019-06-01|2019-05-31 01:31:...|  2019-05-31|2019|    5| 31|
|90020195|    8|   340|   3814|  0|  0|  1|2019-06-01|2019-05-31 15:00:...|  2019-05-31|2019|    5| 31|
|90020195|    7|   291|   1623|  0|  0|  1|2019-06-01|2019-05-31

Converting the DynamicFrame to a DataFrame, however, takes a few seconds longer here, mainly because of the additional columns.
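
One trade-off of separate year/month/day keys is that a date range crossing a month boundary can no longer be written as a single between clause. A minimal sketch of one way to build such a predicate is shown here, enumerating each day as its own clause. The helper `ymd_range_predicate` is hypothetical, and it assumes the partition values are unpadded strings such as '5', as in the output above.

```python
from datetime import date, timedelta


def ymd_range_predicate(start: date, end: date) -> str:
    """Build a predicate selecting every day from start to end inclusive.

    Hypothetical helper. Assumes partition keys named year, month and day
    that hold unpadded string values (for example month '5', not '05').
    """
    if end < start:
        raise ValueError("end must not be before start")
    clauses = []
    current = start
    while current <= end:
        clauses.append(
            f"(year == '{current.year}' and month == '{current.month}' "
            f"and day == '{current.day}')"
        )
        current += timedelta(days=1)
    return " or ".join(clauses)


print(ymd_range_predicate(date(2019, 5, 31), date(2019, 6, 1)))
```

The string grows by one clause per day, so this form suits short ranges; for a whole month, the year and month keys alone are enough.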

## 3. Final Comparisons

To conclude, we compare three approaches that produce the same result, the data for one exact date:

1. Without "push_down_predicate", filtering on the date with DynamicFrame native functions
2. With "push_down_predicate" on the date-partitioned data set
3. With "push_down_predicate" on the year/month/day-partitioned data set
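
The notebook does not show how the timings were taken. One simple option, sketched here under that assumption, is a small context manager around `time.perf_counter`. The name `timed` and the label are hypothetical, and the workload is a stand-in.

```python
import time
from contextlib import contextmanager

timings = {}


@contextmanager
def timed(label: str):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


# Stand-in workload; in the notebook this would wrap the Glue read,
# the toDF().show() call and the count() call for each approach.
with timed("example"):
    sum(range(1_000_000))

print({label: f"{seconds:.2f}s" for label, seconds in timings.items()})
```

Wrapping all three steps of a cell in one block matters because Spark evaluates lazily, so most of the cost appears when show() and count() run rather than when the frame is created.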

In [53]:
full_data = glueContext.create_dynamic_frame.from_catalog(database = "devtest", table_name = "ymd_partitioned_data",
                                                                    transformation_ctx = "datasource_catalog")
exact_date_data = Filter.apply(frame = full_data, f = lambda x: x["reading_date"] in ['2019-05-29'])
exact_date_data.toDF().show()
print('Total records: ', exact_date_data.count())

+---+---+--------+----+--------------------+----+-----+----+---+----------+-----+------------+---+
|  b|  t|    bsid| msv|          reading_dt|year|dtsid|msid|  c| file_date|month|reading_date|day|
+---+---+--------+----+--------------------+----+-----+----+---+----------+-----+------------+---+
|  0|  0|90020195|1070|2019-05-29 16:45:...|2019|   17| 705|  0|2019-06-02|    5|  2019-05-29| 29|
|  0|  0|90020195|2800|2019-05-29 12:25:...|2019|   16| 674|  0|2019-06-01|    5|  2019-05-29| 29|
|  0|  0|90020195|1070|2019-05-29 16:45:...|2019|   17| 705|  0|2019-06-03|    5|  2019-05-29| 29|
|  0|  0|90020195|1070|2019-05-29 16:45:...|2019|   17| 705|  0|2019-06-04|    5|  2019-05-29| 29|
|  0|  0|90020195|4113|2019-05-29 04:01:...|2019|    3| 168|  0|2019-06-04|    5|  2019-05-29| 29|
|  1|  0|90020195|  25|2019-05-29 19:45:...|2019|   24| 962|  0|2019-06-03|    5|  2019-05-29| 29|
|  1|  0|90020195|  25|2019-05-29 19:45:...|2019|   24| 962|  0|2019-06-01|    5|  2019-05-29| 29|
|  0|  0|9

In [57]:
exact_date_data = glueContext.create_dynamic_frame.from_catalog(database = "devtest", table_name = "date_partitioned_data",
                                                                push_down_predicate = "(reading_date == '2019-05-29')", transformation_ctx = "datasource_catalog")
exact_date_data.toDF().show()
print('Total records: ', exact_date_data.count())

+--------+-----+----+----+---+---+---+----------+--------------------+------------+
|    bsid|dtsid|msid| msv|  t|  c|  b| file_date|          reading_dt|reading_date|
+--------+-----+----+----+---+---+---+----------+--------------------+------------+
|90020195|   24| 962|  25|  0|  0|  1|2019-06-03|2019-05-29 19:45:...|  2019-05-29|
|90020195|   16| 674|2800|  0|  0|  0|2019-06-03|2019-05-29 12:25:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-02|2019-05-29 16:45:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-01|2019-05-29 16:45:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-04|2019-05-29 16:45:...|  2019-05-29|
|90020195|   24| 962|  25|  0|  0|  1|2019-06-04|2019-05-29 19:45:...|  2019-05-29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-03|2019-05-29 16:45:...|  2019-05-29|
|90020195|   16| 674|2800|  0|  0|  0|2019-06-04|2019-05-29 12:25:...|  2019-05-29|
|90020195|    3| 168|4113|  0|  0|  0|2019-06-04|2019-05-29 04:01:...|  2019

In [56]:
exact_date_data = glueContext.create_dynamic_frame.from_catalog(database = "devtest", table_name = "ymd_partitioned_data",
                                                                push_down_predicate = "(year == '2019' and month == '5' and day == '29')", transformation_ctx = "datasource_catalog")
exact_date_data.toDF().show()
print('Total records: ', exact_date_data.count())

+--------+-----+----+----+---+---+---+----------+--------------------+------------+----+-----+---+
|    bsid|dtsid|msid| msv|  t|  c|  b| file_date|          reading_dt|reading_date|year|month|day|
+--------+-----+----+----+---+---+---+----------+--------------------+------------+----+-----+---+
|90020195|   16| 674|2800|  0|  0|  0|2019-06-01|2019-05-29 12:25:...|  2019-05-29|2019|    5| 29|
|90020195|   24| 962|  25|  0|  0|  1|2019-06-01|2019-05-29 19:45:...|  2019-05-29|2019|    5| 29|
|90020195|    3| 168|4113|  0|  0|  0|2019-06-03|2019-05-29 04:01:...|  2019-05-29|2019|    5| 29|
|90020195|   24| 962|  25|  0|  0|  1|2019-06-02|2019-05-29 19:45:...|  2019-05-29|2019|    5| 29|
|90020195|   17| 705|1070|  0|  0|  0|2019-06-03|2019-05-29 16:45:...|  2019-05-29|2019|    5| 29|
|90020195|    3| 168|4113|  0|  0|  0|2019-06-01|2019-05-29 04:01:...|  2019-05-29|2019|    5| 29|
|90020195|   24| 962|  25|  0|  0|  1|2019-06-04|2019-05-29 19:45:...|  2019-05-29|2019|    5| 29|
|90020195|

Results:
1. Without "push_down_predicate", filtering on the date with DynamicFrame native functions (5m55s)
2. With "push_down_predicate" on the date-partitioned data set (7.43s)
3. With "push_down_predicate" on the year/month/day-partitioned data set (7.59s)
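
To put these figures in perspective, the sketch below converts them to seconds and computes the speedup of each push-down read relative to filtering after a full read. The numbers are the ones reported in the results list; the labels are shorthand introduced here.

```python
# Timings reported in the results list, in seconds.
results = {
    "filter_after_full_read": 5 * 60 + 55,  # 5m55s
    "pushdown_date_partitioned": 7.43,
    "pushdown_ymd_partitioned": 7.59,
}

baseline = results["filter_after_full_read"]

# Speedup of each push-down approach relative to the unfiltered read
speedups = {
    label: baseline / seconds
    for label, seconds in results.items()
    if label != "filter_after_full_read"
}

for label, factor in speedups.items():
    print(f"{label}: {factor:.1f}x faster than filtering after a full read")
```

Both push-down reads come out between 46 and 48 times faster than the full read, while the gap between the two partitioning schemes is a fraction of a second.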

For this particular data set, partitioning by the exact date is slightly faster (7.43s versus 7.59s).

The general conclusion is that when the data set covers many distinct dates, the year/month/day approach is the more efficient way to read a time range or a specific date, whereas when there are relatively few distinct dates, using the date itself as the partition key reduces the number of partitions created and may therefore produce faster results.