# Query GS data
- Create a hive table (`wdi_gs`) against the gs wdi_2016 data
- Count number of rows from `wdi_gs` table

In [1]:
%hive
-- Remove any existing wdi_gs definition so the CREATE statement that follows can be re-run.
DROP TABLE IF EXISTS wdi_gs


In [2]:
%hive
-- External table over the WDI 2016 CSV files stored in the GCS bucket.
-- Each line is split on every comma; the first line of each file is skipped as a header.
CREATE EXTERNAL TABLE wdi_gs (
    year            INTEGER,
    countryName     STRING,
    countryCode     STRING,
    indicatorName   STRING,
    indicatorCode   STRING,
    indicatorValue  FLOAT
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_dataeng_nkiruka/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [3]:
-- Show the metadata Hive stores for wdi_gs.
DESCRIBE FORMATTED wdi_gs

In [4]:
-- Count the number of rows in wdi_gs.
-- COUNT(*) counts every row; COUNT(countryName) would leave out rows whose
-- countryName is NULL and therefore under-report the row count.
SELECT COUNT(*)
FROM wdi_gs

# Load GS Data to HDFS
- Create an external table, called wdi_csv_text
- Run an `INSERT OVERWRITE` query that loads data from the wdi_gs table into the wdi_csv_text table
- Check HDFS file size for wdi_csv_text file using bash command

In [6]:
-- Remove any existing wdi_csv_text definition so the CREATE statement that follows can be re-run.
DROP TABLE IF EXISTS wdi_csv_text

In [7]:
-- Create wdi_csv_text: a comma-delimited text table stored on HDFS.
-- No "skip.header.line.count" property is set here. This table is populated by
-- INSERT OVERWRITE from wdi_gs, which already skips the CSV header, so the files
-- Hive writes contain data rows only. Skipping one line per file on read would
-- silently discard the first data row of every file.
CREATE EXTERNAL TABLE wdi_csv_text (
    year            INTEGER,
    countryName     STRING,
    countryCode     STRING,
    indicatorName   STRING,
    indicatorCode   STRING,
    indicatorValue  FLOAT
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/nkiruka/hive/wdi/wdi_csv_text'

In [8]:
-- Replace the contents of wdi_csv_text with the rows read from wdi_gs.
-- Columns are listed explicitly (instead of SELECT *) so the load keeps working
-- and stays correct if the column order of wdi_gs ever changes.
INSERT OVERWRITE TABLE wdi_csv_text
SELECT
    year,
    countryName,
    countryCode,
    indicatorName,
    indicatorCode,
    indicatorValue
FROM wdi_gs

In [9]:
-- Count the rows of wdi_csv_text that have a non-NULL countryName.
SELECT
    COUNT(countryName)
FROM
    wdi_csv_text

In [10]:
%sh
# List the files under the wdi_csv_text directory with human-readable sizes (-h).
hdfs dfs -ls -h /user/nkiruka/hive/wdi/wdi_csv_text

In [11]:
%sh
# Print one summarized (-s), human-readable (-h) size for the wdi_csv_text directory.
hdfs dfs -du -s -h /user/nkiruka/hive/wdi/wdi_csv_text

# Running the count statement(observation) and Reason for change in Execution time
- Running the query for the first time, took 1min and 13sec
- Running the same count statement again took 0.599sec, because the data was already in the file system cache and did not need to be read from disk again
- After clearing the cache on both the master and the worker nodes over ssh, the count statement was run again, and this time it took 28secs

In [13]:
%sh
# Report memory usage in megabytes (-m); used here to inspect memory before/after clearing the cache.
free -m

In [14]:
-- Same count as before, re-run after the file system cache was cleared,
-- to measure the execution time without cached data.
SELECT
    COUNT(countryName)
FROM
    wdi_csv_text


# Hive vs Bash
### Observation on performance result between the bash and Hive approaches 
- Copy wdi_csv_text HDFS file to the master node and count number of rows
- The Hive approach took longer than the bash command

In [16]:
%sh
# Copy the wdi_csv_text files from HDFS to the home directory and time a plain
# bash line count, for comparison with the Hive COUNT query.
cd ~ || exit 1
# -f overwrites a local copy left by an earlier run; without it -get fails
# when the target already exists.
hdfs dfs -get -f hdfs:///user/nkiruka/hive/wdi/wdi_csv_text || exit 1
# Stop if the directory is missing so `cat *` below never runs in the home directory.
cd wdi_csv_text || exit 1

# Size of the local copy.
du -ch
# Flush dirty pages first, then drop the page cache so the timed read hits the disk.
sync
echo 3 | sudo tee /proc/sys/vm/drop_caches

# Epoch seconds before and after the count; the difference is the elapsed time.
date +%s && cat * | wc && date +%s


# Parsing Issue
- SerDe treats all columns to be of type String
- The output from the query is not showing the desired result
- Creating another table without formatting with Serde

In [18]:
-- First 20 distinct indicator codes of wdi_csv_text in ascending order.
SELECT DISTINCT
    indicatorcode
FROM wdi_csv_text
ORDER BY
    indicatorcode
LIMIT 20;

In [19]:
-- Remove any existing wdi_gs_debug definition so the CREATE statement that follows can be re-run.
DROP TABLE IF EXISTS wdi_gs_debug

In [20]:
-- Debug table over the same GCS files as wdi_gs: every input line is exposed
-- as one STRING column so the raw text can be inspected. The header line is skipped.
CREATE EXTERNAL TABLE wdi_gs_debug (
    line STRING
)
LOCATION 'gs://jarvis_dataeng_nkiruka/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")


In [21]:
%hive
-- Show the raw lines that mention "% of urban population" indicators,
-- to see how such records look before any field parsing.
SELECT
    *
FROM wdi_gs_debug
WHERE
    line LIKE "%\(\% of urban population\)\"%"

 
# Creating a Table with OpenCSV SerDe

In [23]:
%hive
-- Remove any existing wdi_opencsv_gs definition so the CREATE statement that follows can be re-run.
DROP TABLE IF EXISTS wdi_opencsv_gs

In [24]:
%hive
-- External table over the GCS CSV files, parsed with OpenCSVSerde instead of
-- plain comma splitting. The header line of each file is skipped.
CREATE EXTERNAL TABLE wdi_opencsv_gs (
    year            INTEGER,
    countryName     STRING,
    countryCode     STRING,
    indicatorName   STRING,
    indicatorCode   STRING,
    indicatorValue  FLOAT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_dataeng_nkiruka/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [25]:
%hive
-- Remove any existing wdi_opencsv_text definition so the CREATE statement that follows can be re-run.
DROP TABLE IF EXISTS wdi_opencsv_text


In [26]:
%hive
-- HDFS-backed table using OpenCSVSerde.
-- No "skip.header.line.count" property is set here. This table is populated by
-- INSERT OVERWRITE from wdi_opencsv_gs, which already skips the CSV header, so
-- the files Hive writes contain data rows only. Skipping one line per file on
-- read would silently discard the first data row of every file.
CREATE EXTERNAL TABLE wdi_opencsv_text (
    year            INTEGER,
    countryName     STRING,
    countryCode     STRING,
    indicatorName   STRING,
    indicatorCode   STRING,
    indicatorValue  FLOAT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/nkiruka/hive/wdi/wdi_opencsv_text'

In [27]:
%hive
-- Replace the contents of wdi_opencsv_text with the rows read from wdi_opencsv_gs.
-- Columns are listed explicitly (instead of SELECT *) so the load keeps working
-- and stays correct if the column order of wdi_opencsv_gs ever changes.
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT
    year,
    countryName,
    countryCode,
    indicatorName,
    indicatorCode,
    indicatorValue
FROM wdi_opencsv_gs

In [28]:
%hive
-- First 20 distinct indicator codes of wdi_opencsv_text in ascending order.
SELECT DISTINCT
    indicatorcode
FROM wdi_opencsv_text
ORDER BY
    indicatorcode
LIMIT 20;

In [29]:
%hive
-- Count the rows of wdi_opencsv_text that have a non-NULL countryName
-- (timed against the same count on wdi_csv_text).
SELECT
    COUNT(countryName)
FROM
    wdi_opencsv_text

In [30]:
%hive
-- Count the rows of wdi_csv_text that have a non-NULL countryName
-- (timed against the same count on wdi_opencsv_text).
SELECT
    COUNT(countryName)
FROM
    wdi_csv_text

# Compare execution time between wdi_opencsv_text and wdi_csv_text
-  wdi_opencsv_text took 1min 27sec to execute, while wdi_csv_text took 27sec to execute
-  wdi_opencsv_text was slower 


In [32]:
%hive
-- Show the metadata Hive stores for wdi_opencsv_text (compare the column types with wdi_csv_text).
DESCRIBE FORMATTED wdi_opencsv_text


In [33]:
%hive
-- Show the metadata Hive stores for wdi_csv_text (compare the column types with wdi_opencsv_text).
DESCRIBE FORMATTED wdi_csv_text

# compare describe formatted wdi_csv_text and describe formatted wdi_opencsv_text
- This SerDe treats all columns to be of type String. Even if a table is created with non-string column types using this SerDe, the DESCRIBE TABLE output would show string column type
- That's the reason behind the change in the column types
- To convert columns to the desired type in a table, you can create a view over the table that does the CAST to the desired type.

In [35]:
%hive
-- Remove any existing view so the CREATE VIEW statement that follows defines it afresh.
DROP VIEW IF EXISTS wdi_opencsv_text_view

In [36]:
%hive
-- Typed view over wdi_opencsv_text: year is exposed as INTEGER and
-- indicatorValue as FLOAT; the other columns are passed through unchanged.
-- The CAST expressions carry explicit aliases. Without them Hive names the view
-- columns _c0 and _c5, and queries that reference year or indicatorValue on the
-- view cannot resolve those names.
CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
SELECT
    CAST(year AS INTEGER)          AS year,
    countryName,
    countryCode,
    indicatorName,
    indicatorCode,
    CAST(indicatorValue AS FLOAT)  AS indicatorValue
FROM wdi_opencsv_text


# 2015 Canada GDP Growth HQL
- Running the query to select 2015 Canada GDP growth took 1min 21sec and can be improved using dynamic partition
- After partitioning the wdi table by year, the same query ran faster, taking 22sec


In [38]:
%hive
-- 2015 GDP growth for Canada, read through the typed view.
-- year is an INTEGER in the view, so it is compared with an integer literal
-- rather than the string '2015' (which relied on implicit type coercion).
SELECT
    indicatorValue AS GDP_growth_value,
    year,
    countryName
FROM wdi_opencsv_text_view
WHERE indicatorName LIKE '%GDP growth%'
  AND year = 2015
  AND countryName = 'Canada'

# why 2015 Canada GDP query took so long to execute?
- It has to query a lot of rows to get a match from the select statement
- Resolved by creating a partition

In [40]:
%hive
-- Remove any existing wdi_opencsv_text_partitions definition so the CREATE statement that follows can be re-run.
drop TABLE IF EXISTS wdi_opencsv_text_partitions


In [41]:
%hive
-- Copy of the WDI data partitioned by year, stored on HDFS.
-- year is the partition column, so it is not repeated in the regular column list.
-- The SerDe is configured with a tab separator, a single-quote quote character
-- and a backslash escape character.
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions (
    countryName     STRING,
    countryCode     STRING,
    indicatorName   STRING,
    indicatorCode   STRING,
    indicatorValue  FLOAT
)
PARTITIONED BY (YEAR INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    "separatorChar" = "\t",
    "quoteChar"     = "'",
    "escapeChar"    = "\\"
)
LOCATION 'hdfs:///user/nkiruka/hive/wdi/wdi_opencsv_text_partitions'

In [42]:
%hive
-- Allow an insert in which every partition value is determined dynamically
-- (strict mode requires at least one static partition value).
set hive.exec.dynamic.partition.mode=nonstrict;

In [43]:
%hive
-- Load wdi_opencsv_text into the partitioned table using dynamic partitioning.
-- PARTITION (year) states the dynamic partition column explicitly; Hive versions
-- before 3.0 reject a dynamic-partition insert without this clause.
-- The partition column (year) must be the last column in the SELECT list.
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION (year)
SELECT
    countryName,
    countryCode,
    indicatorName,
    indicatorCode,
    indicatorValue,
    year
FROM wdi_opencsv_text

In [44]:
%hive
-- 2015 GDP growth for Canada, read from the table partitioned by year.
-- The partition column year is declared INTEGER, so it is compared with an
-- integer literal rather than the string '2015' (no implicit type coercion).
SELECT
    indicatorValue AS GDP_growth_value,
    year,
    countryName
FROM wdi_opencsv_text_partitions
WHERE indicatorName LIKE '%GDP growth%'
  AND year = 2015
  AND countryName = 'Canada'

# Columnar File Optimization
- create a table wdi_csv_parquet
- Compare file sizes between wdi_csv_parquet and wdi_opencsv_text

In [46]:
%hive
-- Remove any existing wdi_csv_parquet definition so the CREATE statement that follows can be re-run.
drop table if exists wdi_csv_parquet


In [47]:
%hive
-- Parquet (columnar) copy of the WDI data, stored on HDFS.
CREATE EXTERNAL TABLE wdi_csv_parquet (
    year            INTEGER,
    countryName     STRING,
    countryCode     STRING,
    indicatorName   STRING,
    indicatorCode   STRING,
    indicatorValue  FLOAT
)
STORED AS PARQUET
LOCATION 'hdfs:///user/nkiruka/hive/wdi/wdi_csv_parquet'

In [48]:
%hive
-- Load data from wdi_opencsv_gs to wdi_csv_parquet.
-- OpenCSVSerde exposes every source column as STRING, while the Parquet table
-- stores year as INTEGER and indicatorValue as FLOAT. The conversion is written
-- out with explicit CASTs and an explicit column list instead of relying on
-- SELECT * and implicit conversion at insert time.
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT
    CAST(year AS INTEGER),
    countryName,
    countryCode,
    indicatorName,
    indicatorCode,
    CAST(indicatorValue AS FLOAT)
FROM wdi_opencsv_gs

# Compare file sizes between wdi_csv_parquet and wdi_opencsv_text
- wdi_csv_parquet has a file size of 137.2M
- wdi_opencsv_text has a file size of 2.3G

In [50]:
%sh
# Print one summarized (-s), human-readable (-h) size for the wdi_csv_parquet directory.
hdfs dfs -du -s -h /user/nkiruka/hive/wdi/wdi_csv_parquet

In [51]:
%sh
# Print one summarized (-s), human-readable (-h) size for the wdi_opencsv_text directory.
hdfs dfs -du -s -h /user/nkiruka/hive/wdi/wdi_opencsv_text

# Compare runtime of executing a query between wdi_csv_parquet and wdi_opencsv_text
- wdi_csv_parquet took 24sec to execute
- wdi_opencsv_text took 1min 27sec to execute, shows that execution time was shorter on wdi_csv_parquet table

In [53]:
%hive
-- Count the rows of wdi_csv_parquet that have a non-NULL countryName
-- (timed against the same count on wdi_opencsv_text).
SELECT
    COUNT(countryName)
FROM
    wdi_csv_parquet

In [54]:
%hive
-- Count the rows of wdi_opencsv_text that have a non-NULL countryName
-- (timed against the same count on wdi_csv_parquet).
SELECT
    COUNT(countryName)
FROM
    wdi_opencsv_text

# Compare the performance of executing the same query to get the 2015 Canada GDP growth on both tables 
- it took 29sec to execute the query for 2015 canada GDP growth on wdi_csv_parquet table
- it took 1min 27 sec to execute the query for 2015 Canada GDP growth on wdi_opencsv_text table


In [56]:
%hive
-- 2015 GDP growth for Canada, read from the Parquet table.
-- year is declared INTEGER in wdi_csv_parquet, so it is compared with an
-- integer literal rather than the string '2015' (no implicit type coercion).
SELECT
    indicatorValue AS GDP_growth_value,
    year,
    countryName
FROM wdi_csv_parquet
WHERE indicatorName LIKE '%GDP growth%'
  AND year = 2015
  AND countryName = 'Canada'

In [57]:
%hive
-- 2015 GDP growth for Canada, read from the OpenCSVSerde text table.
-- This SerDe exposes every column as STRING, so year is compared with a string literal.
SELECT
    indicatorValue AS GDP_growth_value,
    year,
    countryName
FROM wdi_opencsv_text
WHERE indicatorName LIKE '%GDP growth%'
  AND year = '2015'
  AND countryName = 'Canada'

 
# Highest GDP Growth
- Find the highest GDP growth (NY.GDP.MKTP.KD.ZG) year for each country
- Execute the same query using SparkSQL
- Compare the execution time with the Hive Tez query
- Hive took 1min 20sec and sparksql took 22sec, this showed better performance with sparksql

In [59]:
%hive
-- For each country, return the year(s) with the highest GDP growth
-- (indicator NY.GDP.MKTP.KD.ZG); ties on the maximum value are all returned.
-- country_max holds the maximum growth value per country. The outer rows are
-- restricted to the same indicator code: without that filter, a row of any
-- other indicator whose value happens to equal the country's maximum would
-- also be returned.
-- Countries whose maximum is exactly 0 are excluded, as in the original query;
-- the value is compared with a numeric literal instead of the string '0.0'.
SELECT
    gdp.indicatorValue AS GDP_growth_value,
    gdp.year,
    gdp.countryName
FROM wdi_csv_parquet AS gdp
INNER JOIN (
    SELECT
        countryName,
        MAX(indicatorValue) AS max_GDP
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
) AS country_max
    ON  gdp.countryName = country_max.countryName
    AND gdp.indicatorValue = country_max.max_GDP
WHERE gdp.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
  AND gdp.indicatorValue <> 0.0

In [60]:
%spark.sql
-- List the tables visible to the Spark SQL interpreter.
show tables

In [61]:
%spark.sql
-- For each country, return the year(s) with the highest GDP growth
-- (indicator NY.GDP.MKTP.KD.ZG); ties on the maximum value are all returned.
-- country_max holds the maximum growth value per country. The outer rows are
-- restricted to the same indicator code: without that filter, a row of any
-- other indicator whose value happens to equal the country's maximum would
-- also be returned.
-- Countries whose maximum is exactly 0 are excluded, as in the original query;
-- the value is compared with a numeric literal instead of the string '0.0'.
SELECT
    gdp.indicatorValue AS GDP_growth_value,
    gdp.year,
    gdp.countryName
FROM wdi_csv_parquet AS gdp
INNER JOIN (
    SELECT
        countryName,
        MAX(indicatorValue) AS max_GDP
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
) AS country_max
    ON  gdp.countryName = country_max.countryName
    AND gdp.indicatorValue = country_max.max_GDP
WHERE gdp.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
  AND gdp.indicatorValue <> 0.0

 
# Sort GDP by country and year
- GDP growth for all countries, sorted by countryName and year.

In [63]:
%hive
-- GDP growth (indicator NY.GDP.MKTP.KD.ZG) for all countries, ordered by
-- country name and then year.
-- ORDER BY gives one global ordering of the result. SORT BY only orders rows
-- within each reducer, so with more than one reducer the overall output would
-- not be sorted.
SELECT
    countryName,
    year,
    indicatorCode,
    indicatorValue
FROM wdi_csv_parquet
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY
    countryName,
    year
