# Data format overview
## 1.0 Introduction
In this tutorial, we will overview and evaluate the following data formats
1. avro (structured)
2. csv (semi-structured)
3. json (semi-structured)
4. orc (structured)
5. parquet (structured) 

We evaluate the data formats by measuring the latency of the following data operations :
1. Disk usage
2. Read/Write latency
3. Random data lookup
4. Filtering/GroupBy(column-wise)
5. Distinct(row-wise)

## Important notes:
1. To avoid erreur, you need to execute the code block one by one from head to tail.
2. Some code block need to be modified. If you see a comment "#### Modify", you need to follow the instruction to modify the default value.


## 2.0 Spark environment configuration
1. Create a spark session for running spark SQL operations
2. Setup a spark history server for monitoring and logging spark operations after the spark operations end.

In this section you need to modify the **bucket_name** value.

In [2]:
import os
import s3fs
endpoint = "https://"+os.environ['AWS_S3_ENDPOINT']
fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': endpoint})

#### Modify, you need to change the bucket_name to your own minio bucket name 
bucket_name="pengfei"

event_log_path="{}/tmp/spark-history".format(bucket_name)
fs.touch('s3://'+event_log_path+'/.keep')
fs.info(event_log_path)

{'Key': 'pengfei/tmp/spark-history',
 'name': 'pengfei/tmp/spark-history',
 'type': 'directory',
 'Size': 0,
 'size': 0,
 'StorageClass': 'DIRECTORY'}

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
       .builder.master("k8s://https://kubernetes.default.svc:443") \
       .appName("Evaluate data format") \
       .config("spark.kubernetes.container.image", "inseefrlab/jupyter-datascience:master") \
       .config("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT']) \
       .config("spark.executor.instances", "5") \
       .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE']) \
       .config("spark.eventLog.enabled","true") \
       .config("spark.eventLog.dir","s3a://"+event_log_path) \
       .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.0.1") \
       .getOrCreate()


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://kubernetes.default.svc/api/v1/namespaces/user-pengfei/configmaps. Message: Forbidden! User user doesn't have permission. configmaps is forbidden: User "system:serviceaccount:user-pengfei:jupyter-1620304224" cannot create resource "configmaps" in API group "" in the namespace "user-pengfei".
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:589)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:526)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:492)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:451)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:252)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:879)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:341)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:84)
	at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.setUpExecutorConfigMap(KubernetesClusterSchedulerBackend.scala:77)
	at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:579)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


## 2.1 Setup data path
The data source is from https://www.kaggle.com/netflix-inc/netflix-prize-data?select=combined_data_1.txt. For saving some time, we just covert it to different format and uploaded it to our data lake. Because writing data takes much more time than reading. But we recoreded the latence of write operations, so we can analyze them after.  

In [None]:
json_data_path="s3a://pengfei/diffusion/data_format/netflix.json"
parquet_data_path="s3a://pengfei/diffusion/data_format/netflix.parquet"
avro_data_path="s3a://pengfei/diffusion/data_format/netflix.avro"
orc_data_path="s3a://pengfei/diffusion/data_format/netflix.orc"
csv_data_path="s3a://pengfei/diffusion/data_format/netflix.csv"

## 2.2 Some useful functions for evaluating data format

In this section, we define some useful functions which we will use after. You don't need to understand them to finish this tutorial. 

### 2.2.1 The read function read the source data file and convert it to a spark data frame

In [1]:
# file path for storing operation stats
data_format_op_stats_path="../tmp/op-stats.csv"

# file path for storing size stats
data_format_size_stats_path="../tmp/size-stats.csv"
def write_stats(line):
    file1 = open(data_format_op_stats_path,"a")
    file1.write(line+"\n")

In [46]:
import time
def read(fmt):
    start = time.time()
    if fmt == "json":
        sdf = spark.read.option("header", "true").json(json_data_path)
    elif fmt == "csv":
        sdf = spark.read.option("header", "true").csv(csv_data_path)
    elif fmt == "avro":
        sdf = spark.read.format("avro").load(avro_data_path)
    elif fmt == "parquet":
        sdf = spark.read.parquet(parquet_data_path)
    elif fmt == "orc":
        sdf = spark.read.orc(orc_data_path)
    sdf.show(5,False)
    stats="{}, {}, {}".format(fmt, "read", time.time() - start)
    write_stats(stats)
    print(stats)
    return sdf

### 2.2.2 The get_shape function prints the shape(e.g. row number and column number) of the data frame

In [37]:
def get_shape(df,fmt):
    start = time.time()
    row_num=df.count()
    col_num=len(df.columns)
    stats="{}, {}, {}".format(fmt, "get_shape", time.time() - start)
    write_stats(stats)
    print("The data frame has {} rows and {} columns".format(row_num,col_num))
    print(stats)

### 2.2.3 The stats function prints the min, max and numbers of a column of the data frame

In [38]:
def stats(df,fmt, field="rating"):
    start = time.time()
    max=df.agg({field: "max"})
    min=df.agg({field: "min"})
    count=df.agg({field: "count"})
    min.show(5,False)
    max.show(5,False)
    count.show(5,False)
    stats="{}, {}, {}".format(fmt, "stats", time.time() - start)
    write_stats(stats)
    print(stats)

### 2.2.4 The random_batch function randomly select rows from the data frame. It can evaluate the ability of random data lookup

In [39]:
def random_batch(df,fmt):
    start = time.time()
    result=df.sample(False, 0.05).collect()
    stats="{}, {}, {}".format(fmt, "random_batch", time.time() - start)
    write_stats(stats)
    print(stats)
   # return result

### 2.2.5 The distinct function count distinct rows of the data frame

In [40]:
def distinct(df,fmt):
    start = time.time()
    result = df.distinct().count()
    stats="{}, {}, {}".format(fmt, "distinct", time.time() - start)
    write_stats(stats)
    print(stats)
    return result

### 2.2.6 The group_by function group and count the data frame by a specific column

In [41]:
def group_by(df,fmt):
    start = time.time()
    result=df.groupBy("rating").count()
    result.show(5,False)
    stats="{}, {}, {}".format(fmt, "group_by", time.time() - start)
    write_stats(stats)
    print(stats)
    #return result

### 2.2.7 The filtering function filter data by using a specific boolean condition

In [42]:
def filtering(df, fmt, date="2005-11-15"):
    start = time.time()
    result = df.filter(df.date > date).count()
    stats="{}, {}, {}".format(fmt, "filtering", time.time() - start)
    write_stats(stats)
    print(stats)
    return result

### 2.2.8 The remove_space function removes space in string of a column to avoid filtering fail.

In [None]:
from pyspark.sql.functions import ltrim,rtrim,trim
def remove_space(df,col_name,position):
    # remove left side space
    if position =="l":
        return df.withColumn("tmp",ltrim(col(col_name))).drop(col_name).withColumnRenamed("tmp", col_name)
    # remove right side space
    elif position =="r":
        return df.withColumn("tmp",rtrim(col(col_name))).drop(col_name).withColumnRenamed("tmp", col_name)
    # remove all side space
    elif position =="a":
        return df.withColumn("tmp",trim(col(col_name))).drop(col_name).withColumnRenamed("tmp", col_name)

# 3. Gathering stats of each data format

## 3.1 Get CSV format evaluation stats

In [44]:
csv_df=read("csv")

+-------+------+----------+
|user_id|rating|date      |
+-------+------+----------+
|1488844|3     |2005-09-06|
|822109 |5     |2005-05-13|
|885013 |4     |2005-10-19|
|30878  |4     |2005-12-26|
|823519 |3     |2004-05-03|
+-------+------+----------+
only showing top 5 rows

csv, read, 8.758424282073975


In [13]:
csv_df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- date: string (nullable = true)



In [15]:
get_shape(csv_df,"csv")

The data frame has 24058262 rows and 3 columns
csv, get_shape, 6.950009822845459


In [16]:
# get min, max and row number of column rating
stats(csv_df,"csv",field="rating")

+-----------+
|min(rating)|
+-----------+
|1          |
+-----------+

+-----------+
|max(rating)|
+-----------+
|5          |
+-----------+

+-------------+
|count(rating)|
+-------------+
|24053764     |
+-------------+

csv, stats, 22.299869537353516


In [17]:
random_batch(csv_df,"csv")

csv, random_batch, 15.357799291610718


In [18]:
distinct(csv_df,"csv")

csv, distinct, 19.20894479751587


12168704

In [19]:
group_by(csv_df,"csv")

+------+-------+
|rating|count  |
+------+-------+
|3     |6904181|
|null  |4498   |
|5     |5506583|
|1     |1118186|
|4     |8085741|
+------+-------+
only showing top 5 rows

csv, group_by, 8.280380249023438


In [20]:
filtering(csv_df,"csv")

In [28]:
name="netflix"
csv_df.write.mode("overwrite").option("header", "true").csv("{}.csv".format(name))

## 3.2 Get Json format evaluation stats

In [98]:
json_df=read("json")

+----------+------+-------+
|date      |rating|user_id|
+----------+------+-------+
|2005-09-06|3     |1488844|
|2005-05-13|5     |822109 |
|2005-10-19|4     |885013 |
|2005-12-26|4     |30878  |
|2004-05-03|3     |823519 |
+----------+------+-------+
only showing top 5 rows

json, read, 9.966949701309204


In [99]:
json_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- user_id: string (nullable = true)



In [100]:
get_shape(json_df,"json")

In [111]:
# get min, max and row number of column rating
stats(json_df,"json",field="rating")

+-----------+
|min(rating)|
+-----------+
|1          |
+-----------+

+-----------+
|max(rating)|
+-----------+
|5          |
+-----------+

+-------------+
|count(rating)|
+-------------+
|24053764     |
+-------------+

json, stats, 35.458019971847534


In [101]:
random_batch(json_df,"json")

json, random_batch, 17.13827896118164


In [102]:
distinct(json_df,"json")

json, distinct, 21.003305673599243


12168704

In [115]:
group_by(json_df,"json")

+------+-------+
|rating|count  |
+------+-------+
|3     |6904181|
|null  |4498   |
|5     |5506583|
|1     |1118186|
|4     |8085741|
+------+-------+
only showing top 5 rows

json, group_by, 12.402800798416138


DataFrame[rating: string, count: bigint]

In [103]:
filtering(json_df,"json")

json, filtering, 11.020655632019043


850269

## 3.3 Get Avro format evaluation stats

In [47]:
avro_df=read("avro")

Py4JJavaError: An error occurred while calling o196.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 9) (10.233.118.242 executor 5): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.dataReader$1 of type scala.Function1 in instance of org.apache.spark.sql.execution.datasources.FileFormat$$anon$1
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2411)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.dataReader$1 of type scala.Function1 in instance of org.apache.spark.sql.execution.datasources.FileFormat$$anon$1
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2411)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
get_shape(avro_df,"avro")

In [None]:
stats(avro_df,"avro",field="rating")

In [None]:
random_batch(avro_df,"avro")

In [None]:
distinct(avro_df,"avro")

In [None]:
group_by(avro_df,"avro")

In [None]:
filtering(avro_df,"avro")

## 3.4 Get Parquet format evaluation stats

In [141]:
parquet_df=read("parquet")

+-------+------+----------+
|user_id|rating|date      |
+-------+------+----------+
|1488844|3     |2005-09-06|
|822109 |5     |2005-05-13|
|885013 |4     |2005-10-19|
|30878  |4     |2005-12-26|
|823519 |3     |2004-05-03|
+-------+------+----------+
only showing top 5 rows

parquet, read, 1.6400139331817627


In [105]:
get_shape(parquet_df,"parquet")

The data frame has 24058262 rows and 3 columns
parquet, get_shape, 1.010782241821289


In [106]:
random_batch(parquet_df,"parquet")

parquet, random_batch, 6.159161567687988


In [112]:
stats(parquet_df,"parquet",field="rating")

+-----------+
|min(rating)|
+-----------+
|1          |
+-----------+

+-----------+
|max(rating)|
+-----------+
|5          |
+-----------+

+-------------+
|count(rating)|
+-------------+
|24053764     |
+-------------+

parquet, stats, 4.589794635772705


In [137]:
distinct(parquet_df,"parquet")

parquet, distinct, 164.0264663696289


12168704

In [117]:
group_by(parquet_df,"parquet")

+------+-------+
|rating|count  |
+------+-------+
|3     |6904181|
|null  |4498   |
|5     |5506583|
|1     |1118186|
|4     |8085741|
+------+-------+
only showing top 5 rows

parquet, group_by, 1.5566697120666504


In [119]:
filtering(parquet_df,"parquet")

parquet, filtering, 1.3744680881500244


850269

## 3.5 Get ORC format evaluation stats

In [130]:
orc_df=read("orc")

+-------+------+----------+
|user_id|rating|date      |
+-------+------+----------+
|1488844|3     |2005-09-06|
|822109 |5     |2005-05-13|
|885013 |4     |2005-10-19|
|30878  |4     |2005-12-26|
|823519 |3     |2004-05-03|
+-------+------+----------+
only showing top 5 rows

orc, read, 2.3085367679595947


In [132]:
get_shape(orc_df,"orc")

The data frame has 24058262 rows and 3 columns
orc, get_shape, 1.4532253742218018


In [133]:
random_batch(orc_df,"orc")

orc, random_batch, 6.4762678146362305


In [134]:
stats(orc_df,"orc",field="rating")

+-----------+
|min(rating)|
+-----------+
|1          |
+-----------+

+-----------+
|max(rating)|
+-----------+
|5          |
+-----------+

+-------------+
|count(rating)|
+-------------+
|24053764     |
+-------------+

orc, stats, 4.612210035324097


In [138]:
distinct(orc_df,"orc")

orc, distinct, 185.58755350112915


12168704

In [135]:
group_by(orc_df,"orc")

+------+-------+
|rating|count  |
+------+-------+
|3     |6904181|
|null  |4498   |
|5     |5506583|
|1     |1118186|
|4     |8085741|
+------+-------+
only showing top 5 rows

orc, group_by, 1.680478811264038


In [136]:
filtering(orc_df,"orc")

orc, filtering, 1.186652421951294


850269

# 4. Evaluate the performence of different data formats 

## 4.1 Disk Usage for different  file format

In [3]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, DoubleType,StringType,LongType

# define a schema

op_schema = StructType([
    StructField("format", StringType(), True),
    StructField("command", StringType(), True),
    StructField("Latency", DoubleType(), True)])

size_schema = StructType([
    StructField("format", StringType(), True),
    StructField("size_kb", LongType(), True)])


In [4]:
# read size stats file

rawSizeDf=spark.read.option("header", "true").csv(data_format_size_stats_path,schema=size_schema)
# generate a new column to make data humain readable
sizeDf=rawSizeDf.withColumn("size_mb",col("size_kb")/1024)
sizeDf.orderBy(col("size_kb").asc()).show(5)

NameError: name 'spark' is not defined

In [None]:
# convert spark df to pandas df
pd_size_df=sizeDf.toPandas()

In [None]:
# show the stats in a catplot
g_size = sns.catplot(
    data=pd_size_df, kind="bar",
    x="format", y="size_mb", 
    order=['orc','parquet','avro','csv','json'],
    ci="sd", palette="dark", alpha=0.8, height=8
)
g_size.despine(left=False)
g_size.set_axis_labels("", "Size (kb) ")
g_size.set(ylim=(150, 1300))

The main objective of any file is to store data. It is necessary to store large volumes of different data types with less spaces. Because the disk usage can cost you a lot of money.

Based on the above graphe, we can have the obvious conclusion **do not use JSON or csv to store the raw data, and orc, parquet are the best solution for storing data.Now we want to know why?** 

Note that we used the default configuration for all format during the entire benchmark. So we do not specify any compression codec anywhere. As a result, we have the following table

| format | size(mb) | compression |
|--------|----------|-------------|
| json   | 1261.90  | None        |
| csv    | 472.10   | None        |
| avro   | 279.91   | None        |
| parquet| 198.01   | SNAPPY      |
| orc    | 186.84   | ZLIB        |


1. JSON uses the most space, because it has huge overhead on storing "schema". Each row must repeat the column name value, which are the same for all the rows (in our example, it represents 24058262 rows). 

2. CSV only has one line header for storing column names, so it uses much less space than JSON  
3. Avro does not do any compression, but it still uses less space than CSV. Because it saves raw data in binary. And the binary codec do a bit compression with storing int and long leveraging variable-length zig-zag coding.
4. Parque alson stores raw data in binary, and it does compress data. In spark, it uses SNAPPY as the default compressing codec.
5. Orc uses ZLIB as compression codec. As ZLIB can offer better compression ratio, so Orc use less space than Parquet. But Snappy is faster. 


You could argu that, you can gzip CSV and json which will reduce the disk usage. But, json and csv are not splitable after compression. As a result, we can not benefit from the parallel processing of Spark anymore. Thus, the advantage of avro, parquet and orc is obvious.

Note that avro, parquet and orc supports many compression codec. Here we only benchmark the default setting of each format. So the difference of disk usage between these three are negligible.

## 4.2 Visualize the data processing latency for each format

In this section, we will examine the latency of several the most common data processing operations to determine which format is more optimal in certain context. We will first examine the common operations one by one. At last, we will give you an overview and conclusion

In [None]:


# read stats file
rawDf=spark.read.option("header", "false").csv(data_format_op_stats_path,schema=op_schema)
rawDf.show(5)
rawDf.count()

In [None]:
# remove duplicated rows
tmpDf=rawDf.dropDuplicates(["format","command"])
# remove space in command column
statsDf=remove_space(tmpDf,"command","l")
statsDf.count()

In [None]:
# convert to pandas df
pd_df=statsDf.toPandas()
pd_df.shape

### 4.2.1 Read data

In [2]:
read_op_df=pd_df[pd_df.command.eq("read")]
read_op_df.head(5)

NameError: name 'pd_df' is not defined

In [None]:
# show the stats in a catplot
g_read_op = sns.catplot(
    data=read_op_df, kind="bar",
    x="format", y="Latency", 
    order=['orc','parquet','avro','csv','json'],
    ci="sd", palette="dark", alpha=0.8, height=8
)
g_read_op.despine(left=False)
g_read_op.set_axis_labels("", "Latency (seconds) ")
g_read_op.set(ylim=(0, 10))

You could notice that the reading speed of orc, parquet and avro are much faster than csv and json. Because they store raw data in binary, which are optimized for performence.

### 4.2.2 Get basic stats such as min, max, column numbers

The basic stats of a data set such as min, max, row numbers are the basic information we will gather when we exploring un data set. Below figure shows the latency of each format.

In [None]:
meta_op_df=pd_df.loc[(pd_df.command.eq("stats"))|(pd_df.command.eq("get_shape"))]
meta_op_df.head(5)

In [None]:
g_op_meta = sns.catplot(
    data=meta_op_df, kind="bar",
    x="format", y="Latency", hue="command",
    order=['orc','parquet','avro','csv','json'],
    ci="sd", palette="dark", alpha=0.8, height=10
)
g_op_meta.despine(left=False)
g_op_meta.set_axis_labels("", "Latency (seconds)")
g_op_meta.legend.set_title(" ")

Orc and Parquet saves the min, max and row numbers as metadata. So the basic stats operations do not require the process of the entire dataset.  

### 4.2.3 Random batch

Random batch select randomly a subset of rows in a dataframe. If checks the performence of radom access operations of a data format. Unlike sequential access, random access requires more sophisticated optimization to avoid read unnecessary data. 

In [None]:
random_op_df=pd_df[pd_df.command.eq("random_batch")]
random_op_df.head(5)

In [None]:
# show the stats in a catplot
g_random_op = sns.catplot(
    data=random_op_df, kind="bar",
    x="format", y="Latency", 
    order=['orc','parquet','avro','csv','json'],
    ci="sd", palette="dark", alpha=0.8, height=8
)
g_random_op.despine(left=False)
g_random_op.set_axis_labels("", "Latency (seconds) ")
g_random_op.set(ylim=(0, 10))

orc, parquet, and avro split data in small chunks, and each chunk has a header which contains the metadata of the data in this chunk. As a result, when a certain row is required, the reader will check first the metadata in the header, if the condition is not matched, this chunk will be omitted. And this will avoid to read all unnecessary data 

### 4.2.4 column-wise operations (Filtering/GroupBy)

Many operations such as filtering or groupBy only instrested in data of certain columns. If we can avoid reading unnecessary data. We can improve dramatiquely the performence. Below figure shows the latency of filtering and groupBy operations. 

In [None]:
col_op_df=pd_df.loc[(pd_df.command.eq("group_by"))|(pd_df.command.eq("filtering"))]
col_op_df.head(5)

In [None]:
g_col_op = sns.catplot(
    data=col_op_df, kind="bar",hue="command",
    x="format", y="Latency", 
    order=['orc','parquet','avro','csv','json'],
    ci="sd", palette="dark", alpha=0.8, height=8
)
g_col_op.despine(left=False)
g_col_op.set_axis_labels("", "Latency (seconds) ")
g_col_op.set(ylim=(0, 10))

We can notice orc and parquet are much quicker than avro, csv, and json. Because orc, and parquet are columnar-based data formats, and avro, csv, and json are row-based data formats. We will discuss the difference between columnar-based and row-based in another tutorial. 

### 4.2.5.row-wise operation (Distinct)

In [None]:
distinct_op_df=pd_df[pd_df.command.eq("distinct")]
distinct_op_df.head(5)

In [None]:
g_distinct_op = sns.catplot(
    data=distinct_op_df, kind="bar",
    x="format", y="Latency", 
    order=['orc','parquet','avro','csv','json'],
    ci="sd", palette="dark", alpha=0.8, height=8
)
g_distinct_op.despine(left=False)
g_distinct_op.set_axis_labels("", "Latency (seconds) ")
g_distinct_op.set(ylim=(0, 50))

# 5. Overview and Conclusion

In [None]:
g_op = sns.catplot(
    data=pd_df, kind="bar",
    x="format", y="Latency", hue="command",
    order=['orc','parquet','avro','csv','json'],
    ci="sd", palette="dark", alpha=0.8, height=10
)
g_op.despine(left=False)
g_op.set_axis_labels("", "Latency (seconds)")
g_op.legend.set_title(" ")


The above figure shows the overall operation latency of all data formats. You can easily spot the most appropriate format for you based on your use case.


## 5.1 Some basic properties of data format

We have seen the operation latency of all data formats. But there are other properties that may impact the choice of the data format. The below table shows some basic properties of data formats when we evaluate them. 


|Property |CSV |Json|Parquet|Avro|ORC|
|---------|----|----|-------|----|---|
|Human Readable|YES|YES|NO|NO|NO|
|Compressable|YES|YES|YES|YES|YES|
|Splittable|YES*|YES*|YES|YES|YES|
|Complex data structure|NO|YES|YES|YES|YES|
|Schema evolution|NO|NO|YES|YES|YES|
|Columnar|NO|NO|YES|NO|YES|

Note:

1. CSV is splittable when it is a raw, uncompressed file or using a splittable compression format such as BZIP2 or LZO (note: LZO needs to be indexed to be splittable!)
2. JSON has the same conditions about splittability when compressed as CSV with one extra difference. When “wholeFile” option is set to true in Spark(re: SPARK-18352), JSON is NOT splittable.

## 5.2 Best data format for OLAP: Parquet vs Orc

After the above analysis, we can say that Orc and Parquet are the best data formats for OLAP applications. They both support various compression algorithms which reduce significantly disk usage. They are both very efficient on columnar-oriented data analysis operations. 

Parquet has better support on nested data types than Orc. Orc loses compression ratio and analysis performance when data contains complex nested data types.

Orc supports data update and ACID (atomicity, consistency, isolation, durability). Parquet does not, so if you want to update a Parquet file, you need to create a new one based on the old one.

Parquet has better interoperability compare to Orc. Because almost all data analysis tools and framework supports parquet. Orc is only supported by Spark, Hive, Impala, MapReduce.

## 5.3 Conclusion

If you are building a data lake, warehouse, or mart. We strongly recommend you use Parquet as the default data format for all your data.
If you need to constantly update(write) your data, do not use columnar-based data formats such as ORC, Parquet.
If you need to update your data schema, use Avro.

As you know, when we load data from disk to memory via a tool (Spark, Hive, Pandas, etc), the tool needs to convert the file format(disk) to object models(memory). This conversion is very expensive, and each tool provides its own object models. To avoid the conversion of object models of different tools, a standard model has been proposed(i.e. Apache Arrow). Want to know more? Stay tuned for our next tutorial.


# Stop the spark cluster

In [29]:
# stop sparksession
spark.sparkContext.stop()

### Check if the spark cluster is well closed. You should not see any python-spark pods

In [5]:
! kubectl get pods

The connection to the server api-server.static.lab.sspcloud.fr was refused - did you specify the right host or port?
