# PART 1
1. create 'wdi_gs' table to parse data from 'wdi2016'
2. create 'wdi_csv_text' external table and insert data from 'wdi_gs' into it
3. notice the cache in worker and master nodes
4. compare the bash approach and hive approach to scan whole table

- create a table called 'wdi_gs'
- data is stored in 'gs://jarvis_data_eng_abhinay/datasets/wdi_2016'

In [2]:
%hive
DROP TABLE IF EXISTS wdi_gs

In [3]:
%hive
CREATE EXTERNAL TABLE wdi_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'gs://jarvis_data_eng_abhinay/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

- data information shown below

In [5]:
%hive
DESCRIBE FORMATTED wdi_gs

scan the whole table using the query `select count(*) from wdi_gs`

In [7]:
%hive
select count(*) from wdi_gs

 
- drop the table if it exists
- create an external table named `wdi_csv_text`
- `hdfs` location 'hdfs:///user/hive/wdi/wdi_csv_text'
- `comma` delimited format
- after creating the table, insert data from `wdi_gs` into `wdi_csv_text`
- run a query to check that all data was inserted successfully

In [9]:
%hive
DROP TABLE IF EXISTS wdi_csv_text

In [10]:
%hive
CREATE EXTERNAL TABLE wdi_csv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION 'hdfs:///user/hive/wdi/wdi_csv_text'

In [11]:
%hive
INSERT OVERWRITE TABLE wdi_csv_text
SELECT * FROM wdi_gs

- we have inserted the data into the table, and now we want to check that the files are stored in HDFS
- to check that, we use the `hdfs dfs` command
- after that, we execute `SELECT count(countryName) FROM wdi_csv_text` twice and find that the second run is faster, most likely because the first run leaves the data in the file system cache of the nodes

In [13]:
%sh
hdfs dfs -ls -h hdfs:///user/hive/wdi/wdi_csv_text
hdfs dfs -du -s -h hdfs:///user/hive/wdi/wdi_csv_text

In [14]:
%hive
SELECT count(countryName) FROM wdi_csv_text

In [15]:
%hive
SELECT count(countryName) FROM wdi_csv_text

In [16]:
%sh
cd ~
hdfs dfs -get hdfs:///user/hive/wdi/wdi_csv_text .
cd wdi_csv_text
#calculate current directory size
du -ch .
#1.8G total

#clear fs cache
echo 3 | sudo tee /proc/sys/vm/drop_caches
#bash row count
date +%s && cat * | wc && date +%s
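
The two `date +%s` calls above print raw epoch timestamps, so the elapsed time has to be subtracted by hand. A minimal sketch of a helper that reports the row count and the elapsed seconds together is shown below. The function name `count_rows_timed`, the temporary directory, and the sample rows are hypothetical and only serve the demonstration.

```shell
# Hypothetical helper: count the lines of every file in a directory
# and report how many whole seconds the scan took.
count_rows_timed() {
  local dir="$1"
  local start end rows
  start=$(date +%s)
  rows=$(cat "$dir"/* | wc -l | tr -d '[:space:]')
  end=$(date +%s)
  echo "rows=${rows} elapsed_s=$((end - start))"
}

# Demo on a throwaway directory (made-up rows, not the real dataset).
demo_dir=$(mktemp -d)
printf '2015,Canada\n2016,Canada\n' > "$demo_dir/000000_0"
printf '2015,Brazil\n' > "$demo_dir/000001_0"
count_rows_timed "$demo_dir"
```

On the real data the call would be `count_rows_timed ~/wdi_csv_text`, run once right after dropping the cache and once again to compare.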

 
# PART 2
- find the bug that makes the `indicatorCode` column display incorrectly
- the bug is caused by a parsing issue; to solve it, we parse the data in a different way
- drop the tables if they exist
- create a new external table called `wdi_opencsv_gs` using `OpenCSVSerde`
- create `wdi_opencsv_text` and insert the data from `wdi_opencsv_gs` into it
- verify the data
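
One plausible cause of the parsing issue, stated here as an assumption, is a quoted field that itself contains a comma. `FIELDS TERMINATED BY ','` splits on every comma, so every column after that field shifts. The sketch below reproduces the effect with `awk` on a made-up row (not copied from the dataset) and compares it with a simple quote-aware split.

```shell
# Illustrative row, invented for this sketch.
line='2015,Canada,CAN,"Population, total",SP.POP.TOTL,35000000'

# Naive split on every comma, similar to a plain delimited row format.
naive_count=$(printf '%s\n' "$line" | awk -F',' '{ print NF }')
naive_field5=$(printf '%s\n' "$line" | awk -F',' '{ print $5 }')

# Quote-aware split: commas inside double quotes do not end a field.
# (Simplified: escaped quotes inside a field are not handled.)
quoted_field5=$(printf '%s\n' "$line" | awk '{
  n = 1; inq = 0; f[1] = ""
  for (i = 1; i <= length($0); i++) {
    c = substr($0, i, 1)
    if (c == "\"") { inq = !inq }
    else if (c == "," && !inq) { n++; f[n] = "" }
    else { f[n] = f[n] c }
  }
  print f[5]
}')

echo "naive split sees $naive_count fields, field 5 is: $naive_field5"
echo "quote-aware split, field 5 is: $quoted_field5"
```

With the naive split the fifth field holds the tail of the indicator name instead of the indicator code, which matches the symptom seen in `wdi_csv_text`.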


In [19]:
%hive
SELECT distinct(indicatorcode)
FROM wdi_csv_text
ORDER BY indicatorcode
LIMIT 10

In [20]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_gs

In [21]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_gs
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'gs://jarvis_data_eng_abhinay/datasets/wdi_2016'
TBLPROPERTIES ("skip.header.line.count"="1")

In [22]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text

In [23]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_text
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/hive/wdi/wdi_opencsv_text'

In [24]:
%hive
INSERT OVERWRITE TABLE wdi_opencsv_text
SELECT * FROM wdi_opencsv_gs

In [25]:
%hive
SELECT distinct(indicatorcode)
FROM wdi_opencsv_text
ORDER BY indicatorcode
LIMIT 10

- now we run the `SELECT count(countryName)` query against wdi_opencsv_text and wdi_csv_text
- we find that the query against wdi_opencsv_text is much slower
- this is because the SerDe does extra work to parse the CSV, which is much slower than the conventional delimiter definition

In [27]:
%hive
SELECT count(countryName) FROM wdi_opencsv_text

In [28]:
%hive
SELECT count(countryName) FROM wdi_csv_text

- we find that `indicatorValue` becomes a string in wdi_opencsv_text
- this is because the OpenCSVSerde treats all columns as type STRING. Even if you create a table with non-string column types using this SerDe, the DESCRIBE TABLE output shows string column types, because the type information is retrieved from the SerDe
- to solve this problem, we can create a view that casts the column back to its intended type
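
Why the string type matters can be sketched outside Hive: when numbers are compared as text, they are ordered character by character, so a maximum or a sort can come out wrong. The example below uses `sort` as a stand-in for that behaviour. It does not reproduce Hive's own code path, and the values are made up.

```shell
# Made-up numeric values stored as text.
values='9.5
10.2
2.1'

# Text ordering: "10.2" sorts before "2.1" and "9.5".
max_as_string=$(printf '%s\n' "$values" | LC_ALL=C sort | tail -n 1)

# Numeric ordering, the equivalent of casting before comparing.
max_as_number=$(printf '%s\n' "$values" | LC_ALL=C sort -n | tail -n 1)

echo "max as string: $max_as_string"
echo "max as number: $max_as_number"
```

Casting in the view plays the same role as `sort -n` here: the comparison is made on numbers rather than on text.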

In [30]:
%hive
DROP VIEW IF EXISTS wdi_opencsv_text_view

In [31]:
%hive
CREATE VIEW IF NOT EXISTS wdi_opencsv_text_view
AS
SELECT year, countryName, countryCode, indicatorName, indicatorCode, CAST(indicatorValue AS FLOAT) AS inidcatorValue
FROM wdi_opencsv_text

# PART 3
write queries to solve business problem

Write a HiveQL to find 2015 `GDP growth (annual %)` for Canada

Output columns: `GDP_growth_value, year, countryName`

In [33]:
%hive
SELECT inidcatorValue as GDP_growth_value, year, countryName
FROM wdi_opencsv_text_view
WHERE indicatorname like "GDP growth (annual %)" and year=2015 and countryName="Canada"

- the query is slow: it takes 26 seconds to fetch the result. The main reason is that the data is not pre-sorted and there is no index to help us quickly fetch the result by keyword, so the whole table is scanned
- to solve this, we partition the table
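
Hive stores each partition of a table in its own subdirectory named `key=value`, so a filter on the partition key only needs to read the matching directory. The sketch below imitates that layout on the local file system. The directory, file names, and rows are hypothetical.

```shell
# Hypothetical miniature of a partitioned table directory.
table_dir=$(mktemp -d)
rows='2014,Canada
2015,Canada
2015,Brazil
2016,Canada'

# Write each row into the directory of its partition key, y=<year>.
printf '%s\n' "$rows" | while IFS=, read -r year country; do
  mkdir -p "$table_dir/y=$year"
  printf '%s,%s\n' "$year" "$country" >> "$table_dir/y=$year/000000_0"
done

# A filter such as y = 2015 only has to read one directory.
partitions=$(ls "$table_dir" | wc -l | tr -d '[:space:]')
rows_2015=$(cat "$table_dir/y=2015"/* | wc -l | tr -d '[:space:]')
echo "partitions=$partitions rows_in_y_2015=$rows_2015"
```

On the cluster, the same layout can be inspected with `hdfs dfs -ls` on the partitioned table's location once the dynamic-partition insert has run.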

In [35]:
%hive
DROP TABLE IF EXISTS wdi_opencsv_text_partitions

In [36]:
%hive
CREATE EXTERNAL TABLE wdi_opencsv_text_partitions
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue FLOAT)
PARTITIONED by(y STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
LOCATION 'hdfs:///user/hive/wdi/wdi_opencsv_text_partitions'

In [37]:
%hive
SET hive.exec.dynamic.partition.mode=nonstrict

In [38]:
%hive
SET hive.exec.dynamic.partition=true

In [39]:
%hive
INSERT OVERWRITE TABLE wdi_opencsv_text_partitions PARTITION(y)
SELECT *, year as y FROM wdi_opencsv_text

- next, we want to reduce the file size
- to do that, we store the data in PARQUET format

In [41]:
%hive
DROP TABLE IF EXISTS wdi_csv_parquet

In [42]:
%hive
CREATE EXTERNAL TABLE wdi_csv_parquet
(year INTEGER, countryName STRING, countryCode STRING, indicatorName STRING, indicatorCode STRING, indicatorValue STRING)
STORED AS PARQUET
LOCATION 'hdfs:///user/hive/wdi/wdi_csv_parquet'

insert the data and check that all rows were inserted


In [44]:
%hive
INSERT OVERWRITE TABLE wdi_csv_parquet
SELECT * FROM wdi_opencsv_gs

In [45]:
%hive
SELECT count(countryName) from wdi_csv_parquet

In [46]:
%hive
SELECT count(countryName) FROM wdi_csv_text

In [47]:
%sh
hdfs dfs -du -s -h /user/hive/wdi/wdi_csv_parquet

In [48]:
%sh
hdfs dfs -du -s -h /user/hive/wdi/wdi_csv_text

You can see that using parquet can significantly reduce the file size

Execute the 2015 GDP growth HQL against the wdi_csv_parquet and wdi_opencsv_text tables, then compare performance. Even though the file size is reduced, the running time does not increase.

In [51]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_csv_parquet
WHERE indicatorname like "GDP growth (annual %)" and year = 2015 and countryName="Canada"

In [52]:
%hive
SELECT indicatorValue AS GDP_growth_value, year, countryName
FROM wdi_csv_text
WHERE indicatorname like "GDP growth (annual %)" and year = 2015 and countryName="Canada"

- find the year with the highest GDP growth (NY.GDP.MKTP.KD.ZG) for each country
- use the Spark interpreter to run the same query
- the query runs faster with `%spark.sql` than with Hive on Tez
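
The logic of the query (keep the highest value per country, and the earliest year when the value is tied) can be checked on a small sample before running it on the cluster. The sketch below does this with `awk`. The rows are made up for illustration and are not taken from the dataset.

```shell
# Made-up sample rows: countryName,year,growth (illustrative values only).
rows='Canada,2014,2.9
Canada,2015,0.7
Brazil,2014,0.5
Brazil,2015,-3.5
Brazil,2016,0.5'

# Keep the highest growth per country; on a tie keep the earliest year.
best_years=$(printf '%s\n' "$rows" | awk -F, '
  {
    v = $3 + 0; y = $2 + 0
    if (!($1 in best) || v > best[$1] || (v == best[$1] && y < yr[$1])) {
      best[$1] = v; yr[$1] = y; raw[$1] = $3
    }
  }
  END { for (c in best) print c "," raw[c] "," yr[c] }
' | LC_ALL=C sort)

echo "$best_years"
```

The output columns follow the same order as the query: country, highest value, and the first year in which that value occurs.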

In [54]:
%hive
SELECT v.countryName, v.inidcatorValue, min(v.year)
FROM (SELECT countryName, max(inidcatorValue) AS indicatorValue
FROM wdi_opencsv_text_view
WHERE indicatorname like "GDP growth (annual %)"
GROUP BY countryName) AS t, wdi_opencsv_text_view v
WHERE v.countryName=t.countryName AND v.inidcatorValue=t.indicatorValue
AND v.indicatorname like "GDP growth (annual %)"
GROUP BY v.countryName, v.inidcatorValue
ORDER BY v.countryName

In [55]:
%spark.sql
SELECT v.countryName, v.inidcatorValue, min(v.year)
FROM (SELECT countryName, max(inidcatorValue) AS indicatorValue
FROM wdi_opencsv_text_view
WHERE indicatorname like "GDP growth (annual %)"
GROUP BY countryName) AS t, wdi_opencsv_text_view v
WHERE v.countryName=t.countryName AND v.inidcatorValue=t.indicatorValue
AND v.indicatorname like "GDP growth (annual %)"
GROUP BY v.countryName, v.inidcatorValue
ORDER BY v.countryName


Write a query that returns the GDP growth of all countries, sorted by countryName and year


In [57]:
%hive
SELECT countryName, year, inidcatorValue
FROM wdi_opencsv_text_view
where indicatorName like "GDP growth (annual %)"
ORDER BY countryName ASC, year ASC