* SQL-like query tool for data stored in HDFS and other file systems that integrate with Hadoop.
* Developed by Facebook.
* It processes structured data that can be organized into tables.
* A Hive query is converted into MapReduce (MR) jobs that run on the Hadoop cluster against data in HDFS.
* Hive is NOT a database. Its tables point to data in HDFS.
* Hive is NOT a tool for OLTP. Traditionally it does not provide row-level insert, update, or delete (limited support was added later through ACID tables).
* Not used when fast response time is needed. Used where high latency is acceptable, with batch processing.
* Hive follows a write-once, read-many model; a traditional SQL database (RDBMS) follows a write-many, read-many model.
* Hive is good for batch processing; an RDBMS is good for transaction processing.

### Architecture
#### Components
* UI: the user interacts with Hive through it. Options are the Hive CLI, the web interface and the Thrift server (JDBC/ODBC connections).
* Driver: receives the query from the UI and converts it to an MR program with the help of the compiler.
* Compiler: converts the HiveQL query to an MR program and creates the execution plan using metadata from the metastore.
* Metastore: a small database holding all structural information, such as tables, partitions, number of columns and their data types, serializer and deserializer (SerDe).
    - It is not stored on HDFS because it needs random reads and updates, while HDFS is optimized for sequential reads.
* Execution engine: connects to the Hadoop framework and executes the plan created by the compiler. It contacts the NameNode and ResourceManager to fetch the desired data.

* A Hive database is a namespace for tables. Creating one stores only metadata; the actual table data stays in HDFS.
* This metadata (databases, tables, columns) is kept in the metastore.

```
create database d1;

-- or, only if it does not exist yet
create database if not exists d1;

-- or, with a comment
create database if not exists d1 comment "This is a database";

-- or, with custom properties
create database if not exists d1 with dbproperties('creator'='purvil','date'='2019-07-15');
```

```
create table if not exists table1 (col1 string, col2 array<string>, col3 string, col4 int) row format delimited fields terminated by ',' collection items terminated by ':' lines terminated by '\n' stored as textfile;
```

```
describe database d1;

describe database extended d1;
```

```
show databases; -- show all created databases
```

```
use d1; -- from now on, unqualified table names refer to tables in d1
```

#### Create table with location
* By default Hive stores table data under the directory set in the `hive.metastore.warehouse.dir` property (default `/user/hive/warehouse`).
```
set hive.metastore.warehouse.dir=/user/hive/warehouse;
```
* To store the data in another location, provide the path explicitly with `location`:
```
create table if not exists table1 (col1 string, col2 array<string>, col3 string, col4 int) row format delimited fields terminated by ',' collection items terminated by ':' lines terminated by '\n' stored as textfile location '/user/purvil/dir_name';
```

#### Link metadata to file
```
load data local inpath '/home/purvil/dir_name/data.txt' into table table1; -- file on the local file system (copied)
load data inpath '/home/purvil/dir_name/data.txt' into table table1; -- file already in HDFS (moved into the table directory)
```
* Here `into` appends the data. We can also use `overwrite` (`overwrite into table table1`), which replaces the existing data in the table.

### Internal vs External table
* For an internal (managed) table, Hive owns both the table data and the metadata. For an external table, Hive is responsible only for the metadata. So dropping an internal table deletes both data and metadata, while dropping an external table only removes Hive's link to the data; the data itself stays in HDFS.

```
create table if not exists table7 (col1 string, col2 string, col3 int) row format delimited fields terminated by ',' collection items terminated by ':' lines terminated by '\n' stored as textfile;
```
* Above is the syntax to create an internal (managed) table; tables are internal by default.
* We can drop internal table as
```
drop table table7;
```
* To create external table
```
create external table if not exists table7 (col1 string, col2 string, col3 int) row format delimited fields terminated by ',' collection items terminated by ':' lines terminated by '\n' stored as textfile;
```

* `set hive.cli.print.header=true;` prints the column names in query output.

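* A table can declare more or fewer columns than the underlying file or source table has fields: missing fields are read as NULL and extra fields are ignored. Make sure the column types match, because values that cannot be converted to the declared type are also read as NULL.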

### Insert
#### Loading data into table from another table

```
create table tab (col1 int, col2 string, col3 string) stored as textfile; -- no delimiter given: Hive writes the inserted rows using its default field delimiter (Ctrl-A, \001)

insert into table tab select id,fname,lname from table1; -- append data to table tab
insert overwrite table tab select id,fname,lname from table1 where lname = 'dave'; -- replace the data in table tab
```

#### Multi Insert
```
from emp_tab
insert into table developer_tab select col1, col2, col3 where col3 = 'Developer'
insert into table manager_tab select col1, col2, col3 where col3 = 'Mgr';
```

### Alter table
```
alter table tab add columns(col4 string, col5 int);
```
* Existing rows show NULL for the new columns until data containing those fields is loaded.

```
alter table tab change col1 col1 int after col3; -- move col1 after col3
alter table tab change column col2 new_col2 string; -- rename col2 to new_col2
alter table tab rename to tab1; -- rename table
alter table tab1 replace columns (id int, name string); -- replace the whole column list
```

### Table properties 
```
desc formatted tab1; -- list all metadata of the table
alter table tab1 set tblproperties('prop'='val');
alter table tab1 set fileformat avro;
```

### ORDER BY
* Total ordering of the data, so it uses a single reducer.
```
select col2 from table5 order by col2 limit 5;
```

### SORT BY
* Sorts data within each reducer, so the output is sorted per reducer but not globally.
```
select col2 from table5 sort by col2;
```

### DISTRIBUTE BY
* Decides which reducer each row goes to: all rows with the same DISTRIBUTE BY value go to the same reducer, so reducers receive non-overlapping sets of values. It does not guarantee sorting.
```
select col2 from table5 distribute by col2;
```

### CLUSTER BY
* Shorthand for DISTRIBUTE BY and SORT BY on the same column(s).

![](images/map_reduce.PNG)
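ORDER BY, SORT BY, DISTRIBUTE BY and CLUSTER BY differ in two ways: how rows are routed to reducers, and where sorting happens. The Python sketch below models that routing. It is not Hive code and rests on these assumptions: each row is reduced to its key value, and `zlib.crc32` stands in for Hive's own hash function.

```python
import zlib

def stable_hash(key):
    # Deterministic stand-in for Hive's hash function (assumption).
    return zlib.crc32(str(key).encode("utf-8"))

def distribute_by(keys, num_reducers):
    # Every row with the same key lands in the same reducer; no sorting.
    reducers = [[] for _ in range(num_reducers)]
    for k in keys:
        reducers[stable_hash(k) % num_reducers].append(k)
    return reducers

def sort_by(reducers):
    # Each reducer sorts only its own rows.
    return [sorted(r) for r in reducers]

def cluster_by(keys, num_reducers):
    # CLUSTER BY col is DISTRIBUTE BY col followed by SORT BY col.
    return sort_by(distribute_by(keys, num_reducers))

def order_by(keys):
    # Total order, equivalent to sending every row to a single reducer.
    return sorted(keys)

if __name__ == "__main__":
    data = ["dev", "hr", "ops", "dev", "hr", "qa"]
    print(cluster_by(data, 2))
    print(order_by(data))
```

Concatenating the CLUSTER BY output of all reducers gives the same rows as ORDER BY. It is not globally sorted, though, because every reducer's output is sorted only locally.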

### Function

#### unix_timestamp
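* Returns the number of seconds between the specified time and the epoch (1970-01-01 00:00:00 UTC). The string is parsed with the default format `yyyy-MM-dd HH:mm:ss` in the session's local time zone.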
```
select unix_timestamp('2017-04-26 00:00:00');
```

#### from_unixtime
* Converts seconds since the epoch into a timestamp string (`yyyy-MM-dd HH:mm:ss`, local time zone).
```
select from_unixtime(123456);
```
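The arithmetic behind these two functions can be checked with Python's standard library. This is a sketch under one loud assumption: the session time zone is UTC. Hive uses the local zone, so results on a real cluster can differ by the zone offset.

```python
from datetime import datetime, timezone

HIVE_FMT = "%Y-%m-%d %H:%M:%S"  # Hive's default pattern yyyy-MM-dd HH:mm:ss

def unix_timestamp(s):
    # Assumption: UTC session time zone (Hive itself uses the local zone).
    dt = datetime.strptime(s, HIVE_FMT).replace(tzinfo=timezone.utc)
    return int(dt.timestamp())

def from_unixtime(seconds):
    # Inverse conversion, again assuming UTC.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(HIVE_FMT)

print(unix_timestamp("2017-04-26 00:00:00"))  # 1493164800
print(from_unixtime(123456))                  # 1970-01-02 10:17:36
```

For example, 123456 seconds is 1 day (86400 s) plus 37056 s, which is 10 h 17 min 36 s.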

#### TO_DATE(string timestamp) 
* Returns the date part of a timestamp string, e.g. `to_date('2017-04-26 10:30:00')` gives `2017-04-26`.

#### YEAR(string date)
* Returns the YEAR part of the date

#### MONTH(string date)
* Returns the MONTH part of the date

#### HOUR, DAY, MINUTE, SECOND, WEEKOFYEAR, DATEDIFF(string date1, string date2), DATE_ADD(string date, int days), DATE_SUB(string date, int days)

#### ceil(9.5)
* Returns 10

#### floor(10.9)
* Returns 10

#### round(123.456, 2)
* Returns 123.46

#### rand()
* Generates a random number between 0 and 1

#### concat
```
select concat(col1, '-', col3) from table1;
```

#### length, lower, upper, lpad(col_name, 10, 'v'), rpad(col_name, 10, 'v'), ltrim(str), rtrim(str), repeat(col_name, 2), reverse(col_name)

#### split
```
select split('hive:hadoop', ':');
```
* Returns the array `["hive","hadoop"]`. The second argument is a regular expression.

#### substring
```
select substr('hive is querying tool', 4); -- from index 4 to the end (first index is 1)
select substr('hive is querying tool', 4, 3); -- 3 characters starting at index 4 (first index is 1)
```

#### instr
```
select instr('hive is querying tool', 'e'); -- index of the first occurrence of 'e' (0 if not found)
```
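Hive string positions are 1-based, whereas Python's are 0-based. The Python model below makes that offset explicit. It is an illustration, not Hive's implementation, and it handles only a positive start position; Hive also accepts a negative start, which counts from the end.

```python
def substr(s, start, length=None):
    # Hive positions are 1-based, so shift to Python's 0-based slicing.
    begin = start - 1
    return s[begin:] if length is None else s[begin:begin + length]

def instr(s, sub):
    # 1-based position of the first occurrence; find() gives -1 when absent, so this yields 0.
    return s.find(sub) + 1

text = "hive is querying tool"
print(substr(text, 4))     # e is querying tool
print(substr(text, 4, 3))  # e i
print(instr(text, "e"))    # 4
```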

#### Condition

```
select if(col3='England', col1, col4) from table2; -- col1 if the condition is true, else col4
select isnull(col1) from table1;
select isnotnull(col1) from table1;
select coalesce(col1,col2,col3) from table1; -- first non-null value
select NVL(col1,col2) from table1; -- col2 if col1 is null, else col1
```
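The NULL handling of these conditional functions is easy to get wrong. The Python sketch below models it, with `None` standing in for SQL NULL. The function names are hypothetical stand-ins for the Hive functions, not an API. Note that IF treats a NULL condition like false.

```python
def hive_if(cond, a, b):
    # IF(cond, a, b): a only when cond is true; b when cond is false or NULL.
    return a if cond is True else b

def coalesce(*values):
    # First non-NULL argument, or NULL when every argument is NULL.
    return next((v for v in values if v is not None), None)

def nvl(value, default):
    # NVL(a, b) behaves like COALESCE with exactly two arguments.
    return default if value is None else value

print(coalesce(None, None, 3, 4))  # 3
print(nvl(0, "x"))                 # 0 (zero is not NULL)
```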

#### Explode
* Takes an array (or a map) as input and outputs each element as a separate row.
```
select explode(col_name) as mycol from table1;
```
* Used on its own, explode lets us select only the exploded column.

#### lateral view
* Overcomes the limitation of explode: we can select any number of columns.
* Creates a virtual table of the exploded column and joins it with each row of the original table.
```
select col1, col_name from table2 lateral view explode(col3) dummy as col_name; -- dummy is the alias of the virtual table, col_name its column
```
* It is also used to turn the key:value pairs of a map column into separate key and value columns.
```
select key,value from table3 lateral view explode(map_col) dummy as key,value;
```
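A lateral view is effectively a per-row join between each base row and the rows that explode produces for it. The Python model below shows this. The table contents are hypothetical, and the model follows LATERAL VIEW without OUTER, so a row whose array is empty or NULL produces no output.

```python
def lateral_view_explode(rows, array_col, alias):
    """Join each base row with every element of its array column."""
    out = []
    for row in rows:
        for element in row[array_col] or []:  # empty/NULL array: row dropped
            joined = dict(row)
            joined[alias] = element
            out.append(joined)
    return out

def lateral_view_explode_map(rows, map_col):
    # explode(map) yields one (key, value) row per map entry.
    return [(k, v) for row in rows for k, v in row[map_col].items()]

# Hypothetical contents of table2: col1 is a string, col3 an array<string>.
table2 = [
    {"col1": "a", "col3": ["x", "y"]},
    {"col1": "b", "col3": ["z"]},
    {"col1": "c", "col3": []},
]
rows = lateral_view_explode(table2, "col3", "col_name")
print([(r["col1"], r["col_name"]) for r in rows])  # [('a', 'x'), ('a', 'y'), ('b', 'z')]
```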

#### RLIKE
* Regular-expression LIKE (Java regex syntax).
* `A rlike B` returns true if any substring of A matches the regular expression B.
```
select 'hadoop' rlike 'ha'; -- returns true
select 'hadoop' rlike 'ha*'; -- returns true
select null rlike 'ha'; -- returns null
select 'hadoop' rlike null; -- returns null
```
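The "any substring" rule corresponds to a regex search rather than a full match. The Python model below illustrates this, using `None` for NULL. It assumes that Python's `re` module behaves like Java regex for simple patterns such as these; the two engines differ in some advanced syntax.

```python
import re

def rlike(a, b):
    """Model of `A rlike B`; None represents NULL."""
    if a is None or b is None:
        return None  # NULL on either side gives NULL
    # re.search matches anywhere in A, i.e. against any substring.
    return re.search(b, a) is not None

print(rlike("hadoop", "ha"))     # True
print(rlike("hadoop", "ha*"))    # True
print(rlike(None, "ha"))         # None
print(rlike("hadoop", "^doop"))  # False (anchors still apply)
```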

#### RANK
* Gives the rank within an ordered partition. Ties get the same rank, and the following rank(s) are skipped.

#### DENSE_RANK
* Gives the rank within an ordered partition; ranks are consecutive. No rank is skipped when several rows tie.

#### ROW_NUMBER
* Assigns a unique sequential number to each row within the partition, in the order given by the ORDER BY clause.

![](images/rank.PNG)
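The three functions differ only in how they treat ties. The Python sketch below computes all three for one partition. It is a model of the semantics, and it assumes the values have already been sorted by the ORDER BY column; the salaries are hypothetical.

```python
def rank(sorted_values):
    # Ties share a rank; the next rank jumps by the size of the tie.
    ranks = []
    for i, v in enumerate(sorted_values):
        ranks.append(ranks[-1] if i and v == sorted_values[i - 1] else i + 1)
    return ranks

def dense_rank(sorted_values):
    # Ties share a rank; the next distinct value gets the next integer.
    ranks = []
    for i, v in enumerate(sorted_values):
        if i == 0:
            ranks.append(1)
        elif v == sorted_values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks

def row_number(sorted_values):
    # Always unique: ties are numbered in the order the rows arrive.
    return list(range(1, len(sorted_values) + 1))

sal = [5000, 4000, 4000, 3000]  # one partition, ordered by sal desc
print(rank(sal))        # [1, 2, 2, 4]
print(dense_rank(sal))  # [1, 2, 2, 3]
print(row_number(sal))  # [1, 2, 3, 4]
```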

### Partitioning and Bucketing
* Ways to organize table data in parts.
* Example: partition data by department. A query that filters on department then reads only the matching partition instead of scanning the entire table.

#### Static Partitioning
```
create table table_name (deptno int, empname string, sal int) partitioned by (deptname string) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;

-- insert data from another table
insert into table table_name partition(deptname='part_name') select col1, col3, col4 from dept where col2='Accounts';

-- load data from a file
load data local inpath 'path/to/file' into table table_name partition(deptname='part_name');
```

#### Dynamic Partitioning
* `set hive.exec.dynamic.partition=true;`
* `set hive.exec.dynamic.partition.mode = nonstrict;`
```
create table table_name (deptno int, empname string, sal int) partitioned by (deptname string) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;

insert into table table_name partition(deptname) select col1, col3, col4, col2 from dept; -- the partition column must come last in the select list
```
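To see why the partition column must come last, consider what the insert does with each selected row. The Python sketch below models that step. It illustrates the behavior rather than Hive's internals, and the rows are hypothetical. Hive stores each partition as a subdirectory named `deptname=<value>` under the table directory, and the partition value appears only in that directory name, not inside the data file.

```python
from collections import defaultdict

def dynamic_partition_insert(select_rows, partition_col="deptname"):
    """Group SELECT output rows into partition directories.

    The LAST value of each row is taken as the dynamic partition value
    and is dropped from the row that gets written to the data file.
    """
    partitions = defaultdict(list)
    for row in select_rows:
        *data_values, part_value = row
        partitions[f"{partition_col}={part_value}"].append(tuple(data_values))
    return dict(partitions)

# Hypothetical rows in select order (deptno, empname, sal, deptname).
rows = [(10, "emp1", 5000, "HR"), (20, "emp2", 7000, "DEV"), (30, "emp3", 4000, "HR")]
for directory, data in dynamic_partition_insert(rows).items():
    print(directory, data)
```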

```
show partitions table_name;
alter table table_name drop partition (deptname='HR');
alter table table_name add partition (deptname='DEV');
```

* Another way to create a partition is to create its directory in HDFS directly, but this does not update the metastore.
* To register such partitions in the metastore, run `msck repair table table_name;`

* In bucketing, all rows with the same value of the bucketed column go to the same bucket. It can be used together with partitioning.
* For example, partition data by department, then bucket the data within each partition by another column (location in the example below).
* `set hive.enforce.bucketing = true;`
* `set hive.exec.dynamic.partition.mode = nonstrict;`
```
create table table_name (deptno int, empname string, sal int, location string) partitioned by (deptname string) clustered by (location) into 4 buckets row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;

insert into table table_name partition(deptname) select col1, col3, col4, col5, col2 from dept;
```

* When inserting into a bucketed table, Hive uses one reduce task per bucket, so the number of buckets equals the number of reduce tasks.
* Each bucket is physically a file.
* The number of buckets is set when the table is created. Bucketed map joins are among the fastest joins: both tables must be bucketed on the join column, and their numbers of buckets must be equal (or one a multiple of the other).
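A row's bucket is determined by a hash of the bucketing column taken modulo the number of buckets. The Python sketch below assumes Hive 1.x/2.x-style hashing for an int column: an int hashes to itself, and the sign bit is masked before the modulo. Newer Hive versions may use a different hash function. The sketch also shows why bucket counts that are multiples of each other line up for joins.

```python
def bucket_for(value, num_buckets):
    # Assumption: int hashes to itself (Hive 1.x/2.x style); mask the sign bit.
    return (value & 0x7FFFFFFF) % num_buckets

def bucketize(values, num_buckets):
    files = [[] for _ in range(num_buckets)]  # one file per bucket
    for v in values:
        files[bucket_for(v, num_buckets)].append(v)
    return files

print(bucketize(range(8), 4))  # [[0, 4], [1, 5], [2, 6], [3, 7]]

# A key in bucket i of an 8-bucket table can only match keys in bucket
# i % 4 of a 4-bucket table on the same column, so a mapper joining one
# bucket needs to load just one bucket of the other table.
print(all(bucket_for(v, 8) % 4 == bucket_for(v, 4) for v in range(100)))  # True
```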

### Table Sampling
* Sample is small portion of data.
* LIMIT simply returns the first rows read, which may all come from a single partition or file, so it is not a representative sample. Use table sampling for that.
```
select * from table_name tablesample (bucket 1 out of 2 on location);
```
* With 4 buckets, `bucket 1 out of 2` returns buckets 1 and 3, i.e. about half the data. A larger denominator gives a smaller sample, e.g. `bucket 1 out of 4` returns only bucket 1.
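The bucket selection rule can be written as a one-line filter. The Python sketch below models it under two assumptions: the table is already bucketed on the sampling column, and y divides the number of buckets. Under those assumptions, `BUCKET x OUT OF y` reads buckets x, x + y, x + 2y, and so on (1-based).

```python
def tablesample_bucket(bucket_files, x, y):
    """Bucket files read by TABLESAMPLE(BUCKET x OUT OF y)."""
    return [f for number, f in enumerate(bucket_files, start=1)
            if (number - 1) % y == x - 1]

four = ["bucket1", "bucket2", "bucket3", "bucket4"]
print(tablesample_bucket(four, 1, 2))  # ['bucket1', 'bucket3']
print(tablesample_bucket(four, 1, 4))  # ['bucket1']
```

With 32 buckets, `bucket 3 out of 16` picks buckets 3 and 19.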

```
select * from table_name tablesample (2 percent); -- about 2% of the data (block-level sampling)
select * from table_name tablesample (1M); -- about 1 MB of data
select * from table_name tablesample (20 rows); -- 20 rows from each input split
```

### no_drop
* Prevents dropping of a table or partition (protection mode; removed in Hive 2.0).
```
alter table table_name enable no_drop;
alter table table_name disable no_drop;
alter table table_name partition (deptname='HR') enable no_drop;
```

### offline
* Prevents data from being queried (protection mode; removed in Hive 2.0).
```
alter table table_name enable offline;
```

### Join
#### Inner join
```
select * from table1 join table2 on (table1.id = table2.id);
```

#### Outer join
* Left, Right, Full
```
select * from table1 left outer join table2 on (table1.id = table2.id);
```

* By default, the last table in a join is streamed through the reducers and the other tables are buffered in memory, so the largest table should go last. We can explicitly specify which table should be streamed:
```
select /*+ STREAMTABLE (table_name) */ ....
```
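* How to do a join on the mapper side (map join)? The small table is loaded into memory on every mapper, so no reduce phase is needed.
    - Mention it in the query itself: `select /*+ MAPJOIN(table_name) */ ....`
    - Or let Hive convert joins automatically: `set hive.auto.convert.join=true;` `set hive.mapjoin.smalltable.filesize=25000000;` (small-table size threshold in bytes)
* A full outer join can NOT be performed as a map join.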
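A map join is essentially an in-memory hash join that each mapper performs on its own split of the big table. The Python sketch below models the build and probe phases. It is not Hive's implementation, and the `(key, value)` rows are hypothetical. The sketch also shows why a full outer join does not fit this scheme: a full outer join must also emit small-table rows that matched nothing in the *whole* big table, and a single mapper cannot know that because it sees only its own split.

```python
def build_hash_table(small_rows):
    # Build phase: the small table becomes an in-memory hash map,
    # as each mapper does with the broadcast small table.
    table = {}
    for key, value in small_rows:
        table.setdefault(key, []).append(value)
    return table

def map_join(big_split, small_table, left_outer=False):
    # Probe phase: stream one mapper's split of the big table; no reducer.
    out = []
    for key, value in big_split:
        matches = small_table.get(key)
        if matches:
            out.extend((key, value, m) for m in matches)
        elif left_outer:
            out.append((key, value, None))  # big table is the preserved side
    return out

small = build_hash_table([(1, "HR"), (2, "DEV")])
big_split = [(1, "a"), (3, "b"), (2, "c")]
print(map_join(big_split, small))                   # [(1, 'a', 'HR'), (2, 'c', 'DEV')]
print(map_join(big_split, small, left_outer=True))  # [(1, 'a', 'HR'), (3, 'b', None), (2, 'c', 'DEV')]
```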

### VIEWS
* Virtual table created as a result of a hive query on a table.
* Does not contains any data on its own.
* Can be queried just like a table.
* Can be created by selecting any number of columns of its base table.
* Views are read only.
* Once created, the schema of view is frozen and is independent of changes made to base table schema.
* If the base table is dropped, queries on the view fail.
```
create view view_name as select * from table_name;
```
* It can be used to hide underlying table from some users.
* Users given access only to the view cannot drop or alter the base table through it.

### Indexing
* Acts as a reference to records.
* Note: indexing was removed in Hive 3.0.
* Used to speed up searching of data.
* Will search for only portion of data and not whole dataset.
* Partitioning splits the data into separate HDFS directories; an index is a separate structure built at table level on top of the existing data.
```
create index index_name on table table_name(col_name) as 'COMPACT' with deferred rebuild;
```
* COMPACT and BITMAP. 
    - COMPACT: stores pairs of indexed column value and HDFS block ID.
    - BITMAP: stores the indexed column value with a bitmap of the rows that contain it.
* When data gets changed we will have to rebuild index.
```
alter index index_name on table_name rebuild;
```
```
show formatted index on table9;
```
* We can create multiple index on the same table.
* Use index when
    - Dataset is large
    - Speed is concerned
    - Frequent use of where clause in queries
* Do not use
    - When dataset is unique
    - No frequent use of where clause.

### UDF (User defined function)
* Create a Java program (for example, a class extending Hive's `UDF` class with an `evaluate` method).
* Compile it and package it as a jar file.
* Add jar file to Hive.
    - `add jar /path/to/jar;`
* Create function of the Jar file added.
    - `create temporary function f2 as 'com.hive1.ud2';`
* Use those functions in Hive query.

### Table Properties
* Passive properties only tag a table (e.g. name, date of creation).
* Active properties affect how the data is processed.
```
create table table_name (col1 string, col2 int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile tblproperties("skip.header.line.count"="3");
```
* another property for footer is `"skip.footer.line.count"="3"`

* Immutable table
```
create table table3(col1 string, col2 int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile tblproperties("immutable"="true");
```
* Data can be loaded only while the table is empty: once it contains data, INSERT INTO fails. INSERT OVERWRITE always works.
* Drop vs truncate:
    - Drop removes both metadata and data of an internal table. For an external table, only the metadata (the link between Hive and the HDFS data) is removed; the data stays.
    - Truncate removes only the data (rows); the metadata and table structure remain. It works on internal tables only.
```
drop table table_name;
truncate table table_name;
```
* `auto.purge` is a table property. If it is set to true, dropping an internal table deletes its data permanently instead of moving it to the trash.
```
create table table_name (col1 string, col2 int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile tblproperties("auto.purge"="true");
```
* Null format property
    - By default, text tables represent NULL as `\N`; an empty field between delimiters is read as an empty string for string columns.
    - To treat empty fields as NULL, use `tblproperties("serialization.null.format"="")`.

### ACID / Transactional properties
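* Only the ORC file format is supported for transactional tables.
* The table must be bucketed (in Hive versions before 3.0).
* BEGIN, COMMIT and ROLLBACK statements are not supported; every statement is auto-committed.
* Reading from or writing to an ACID table is allowed only in a session configured for transactions (for example `hive.support.concurrency=true` and `hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager`).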
```
create table table_name (id int, emp_name string, dep string) clustered by (id) into 4 buckets stored as orc tblproperties("transactional"="true");
```
* Update of bucketing column is not supported.

### ORC file format properties
```
stored as orc tblproperties("orc.compress"="ZLIB") -- compression codec: NONE, ZLIB or SNAPPY
show tblproperties table_name;
```

* To run hive command from terminal
```
hive -e 'select * from table_name;'
```

* To run a Hive script from the Hive shell
    - save the script with a .hql extension, then run:
```
source /path/to/file.hql;
```
* To run hive script from terminal
    - `hive -f /path/to/script.hql`

### Variables in Hive
* `set deptno=40;`
* Check value `set deptno;`
* `set hiveconf:d1=20;`
```
select * from table_name where col6=${hiveconf:deptno};
```
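* hivevar variables:
    - The hivevar namespace is meant for user-defined variables, while hiveconf holds Hive configuration properties. Reference hivevar variables as `${hivevar:deptnumber}`.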
```
set hivevar:deptnumber=10;
```
* Values set with `set` (hiveconf or hivevar) last for the current session.

* Setting variable in terminal
```
hive --hiveconf var_name=20 -e 'select * from tablename where col6=${hiveconf:var_name}'
```

### Compression
* Smaller files occupy less space and use less network bandwidth.
* Compressing input file
* Compressing map output
* Compressing reducer output.

### Hiverc
* The `.hiverc` file is executed when we launch a Hive CLI session. It is typically used for:
* Setting column header to make visible
* Making current database name part of the hive prompt
* Adding any jars or files
* Registering UDFs.
* Put it in the Hive conf directory (or in the user's home directory as `~/.hiverc`).

### Cartesian product
```
select * from table2, table3; -- every row of table2 paired with every row of table3
```

### Archiving
* Packs the files of rarely used data into fewer, larger archive files.
* Mostly used for old data.
* Stored as HAR (Hadoop Archive) files.
* Reduces the number of files, which decreases the load on the NameNode.

### TEZ
* Distributed execution framework for running data processing applications on Hadoop.
* Built on Hadoop YARN to execute a DAG of general data processing tasks.
* Advanced successor of MapReduce as Hive's execution engine.
* Usually faster than MR, especially for multi-stage queries.
* Tez represents data processing as a dataflow graph:
    - vertices represent application logic, edges represent movement of data.
* Lets YARN allocate resources intelligently.
* Multiple reduce stages
    - MR needs a separate MR job for each reduce stage, with intermediate results written to and read back from HDFS.
    - Tez links all reduce stages in one DAG, so data can be passed directly from one reducer stage to the next.
* In memory vs disk write
    - In MR, intermediate data is written to disk and shuffled across nodes regardless of data size.
    - Tez allows small datasets to be handled fully in memory.

### XML data
* Requires installing an appropriate (usually third-party) XML SerDe.

### Word count
* Assume each line of the file looks like `word, word, word, word`.
```
create table word_count(line string) stored as textfile;
load data local inpath '/path/to/file' into table word_count;
select word, count(1) as count from (select explode(split(line, ',\\s*')) as word from word_count) w group by word; -- split on a comma plus any following spaces
```
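The query runs in three steps: split each line into words, explode them into one row per word, then group and count. The Python sketch below models these steps; it is an illustration, not Hive code. It assumes words are separated by a comma followed by optional spaces, as in the sample line, so it strips the spaces before grouping. Without that, `word` and ` word` would be counted as different words.

```python
from collections import Counter

def word_count(lines):
    # split + explode: one row per word; strip the spaces after commas.
    words = [w.strip() for line in lines for w in line.split(",")]
    # group by word, count(1)
    return Counter(words)

counts = word_count(["hive, hadoop, hive", "hdfs, hive"])
print(counts["hive"], counts["hadoop"], counts["hdfs"])  # 3 1 1
```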

* Use a power-of-2 number of buckets for optimized joins.

* Advantages of columnar storage formats such as RCFile, ORC (Optimized Row Columnar) and Parquet:
    - A different compression can be used for each column, based on its type.
    - Columns not needed by a query are skipped entirely.
    - ORC files store index information such as the min and max value of each column per stripe, so entire stripes can be skipped. They also store count and sum.