# Spark compatibility check
In this section, we use spark to read parquet file that are generated by arrow

In [10]:
from pyspark.sql import SparkSession
import os
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql.types import *
import io
import time
from pyspark.sql import Row


In [4]:
spark = SparkSession \
    .builder.master("k8s://https://kubernetes.default.svc:443") \
    .appName("SparkReadArrow") \
    .config("spark.kubernetes.container.image", "inseefrlab/jupyter-datascience:master") \
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT']) \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory","8g") \
    .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE']) \
    .getOrCreate()

In [23]:
! kubectl get pods

I0915 14:22:37.084522    2169 request.go:655] Throttling request took 1.168941502s, request: GET:https://kubernetes.default/apis/rbac.authorization.k8s.io/v1beta1?timeout=32s
NAME                                                        READY   STATUS      RESTARTS   AGE
argo-workflows-315671-server-7d5cf97d7-rft67                1/1     Running     0          2d7h
argo-workflows-315671-workflow-controller-f9545ff99-nmzlj   1/1     Running     0          2d7h
flume-test-agent-df8c5b944-j47lw                            1/1     Running     0          3d17h
jupyter-107079-85887dc86c-4h46d                             1/1     Running     0          6h13m
kafka-server-0                                              1/1     Running     0          3d19h
kafka-server-1                                              1/1     Running     0          3d16h
kafka-server-2                                              1/1     Running     0          4d17h
kafka-server-zookeeper-0                             

In [21]:
parquet_input_path = "s3a://pengfei/diffusion/data_format/arrow_netflix/"
output_path="s3a://pengfei/diffusion/data_format/spark_netflix/"
csv_example="s3a://pengfei/diffusion/data_format/Fire_Department_Calls_for_Service.csv"

In [25]:
def check_spark_parquet_read_time(path):
    t1=time.time()
    df=spark.read.parquet(path)
    print(f"data frame has {df.count()} rows, {len(df.columns)} columns")
    t2=time.time()
    print(f"Spark read time spents: {t2 - t1} s")
    return df

# read parquet generated by arrow    
check_spark_parquet_read_time(parquet_input_path)

# read parquet generated by spark
check_spark_parquet_read_time("s3a://pengfei/diffusion/data_format/netflix.parquet")

data frame has 24058262 rows, 3 columns
Spark read time spents: 0.8141007423400879 s
data frame has 24058262 rows, 3 columns
Spark read time spents: 0.8090753555297852 s


In [22]:
def check_spark_write_time(df,path):
    t1=time.time()
    df.write.parquet(path)
    t2=time.time()
    print(f"Spark write time spents: {t2 - t1} s")
    
df=spark.read.parquet(parquet_input_path)
check_spark_write_time(df,output_path)

Spark write time spents: 26.110023498535156 s


In [16]:
# merge schema example
# In this example, we create two data frame, 
# df1, we have two columns: single, double.
# df2, we have two columns: single, triple
sc = spark.sparkContext
schema_output_path1= "s3a://pengfei/diffusion/data_format/merge_schema/test_table/key=1"
schema_output_path2= "s3a://pengfei/diffusion/data_format/merge_schema/test_table/key=2"
df1 = spark.createDataFrame(sc.parallelize(range(1, 6)).map(lambda i: Row(single=i, double=i ** 2)))
df1.show()
df2 = spark.createDataFrame(sc.parallelize(range(6, 11)).map(lambda i: Row(single=i, triple=i ** 3)))
df2.show()
# then we write the two data frame in a partition folder, here we put key=1, key=2.
df1.write.parquet(schema_output_path1)
df2.write.parquet(schema_output_path2)

+------+------+
|single|double|
+------+------+
|     1|     1|
|     2|     4|
|     3|     9|
|     4|    16|
|     5|    25|
+------+------+

+------+------+
|single|triple|
+------+------+
|     6|   216|
|     7|   343|
|     8|   512|
|     9|   729|
|    10|  1000|
+------+------+



AnalysisException: path s3a://pengfei/diffusion/data_format/merge_schema/test_table/key=1 already exists.

In [17]:
# if we read the parent folder that contains the partitioned folder. The partition key become a column name, we call it partitioned column
parent_path="s3a://pengfei/diffusion/data_format/merge_schema/test_table"
# as the data frame in each partition folder has different schema, we need to set mergeSchema to true. Otherwise it will only use the schema
# of the first parquet file which it reads. 
# set the below value to false to check the output data frame.
mergedDF = spark.read.option("mergeSchema", "true").parquet(parent_path)
mergedDF.printSchema()

mergedDF.show()

root
 |-- single: long (nullable = true)
 |-- double: long (nullable = true)
 |-- key: integer (nullable = true)

+------+------+---+
|single|double|key|
+------+------+---+
|     9|  null|  2|
|    10|  null|  2|
|     4|    16|  1|
|     5|    25|  1|
|     6|  null|  2|
|     7|  null|  2|
|     8|  null|  2|
|     1|     1|  1|
|     2|     4|  1|
|     3|     9|  1|
+------+------+---+



In [None]:
def check_spark_read_csv_time(path):
    t1=time.time()
    df=spark.read.csv(path)
    print(f"data frame has {df.count()} rows, {len(df.columns)} columns")
    t2=time.time()
    print(f"Spark read time spents: {t2 - t1} s")
    return df

df_fire=check_spark_read_csv_time(csv_example)


In [None]:
#
fire_output_path1="s3a://pengfei/diffusion/data_format/Fire_Department1.parquet"
# use gzip compression instead of snappy
fire_output_path2="s3a://pengfei/diffusion/data_format/Fire_Department2.parquet"
# change default dictionary size to 512KB
fire_output_path3="s3a://pengfei/diffusion/data_format/Fire_Department3.parquet"
# this parquet has default partition 8, so it has 8 parquet file
fire_output_path0="s3a://pengfei/diffusion/data_format/Fire_Department.parquet"

# check_spark_write_time(df_fire,fire_output_path)

# set the block size to 128MB, this does not work, because the number of parquet partition is set by the number of dataframe partition.
# df_fire.write.option("parquet.block.size",256 * 1024 * 1024).parquet(fire_output_path)


In [None]:

# we can set the
df_fire.coalesce(1).write \
.option("parquet.block.size",256 * 1024 * 1024) \
.option("parquet.page.size",3*1024*1024) \
.option("parquet.dictionary.page.size",512 * 1024) \
.option("parquet.enable.dictionary", "true") \
.option("parquet.compression","gzip") \
.parquet(fire_output_path0)


# Parquet configuration

- parquet.block.size: It defines the default size of a parquet partition file. If you use hdfs to store these file, the Parquet file block size should be no larger than the HDFS block size for the file so that each Parquet block can be read from a single HDFS block (and therefore from a single datanode). It is common to set them to be the same, and indeed both defaults are for 128 MB block sizes. Note the default size does not mean that all parquet partition files will have the exact same size. But their size will be approximate to this value. For example, if the default size is 128MB, then one can have 127.66MB, one can have 126.98MB.

- parquet.page.size: It defines the default size of a page in a column. A page is the smallest unit of storage in a Parquet file, so retrieving an arbitrary row requires that the page containing the row be decompressed and decoded. Thus, for single-row lookups, it is more efficient to have smaller pages, so there are fewer values to read through before reaching the target value.

- parquet.dictionary.page.size: The maximum allow size in byte of a dictionary before falling back to plain encoding for a page

- parquet.enable.dictionary: Enable dictionary encoding or not.

- parquet.compress: choose the compression type.



# Some tips:
1. Dictionary encoding:
Smaller files means there will be less I/O involved. Dictionary encoding will ensure that there is improvement in storage and accessing. For one column chunk there will be single dictionary. Most types are encoded using dictionary en‐ coding by default; however, a plain encoding will be used as a fallback if the dictionary becomes too large. The threshold size at which this happens is referred to as the dictionary page size and is the same as the page size by default. Please refer to parquet configuration section for more information. One can validate whether the file is dictionary encoded by using the parquet-tools.
In order to perform better we need to decrease the row group size and increase the dictionary page size.
2. Page Compression:
The below default compression schemes while using the Parquet format.
Spark uses snappy as default.
Impala uses snappy as default.
Hive uses deflate codec as default.
Using snappy compression will reduce the size of the page and improve read time.
Using Parquet format allows for better compression, as data is more homogeneous. The space savings are very noticeable at the scale of a Hadoop cluster. I/O will be reduced as we can efficiently scan only a subset of the columns while reading the data. Better compression also reduces the bandwidth required to read the input. As we store data of the same type in each column, we can use encoding better suited to the modern processors’ pipeline by making instruction branching more predictable. Parquet format is mainly used for WRITE ONCE READ MANY applications.
I hope this blog helped you in understanding the parquet format and internal functionality. Happy Learning!!!


# Spark parquet config set to false by default
spark.sql.parquet.mergeSchema
spark.sql.parquet.respectSummaryFiles
spark.sql.parquet.binaryAsString
spark.sql.parquet.int96TimestampConversion
spark.sql.parquet.int64AsTimestampMillis
spark.sql.parquet.writeLegacyFormat
spark.sql.parquet.recordLevelFilter.enabled

# Spark parquet config set to true by default

spark.sql.parquet.int96AsTimestamp
spark.sql.parquet.filterPushdown
spark.sql.parquet.filterPushdown.date
spark.sql.parquet.filterPushdown.timestamp
spark.sql.parquet.filterPushdown.decimal
spark.sql.parquet.filterPushdown.string.startsWith
spark.sql.parquet.enableVectorizedReader

# These properties need value and listing it with defaults-

spark.sql.parquet.outputTimestampType = INT96
spark.sql.parquet.compression.codec = snappy
spark.sql.parquet.pushdown.inFilterThreshold = 10
spark.sql.parquet.output.committer.class = org.apache.parquet.hadoop.ParquetOutputCommitter
spark.sql.parquet.columnarReaderBatchSize = 4096

Regarding parquet.enable.dictionary, it is not supported by Spark yet. But it can be set in sqlContext as -

sqlContext.setConf("parquet.enable.dictionary", "false")
Default value is of this property is true in parquet. Therefore, it should be true when parquet code is called from Spark.