## Question 1: Load GS Data to HDFS
#### Create new table location:
    DROP TABLE IF EXISTS wdi_csv_text;
    CREATE EXTERNAL TABLE wdi_csv_text
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
    LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_csv_text';

#### Copy data from Google Cloud Storage (GS) to HDFS:
    INSERT OVERWRITE TABLE wdi_csv_text
    SELECT * FROM wdi_gs;

#### Count the rows:
    SELECT count(countryName) FROM wdi_csv_text;
The first run of this query took 13.66 seconds. A second run took 4.77 seconds, largely because the data was already in the operating system's page cache. After clearing the cache with `echo 3 | sudo tee /proc/sys/vm/drop_caches`, the query took 14.37 seconds again.


In [1]:
%hive
DROP TABLE IF EXISTS wdi_csv_text;
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_csv_text';

In [2]:
%hive
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs;

In [3]:
%hive
SELECT count(countryName) FROM wdi_csv_text;

## Question 2: Monitor Hadoop/Yarn Job
The YARN Application Timeline lists the jobs the cluster has run, with details such as the query ID, exit status, and the node that executed each job. The Tez UI shows each query's execution graphically, and it is where execution times can be analyzed.

## Question 3: Hive vs Bash
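#### Using bash to copy the table files locally and count the rows
    cd ~
    # Copy the table's files from HDFS to the local file system
    hdfs dfs -get hdfs:///user/mitchellacton/hive/wdi/wdi_csv_text .
    cd wdi_csv_text
    # Drop the OS page cache so the files are read from disk
    echo 3 | sudo tee /proc/sys/vm/drop_caches
    # Print start/end timestamps (seconds) around the scan; wc prints lines, words, bytes
    date +%s && cat * | wc && date +%s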

With a cold cache, reading and counting the files in bash took 29 seconds, compared with about 14 seconds for the equivalent Hive query.
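The `date +%s && ... && date +%s` pattern prints two timestamps that must then be subtracted by hand. As a minimal sketch (the helper name `time_cmd` is hypothetical), bash's built-in `SECONDS` counter can report the elapsed whole seconds directly; bash's `time` keyword would work equally well.

```shell
#!/usr/bin/env bash
# time_cmd (hypothetical helper): run a command and report the elapsed
# wall-clock time in whole seconds on stderr, leaving the command's own
# stdout untouched and preserving its exit status.
time_cmd() {
  local start=$SECONDS        # bash's built-in seconds counter
  "$@"
  local status=$?
  echo "elapsed: $(( SECONDS - start )) s" >&2
  return "$status"
}

# Usage inside ~/wdi_csv_text, after dropping the page cache as above:
#   time_cmd sh -c 'cat * | wc -l'
```

Reporting on stderr keeps the row count on stdout, so the helper can wrap a pipeline without changing its output.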

In [6]:
%sh
cd ~
hdfs dfs -get hdfs:///user/mitchellacton/hive/wdi/wdi_csv_text .
cd wdi_csv_text
echo 3 | sudo tee /proc/sys/vm/drop_caches
date +%s && cat * | wc && date +%s

## Question 4: Parsing Issue
#### What is the problem?
    SELECT distinct(indicatorcode)
    FROM wdi_csv_text
    ORDER BY indicatorcode
    LIMIT 20;
Some values in the indicatorcode column look incorrect. To investigate further, I create the debug table below, which has a single column holding each raw line without splitting it on commas.

#### Debug Table
    DROP TABLE IF EXISTS wdi_gs_debug;
    CREATE EXTERNAL TABLE wdi_gs_debug
    (line String)
    ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
    LOCATION 'gs://jarvis_data_eng_mitchellacton/datasets/wdi_2016';
#### Querying the Debug Table
    SELECT * FROM wdi_gs_debug
    WHERE line like "%\(\% of urban population\)%"
    LIMIT 20;
Some entries in the fourth column, indicatorName, contain commas inside a quoted value. `ROW FORMAT DELIMITED` does not understand quoting, so it treats each of those commas as the end of a field, and the second half of indicatorName ends up in the indicatorCode column. To correct this, I create new tables that use OpenCSVSerde, which respects quoted fields.
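To illustrate the parsing difference, here is a small bash sketch: `split_naive` splits on every comma, as the delimited row format does, while `split_quoted` ignores commas inside double quotes, which is the behaviour a quote-aware parser such as OpenCSVSerde adds. Both function names and the sample row are hypothetical; the row only mimics the shape of a WDI line, and escaped quotes (`""`) are not handled.

```shell
#!/usr/bin/env bash
# split_naive: every comma ends a field, like FIELDS TERMINATED BY ','.
split_naive() {
  local IFS=','
  local -a fields
  read -r -a fields <<< "$1"
  printf '%s\n' "${fields[@]}"
}

# split_quoted: commas inside double quotes are kept as data and the quotes
# are stripped. Simplification: escaped quotes ("") are not handled.
split_quoted() {
  local line=$1 field='' in_quotes=0 ch i
  for (( i = 0; i < ${#line}; i++ )); do
    ch=${line:i:1}
    if [[ $ch == '"' ]]; then
      in_quotes=$(( 1 - in_quotes ))        # toggle quoted state
    elif [[ $ch == ',' && $in_quotes -eq 0 ]]; then
      printf '%s\n' "$field"                # unquoted comma ends the field
      field=''
    else
      field+=$ch
    fi
  done
  printf '%s\n' "$field"                    # last field
}

# Hypothetical WDI-shaped row whose indicator name contains a comma.
line='2015,Canada,CAN,"Example indicator, urban (% of urban population)",EX.AMPLE.CODE,42'

split_naive "$line" | sed -n 5p    # indicatorCode slot holds half of the name
split_quoted "$line" | sed -n 5p   # indicatorCode slot holds the code
```

The naive split yields seven fields instead of six, which is exactly the shift seen in the debug table. The character-by-character loop also hints at why quote-aware parsing costs more than a plain split.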
#### Fixing the issue in GS
    DROP TABLE IF EXISTS wdi_opencsv_gs;
    CREATE EXTERNAL TABLE wdi_opencsv_gs
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 'gs://jarvis_data_eng_mitchellacton/datasets/wdi_2016';
#### Fixing the issue in HDFS
    DROP TABLE IF EXISTS wdi_opencsv_text;
    CREATE EXTERNAL TABLE wdi_opencsv_text
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_opencsv_text';

    INSERT OVERWRITE TABLE wdi_opencsv_text
    SELECT * FROM wdi_opencsv_gs;
#### Checking the new table
    SELECT distinct(indicatorcode) FROM wdi_opencsv_text
    LIMIT 20;
The indicator codes are now parsed correctly. However, counting rows with the queries below took 81.55 seconds on wdi_opencsv_text, compared with 15.00 seconds on wdi_csv_text. Most of the difference comes from the more complex parsing: instead of LazySimpleSerDe, which simply splits each line on the delimiter, the table now uses OpenCSVSerde, which also has to handle quote and escape characters.
#### Testing execution times
    SELECT count(countryName) FROM wdi_csv_text;
    SELECT count(countryName) FROM wdi_opencsv_text;

In [8]:
%hive
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 20;

In [10]:
%hive
DROP TABLE IF EXISTS wdi_gs_debug;
CREATE EXTERNAL TABLE wdi_gs_debug
(line String)
ROW FORMAT DELIMITED LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_mitchellacton/datasets/wdi_2016';




In [11]:
%hive
SELECT * FROM wdi_gs_debug
WHERE line like "%\(\% of urban population\)%"
LIMIT 20;

In [12]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'gs://jarvis_data_eng_mitchellacton/datasets/wdi_2016';

In [13]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text;
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_opencsv_text';

In [14]:
%hive
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs;

In [15]:
%hive
SELECT distinct(indicatorcode) FROM wdi_opencsv_text
LIMIT 20;

In [16]:
%hive
SELECT count(countryName) FROM wdi_opencsv_text;


In [17]:
%hive
SELECT count(countryName) FROM wdi_csv_text;

## Question 5: OpenCSVSerDe Limitation
#### Table metadata using different parsers
    DESCRIBE FORMATTED wdi_csv_text;
    DESCRIBE FORMATTED wdi_opencsv_text;
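All columns in the `wdi_opencsv_text` table are reported as strings, whereas in the `wdi_csv_text` table `year` is `int` and `indicatorvalue` is `float`. This is a limitation of OpenCSVSerde: it treats every column as STRING regardless of the types declared in the DDL. To work around it, I cast these fields to the correct types in a view and check the data types again to confirm.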

#### Casting column types
    DROP VIEW IF EXISTS wdi_opencsv_text_view;
    CREATE VIEW wdi_opencsv_text_view AS
    SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
    FROM wdi_opencsv_text;
    DESCRIBE FORMATTED wdi_opencsv_text_view;
This resolves the issue: the `year` and `indicatorvalue` columns are now listed as `int` and `float` types, respectively.
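The cast matters for more than metadata: on a STRING column, comparisons such as `MAX` and `ORDER BY` work on text, so `"9.5"` ranks above `"10.2"`. A quick bash illustration with hypothetical values, using C-locale `sort` for text order and `sort -g` for numeric order:

```shell
#!/usr/bin/env bash
# Hypothetical indicator values held as text, as OpenCSVSerde exposes them.
values=$'10.2\n9.5\n-3.1\n1.078'

# Text ordering (byte-wise in the C locale): how STRING values compare.
string_max=$(printf '%s\n' "$values" | LC_ALL=C sort | tail -n 1)

# Numeric ordering: how the same values compare once cast to FLOAT.
numeric_max=$(printf '%s\n' "$values" | LC_ALL=C sort -g | tail -n 1)

echo "max as text:   $string_max"    # 9.5
echo "max as number: $numeric_max"   # 10.2
```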


In [19]:
%hive
DESCRIBE FORMATTED wdi_csv_text;

In [20]:
%hive
DESCRIBE FORMATTED wdi_opencsv_text;

In [21]:
%hive
DROP VIEW IF EXISTS wdi_opencsv_text_view;
CREATE VIEW wdi_opencsv_text_view AS
SELECT CAST(year AS INTEGER) AS year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS indicatorValue
FROM wdi_opencsv_text;
DESCRIBE FORMATTED wdi_opencsv_text_view;



## Question 6: 2015 Canada GDP Growth HQL
In this section I will write queries to find the annual Canadian GDP growth for 2015.

#### Find GDP growth `indicatorCode`
    SELECT countryname, indicatorname, indicatorcode
    FROM wdi_opencsv_text
    WHERE UPPER(countryName)= "CANADA" and UPPER(indicatorname) LIKE "%GDP%"
    LIMIT 200;
The `indicatorCode` corresponding to "GDP growth (annual %)" is "NY.GDP.MKTP.KD.ZG".
#### Find Canada's GDP growth for 2015
    SELECT countryname, indicatorname, year, indicatorvalue
    FROM wdi_opencsv_text
    WHERE UPPER(countryname)="CANADA" and indicatorcode="NY.GDP.MKTP.KD.ZG" and year="2015";
In 2015, Canada's GDP grew by 1.078%, but this query took 85.66 seconds because Hive has to scan all ~21 million rows, an O(n) full table scan. Partitioning the table (e.g. by year) would let Hive read only the 2015 data instead of the whole table. Alternatively, a sorted index such as a balanced binary search tree (BST) would cut a point lookup to O(log n) comparisons, about log₂(21 million) ≈ 24 instead of ~21 million. In practice the gain would be far smaller than that ratio suggests, because query time is dominated by I/O and job startup rather than comparisons, and Hive's built-in indexes were removed in Hive 3.0.
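A quick check of the arithmetic, assuming ~21 million rows and, for the partitioned case, that the 59 yearly partitions created in Question 7 are evenly sized (real partitions will differ). `floor_log2` is a hypothetical helper that counts how many times the search space can be halved.

```shell
#!/usr/bin/env bash
# floor(log2(n)) by repeated halving: roughly the depth of a balanced BST
# (or the number of binary-search steps) over n sorted keys.
floor_log2() {
  local n=$1 steps=0
  while (( n > 1 )); do
    n=$(( n / 2 ))
    steps=$(( steps + 1 ))
  done
  echo "$steps"
}

rows=21000000      # approximate row count from the text above
partitions=59      # assumption: evenly sized yearly partitions

echo "full scan:          $rows rows"
echo "balanced BST probe: ~$(floor_log2 "$rows") comparisons"   # ~24
echo "one year partition: ~$(( rows / partitions )) rows"       # ~355932
```

Partitioning does not give logarithmic lookups. It shrinks the scan by roughly the number of partitions, which is still a large saving when the filter is on the partition column.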



In [23]:
%hive
SELECT countryname, indicatorname, indicatorcode
FROM wdi_opencsv_text
WHERE UPPER(countryName)= "CANADA" and UPPER(indicatorname) LIKE "%GDP%"
LIMIT 200;

In [24]:
%hive
SELECT countryname, indicatorname, year, indicatorvalue
FROM wdi_opencsv_text
WHERE UPPER(countryname)="CANADA" and indicatorcode="NY.GDP.MKTP.KD.ZG" and year="2015";

## Question 7: Hive Partitions
In this section I partition the table by year to speed up the query.
#### Create partitioned table
    DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
    CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
    (countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue STRING)
    PARTITIONED BY (year STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    STORED AS TEXTFILE
    LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_opencsv_text_partitions';
#### Populate partitioned table
    SET hive.exec.dynamic.partition.mode=nonstrict;
    SET hive.exec.max.dynamic.partitions.pernode=100;
    FROM wdi_opencsv_text
    INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION(year)
    SELECT countryname, countrycode, indicatorname, indicatorcode, indicatorvalue, year;
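`hive.exec.dynamic.partition.mode=nonstrict` allows every partition column to be dynamic; the default strict mode requires at least one static partition value. `hive.exec.max.dynamic.partitions.pernode` caps the number of dynamic partitions that each mapper or reducer may create. Its default is already 100, so this setting only makes the limit explicit; it can be raised if needed, but 100 is more than enough here. The dynamic partition column (`year`) must be the last column in the SELECT list.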
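A rough local sketch of what the dynamic-partition insert does, assuming comma-separated input with `year` as the last field: each row is routed to a `year=<value>` directory, and the job fails once it would create more partitions than a limit that plays the role of `hive.exec.max.dynamic.partitions.pernode`. The function name `route_to_partitions`, the file name `000000_0`, and the rows are hypothetical, and a single awk process stands in for Hive's per-task limit.

```shell
#!/usr/bin/env bash
# route_to_partitions INPUT OUT_DIR MAX_PARTS (hypothetical helper)
# Writes each row, minus its last field, to OUT_DIR/year=<last field>/000000_0.
route_to_partitions() {
  local input=$1 out_dir=$2 max_parts=$3
  awk -F, -v out="$out_dir" -v max="$max_parts" '
    {
      year = $NF                                  # partition value is the last column
      if (!(year in seen)) {
        if (++n > max + 0) {                      # new partition would exceed the cap
          print "too many dynamic partitions: " n > "/dev/stderr"
          exit 1
        }
        seen[year] = 1
        system("mkdir -p \"" out "/year=" year "\"")
      }
      line = $1                                   # drop the partition column from the row
      for (i = 2; i < NF; i++) line = line "," $i
      print line >> (out "/year=" year "/000000_0")
    }
  ' "$input"
}

# Usage with hypothetical rows (year last, as in the SELECT list above):
demo=$(mktemp -d)
trap 'rm -rf "$demo"' EXIT
printf '%s\n' 'Canada,CAN,1.0,2014' 'Canada,CAN,1.0,2015' 'France,FRA,1.0,2015' > "$demo/in.csv"
route_to_partitions "$demo/in.csv" "$demo/table" 100
ls "$demo/table"    # lists year=2014 and year=2015
```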
#### Determine the number of partitions
    hdfs dfs -count -h /user/mitchellacton/hive/wdi/wdi_opencsv_text_partitions
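This prints the directory count, file count, and total size under the table path. The directory count includes the table directory itself; the subdirectories beneath it showed that 59 partitions were created, one per year.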
#### Find Canada's GDP for 2015 in partitioned table
    SELECT countryname, indicatorname, year, indicatorvalue
    FROM wdi_opencsv_text_partitions
    WHERE UPPER(countryname)="CANADA" and indicatorcode="NY.GDP.MKTP.KD.ZG" and year="2015";
This time, the query took only 26 seconds instead of 85.
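The speed-up comes from partition pruning: each partition is stored in its own `year=<value>` subdirectory, so a `year="2015"` filter on the partition column lets Hive read only that directory. A local bash sketch of the idea, using a temporary directory as a stand-in for HDFS; the three years, file names, and rows are hypothetical. On the cluster, `SHOW PARTITIONS wdi_opencsv_text_partitions;` lists the real partitions.

```shell
#!/usr/bin/env bash
# Temporary local stand-in for the table directory on HDFS (hypothetical layout).
table=$(mktemp -d)
trap 'rm -rf "$table"' EXIT

# One subdirectory per partition value; the year is encoded in the path, not the rows.
for year in 2013 2014 2015; do
  mkdir -p "$table/year=$year"
  printf '%s\n' '"Canada","CAN","NY.GDP.MKTP.KD.ZG","1.0"' \
                '"France","FRA","NY.GDP.MKTP.KD.ZG","1.0"' > "$table/year=$year/000000_0"
done

# Number of partitions = number of year=* subdirectories.
partitions=$(find "$table" -mindepth 1 -maxdepth 1 -type d -name 'year=*' | wc -l | tr -d ' ')

# Files an unpartitioned scan must open vs. files left after pruning to 2015.
all_files=("$table"/year=*/*)
pruned_files=("$table"/year=2015/*)

echo "partitions:          $partitions"
echo "files in full scan:  ${#all_files[@]}"
echo "files after pruning: ${#pruned_files[@]}"
grep '"Canada"' "${pruned_files[@]}"   # only these rows need row-level filtering
```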


In [26]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue STRING)
PARTITIONED BY (year STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_opencsv_text_partitions';



In [27]:
%hive
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=100;
FROM wdi_opencsv_text
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION(year)
SELECT countryname, countrycode, indicatorname, indicatorcode, indicatorvalue, year;



In [28]:
%hive
SELECT countryname, indicatorname, year, indicatorvalue
FROM wdi_opencsv_text_partitions
WHERE UPPER(countryname)="CANADA" and indicatorcode="NY.GDP.MKTP.KD.ZG" and year="2015";

## Question 8: Columnar File Optimization
#### Store the data in Parquet format
    DROP TABLE IF EXISTS wdi_csv_parquet;
    CREATE EXTERNAL TABLE wdi_csv_parquet
    (year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
    STORED AS PARQUET
    LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_csv_parquet';
    
    INSERT OVERWRITE TABLE wdi_csv_parquet
    SELECT * FROM wdi_opencsv_gs;
#### Compare file sizes of Parquet and Text tables
    hdfs dfs -du -s -h /user/mitchellacton/hive/wdi/wdi_opencsv_text
    hdfs dfs -du -s -h /user/mitchellacton/hive/wdi/wdi_csv_parquet
`wdi_opencsv_text` occupies 2.3 GB, while `wdi_csv_parquet` occupies 137.2 MB, roughly 17 times smaller.
#### Compare runtime when counting rows
    SELECT count(countryname) FROM wdi_opencsv_text;
    SELECT count(countryname) FROM wdi_csv_parquet;
The query took 81 seconds on the text table and only 6 seconds on the Parquet table.
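Part of the Parquet speed-up comes from its columnar layout: `count(countryname)` only needs the countryname column, whereas the text table must read and parse whole lines. A crude bash illustration with hypothetical rows, writing the data once as a row-oriented CSV and once as one file per column. Real Parquet also encodes and compresses each column, which this sketch does not attempt.

```shell
#!/usr/bin/env bash
dir=$(mktemp -d)
trap 'rm -rf "$dir"' EXIT

# Row-oriented file with hypothetical rows: a scan reads every whole line.
cat > "$dir/rows.csv" <<'EOF'
2015,Canada,CAN,GDP growth (annual %),NY.GDP.MKTP.KD.ZG,1.0
2015,France,FRA,GDP growth (annual %),NY.GDP.MKTP.KD.ZG,1.0
2014,Canada,CAN,GDP growth (annual %),NY.GDP.MKTP.KD.ZG,1.0
EOF

# Column-oriented copy: one file per column, a stand-in for Parquet column chunks.
for col in 1 2 3 4 5 6; do
  cut -d, -f"$col" "$dir/rows.csv" > "$dir/col$col"
done

# count(countryname) only needs column 2 in the columnar layout.
rows=$(( $(wc -l < "$dir/col2") ))
row_bytes=$(( $(wc -c < "$dir/rows.csv") ))
col_bytes=$(( $(wc -c < "$dir/col2") ))

echo "rows counted:                $rows"
echo "bytes read, row layout:      $row_bytes"
echo "bytes read, columnar (col2): $col_bytes"
```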
#### Compare runtime for the GDP query
    SELECT countryname, indicatorname, year, indicatorvalue
    FROM wdi_csv_parquet
    WHERE UPPER(countryname)="CANADA" and indicatorcode="NY.GDP.MKTP.KD.ZG" and year="2015";
This query took 34 seconds on the Parquet table, compared with 86 seconds on the text table.

    

In [30]:
%hive
DROP TABLE IF EXISTS wdi_csv_parquet;
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/mitchellacton/hive/wdi/wdi_csv_parquet';

In [31]:
%hive
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs;

In [32]:
%hive
SELECT count(countryname) FROM wdi_opencsv_text;
SELECT count(countryname) FROM wdi_csv_parquet;


In [33]:
%hive
SELECT countryname, indicatorname, year, indicatorvalue
FROM wdi_csv_parquet
WHERE UPPER(countryname)="CANADA" and indicatorcode="NY.GDP.MKTP.KD.ZG" and year="2015";

## Question 9: Highest GDP Growth
#### Display the highest GDP growth year for each country
    SELECT table1.countryname, table1.year, table1.indicatorvalue
    FROM wdi_csv_parquet table1
    INNER JOIN (
    SELECT countryname, MAX(indicatorvalue) AS maxGDP
    FROM wdi_csv_parquet
    WHERE indicatorcode="NY.GDP.MKTP.KD.ZG" AND indicatorvalue <> 0
    GROUP BY countryname
    ) table2
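    ON table1.indicatorvalue=table2.maxGDP AND table1.countryname = table2.countryname
    -- keep only GDP-growth rows, so other indicators with an equal value cannot match
    WHERE table1.indicatorcode="NY.GDP.MKTP.KD.ZG"
    ORDER BY table1.countryname;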
The subquery finds each country's maximum non-zero GDP growth, and joining it back to the table on country and value recovers the year in which that maximum occurred, giving the highest GDP growth year for each country. The same query took 98 seconds on Hive with Tez and only 53 seconds on Spark SQL.
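The "group-wise maximum, then join back" pattern can be mimicked with a two-pass awk script that reads the same data twice: the first pass computes each country's maximum (skipping zeros, like `indicatorvalue <> 0`), and the second keeps rows equal to that maximum. The function name and the `country,year,growth` rows are hypothetical. As with the SQL join, a country whose maximum occurs in more than one year gets one row per tied year.

```shell
#!/usr/bin/env bash
# Hypothetical GDP-growth rows: country,year,growth
data=$'Canada,2014,2.9\nCanada,2015,1.0\nFrance,2014,0\nFrance,2015,1.1\nFrance,2016,1.1'

max_growth_year() {
  # Pass 1 (NR == FNR): per-country MAX(growth), ignoring zero values.
  # Pass 2: keep rows whose growth equals their country's max (the INNER JOIN).
  awk -F, '
    { v = $3 + 0 }
    NR == FNR { if (v != 0 && (!($1 in max) || v > max[$1])) max[$1] = v; next }
    ($1 in max) && v == max[$1] { print $1 "," $2 "," $3 }
  ' <(printf '%s\n' "$1") <(printf '%s\n' "$1") | sort -t, -k1,1 -k2,2n
}

max_growth_year "$data"
```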


In [35]:
%hive
SELECT table1.countryname, table1.year, table1.indicatorvalue
FROM wdi_csv_parquet table1
INNER JOIN (
SELECT countryname, MAX(indicatorvalue) AS maxGDP
FROM wdi_csv_parquet
WHERE indicatorcode="NY.GDP.MKTP.KD.ZG" AND indicatorvalue <> 0
GROUP BY countryname
) table2
ON table1.indicatorvalue=table2.maxGDP AND table1.countryname = table2.countryname
WHERE table1.indicatorcode="NY.GDP.MKTP.KD.ZG"
ORDER BY table1.countryname;





In [36]:
%spark.sql
SELECT table1.countryname, table1.year, table1.indicatorvalue
FROM wdi_csv_parquet table1
INNER JOIN (
SELECT countryname, MAX(indicatorvalue) AS maxGDP
FROM wdi_csv_parquet
WHERE indicatorcode="NY.GDP.MKTP.KD.ZG" AND indicatorvalue <> 0
GROUP BY countryname
) table2
ON table1.indicatorvalue=table2.maxGDP AND table1.countryname = table2.countryname
WHERE table1.indicatorcode="NY.GDP.MKTP.KD.ZG"
ORDER BY table1.countryname;


## Question 10: Sort GDP by country and year
    SELECT countryname, year, indicatorvalue AS GDP_Growth
    FROM wdi_csv_parquet
    WHERE indicatorcode="NY.GDP.MKTP.KD.ZG"
    ORDER BY countryname, year;

In [38]:
%hive
SELECT countryname, year, indicatorvalue AS GDP_Growth
FROM wdi_csv_parquet
WHERE indicatorcode="NY.GDP.MKTP.KD.ZG"
ORDER BY countryname, year;
