# Query GS data
- Create a Hive table (`wdi_gs`) over the `wdi_2016` data stored in Google Cloud Storage (GS).
- Count the number of rows in the `wdi_gs` table.

In [1]:
DROP TABLE IF EXISTS wdi_gs

In [2]:
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_tina/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [3]:
-- show table metadata
DESCRIBE FORMATTED wdi_gs

In [4]:
SELECT count(countryName) as count FROM wdi_gs

## Load GS Data to HDFS
The `wdi_2016.csv` file stored in Google Cloud Storage is copied into HDFS. Then a query that counts the number of rows in the newly created Hive table (`wdi_csv_text`) is executed twice. The first execution is approximately 2 times slower than the second, because the file system cache holds data that was recently read from disk, so subsequent requests can be served from the cache instead of reading the disk again.

In [6]:
DROP TABLE IF EXISTS wdi_csv_text

In [7]:
--Create an external table, called wdi_csv_text
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/yijen28lee/hive/wdi/wdi_csv_text'

In [8]:
--loads data from wdi_gs table to wdi_csv_text table
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

In [9]:
--run select statement twice and compare the run time
SELECT count(countryName) FROM wdi_csv_text

In [10]:
SELECT count(countryName) FROM wdi_csv_text
--second run was 3 sec faster than the first one, because the file system cache holds data that was recently read from the disk.

## Parsing
The `indicatorName` column holds strings that may themselves contain commas. This is a problem with the default LazySimpleSerDe, which splits each line on every comma, so values in such rows land in the wrong columns. Instead, we can use OpenCSVSerde, which honors quoted fields, to parse the data into Hive columns.

In [12]:
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20

In [13]:
DROP TABLE IF EXISTS wdi_gs_debug

In [14]:
--create a debug table
CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
LOCATION 'gs://jarvis_data_eng_tina/datasets/wdi_2016'

In [15]:
--query all lines that contain the incorrectly parsed indicator
SELECT DISTINCT(line) FROM wdi_gs_debug
WHERE line like "%(\% of urban population)\"%"
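
Another way to gauge how widespread the problem is, offered here as a sketch rather than as part of the original analysis, is to count the comma-separated pieces on each raw line of `wdi_gs_debug`. The table declares six columns, so any line that splits into more than six pieces has a comma inside a field. The aliases `n_fields` and `n_lines` are illustrative names.

```sql
-- Sketch: distribution of naive comma-split field counts per raw line.
-- Lines that yield more than 6 pieces contain a comma inside a field value.
SELECT size(split(line, ',')) AS n_fields, count(*) AS n_lines
FROM wdi_gs_debug
GROUP BY size(split(line, ','))
ORDER BY n_fields
```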

In [16]:
DROP TABLE IF EXISTS wdi_opencsv_gs

In [17]:
--#1 create wdi_opencsv_gs and load GS data with OpenCSVSerde
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_tina/datasets/wdi_2016'

In [18]:
SELECT distinct(indicatorCode) from wdi_opencsv_gs

In [19]:
DROP TABLE IF EXISTS wdi_opencsv_text

In [20]:
--#2 create wdi_opencsv_text destination table
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/yijen28lee/hive/wdi/wdi_opencsv_text'

In [21]:
--#3 load data from wdi_opencsv_gs to wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [22]:
--#4 verify parsing
SELECT distinct(indicatorCode) from wdi_opencsv_text

In [23]:
--#5 compare execution time 
SELECT count(countryName) FROM wdi_opencsv_text

## OpenCSVSerde limitation
OpenCSVSerde has some limitations. One of them is that it treats every column as a STRING, even when the table definition declares another type. We can work around this by creating a view that casts the string columns to the intended types.

In [25]:
--compare the two tables
DESCRIBE FORMATTED wdi_opencsv_text

In [26]:
DESCRIBE FORMATTED wdi_csv_text

In [27]:
DROP VIEW IF EXISTS wdi_opencsv_view

In [28]:
--create a view that casts the string columns to their intended types
CREATE VIEW wdi_opencsv_view AS
SELECT cast(year AS INTEGER) AS year, countryname, countrycode, indicatorcode, cast(indicatorvalue AS FLOAT) AS indicatorvalue
FROM wdi_opencsv_text
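
To check that the casts took effect, the view's schema can be inspected and a few rows sampled. This is a minimal sketch; in the notebook each statement would go in its own cell.

```sql
-- Column types reported for the view should no longer all be string
DESCRIBE wdi_opencsv_view;

-- Sample a few rows through the view
SELECT * FROM wdi_opencsv_view LIMIT 5;
```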

## 2015 Canada GDP Growth
A query is executed to find 2015 GDP growth for Canada.

In [30]:
-- find the correct indicator/code
SELECT distinct(indicatorcode), indicatorname
FROM wdi_opencsv_text
WHERE indicatorcode LIKE '%GDP%'

In [31]:
-- find 2015 Canada GDP growth
SELECT indicatorvalue, year, countryname
FROM wdi_opencsv_text
WHERE countryname = 'Canada' AND indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'

## Hive Partition
In the previous section, a query was executed to find the GDP growth of Canada in 2015. However, its runtime was more than one minute, which is slow. To solve this problem, we partition the table by year, so that when a query specifies a year in the WHERE clause, Hive reads only the partition for that year instead of scanning every row in the table.

In [33]:
DROP TABLE IF EXISTS wdi_opencsv_text_partitions

In [34]:
-- create a table partitioned by year
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT DELIMITED
LOCATION 'hdfs:///user/yijen28lee/hive/wdi/wdi_opencsv_text_partitions'

In [35]:
set hive.exec.dynamic.partition.mode=nonstrict
--allow fully dynamic partitioning, so we don't have to specify which partitions to create and insert into from the input table

In [36]:
--load data
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION (year)
SELECT countryname, countrycode, indicatorname, indicatorcode, indicatorvalue, year
FROM wdi_opencsv_text

In [37]:
%sh
# check how many partitions were created
hdfs dfs -ls hdfs:///user/yijen28lee/hive/wdi/wdi_opencsv_text_partitions

In [38]:
--retrieve 2015 GDP growth for Canada
SELECT indicatorvalue, year, countryname
FROM wdi_opencsv_text_partitions
WHERE countryname = 'Canada' AND indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'
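
To confirm that the partitioned table is laid out as intended and that the query touches only one partition, Hive's `SHOW PARTITIONS` and `EXPLAIN DEPENDENCY` statements can help. The following is a sketch that assumes the table was loaded as above; the exact output format depends on the Hive version.

```sql
-- List the partitions Hive has registered, one per year value
SHOW PARTITIONS wdi_opencsv_text_partitions;

-- Show which tables and partitions the query would read, without running it.
-- With partition pruning, only the year=2015 partition is expected
-- to appear under input_partitions.
EXPLAIN DEPENDENCY
SELECT indicatorvalue, year, countryname
FROM wdi_opencsv_text_partitions
WHERE countryname = 'Canada' AND indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND year = 2015;
```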

## Columnar File Optimization
A columnar file format such as Parquet stores data by column rather than by row. A query that touches only a few columns therefore reads less data from disk, which shortens the time it takes to return a result. The following section optimizes the HQL queries by storing the table in a columnar format.

In [40]:
DROP TABLE IF EXISTS wdi_csv_parquet

In [41]:
--create table stored as PARQUET
CREATE EXTERNAL TABLE IF NOT EXISTS wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/yijen28lee/hive/wdi/wdi_csv_parquet'

In [42]:
--load data from wdi_opencsv_gs
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs

In [43]:
%sh
# compare file size between wdi_csv_parquet and wdi_opencsv_text
hdfs dfs -du -s -h hdfs:///user/yijen28lee/hive/wdi/wdi_csv_parquet

In [44]:
%sh
hdfs dfs -du -s -h hdfs:///user/yijen28lee/hive/wdi/wdi_opencsv_text

In [45]:
--compare runtime
SELECT count(countryname) FROM wdi_opencsv_text

In [46]:
SELECT count(countryname) FROM wdi_csv_parquet

In [47]:
-- execute the 2015 Canada GDP growth query against both tables
SELECT indicatorvalue, year, countryname
FROM wdi_opencsv_text
WHERE countryname = 'Canada' AND indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'

In [48]:
SELECT indicatorvalue, year, countryname
FROM wdi_csv_parquet
WHERE countryname = 'Canada' AND indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND year = '2015'

## Highest GDP growth
Here we find the highest GDP_growth_value for each country and its corresponding year.

In [50]:
-- find highest GDP Growth year for each country
SELECT wcp.indicatorvalue as GDP_growth_value, wcp.year, wcp.countryname
FROM (
    SELECT max(indicatorvalue) as value, countryname 
    FROM wdi_csv_parquet
    WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND indicatorvalue != 0
    GROUP BY countryname 
) m
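INNER JOIN wdi_csv_parquet wcp ON m.value = wcp.indicatorvalue AND m.countryname = wcp.countryname
WHERE wcp.indicatorcode = 'NY.GDP.MKTP.KD.ZG' -- keep rows of other indicators that share the same value out of the result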
ORDER BY GDP_growth_value DESC

In [51]:
%spark.sql
SELECT wcp.indicatorvalue as GDP_growth_value, wcp.year, wcp.countryname
FROM (
    SELECT max(indicatorvalue) as value, countryname 
    FROM wdi_csv_parquet
    WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND indicatorvalue != 0
    GROUP BY countryname 
) m
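INNER JOIN wdi_csv_parquet wcp ON m.value = wcp.indicatorvalue AND m.countryname = wcp.countryname
WHERE wcp.indicatorcode = 'NY.GDP.MKTP.KD.ZG' -- keep rows of other indicators that share the same value out of the result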
ORDER BY GDP_growth_value DESC
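
The same result can also be sketched without a self-join by ranking each country's rows with a window function. This is an alternative formulation, not the query used above. The alias `rn` and the subquery name `ranked` are illustrative. Unlike the join, `ROW_NUMBER()` returns exactly one row per country even when two years tie for the maximum.

```sql
-- Sketch: highest GDP growth year per country using a window function
SELECT countryname, year, indicatorvalue AS GDP_growth_value
FROM (
    SELECT countryname, year, indicatorvalue,
           -- rank each country's rows from highest to lowest growth
           ROW_NUMBER() OVER (PARTITION BY countryname ORDER BY indicatorvalue DESC) AS rn
    FROM wdi_csv_parquet
    WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG' AND indicatorvalue != 0
) ranked
WHERE rn = 1
ORDER BY GDP_growth_value DESC
```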

## Sort GDP
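Display the GDP growth value for every country in the table, sorted by countryname and year. `DISTRIBUTE BY countryname` sends all rows of a country to the same reducer, and `SORT BY` then orders the rows within each reducer.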

In [53]:
SELECT countryname, year, indicatorcode, indicatorvalue
FROM wdi_csv_parquet
WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG' 
DISTRIBUTE BY countryname
SORT BY countryname ASC, year ASC
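
For comparison, here is a sketch of the same query with a global `ORDER BY`. It guarantees a total order across the whole result, but Hive performs the final sort in a single reducer, which can be slower on large tables.

```sql
-- Sketch: total ordering with ORDER BY instead of DISTRIBUTE BY / SORT BY
SELECT countryname, year, indicatorcode, indicatorvalue
FROM wdi_csv_parquet
WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY countryname ASC, year ASC
```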