# Query GS data
- Create a Hive table `wdi_gs` over the GS `wdi_2016` data.
- Count the number of rows in the `wdi_gs` table.

In [1]:
%hive
DROP TABLE IF EXISTS wdi_gs

In [2]:
%hive
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, 
    indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_tarek/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.count"="1")

In [3]:
%hive
DESCRIBE FORMATTED wdi_gs

In [4]:
%hive
SELECT COUNT(countryName) AS countRows 
FROM wdi_gs

# Load GS Data to HDFS
### Create `wdi_csv_text` table

In [6]:
%hive
DROP TABLE IF EXISTS wdi_csv_text;

In [7]:
%hive
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/tarek/hive/wdi/wdi_csv_text'

### Load data from `wdi_gs` table to `wdi_csv_text` table

In [9]:
%hive
INSERT OVERWRITE TABLE wdi_csv_text
SELECT *
FROM wdi_gs

### Check the HDFS size of the `wdi_csv_text` table

In [11]:
%sh
hdfs dfs -du -s -h /user/tarek/hive/wdi/wdi_csv_text

### Count the total number of rows in the table


In [13]:
%hive
SELECT count(countryName) 
FROM wdi_csv_text

- If you execute the count query again, you will notice that it takes less execution time. The file system cache holds data that was recently read from disk, so subsequent reads can be served from the cache instead of reading the data from disk again.

### Clear the filesystem cache and count the number of rows

In [16]:
%sh
echo 3 | sudo tee /proc/sys/vm/drop_caches

In [17]:
%hive
SELECT count(countryName) 
FROM wdi_csv_text

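- The query still took less execution time after clearing the file system cache. This is not because the cache holds only the data we want: `drop_caches` simply empties the page cache. A more likely explanation is that the `%sh` cell clears the cache only on the node it runs on (the master node), while the HDFS blocks are read on the worker nodes, whose caches stay warm; reused Tez containers may also speed up repeated queries.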

# Hive vs Bash
### Copy the `wdi_csv_text` HDFS directory to the master node and write a command that counts the number of rows

In [20]:
%sh

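# run on the master node (e.g. over SSH)
cd ~
# copy the table's HDFS directory to the local file system
hdfs dfs -get hdfs:///user/tarek/hive/wdi/wdi_csv_text .
cd wdi_csv_text
# calculate the current directory size
du -ch .
# 1.8G	total

# clear the file system cache
echo 3 | sudo tee /proc/sys/vm/drop_caches
# bash row count (wc -l counts lines), with epoch timestamps before and after
date +%s && cat * | wc -l && date +%s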


 
- Bash needed less time than Hive to count the rows.
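
For comparison, the bash pipeline above can be expressed as a short Python function. This is a minimal sketch rather than part of the original workflow; the name `count_lines` is hypothetical, and it assumes the table's part files were copied into a local directory as above.

```python
import os


def count_lines(directory):
    """Count newline bytes across the regular, non-hidden files in `directory`.

    Mirrors `cat * | wc -l`: the shell glob `*` skips hidden files, and
    `wc -l` counts newline characters, so a last line without a trailing
    newline is not counted.
    """
    total = 0
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if name.startswith(".") or not os.path.isfile(path):
            continue
        with open(path, "rb") as f:
            # Read 1 MiB at a time so large part files are never fully loaded into memory.
            for chunk in iter(lambda: f.read(1 << 20), b""):
                total += chunk.count(b"\n")
    return total
```

For example, `count_lines(os.path.expanduser("~/wdi_csv_text"))` would count the rows of the copied directory.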


# Parsing issue

In [23]:
%hive
SELECT DISTINCT indicatorCode
FROM wdi_csv_text
ORDER BY indicatorCode
LIMIT 20;

### Create a debug table to identify the issue

In [25]:
%hive
DROP TABLE IF EXISTS wdi_gs_debug


In [26]:
%hive
CREATE EXTERNAL TABLE wdi_gs_debug
(text STRING)
LOCATION 'gs://jarvis_data_eng_tarek/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [27]:
%hive
SELECT text 
FROM wdi_gs_debug
WHERE text like "%\(\% of urban population\)\"%"

- The previous queries show a parsing issue. When we created the table, we told Hive to split fields on every comma, but some quoted values contain commas themselves, so they are split into separate fields and the following columns shift. To overcome this problem, we can use a SerDe that understands quoted fields, such as OpenCSVSerde.
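
The difference can be reproduced outside Hive. The sketch below compares splitting on every comma with Python's quote-aware `csv` module, which behaves similarly to OpenCSVSerde's default comma separator and double-quote character. The input line is hypothetical (its value is made up); the indicator name "Population, total" only serves as an example of a quoted field that contains a comma.

```python
import csv
import io

COLUMNS = ["year", "countryName", "countryCode",
           "indicatorName", "indicatorCode", "indicatorValue"]

# Hypothetical input line: the quoted indicator name contains a comma.
LINE = '2015,Canada,CAN,"Population, total",SP.POP.TOTL,1.0'


def naive_split(line):
    """Split on every comma, like FIELDS TERMINATED BY ','."""
    return line.split(",")


def quote_aware_split(line):
    """Respect double quotes around fields, similar to OpenCSVSerde."""
    return next(csv.reader(io.StringIO(line)))


naive = naive_split(LINE)
parsed = quote_aware_split(LINE)

# The naive split yields 7 fields, so every column after indicatorName shifts;
# zip() stops at the 6 column names and drops the extra field.
print(dict(zip(COLUMNS, naive)))
print(dict(zip(COLUMNS, parsed)))
```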

### Create `wdi_opencsv_gs` table with OpenCSV SerDe

In [30]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_gs

In [31]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, 
    indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_tarek/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

### Create `wdi_opencsv_text` destination table


In [33]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text


In [34]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, 
    indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/tarek/hive/wdi/wdi_opencsv_text'

### Load data from `wdi_opencsv_gs` table to `wdi_opencsv_text` table

In [36]:
%hive
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * 
FROM wdi_opencsv_gs

In [37]:
%hive
SELECT DISTINCT indicatorCode
FROM wdi_opencsv_text
LIMIT 20

- OpenCSVSerde solved the parsing issue, but queries take more time to execute.


### Compare execution time between `wdi_csv_text` and `wdi_opencsv_text`

In [40]:
%hive
SELECT count(countryName) 
FROM wdi_csv_text

In [41]:
%hive
SELECT count(countryName) 
FROM wdi_opencsv_text

- The query took 31 seconds on the `wdi_csv_text` table and 1 minute and 36 seconds on the `wdi_opencsv_text` table. `wdi_csv_text` uses LazySimpleSerDe, which is faster than the OpenCSVSerde used by `wdi_opencsv_text`. However, LazySimpleSerDe does not handle quoted values that contain the delimiter (as we saw before), whereas OpenCSVSerde does extra work to parse quoted fields; that extra work fixes the parsing issue but costs execution time.

# OpenCSVSerde limitation
- This SerDe treats all columns as type STRING. Even if you create a table with non-string column types using this SerDe, the DESCRIBE TABLE output shows string column types, because the type information is retrieved from the SerDe. To convert columns to the desired types, you can create a view over the table.
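
As an illustration of what such a view does, the Python sketch below casts string columns to typed values and returns `None` (Hive's NULL) when a string is not a valid number, as Hive's `CAST` does. It also shows why the types matter: numbers stored as strings sort and compare character by character. The rows and helper names are hypothetical.

```python
# Every value read through OpenCSVSerde arrives as a string.
# These rows are hypothetical; the numbers are made up.
RAW_ROWS = [
    {"year": "2015", "countryName": "Canada", "indicatorValue": "1.5"},
    {"year": "2015", "countryName": "Canada", "indicatorValue": ""},
]


def hive_like_cast(value, target_type):
    """Convert `value` with `target_type`, returning None instead of raising.

    Mirrors Hive's CAST returning NULL for a string that is not a valid
    number (for example an empty string).
    """
    try:
        return target_type(value)
    except (TypeError, ValueError):
        return None


def typed_view(rows):
    """Yield rows with year and indicatorValue cast, like the view's SELECT."""
    for row in rows:
        yield {
            "year": hive_like_cast(row["year"], int),
            "countryName": row["countryName"],
            "indicatorValue": hive_like_cast(row["indicatorValue"], float),
        }


typed = list(typed_view(RAW_ROWS))

# Why the types matter: strings compare character by character.
print(sorted(["9", "10"]))                                   # ['10', '9']
print(sorted(hive_like_cast(v, int) for v in ["9", "10"]))   # [9, 10]
```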

In [44]:
%hive
DESCRIBE FORMATTED wdi_opencsv_text


In [45]:
%hive
DESCRIBE FORMATTED wdi_csv_text

### Create a view on top of `wdi_opencsv_text`


In [47]:
%hive
DROP VIEW IF EXISTS wdi_opencsv_text_view

In [48]:
%hive
CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, 
    CAST(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text

In [49]:
%hive
DESCRIBE FORMATTED wdi_opencsv_text_view

# 2015 Canada GDP Growth HQL

### Finding the correct indicator name/code

In [52]:
%hive
SELECT countryName, indicatorCode
FROM wdi_opencsv_text_view
WHERE year=2015 AND countryName='Canada' AND indicatorName LIKE "%GDP growth (annual %)%" 


### Finding 2015 Canada `GDP growth` based on NY.GDP.MKTP.KD.ZG


In [54]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName 
FROM wdi_opencsv_text_view
WHERE year=2015 AND countryName='Canada' AND indicatorCode='NY.GDP.MKTP.KD.ZG'

- The query takes around 1 minute and 41 seconds because Hive has to scan the whole data set, so the execution time grows linearly with the data size (`O(N)`).
- We can use columnar file formats, partitions, or buckets to reduce the execution time by reading only part of the data.


# Partition optimization

In [57]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;

In [58]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, 
    indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/tarek/hive/wdi/wdi_opencsv_text_partitions'

### Load data from `wdi_opencsv_text` to `wdi_opencsv_text_partitions`

In [60]:
%hive
set hive.exec.dynamic.partition.mode=nonstrict;
FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions 
PARTITION(year)
SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year
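
Conceptually, the dynamic-partition insert above routes each row to a directory named after its partition value (e.g. `year=2015`); `nonstrict` mode is what allows every partition column to be dynamic, and Hive takes the partition value from the last column of the SELECT. A minimal Python sketch of that routing, with hypothetical rows and a hypothetical `dynamic_partition` helper:

```python
from collections import defaultdict


def dynamic_partition(rows, partition_col):
    """Group rows into Hive-style `col=value` partition directories.

    The partition column is removed from each stored row: Hive records its
    value in the directory name rather than in the data files.
    """
    partitions = defaultdict(list)
    for row in rows:
        remaining = dict(row)  # copy so the input rows stay untouched
        value = remaining.pop(partition_col)
        partitions[f"{partition_col}={value}"].append(remaining)
    return dict(partitions)


# Hypothetical rows.
ROWS = [
    {"countryName": "Canada", "indicatorCode": "NY.GDP.MKTP.KD.ZG", "year": 2015},
    {"countryName": "France", "indicatorCode": "NY.GDP.MKTP.KD.ZG", "year": 2015},
    {"countryName": "Canada", "indicatorCode": "NY.GDP.MKTP.KD.ZG", "year": 2014},
]

layout = dynamic_partition(ROWS, "year")
for directory, stored in sorted(layout.items()):
    print(directory, len(stored))
```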

### Inspect how many partitions have been created for the `wdi_opencsv_text_partitions` table

In [62]:
%sh
hdfs dfs -ls /user/tarek/hive/wdi/wdi_opencsv_text_partitions


### Re-run 2015 GDP Growth HQL against the `wdi_opencsv_text_partitions` table 

In [64]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName 
FROM wdi_opencsv_text_partitions
WHERE year=2015 AND countryName='Canada' AND indicatorCode='NY.GDP.MKTP.KD.ZG'


- Using partitions reduced the execution time of the query from 1 minute and 41 seconds to 24 seconds. Hive created one partition per year (57 partitions in total), so the query only scans the partition for year 2015.
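
Partition pruning can be modeled in a few lines of Python: with one `year=<value>` directory per partition, a filter on `year` lets the reader open a single directory instead of all of them. The layout and rows below are hypothetical and tiny; the point is the number of rows each approach scans.

```python
# Hypothetical partitioned layout: directory name -> rows stored in it.
PARTITIONS = {
    "year=2014": [{"countryName": "Canada", "indicatorCode": "NY.GDP.MKTP.KD.ZG"}],
    "year=2015": [
        {"countryName": "Canada", "indicatorCode": "NY.GDP.MKTP.KD.ZG"},
        {"countryName": "France", "indicatorCode": "NY.GDP.MKTP.KD.ZG"},
    ],
    "year=2016": [{"countryName": "Canada", "indicatorCode": "NY.GDP.MKTP.KD.ZG"}],
}


def is_canada(row):
    return row["countryName"] == "Canada"


def full_scan(partitions, year, predicate):
    """Read every row of every directory, then filter."""
    hits, scanned = [], 0
    for directory, rows in partitions.items():
        row_year = int(directory.split("=", 1)[1])
        for row in rows:
            scanned += 1
            if row_year == year and predicate(row):
                hits.append(row)
    return hits, scanned


def pruned_scan(partitions, year, predicate):
    """Open only the directory for `year`; other partitions are never read."""
    rows = partitions.get(f"year={year}", [])
    return [r for r in rows if predicate(r)], len(rows)


print(full_scan(PARTITIONS, 2015, is_canada)[1])    # rows scanned without pruning
print(pruned_scan(PARTITIONS, 2015, is_canada)[1])  # rows scanned with pruning
```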


# Columnar File Optimization


In [67]:
%hive
DROP TABLE IF EXISTS wdi_csv_parquet


In [68]:
%hive
CREATE EXTERNAL TABLE IF NOT EXISTS wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, 
    indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/tarek/hive/wdi/wdi_csv_parquet'

### Load data from `wdi_opencsv_gs` to `wdi_csv_parquet`

In [70]:
%hive
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * 
FROM wdi_opencsv_gs 

### Compare file sizes between `wdi_csv_parquet` and `wdi_opencsv_text`

In [72]:
%sh
hdfs dfs -du -s -h /user/tarek/hive/wdi/wdi_opencsv_text


In [73]:
%sh
hdfs dfs -du -s -h /user/tarek/hive/wdi/wdi_csv_parquet

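- `wdi_csv_parquet` (137.2 MB) is significantly smaller than `wdi_opencsv_text` (2.3 GB). Parquet stores data column by column and encodes and compresses each column, which works well for highly repetitive values such as country names and indicator codes.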
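
To see why repetitive columns shrink so much, here is a simplified Python sketch of two encodings Parquet applies per column: dictionary encoding, then run-length encoding of the dictionary ids. Real Parquet packs these ids with an RLE/bit-packing hybrid and can add a compression codec on top; the helper names and sample column are hypothetical.

```python
def dictionary_encode(values):
    """Replace each value with an integer id into a list of distinct values."""
    dictionary, ids = {}, []
    for v in values:
        # setdefault assigns the next id only the first time a value is seen.
        ids.append(dictionary.setdefault(v, len(dictionary)))
    return list(dictionary), ids


def run_length_encode(ids):
    """Collapse consecutive repeats into (id, run_length) pairs."""
    runs = []
    for i in ids:
        if runs and runs[-1][0] == i:
            runs[-1][1] += 1
        else:
            runs.append([i, 1])
    return [tuple(run) for run in runs]


# Hypothetical countryName column: long runs of repeated strings.
COLUMN = ["Canada", "Canada", "Canada", "France", "France"]

dictionary, ids = dictionary_encode(COLUMN)
print(dictionary, run_length_encode(ids))
```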


### Compare runtime

In [76]:
%hive
SELECT count(countryName) FROM wdi_opencsv_text

In [77]:
%hive
SELECT count(countryName) FROM wdi_csv_parquet

- The query took 1 minute and 37 seconds on the `wdi_opencsv_text` table and 29 seconds on the `wdi_csv_parquet` table.
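
Part of the speed-up plausibly comes from column pruning: `count(countryName)` needs only one column, and a columnar file lets the reader fetch just that column, while a text row store has to read and split entire lines. A toy Python model that counts how many field values each layout touches, using hypothetical rows (note that `COUNT(col)` skips NULLs):

```python
COLUMNS = ["year", "countryName", "countryCode",
           "indicatorName", "indicatorCode", "indicatorValue"]

# Hypothetical rows; the second has a NULL (None) countryName.
ROW_STORE = [
    (2015, "Canada", "CAN", "name A", "CODE.A", 1.0),
    (2015, None, "FRA", "name A", "CODE.A", 2.0),
    (2016, "Canada", "CAN", "name A", "CODE.A", 3.0),
]

# Column store: one list per column, built from the same rows.
COLUMN_STORE = {name: [row[i] for row in ROW_STORE] for i, name in enumerate(COLUMNS)}


def count_in_row_store(rows, column):
    """COUNT(column) over a row store: every field of every row is read."""
    idx = COLUMNS.index(column)
    count = fields_read = 0
    for row in rows:
        fields_read += len(row)
        if row[idx] is not None:
            count += 1
    return count, fields_read


def count_in_column_store(columns, column):
    """COUNT(column) over a column store: only that column's values are read."""
    values = columns[column]
    return sum(v is not None for v in values), len(values)


print(count_in_row_store(ROW_STORE, "countryName"))
print(count_in_column_store(COLUMN_STORE, "countryName"))
```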


### Execute `2015 GDP Growth` HQL against `wdi_csv_parquet` and `wdi_opencsv_text` tables

In [80]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName 
FROM wdi_opencsv_text
WHERE year=2015 AND countryName='Canada' AND indicatorCode='NY.GDP.MKTP.KD.ZG'

In [81]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName 
FROM wdi_csv_parquet
WHERE year=2015 AND countryName='Canada' AND indicatorCode='NY.GDP.MKTP.KD.ZG'

- The columnar file format improved performance: the query took 31 seconds on `wdi_csv_parquet`.

# Highest GDP Growth
### Find the highest `GDP growth` (NY.GDP.MKTP.KD.ZG) year for each country.

In [84]:
%hive
SELECT MAX(indicatorValue) AS GDP_growth_value, year, countryName 
FROM wdi_csv_parquet
WHERE indicatorCode='NY.GDP.MKTP.KD.ZG' AND indicatorName LIKE "%GDP growth (annual %)%"
GROUP BY countryName,year
ORDER BY GDP_growth_value DESC
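
The query above groups by both `countryName` and `year`. Since a country typically has one `NY.GDP.MKTP.KD.ZG` value per year, each group holds a single row and `MAX` simply returns it. Finding the highest-growth year for each country means taking the maximum within each country and keeping the year it belongs to; in HiveQL this is usually done with a window function such as `RANK() OVER (PARTITION BY countryName ORDER BY indicatorValue DESC)`. The Python sketch below shows that logic on hypothetical rows; `highest_growth_year` is an illustrative name.

```python
def highest_growth_year(rows):
    """Return {country: (year, value)} for each country's largest value.

    NULL (None) values are skipped. Unlike RANK(), which keeps every tied
    row, this keeps the first row seen for a tie.
    """
    best = {}
    for country, year, value in rows:
        if value is None:
            continue
        if country not in best or value > best[country][1]:
            best[country] = (year, value)
    return best


# Hypothetical (countryName, year, indicatorValue) rows; the values are made up.
ROWS = [
    ("Canada", 2014, 2.0),
    ("Canada", 2015, 1.0),
    ("France", 2015, 1.0),
    ("France", 2016, None),
]

print(highest_growth_year(ROWS))
```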




### Execute the same query using SparkSQL

In [86]:
%spark.sql
SELECT MAX(indicatorValue) AS GDP_growth_value, year, countryName 
FROM wdi_csv_parquet
WHERE indicatorCode='NY.GDP.MKTP.KD.ZG' AND indicatorName LIKE "%GDP growth (annual %)%"
GROUP BY countryName,year
ORDER BY GDP_growth_value DESC

- Hive on Tez executed the query much faster than the Spark interpreter: it took 33 seconds with Hive on Tez and 4 minutes and 49 seconds with Spark SQL.


# Sort GDP by country and year
### Write a query that returns `GDP Growth` for all countries

In [89]:
%hive
SELECT countryName, year, indicatorCode, indicatorValue AS GDP_growth_value
FROM wdi_csv_parquet
WHERE indicatorCode='NY.GDP.MKTP.KD.ZG'
ORDER BY countryName, year
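
Hive's `SORT BY` orders rows only within each reducer, whereas `ORDER BY` guarantees a total order, at the cost of sending all rows through a single reducer. A toy Python model of the difference, assuming two reducers and a round-robin row distribution (both hypothetical simplifications):

```python
def sort_by(rows, num_reducers):
    """Mimic SORT BY: each reducer sorts only the rows it receives."""
    shares = [[] for _ in range(num_reducers)]
    for i, row in enumerate(rows):
        # Round-robin is a simplification of how rows reach reducers.
        shares[i % num_reducers].append(row)
    return [sorted(share) for share in shares]


def order_by(rows):
    """Mimic ORDER BY: a single total order over all rows."""
    return sorted(rows)


# Hypothetical (countryName, year) pairs.
ROWS = [("Canada", 2016), ("Albania", 2015), ("Canada", 2015), ("Albania", 2016)]

per_reducer = sort_by(ROWS, num_reducers=2)
concatenated = [row for share in per_reducer for row in share]
print(concatenated)    # each reducer's output is sorted, the whole is not
print(order_by(ROWS))
```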
