# Spark 
## Optimal Partitioning for Spark Parquet Files 
Parquet is a columnar data format commonly used in Big Data contexts. Its column-wise data structure can be partitioned, which allows for parallelism and optimized query performance through the SQL-like interface of the Spark API.

An informal literature search revealed a number of strategies to optimize read/write and query performance. This is a late-night summary of the broad strokes. 

### Motivation
The Alibaba Ad Display/Click dataset expands from 255 MB compressed to 24 GB uncompressed across the four files summarized below.

{numref}`ali_display_summary`: Alibaba Ad Display / Click Summary

```{table} Dataset Summary
:name: ali_display_summary

|        Users        |    User Profiles    |     Impressions      |            Advertising Campaigns           |       Behaviors       |
|:-------------------:|:-------------------:|:--------------------:|:------------------------------------------:|:---------------------:|
|          1,140,000  |          1,061,768  |          26,557,961  |                                   846,811  |          723,268,134  |
```
### Dynamic Repartition
This section captures one approach to processing and persisting medium-to-large datasets with Spark and Parquet. The Spark documentation describes the DataFrame API and a series of objects, methods and configuration parameters for creating performant, partitioned and parallelizable file representations. Three parameters are central to this task:

| Property Name                     | Default | Meaning | Since Version |
|-----------------------------------|---------|---------|---------------|
| spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC. | 2.0.0 |
| spark.sql.files.openCostInBytes   | 4194304 (4 MB) | The estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over-estimate; then the partitions with small files will be faster than partitions with bigger files (which are scheduled first). This configuration is effective only when using file-based sources such as Parquet, JSON and ORC. | 2.0.0 |
| spark.default.parallelism         | Local mode: the number of cores on the local machine; otherwise, usually the total number of cores on all executor nodes or 2, whichever is larger | Default number of partitions in RDDs returned by transformations like join, reduceByKey and parallelize when not set by the user. For distributed shuffle operations like reduceByKey and join, the default is the largest number of partitions in a parent RDD; for operations like parallelize with no parent RDDs, it depends on the cluster manager. | |

In addition, the maxRecordsPerFile write option, added in Spark 2.2, caps the number of records written to each output file, which in turn bounds the size of the files produced during the partitioning process. Deriving its value from the size of the data lets the file layout scale with the data partitions. Let's walk through the numbers.
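To see how maxPartitionBytes, openCostInBytes and the default parallelism interact when Spark reads files, here is a simplified pure-Python model of the read-side packing. It paraphrases my understanding of Spark 3.x's file scan logic: every file is charged its length plus openCostInBytes, the target split size is the per-core share clamped between openCostInBytes and maxPartitionBytes, files are cut into splits of that size, and the splits are packed into partitions largest-first. It is an approximation for intuition, not Spark's code; the function names are hypothetical and details vary across Spark versions.

```python
from typing import List

MiB = 1024 * 1024


def max_split_bytes(file_sizes: List[int], parallelism: int,
                    max_partition_bytes: int = 128 * MiB,
                    open_cost_in_bytes: int = 4 * MiB) -> int:
    """Target split size: the per-core share of the data, clamped to [open cost, max partition bytes]."""
    # Every file is charged its length plus the estimated cost of opening it
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))


def pack_read_partitions(file_sizes: List[int], parallelism: int,
                         max_partition_bytes: int = 128 * MiB,
                         open_cost_in_bytes: int = 4 * MiB) -> List[List[int]]:
    """Return the chunk sizes (in bytes) assigned to each read partition."""
    split = max_split_bytes(file_sizes, parallelism, max_partition_bytes, open_cost_in_bytes)
    # Cut each (splittable) file into chunks of at most `split` bytes
    chunks = [min(split, size - offset) for size in file_sizes for offset in range(0, size, split)]
    # Largest chunks first, then fill partitions greedily
    chunks.sort(reverse=True)
    partitions: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for chunk in chunks:
        # Close the current partition if this chunk would overflow it
        if current and current_size + chunk > split:
            partitions.append(current)
            current, current_size = [], 0
        current.append(chunk)
        # Each chunk also pays the open cost toward the partition's budget
        current_size += chunk + open_cost_in_bytes
    if current:
        partitions.append(current)
    return partitions


print(max_split_bytes([1 << 30], 24))                # 44914005
print(len(pack_read_partitions([1 << 30], 24)))      # 24
print(len(pack_read_partitions([MiB] * 100, 24)))    # 25
```

With 24 cores, a single 1 GiB file yields 24 read partitions of about 43 MiB each, while one hundred 1 MiB files are packed only four to a partition because each file is charged an extra 4 MiB to open. The per-core term here mirrors the bytesPerCore term used in the formulas of this section.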

#### Configuration Parameters
The Spark defaults are:

- default.parallelism: the number of CPU cores (24 here).
- maxPartitionBytes: 128 MB.
- openCostInBytes: 4 MB. The estimated cost of opening a file; it acts as a sort of minimum file size in the partitioning calculations.

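Setting maxRecordsPerFile caps the number of records in each output file. Combined with the repartitioning described below, this avoids both oversized files and the explosion of small files that can occur when large datasets are repartitioned and written with partitionBy. We compute a target value as follows: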

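```{math}
\begin{aligned}
\text{bytesPerCore} &= \frac{\text{fileSizeInBytes}}{\text{default.parallelism}} \\
\text{maxFileBytes} &= \min\bigl(\text{maxPartitionBytes},\ \max(\text{openCostInBytes},\ \text{bytesPerCore})\bigr) \\
\text{bytesPerRow} &= \frac{\text{fileSizeInBytes}}{\text{numRows}} \\
\text{maxRecordsPerFile} &= \frac{\text{maxFileBytes}}{\text{bytesPerRow}}
\end{aligned}
```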
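As a worked check of the records-per-file calculation (target file size divided by average row size, with the target size being the per-core share of the file clamped between openCostInBytes and maxPartitionBytes), the sketch below evaluates it for three made-up file sizes and row counts, one per regime of the clamp. These are illustrative numbers, not measurements of the Alibaba files. It rearranges maxFileBytes / bytesPerRow into maxFileBytes × numRows / fileSizeInBytes so the arithmetic stays in integers; the function name is hypothetical.

```python
MiB = 1024 * 1024
GiB = 1024 * MiB


def max_records_per_file(file_size_bytes: int, num_rows: int, parallelism: int = 24,
                         open_cost_bytes: int = 4 * MiB,
                         max_partition_bytes: int = 128 * MiB) -> int:
    """Rows that fit in one target-sized output file."""
    bytes_per_core = file_size_bytes // parallelism
    # Clamp the per-core share between the open cost and the max partition size
    max_file_bytes = min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))
    # maxFileBytes / (fileSize / numRows), rearranged to avoid floating point
    return max_file_bytes * num_rows // file_size_bytes


# Small file: the per-core share is below 4 MiB, so openCostInBytes sets the target
print(max_records_per_file(10 * MiB, 100_000))        # 40000
# Medium file: the per-core share is used as-is, giving about num_rows / parallelism
print(max_records_per_file(1 * GiB, 10_000_000))      # 416666
# Large file: the per-core share exceeds 128 MiB, so maxPartitionBytes caps the target
print(max_records_per_file(24 * GiB, 200_000_000))    # 1041666
```

In the middle regime the file size cancels out and maxRecordsPerFile reduces to numRows / default.parallelism, i.e. one file's worth of rows per core.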

#### Partition on Disk with partitionBy
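Spark allows one to partition data on disk by the value of a column using the DataFrameWriter partitionBy method, which writes one subdirectory per distinct value (for example `day=1/`, `day=2/`). Ideally, we would choose a column with moderate cardinality (enough distinct values to spread the data out, but not so many that we create thousands of tiny directories) and a reasonably balanced distribution, so that the partitions are of roughly the same size.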
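One quick way to compare candidate columns before calling partitionBy is to check how many directories a column would create and how lopsided the largest one is. The sketch below assumes the per-value row counts have already been collected to the driver, for example with `{row["day"]: row["count"] for row in data.groupBy("day").count().collect()}`; the function name and the skew ratio (largest partition divided by the mean partition) are illustrative choices, not a Spark API.

```python
from typing import Dict, Hashable, Tuple


def partition_balance(counts: Dict[Hashable, int]) -> Tuple[int, float]:
    """Return (number of partition directories, largest partition size / mean partition size)."""
    if not counts:
        raise ValueError("counts must not be empty")
    sizes = list(counts.values())
    mean_size = sum(sizes) / len(sizes)
    return len(sizes), max(sizes) / mean_size


# Made-up row counts per candidate column value
print(partition_balance({1: 100, 2: 100, 3: 100}))  # (3, 1.0): perfectly balanced
print(partition_balance({"a": 300, "b": 100}))      # (2, 1.5): largest partition is 1.5x the mean
```

A ratio close to 1.0 means the directories are of similar size; a large ratio flags a column whose biggest value would dominate the write.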

Our impressions dataset has a timestamp, from which we can extract the day of the month as a partitioning variable. 

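```python
import os
from typing import Optional

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, dayofmonth, rand, timestamp_seconds


# Originally the execute() method of a pipeline task class; shown here as a standalone function.
def execute(data: DataFrame, context: Optional[dict] = None) -> DataFrame:
    """Extracts the day of the month from a Unix timestamp (seconds) and adds it as a 'day' column."""
    # timestamp_seconds() (Spark 3.1+) converts epoch seconds to a TimestampType column
    data = data.withColumn("day", dayofmonth(timestamp_seconds(col("timestamp"))))

    # show() prints the first rows itself and returns None, so it is not wrapped in print()
    data.show()

    return data
```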
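One caveat when deriving the partition column: Spark evaluates dayofmonth() on a timestamp in the session time zone (spark.sql.session.timeZone), so the same epoch value can fall on different days depending on that setting. The plain-Python sketch below (the helper name is hypothetical) reproduces the effect with fixed UTC offsets. Pinning the session time zone before adding the column, for example with `spark.conf.set("spark.sql.session.timeZone", "UTC")`, keeps the day boundaries reproducible across machines.

```python
from datetime import datetime, timedelta, timezone


def day_of_month(epoch_seconds: int, utc_offset_hours: int = 0) -> int:
    """Day of the month of a Unix timestamp, read at a fixed UTC offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(epoch_seconds, tz=tz).day


ts = 1494000000  # 2017-05-05 16:00:00 UTC
print(day_of_month(ts))     # 5 when read in UTC
print(day_of_month(ts, 8))  # 6 when read at UTC+8
```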

#### Compute Number of Rows per Partition File
Next, we write a function that computes maxRecordsPerFile from the file size and row count.

```python
def records_per_file(
    filesize: int,
    num_rows: int,
    cores: int = 24,
    open_cost: int = 4194304,
    max_partition_size: int = 134217728,
) -> int:
    """Computes the maximum number of records per Parquet partition file.

    Args:
        filesize (int): The size of the file to be partitioned, in bytes.
        num_rows (int): The number of rows in the file.
        cores (int): Number of CPU cores (spark.default.parallelism).
        open_cost (int): Estimated cost of opening a file, in bytes (spark.sql.files.openCostInBytes).
        max_partition_size (int): Maximum size of a partition, in bytes (spark.sql.files.maxPartitionBytes).

    Returns:
        int: Maximum number of records per partition file.
    """
    # Bytes each core would handle if the file were split evenly across cores
    bytes_per_core = filesize / cores
    # Target file size: at least open_cost, at most max_partition_size
    max_file_bytes = min(max_partition_size, max(open_cost, bytes_per_core))
    # Average row size in bytes
    row_size_in_bytes = filesize / num_rows
    # Number of rows that fit in one target-sized file (at least one)
    return max(1, int(max_file_bytes / row_size_in_bytes))
```


#### Write Partitioned File
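We now have the maximum number of records per partition file and the partition column. Next, we repartition the dataset on the partition column plus a repartition seed. For each row, the seed is a random float in [0, 1) multiplied by count / rows_per_file and truncated to an integer, where count is the number of rows that share the row's day. Each day is therefore split into roughly count / rows_per_file buckets of about rows_per_file records each, which controls both the number of output files and the number of records in each.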

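```python
import os

from pyspark.sql.functions import col, rand

# Assumes `data` already has the 'day' column added above, `input_path` is the
# source file it was read from, and `filepath` is the output directory.
filesize = os.stat(input_path).st_size
num_rows = data.count()
rows_per_file = records_per_file(filesize=filesize, num_rows=num_rows)
partition_columns = ["day"]

# Number of rows in each partition-column group (one row per day)
partition_count = data.groupBy(*partition_columns).count()

(
    data
    .join(partition_count, on=partition_columns)
    # Random bucket in [0, count / rows_per_file) for every row of a group
    .withColumn("repartition_seed", (rand() * col("count") / rows_per_file).cast("int"))
    # Rows with the same day and bucket end up in the same task
    .repartition(*partition_columns, "repartition_seed")
    # Drop the helper columns so they are not written to the Parquet files
    .drop("count", "repartition_seed")
    .write.mode("overwrite")
    .option("maxRecordsPerFile", rows_per_file)
    .partitionBy(*partition_columns)
    .parquet(filepath)
)
```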
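To see why the seed controls file size, the plain-Python sketch below mimics the repartition_seed expression for a single day. Python's random.Random stands in for Spark's rand() (a different generator with the same uniform [0, 1) distribution), and the function name is hypothetical. A day with count rows is split into about ceil(count / rows_per_file) buckets of about rows_per_file rows each; when count is not a multiple of rows_per_file, the last bucket is proportionally smaller.

```python
import random
from collections import Counter
from typing import Dict


def simulate_seed_buckets(group_rows: int, rows_per_file: int, seed: int = 0) -> Dict[int, int]:
    """Bucket every row of one day the way int(rand() * count / rows_per_file) does.

    Returns a mapping from bucket id to the number of rows in that bucket.
    """
    rng = random.Random(seed)  # stand-in for Spark's rand()
    buckets = Counter(int(rng.random() * group_rows / rows_per_file) for _ in range(group_rows))
    return dict(sorted(buckets.items()))


# A hypothetical day with 2,500 rows and a target of 1,000 rows per file:
# expect three buckets of roughly 1000, 1000 and 500 rows
print(simulate_seed_buckets(2500, 1000))
```

In Spark, each (day, repartition_seed) pair is hashed to one of spark.sql.shuffle.partitions tasks, so two buckets can occasionally share a task; a maxRecordsPerFile cap on the writer still splits that task's output into files of bounded size.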