# Query GS data
- Create an external Hive table
- Write a query that counts the number of rows


In [1]:
DROP TABLE IF EXISTS wdi_gs;

In [2]:
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_zach/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");

In [3]:
DESCRIBE FORMATTED wdi_gs;

In [4]:
SELECT count(countryName) FROM wdi_gs;

# Hive Project

## Load GS to HDFS
- HDFS home directory: `/user/ztbarlow66`

### External Table `wdi_csv_text`

In [7]:
DROP TABLE IF EXISTS wdi_csv_text;

In [8]:

CREATE EXTERNAL TABLE wdi_csv_text
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/ztbarlow66/hive/wdi/wdi_csv_text';

In [9]:
FROM wdi_gs
INSERT OVERWRITE TABLE wdi_csv_text
SELECT *;

**Findings** for `SELECT COUNT(countryName) FROM wdi_csv_text;`
- First run: 161.386 seconds
- Second run: 8.236 seconds

*After clearing the filesystem cache* (on the master and both workers)
- Running the query again: 51.29 seconds

*Note:* these are overall job times, not just MapReduce time.

## Hive vs Bash
- Copy the `wdi_csv_text` HDFS files to the master node and count the number of rows

In [12]:
%sh
#SSH to master node
cd ~
hdfs  dfs -get  hdfs:///user/ztbarlow66/hive/wdi/wdi_csv_text .
cd wdi_csv_text
#calculate current directory size
du -ch .
#1.8G	total

#clear fs cache
echo 3 | sudo tee /proc/sys/vm/drop_caches
#bash row count
date +%s && cat * | wc && date +%s

# 1621001307
# 1621001328 -> 21 seconds

## Performance Analysis
Counting the rows takes a noticeably different amount of time in Hive and in Bash.
The fastest **Hive** run took *8.236 secs*, once the data had been cached, and the **bash** check took *21 secs* on a freshly cleared cache (Hive needed *51.29 secs* under the same cold-cache condition). Hive should become more useful as the data grows: a single-node bash pipeline slows down roughly in proportion to the data size, while Hive can spread the work across the workers.

## Parsing Issue

In [15]:
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;

One possible cause of this problem is that the indicatorname column has a limit on its characters, so its value is being split across two columns and overwriting the data that belongs in the indicatorcode column.

The cells below create a debug table that reads each line as a single string, then query it to find the issue.

In [17]:
DROP TABLE IF EXISTS wdi_gs_debug;

CREATE EXTERNAL TABLE wdi_gs_debug 
    (line String)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_zach/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");

DESCRIBE FORMATTED wdi_gs_debug;

In [18]:
SELECT line 
FROM wdi_gs_debug
WHERE line LIKE "%\(\% of urban population\)\"%";

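The output suggests that the rows are being split on the comma inside indicatorname. For example, "Access to electricity, urban (% of urban population)" would be split at the comma after "electricity", which shifts every later field one column to the right. The tables below use OpenCSVSerde, which respects quoted fields.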
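
To make the suspected split concrete, here is a minimal Python sketch (not part of the Hive job) that compares a plain comma split with a quote-aware CSV parse. The sample row is hypothetical: its layout follows the table schema above and the quoting seen in the debug query, and the value is made up.

```python
import csv
import io

# Hypothetical sample row: the layout follows the table schema and the value is made up.
line = '2010,"Canada","CAN","Access to electricity, urban (% of urban population)","EG.ELC.ACCS.UR.ZS",100.0'

# Plain split on every comma, comparable to FIELDS TERMINATED BY ','.
naive_fields = line.split(",")

# Quote-aware parse, comparable to what a CSV SerDe does with quoted fields.
csv_fields = next(csv.reader(io.StringIO(line)))

print(len(naive_fields), repr(naive_fields[4]))
print(len(csv_fields), repr(csv_fields[4]))
```

If the file is quoted this way, the plain split yields seven fields instead of six and puts the tail of the indicator name in the fifth position, where `indicatorcode` is expected. The quote-aware parse keeps six fields with the code in the right place.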

In [20]:
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE IF NOT EXISTS wdi_opencsv_gs
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_zach/datasets/wdi_2016';

In [21]:
DROP TABLE IF EXISTS wdi_opencsv_text;

CREATE EXTERNAL TABLE IF NOT EXISTS wdi_opencsv_text
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION "hdfs:///user/ztbarlow66/hive/wdi/wdi_opencsv_text";

In [22]:
INSERT OVERWRITE TABLE wdi_opencsv_text SELECT * FROM wdi_opencsv_gs;

In [23]:
SELECT distinct(indicatorcode) FROM wdi_opencsv_text;

### Difference between run times of queries
- Block 1 (`wdi_csv_text`): 20 sec
- Block 2 (`wdi_opencsv_text`): 2 min 54 sec

The table that uses the default delimited format is much faster than the one that uses OpenCSVSerde. This is likely because OpenCSVSerde is a more complex SerDe: it runs on every line of the files and has to handle quoting, which increases job execution time.

In [25]:
SELECT count(countryName) from wdi_csv_text;

In [26]:
SELECT count(countryname) FROM wdi_opencsv_text;

## OpenCSVSerde limitation

As the `DESCRIBE FORMATTED` output below shows, OpenCSVSerde exposes every column as a string, regardless of the type declared in the table definition. A common way to get the desired types back is to create a view over the table that casts the columns. This string handling may also contribute to the time difference between the two queries.

In [28]:
DESCRIBE FORMATTED wdi_opencsv_text;

In [29]:
DESCRIBE FORMATTED wdi_csv_text;

Create a view over the text table that casts the columns to their intended types, so that queries see the data in its actual format.

In [31]:
DROP VIEW IF EXISTS wdi_opencsv_text_view;

CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
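-- Alias the CAST expressions so the view keeps the original column names
SELECT CAST(year AS INT) AS year, countryname, countrycode, indicatorname, indicatorcode, CAST(indicatorvalue AS FLOAT) AS indicatorvalue
FROM wdi_opencsv_text;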

In [32]:
DESCRIBE FORMATTED wdi_opencsv_text_view;
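
The string-only behaviour and the casts done by the view can be imitated in Python as a rough analogy. The sketch below assumes a hypothetical sample row with a made-up value and a hypothetical helper named `cast_row`; it is not how Hive implements the SerDe or the view.

```python
import csv
import io

# Hypothetical sample row: the value is made up for illustration.
raw = '2015,"Canada","CAN","GDP growth (annual %)","NY.GDP.MKTP.KD.ZG",1.5\n'

# A CSV reader returns every field as a string, much like OpenCSVSerde does.
row = next(csv.reader(io.StringIO(raw)))
all_strings = all(isinstance(field, str) for field in row)


def cast_row(fields):
    """Cast year to int and indicatorvalue to float, mirroring the view's CAST calls."""
    year, countryname, countrycode, indicatorname, indicatorcode, indicatorvalue = fields
    return (int(year), countryname, countrycode, indicatorname, indicatorcode, float(indicatorvalue))


typed = cast_row(row)
print(all_strings, type(typed[0]).__name__, type(typed[5]).__name__)
```

The cast happens on every row each time the data is read, which is one plausible reason why querying through the view costs extra time.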

## 2015 Canada GDP Growth HQL

The query took 2 min 1 sec. It is slow because Hive has to scan every row and evaluate the year, country, and `LIKE` conditions on each one. This can be optimized with partitions: if we partition by year and country, Hive only has to scan the files in the partitions we are looking for instead of the whole table.

In [34]:
SELECT indicatorvalue AS GDP_growth_value, year, countryname
FROM wdi_opencsv_text_view
WHERE year = 2015 
    AND countrycode = 'CAN' 
    AND indicatorname LIKE "GDP growth \(annual \%\)%";

## Hive Partitions
- Block 1: Creation of partitions table using OpenCSVSerDe
- Block 2: Setting the dynamic partition mode and related options
- Block 3: Inserting data into the partitions table using dynamic partitions
- Block 4: Running query

In [36]:
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE TABLE wdi_opencsv_text_partitions
    (countryname STRING, countrycode STRING, indicatorname STRING, indicatorcode STRING, indicatorvalue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION "hdfs:///user/ztbarlow66/hive/wdi/wdi_opencsv_text_partitions";

In [37]:
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.stats.column.autogather=false; 
SET hive.optimize.sort.dynamic.partition=true;
-- SET hive.auto.convert.join=false;

In [38]:
FROM wdi_opencsv_text wdt
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION(year)
SELECT wdt.countryname, wdt.countrycode, wdt.indicatorname, wdt.indicatorcode, wdt.indicatorvalue, wdt.year;


In [39]:
SELECT indicatorvalue AS GDP_growth_value, year, countryname
FROM wdi_opencsv_text_partitions
WHERE year = 2015 
    AND countrycode = 'CAN' 
    AND indicatorname LIKE "GDP growth \(annual \%\)%";

- Without partitions: 2 min 1 sec
- With partitions: 20 sec

This improvement comes from partitioning on year, which lets Hive read only the 2015 partition instead of the whole table. We could also partition by country, but it is not really necessary at the moment.

In [41]:
%sh
hdfs dfs -ls hdfs:///user/ztbarlow66/hive/wdi/wdi_opencsv_text_partitions | wc -l
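
The effect of partitioning on year can be illustrated with a toy Python model. This is only a sketch under stated assumptions: the rows and values are made up, `full_scan` and `partition_scan` are hypothetical helpers, and a dictionary keyed by year stands in for the per-year directories that Hive creates.

```python
from collections import defaultdict

# Hypothetical rows: (year, countrycode, indicatorcode, indicatorvalue); values are made up.
rows = [
    (2014, "CAN", "NY.GDP.MKTP.KD.ZG", 1.0),
    (2015, "CAN", "NY.GDP.MKTP.KD.ZG", 2.0),
    (2015, "USA", "NY.GDP.MKTP.KD.ZG", 3.0),
    (2016, "CAN", "NY.GDP.MKTP.KD.ZG", 4.0),
]


def full_scan(all_rows, year, countrycode):
    """Check every row, as a query on an unpartitioned table has to."""
    scanned = 0
    matches = []
    for row in all_rows:
        scanned += 1
        if row[0] == year and row[1] == countrycode:
            matches.append(row)
    return matches, scanned


# Group rows by year, standing in for one directory per year value.
partitions = defaultdict(list)
for row in rows:
    partitions[row[0]].append(row)


def partition_scan(parts, year, countrycode):
    """Read only the partition for the requested year, then filter by country."""
    scanned = 0
    matches = []
    for row in parts.get(year, []):
        scanned += 1
        if row[1] == countrycode:
            matches.append(row)
    return matches, scanned


print(full_scan(rows, 2015, "CAN"))
print(partition_scan(partitions, 2015, "CAN"))
```

Both functions return the same match, but the partitioned version touches two rows instead of four. On the real table the same idea means reading one year's files rather than all of them.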

## Columnar File Optimization

In [43]:
DROP TABLE IF EXISTS wdi_csv_parquet;
CREATE EXTERNAL TABLE wdi_csv_parquet
    (year INTEGER, countryName STRING, countryCode STRING, 
    indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION "hdfs:///user/ztbarlow66/hive/wdi/wdi_csv_parquet";

In [44]:
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs;

In [45]:
%sh
hdfs dfs -du -h hdfs:///user/ztbarlow66/hive/wdi/

In [46]:
SELECT count(countryName) FROM wdi_csv_parquet;

In [47]:
SELECT count(countryName) FROM wdi_opencsv_text;

In [48]:
SELECT indicatorvalue AS GDP_growth_value, year, countryname
FROM wdi_csv_parquet
WHERE year = 2015 
    AND countrycode = 'CAN' 
    AND indicatorname LIKE "GDP growth \(annual \%\)%";

In [49]:
SELECT indicatorvalue AS GDP_growth_value, year, countryname
FROM wdi_opencsv_text
WHERE year = 2015 
    AND countrycode = 'CAN' 
    AND indicatorname LIKE "GDP growth \(annual \%\)%";

For both queries (the row count and the 2015 Canada GDP growth lookup), the Parquet table finishes in 15 and 16 seconds, whereas the text table takes 1 min 24 sec for each. The file sizes explain much of the performance difference: the Parquet data is 137.2 M while the text data is 2.3 G, so far less data has to be read. Because Parquet is a columnar format, a query also only needs to read the columns it references.
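
As a rough illustration of why a columnar layout helps a query such as `count(countryName)`, the toy Python model below counts how many values have to be touched in a row layout versus a column layout. The rows are hypothetical with made-up values, and the model ignores compression and encoding, which also contribute to Parquet's smaller size.

```python
# Hypothetical rows: (year, countryname, indicatorcode, indicatorvalue); values are made up.
rows = [
    (2014, "Canada", "NY.GDP.MKTP.KD.ZG", 1.0),
    (2015, "Canada", "NY.GDP.MKTP.KD.ZG", 2.0),
    (2015, "Mexico", "NY.GDP.MKTP.KD.ZG", 3.0),
]

# Row layout: the values of one record are stored together,
# so reading a single column still walks through each whole record.
row_values_read = sum(len(row) for row in rows)

# Column layout: the values of one column are stored together,
# so a query on countryname reads only that column.
columns = {
    "year": [row[0] for row in rows],
    "countryname": [row[1] for row in rows],
    "indicatorcode": [row[2] for row in rows],
    "indicatorvalue": [row[3] for row in rows],
}
column_values_read = len(columns["countryname"])

print(row_values_read, column_values_read)
```

In this toy case the row layout touches twelve values and the column layout touches three, which is the same count with a quarter of the reads.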

## Highest GDP Growth
- Block 1: Query to find the maximum GDP growth for each country using Hive
- Block 2: Query to find the maximum GDP growth for each country using Spark SQL

In [52]:
SELECT T1.indicatorvalue AS GDP_growth_value, T1.year, T1.countryname
FROM wdi_csv_parquet T1
INNER JOIN (
    SELECT MAX(indicatorvalue) AS GDP_growth_value, countryname
    FROM wdi_csv_parquet
    WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryname
) as T2
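ON T1.indicatorvalue = T2.GDP_growth_value AND T1.countryname = T2.countryname
-- Restrict T1 to the GDP growth indicator so other indicators with an equal value are not matched
WHERE T1.indicatorcode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY T1.indicatorvalue DESC;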

In [53]:
%spark.sql
SELECT T1.indicatorvalue AS GDP_growth_value, T1.year, T1.countryname
FROM wdi_csv_parquet T1
INNER JOIN (
    SELECT MAX(indicatorvalue) AS GDP_growth_value, countryname
    FROM wdi_csv_parquet
    WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG'
    GROUP BY countryname
) as T2
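ON T1.indicatorvalue = T2.GDP_growth_value AND T1.countryname = T2.countryname
-- Restrict T1 to the GDP growth indicator so other indicators with an equal value are not matched
WHERE T1.indicatorcode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY T1.indicatorvalue DESC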

## Sort GDP by country and year

In [55]:
SELECT countryname, year, indicatorcode, indicatorvalue as GDP_growth_value
FROM wdi_csv_parquet
WHERE indicatorcode = 'NY.GDP.MKTP.KD.ZG'
ORDER BY countryname DESC, year ASC;