<link rel='stylesheet' href='../assets/css/main.css'/>

[<< back to main index](../README.md) 

# Lab 4.6 : Data formats (JSON vs. Parquet vs. ORC)


### Overview
Comparing different data formats for Dataframes.  We will evaluate JSON, Parquet and ORC format.

Background reads:
- [Spark data frames](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- JSON format 
    - [wikipedia](https://en.wikipedia.org/wiki/JSON)
    - [json.org](http://json.org/)
- Parquet format
    - [Parquet project](https://parquet.apache.org/)
    - [parquet github](https://github.com/Parquet/parquet-format)
    - [presentation](http://www.slideshare.net/larsgeorge/parquet-data-io-philadelphia-2013)
- ORC format
    + [ORC project](https://orc.apache.org/)
    + [ORC explained](http://www.semantikoz.com/blog/orc-intelligent-big-data-file-format-hadoop-hive/)
    + [ORC performance](http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.3/bk_performance_tuning/content/hive_perf_best_pract_use_orc_file_format.html)

### Depends On 
None

### Run time
20-30 mins


## STEP 1: Clickstream data
There is about 1G+ clickstream data stored in `/data/click-stream/json` directory.  

They look like this

```json
{"timestamp": 1420070400000, "ip": "ip_557", "user": "user_13011", "action": "blocked", "domain": "npr.org", "campaign": "campaign_13", "cost": 116, "session": "session_43"}

{"timestamp": 1420070400043, "ip": "ip_129", "user": "user_58773", "action": "clicked", "domain": "flickr.com", "campaign": "campaign_7", "cost": 170, "session": "session_23"}

{"timestamp": 1420070400086, "ip": "ip_704", "user": "user_71191", "action": "viewed", "domain": "foxnews.com", "campaign": "campaign_20", "cost": 47, "session": "session_48"}

```

#### [Optional] If you need to generate more data....
```bash
    $    cd   /data/click-stream/
    $    python   gen-clickstream-json.py
```

## STEP 2: Benchmarking Spreadsheet
Download and inspect [Benchmarking_Dataformats.xlsx](Benchmarking_Dataformats.xlsx).  
**We will be filling out the values in this spreadsheet, as we execute commands on Spark Shell.**

It will look like this (click on the image for larger version)

<a href="../assets/images/5.3a.png"><img src="../assets/images/5.3a-small.png" style="border: 5px solid grey; max-width:100%;"/></a>



In [1]:
# initialize Spark Session
import os
import sys
top_dir = os.path.abspath(os.path.join(os.getcwd(), "../"))
if top_dir not in sys.path:
    sys.path.append(top_dir)

from init_spark import init_spark
spark = init_spark()
sc = spark.sparkContext

Initializing Spark...
Spark found in :  /home/ubuntu/spark
Spark config:
	 spark.app.name=TestApp
	spark.master=local[*]
	executor.memory=2g
	spark.sql.warehouse.dir=/tmp/tmp5_vdi3ry
	some_property=some_value
Spark UI running on port 4043


## STEP 3: ATOP

Also open another terminal and run **atop**.  
If you are using Jupyter Labs, launch a terminal from 'Launcher'.  
We will use this to monitor CPU / IO usage 


## STEP 4: Load Clickstream data
**==> While the import is running take a look at `atop` terminal.  Which of the resources are we maxing out?**  
**==> Measure the time taken to load JSON data; record it in the spreadsheet**  

In [2]:
import time

# load all the files in the dir
t1 = time.perf_counter()

clicksJson = spark.read.json("/data/click-stream/json/")

t2 = time.perf_counter()
print ("Read JSON in {:,.2f} ms ".format( (t2-t1)*1000))
print(clicksJson)

Read JSON in 14,570.65 ms 
DataFrame[action: string, campaign: string, cost: bigint, domain: string, ip: string, session: string, timestamp: bigint, user: string]


### 4.5 - Query 
**==> Find the max value of cost**   
**==> While the query is running, check `atop`**


Sample output
```
    +---------+
    |MAX(cost)|
    +---------+
    |      180|
    +---------+
```

**==> Note the time it took to run the query, and record it in spreadsheet**

In [3]:
import time
from pyspark.sql import *

clicksJson.createOrReplaceTempView("clicks_json")

t1 = time.perf_counter()
spark.sql("SELECT MAX(cost) FROM clicks_json").show()
t2 = time.perf_counter()

print ("MAX in JSON in {:,.2f} ms ".format( (t2-t1)*1000))

+---------+
|max(cost)|
+---------+
|      180|
+---------+

MAX in JSON in 6,059.38 ms 


## STEP 5 : Save the logs in Parquet format

We are going to use Spark's built-in parquet support to save the dataframe into parquet format

**==> Inspect `atop` terminal**  
**==> Measure the time taken to 'save as parquet' and record it in spreadsheet**  

In [4]:
import time
t1 = time.perf_counter()

clicksJson.write.parquet("/data/click-stream/my-parquet")

t2 = time.perf_counter()
print ("Wrote Parquet in {:,.2f} ms ".format( (t2-t1)*1000))

Wrote Parquet in 21,754.87 ms 


## Step 6 : Saving ORC

**==> Measure the time taken to save as ORC and record in spreadsheet**  

In [5]:
import time
t1 = time.perf_counter()

clicksJson.write.orc("/data/click-stream/my-orc")

t2 = time.perf_counter()
print ("Wrote ORC in {:,.2f} ms ".format( (t2-t1)*1000))

Wrote ORC in 23,731.34 ms 


## STEP 7 : Querying Parquet Data

**==> Note how quickly the data is loaded; measure this time and record in spreadsheet**   
**==> and schema is inferred!**  

Parquet format has built-in schema, so Spark doesn't have to parse the files as needed in JSON format

In [6]:
import time
t1 = time.perf_counter()

clicksParquet = spark.read.parquet("/data/click-stream/my-parquet")

t2 = time.perf_counter()
print ("Read Parquet in {:,.2f} ms ".format( (t2-t1)*1000))

clicksParquet.createOrReplaceTempView("clicks_parquet")

Read Parquet in 141.38 ms 


### 7.5  - Query Parquet

**==> Notice the time took and record in spreadsheet**    
**==> Why parquet is so quick to process?** 

In [7]:
import time
t1 = time.perf_counter()

spark.sql("SELECT MAX(cost) FROM clicks_parquet").show()

t2 = time.perf_counter()
print ("MAX Parquet in {:,.2f} ms ".format( (t2-t1)*1000))

+---------+
|max(cost)|
+---------+
|      180|
+---------+

MAX Parquet in 447.72 ms 


## STEP 8 : Load ORC
**==> Note the load time and record in spreadsheet**   

In [8]:
import time
t1 = time.perf_counter()

clicksORC = spark.read.orc("/data/click-stream/my-orc")

t2 = time.perf_counter()
print ("Read ORC in {:,.2f} ms ".format( (t2-t1)*1000))

clicksORC.createOrReplaceTempView("clicks_orc")

Read ORC in 42.09 ms 


### 8.5 - Query ORC

**==> Measure query time and record in spreadsheet**

In [9]:
import time
t1 = time.perf_counter()

spark.sql("SELECT MAX(cost) FROM clicks_orc").show()

t2 = time.perf_counter()
print ("MAX ORC in {:,.2f} ms ".format( (t2-t1)*1000))

+---------+
|max(cost)|
+---------+
|      180|
+---------+

MAX ORC in 708.59 ms 


## Step 9 : Compare Data Sizes

sample output 
```
1.3G	/data/click-stream/json
118M	/data/click-stream/my-parquet
101M	/data/click-stream/my-orc
```

**==> Record the byte sizes in spreadsheet**  

In [10]:
# for human readable format
!  du -skh  /data/click-stream/json  /data/click-stream/my-parquet  /data/click-stream/my-orc

1.4G	/data/click-stream/json
113M	/data/click-stream/my-parquet
100M	/data/click-stream/my-orc


In [11]:
# size in bytes for spreadsheet
! du -b /data/click-stream/json  /data/click-stream/my-parquet  /data/click-stream/my-orc

# in Mac use `du -k`
# ! du -k /data/click-stream/json  /data/click-stream/my-parquet  /data/click-stream/my-orc

1415191006	/data/click-stream/json
118405632	/data/click-stream/my-parquet
104521112	/data/click-stream/my-orc


## BONUS : Compressed JSON

We are going to store JSON files in compressed gzip format

**==> Compress the files**

```bash
$    cd   ~/data/click-stream
$   ./compress-json.sh
```

This will create compressed JSON in `json-gz` directory

**==> Inspect directory sizes**

```bash
    # bytes for spreadsheet
    $    du -b json    json-gz   parquet 

    # human readable format
    $    du -skh  json    json-gz   parquet 
```

Sample output

```
1.3G    json
154M    json-gz
 77M    parquet
```

**==> Load compressed json files in Spark shell and do the same processing**  
**==> Look at `atop` window to see resource usage**

In [14]:
#note the parsing time
import time


clicksJgz = spark.read.json("/data/click-stream/json-gz")


clicksParquet.createOrReplaceTempView("clicks_jsongz")


t1 = time.perf_counter()
# calculate the max cost
#notice the time took
spark.sql("SELECT MAX(cost) FROM clicks_jsongz").show()
t2 = time.perf_counter()
print ("Json-gz quiery in {:,.2f} ms ".format( (t2-t1)*1000))
# output : Job 7 finished: show at console:22, took 8.066727 s


+---------+
|max(cost)|
+---------+
|      180|
+---------+

Json-gz quiery in 156.55 ms 


### STEP 9 : Analyze / discuss results

Here are numbers from my run:

```
|format   | storage size |  loading time | query time : max(cost)|
|---------|:-------------|:--------------|:---------------------:|
| json    |  1.3 G       |  8.3 s        |   4.6 s               |
| json.gz |  154 M       |  8.5 s        |   4.1 s               | 
| parquet |  101 M       |    0 s        |   0.23 s              | 
| ORC     |  113 M       |    0 s        |   0.76 s              | 
```

**==> Also discuss your findings from `atop`.  Which resource 'ceiling' we are hitting first?  CPU / Memory / Disk ?**