## Creating a table from the cloud storage data

```sql
DROP TABLE IF EXISTS wdi_gs;

CREATE EXTERNAL TABLE IF NOT EXISTS wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_alexkirilenko/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");
```

## Creating a table on HDFS and copying the data into it

```sql
DROP TABLE IF EXISTS wdi_csv_text;

CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/akirilenko200/hive/wdi/wdi_csv_text';

INSERT OVERWRITE TABLE wdi_csv_text SELECT * FROM wdi_gs;
```

## Exploring the data with a simple query

```sql
SELECT count(countryName) FROM wdi_csv_text;
```

## Rerunning the previous query
* The second run is faster, most likely because the blocks read during the first run are now held in the operating-system page cache on the DataNodes

```sql
SELECT count(countryName) FROM wdi_csv_text;
```

## Running the query using Bash utilities
SSH into the master node and run:
```Bash
cd ~
# copy the table's files from HDFS to the local disk
hdfs dfs -get hdfs:///user/akirilenko200/hive/wdi/wdi_csv_text .
cd wdi_csv_text
# calculate the current directory size
du -ch .
# output: 1.8G total
# flush dirty pages, then clear the filesystem cache so the count reads from disk
sync
echo 3 | sudo tee /proc/sys/vm/drop_caches
# count the rows and time the whole pipeline
time (cat * | wc -l)
```
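Time taken: about 20 seconds.

This is slower than the Hive query because `cat * | wc -l` reads every file sequentially on a single machine, whereas Hive splits the scan into tasks that run in parallel across the cluster's nodes.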
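To illustrate the difference, here is a small Python sketch (not part of the original workflow) that counts lines in a directory of part files two ways: sequentially, like `cat * | wc -l`, and by handing each file to a separate worker and summing the partial counts, which is the split-and-aggregate shape of a Hive count. The thread pool and the `part-*` file names are assumptions for illustration. On one machine the workers share a single disk, so in Hive the real speed-up comes from tasks reading blocks on many nodes at once.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import List


def count_lines(path: str) -> int:
    """Count newline characters in one file, as `wc -l` does."""
    total = 0
    with open(path, "rb") as f:
        # Read in 1 MiB chunks so large files never have to fit in memory.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            total += chunk.count(b"\n")
    return total


def count_sequential(paths: List[str]) -> int:
    # One reader walks through every file in turn, like `cat * | wc -l`.
    return sum(count_lines(p) for p in paths)


def count_parallel(paths: List[str], workers: int = 4) -> int:
    # Each file is counted by a separate worker ("map"),
    # then the partial counts are added up ("reduce").
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(count_lines, paths))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        paths = []
        for i in range(3):
            path = os.path.join(d, f"part-{i:05d}")  # hypothetical part-file names
            with open(path, "w") as f:
                f.write("row\n" * (10 * (i + 1)))
            paths.append(path)
        print(count_sequential(paths), count_parallel(paths))  # 60 60
```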

## Selecting distinct codes

```sql
SELECT DISTINCT indicatorCode
FROM wdi_csv_text
ORDER BY indicatorCode
LIMIT 20;
```

## Because the data is not parsed properly, a table of raw lines is created for debugging
* Some indicator names are enclosed in double quotes and contain commas. Since the table splits each line on every comma, these names are cut into several fields and the columns after them are shifted
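Here is a minimal Python sketch of the problem. It assumes a hypothetical input line whose quoted indicator name contains one comma. Splitting on every comma, which is what `FIELDS TERMINATED BY ','` does, yields seven fields. Because fields are assigned to columns by position, `indicatorCode` receives a fragment of the name and `indicatorValue` receives the code. A quote-aware parser such as Python's `csv` module keeps the name intact.

```python
import csv
import io
from typing import Dict, List

COLUMNS = ["year", "countryName", "countryCode",
           "indicatorName", "indicatorCode", "indicatorValue"]

# Hypothetical input line: the quoted indicator name contains a comma.
LINE = '2015,Canada,CAN,"Example indicator, with a comma",XX.EXAMPLE.CODE,1.5\n'


def naive_split(line: str) -> List[str]:
    # Splits on every comma and ignores quotes (like FIELDS TERMINATED BY ',').
    return line.rstrip("\n").split(",")


def quote_aware_split(line: str) -> List[str]:
    # Treats a comma inside double quotes as part of the value.
    return next(csv.reader(io.StringIO(line)))


def to_row(fields: List[str]) -> Dict[str, str]:
    # Fields are assigned to columns by position; extra trailing fields are dropped.
    return dict(zip(COLUMNS, fields))


if __name__ == "__main__":
    print(len(naive_split(LINE)), len(quote_aware_split(LINE)))  # 7 6
    print(repr(to_row(naive_split(LINE))["indicatorCode"]))      # ' with a comma"'
    print(to_row(quote_aware_split(LINE))["indicatorCode"])      # XX.EXAMPLE.CODE
```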

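```sql
-- no ROW FORMAT clause: the default field delimiter (\001) does not occur in the file,
-- so each whole line is stored in the single `line` column
CREATE EXTERNAL TABLE IF NOT EXISTS wdi_csv_debug
(line STRING)
LOCATION 'gs://jarvis_data_eng_alexkirilenko/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");

SELECT line FROM wdi_csv_debug WHERE line LIKE '%(\% of urban population)"%';
```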

 
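## Creating a table with a custom SerDe (Serializer/Deserializer)
* First, a table is created over the raw files with `OpenCSVSerde`, which handles quoted fields. Every column is declared as STRING because this SerDe treats all columns as strings
* Then, the rows are copied into an HDFS table with the appropriate data types, and Hive casts the strings during the `INSERT`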
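The two-step load can also be sketched in Python. This illustrates the idea only and is not how Hive implements it. The hypothetical `load` function first reads every field as a string with a quote-aware parser. It then casts each field to its column's type and returns `None` where the text cannot be converted, much as a failed cast in Hive yields NULL. The sample rows are made up.

```python
import csv
import io
from typing import Callable, Dict, List, Optional

# Target types mirror the wdi_opencsv_text columns.
SCHEMA = [("year", int), ("countryName", str), ("countryCode", str),
          ("indicatorName", str), ("indicatorCode", str), ("indicatorValue", float)]

# Hypothetical sample file with a header row; the second data row has an empty value.
SAMPLE = (
    "year,countryName,countryCode,indicatorName,indicatorCode,indicatorValue\n"
    '2015,Canada,CAN,"Example indicator, with a comma",XX.EXAMPLE.CODE,1.5\n'
    "2015,Canada,CAN,Another example indicator,XX.OTHER.CODE,\n"
)


def cast_or_none(value: str, target: Callable[[str], object]) -> Optional[object]:
    # An unparsable string becomes None, similar to a failed cast yielding NULL in Hive.
    try:
        return target(value)
    except ValueError:
        return None


def load(text: str) -> List[Dict[str, object]]:
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header, like skip.header.line.count=1
    rows = []
    for fields in reader:  # step 1: every field arrives as a string
        rows.append({name: cast_or_none(value, target)  # step 2: cast to the column type
                     for (name, target), value in zip(SCHEMA, fields)})
    return rows


if __name__ == "__main__":
    for row in load(SAMPLE):
        print(row["year"], row["indicatorValue"])
```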

```sql
DROP TABLE IF EXISTS wdi_opencsv_gs;
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year STRING, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'gs://jarvis_data_eng_alexkirilenko/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1");
```


```sql
DROP TABLE IF EXISTS wdi_opencsv_text;
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
LOCATION 'hdfs:///user/akirilenko200/hive/wdi/wdi_opencsv_text';

-- the STRING values of wdi_opencsv_gs are converted to INTEGER/FLOAT on insert
INSERT OVERWRITE TABLE wdi_opencsv_text SELECT * FROM wdi_opencsv_gs;

SELECT * FROM wdi_opencsv_text LIMIT 15;
```

## Comparing execution times of the original and the properly parsed tables
Here, the count on `wdi_opencsv_text` takes longer than the same count on `wdi_csv_text`.

```sql
SELECT count(countryName) FROM wdi_opencsv_text;

SELECT count(countryName) FROM wdi_csv_text;
```

## Canada's 2015 GDP growth (HQL)

```sql
SELECT indicatorValue AS GDP_growth_value, year, countryName FROM wdi_opencsv_text
WHERE year = 2015 AND countryName = 'Canada' AND indicatorName LIKE '%GDP growth (annual \%)%';
```

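## Hive Partitions
* To speed up the previous query, the data can be partitioned by year. Each year is stored in its own HDFS directory, so a query that filters on `year` reads only the matching partition instead of scanning the whole table
* With this layout, the 2015 Canada GDP query runs more than twice as fast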
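The effect of partition pruning can be sketched with a toy in-memory model in Python. The helper names and sample rows are hypothetical, and the values are placeholders rather than real GDP figures. The point is that grouping rows by year lets the filtered query read one partition instead of every row. For simplicity, the sketch filters on the indicator code rather than on the `LIKE` pattern over the name.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, str, str, float]  # (year, countryName, indicatorCode, indicatorValue)

# Hypothetical rows with placeholder values, not real GDP figures.
SAMPLE: List[Row] = [
    (year, country, "NY.GDP.MKTP.KD.ZG", 0.0)
    for year in (2014, 2015, 2016)
    for country in ("Canada", "France")
]


def partition_by_year(rows: List[Row]) -> Dict[int, List[Row]]:
    # Hive stores each partition in its own directory, e.g. .../year=2015/
    parts: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        parts[row[0]].append(row)
    return dict(parts)


def query_full_scan(rows: List[Row], year: int, country: str,
                    code: str) -> Tuple[List[Row], int]:
    # Without partitions, every row is read and the year filter is applied to each one.
    hits = [r for r in rows if r[0] == year and r[1] == country and r[2] == code]
    return hits, len(rows)


def query_pruned(parts: Dict[int, List[Row]], year: int, country: str,
                 code: str) -> Tuple[List[Row], int]:
    # With partitions, only the matching year's rows are read at all.
    part = parts.get(year, [])
    hits = [r for r in part if r[1] == country and r[2] == code]
    return hits, len(part)


if __name__ == "__main__":
    parts = partition_by_year(SAMPLE)
    _, full_read = query_full_scan(SAMPLE, 2015, "Canada", "NY.GDP.MKTP.KD.ZG")
    _, pruned_read = query_pruned(parts, 2015, "Canada", "NY.GDP.MKTP.KD.ZG")
    print(full_read, pruned_read)  # 6 2
```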

```sql
-- nonstrict mode lets every partition column be dynamic
-- (strict mode requires at least one static partition value)
set hive.exec.dynamic.partition.mode=nonstrict;

DROP TABLE IF EXISTS wdi_opencsv_text_partitions;
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED BY (year INTEGER)
LOCATION 'hdfs:///user/akirilenko200/hive/wdi/wdi_opencsv_text_partitions';

-- the dynamic partition column (year) must be the last column in the SELECT list
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION(year)
SELECT origin.countryName, origin.countryCode, origin.indicatorName, origin.indicatorCode,
       origin.indicatorValue, origin.year AS year
FROM wdi_opencsv_text origin;
```

```sql
SELECT indicatorValue AS GDP_growth_value, year, countryName FROM wdi_opencsv_text_partitions
WHERE year = 2015 AND countryName = 'Canada' AND indicatorName LIKE '%GDP growth (annual \%)%';
```

## Columnar file optimization
* The data is stored in Parquet, a columnar file format
* Query times are compared: the count on the Parquet table runs faster than an uncached count on the text table
* Parquet also uses significantly less disk space (roughly an order of magnitude less)
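Here is a toy Python model of why a columnar layout helps, under the simplifying assumption that the bytes read are proportional to the text length of each value. `count(countryName)` on a row-oriented text file reads every field of every row, while a columnar reader touches only the `countryName` column. Repeated adjacent values in a column also compress well. Parquet uses encodings such as dictionary and run-length encoding, and `run_lengths` below is only a simplified illustration of the latter. All names and sample rows are hypothetical.

```python
from typing import Dict, List, Sequence, Tuple

COLUMNS = ("year", "countryName", "countryCode", "indicatorName", "indicatorCode", "indicatorValue")

# Hypothetical rows, sorted by country so that repeated values sit next to each other.
SAMPLE = [
    (2015, name, code, f"Example indicator {k}", f"XX.EXAMPLE.{k}", 0.0)
    for name, code in (("Canada", "CAN"), ("France", "FRA"))
    for k in range(3)
]


def to_columns(rows: Sequence[tuple]) -> Dict[str, list]:
    # A columnar file keeps all values of one column together.
    return {name: [row[i] for row in rows] for i, name in enumerate(COLUMNS)}


def size(value: object) -> int:
    # Crude size estimate: the length of the value written as text.
    return len(str(value).encode())


def bytes_read_row_store(rows: Sequence[tuple]) -> int:
    # A row-oriented text file is read in full, even for count(countryName).
    return sum(size(v) for row in rows for v in row)


def bytes_read_column_store(columns: Dict[str, list], needed: Sequence[str]) -> int:
    # A columnar reader only touches the columns that the query references.
    return sum(size(v) for name in needed for v in columns[name])


def run_lengths(values: list) -> List[Tuple[object, int]]:
    # Collapse runs of equal adjacent values into (value, count) pairs.
    runs: List[Tuple[object, int]] = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1] = (v, runs[-1][1] + 1)
        else:
            runs.append((v, 1))
    return runs


if __name__ == "__main__":
    cols = to_columns(SAMPLE)
    print(bytes_read_row_store(SAMPLE), bytes_read_column_store(cols, ["countryName"]))
    print(run_lengths(cols["countryName"]))  # [('Canada', 3), ('France', 3)]
```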

```sql
DROP TABLE IF EXISTS wdi_opencsv_parquet;
CREATE EXTERNAL TABLE wdi_opencsv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
STORED AS PARQUET
LOCATION 'hdfs:///user/akirilenko200/hive/wdi/wdi_opencsv_parquet';

INSERT OVERWRITE TABLE wdi_opencsv_parquet SELECT * FROM wdi_opencsv_gs;
```

```sql
SELECT count(countryName) FROM wdi_opencsv_parquet;

SELECT count(countryName) FROM wdi_opencsv_text;
```

## Highest GDP growth by country

```sql
SELECT max(t.indicatorValue) AS GDP_growth, t.countryName
FROM wdi_opencsv_text_partitions t
WHERE t.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
GROUP BY t.countryName;
```

```sql
%spark.sql
-- same query, run on the Spark engine
SELECT max(t.indicatorValue) AS GDP_growth, t.countryName
FROM wdi_opencsv_text t
WHERE t.indicatorCode = 'NY.GDP.MKTP.KD.ZG'
GROUP BY t.countryName;
```

```sql
SELECT indicatorValue, year, countryName
FROM wdi_opencsv_text_partitions
WHERE indicatorCode = 'NY.GDP.MKTP.KD.ZG'
DISTRIBUTE BY countryName
SORT BY countryName, year;
```
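In this query, `DISTRIBUTE BY countryName` sends all rows for a given country to the same reducer. `SORT BY` orders rows only within each reducer, so the combined output is not guaranteed to be globally sorted the way `ORDER BY` would make it. The Python sketch below simulates that behaviour with a fixed number of reducers. `zlib.crc32` is a deterministic stand-in for Hive's own hash function, and the sample rows are placeholders.

```python
import zlib
from typing import Dict, List, Tuple

Row = Tuple[str, int, float]  # (countryName, year, indicatorValue)

# Hypothetical rows in arbitrary order; values are placeholders.
SAMPLE: List[Row] = [
    ("France", 2016, 0.0), ("Canada", 2015, 0.0), ("Chile", 2014, 0.0),
    ("Canada", 2014, 0.0), ("France", 2014, 0.0), ("Chile", 2016, 0.0),
]


def reducer_for(key: str, num_reducers: int) -> int:
    # Stand-in hash (not Hive's): equal keys always map to the same reducer.
    return zlib.crc32(key.encode()) % num_reducers


def distribute_and_sort(rows: List[Row], num_reducers: int) -> Dict[int, List[Row]]:
    reducers: Dict[int, List[Row]] = {i: [] for i in range(num_reducers)}
    for row in rows:  # DISTRIBUTE BY countryName
        reducers[reducer_for(row[0], num_reducers)].append(row)
    for bucket in reducers.values():  # SORT BY countryName, year: within one reducer only
        bucket.sort(key=lambda r: (r[0], r[1]))
    return reducers


if __name__ == "__main__":
    for rid, bucket in distribute_and_sort(SAMPLE, num_reducers=2).items():
        print(rid, bucket)
```

With a single reducer, the output would be fully sorted. With several, each reducer's output is sorted on its own.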