# Query GC data
- Create a Hive table (wdi_gs) over the wdi_2016 data stored in Google Cloud Storage (gs://).
- Count the number of rows in the wdi_gs table.

In [1]:
DROP TABLE IF EXISTS wdi_gs

In [2]:
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jrvs-data-eng-olena/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [3]:
DESCRIBE FORMATTED wdi_gs

In [4]:
SELECT count(countryName) FROM wdi_gs

In [5]:
DROP TABLE IF EXISTS wdi_csv_text;

In [6]:
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/olena/hive/wdi/wdi_csv_text';

In [7]:
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

In [8]:
SELECT count(countryName) FROM wdi_csv_text

In [9]:
SELECT count(countryName) FROM wdi_csv_text
--the same query is run twice to compare runtimes:
--the second run is faster because the data is already cached

# Hive vs Bash
We clear the file system cache and then execute the query.
Every subsequent run takes less time than the first one, because the data is read from the cache.

* Bash counts the rows faster than Hive.
* Hive has to read the files from the HDFS nodes and then run the query as Tez tasks scheduled by YARN, which adds job and container start-up overhead.
* Bash reads the local copy of the files directly, without any of that overhead.

In [11]:
%sh
# run on the cluster master node (SSH in first)
cd ~
hdfs dfs -get hdfs:///user/olena/hive/wdi/wdi_csv_text .
cd wdi_csv_text
# calculate the current directory size
du -ch .
#1.8G	total

# flush dirty pages, then drop the page cache so the timing starts cold
sync
echo 3 | sudo tee /proc/sys/vm/drop_caches
# bash row count, bracketed by epoch timestamps
date +%s && cat * | wc -l && date +%s
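For comparison, here is a Python sketch of the line count that the `cat * | wc` pipeline produces. It is one sequential stream over the local files, with no job scheduling involved. It is illustrative only and not part of the original notebook. `count_lines` and `timed_count` are hypothetical helpers, and the directory is assumed to be the local copy fetched with `hdfs dfs -get`.

```python
import os
import time


def count_lines(directory: str, chunk_size: int = 1 << 20) -> int:
    """Count newline bytes in the visible regular files of `directory`.

    Mirrors `cat * | wc -l`: the shell glob skips dot-files, and a final
    line without a trailing newline is not counted.
    """
    total = 0
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if name.startswith(".") or not os.path.isfile(path):
            continue
        with open(path, "rb") as f:
            # fixed-size binary chunks keep memory flat on multi-GB files
            while chunk := f.read(chunk_size):
                total += chunk.count(b"\n")
    return total


def timed_count(directory: str) -> tuple[int, float]:
    """Return (line count, elapsed seconds), like the `date +%s` bracketing."""
    start = time.perf_counter()
    lines = count_lines(directory)
    return lines, time.perf_counter() - start
```

Usage would be, for example, `timed_count("wdi_csv_text")` from the home directory after the `hdfs dfs -get` step.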


# Parsing Issues
* Delimiter characters (such as commas) that also appear inside field values split one field into several and shift the following values into the wrong columns.
To check for such stray delimiters, load the data again without splitting the lines into fields.
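The Python sketch below shows the problem on a made-up sample line in the same column order as wdi_gs. In that line the indicator name is quoted and contains a comma. `delimited_parse` is a hypothetical helper that mimics ROW FORMAT DELIMITED: it splits on every comma, drops extra fields and fills missing trailing fields with NULL (None). The sketch assumes this matches LazySimpleSerDe's behavior.

```python
COLUMNS = ["year", "countryName", "countryCode",
           "indicatorName", "indicatorCode", "indicatorValue"]

# Made-up sample line: the quoted indicator name itself contains a comma.
LINE = ('2015,Canada,CAN,"Access to electricity, urban (% of urban population)",'
        'EG.ELC.ACCS.UR.ZS,100')


def delimited_parse(line: str) -> dict:
    """Split on every comma, ignoring quotes (like FIELDS TERMINATED BY ',')."""
    fields = line.split(",")
    # keep exactly len(COLUMNS) fields: extras are dropped, missing ones are None
    fields = (fields + [None] * len(COLUMNS))[:len(COLUMNS)]
    return dict(zip(COLUMNS, fields))


record = delimited_parse(LINE)
# indicatorName is cut at the comma, the tail of the name shifts into
# indicatorCode, and the real indicator code lands in indicatorValue
# (where a FLOAT column would read it as NULL).
for name in COLUMNS:
    print(f"{name:>15}: {record[name]!r}")
```

The shifted tail ends in `(% of urban population)"`. This is the kind of value that shows up among the distinct indicator codes and that the debug query below searches for.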


In [13]:
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;


In [14]:
DROP TABLE IF EXISTS wdi_gs_debug;

In [15]:
--create a debug table that keeps every line whole instead of splitting it into columns
CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
LOCATION 'gs://jrvs-data-eng-olena/datasets/wdi_2016'

In [16]:
--show all distinct raw lines that contain the incorrectly parsed indicator name
SELECT DISTINCT(line) FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)\"%"

## Table with OpenCSV SerDe
The OpenCSV SerDe solves the parsing problem because it respects quoted fields, so commas inside quotes are not treated as delimiters. However, it treats the data in every column as STRING.
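OpenCSVSerde's default separator is a comma and its default quote character is a double quote. A rough Python analogue uses the standard csv module with the same defaults. It is an analogy, not the SerDe itself, and the sample line is made up. Like the SerDe, it keeps the quoted name in one field but returns every value as a string.

```python
import csv

# Made-up sample line: a quoted field that contains a comma.
LINE = ('2015,Canada,CAN,"Access to electricity, urban (% of urban population)",'
        'EG.ELC.ACCS.UR.ZS,100')


def quote_aware_parse(line: str) -> list[str]:
    """Split on commas outside double quotes, like OpenCSVSerde's defaults."""
    # csv.reader accepts any iterable of lines; defaults: delimiter ',' and quotechar '"'
    return next(csv.reader([line]))


fields = quote_aware_parse(LINE)
print(len(fields))                          # six fields, the name stays whole
print([type(f).__name__ for f in fields])   # every value comes back as a string
```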

In [18]:
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year STRING, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jrvs-data-eng-olena/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count" = "1")

In [19]:
DROP TABLE IF EXISTS wdi_opencsv_text;
CREATE EXTERNAL TABLE wdi_opencsv_text
(year STRING, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/olena/hive/wdi/wdi_opencsv_text'

In [20]:
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [21]:
SELECT distinct(indicatorcode) 
FROM wdi_opencsv_text
ORDER BY indicatorcode

In [22]:
SELECT COUNT(countryName) FROM wdi_opencsv_text

In [23]:
SELECT count(countryName) FROM wdi_csv_text

# OpenCSVSerde Limitation
OpenCSVSerde comes with some limitations. One of them is that the SerDe treats the data in every column as STRING, even if the CSV holds non-string values and even if the table declares other column types. To work around it, we create a view that casts the string columns to the proper data types.
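The Python sketch below shows what the casting view does to each all-string row. It assumes that a value Hive cannot cast becomes NULL, so `hive_like_cast` returns None in that case. This only roughly mimics Hive's CAST rules. `hive_like_cast` and `typed_view` are hypothetical helpers, and the rows are made up.

```python
def hive_like_cast(value, to_type):
    """Rough stand-in for Hive CAST: a string that does not parse becomes NULL (None)."""
    try:
        return to_type(value)
    except (TypeError, ValueError):
        return None


def typed_view(row: list[str]) -> tuple:
    """Apply the casts of wdi_opencsv_text_view to one all-string row."""
    year, country_name, country_code, ind_name, ind_code, ind_value = row
    return (hive_like_cast(year, int), country_name, country_code,
            ind_name, ind_code, hive_like_cast(ind_value, float))


# made-up rows: the second one has an empty indicatorValue
print(typed_view(["2015", "Canada", "CAN", "GDP growth (annual %)",
                  "NY.GDP.MKTP.KD.ZG", "1.5"]))
print(typed_view(["2015", "Canada", "CAN", "GDP growth (annual %)",
                  "NY.GDP.MKTP.KD.ZG", ""]))
```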

In [25]:
DESCRIBE FORMATTED wdi_opencsv_text


In [26]:
DESCRIBE FORMATTED wdi_csv_text

In [27]:
DROP VIEW IF EXISTS wdi_opencsv_text_view

In [28]:
CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text

In [29]:
DESCRIBE FORMATTED wdi_opencsv_text_view

# 2015 Canada GDP Growth HQL

In [31]:
-- find 2015 Canada GDP growth
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text_view
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"


The query is slow (1 min 32 sec) because it scans every row of a large dataset to find the matches, which takes O(n) time in the number of rows.
To speed it up, the table can be partitioned and bucketed, so that each query reads less data to get its result.
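The toy Python model below compares a full scan with partition pruning. With partitioning, the full scan becomes a lookup of a single year partition. The model counts how many rows each approach has to examine, on synthetic rows where all names and values are made up. It models pruning only, not Hive's actual I/O. The helper names are hypothetical.

```python
from collections import defaultdict

GDP_GROWTH = "GDP growth"


def scan(rows, year, country):
    """Filter like the HQL query; also report how many rows were examined."""
    hits = [r for r in rows
            if r["year"] == year and r["countryName"] == country
            and GDP_GROWTH in r["indicatorName"]]   # LIKE "%GDP growth%"
    return hits, len(rows)


def partition_by_year(rows):
    """One bucket of rows per year, like year=YYYY sub-directories."""
    parts = defaultdict(list)
    for r in rows:
        parts[r["year"]].append(r)
    return parts


def pruned_scan(parts, year, country):
    # only the matching partition is opened; the others are never read
    return scan(parts.get(year, []), year, country)


def make_rows():
    """Synthetic table: 57 years x 2 countries x 2 indicators (made-up values)."""
    return [{"year": y, "countryName": c, "indicatorName": i, "indicatorValue": 0.0}
            for y in range(1960, 2017)
            for c in ("Canada", "Chile")
            for i in ("GDP growth (annual %)", "Population, total")]
```

Running `scan(make_rows(), 2015, "Canada")` examines every row, while `pruned_scan(partition_by_year(make_rows()), 2015, "Canada")` examines only the 2015 rows and finds the same match.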

# Hive Partitions

In [34]:

DROP TABLE IF EXISTS wdi_opencsv_text_partitions;

CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/olena/hive/wdi/wdi_opencsv_text_partitions'


* Hive raises a fatal error when a single task tries to create too many dynamic partitions. To avoid it, the rows can be grouped by the dynamic partition column and distributed to the reducers (DISTRIBUTE BY year), where the dynamic partitions are then created. Each reducer then creates only the partitions for the years it receives, which significantly reduces the number of distinct dynamic partitions per task.

In [36]:
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION (year)
SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year FROM wdi_opencsv_text
DISTRIBUTE BY year
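The limit behind the fatal error is per task: hive.exec.max.dynamic.partitions.pernode, which is 100 by default. The toy Python model below counts how many partition writers the tasks open in two cases. In the first, each input split writes whatever years it happens to contain. In the second, the rows are distributed by year first. The data and helper names are hypothetical, and Python's hash() stands in for Hive's partitioner.

```python
def writers_per_mapper(splits):
    """Without DISTRIBUTE BY: each split writes one file per year it happens to see."""
    return [len({row["year"] for row in split}) for split in splits]


def writers_per_reducer(rows, num_reducers):
    """DISTRIBUTE BY year: all rows of one year are sent to the same reducer."""
    years_seen = [set() for _ in range(num_reducers)]
    for row in rows:
        # Python's hash() stands in for Hive's hash partitioner (an assumption)
        years_seen[hash(row["year"]) % num_reducers].add(row["year"])
    return [len(s) for s in years_seen]


# synthetic input: 10 rows for each year 1960-2016, in year order
rows = [{"year": y} for y in range(1960, 2017) for _ in range(10)]
# four input splits that each mix rows from every year
splits = [rows[i::4] for i in range(4)]

print(sum(writers_per_mapper(splits)))    # every split opens a writer for every year
print(sum(writers_per_reducer(rows, 4)))  # each year is written by exactly one reducer
```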


In [37]:
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text_partitions
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"

Partitioning made the query about 4 times faster (23 sec instead of 1 min 32 sec). Because the filter year = 2015 is on the partition column, Hive reads only the year=2015 partition instead of going through all the data.

# Columnar File Optimization

In [40]:
DROP TABLE IF EXISTS wdi_csv_parquet;

CREATE EXTERNAL TABLE wdi_csv_parquet
(year INT, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/olena/hive/wdi/wdi_csv_parquet'



In [41]:
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs

In [42]:
%sh
hdfs dfs -du -h /user/olena/hive/wdi/wdi_csv_parquet
hdfs dfs -du -h /user/olena/hive/wdi/wdi_opencsv_text

In [43]:
SELECT count(countryName) FROM wdi_csv_parquet

In [44]:
SELECT count(countryName) FROM wdi_opencsv_text

In [45]:
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_csv_parquet
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"

In [46]:
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"

The wdi_csv_parquet table is about 20 times smaller than the wdi_opencsv_text table. This is because Parquet is a columnar file format: it stores data by columns rather than by rows, so the repeated values within a column (such as country and indicator names) can be encoded compactly.
A columnar file can also be read from disk more efficiently.
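Parquet uses dictionary encoding and run-length encoding for column chunks. The Python sketch below is a simplified model of both. The byte accounting, with a fixed 4-byte integer per id and per run length, is an assumption for illustration and not Parquet's real on-disk format. The column values are made up.

```python
def encode_column(values):
    """Dictionary-encode a column, then run-length encode the dictionary ids."""
    dictionary, ids = {}, []
    for v in values:
        ids.append(dictionary.setdefault(v, len(dictionary)))
    runs = []  # list of [id, run_length]
    for i in ids:
        if runs and runs[-1][0] == i:
            runs[-1][1] += 1
        else:
            runs.append([i, 1])
    return list(dictionary), runs


def raw_size(values):
    """Bytes needed to store the values as plain text, one after another."""
    return sum(len(v.encode("utf-8")) for v in values)


def encoded_size(dictionary, runs, bytes_per_int=4):
    """Dictionary text plus two fixed-width integers per run (a simplification)."""
    return raw_size(dictionary) + len(runs) * 2 * bytes_per_int


column = ["Canada"] * 1000 + ["Chile"] * 1000   # made-up countryName column
dictionary, runs = encode_column(column)
print(raw_size(column), encoded_size(dictionary, runs))
```

The gain depends on how often values repeat and how long the runs are. A row-oriented text file interleaves all columns, so these runs never form.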

The columnar format made both queries about 3 times faster.
This is because Hive reads only the columns that a query references instead of every field of every row.
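The Python sketch below shows the difference for a COUNT(countryName) query. A row store has to parse every field of every record, while a column store touches only the requested column. It counts cells touched as a stand-in for I/O. The rows are made up and the helper names are hypothetical. It also shows that COUNT(column) skips NULLs.

```python
COLUMNS = ("year", "countryName", "countryCode",
           "indicatorName", "indicatorCode", "indicatorValue")


def count_row_store(rows, column):
    """COUNT(column) on row-oriented data: each record is read in full."""
    idx = COLUMNS.index(column)
    cells_read = count = 0
    for row in rows:
        cells_read += len(row)          # the whole line has to be parsed
        count += row[idx] is not None   # COUNT(col) skips NULLs
    return count, cells_read


def to_column_store(rows):
    """Pivot row tuples into one list per column (the columnar layout)."""
    return {name: [row[i] for row in rows] for i, name in enumerate(COLUMNS)}


def count_column_store(columns, column):
    """COUNT(column) on columnar data: only that column's values are read."""
    values = columns[column]
    return sum(v is not None for v in values), len(values)


# made-up rows; the second has a NULL countryName
rows = [(2015, "Canada", "CAN", "GDP growth (annual %)", "NY.GDP.MKTP.KD.ZG", 1.5),
        (2015, None, "XXX", "GDP growth (annual %)", "NY.GDP.MKTP.KD.ZG", None),
        (2014, "Chile", "CHL", "GDP growth (annual %)", "NY.GDP.MKTP.KD.ZG", 1.9)]
print(count_row_store(rows, "countryName"))
print(count_column_store(to_column_store(rows), "countryName"))
```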

# Highest GDP Growth

In [49]:
-- Hive Tez query
SELECT distinct t1.indicatorValue as GDP_growth_value, t1.year, t1.countryName
FROM wdi_csv_parquet t1
INNER JOIN (
    SELECT max(indicatorValue) AS GDP_growth_value, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
    ) t2 
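ON t1.indicatorValue = t2.GDP_growth_value AND t1.countryName = t2.countryName
-- restrict t1 as well, so a value of another indicator cannot match the maximum
WHERE t1.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC;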


In [50]:
%spark.sql
SELECT distinct t1.indicatorValue as GDP_growth_value, t1.countryName, t1.year
FROM wdi_csv_parquet t1
INNER JOIN (
    SELECT max(indicatorValue) AS GDP_growth_value, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
    ) t2 
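ON t1.indicatorValue = t2.GDP_growth_value AND t1.countryName = t2.countryName
-- restrict t1 as well, so a value of another indicator cannot match the maximum
WHERE t1.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC;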

The Hive on Tez query (1 min 37 sec) is almost twice as slow as the Spark SQL query (55 sec).
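Both queries find each country's maximum GDP growth (t2) and then join back to the table (t1) to recover the year. The Python sketch below follows the same logic on made-up tuples. It restricts both sides of the join to the GDP growth indicator, so a value from another indicator cannot match the maximum by coincidence. `highest_growth` is a hypothetical helper.

```python
GDP_GROWTH_CODE = "NY.GDP.MKTP.KD.ZG"


def highest_growth(rows, code=GDP_GROWTH_CODE):
    """rows: (year, countryName, indicatorCode, indicatorValue) tuples."""
    # t2: MAX(indicatorValue) per country; MAX ignores NULLs
    best = {}
    for year, country, ind_code, value in rows:
        if ind_code == code and value is not None:
            best[country] = max(value, best.get(country, value))
    # join back on (country, value) to recover the year(s); the set acts as DISTINCT
    matches = {(value, country, year)
               for year, country, ind_code, value in rows
               if ind_code == code and country in best and value == best[country]}
    # ORDER BY GDP_growth_value DESC (country and year break ties deterministically)
    return sorted(matches, key=lambda m: (-m[0], m[1], m[2]))
```

If a country reaches its maximum in several years, every such year is returned, just as the join returns one row per matching t1 row.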

# Sort GDP by country and year

In [53]:
-- SORT BY sorts the rows within each reducer only; ORDER BY gives one global order
SELECT countryName, year, indicatorCode, indicatorValue
FROM wdi_csv_parquet
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
SORT BY countryName, year
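In Hive, ORDER BY produces a total order through a single reducer, while SORT BY only sorts the rows inside each reducer. The Python sketch below models the difference. Round-robin routing stands in for however Hive actually assigns rows to reducers, which is an assumption, and the (countryName, year) pairs are made up.

```python
def order_by(rows):
    """ORDER BY: one total order over all rows (Hive uses a single reducer)."""
    return sorted(rows)


def sort_by(rows, num_reducers):
    """SORT BY: each reducer sorts only the rows it received."""
    shares = [[] for _ in range(num_reducers)]
    for i, row in enumerate(rows):
        shares[i % num_reducers].append(row)   # round-robin stands in for Hive's row routing
    output = []
    for share in shares:                        # reducer outputs are simply concatenated
        output.extend(sorted(share))
    return output


# made-up (countryName, year) pairs
rows = [("Chile", 2015), ("Canada", 2014), ("Chile", 2014), ("Canada", 2015)]
print(order_by(rows))
print(sort_by(rows, 2))   # sorted within each reducer, not across them
```

With a single reducer the two give the same result. With more reducers, SORT BY output is only sorted piecewise.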