# Hussain Fathi

## Jarvis Hive Project Notebook

### Load Data to HDFS

In [2]:
%hive
DROP TABLE IF EXISTS wdi_gs


In [3]:
%hive
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode String, indicatorName String, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_hussain/datasets/wdi_2016'
TBLPROPERTIES("skip.header.line.count"="1")

In [4]:
%hive
DESCRIBE FORMATTED wdi_gs

In [5]:
%hive
SELECT count(countryName) as count FROM wdi_gs

In [6]:
%hive
DROP TABLE IF EXISTS wdi_csv_text

In [7]:
%hive
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode String, indicatorName String, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/hussain/hive/wdi/wdi_csv_text'

In [8]:
%sh
hdfs dfs -ls hdfs:///user/hussain/hive/wdi/wdi_csv_text

In [9]:
%hive
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

In [10]:
%sh
hdfs dfs -du -s -h /user/hussain/hive/wdi/wdi_csv_text

In [11]:
%hive
SELECT count(countryName) FROM wdi_csv_text


After clearing the caches on the master and worker nodes, the count query took 19.83 seconds. The next run took 14.83 seconds. The speed-up comes from the filesystem (page) cache, which keeps recently read disk blocks in memory, so the second run could read the data from memory instead of reading it from disk again.
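To watch the filesystem cache at work, the Linux-only sketch below reads the `Cached:` figure from `/proc/meminfo` before and after reading a set of files. `cached_kb` and `cache_growth_kb` are hypothetical helper names used for illustration, and the difference is only a rough signal, since other processes on the same machine also change that figure.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (Linux only): observe the kernel page cache.

# Print the current page-cache size in kB, as reported by /proc/meminfo.
cached_kb() {
  awk '/^Cached:/ { print $2 }' /proc/meminfo
}

# Read the given files once and print how much the page cache grew, in kB.
# A rough signal only: other processes also change this figure,
# and files that are already cached add nothing.
cache_growth_kb() {
  local before after
  before=$(cached_kb)
  cat "$@" > /dev/null
  after=$(cached_kb)
  echo $(( after - before ))
}
```

Running `cache_growth_kb ~/wdi_csv_text/*` right after dropping the caches should show growth on the first call and close to none on a second call.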


In [13]:
%sh

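# Run on the master node (e.g. over SSH)
cd ~
hdfs dfs -get hdfs:///user/hussain/hive/wdi/wdi_csv_text .
cd wdi_csv_text

# calculate current directory size
du -ch .

# flush pending writes to disk, then clear the filesystem cache
sync
echo 3 | sudo tee /proc/sys/vm/drop_caches

# bash row count (wc -l prints the number of lines), timed in epoch seconds
date +%s && cat * | wc -l && date +%s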



#### Hive vs Bash
Counting the rows with a Bash pipeline took about 17 seconds, which is faster than the Hive query at about 20 seconds. Hive needs extra time to set up the job on the worker nodes and assign tasks to them, whereas the Bash pipeline runs directly on a single machine and requires none of that setup.
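The `date +%s` timing in the cell above only resolves whole seconds. A minimal sketch with sub-second resolution uses Bash's `time` keyword with `TIMEFORMAT`; `count_rows_timed` is a hypothetical helper name, and it assumes each row occupies exactly one line.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count lines across files and report wall-clock time.
# The count goes to stdout; the timing line goes to stderr.
count_rows_timed() {
  local TIMEFORMAT='elapsed: %R s'   # %R = real (wall-clock) seconds
  time { cat "$@" | wc -l | tr -d ' '; }
}
```

On the master node this would be called as `count_rows_timed ~/wdi_csv_text/*`.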


### Parsing Issue

In [16]:
%hive
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;

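As the output shows, splitting fields on every comma produces unexpected results. Some indicator names contain commas inside double quotes, so `FIELDS TERMINATED BY ','` breaks them into extra fields and shifts the following values into the wrong columns (part of an indicator name ends up in `indicatorCode`).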
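The effect can be reproduced outside Hive. The sketch below counts the fields in a line two ways: splitting on every comma, as `FIELDS TERMINATED BY ','` does, and a quote-aware split that ignores commas inside double quotes, which is what a CSV parser such as OpenCSV provides. The function names and the sample row are hypothetical; the row only mimics the shape of a WDI record whose indicator name contains a comma, and the quote handling is simplified.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: count the fields in each input line two ways.

# Naive split on every comma, like ROW FORMAT DELIMITED FIELDS TERMINATED BY ','.
naive_field_count() {
  awk -F',' '{ print NF }'
}

# Quote-aware split: a comma inside double quotes does not start a new field.
# Simplified: every double quote toggles the "inside quotes" state.
csv_field_count() {
  awk '{
    n = 1; inq = 0
    for (i = 1; i <= length($0); i++) {
      c = substr($0, i, 1)
      if (c == "\"") inq = !inq
      else if (c == "," && !inq) n++
    }
    print n
  }'
}

# Hypothetical sample row shaped like a WDI record with a comma in the name
line='2015,Canada,CAN,"GDP, PPP (current international $)",NY.GDP.MKTP.PP.CD,1.0'
printf '%s\n' "$line" | naive_field_count   # prints 7
printf '%s\n' "$line" | csv_field_count     # prints 6
```

With the naive split, the fourth field becomes `"GDP` and the fifth becomes ` PPP (current international $)"`, which is the same column shift seen in the `indicatorcode` output.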


In [18]:
%hive
DROP TABLE IF EXISTS wdi_gs_debug


In [19]:
%hive
CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
LOCATION 'gs://jarvis_data_eng_hussain/datasets/wdi_2016'

In [20]:
%hive
SELECT DISTINCT(line) FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)\"%"

### Table with OpenCSV SerDe


In [22]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_gs

In [23]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode String, indicatorName String, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_hussain/datasets/wdi_2016'
TBLPROPERTIES("skip.header.line.count"="1")

In [24]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text

In [25]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode String, indicatorName String, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/hussain/hive/wdi/wdi_opencsv_text'

In [26]:
%hive
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [27]:
%hive
SELECT distinct(indicatorCode)
FROM wdi_opencsv_text
ORDER BY indicatorCode

#### OpenCSV SerDe Limitation
One limitation of this approach is that OpenCSVSerde treats every column as a STRING, regardless of the types declared in `CREATE TABLE`. To work around this, create a view over the table that CASTs the columns to the desired types.
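Why the CAST matters: STRING values compare character by character, so, for example, `max()` over a STRING column can return the wrong number. A shell stand-in for that comparison uses `sort` in the C locale for the string order and `sort -n` for the numeric order; the helper names and sample values are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: the largest value under string vs numeric ordering.
string_max()  { LC_ALL=C sort    | tail -n 1; }   # byte-by-byte, like STRING
numeric_max() { LC_ALL=C sort -n | tail -n 1; }   # numeric, like FLOAT

printf '9.5\n10.2\n-3\n' | string_max    # prints 9.5
printf '9.5\n10.2\n-3\n' | numeric_max   # prints 10.2
```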


In [29]:
%hive
DESCRIBE FORMATTED wdi_opencsv_text


In [30]:
%hive
DROP VIEW IF EXISTS wdi_opencsv_text_view

In [31]:
%hive
CREATE VIEW wdi_opencsv_text_view
AS
SELECT CAST(year AS INT) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text

In [32]:
%hive
DESCRIBE FORMATTED wdi_opencsv_text_view

### 2015 Canada GDP Growth HQL 


In [34]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text_view
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"


This query took 84.21 seconds, which is slow. Without partitions, Hive must scan every row of the table (a full table scan, O(n) in the number of rows) even though only the 2015 rows for Canada are needed.
Partitioning and bucketing can reduce the amount of data each query reads and so improve performance.
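Hive stores each partition in its own directory named after the partition value (for example `year=2015/`), so a filter on `year` only has to read the matching directory. A minimal sketch of that layout in plain shell, assuming a simple comma-separated file with the year in the first column, no header row, and no quoted commas; `partition_by_year` and `rows_for_year` are hypothetical names, not Hive commands.

```shell
#!/usr/bin/env bash
# Hypothetical helper: split a comma-separated file (year in column 1) into
# Hive-style partition directories: <dest>/year=<value>/data.csv
# Assumes no header, no quoted commas, and an empty <dest> (rows are appended).
partition_by_year() {
  local src=$1 dest=$2
  awk -F',' -v dest="$dest" '{
    dir = dest "/year=" $1
    if (!(dir in made)) { system("mkdir -p \"" dir "\""); made[dir] = 1 }
    print >> (dir "/data.csv")
  }' "$src"
}

# Hypothetical helper: "partition pruning" reads only the requested year's directory.
rows_for_year() {
  local dest=$1 year=$2
  cat "$dest/year=$year"/*.csv
}
```

A query for 2015 then touches only `year=2015/`, instead of every row in the file.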


### Hive Partitions

In [37]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text_partitions


In [38]:
%hive
CREATE EXTERNAL TABLE IF NOT EXISTS wdi_opencsv_text_partitions
(CountryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/hussain/hive/wdi/wdi_opencsv_text_partitions/'

In [39]:
%hive
SET hive.exec.dynamic.partition.mode=nonstrict;

FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions 
PARTITION (year)
SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year


In [40]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text_partitions
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"


### Columnar File Optimization


In [42]:
%hive
DROP TABLE IF EXISTS wdi_csv_parquet


In [43]:
%hive
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, CountryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/hussain/hive/wdi/wdi_csv_parquet' 



In [44]:
%hive
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs

In [45]:
%sh
hdfs dfs -du -h /user/hussain/hive/wdi/wdi_csv_parquet
hdfs dfs -du -h /user/hussain/hive/wdi/wdi_opencsv_text


In [46]:
%hive
SELECT COUNT(countryName)
FROM wdi_csv_parquet


In [47]:
%hive
SELECT COUNT(countryName)
FROM wdi_opencsv_text

In [48]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_csv_parquet
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"

In [49]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"

Using the columnar Parquet format reduced the table size by almost 20 times compared with the OpenCSV SerDe text table.

In addition to the smaller table, queries ran about 4 times faster. Parquet stores data column by column, so a query reads only the columns it references and skips the rest.
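The column-skipping idea can be sketched in plain shell by writing each column of a row-oriented file to its own file and comparing how many bytes a single-column query must read. This assumes a simple comma-separated file with no quoted commas; `to_columns` and `bytes_row_vs_column` are hypothetical helper names. Parquet additionally encodes and compresses each column, which this sketch does not attempt and which likely accounts for much of the size reduction measured above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write each column of a simple comma-separated file
# (no quoted commas) to its own file, <dest>/col<N>.txt.
to_columns() {
  local src=$1 dest=$2 ncols i
  ncols=$(head -n 1 "$src" | awk -F',' '{ print NF }')
  mkdir -p "$dest"
  for (( i = 1; i <= ncols; i++ )); do
    cut -d',' -f "$i" "$src" > "$dest/col$i.txt"
  done
}

# Hypothetical helper: bytes a query on one column must read in each layout.
bytes_row_vs_column() {
  local src=$1 dest=$2 col=$3
  echo "row layout: $(wc -c < "$src" | tr -d ' ') bytes"
  echo "column layout: $(wc -c < "$dest/col$col.txt" | tr -d ' ') bytes"
}
```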


### Highest GDP Growth

In [52]:
%hive
SELECT distinct t1.indicatorValue AS GDP_growth_value, t1.year, t1.countryName
FROM wdi_csv_parquet t1
INNER JOIN (
    SELECT max(indicatorValue) AS GDP_growth_value, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
) t2
ON t1.indicatorValue = t2.GDP_growth_value AND t1.countryName = t2.countryName
ORDER BY GDP_growth_value DESC

In [53]:
%spark.sql
SELECT distinct t1.indicatorValue AS GDP_growth_value, t1.year, t1.countryName
FROM wdi_csv_parquet t1
INNER JOIN (
    SELECT max(indicatorValue) AS GDP_growth_value, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
) t2
ON t1.indicatorValue = t2.GDP_growth_value AND t1.countryName = t2.countryName
ORDER BY GDP_growth_value DESC

The Hive query took 1 minute and 33 seconds (93 seconds) to execute, while the Spark query took 54 seconds, roughly 1.7 times faster.


### Sort GDP by Country and Year

In [56]:
%hive
SELECT countryName, year, indicatorCode, indicatorValue
FROM wdi_csv_parquet
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY countryName, year
