# Work with WDI data by importing it into Hive tables, storing it in HDFS, and applying Hadoop MapReduce to answer queries.

# Query GS data
- Create a Hive table (`wdi_gs`) against the GS `wdi_2016` data.
- Count the number of rows in the `wdi_gs` table.


In [2]:
DROP TABLE IF EXISTS wdi_gs

In [3]:
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_shadab/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [4]:
-- show table meta data
DESCRIBE FORMATTED  wdi_gs

In [5]:
-- Count rows (count(countryName) counts every row with a non-null countryName)
SELECT count(countryName) as count FROM wdi_gs

Copy the table files to local disk, clear the OS page cache, and time a plain `cat * | wc` over the files.

In [7]:
%sh

cd ~
hdfs dfs -get hdfs:///user/sshaikh/hive/wdi/wdi_csv_text .
cd wdi_csv_text

du -ch .

echo 3 | sudo tee /proc/sys/vm/drop_caches
date +%s && cat * | wc && date +%s

### Parsing data

Show distinct indicatorCodes to understand how the column was parsed.

In [10]:
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20

In [11]:
SELECT * 
FROM wdi_csv_text
LIMIT 10

Because columns are delimited by commas, any indicator name that itself contains a comma spills over into the `indicatorCode` column. To inspect this parsing issue, create another table, `wdi_gs_debug`, that reads each line into a single column instead of splitting it on commas.

In [13]:
DROP TABLE IF EXISTS wdi_gs_debug;

In [14]:
CREATE EXTERNAL TABLE wdi_gs_debug (fullLine STRING)
LOCATION 'gs://jarvis_data_eng_shadab/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");


In [15]:
SELECT fullLine FROM wdi_gs_debug
WHERE fullLine like "%\(\% of urban population\)\"%"
LIMIT 100

 
The query above shows that part of the indicator name is parsed as the indicator code, because the delimited SerDe splits on every comma, including commas inside quoted values. Let's apply OpenCSVSerde, which handles quoted fields and parses the columns correctly.
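The difference between the two parsing strategies can be reproduced outside Hive. The sketch below is only an illustration in Python, not how either SerDe is implemented: it compares a naive comma split with a quote-aware CSV parser. The sample line, its indicator name and the code `EX.IND.CODE` are made up for the example.

```python
import csv
import io

# Made-up WDI-style line: the quoted indicator name contains a comma.
line = '2015,Canada,CAN,"Example indicator, total (% of urban population)",EX.IND.CODE,18.5'

# Naive split on every comma, similar in spirit to FIELDS TERMINATED BY ','.
naive = line.split(",")

# Quote-aware parsing, similar in spirit to what a CSV SerDe does.
parsed = next(csv.reader(io.StringIO(line)))

print(len(naive), naive[4])    # 7 fields; the 5th holds the tail of the indicator name
print(len(parsed), parsed[4])  # 6 fields; the 5th is the indicator code
```

With the naive split, the fifth field (the `indicatorCode` position) receives the second half of the indicator name, which is the spillover seen in `wdi_csv_text`.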

In [17]:
DROP TABLE IF EXISTS wdi_opencsv_gs;


In [18]:
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   "separatorChar" = ","
)  
STORED AS TEXTFILE
LOCATION 'gs://jarvis_data_eng_shadab/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");

In [19]:
DROP TABLE IF EXISTS wdi_opencsv_text;

In [20]:
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   "separatorChar" = ","
)  
STORED AS TEXTFILE
LOCATION 'hdfs:///user/sshaikh/hive/wdi/wdi_opencsv_text';

In [21]:
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

 
### Verify parsing

In [23]:
SELECT distinct(indicatorcode)
FROM wdi_opencsv_text
ORDER BY indicatorcode
LIMIT 20;


 
### Comparing execution times

In [25]:
SELECT count(countryName) FROM wdi_opencsv_text

In [26]:
SELECT count(countryName) FROM wdi_csv_text

 

The query against `wdi_csv_text` runs much faster than the one against `wdi_opencsv_text` because it uses Hive's default SerDe with basic comma delimiting. `wdi_opencsv_text` uses OpenCSVSerde, which parses the indicator codes properly but adds computational overhead for its parsing.

### Compare data types between wdi_csv_text and wdi_opencsv_text to see that OpenCSVSerde exposes every column as STRING.

In [29]:
DESCRIBE FORMATTED wdi_opencsv_text 


In [30]:
DESCRIBE FORMATTED wdi_csv_text

In [31]:
DROP VIEW IF EXISTS wdi_opencsv_text_view;

In [32]:
CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
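-- Alias the CAST expressions so the view keeps the column names year and indicatorValue
SELECT CAST(year AS INT) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue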
FROM wdi_opencsv_text;

In [33]:
DESCRIBE FORMATTED wdi_opencsv_text_view
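Casting matters because text values compare character by character rather than numerically. The following Python sketch, using made-up values, shows how a maximum taken over text differs from one taken over floats. It only illustrates the general pitfall and makes no claim about how Hive evaluates a particular expression.

```python
import csv
import io

# Made-up (year, indicatorValue) rows; a CSV parser returns every field as text.
raw = "2014,9.5\n2015,10.25\n"
rows = list(csv.reader(io.StringIO(raw)))

# Comparing as text: "9.5" sorts after "10.25" because "9" > "1".
max_as_text = max(value for _, value in rows)

# Comparing after an explicit cast, as the view does with CAST(... AS FLOAT).
max_as_float = max(float(value) for _, value in rows)

print(max_as_text, max_as_float)
```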

 

Task: find GDP growth for Canada in 2015.
First, list the indicators for Canada in 2015 whose names mention GDP.

In [35]:
SELECT indicatorName, indicatorValue, year, countryname
FROM wdi_opencsv_text_view
WHERE countryname = "Canada" AND indicatorname LIKE  "%GDP%" AND year = "2015"

Narrow the search to the exact indicatorName.

In [37]:
SELECT indicatorValue AS GDP_growth_value, year, countryname
FROM wdi_opencsv_text_view
WHERE countryname = "Canada" AND indicatorname = "GDP growth (annual %)" AND year = "2015"

The query takes a long time to complete because Hive has to scan the whole table and evaluate the WHERE conditions on every row to find one specific metric. This can be sped up by taking advantage of partitions, so that only the rows for the requested year are read.

## Hive Partitioning
Partition the data by year to speed up the query. Create a new table called `wdi_opencsv_text_partitions`, then check the partition directories with the `ls` command.
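The idea behind partition pruning can be sketched in plain Python. Hive stores each partition in its own `year=<value>` directory, so a query filtered on `year` only needs to open one directory. The sketch below mimics that layout in a temporary directory. The rows, the file name `data.csv` and the helper functions are hypothetical and exist only for this example.

```python
import csv
import tempfile
from pathlib import Path

# Made-up rows: (year, countryName, indicatorValue).
rows = [
    (2014, "Canada", 1.0),
    (2015, "Canada", 2.0),
    (2015, "Brazil", 3.0),
    (2016, "Canada", 4.0),
]

def write_partitioned(root, rows):
    """Append each row to root/year=<year>/data.csv; year is kept out of the file."""
    for year, country, value in rows:
        part_dir = root / f"year={year}"
        part_dir.mkdir(parents=True, exist_ok=True)
        with open(part_dir / "data.csv", "a", newline="") as f:
            csv.writer(f).writerow([country, value])

def read_year(root, year):
    """Open only the directory for one year; other partitions are never read."""
    with open(root / f"year={year}" / "data.csv", newline="") as f:
        return [(country, float(value)) for country, value in csv.reader(f)]

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    write_partitioned(root, rows)
    partitions = sorted(p.name for p in root.iterdir())
    rows_2015 = read_year(root, 2015)

print(partitions)
print(rows_2015)
```

The partition column is stored in the directory name rather than in the data files, which is why `year` is listed under `PARTITIONED BY` and not among the regular columns.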

In [40]:
set hive.exec.dynamic.partition = true

In [41]:
set hive.exec.dynamic.partition.mode=nonstrict

In [42]:
DROP TABLE IF EXISTS wdi_opencsv_text_partitions

In [43]:
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions 
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   "separatorChar" = ","
)  
STORED AS TEXTFILE
LOCATION 'hdfs:///user/sshaikh/hive/wdi/wdi_opencsv_text_partitions';

In [44]:
show tables

In [45]:
FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions 
PARTITION(year)
SELECT countryname, countrycode, indicatorname, indicatorcode, indicatorvalue, year

In [46]:
SELECT indicatorValue AS GDP_growth_value, year, countryname
FROM wdi_opencsv_text_partitions
WHERE countryname = "Canada" AND indicatorname = "GDP growth (annual %)" AND year = "2015"

In [47]:
%sh
hdfs dfs -ls /user/sshaikh/hive/wdi/wdi_opencsv_text_partitions

 
## File optimization using a columnar file format: store the table as Parquet.

In [49]:
DROP TABLE IF EXISTS wdi_csv_parquet

In [50]:
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INT, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/sshaikh/hive/wdi/wdi_opencsv_parquet';

In [51]:
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs

In [52]:
%sh
hdfs dfs -du -s -h /user/sshaikh/hive/wdi/wdi_opencsv_parquet

In [53]:
%sh
hdfs dfs -du -s -h /user/sshaikh/hive/wdi/wdi_opencsv_text

 
The Parquet table is smaller on disk than `wdi_opencsv_text` because it stores binary data in a column-oriented layout. All values of a column are stored adjacent to each other, which makes for better compression.
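The effect of column adjacency on compression can be seen with a small experiment. This is a rough sketch using `zlib` on synthetic text, not Parquet's actual encoding: the same made-up records are serialized row by row and column by column, then compressed. Country names, the code `EX.CODE` and the values are all synthetic.

```python
import random
import zlib

random.seed(0)

# Synthetic records: 50 made-up countries x 56 years, one constant indicator code.
rows = [
    (str(year), f"Country{c:02d}", "EX.CODE", f"{random.uniform(-5, 10):.2f}")
    for c in range(50)
    for year in range(1960, 2016)
]

# Row-oriented: one record per line.
row_layout = "\n".join(",".join(r) for r in rows).encode()

# Column-oriented: all years, then all countries, then all codes, then all values.
col_layout = "\n".join(",".join(col) for col in zip(*rows)).encode()

row_size = len(zlib.compress(row_layout))
col_size = len(zlib.compress(col_layout))

print(len(row_layout), len(col_layout))  # same number of raw bytes
print(row_size, col_size)                # compressed sizes
```

Both layouts hold exactly the same bytes before compression. On this data the column layout compresses to fewer bytes, because long runs of similar values sit next to each other.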


In [55]:

SELECT count(countryName) FROM wdi_csv_parquet;

In [56]:
SELECT count(countryName) FROM wdi_opencsv_text

In [57]:
SELECT indicatorValue AS GDP_growth_value, year, countryname
FROM wdi_csv_parquet
WHERE countryname = "Canada" AND indicatorname = "GDP growth (annual %)" AND year = "2015"

In [58]:
SELECT indicatorValue AS GDP_growth_value, year, countryname
FROM wdi_opencsv_text
WHERE countryname = "Canada" AND indicatorname = "GDP growth (annual %)" AND year = "2015"

 
Comparing the performance of `wdi_csv_parquet` and `wdi_opencsv_text`, the queries above run faster against the columnar table than against the text table. The count query took 22 seconds vs. 1 min 42 seconds, and the 2015 GDP query took 16 seconds vs. 1 min 22 seconds.
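To put the reported timings in perspective, the speedup factors can be worked out directly. The snippet below only restates the numbers quoted above; the helper `to_seconds` and the dictionary keys are names chosen for this example.

```python
def to_seconds(minutes, seconds):
    """Convert a minutes/seconds pair to total seconds."""
    return minutes * 60 + seconds

# (parquet_seconds, text_seconds) as reported above.
timings = {
    "count": (to_seconds(0, 22), to_seconds(1, 42)),
    "gdp_2015": (to_seconds(0, 16), to_seconds(1, 22)),
}

# Speedup = time on the text table / time on the Parquet table.
speedups = {name: text / parquet for name, (parquet, text) in timings.items()}

for name, factor in speedups.items():
    print(f"{name}: {factor:.1f}x faster on Parquet")
```

Both queries come out roughly five times faster on the Parquet table in this single run.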

## Find highest GDP growth for each country and the year it achieved it.
Run in both Tez and Spark.

In [60]:
-- v is restricted to the GDP growth indicator so that a different indicator
-- with the same value cannot match the per-country maximum
SELECT v.indicatorValue AS GDP_growth_value, min(v.year), v.countryName
FROM (SELECT MAX(indicatorValue) AS indicatorValue, countryName
      FROM wdi_csv_parquet
      WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
      GROUP BY countryName) AS gd, wdi_opencsv_text_view AS v
WHERE v.countryName = gd.countryName AND v.indicatorValue = gd.indicatorValue
  AND v.indicatorCode = "NY.GDP.MKTP.KD.ZG"
GROUP BY v.countryName, v.indicatorValue
ORDER BY v.countryName




In [61]:
%spark.sql
-- v is restricted to the GDP growth indicator so that a different indicator
-- with the same value cannot match the per-country maximum
SELECT v.indicatorValue AS GDP_growth_value, min(v.year), v.countryName
FROM (SELECT MAX(indicatorValue) AS indicatorValue, countryName
      FROM wdi_csv_parquet
      WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
      GROUP BY countryName) AS gd, wdi_opencsv_text_view AS v
WHERE v.countryName = gd.countryName AND v.indicatorValue = gd.indicatorValue
  AND v.indicatorCode = "NY.GDP.MKTP.KD.ZG"
GROUP BY v.countryName, v.indicatorValue
ORDER BY v.countryName

The query for finding the highest GDP growth was run through both Tez and Spark. Execution was faster on Spark (2 min 21 sec) than on Tez (2 min 51 sec). Perhaps Spark benefits from processing this query against the optimized Parquet format.
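The logic of the query (take each country's maximum growth value, then find the earliest year in which that value occurred) can be sketched in plain Python. The country names and numbers below are made up, and the sketch is just one way to express the same two steps, not a translation of the execution plan.

```python
# Made-up (countryName, year, growth) rows for a single indicator.
rows = [
    ("CountryA", 2000, 3.0),
    ("CountryA", 2005, 7.5),
    ("CountryA", 2010, 7.5),  # ties with 2005; the earlier year should win
    ("CountryB", 2001, 4.0),
    ("CountryB", 2002, 1.0),
]

# Step 1: maximum growth per country (the inner GROUP BY with MAX).
best = {}
for country, _, value in rows:
    best[country] = max(value, best.get(country, value))

# Step 2: join back on (country, value) and keep the earliest year (MIN(year)).
result = {}
for country, year, value in rows:
    if value == best[country]:
        previous = result.get(country)
        if previous is None or year < previous[1]:
            result[country] = (value, year)

for country in sorted(result):
    print(country, *result[country])
```

As in the SQL, the join relies on exact equality of the stored values, which works here because the maximum is itself one of the stored values.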


### Find GDP growth (annual %) for each country by year.


In [64]:
SELECT countryName, year, indicatorCode, indicatorValue
FROM wdi_opencsv_text_view
WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
ORDER BY countryName, year
