# Query GS data
- Create a Hive table (`wdi_gs`) over the WDI 2016 data stored in Google Cloud Storage (GCS).
- Count the number of rows in the `wdi_gs` table.

In [1]:
-- NOTE semicolon is not allowed in Zeppelin
DROP TABLE IF EXISTS wdi_gs;
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_julienguyen/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");

-- show table meta data
DESCRIBE FORMATTED wdi_gs;

In [2]:
DROP TABLE IF EXISTS wdi_csv_text;
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/julienguyen/hive/wdi/wdi_csv_text';

In [3]:
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

In [4]:
SELECT count(countryName) FROM wdi_csv_text

In [5]:
%sh

cd ~
hdfs dfs -get hdfs:///user/julienguyen/hive/wdi/wdi_csv_text .
cd wdi_csv_text

# calculate the total size of the current directory
du -ch .
# 1.8G	total

# drop the OS page cache so the scan below reads from disk, then time a full read
echo 3 | sudo tee /proc/sys/vm/drop_caches
date +%s && cat * | wc && date +%s



# Parsing Issue

- Find the parsing issue in `wdi_csv_text` by creating a debug table
- Create a table using `OpenCSVSerde`

In [7]:
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;

In [8]:
DROP TABLE IF EXISTS wdi_gs_debug;
CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING) 
LOCATION 'hdfs:///user/julienguyen/hive/wdi/wdi_csv_text'
TBLPROPERTIES ("skip.header.line.count"="1")

In [9]:
SELECT line FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)\"%"

### Parsing Issue

The `DISTINCT indicatorcode` query returned fragments of indicator names instead of indicator codes, which means the columns were shifted. To debug this, it helps to load each raw line into a single-column table and look at the whole row. The default `ROW FORMAT DELIMITED` table (`LazySimpleSerDe`) splits each line on every comma, with no notion of quoting. Some `indicatorName` values contain a comma inside quotes, e.g. "Population living in slums, (% of urban population)", so one field is split into two and every following column shifts one position to the right. The fix is to use a quote-aware parser such as `OpenCSVSerde`, which treats commas inside double quotes as part of the field.
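To illustrate the difference, here is a minimal shell sketch. The sample row is illustrative, built in the table's six-column order, and its value `1.5` is made up. A plain `awk -F','` split, which behaves like a `FIELDS TERMINATED BY ','` table, counts seven fields, while a quote-aware scan that ignores commas inside double quotes counts the expected six. The quote-aware loop is a simplified stand-in for what `OpenCSVSerde` does, not its actual implementation; it does not handle escaped quotes.

```shell
# Illustrative row in the wdi_2016 column order
# (year,countryName,countryCode,indicatorName,indicatorCode,indicatorValue);
# the value 1.5 is made up
row='2015,"Canada","CAN","Population living in slums, (% of urban population)","EN.POP.SLUM.UR.ZS",1.5'

# Naive split: every comma is a delimiter, as in a ROW FORMAT DELIMITED table
naive_fields=$(printf '%s\n' "$row" | awk -F',' '{ print NF }')

# Quote-aware split: a comma inside double quotes belongs to the field
# (simplified: escaped quotes such as "" are not handled)
quoted_fields=$(printf '%s\n' "$row" | awk '{
    n = 1; in_quotes = 0
    for (i = 1; i <= length($0); i++) {
        c = substr($0, i, 1)
        if (c == "\"") in_quotes = !in_quotes
        else if (c == "," && !in_quotes) n++
    }
    print n
}')

echo "naive split: $naive_fields fields, quote-aware split: $quoted_fields fields"
```

The extra seventh field is exactly the shift seen in `indicatorcode`: everything after the embedded comma moves one column to the right.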

In [11]:
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_julienguyen/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [12]:
DROP TABLE IF EXISTS wdi_opencsv_text;
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/julienguyen/hive/wdi/wdi_opencsv_text'

In [13]:
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [14]:
SELECT distinct(indicatorcode)
FROM wdi_opencsv_text
ORDER BY indicatorcode
LIMIT 20;

In [15]:
SELECT count(countryName) FROM wdi_opencsv_text

In [16]:
SELECT count(countryName) FROM wdi_csv_text

### Compare execution time between `wdi_opencsv_text` and `wdi_csv_text`

Comparing execution times, queries on `wdi_opencsv_text` are noticeably slower than the same queries on `wdi_csv_text`. `OpenCSVSerde` has to scan each line character by character to track whether it is inside a quoted field, and it materializes every field of every row as a string. The default `LazySimpleSerDe` used by `wdi_csv_text` only splits on the delimiter and deserializes fields lazily, when they are accessed. The extra cost buys correctness: commas inside quotation marks are no longer treated as delimiters.

# OpenCSVSerde Limitation

- Create a view over the `OpenCSVSerde` table that casts each column to its correct data type.

In [19]:
DESCRIBE FORMATTED wdi_opencsv_text

In [20]:
DESCRIBE FORMATTED wdi_csv_text

### Compare `wdi_opencsv_text` and `wdi_csv_text` metadata

`OpenCSVSerde` deserializes every column as `STRING`, regardless of the types declared in `CREATE TABLE`, so `year` and `indicatorValue` show up as strings in `wdi_opencsv_text` even though they were declared `INTEGER` and `FLOAT`. `wdi_csv_text`, by contrast, keeps the declared types. To get the expected types for `wdi_opencsv_text`, create a view over the table that `CAST`s each column to the desired type.
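Why the cast matters: on a `STRING` column, comparisons and aggregates such as `MAX` are lexicographic, so a value like `'9.5'` ranks above `'10.2'`. The shell sketch below reproduces the effect with `sort` on made-up values; `LC_ALL=C` pins byte-wise ordering so the result does not depend on the locale.

```shell
# Made-up indicatorValue strings, as OpenCSVSerde would return them
values='9.5
10.2
-3'

# String ordering: what MAX(indicatorValue) compares without a CAST
string_max=$(printf '%s\n' "$values" | LC_ALL=C sort | tail -n 1)

# Numeric ordering: what MAX(CAST(indicatorValue AS FLOAT)) compares
numeric_max=$(printf '%s\n' "$values" | LC_ALL=C sort -n | tail -n 1)

echo "string max: $string_max, numeric max: $numeric_max"
```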

In [22]:
DROP VIEW IF EXISTS wdi_opencsv_text_view;

CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue FROM wdi_opencsv_text

In [23]:
DESCRIBE FORMATTED wdi_opencsv_text_view

# Write a HiveQL query to find the 2015 GDP growth (annual %) for Canada

1. Write an HQL query to find the correct indicator name/code.
2. Write an HQL query to find Canada's 2015 GDP growth.

In [25]:
DROP VIEW IF EXISTS wdi_opencsv_gdp;

CREATE VIEW IF NOT EXISTS wdi_opencsv_gdp
AS
    SELECT CAST(indicatorvalue AS FLOAT) AS GDP_growth_value, CAST(year AS INTEGER) AS year, countryname
    FROM wdi_opencsv_text
    WHERE countryname = 'Canada' and indicatorname LIKE ('%GDP growth (annual \%)%') and year = '2015'


In [26]:

SELECT * FROM wdi_opencsv_gdp

### 2015 Canada GDP Growth HQL

The query above is slow because it scans the entire table, over a million records, to find a single row. To optimize it, the table can be partitioned by `year`, so a query that filters on `year = 2015` reads only that year's data. The partitioned table can also be reused for other per-year queries.
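Hive stores each partition of a partitioned table in its own subdirectory named `column=value` under the table location, so a filter on the partition column lets it skip whole directories. The sketch below mimics that layout in a temporary directory; the file name `000000_0` and the row contents are made up for illustration.

```shell
# Made-up table location with one subdirectory per year,
# mirroring Hive's <table location>/year=<value>/ partition layout
table_dir=$(mktemp -d)
for y in 2013 2014 2015 2016; do
    mkdir -p "$table_dir/year=$y"
    # the partition column lives in the directory name, not in the data file
    printf 'Canada,CAN,GDP growth (annual %%)\n' > "$table_dir/year=$y/000000_0"
done

# Unpartitioned scan: every data file is read
full_files=$(find "$table_dir" -type f | awk 'END { print NR }')

# Partition pruning: WHERE year = 2015 maps to a single directory
pruned_files=$(find "$table_dir/year=2015" -type f | awk 'END { print NR }')

echo "files scanned: full table=$full_files, year=2015 only=$pruned_files"
rm -rf "$table_dir"
```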

# Hive Partitions

- Optimize the previous query using Hive Partitions

In [29]:
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions 
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT) PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/julienguyen/hive/wdi/wdi_opencsv_text_partition'

In [30]:
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
set hive.stats.column.autogather=false;
FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION (year)
    SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year

In [31]:
%sh

hdfs dfs -du -h hdfs:///user/julienguyen/hive/wdi/wdi_opencsv_text_partition
#hdfs dfs -du -h hdfs:///user/julienguyen/hive/wdi/wdi_opencsv_text

In [32]:
DROP VIEW IF EXISTS wdi_opencsv_gdp_partitions;

CREATE VIEW IF NOT EXISTS wdi_opencsv_gdp_partitions
AS
    SELECT CAST(indicatorvalue AS FLOAT) AS GDP_growth_value, CAST(year AS INTEGER) AS year, countryname
    FROM wdi_opencsv_text_partitions
    WHERE countryname = 'Canada' and indicatorname LIKE ('%GDP growth (annual \%)%') and year = '2015'


In [33]:
SELECT * FROM wdi_opencsv_gdp_partitions

### 2015 GDP Growth With Hive Partitions

Execution time for the 2015 Canada GDP growth query dropped significantly when using `wdi_opencsv_text_partitions` instead of `wdi_opencsv_text`. Because `year` is the partition column, Hive prunes partitions: with `year = '2015'` in the `WHERE` clause it reads only the `year=2015` directory instead of the whole table. With more than 50 years in the dataset, this cuts the data scanned by roughly 50x, assuming rows are spread fairly evenly across years.

# Columnar File Optimization

- Create a table that is stored as `parquet`
- Load data from `wdi_opencsv_gs` to `wdi_csv_parquet`
- Compare runtime by executing a query that counts the number of records in each table.

In [36]:
DROP TABLE IF EXISTS wdi_csv_parquet;
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/julienguyen/hive/wdi/wdi_csv_parquet'

In [37]:
set hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE wdi_csv_parquet
    SELECT year, countryName, countryCode, indicatorName, indicatorCode, indicatorValue FROM wdi_opencsv_gs

In [38]:
%sh

hdfs dfs -du hdfs:///user/julienguyen/hive/wdi/wdi_csv_parquet
hdfs dfs -du hdfs:///user/julienguyen/hive/wdi/wdi_opencsv_text

### Compare file sizes between `wdi_csv_parquet` and `wdi_opencsv_text`


The first data file of `wdi_opencsv_text` is 1066622664 bytes, while the first data file of `wdi_csv_parquet` is 62772966 bytes, so the text version is almost 17 times larger. Parquet is a columnar format: values of the same column are stored together and encoded with schemes such as dictionary encoding, bit packing and run-length encoding (RLE), and column chunks can additionally be compressed with a codec.
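Two of those encodings can be illustrated with standard tools on a made-up `countryName` column. Run-length encoding stores a repeated value once with a count (`uniq -c` does this for adjacent duplicates), and dictionary encoding replaces each distinct string with a small integer id. This is only a conceptual sketch; Parquet's real on-disk format is binary and applies these encodings per column chunk.

```shell
# A made-up countryName column; sorted or clustered data produces long
# runs of identical values, which is where RLE pays off
column='Canada
Canada
Canada
Chad
Chad
Chile'

# Run-length encoding: one count:value pair per run of adjacent duplicates
rle=$(printf '%s\n' "$column" | uniq -c | awk '{ print $1 ":" $2 }' | paste -sd ' ' -)

# Dictionary encoding: give each distinct value a small integer id
# and store the ids instead of the repeated strings
dict_ids=$(printf '%s\n' "$column" | awk '
    !($0 in id) { id[$0] = n_ids++ }
    { printf "%s%d", (NR > 1 ? " " : ""), id[$0] }
    END { print "" }')

echo "RLE: $rle"
echo "dictionary ids: $dict_ids"
```

In a real column the ids are then bit-packed, so a column with a few hundred distinct country names needs only a handful of bits per value.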

In [40]:
SELECT count(countryName) FROM wdi_csv_parquet;

In [41]:
SELECT count(countryName) FROM wdi_opencsv_text;

In [42]:
DROP VIEW IF EXISTS wdi_csv_parquet_gdp;

CREATE VIEW IF NOT EXISTS wdi_csv_parquet_gdp
AS
    SELECT CAST(indicatorvalue AS FLOAT) AS GDP_growth_value, CAST(year AS INTEGER) AS year, countryname
    FROM wdi_csv_parquet
    WHERE countryname = 'Canada' and indicatorname LIKE ('%GDP growth (annual \%)%') and year = '2015';

In [43]:

SELECT * FROM wdi_csv_parquet_gdp

In [44]:
SELECT * FROM wdi_opencsv_gdp

### Compare GDP growth performance between `wdi_opencsv_text` and `wdi_csv_parquet`

Querying `wdi_csv_parquet` is significantly faster than querying `wdi_opencsv_text`. Parquet is a columnar file format, so Hive reads only the columns the query references instead of every full row, and the values are already typed and encoded, so no CSV parsing is needed. Combined with the much smaller files, this means far less I/O.
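The column-pruning part of that argument can be sketched by storing a tiny made-up table twice: once row-wise as CSV, and once with each column in its own file, as a crude stand-in for Parquet's column chunks. A count over `countryName` has to read every full row from the CSV file, but only one file in the columnar layout. Country names and values below are placeholders, not WDI data.

```shell
# Made-up three-row table in the wdi column order
work_dir=$(mktemp -d)
cat > "$work_dir/rows.csv" <<'EOF'
2015,CountryA,AAA,GDP growth (annual %),NY.GDP.MKTP.KD.ZG,1.0
2015,CountryB,BBB,GDP growth (annual %),NY.GDP.MKTP.KD.ZG,2.0
2016,CountryC,CCC,GDP growth (annual %),NY.GDP.MKTP.KD.ZG,3.0
EOF

# Columnar copy: write each of the six columns to its own file
for i in 1 2 3 4 5 6; do
    cut -d',' -f"$i" "$work_dir/rows.csv" > "$work_dir/col_$i"
done

# count(countryName): the row layout reads whole lines, the columnar
# layout reads only the countryName file (column 2)
row_bytes=$(wc -c < "$work_dir/rows.csv" | tr -d ' ')
col_bytes=$(wc -c < "$work_dir/col_2" | tr -d ' ')
row_count=$(awk 'END { print NR }' "$work_dir/col_2")

echo "count=$row_count, bytes read: row layout=$row_bytes, columnar layout=$col_bytes"
rm -rf "$work_dir"
```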

# Highest GDP Growth

- Find the year with the highest GDP growth using the `NY.GDP.MKTP.KD.ZG` indicator code for each country.
- Use Spark SQL to compare with Hive
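The cells below find each country's maximum growth in a view and then join back to `wdi_csv_parquet` to recover the year. The same per-country arg-max can also be done in a single pass by remembering the year alongside the running maximum. Here is a minimal awk sketch over made-up `countryName,year,value` rows; unlike the join, it keeps only the first year on ties, and it does not apply the `indicatorvalue > 0` filter.

```shell
# Made-up rows: countryName,year,GDP growth value
rows='CountryA,2013,2.5
CountryA,2014,4.1
CountryA,2015,3.0
CountryB,2013,-1.2
CountryB,2014,0.7'

# One pass: keep the running maximum and the year it occurred per country
# (on ties the first year seen is kept)
result=$(printf '%s\n' "$rows" | awk -F',' '
    !($1 in best) || $3 + 0 > best[$1] { best[$1] = $3 + 0; year[$1] = $2 }
    END { for (c in best) print c "," year[c] "," best[c] }' | LC_ALL=C sort)

echo "$result"
```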

In [47]:
DROP VIEW IF EXISTS wdi_csv_parquet_highest_gdp;

CREATE VIEW IF NOT EXISTS wdi_csv_parquet_highest_gdp
AS
    SELECT MAX(CAST(indicatorvalue AS FLOAT)) AS GDP_growth_value, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode LIKE '%NY.GDP.MKTP.KD.ZG%' and indicatorvalue > 0
    GROUP BY countryname
    ORDER BY countryname

In [48]:
SELECT DISTINCT w1.GDP_growth_value, w1.countryName, w2.year FROM wdi_csv_parquet_highest_gdp w1 JOIN wdi_csv_parquet w2 ON w1.GDP_growth_value = w2.indicatorvalue and w1.countryname = w2.countryname;

In [49]:
%spark.sql
show tables;

In [50]:
%spark.sql
SELECT DISTINCT w1.GDP_growth_value, w1.countryName, w2.year FROM wdi_csv_parquet_highest_gdp w1 JOIN wdi_csv_parquet w2 ON w1.GDP_growth_value = w2.indicatorvalue and w1.countryname = w2.countryname

### Compare Highest GDP Growth execution time between Spark and Hive interpreter

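The Hive interpreter was noticeably slower on the Highest GDP Growth query, taking about a minute, while Spark SQL took about 20 seconds over the same tables. A likely reason is that Spark keeps intermediate data in memory between stages and reuses its running executors, while each Hive query is compiled into a new batch job.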


# Sort GDP by country and year

- Write a query that returns the GDP growth for all countries. Sort by countryName and year.
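Sorting by `countryName` and then `year` is a two-key sort: rows are ordered by country first, and rows of the same country are then ordered by year. A minimal shell equivalent with `sort` on made-up rows:

```shell
# Made-up rows: countryName,year,GDP growth value
rows='CountryB,2015,0.7
CountryA,2016,1.1
CountryB,1999,2.4
CountryA,2015,3.0'

# -k1,1 limits the first key to the country field; -k2,2n then
# orders rows of the same country by numeric year
sorted=$(printf '%s\n' "$rows" | LC_ALL=C sort -t',' -k1,1 -k2,2n)

echo "$sorted"
```

Restricting the first key with `-k1,1` matters: a bare `-k1` would compare from the country field to the end of the line, so the year would no longer act as a separate tie-breaker.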

In [53]:
DROP VIEW IF EXISTS wdi_csv_parquet_gdp_all;

CREATE VIEW IF NOT EXISTS wdi_csv_parquet_gdp_all
AS
    SELECT countryName, CAST(year AS INTEGER) AS year, indicatorCode, CAST(indicatorvalue AS FLOAT) AS GDP_growth_value
    FROM wdi_csv_parquet
    WHERE indicatorCode LIKE '%NY.GDP.MKTP.KD.ZG%'
    GROUP BY countryname, year, indicatorCode, indicatorValue
    ORDER BY countryname, year, indicatorCode, indicatorValue

In [54]:
SELECT * FROM wdi_csv_parquet_gdp_all