<!--- QUESTION 1: Load GS Data to HDFS -->
# QUESTION 1: Load GS Data to HDFS

## Step 1: Checking HDFS home directory
### Show HDFS home directories
`hdfs dfs -ls /user`
### Create a working directory in HDFS if it does not exist
`hdfs dfs -mkdir -p /user/phuong`
    
## Create an empty external table in Hive

    DROP TABLE IF EXISTS wdi_csv_text;
    CREATE EXTERNAL TABLE wdi_csv_text
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
    LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_csv_text';
Note:
- `ROW FORMAT ...`: declares `,` as the field delimiter and `\n` as the line terminator
- `LOCATION`: the HDFS directory where the table data is stored
- To test that the table was created, run `SHOW TABLES;` and `DESCRIBE FORMATTED wdi_csv_text;`

## Step 2: Overwrite the empty table with data from the `wdi_gs` table
    INSERT OVERWRITE TABLE wdi_csv_text
    SELECT * FROM wdi_gs;

## Step 3: Check if files are copied to the HDFS directory
### Show files and their size
    hdfs dfs -ls -h /user/phuong/hive/wdi/wdi_csv_text;
### Show total disk usage of the directory
    hdfs dfs -du -s -h /user/phuong/hive/wdi/wdi_csv_text;

## Step 4: Execute a query to check the content of `wdi_csv_text`

    SELECT count(countryName) FROM wdi_csv_text;
Note: Run it at least twice to see the difference in performance between the first and later runs.
   
## Step 5: Clean filesystem cache
    echo 1 | sudo tee /proc/sys/vm/drop_caches
Note: `tee` writes its input (the output of `echo 1`) to both standard output (the terminal) and the file `drop_caches` (the kernel interface for clearing the filesystem cache).
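
As a small illustration of how `tee` behaves, the sketch below writes to a temporary file instead of `/proc/sys/vm/drop_caches`, so it needs no `sudo` and changes nothing on the system. The temporary file and the variable names are made up for this example.

```shell
#!/usr/bin/env bash
# Sketch: tee copies its input to stdout AND to the named file.
# A temp file stands in for /proc/sys/vm/drop_caches (assumption for illustration).
tmp_file="$(mktemp)"

# Capture what tee prints to standard output.
stdout_copy="$(echo 1 | tee "$tmp_file")"

# Read back what tee wrote to the file.
file_copy="$(cat "$tmp_file")"

echo "stdout received: $stdout_copy"
echo "file received:   $file_copy"

rm -f "$tmp_file"
```

In the real command, `sudo` is applied to `tee` rather than to `echo` because `tee` is the process that opens the root-owned file.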

## Step 6: Execute the query in step 4 again to see the difference in performance
At this point, the runtime should be close to that of the first run, because the cached file data has been dropped.
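
To put numbers on the cold-versus-warm comparison, one option is a small timing helper built on `date +%s`. This is only a sketch: the function name `elapsed_seconds` is hypothetical, and `sleep 1` stands in for the real query command.

```shell
#!/usr/bin/env bash
# Sketch: measure wall-clock seconds taken by any command.
# elapsed_seconds is a hypothetical helper name, not part of Hive or HDFS.
elapsed_seconds() {
    local start end
    start="$(date +%s)"          # epoch seconds before the command
    "$@" > /dev/null 2>&1        # run the command, discard its output
    end="$(date +%s)"            # epoch seconds after the command
    echo $(( end - start ))
}

# Stand-in workload; replace with the real command being measured.
first_run="$(elapsed_seconds sleep 1)"
echo "first run took ${first_run}s"
```

One-second resolution is coarse, but it is enough for queries that run for tens of seconds.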

In [1]:
%hive
DROP TABLE IF EXISTS wdi_csv_text;
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_csv_text';


In [2]:
%hive
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs;

<!--- QUESTION 2: Monitor Hadoop/Yarn job -->
# QUESTION 2: Monitor Hadoop/Yarn job

## Monitor Hadoop/Yarn job 
### Using Yarn Application Timeline in the Web Interfaces tab

###  Tez in the Web Interfaces tab


# QUESTION 3: Hive vs Bash
## Perform row count using Bash
    cd ~
    hdfs  dfs -get  hdfs:///user/phuong/hive/wdi/wdi_csv_text .
    cd wdi_csv_text
    du -ch .
    
    echo 3 | sudo tee /proc/sys/vm/drop_caches
    date +%s && cat * | wc && date +%s
    
- Move to the home directory
- Copy the `wdi_csv_text` directory from HDFS to the current directory
- Calculate the directory size. `-c`: print a grand total, `-h`: human-readable format
- Clear the filesystem cache
- Count the rows. `date +%s`: current time in epoch seconds, `wc`: word count (returns newline, word, and byte counts)
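
For reference, `wc` prints three numbers (lines, words, bytes), so the row count is the first one, and `wc -l` prints only the line count. The sketch below shows this on two tiny made-up files in a temporary directory rather than on the real `wdi_csv_text` files.

```shell
#!/usr/bin/env bash
# Sketch: count rows across several part files, as `cat * | wc` does.
# The directory and file contents are invented sample data.
work_dir="$(mktemp -d)"
printf '2015,CountryA,AAA\n2016,CountryA,AAA\n' > "$work_dir/000000_0"
printf '2015,CountryB,BBB\n'                    > "$work_dir/000001_0"

# wc alone prints: <lines> <words> <bytes>
cat "$work_dir"/* | wc

# wc -l prints only the line count; tr strips padding some platforms add.
row_count="$(cat "$work_dir"/* | wc -l | tr -d ' ')"
echo "rows: $row_count"

rm -rf "$work_dir"
```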

## Perform row count using Hive
    SELECT count(countryName) FROM wdi_csv_text;
## Conclusion
- The Bash test does not benefit from the filesystem cache here, because the cache is dropped right before the count runs. Its runtime therefore stays about the same from run to run.
- Hive benefits from the filesystem cache, which allows recently read files to be served from memory instead of disk. This improves performance on repeated runs.

In [5]:
%sh
cd ~
hdfs  dfs -get  hdfs:///user/phuong/hive/wdi/wdi_csv_text .
cd wdi_csv_text
du -ch .

echo 3 | sudo tee /proc/sys/vm/drop_caches
date +%s && cat * | wc && date +%s


# QUESTION 4: Parsing Issue

## Step 1: Execute below statement
    SELECT distinct(indicatorcode)
    FROM wdi_csv_text
    ORDER BY indicatorcode
    LIMIT 20;
Note: the returned values are not valid indicator codes.
## Step 2: Execute another statement to check
    SELECT indicatorcode
    FROM wdi_csv_text
    ORDER BY indicatorcode
    LIMIT 200;
## Step 3: Compare and conclude
The `indicatorcode` column contains wrong values. `SELECT DISTINCT` only removes duplicates, so the wrong values come from the table data rather than from the query.
## Step 4: Make a new external table with one column only
    DROP TABLE IF EXISTS wdi_gs_debug;
    CREATE EXTERNAL TABLE wdi_gs_debug 
    (line String)
    ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
    LOCATION 'gs://jarvis_dataeng_minh_phuong/datasets/wdi_2016';
Note: No field delimiter is declared, and the data source is Google Cloud Storage (GS).
## Step 5: Check the content of debug table
    SELECT * FROM wdi_gs_debug 
    WHERE line like "%\(\% of urban population\)\"%" LIMIT 20;
Note: The problem is now visible: the `indicatorName` field contains `,` characters, which are misread as field delimiters.
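
The effect can be reproduced outside Hive with a naive comma split. The sketch below uses `awk -F','` on a single invented row shaped like the WDI CSV (six logical fields, with a comma inside the quoted indicator name). The row contents are assumptions for illustration, not real data.

```shell
#!/usr/bin/env bash
# Sketch: splitting on every comma breaks a row whose quoted field contains one.
# The row below is invented sample data with 6 logical fields.
line='2015,"Canada","CAN","Sample indicator, total (% of urban population)","XX.SAMPLE.ZS",1.5'

# Naive split on ',' (the same rule as FIELDS TERMINATED BY ',').
field_count="$(printf '%s\n' "$line" | awk -F',' '{ print NF }')"
fifth_field="$(printf '%s\n' "$line" | awk -F',' '{ print $5 }')"

echo "fields found: $field_count (6 were intended)"
echo "field 5 (should be the indicator code): $fifth_field"
```

Because the extra piece shifts every later field by one position, text that belongs to `indicatorName` ends up in the `indicatorCode` column.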

## Step 6: Create external table `wdi_opencsv_gs` using OpenCsvSerDe from GS files
    DROP TABLE IF EXISTS wdi_opencsv_gs;
    CREATE EXTERNAL TABLE wdi_opencsv_gs
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 'gs://jarvis_dataeng_minh_phuong/datasets/wdi_2016';
### The default delimiters of `OpenCSVSerde` are
    DEFAULT_ESCAPE_CHARACTER \
    DEFAULT_QUOTE_CHARACTER  "
    DEFAULT_SEPARATOR        ,

To change the default values, place this right below the `ROW FORMAT SERDE` line:

    WITH SERDEPROPERTIES (
       "separatorChar" = "\t",
       "quoteChar"     = "'",
       "escapeChar"    = "\\"
    )  
## Step 7: Create an empty external table `wdi_opencsv_text`, to be overwritten later with data from `wdi_opencsv_gs`
    DROP TABLE IF EXISTS wdi_opencsv_text;
    CREATE EXTERNAL TABLE wdi_opencsv_text
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_opencsv_text';
    
## Step 8: Overwrite `wdi_opencsv_text` with data from `wdi_opencsv_gs`
    INSERT OVERWRITE TABLE wdi_opencsv_text
    SELECT * FROM wdi_opencsv_gs;

## Step 9: Check the indicatorCode field again
    SELECT distinct(indicatorcode) FROM wdi_opencsv_text LIMIT 20;
There should be no more parsing issues now.

## Step 10: Compare execution time of `wdi_opencsv_text` and `wdi_csv_text`
    SELECT distinct(indicatorcode) FROM wdi_opencsv_text;
    SELECT distinct(indicatorcode) FROM wdi_csv_text;
Note:
- `wdi_csv_text` should be much faster than `wdi_opencsv_text` because it uses the default `LazySimpleSerDe` SerDe library, while `OpenCSVSerde` is a more comprehensive serializer-deserializer. To see which SerDe library a table uses, run `DESCRIBE FORMATTED table_name`.
- `OpenCSVSerde` treats every field as `STRING` regardless of the declared column data type. To get a desired data type, create a `view` on the table that casts the columns to that type.


In [7]:
%hive
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;

In [8]:
%hive
DROP TABLE IF EXISTS wdi_gs_debug;
CREATE EXTERNAL TABLE wdi_gs_debug 
(line String)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_dataeng_minh_phuong/datasets/wdi_2016';


In [9]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'gs://jarvis_dataeng_minh_phuong/datasets/wdi_2016';

In [10]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text;
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_opencsv_text';

In [11]:
%hive
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs;

In [12]:
%hive
SELECT distinct(indicatorcode) FROM wdi_opencsv_text LIMIT 20;

# QUESTION 5: OpenCSVSerde limitation
## Limitation
`OpenCSVSerde` treats every field as `STRING` regardless of the declared column data type. To get a desired data type, create a `view` on the table that casts the columns to that type.

## Show the diff between the two tables 
    DESCRIBE FORMATTED wdi_opencsv_text;
    DESCRIBE FORMATTED wdi_csv_text;
Note: check data type of the year and indicatorValue fields in the two tables
## Create a view from `wdi_opencsv_text` table for casting columns data type
    DROP VIEW IF EXISTS wdi_opencsv_text_view;
    CREATE VIEW wdi_opencsv_text_view AS 
    SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
    FROM wdi_opencsv_text;
## Check the view fields data types
    DESCRIBE FORMATTED wdi_opencsv_text_view;

In [14]:
%hive
DESCRIBE FORMATTED wdi_opencsv_text;
DESCRIBE FORMATTED wdi_csv_text;

In [15]:
%hive
DROP VIEW IF EXISTS wdi_opencsv_text_view;
CREATE VIEW wdi_opencsv_text_view AS 
SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text;

In [16]:
%hive
DESCRIBE FORMATTED wdi_opencsv_text_view;

# QUESTION 6: 2015 Canada GDP Growth HQL
## Search for Canada indicator Name/Code
    SELECT countryname, indicatorname, indicatorCode 
    FROM wdi_opencsv_text 
    WHERE UPPER(countryName)= "CANADA" and UPPER(indicatorname) like "%GDP GROWTH%"
    GROUP BY countryname, indicatorname, indicatorcode;
## Search for Canada GDP Growth 2015
    SELECT indicatorvalue, countryname, year 
    FROM wdi_opencsv_text
    WHERE UPPER(countryname)="CANADA" AND indicatorcode="NY.GDP.MKTP.KD.ZG" AND year="2015";
## Discussion: improving query performance
- Reading the data from an external table through `OpenCSVSerde` makes queries slow to process
- Indexing or partitioning the table can improve query performance

In [18]:
%hive
SELECT countryname, indicatorname, indicatorCode 
FROM wdi_opencsv_text 
WHERE UPPER(countryName)= "CANADA" and UPPER(indicatorname) like "%GDP GROWTH%"
GROUP BY countryname, indicatorname, indicatorcode;

In [19]:
%hive
SELECT indicatorvalue, countryname, year 
FROM wdi_opencsv_text
WHERE UPPER(countryname)="CANADA" AND indicatorcode="NY.GDP.MKTP.KD.ZG" AND year="2015";

# QUESTION 7: Hive Partitions
## Create `wdi_opencsv_text_partitions` table partitioned by `year`
    DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
    
    CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
    (countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue String)
    PARTITIONED BY (year String)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_opencsv_text_partitions';
## Load data from `wdi_opencsv_text` to `wdi_opencsv_text_partitions` using dynamic partition
    SET hive.exec.dynamic.partition.mode=nonstrict;
    SET hive.stats.column.autogather=false;
    FROM wdi_opencsv_text
    INSERT OVERWRITE TABLE wdi_opencsv_text_partitions
    PARTITION(year)
    SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year;
Notes:
- `SET hive.exec.max.dynamic.partitions.pernode=1000` sets the maximum number of partitions that each mapper/reducer can create. The default is 100.
- `hive.exec.max.dynamic.partitions`: the total number of partitions that one query can create
- `hive.exec.max.created.files`: the total number of files that all mappers/reducers can create
- `SET hive.stats.column.autogather=false` turns off automatic gathering of column statistics
- Make sure the `year` (partition) column comes last in the `SELECT` list
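
Dynamic partitioning writes each distinct `year` value into its own `year=<value>` subdirectory under the table location, which is why a filter on `year` can skip most of the data. The sketch below imitates that layout on the local filesystem with made-up rows. It is only an illustration and does not touch Hive or HDFS.

```shell
#!/usr/bin/env bash
# Sketch: imitate Hive's partition directory layout locally.
# table_dir and the rows are invented; real partitions live under the table LOCATION in HDFS.
table_dir="$(mktemp -d)"

# Route each row to a year=<value> directory, dropping the year column from the row
# (the partition value is kept in the directory name, not in the data file).
while IFS=',' read -r year country value; do
    mkdir -p "$table_dir/year=$year"
    echo "$country,$value" >> "$table_dir/year=$year/000000_0"
done <<'EOF'
2014,CountryA,1.0
2015,CountryA,2.0
2015,CountryB,3.0
2016,CountryA,4.0
EOF

# One directory per distinct year.
partition_count="$(find "$table_dir" -mindepth 1 -maxdepth 1 -type d | wc -l | tr -d ' ')"

# A filter on year=2015 only needs to read that one directory.
rows_2015="$(cat "$table_dir/year=2015"/* | wc -l | tr -d ' ')"

echo "partitions: $partition_count, rows in year=2015: $rows_2015"
rm -rf "$table_dir"
```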

## Inspect the number of partitions created in the `wdi_opencsv_text_partitions` table
    hdfs dfs -count -h /user/phuong/hive/wdi/wdi_opencsv_text_partitions;
Note: the command above returns the directory count, file count, content size, and path name.
## Re-run the Canada GDP Growth 2015 query against the `wdi_opencsv_text_partitions` table
    SELECT indicatorvalue, countryname, year 
    FROM wdi_opencsv_text_partitions
    WHERE UPPER(countryname)="CANADA" AND indicatorcode="NY.GDP.MKTP.KD.ZG" AND year="2015";
Note: the performance should be much better than that of the same query against the non-partitioned table.

In [21]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_opencsv_text_partitions';

In [22]:
%hive
SET hive.exec.dynamic.partition.mode=nonstrict;
set hive.stats.column.autogather=false;
FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions
PARTITION(year)
SELECT countryName, countryCode, indicatorName, indicatorCode, indicatorValue, year;

In [23]:
%sh
hdfs dfs -count -h /user/phuong/hive/wdi/wdi_opencsv_text_partitions;

In [24]:
%hive
SELECT indicatorvalue, countryname, year 
FROM wdi_opencsv_text_partitions
WHERE UPPER(countryname)="CANADA" AND indicatorcode="NY.GDP.MKTP.KD.ZG" AND year="2015";

# QUESTION 8: Columnar File Optimization (optimize HQL using Columnar file)
## Create `wdi_csv_parquet` table with Parquet file
    DROP TABLE IF EXISTS wdi_csv_parquet;
    CREATE EXTERNAL TABLE wdi_csv_parquet
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    STORED AS PARQUET
    LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_csv_parquet';     
## Load data from `wdi_opencsv_gs` to the Parquet table
    FROM wdi_opencsv_gs
    INSERT OVERWRITE TABLE wdi_csv_parquet
    SELECT *;
## Compare file size between `wdi_csv_parquet` and `wdi_opencsv_text`
    hdfs dfs -du -s -h /user/phuong/hive/wdi/wdi_opencsv_text;
    hdfs dfs -du -s -h /user/phuong/hive/wdi/wdi_csv_parquet;
Note: parquet files should be a lot smaller than text files
## Compare runtime
    SELECT count(countryName) FROM wdi_csv_parquet;
    SELECT count(countryName) FROM wdi_opencsv_text;
Notes:
- On the first execution, the two tables perform about the same (Parquet is slightly faster)
- From the second execution onward, the Parquet table is almost 4 times faster than the text table (26 seconds vs 99 seconds)
## Execute `2015 GDP Growth` HQL against `wdi_csv_parquet` and `wdi_opencsv_text` tables, and then compare performance.
    SELECT indicatorvalue, countryname, year 
    FROM wdi_csv_parquet
    WHERE UPPER(countryname)="CANADA" AND indicatorcode="NY.GDP.MKTP.KD.ZG" AND year="2015";
Notes:
- As in the comparison above, the Parquet table is much faster from the second execution onward.

In [26]:
%hive
 DROP TABLE IF EXISTS wdi_csv_parquet;
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/phuong/hive/wdi/wdi_csv_parquet';  

In [27]:
%hive
FROM wdi_opencsv_gs
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT *;

In [28]:
%sh
hdfs dfs -du -s -h /user/phuong/hive/wdi/wdi_opencsv_text;
hdfs dfs -du -s -h /user/phuong/hive/wdi/wdi_csv_parquet;

In [29]:
%hive
SELECT count(countryName) FROM wdi_csv_parquet;

In [30]:
%hive
SELECT count(countryName) FROM wdi_opencsv_text;

In [31]:
%hive
SELECT indicatorvalue, countryname, year 
FROM wdi_csv_parquet
WHERE UPPER(countryname)="CANADA" AND indicatorcode="NY.GDP.MKTP.KD.ZG" AND year="2015";

# QUESTION 9: Find Highest GDP Growth using Hive SQL and Spark SQL
## Use Hive SQL
    SELECT t1.indicatorValue, t1.year, t1.countryName 
    FROM wdi_csv_parquet t1
    INNER JOIN (
        SELECT MAX(indicatorValue) AS maxValue, countryName
        FROM wdi_csv_parquet
        WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG" AND indicatorValue <> 0
        GROUP BY countryName
    ) t2
    ON t1.indicatorValue = t2.maxValue AND t1.countryName = t2.countryName
    WHERE t1.indicatorCode = "NY.GDP.MKTP.KD.ZG";
Notes:
- Find the maximum GDP growth of each country
- Join that result back to the table on the maximum value and `countryName` to pick out the year

## Use Spark SQL
    SELECT t1.indicatorValue, t1.year, t1.countryName 
    FROM wdi_csv_parquet t1
    INNER JOIN (
        SELECT MAX(indicatorValue) AS maxValue, countryName
        FROM wdi_csv_parquet
        WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG" AND indicatorValue <> 0
        GROUP BY countryName
    ) t2
    ON t1.indicatorValue = t2.maxValue AND t1.countryName = t2.countryName
    WHERE t1.indicatorCode = "NY.GDP.MKTP.KD.ZG";


In [33]:
%hive
SELECT t1.indicatorValue, t1.year, t1.countryName 
FROM wdi_csv_parquet t1
INNER JOIN (
    SELECT MAX(indicatorValue) AS maxValue, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG" AND indicatorValue <> 0
    GROUP BY countryName
) t2
ON t1.indicatorValue = t2.maxValue AND t1.countryName = t2.countryName
WHERE t1.indicatorCode = "NY.GDP.MKTP.KD.ZG";


In [34]:
%spark.sql
SELECT t1.indicatorValue, t1.year, t1.countryName 
FROM wdi_csv_parquet t1
INNER JOIN (
    SELECT MAX(indicatorValue) AS maxValue, countryName
    FROM wdi_csv_parquet
    WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG" AND indicatorValue <> 0
    GROUP BY countryName
) t2
ON t1.indicatorValue = t2.maxValue AND t1.countryName = t2.countryName
WHERE t1.indicatorCode = "NY.GDP.MKTP.KD.ZG";

# QUESTION 10: Sort GDP by Country and Year
## Sort by countryName and Year, return GDP Growth, CountryName, Year and indicatorCode
    SELECT countryName, Year, indicatorCode, indicatorValue
    FROM wdi_csv_parquet
    WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
    ORDER BY countryName, year;


In [36]:
%hive
SELECT countryName, Year, indicatorCode, indicatorValue
FROM wdi_csv_parquet
WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
ORDER BY countryName, year;

In [37]:
%hive
SELECT countryName, Year, indicatorCode, indicatorValue
FROM wdi_csv_parquet
WHERE indicatorCode = "NY.GDP.MKTP.KD.ZG"
SORT BY countryName, year;