 
# Quention 1: Load GS Data to HDFS

## Create External Table `wdi_csv_text`
```hive
DROP TABLE IF EXISTS wdi_csv_text;
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_csv_text';
```

## Load Data from `wdi_gs` to `wdi_csv_text`
```hive
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs
```

## Check HDFS File Size
```cmd
hdfs dfs -ls -h /user/ewang/hive/wdi/wdi_csv_text
```

## Execute Query
```hive
SELECT count(countryName) FROM wdi_csv_text
```
- Run twice consecutively
- Intial run: 13.40 s
- Second run: 4.12 s

## Clear File System Cache
```cmd
echo 3 | sudo tee /proc/sys/vm/drop_caches
```
- Clear cache in both the master and worker nodes
- Execute query again
- Third run: 13.45 s


In [1]:
DROP TABLE IF EXISTS wdi_csv_text;
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_csv_text';

In [2]:
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

In [3]:
SELECT count(countryName) FROM wdi_csv_text

 
# Questions 2: Monitor Hadoop/Yarn Job
- __Yarn Application Timer__: List of application executed through yarn currently and historically. Holds generic information such as individual containers used from the worker nodes 
- __Tez__: View Tez applications generated by Hive. Access each individual queries and view their respective mapper and reducer usage. 

 
# Question 3: Hive Vs. Bash
### Perform Row Count Using Bash
```sh
cd ~
hdfs  dfs -get  hdfs:///user/tedor18/hive/wdi/wdi_csv_text .
cd wdi_csv_text

#calculate current directory size
du -ch .

#clear fs cache
echo 3 | sudo tee /proc/sys/vm/drop_caches
#bash row count
date +%s && cat * | wc && date +%s
```
- __Execution Time__: 31.0 s

### Performance: Hive vs Bash
- Hive is able to compute much faster than bash, because it leverages the file system caching which allows it to read from memory instead of disk.


In [6]:
%sh 
cd ~
hdfs  dfs -get  hdfs:///user/tedor18/hive/wdi/wdi_csv_text .
cd wdi_csv_text

#calculate current directory size
du -ch .

#clear fs cache
echo 3 | sudo tee /proc/sys/vm/drop_caches
#bash row count
date +%s && cat * | wc && date +%s


 
# Question 4: Parsing Issue
## Execute Following Query 
```hive 
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;
```
- Parsing issue: contains invalid entires in the `inidicatorCode` column

## Debug Table 
### Step 1: Create Debug Table 
```hive
DROP TABLE IF EXISTS wdi_gs_debug;
CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_dorjee/datasets/wdi_2016';
```
- Removed field delimiter 

### Step 2: Query Debug Table
```hive 
SELECT * FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)\"%"
LIMIT 20
```
- The parsing issue is noticable in the `indicatorName` column where there are `,` which is set as the delimiter for different values.

## OpenCSV SerDe
### Step 1: Create New External Table `wdi_opencsv_gs` in GS
```hive
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_dorjee/datasets/wdi_2016';
```

### Step 2: Create New External Table `wdi_opencsv_text` in HDFS
```hive 
DROP TABLE IF EXISTS wdi_opencsv_text;
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_opencsv_text';
```

### Step 3: Export and Overwrite data from GS to HDFS
```hive 
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs
```

### Step 4: Query `wdi_opencsv_text`
```hive 
SELECT distinct(indicatorcode) FROM wdi_opencsv_text
LIMIT 20
```

### Step 5: Comparing Execution Time `LazySimpleSerDe` vs `OpenCSV SerDe`
```hive 
SELECT count(countryName) FROM wdi_opencsv_text;
SELECT count(countryName) FROM wdi_csv_text;
```

- `wdi_csv_text` is faster than `wdi_opencsv_text` because it uses `LazySimpleSerde` which is faster for dataset that's appropiately formatted. 


In [8]:
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;


In [9]:
DROP TABLE IF EXISTS wdi_gs_debug;
CREATE EXTERNAL TABLE wdi_gs_debug
(line STRING)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_dorjee/datasets/wdi_2016';

In [10]:
SELECT * FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)\"%"
LIMIT 20

In [11]:
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_dorjee/datasets/wdi_2016';

In [12]:
DROP TABLE IF EXISTS wdi_opencsv_text;
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_opencsv_text';

In [13]:
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [14]:
SELECT distinct(indicatorcode) FROM wdi_opencsv_text
LIMIT 20

In [15]:
SELECT count(countryName) FROM wdi_opencsv_text;
SELECT count(countryName) FROM wdi_csv_text;

 
# Question 5: OpenCsvSerde Limitaion
## Table Metadata 
```hive
DESCRIBE FORMATTED wdi_opencsv_text;
DESCRIBE FORMATTED wdi_csv_text;
```
- OpenCSVSerde will treat every field as String by default, as a result the `year` and `indicatorValue` are incorrectly typed

## Create a View
```hive
DROP VIEW IF EXISTS wdi_opencsv_text_view;
CREATE VIEW wdi_opencsv_text_view AS 
SELECT CAST(year AS INTEGER), countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT)
FROM wdi_opencsv_text;
```
## Metadata
```hive
DESCRIBE FORMATTED wdi_opencsv_text_view;
```

In [17]:
DESCRIBE FORMATTED wdi_opencsv_text;
DESCRIBE FORMATTED wdi_csv_text;

In [18]:
DROP VIEW IF EXISTS wdi_opencsv_text_view;
CREATE VIEW wdi_opencsv_text_view AS 
SELECT CAST(year AS INTEGER), countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT)
FROM wdi_opencsv_text;

In [19]:
DESCRIBE FORMATTED wdi_opencsv_text_view;

 
# Question 6: 2015 Canada GDP Growth HQL
## GDP Growth for Canada in 2015
```hive 
SELECT year, countryName, indicatorValue
FROM wdi_opencsv_text
WHERE UPPER(countryName)="CANADA" AND indicatorCode="NY.GDP.MKTP.KD.ZG" AND YEAR="2015";
```

## Improvements
- Could index/partition the table to improve the query performance 


In [21]:
SELECT year, countryName, indicatorValue
FROM wdi_opencsv_text
WHERE UPPER(countryname)="CANADA" AND indicatorcode="NY.GDP.MKTP.KD.ZG" AND YEAR="2015";


 
# Question 7: Hive Partitions
Use paritions to optimize query performance 
## Create Partition Table
```hive
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_opencsv_text_partitions';
```

## Export and Overwite Data from `wdi_opencsv_text` to `wdi_opencsv_text_partitions`
```hive
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=100;
FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions 
PARTITION(year)
SELECT countryName, countryCode, indicatorName, indicatorcode, indicatorValue, year;
```

## Number of Partitions (sub-directories)
```sh
hdfs dfs -count -h /user/tedor18/hive/wdi/wdi_opencsv_text_partitions 
```

## GDP Growth for Canada in 2015 (paritioned ver.)
```hive
SELECT year, countryName, indicatorValue
FROM wdi_opencsv_text_partitions
WHERE UPPER(countryName)="CANADA" AND indicatorCode="NY.GDP.MKTP.KD.ZG" AND YEAR="2015";
```
- Significantly faster query


In [23]:
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_opencsv_text_partitions';

In [24]:
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=100;
FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions 
PARTITION(year)
SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year;

In [25]:
%sh
hdfs dfs -count -h /user/tedor18/hive/wdi/wdi_opencsv_text_partitions 

In [26]:
SELECT year, countryName, indicatorValue
FROM wdi_opencsv_text_partitions
WHERE UPPER(countryName)="CANADA" AND indicatorCode="NY.GDP.MKTP.KD.ZG" AND YEAR="2015";

 
# Question 8: Columnar File Optimization

## Create Parquet File 
```hive 
DROP TABLE IF EXISTS wdi_csv_parquet;
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_csv_parquet';
```
## Load Data 
```hive
FROM wdi_opencsv_gs
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT *;
```

## Compare TextFile and Parquet
### File Size 
```sh
hdfs dfs -du -s -h /user/tedor18/hive/wdi/wdi_opencsv_text;
hdfs dfs -du -s -h /user/tedor18/hive/wdi/wdi_csv_parquet;
```
- parquet is significantly smaller 

### Runtime 
```hive
SELECT count(countryName) FROM wdi_opencsv_text;
SELECT count(countryName) FROM wdi_csv_parquet;
```
- Initial run are about same, however in subsequent runs, parquet is significantly faster

### 2015 GDP Growth 
```hive
SELECT year, countryName, indicatorValue
FROM wdi_opencsv_text
WHERE UPPER(countryName)="CANADA" AND indicatorCode="NY.GDP.MKTP.KD.ZG" AND YEAR="2015";
```
```hive
SELECT year, countryName, indicatorValue
FROM wdi_csv_parquet
WHERE UPPER(countryName)="CANADA" AND indicatorCode="NY.GDP.MKTP.KD.ZG" AND YEAR="2015";
```

In [28]:
DROP TABLE IF EXISTS wdi_csv_parquet;
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/tedor18/hive/wdi/wdi_csv_parquet';


In [29]:
FROM wdi_opencsv_gs
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT *;


In [30]:
%sh
hdfs dfs -du -s -h /user/tedor18/hive/wdi/wdi_opencsv_text;
hdfs dfs -du -s -h /user/tedor18/hive/wdi/wdi_csv_parquet;

In [31]:
SELECT year, countryName, indicatorValue
FROM wdi_csv_parquet
WHERE UPPER(countryName)="CANADA" AND indicatorCode="NY.GDP.MKTP.KD.ZG" AND YEAR="2015";

# Question 9: Highest GDP Growth For Each Country

```hive
SELECT a.indicatorValue as GDP_growth_value, a.year, a.countryName
FROM wdi_csv_parquet a 
INNER JOIN
(SELECT MAX(indicatorValue) as indicatorValue, countryName
FROM wdi_csv_parquet
WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
AND indicatorValue > 0.0
GROUP BY countryName) b
ON  a.indicatorValue = b.indicatorValue
AND a.countryName = b.countryName
```
- Spark was much faster than hive



In [33]:
SELECT a.indicatorValue as GDP_growth_value, a.year, a.countryName
FROM wdi_csv_parquet a 
INNER JOIN
(SELECT MAX(indicatorValue) as indicatorValue, countryName
FROM wdi_csv_parquet
WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
AND indicatorValue > 0.0
GROUP BY countryName) b
ON  a.indicatorValue = b.indicatorValue
AND a.countryName = b.countryName

 
# Question 10: Sort GDP by Country and Year 
```hive
SELECT countryName, year indicatorValue AS GDP_Growth 
FROM wdi_csv_parquet 
WHERE indicatorCode="NY.GDP.MKTP.KD.ZG"
ORDER BY countryName, year;
```

In [35]:
SELECT countryName, year indicatorValue AS GDP_Growth 
FROM wdi_csv_parquet 
WHERE indicatorCode="NY.GDP.MKTP.KD.ZG"
ORDER BY countryName, year;