# Query GS data
- Create a Hive table (`wdi_gs`) over the GS wdi_2016 data.
- Count the number of rows in the `wdi_gs` table.


In [1]:
DROP TABLE IF EXISTS wdi_gs



In [2]:
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_gozde/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");



In [3]:
-- show table meta data
DESCRIBE FORMATTED wdi_gs;

In [4]:
-- Count the rows in wdi_gs (count(countryName) skips rows where countryName is NULL)

SELECT count(countryName) AS count FROM wdi_gs

# Load GS data to HDFS

In [6]:
DROP TABLE IF EXISTS wdi_csv_text




In [7]:

CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/garslan/hive/wdi/wdi_csv_text';

- Load data from the `wdi_gs` table into the `wdi_csv_text` table, which is stored in HDFS.


In [9]:
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs


 

- Check the size of the `wdi_csv_text` files in HDFS.

In [11]:
%sh
 hdfs dfs -ls -h /user/garslan/hive/wdi/wdi_csv_text

In [12]:
%sh
 hdfs dfs -du -s -h /user/garslan/hive/wdi/wdi_csv_text


In [13]:
SELECT count(countryName) as count  FROM wdi_csv_text


 
- Clear the filesystem cache and run the count query again.

In [15]:
%sh 

echo 3 | sudo tee /proc/sys/vm/drop_caches


## Hive vs Bash

Counting the rows with Bash took 3 seconds, while the same count in Hive took 20 seconds. Hive is slower here because of its overhead: it has to parse the query, build an execution plan, and launch Hadoop MapReduce tasks.

In [17]:
%sh
# SSH to the master node
cd ~
hdfs dfs -get hdfs:///user/garslan/hive/wdi/wdi_csv_text .
cd wdi_csv_text
# Calculate the size of the current directory
du -ch .
# Output: 1.8G	total

# Clear the filesystem cache
echo 3 | sudo tee /proc/sys/vm/drop_caches
# Count rows with Bash, printing Unix timestamps before and after
date +%s && cat * | wc -l && date +%s
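
For comparison, the `cat * | wc -l` row count can be sketched in Python. This minimal sketch assumes the part files were copied to a local directory, as in the cell above. `count_rows` is a hypothetical helper name.

```python
from pathlib import Path


def count_rows(directory: str) -> int:
    """Count newline characters across all files in a directory, like `cat * | wc -l`."""
    total = 0
    for path in sorted(Path(directory).iterdir()):
        # The shell glob `*` skips hidden files, so skip them here too
        if path.is_file() and not path.name.startswith("."):
            with path.open("rb") as f:
                # Read in 1 MiB chunks and count newline bytes, as wc -l does
                for chunk in iter(lambda: f.read(1 << 20), b""):
                    total += chunk.count(b"\n")
    return total
```

Usage: from the home directory, `count_rows("wdi_csv_text")` returns the same number that `wc -l` prints.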



## Fixing the parsing issue

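Some indicatorName values contain commas inside double-quoted fields. LazySimpleSerDe, the default SerDe for `ROW FORMAT DELIMITED`, splits on every comma and ignores the quotes, so the fields after indicatorName shift into the wrong columns. OpenCSVSerde understands quoted fields, so it should be used instead to parse the data into the Hive table's columns.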
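
To see why the quotes matter, the Python sketch below parses one hypothetical WDI-style row in two ways:

- a plain split on every comma, comparable to `FIELDS TERMINATED BY ','`;
- the standard `csv` module, which honours double quotes in the way OpenCSVSerde does.

The row, indicator name and code are made up for illustration.

```python
import csv
import io

# Hypothetical row in the wdi_2016 column order:
# year, countryName, countryCode, indicatorName, indicatorCode, indicatorValue
ROW = '2015,Canada,CAN,"Example indicator, urban (% of urban population)",XX.EXAMPLE.CODE,1.5'

# Split on every comma and ignore quotes (the LazySimpleSerDe behaviour described above)
naive = ROW.split(",")

# Quote-aware parsing (the behaviour OpenCSVSerde provides)
quoted = next(csv.reader(io.StringIO(ROW)))

print(len(naive), repr(naive[4]))    # 7 ' urban (% of urban population)"'
print(len(quoted), repr(quoted[4]))  # 6 'XX.EXAMPLE.CODE'
```

The table declares six columns, and Hive typically keeps only the first six fields. In the naive case, indicatorCode receives a fragment of the name and indicatorValue receives the code, which cannot be parsed as a FLOAT.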

- Show the first 20 distinct indicator codes in `wdi_csv_text`. Rows that were split incorrectly show up here as values that are not indicator codes.



In [20]:

%hive
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20 


1. Identify the issue by creating a debug table


In [22]:
%hive 

-- 1a. Create an external table wdi_gs_debug with one column without any SerDe parsing

CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION 'hdfs:///user/garslan/hive/wdi/wdi_csv_text';



In [23]:
%hive
-- 1b. Query the lines that have the parsing issue

SELECT line FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)\"%" 

2. Create a table with OpenCSVSerde

In [25]:
%hive

-- 2a. Create the wdi_opencsv_gs source table (load GS data with OpenCSVSerde)

CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_gozde/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");



In [26]:
%hive
-- 2b. Create wdi_opencsv_text destination table (output table with hdfs location)

CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs:///user/garslan/hive/wdi/wdi_opencsv_text';

In [27]:
%hive
-- 2c. Load data from wdi_opencsv_gs into wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [28]:
%hive
-- 2d. Verify that the data is now parsed correctly.

SELECT distinct(indicatorcode)
FROM wdi_opencsv_text
ORDER BY indicatorcode
LIMIT 20 


In [29]:
%hive

-- Compare execution time between wdi_opencsv_text and wdi_csv_text.
-- Using OpenCSVSerde makes execution slower than LazySimpleSerDe.

 SELECT count(countryName) FROM wdi_opencsv_text


# OpenCSVSerde limitation



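- Compare the metadata of the two tables. OpenCSVSerde treats every column as STRING, even when the table declares other types. As a result, `DESCRIBE FORMATTED` reports `string` for the `year` and `indicatorValue` columns of `wdi_opencsv_text`.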


In [32]:
%hive 

DESCRIBE FORMATTED wdi_opencsv_text 


In [33]:
%hive
DESCRIBE FORMATTED wdi_csv_text

In [34]:
%hive
-- Create a view on top of wdi_opencsv_text to cast specific columns to correct data type.

DROP VIEW IF EXISTS wdi_opencsv_text_view;

CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
SELECT cast(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode,
       cast(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text
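
The view's casts can be mirrored in Python. A quote-aware CSV reader returns every field as a string, as OpenCSVSerde does, so `year` and `indicatorValue` must be converted explicitly. Values that do not parse become `None`, standing in for Hive's NULL.

This is a minimal sketch with a made-up input line. `cast_row`, `to_int` and `to_float` are hypothetical helpers.

```python
import csv
import io
from typing import Optional


def to_int(s: str) -> Optional[int]:
    # Like a failed CAST in Hive, return None (NULL) instead of raising
    try:
        return int(s)
    except ValueError:
        return None


def to_float(s: str) -> Optional[float]:
    try:
        return float(s)
    except ValueError:
        return None


def cast_row(fields):
    """Cast year to int and indicatorValue to float; keep the other columns as strings."""
    year, country_name, country_code, indicator_name, indicator_code, value = fields
    return (to_int(year), country_name, country_code,
            indicator_name, indicator_code, to_float(value))


# Made-up line in the wdi column order
LINE = '2015,Canada,CAN,"Example indicator, urban (% of urban population)",XX.EXAMPLE.CODE,1.5\n'
fields = next(csv.reader(io.StringIO(LINE)))
print([type(f).__name__ for f in fields])  # every field is 'str'
typed = cast_row(fields)
print(typed[0], typed[5])                  # 2015 1.5
```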

In [35]:
%hive
-- Check that the data types were cast properly
DESCRIBE FORMATTED wdi_opencsv_text_view    

# 2015 Canada GDP Growth HQL


- Find 2015 GDP growth (annual %) for Canada.


In [38]:
%hive
SELECT year ,countryName , indicatorValue AS GDP_growth_value
FROM wdi_opencsv_text_view
WHERE indicatorName LIKE "%GDP growth%" AND year =2015 AND countryName ='Canada' 


# Hive Partitions

For large datasets, scanning every row can be slow, which becomes a bottleneck for later processing. Partitioning splits a table into sub-directories by the value of one or more columns (here, `year`). A query that filters on the partition column then reads only the matching sub-directories. Tables or partitions can be further split into buckets, which give the data more structure and make lookups more efficient. A row's bucket is determined by the hash of a bucketing column modulo the number of buckets.
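
Bucket assignment can be sketched in a few lines of Python. Hive uses its own hash function. This sketch substitutes `zlib.crc32`, which is deterministic across runs, unlike Python's built-in `hash()` for strings. The bucket count of 4 and the country list are arbitrary choices for illustration.

```python
import zlib

NUM_BUCKETS = 4  # arbitrary bucket count for the example


def bucket_for(value: str, num_buckets: int = NUM_BUCKETS) -> int:
    # Stand-in for Hive's hash: crc32 gives the same result on every run
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def assign_buckets(values, num_buckets: int = NUM_BUCKETS):
    buckets = {b: [] for b in range(num_buckets)}
    for v in values:
        buckets[bucket_for(v, num_buckets)].append(v)
    return buckets


buckets = assign_buckets(["Canada", "Turkey", "Japan", "Brazil", "Canada"])
print(buckets)  # equal values always land in the same bucket
```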

In [41]:
%hive

-- Create a new table 'wdi_opencsv_text_partitions', partitioned by year
set hive.exec.dynamic.partition.mode=nonstrict;
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED by (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/garslan/hive/wdi/wdi_opencsv_text_partitions' ;



In [43]:
%hive

-- Allow all partition columns to be dynamic (nonstrict mode)

set hive.exec.dynamic.partition.mode=nonstrict;

-- Load data from wdi_opencsv_text into wdi_opencsv_text_partitions; the partition column (year) must come last in the SELECT
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION (year)
SELECT countryName,countryCode,indicatorName, indicatorCode, indicatorValue,year
FROM wdi_opencsv_text


In [44]:
%sh
# Inspect how many partitions were created; each partition is a sub-directory in HDFS
 hdfs dfs -ls -h /user/garslan/hive/wdi/wdi_opencsv_text_partitions
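
Each partition listed above is a `year=<value>` sub-directory. The Python sketch below imitates that layout with an in-memory dict keyed by directory name. It also shows partition pruning: a query that filters on `year` reads only one partition.

The rows and values are made up. `partition_by_year` and `query` are hypothetical helpers.

```python
from collections import defaultdict

# Made-up rows: (countryName, indicatorCode, indicatorValue, year)
ROWS = [
    ("Canada", "IND.A", 1.0, 2014),
    ("Canada", "IND.A", 2.0, 2015),
    ("Japan", "IND.A", 3.0, 2015),
    ("Japan", "IND.A", 4.0, 2016),
]


def partition_by_year(rows):
    parts = defaultdict(list)
    for country, code, value, year in rows:
        # The partition value lives in the directory name, not in the data file
        parts[f"year={year}"].append((country, code, value))
    return dict(parts)


def query(parts, year, country):
    # Partition pruning: only the matching sub-directory is scanned
    scanned = parts.get(f"year={year}", [])
    matches = [value for c, _, value in scanned if c == country]
    return matches, len(scanned)


parts = partition_by_year(ROWS)
print(sorted(parts))                  # ['year=2014', 'year=2015', 'year=2016']
print(query(parts, 2015, "Canada"))   # ([2.0], 2): 2 of 4 rows scanned
```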



In [45]:
%hive 
-- Retrieve 2015 GDP growth for Canada from the partitioned table (only the year=2015 partition is read)
SELECT year, countryName, indicatorValue AS GDP_growth_value
FROM wdi_opencsv_text_partitions
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = 'Canada'

# Columnar File Optimization

- Optimize HQL queries by storing the data in a columnar file format (Parquet).
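
Here is a toy model of why a columnar layout helps. Counting one column in a row-oriented text file still means reading whole rows, while a columnar file stores each column's values together. This simplified Python sketch uses made-up rows. Real Parquet files add compression, encodings and per-column statistics on top of this idea.

```python
COLUMNS = ["year", "countryName", "countryCode", "indicatorName", "indicatorCode", "indicatorValue"]

# Made-up rows in the wdi column order
ROWS = [
    (2015, "Canada", "CAN", "Indicator A", "IND.A", 1.0),
    (2015, "Japan", "JPN", "Indicator A", "IND.A", None),
    (2016, "Canada", "CAN", "Indicator B", "IND.B", 2.5),
]


def count_row_layout(rows, column):
    """Row layout (text file): every field of every row is read."""
    idx = COLUMNS.index(column)
    fields_read = 0
    non_null = 0
    for row in rows:
        fields_read += len(row)
        if row[idx] is not None:
            non_null += 1
    return non_null, fields_read


def to_columnar(rows):
    """Columnar layout: the values of each column are stored together."""
    return {name: [row[i] for row in rows] for i, name in enumerate(COLUMNS)}


def count_columnar(columns, column):
    """Only the requested column is read."""
    values = columns[column]
    return sum(v is not None for v in values), len(values)


print(count_row_layout(ROWS, "indicatorValue"))             # (2, 18)
print(count_columnar(to_columnar(ROWS), "indicatorValue"))  # (2, 3)
```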


In [47]:
%hive
DROP TABLE IF EXISTS wdi_csv_parquet


In [48]:
%hive
-- Create a table stored as Parquet
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/garslan/hive/wdi/wdi_csv_parquet';

In [49]:
%hive
-- Load data from wdi_opencsv_gs to wdi_csv_parquet

INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs



In [50]:

%sh
# List the Parquet files created in HDFS and their sizes
 hdfs dfs -ls -h /user/garslan/hive/wdi/wdi_csv_parquet


In [51]:
%sh
# List the text files of wdi_opencsv_text to compare sizes
 hdfs dfs -ls -h /user/garslan/hive/wdi/wdi_opencsv_text



## Runtime comparison


In [53]:
%hive

SELECT count(countryName) FROM wdi_csv_parquet;




In [54]:
%hive
SELECT count(countryName) FROM wdi_opencsv_text;





In [55]:
%hive 
-- Retrieve data from wdi_csv_parquet
SELECT year ,countryName , indicatorValue AS GDP_growth_value
FROM wdi_csv_parquet
WHERE indicatorName LIKE "%GDP growth%" AND year =2015 AND countryName ='Canada' 



In [56]:
%hive
-- Retrieve data from wdi_opencsv_text
SELECT year ,countryName , indicatorValue AS GDP_growth_value
FROM wdi_opencsv_text
WHERE indicatorName LIKE "%GDP growth%" AND year =2015 AND countryName ='Canada' 

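The query took 31 seconds on wdi_csv_parquet and 1 min 39 seconds on wdi_opencsv_text. The columnar file format cut the runtime to roughly a third. This is likely because Parquet reads only the columns a query references and stores typed binary values, instead of text that OpenCSVSerde must parse.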

# Highest GDP Growth


In [59]:
%hive 

-- Find the year of highest GDP growth (NY.GDP.MKTP.KD.ZG) for each country.
SELECT w.year, w.countryName, w.indicatorValue AS GDP_growth_value
FROM
(
   SELECT max(indicatorValue) AS value, countryName
   FROM wdi_csv_parquet
   WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND indicatorValue != 0
   GROUP BY countryName
) t
INNER JOIN wdi_csv_parquet w
   ON t.value = w.indicatorValue
   AND t.countryName = w.countryName
   -- Match the same indicator; otherwise rows of other indicators with an equal value also join
   AND w.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC;
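
The query computes the maximum per group, then joins back to find the matching rows. The Python sketch below replays that pattern on made-up rows. It also shows why the join-back step should match `indicatorCode`: without that condition, a row of a different indicator that happens to share the maximum value is returned too. The values are invented, and `highest_growth_year` is a hypothetical name.

```python
GDP_CODE = "NY.GDP.MKTP.KD.ZG"

# Made-up rows: (year, countryName, indicatorCode, indicatorValue)
ROWS = [
    (2014, "Canada", GDP_CODE, 3.0),
    (2015, "Canada", GDP_CODE, 1.0),
    (2015, "Canada", "OTHER.CODE", 3.0),  # different indicator, same value
    (2015, "Japan", GDP_CODE, 2.0),
    (2016, "Japan", GDP_CODE, 0.0),       # zero values are ignored
]


def highest_growth_year(rows, code=GDP_CODE, match_code_in_join=True):
    # Subquery: max non-zero value per country for the indicator
    best = {}
    for year, country, ind, value in rows:
        if ind == code and value != 0:
            best[country] = max(best.get(country, value), value)
    # Join back on (country, value), optionally also on the indicator code
    result = [
        (year, country, value)
        for year, country, ind, value in rows
        if country in best
        and value == best[country]
        and (ind == code or not match_code_in_join)
    ]
    # ORDER BY GDP_growth_value DESC
    return sorted(result, key=lambda r: r[2], reverse=True)


print(highest_growth_year(ROWS))                            # one row per country
print(highest_growth_year(ROWS, match_code_in_join=False))  # extra Canada row
```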





In [60]:
%spark.sql

-- Run the same query with Spark SQL to compare execution time

SELECT w.year, w.countryName, w.indicatorValue AS GDP_growth_value
FROM
(
   SELECT max(indicatorValue) AS value, countryName
   FROM wdi_csv_parquet
   WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG' AND indicatorValue != 0
   GROUP BY countryName
) m
INNER JOIN wdi_csv_parquet w
   ON m.value = w.indicatorValue
   AND m.countryName = w.countryName
   AND w.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC




# Sort GDP by country and year



In [62]:

-- Retrieve GDP growth for all countries sorted by country name and year.

SELECT countryName, year, indicatorCode, indicatorValue
FROM wdi_csv_parquet
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
DISTRIBUTE BY countryName
SORT BY countryName, year
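
`DISTRIBUTE BY countryName` sends all rows of a country to the same reducer. `SORT BY countryName, year` then sorts rows within each reducer only, so unlike `ORDER BY`, the combined output is not globally sorted. Below is a minimal Python sketch of that behaviour. It assumes two reducers and uses `zlib.crc32` as a stand-in for Hive's hash. The rows are made up, and `distribute_and_sort` is a hypothetical helper.

```python
import zlib

NUM_REDUCERS = 2  # assumed reducer count


def distribute_and_sort(rows, num_reducers=NUM_REDUCERS):
    """rows are (countryName, year, value) tuples; returns one sorted list per reducer."""
    reducers = [[] for _ in range(num_reducers)]
    for row in rows:
        # DISTRIBUTE BY countryName: the same country always goes to the same reducer
        reducers[zlib.crc32(row[0].encode("utf-8")) % num_reducers].append(row)
    # SORT BY countryName, year: each reducer sorts only its own rows
    return [sorted(part, key=lambda r: (r[0], r[1])) for part in reducers]


ROWS = [("Japan", 2016, 1.0), ("Canada", 2015, 2.0), ("Japan", 2015, 3.0), ("Canada", 2014, 4.0)]
for i, part in enumerate(distribute_and_sort(ROWS)):
    print(f"reducer {i}:", part)
```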