#### Nariman Alimuradov
Jarvis Hive Project Notebook

Load Data to HDFS

In [2]:
DROP TABLE IF EXISTS wdi_gs;

CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_nariman/datasets/wdi_2016'

In [3]:
DROP TABLE IF EXISTS wdi_csv_text;

CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/nariman/hive/wdi/wdi_csv_text'

In [4]:
%sh
hdfs dfs -ls hdfs:///user/nariman/hive/wdi/wdi_csv_text

In [5]:
%sh 
hdfs dfs -cat  hdfs:///user/nariman/hive/wdi/wdi_csv_text/000000_0| head -100


In [6]:
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

In [7]:
-- Cache Observations
-- Below is a query that gets the number of entries in the wdi_csv_text table
SELECT count(countryName) FROM wdi_csv_text;

-- With the master and worker caches cleared, counting the rows took 35.948 seconds.
-- A subsequent run took 15.472 seconds.
-- The first run reads the files from disk and leaves them in the cache, so later runs can read the same data from memory instead of from disk.


In [8]:
-- Hive vs Bash Performance Results
-- The cell below counts the rows with Bash instead. With the cache cleared, the Bash approach took approximately 21 seconds, compared with about 36 seconds in Hive.
-- This is likely because Hive has to set up work on the worker nodes and push the computation through its whole pipeline, whereas the Bash solution simply reads the files locally without any of that extra work.

In [9]:
%sh

cd ~
hdfs  dfs -get  hdfs:///user/nariman/hive/wdi/wdi_csv_text .

cd wdi_csv_text
du -ch .

echo 3 | sudo tee /proc/sys/vm/drop_caches

date +%s && cat * | wc && date +%s

Parsing Issue

In [11]:
SELECT *
FROM wdi_csv_text
LIMIT 100

In [12]:
-- The table splits each row on every comma. Some entries contain commas themselves, so they are split across several columns and the remaining values shift into the wrong columns. Had we not selected for distinct values, we would see many '% of export of goods' lines, since that text appears in so many entries.
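
To make the parsing issue concrete, here is a small Python sketch, separate from the Hive pipeline, that compares a naive comma split with a quote-aware CSV parser. The sample row, its indicator name, and its code are made up for illustration. They only mimic the shape of a record whose quoted field contains a comma.

```python
import csv
import io

# Hypothetical sample row: the quoted indicator name contains a comma.
sample = '2015,Canada,CAN,"Sample exports, total (% of export of goods)",XX.SAMPLE.CODE,1.5'

# Naive split on every comma, which is effectively what
# FIELDS TERMINATED BY ',' does.
naive_fields = sample.split(",")

# Quote-aware parse, comparable in spirit to a CSV SerDe.
csv_fields = next(csv.reader(io.StringIO(sample)))

print(len(naive_fields), naive_fields[3:5])
print(len(csv_fields), csv_fields[3])
```

The naive split yields one field too many and pushes the code and value one column to the right. The quote-aware parser keeps the indicator name in a single field.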

In [13]:
DROP TABLE IF EXISTS wdi_gs_debug;

CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
LOCATION 'gs://jarvis_data_eng_nariman/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");

In [14]:
SELECT line
FROM wdi_gs_debug
LIMIT 100

In [15]:
DROP TABLE IF EXISTS wdi_opencsv_gs;

CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_nariman/datasets/wdi_2016'

In [16]:
DROP TABLE IF EXISTS wdi_opencsv_text;

CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/nariman/hive/wdi/wdi_opencsv_text';

In [17]:
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [18]:
SELECT *
FROM wdi_opencsv_text
LIMIT 10

In [19]:
SELECT distinct(indicatorcode)
FROM wdi_opencsv_text
ORDER BY indicatorcode
LIMIT 20

In [20]:
SELECT count(countryName) 
FROM wdi_opencsv_text

In [21]:
SELECT count(countryName)
FROM wdi_csv_text

In [22]:
-- The count took 1 min 26 sec on wdi_opencsv_text and 12 sec on wdi_csv_text.
-- wdi_opencsv_text is slower because OpenCSVSerde has to parse every line, handling quoted fields and escape characters, instead of simply splitting on commas. This adds a large amount of overhead.

OpenCSVSerde Limitation

In [24]:
DROP VIEW IF EXISTS wdi_opencsv_text_view;

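-- OpenCSVSerde treats every column as STRING regardless of the declared type,
-- so the view casts indicatorValue back to FLOAT. The alias keeps the column
-- name; without it Hive would generate a name such as _c5.
CREATE VIEW wdi_opencsv_text_view
AS
SELECT year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text;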
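
The reason for the cast can be sketched in Python, under the assumption that a CSV reader returning strings is a fair stand-in for a SerDe that exposes every column as a string. The values below are invented for illustration.

```python
import csv
import io

# Hypothetical rows in the six-column layout used by the tables above.
raw = (
    "2014,Canada,CAN,Sample indicator,XX.SAMPLE.CODE,9.5\n"
    "2015,Canada,CAN,Sample indicator,XX.SAMPLE.CODE,10.0\n"
)
rows = list(csv.reader(io.StringIO(raw)))

# Every field comes back as a string, even the numeric-looking ones.
all_strings = all(isinstance(field, str) for row in rows for field in row)

# Comparing the raw strings is lexicographic, so "9.5" sorts above "10.0".
max_as_text = max(row[5] for row in rows)

# An explicit cast, analogous to CAST(indicatorValue AS FLOAT), fixes this.
max_as_float = max(float(row[5]) for row in rows)

print(all_strings, max_as_text, max_as_float)
```

Without the cast, comparisons and aggregates on the value column would follow string ordering rather than numeric ordering.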

2015 Canada GDP Growth HQL

In [26]:
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text_view
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"

In [27]:
-- The query checks every row for a match, which takes linear time in the size of the table. This is problematic for a dataset this large.
-- We could speed it up by storing the data in a columnar file format, or by partitioning and bucketing the underlying table, so that the query has to read less data to get its results.

Hive Partitions

In [29]:
SET hive.exec.dynamic.partition.mode = nonstrict;
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;

CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/nariman/hive/wdi/wdi_opencsv_text_partitions';


In [30]:
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION (year)
SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year
FROM wdi_opencsv_text

In [31]:
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text_partitions
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada"

In [32]:
%sh
hdfs dfs -ls /user/nariman/hive/wdi/wdi_opencsv_text_partitions

In [33]:
-- 57 partitions were created.
-- The previous run of this query, without partitions, took 1 minute 23 seconds. Partitioning the data brought that down to 13 seconds.
-- This is because the filter on year lets Hive read only the partition(s) that contain what we need instead of scanning all the data.
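
The effect of partitioning can be illustrated with a small Python sketch. It is a simplified model, not how Hive is implemented: each partition is just a list of rows keyed by year, and the rows and helper names (`full_scan`, `build_partitions`, `partitioned_scan`) are hypothetical.

```python
from collections import defaultdict

# Hypothetical rows: (year, countryName, indicatorValue).
rows = [
    (2014, "Canada", 1.0),
    (2015, "Canada", 2.0),
    (2015, "Brazil", 3.0),
    (2016, "Canada", 4.0),
]

def full_scan(rows, year, country):
    """Check every row; return the matches and the number of rows read."""
    hits = [r for r in rows if r[0] == year and r[1] == country]
    return hits, len(rows)

def build_partitions(rows):
    """Group rows by year, like one directory per year=<value>."""
    partitions = defaultdict(list)
    for year, country, value in rows:
        partitions[year].append((country, value))
    return partitions

def partitioned_scan(partitions, year, country):
    """Read only the partition for the requested year."""
    partition = partitions.get(year, [])
    hits = [(year, c, v) for c, v in partition if c == country]
    return hits, len(partition)

partitions = build_partitions(rows)
print(full_scan(rows, 2015, "Canada"))
print(partitioned_scan(partitions, 2015, "Canada"))
```

Both scans return the same match, but the partitioned scan reads two rows instead of four. On the real table the saving is proportionally larger, because a single year is a small share of the data.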

Columnar File Optimization

In [35]:
DROP TABLE IF EXISTS wdi_csv_parquet;

CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/nariman/hive/wdi/wdi_csv_parquet';

In [36]:
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs

In [37]:
%sh
hdfs dfs -du -h /user/nariman/hive/wdi/wdi_csv_parquet
hdfs dfs -du -h /user/nariman/hive/wdi/wdi_opencsv_text

In [38]:
SELECT count(countryName)
FROM wdi_csv_parquet;


In [39]:
SELECT count(countryName)
FROM wdi_opencsv_text;

In [40]:
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_csv_parquet
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada";

In [41]:
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_opencsv_text
WHERE indicatorName LIKE "%GDP growth%" AND year = 2015 AND countryName = "Canada";

In [42]:
-- The wdi_csv_parquet table is approximately 20 times smaller than the wdi_opencsv_text table. Parquet stores the values of each column together, and similar values stored side by side encode and compress well.
-- For both the count and GDP queries, the columnar format reduced runtime by approximately 5 times.
-- This is because a query no longer has to read every field of every row; it reads only the columns it references.
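
A rough Python sketch of the idea follows. It models a row layout as a list of tuples and a column layout as one list per column, then counts how many values a `count(countryName)` would have to read in each. The run-length encoding step is a simplified stand-in for the encodings a columnar format can apply, not Parquet's actual implementation, and all data is invented.

```python
from itertools import groupby

# Hypothetical rows in the six-column layout used by the tables above.
names = ["year", "countryName", "countryCode",
         "indicatorName", "indicatorCode", "indicatorValue"]
rows = [
    (2014, "Canada", "CAN", "Sample indicator", "XX.SAMPLE", 1.0),
    (2015, "Canada", "CAN", "Sample indicator", "XX.SAMPLE", 2.0),
    (2014, "Brazil", "BRA", "Sample indicator", "XX.SAMPLE", 3.0),
    (2015, "Brazil", "BRA", "Sample indicator", "XX.SAMPLE", 4.0),
]

# Column-oriented layout: one list per column.
columns = {name: [row[i] for row in rows] for i, name in enumerate(names)}

# A row layout reads every field of every row; a column layout reads
# only the single column the query references.
values_read_row_layout = sum(len(row) for row in rows)
values_read_column_layout = len(columns["countryName"])

def run_length_encode(values):
    """Collapse runs of equal neighbours into (value, count) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]

encoded_country = run_length_encode(columns["countryName"])
print(values_read_row_layout, values_read_column_layout, encoded_country)
```

Here the column layout reads 4 values instead of 24, and the four country names collapse into two pairs. The same two effects, reading less and encoding repeated values compactly, are consistent with the smaller size and faster queries observed above.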

Highest GDP Growth

In [44]:
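-- For each country, find the year(s) with its highest GDP growth.
SELECT distinct table1.indicatorValue AS GDP_growth_value, table1.countryName, table1.year
FROM wdi_csv_parquet table1
INNER JOIN (
    SELECT max(indicatorValue) AS GDP_growth_value, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
    ) table2
ON table1.indicatorValue = table2.GDP_growth_value AND table1.countryName = table2.countryName
-- Restrict the outer table to the same indicator, so that other indicators
-- that happen to share the same value do not match.
WHERE table1.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC;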

In [45]:
%spark.sql
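-- Same query as the previous cell, run on Spark for comparison.
SELECT distinct table1.indicatorValue AS GDP_growth_value, table1.countryName, table1.year
FROM wdi_csv_parquet table1
INNER JOIN (
    SELECT max(indicatorValue) AS GDP_growth_value, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryName
    ) table2
ON table1.indicatorValue = table2.GDP_growth_value AND table1.countryName = table2.countryName
-- Restrict the outer table to the same indicator, so that other indicators
-- that happen to share the same value do not match.
WHERE table1.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY GDP_growth_value DESC;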

In [46]:
-- Execution time for Tez: 1 minute 29 seconds
-- Execution time for Spark: 1 minute 5 seconds

Sort GDP by Country and Year

In [48]:
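-- ORDER BY produces a total ordering of the result. SORT BY only sorts the rows within each reducer.
SELECT countryName, year, indicatorCode, indicatorValue
FROM wdi_csv_parquet
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY countryName, year;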