###### Data partitioning is critical to performance when processing large volumes of data in Spark.
###### PySpark: how to write a DataFrame partitioned into year/month sub-directories

In [0]:
# Employee records CSV (first row holds the column names).
EMP_CSV_PATH = "/FileStore/tables/emp.csv"

# Load the CSV: take column names from the header row, let Spark infer the
# column types, and treat the literal text "null" as a SQL NULL.
df_emp_csv = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .option("nullValue", "null")
    .load(EMP_CSV_PATH)
)
display(df_emp_csv)

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369.0,SMITH,CLERK,7902.0,17-12-1980,800.0,,20.0
7499.0,ALLEN,SALESMAN,7698.0,20-02-1981,1600.0,300.0,30.0
7521.0,WARD,SALESMAN,7698.0,22-02-1981,1250.0,500.0,30.0
7566.0,JONES,MANAGER,7839.0,04-02-1981,2975.0,,20.0
7654.0,MARTIN,SALESMAN,7698.0,21-09-1981,1250.0,1400.0,30.0
7698.0,SGR,MANAGER,7839.0,05-01-1981,2850.0,,30.0
7782.0,RAVI,MANAGER,7839.0,06-09-1981,2450.0,,10.0
7788.0,SCOTT,ANALYST,7566.0,19-04-1987,3000.0,,20.0
7839.0,KING,PRESIDENT,,01-11-1981,5000.0,,10.0
7844.0,TURNER,SALESMAN,7698.0,09-08-1981,1500.0,0.0,30.0


In [0]:
from pyspark.sql.functions import to_date

# Turn HIREDATE from its dd-MM-yyyy text form into a DateType column.
hiredate_as_date = to_date("HIREDATE", 'dd-MM-yyyy')
df_emp_csv = df_emp_csv.withColumn("HIREDATE", hiredate_as_date)

# Replace any HIREDATE that is still null with the sentinel date 9999-12-31.
# NOTE(review): presumably this also catches strings to_date could not parse
# (behavior depends on spark.sql.ansi.enabled / timeParserPolicy), and such
# rows will land in a YEAR=9999 partition once YEAR is derived from HIREDATE.
df_emp_csv = df_emp_csv.fillna({"HIREDATE": '9999-12-31'})
df_emp_csv.show()

+-----+------+---------+----+----------+----+----+------+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-02-04|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-21|1250|1400|    30|
| 7698|   SGR|  MANAGER|7839|1981-01-05|2850|null|    30|
| 7782|  RAVI|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-01|5000|null|    10|
| 7844|TURNER| SALESMAN|7698|1981-08-09|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788|1987-05-23|1100|null|    20|
| 7900| JAMES|    CLERK|7698|1981-03-12| 950|null|    30|
| 7902|  FORD|  ANALYST|7566|1981-03-12|3000|null|    20|
| 7934|MILLER|    CLERK|7782|1982-03-01|1300|null|    10|
| 1234|SEKHAR|

In [0]:
from pyspark.sql.functions import date_format

# Partition keys derived from HIREDATE, kept as strings: YEAR as "yyyy" and
# MONTH as zero-padded "MM" (e.g. "02"). They become the YEAR=/MONTH=
# directory levels when the table is written with partitionBy.
partition_exprs = [
    ("YEAR", date_format("HIREDATE", 'yyyy')),
    ("MONTH", date_format("HIREDATE", 'MM')),
]
for col_name, col_expr in partition_exprs:
    # withColumn replaces an existing column of the same name, so re-running
    # this cell overwrites YEAR/MONTH rather than duplicating them.
    df_emp_csv = df_emp_csv.withColumn(col_name, col_expr)
df_emp_csv.show()

+-----+------+---------+----+----------+----+----+------+----+-----+
|EMPNO| ENAME|      JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|YEAR|MONTH|
+-----+------+---------+----+----------+----+----+------+----+-----+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|1980|   12|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|1981|   02|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250| 500|    30|1981|   02|
| 7566| JONES|  MANAGER|7839|1981-02-04|2975|null|    20|1981|   02|
| 7654|MARTIN| SALESMAN|7698|1981-09-21|1250|1400|    30|1981|   09|
| 7698|   SGR|  MANAGER|7839|1981-01-05|2850|null|    30|1981|   01|
| 7782|  RAVI|  MANAGER|7839|1981-09-06|2450|null|    10|1981|   09|
| 7788| SCOTT|  ANALYST|7566|1987-04-19|3000|null|    20|1987|   04|
| 7839|  KING|PRESIDENT|null|1981-11-01|5000|null|    10|1981|   11|
| 7844|TURNER| SALESMAN|7698|1981-08-09|1500|   0|    30|1981|   08|
| 7876| ADAMS|    CLERK|7788|1987-05-23|1100|null|    20|1987|   05|
| 7900| JAMES|    CLERK|7698|1981-

In [0]:
# Save as the Delta table emp_part, replacing any existing contents.
# partitionBy("YEAR", "MONTH") nests the data files under
# YEAR=<value>/MONTH=<value> sub-directories, one level per partition column.
(
    df_emp_csv.write
    .mode("overwrite")
    .format("delta")
    .partitionBy("YEAR", "MONTH")
    .saveAsTable("emp_part")
)

In [0]:
# List the data files inside one partition directory (YEAR=1980, MONTH=12)
# to show the sub-directory layout produced by partitionBy.
display(dbutils.fs.ls("/user/hive/warehouse/emp_part/YEAR=1980/MONTH=12/"))

path,name,size,modificationTime
dbfs:/user/hive/warehouse/emp_part/YEAR=1980/MONTH=12/part-00000-0bc18f54-e676-4c7f-9759-4199136e1e2b.c000.snappy.parquet,part-00000-0bc18f54-e676-4c7f-9759-4199136e1e2b.c000.snappy.parquet,2396,1655366694000


In [0]:
%sql
-- Show the physical plan: the year predicate should appear under
-- PartitionFilters, so only the matching YEAR directory is scanned.
EXPLAIN SELECT * FROM emp_part WHERE year = '1980'

plan
"== Physical Plan == *(1) ColumnarToRow +- FileScan parquet default.emp_part[EMPNO#1187,ENAME#1188,JOB#1189,MGR#1190,HIREDATE#1191,SAL#1192,COMM#1193,DEPTNO#1194,YEAR#1195,MONTH#1196] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[dbfs:/user/hive/warehouse/emp_part], PartitionFilters: [isnotnull(YEAR#1195), (YEAR#1195 = 1980)], PushedFilters: [], ReadSchema: struct"
