# <font color=blue><center>Retail Sales Analysis</center></font>
## Agenda

### Why?
Walmart is an American retail corporation that operates a chain of hypermarkets, discount department stores, and grocery stores. In this use case, we will answer the following questions:

- Which store has minimum and maximum sales?
- Which store has maximum standard deviation?
- Which store(s) had a good quarterly growth rate in Q3’2012?
- Which holidays have higher sales than the mean sales in the non-holiday season, for all stores together?

### Architecture
- Overview of data flow
- Tech Stack

### Environment Setup
- AWS EC2 instance and security group creation
- Docker installation and running
- Usage of docker-compose and starting all the tools
- How to access tools on the local machine

### Deep dive - HDFS
- Introduction (Overview & Why is it needed?)
- Terminology
- Architecture
- File processing
- Various file formats
- Frequently used commands

### Deep dive - Sqoop
- Introduction
- Architecture
- Import
- Export
- Job
- Useful tools/commands

### Deep dive - Hive
- Introduction
- Basic Commands
- Functions
- Partitioning
- Bucketing
- Joins and Views
- Various file formats
- Custom input formatter
- SCD implementation

### MySQL Set up
- Download Dataset
- Table creation

### Extraction
- Import SQL data using Sqoop
- Job creation

### Transformation and Load
- Table creation
- Queries to answer questions

### Project Overview

## <font color=blue>Architecture</font>
### Overview of data flow
#### Data Flow Architecture
![alt text](images/Retail_Analysis_Architecture.png)
### Tech Stack
* AWS EC2
* Docker
* Jupyter Lab
* MySQL
* Sqoop
* HDFS
* Hive

## <font color=blue>Environment Setup</font>
### AWS EC2 instance and security group creation
- t2.xlarge instance
- 32GB of storage recommended
- Allow ports 4000 - 38888
- Connect to ec2 via ssh
 <code>ssh -i "D:\path\to\private\key.pem" user@Public_DNS</code>
 <br/>Example:<code>ssh -i "D:\Users\pyerravelly\Desktop\twitter_analysis.pem" ec2-user@ec2-54-203-235-65.us-west-2.compute.amazonaws.com</code><br/>
- Port forwarding 
 <code>ssh -i "D:\path\to\private\key.pem" user@Public_DNS -L local_port:localhost:remote_port</code>
 <br/>Example:<code>ssh -i "D:\Users\pyerravelly\Desktop\twitter_analysis.pem" ec2-user@ec2-34-208-254-29.us-west-2.compute.amazonaws.com -L 2081:localhost:2041 -L 4888:localhost:4888 -L 2080:localhost:2080 -L 8050:localhost:8050 -L 4141:localhost:4141</code><br/>
- Copy from local to ec2
  <code>scp -r -i "D:\path\to\private\key.pem" D:\path\to\local\dir user@Public_DNS:/path/on/ec2</code>
  <br/>Example:<code>scp -r -i "D:\Users\pyerravelly\Desktop\twitter_analysis.pem" D:\Users\pyerravelly\Downloads\spark-standalone-cluster-on-docker-master\build\docker\docker-exp ec2-user@ec2-34-208-254-29.us-west-2.compute.amazonaws.com:/home/ec2-user/docker_exp
</code>

### Docker installation and running

### Usage of docker-compose and starting all the tools

- Commands to install Docker and docker-compose

<code>sudo yum update -y</code>
<code><br/>sudo yum install docker</code>
<code><br/>sudo curl -L "https://github.com/docker/compose/releases/download/1.29.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose</code>
<code><br/>sudo chmod +x /usr/local/bin/docker-compose</code>
<code><br/>sudo gpasswd -a $USER docker</code>
<code><br/>newgrp docker</code>
<br/>Start Docker: <code>sudo systemctl start docker</code>
<br/>Stop Docker: <code>sudo systemctl stop docker</code>

- How to access tools on the local machine <br/>
    List running Docker containers: <code>docker ps</code><br/>
    CLI access in a Docker container: <code>docker exec -i -t ra_namenode bash</code><br/>
    NiFi at: http://localhost:2080/nifi/ <br/>
    Jupyter Lab at: http://localhost:4888/lab? <br/>
    HDFS at: http://localhost:50070/ <br/>
    Dash Application at: http://localhost:8050/ (available once log_visualizer.ipynb has been executed)

## <font color=blue> Deep dive - HDFS</font>

### Introduction (Overview & Why is it needed?)
#### Function of File System
#### Various File Systems
#### How is HDFS different?
![alt text](images/hdfs_ov.PNG)

###  Terminology
- Cluster
- NameNode
- DataNode
- Mapper
- Reducer
- JobTracker
- TaskTracker
- Block
- Secondary NameNode
- Safemode

### Key Points
- Only one Data Node
- Nodes are computers
- Number of blocks depends on file size
- Number of mappers = number of blocks
- Number of reducers = number of output files
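
The dependence of block count on file size can be sketched in Python. This is only an illustration: it assumes a block size of 128 MB (the usual `dfs.blocksize` default in Hadoop 2.x and later, which a cluster may override), and `block_count` is a hypothetical helper, not part of any Hadoop API.

```python
# Assumption: 128 MB block size (the usual dfs.blocksize default in Hadoop 2.x+).
# A real cluster may be configured differently.
BLOCK_SIZE_BYTES = 128 * 1024 * 1024


def block_count(file_size_bytes, block_size_bytes=BLOCK_SIZE_BYTES):
    """Return how many HDFS blocks a file of the given size occupies."""
    if file_size_bytes < 0 or block_size_bytes <= 0:
        raise ValueError("file size must be >= 0 and block size must be > 0")
    # Ceiling division: the last block may be only partially filled.
    return -(-file_size_bytes // block_size_bytes)


if __name__ == "__main__":
    size = 300 * 1024 * 1024  # hypothetical 300 MB file
    print(block_count(size))
```

Under these assumptions a 300 MB file is split into three blocks (two full, one partial), which in the simplest case also means three mappers.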

###  Architecture
![alt text](images/hdfs_arhitecture.png)

###  File processing
![alt text](images/file_processing_steps.png)

###  Various file formats
- Text/CSV Files
- JSON Records
- Avro Files
- Sequence Files
- RC Files
- ORC Files
- Parquet Files

###  Frequently used commands

<code>docker exec -it ra_namenode bash</code><br/>
<code>hdfs dfs -ls /user/hive/warehouse/raw_sales/part-m-00000</code><br/>
<code>hdfs dfs -cat /user/hive/warehouse/raw_sales/part-m-00000</code><br/>
<code>hdfs dfs -cat /user/hive/warehouse/raw_sales/part-m-00000|head -10</code><br/>
<code>hdfs dfs -touch /demo</code><br/>
<code>hdfs dfs -chmod 755 /demo</code><br/>
<code>hdfs dfs -ls /demo</code><br/>
<code>echo "example">local_demo</code><br/>
<code>hdfs dfs -copyFromLocal local_demo /local_demo</code><br/>
<code>hdfs dfs -copyToLocal /local_demo local_demo2</code><br/>
<code>hdfs dfs -count /local_demo</code><br/>
<code>hdfs dfs -cp -f /local_demo /local_demo4</code><br/>
<code>hdfs dfs -df -h</code><br/>
<code>hdfs dfs -du -h /user</code><br/>
<code>hdfs dfs -help</code><br/>
<code>hdfs dfs -mkdir /user/tmp</code><br/>
<code>hdfs dfs -rmdir /user/tmp</code><br/>

## <font color=blue>Deep dive - Sqoop</font>

### Introduction
- Usage
- Features

### Architecture
- Basic Architecture

![alt text](images/sqoop_basic_architecture.png)

- Architecture with Task

![alt text](images/Sqoop_Architecture.png)

#### Login to Sqoop using <code>docker exec -i -t ra_sqoop bash</code>

### Import
- Basic
<code>
sqoop import \
  --connect jdbc:mysql://mysql:3306/demo \
  --username root \
  --password example \
  --table employee \
  --m 1
 </code>
- Target directory
<code>
    sqoop import \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example \
 --table employee \
 --m 1 \
 --target-dir /empdetails
</code>
- Based on a condition
<code>
sqoop import \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example \
 --table employee \
 --m 1 \
 --where "id =4" \
 --target-dir /empdetails1
</code>
- Incremental
<code>
    sqoop import \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example \
 --table employee \
 --m 1 \
 --incremental append \
 --check-column id \
 --last-value 10
</code>
- All tables at once
<code>
    sqoop import-all-tables \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example \
 --autoreset-to-one-mapper
</code>
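
In append mode, an incremental import brings in only the rows whose check column is greater than `--last-value`, and the highest imported value becomes the starting point for the next run. A minimal Python sketch of that selection rule, applied to an in-memory list instead of a real database (the sample rows and the `incremental_append` helper are hypothetical):

```python
def incremental_append(rows, check_column, last_value):
    """Pick rows whose check column exceeds last_value.

    Returns the selected rows and the value to use as last_value next time.
    """
    new_rows = [row for row in rows if row[check_column] > last_value]
    # If nothing new arrived, the last value stays where it was.
    new_last_value = max(
        (row[check_column] for row in new_rows), default=last_value
    )
    return new_rows, new_last_value


if __name__ == "__main__":
    # Hypothetical contents of the employee table.
    employee = [{"id": i, "name": f"emp{i}"} for i in range(9, 13)]
    imported, last = incremental_append(employee, "id", 10)
    print([row["id"] for row in imported], last)
```

Running the same selection again with the returned last value picks up nothing until new rows with larger ids are inserted.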


### Export
- Command
<code>
 sqoop export \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example \
 --table employee_exp \
 --export-dir employee
</code>

### Job
<code>docker cp java-json-schema.jar ra_sqoop:/opt/sqoop-1.4.7.bin__hadoop-2.6.0/lib/java-json-schema.jar</code>
- Command
<code>
 sqoop job --create firstjob \
 -- import \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example \
 --table employee \
 --m 1 \
 --target-dir /empdetails_job
</code>
- Delete
<code>
    sqoop job --delete firstjob
</code>
- List
<code>
    sqoop job --list
</code>
- Execute
<code>
    sqoop job --exec firstjob
</code>

### Useful tools/commands
- List databases
<code>
 sqoop list-databases \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example
</code>

- List tables
<code>
    sqoop list-tables \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example
</code>

- SQL queries
<code>
    sqoop eval \
 --connect jdbc:mysql://mysql:3306/demo \
 --username root \
 --password example \
 --query "SELECT * FROM employee LIMIT 3"
</code>

## <font color=blue>Deep dive - Hive</font>

### Introduction
- SQL-like querying tool
- Structured data
- Batch processing
- Layer on top of MapReduce (MR) and HDFS
- Various file formats
- NOT a database
- Suited to OLAP, not to OLTP
- Support for UPSERT in recent versions

### Architecture
![alt text](images/hive_architecture.png)

### Basic Commands
<code> docker cp table1.txt ra_hive-server:table1.txt <br/> set hive.cli.print.header=true;<br/>set hive.resultset.use.unique.column.names=false;</code>

- Database creation

<code> create database if not exists db [comment 'Demo database'] [with dbproperties('creator'='Pavan Kumar Yerravelly')];<br/></code>
<code> describe database [extended] db; <br/></code>
<code> show databases; <br/></code>
<code> use db; <br/></code>

- Table creation
  - Managed Table
  <code> create table if not exists table1(col1 int,col2 array&lt;string&gt;,col3 string,col4 int)row format delimited fields terminated by',' collection items terminated by':' lines terminated by'\n' stored as textfile location '/data/table1';<br/>load data local inpath '/table1.txt' into table table1;<br/></code>
  - External Table
  <code>create external table if not exists table2(col1 int,col2 array&lt;string&gt;,col3 string,col4 int)row format delimited fields terminated by',' collection items terminated by':' lines terminated by'\n' stored as textfile location'/data/table2';<br/>hdfs dfs -mkdir /data/table2/<br/>hdfs dfs -copyFromLocal /table1.txt /data/table2/</code>

- Insert
  - Basic
<code> create table table3 like table1;<br/>insert into table3 select * from table1;<br/>insert overwrite table table3 select * from table1;<br/>create table table3 as select * from table1;</code>
  - Multi
<code>create table below like table1;<br/>create table above like table1;<br/> from table2 insert into below select *  where col1&lt;500 insert into above select * where col1&gt;=500; </code> 

- Alter Table
<code><br/> alter table below add columns (col5 int,col6 string);<br/> alter table below change column col1 id int;<br/>desc formatted below;<br/>alter table below set tblproperties('auto.purge'='true');</code>
      
### Functions
- Date <code> https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-DateFunctions </code>
- Mathematical <code> https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-MathematicalFunctions </code>
- String <code> https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-StringFunctions </code>
- Conditional <code> select *,if(col1=499, 'foo','bar') as derived from table1; </code>
<code> select case when col1=499 then 'foo' when col1=500 then 'bar' else cast(col1 as string) end as derived from table1; </code>
- Explode and Lateral View
  - Data<br/>
      <code>select explode(col2) from table1;<br/>select col1, exp_col from table1 lateral view explode(col2) exp as exp_col;</code>
- Window<br/>
<code>create table if not exists table4(col1 string,col2 int) row format delimited fields terminated by',' lines terminated by'\n' stored as textfile;<br/><br/>insert into table4 values('John',1500),('Albert',1500),('Mark',1000),('Frank',1150),('Loopa',1100),('Lui',1300),('John',1300),('John',900),('Lesa',1500),('Lesa',900),('Pars',800),('leo',700),('leo',1500),('lock',650),('Bhut',800),('Lio',500);<br/><br/>select col1,col2,rank() over(order by col2 desc) as ranking from table4;<br/><br/>select r1.col1,r1.col2,r1.ranking from (select col1,col2,rank() over(order by col2 desc) as ranking from table4) as r1 where r1.ranking&lt;2;<br/><br/>select col1,col2,dense_rank() over(order by col2 desc) as ranking from table4;<br/><br/>select r1.col1,r1.col2,r1.ranking from (select col1,col2,dense_rank() over(order by col2 desc) as ranking from table4) as r1 where r1.ranking&lt;2;<br/><br/>select col1,col2,row_number() over(order by col2 desc) as ranking from table4;<br/><br/>select r1.col1,r1.col2,r1.ranking from (select col1,col2,row_number() over(order by col2 desc) as ranking from table4) as r1 where r1.ranking &lt;=2;<br/><br/>select r1.col1,r1.col2,r1.ranking from (select col1,col2,row_number() over(partition by col1 order by col2 desc) as ranking from table4) as r1 where r1.ranking &lt;=2;</code>
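
The three ranking functions used in the window queries differ only in how they treat ties. The following Python sketch reproduces their numbering rules on a plain list of values ordered descending; `rank_rows` is a hypothetical helper written for this illustration, not a Hive API.

```python
def rank_rows(values):
    """Return (value, rank, dense_rank, row_number) tuples, ordered descending."""
    ordered = sorted(values, reverse=True)
    result = []
    rank = 0
    dense_rank = 0
    previous = None
    for row_number, value in enumerate(ordered, start=1):
        if value != previous:
            # A new distinct value: rank jumps to the row position,
            # dense_rank advances by exactly one.
            rank = row_number
            dense_rank += 1
            previous = value
        result.append((value, rank, dense_rank, row_number))
    return result


if __name__ == "__main__":
    # A few of the col2 values inserted into table4.
    for row in rank_rows([1500, 1500, 1000, 1150]):
        print(row)
```

Tied values share the same `rank` and `dense_rank`; `rank` then skips positions after a tie while `dense_rank` does not, and `row_number` is always unique.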
      
### Partitioning
<code>docker cp departments.txt ra_hive-server:departments.txt<br/>create table if not exists department (col1 int,col2 string,col3 string,col4 int) row format delimited fields terminated by',' lines terminated by'\n'stored as textfile;<br/>load data local inpath'/departments.txt'into table department;</code>

- Static
<code>create table if not exists part_department (departmentno int,empname string,sal int) partitioned by (departmentname string) row format delimited fields terminated by',' lines terminated by'\n'stored as textfile;<br/>insert into table part_department partition (departmentname = 'HR') select col1,col3,col4 from department where col2 = 'HR';<br/>load data local inpath'/departments.txt'into table part_department partition( departmentname ='XZ');</code>

- Dynamic
<code>create table if not exists part_department1 (departmentno int,empname string,sal int) partitioned by (departmentname string) row format delimited fields terminated by',' lines terminated by'\n'stored as textfile;<br/>set hive.exec.dynamic.partition=true;<br/>set hive.exec.dynamic.partition.mode=nonstrict;<br/>insert into table part_department1 partition (departmentname) select col1,col3,col4,col2 from department; </code>

### Bucketing
<code>docker cp dept_location.txt ra_hive-server:dept_location.txt<br/>create table if not exists department_with_location (departmentno int,departmentname string,empname string,sal int,location string) row format delimited fields terminated by',' lines terminated by'\n'stored as textfile;<br/>load data local inpath'/dept_location.txt'into table department_with_location;<br/>set hive.enforce.bucketing=true;<br/>create table if not exists clustered_department (departmentno int,empname string,sal int,location string) partitioned by (departmentname string) clustered by(location) into 4 buckets  row format delimited fields terminated by',' lines terminated by'\n'stored as textfile;<br/>insert into clustered_department partition (departmentname) select departmentno,empname,sal,location,departmentname from department_with_location;</code>
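
Bucketing assigns each row to one of a fixed number of buckets by hashing the `clustered by` column and taking the remainder modulo the bucket count. The Python sketch below imitates that idea. It rests on assumptions: `zlib.crc32` is only a stand-in for Hive's own hash function, so real bucket numbers will differ, and the location values are made up.

```python
import zlib
from collections import defaultdict


def bucket_for(value, num_buckets=4):
    """Map a string to a bucket index in [0, num_buckets)."""
    # Stand-in hash: Hive uses its own hash function, not CRC32.
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def assign_buckets(rows, column, num_buckets=4):
    """Group rows by the bucket of the given column."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_for(row[column], num_buckets)].append(row)
    return dict(buckets)


if __name__ == "__main__":
    # Hypothetical rows; only the location column matters here.
    rows = [{"empname": f"emp{i}", "location": loc}
            for i, loc in enumerate(["Pune", "Delhi", "Pune", "Chennai"])]
    for bucket, members in sorted(assign_buckets(rows, "location").items()):
        print(bucket, [m["empname"] for m in members])
```

Rows with the same location always land in the same bucket, which is what makes bucketed tables useful for sampling and for joins on the bucketing column.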
### Joins and Views
- Joins
<code><br/>select tab1.col1,tab2.col2,tab3.col4 from tab1 join tab2 on (tab1.col4=tab2.col5) join tab3 on (tab2.col6==tab3.col1);<br/>select /*+ STREAMTABLE (tab1) */ tab1.col1,tab2.col2 from tab1 join tab2 on (tab1.col4=tab2.col5) join tab3 on (tab2.col6==tab3.col1);</code>
 - Map Joins
<code>Hint: select /*+ MAPJOIN (tab1) */ tab1.col1,tab2.col2 from tab1 join tab2 on (tab1.col4=tab2.col5);<br/>config: set hive.auto.convert.join=true;<br/>set hive.mapjoin.smalltable.filesize;</code>

- Views
<code><br/>create view emp_view1 as select * from emp_tab;<br/>create view emp_view2 as select col1,col2 from emp_tab;<br/>create view if not exists emp_view3 as select col1 as id,col2 as name from emp_tab;<br/>create view emp_view4 as select emp_tab.col1,emp_tab.col2,dept_tab.col3 from emp_tab join dept_tab on (emp_tab.col6 = dept_tab.col1);<br/>alter view emp_view1 as select col1 from emp_tab;<br/>alter view emp_view1 rename to emp_view_new;<br/>drop view emp_view2;<br/></code>

### Various file formats
- TSV/CSV:
 - Read/Write: Reads slow
 - Compression: No block compression
 - Split: Only on new line
 - Schema Evolution: Limited
- Sequence:
 - Read/Write: Faster writes than text files
 - Compression: Block level
 - Split: Yes
 - Schema Evolution: Same as text files
- Avro:
 - Read/Write: Average
 - Compression: Block level
 - Split: Yes
 - Schema Evolution: Yes
- RC:
 - Read/Write: Faster reads
 - Compression: High
 - Split: Yes
 - Schema Evolution: No
- ORC:
 - Read/Write: Faster
 - Compression: Yes
 - Split: Yes, at stripe level
 - Schema Evolution: No
- Parquet:
 - Read/Write: faster reads
 - Compression: Yes, with Snappy
 - Split: Limited
 - Schema Evolution: Limited
 
- Which format to choose?
 - Schema evolution - Avro
 - Dumping data from Filesystem - Text
 - Reading - ORC/Parquet
 - Intermediate Files - Sequence

### SCD Type-1 implementation
<code>
CREATE TABLE merge_src
(
ID INT,
FirstName VARCHAR(100),
LastName VARCHAR(100)
);<br/>INSERT INTO merge_src VALUES (1, 'aaaa', 'bbbb'),(2, 'cccc', 'dddd'),(3, 'eeee', 'ffff'),(4, 'gggg', 'hhhh'),(5, 'iiii', 'jjjj');<br/>create table merge_tgt
(
id int,
firstname varchar(100),
lastname varchar(100)
)
;<br/>
drop table if exists merge_staging;<br/>create temporary table merge_staging as 
SELECT A.id        AS ID, 
       A.firstname AS FirstName, 
       CASE 
         WHEN B.id IS NOT NULL THEN B.lastname 
         ELSE A.lastname 
       end         AS LastName 
FROM   merge_tgt AS A 
       LEFT OUTER JOIN merge_src AS B 
                    ON A.id = B.id; <br/>INSERT INTO merge_staging 
SELECT B.id        AS ID, 
       B.firstname AS FirstName, 
       B.lastname  AS LastName 
FROM   merge_src AS B 
       LEFT OUTER JOIN merge_staging AS A 
                    ON A.id = B.id 
WHERE  A.id IS NULL;<br/>insert overwrite table merge_tgt select * from merge_staging;<br/>--Second run<br/>INSERT overwrite table merge_src VALUES (2, 'cccc', 'kkkk'),(3, 'eeee', 'llll'),(7, 'xxxx', 'yyyy');
    </code>
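
The idea behind a Type-1 slowly changing dimension is that incoming values overwrite existing ones for the same key and no history is kept. A minimal Python sketch of that behaviour follows, with dictionaries keyed by `id` standing in for the tables. It is a simplification: it overwrites the whole record for a matching id, whereas the staging query above only replaces `lastname`, and `scd_type1_merge` is a hypothetical helper.

```python
def scd_type1_merge(target, source):
    """Return target with every source record inserted or overwritten by id."""
    merged = dict(target)   # keep rows that have no match in the source
    merged.update(source)   # matching ids are overwritten, new ids are added
    return merged


if __name__ == "__main__":
    merge_tgt = {}
    # First run: the initial contents of merge_src.
    first_src = {
        1: ("aaaa", "bbbb"), 2: ("cccc", "dddd"), 3: ("eeee", "ffff"),
        4: ("gggg", "hhhh"), 5: ("iiii", "jjjj"),
    }
    merge_tgt = scd_type1_merge(merge_tgt, first_src)
    # Second run: two changed last names and one new id.
    second_src = {2: ("cccc", "kkkk"), 3: ("eeee", "llll"), 7: ("xxxx", "yyyy")}
    merge_tgt = scd_type1_merge(merge_tgt, second_src)
    for key in sorted(merge_tgt):
        print(key, merge_tgt[key])
```

After the second run, ids 2 and 3 carry the new last names, id 7 is added, and the old values are gone, which is the defining trait of Type 1.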

## <font color=blue>Extraction</font>

### MySQL Set up

- Download Dataset
<code>docker cp Walmart_Store_sales.csv ra_mysql:Walmart_Store_sales.csv </code>
- Table creation
<code>docker exec -it ra_mysql bash<br/>mysql -p (type example when prompted for the password)<br/>SET GLOBAL local_infile=1;<br/>exit<br/>mysql --local-infile=1 -p<br/>CREATE DATABASE if not exists retail;<br/>use retail;<br/>CREATE TABLE if not exists walmart_sales (Store VARCHAR(255),Date Date,Weekly_Sales VARCHAR(255),Holiday_Flag VARCHAR(255),Temperature VARCHAR(255),Fuel_Price VARCHAR(255),CPI VARCHAR(255),Unemployment VARCHAR(255));<br/>show tables;<br/>LOAD DATA LOCAL INFILE '/Walmart_Store_sales.csv' INTO TABLE walmart_sales FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 ROWS (Store,@Date,Weekly_Sales,Holiday_Flag,Temperature,Fuel_Price,CPI,Unemployment) SET Date=STR_TO_DATE( @Date, '%d-%m-%Y' );
</code>
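
The `STR_TO_DATE( @Date, '%d-%m-%Y' )` step converts day-month-year text from the CSV into a proper date. The same format string works with Python's `strptime`, which gives a quick way to check a value before loading. The sample string below is made up for illustration, and `parse_sales_date` is a hypothetical helper.

```python
from datetime import date, datetime


def parse_sales_date(text: str) -> date:
    """Parse a day-month-year string such as '05-02-2010' into a date."""
    return datetime.strptime(text, "%d-%m-%Y").date()


if __name__ == "__main__":
    # Hypothetical raw value as it might appear in the CSV's Date column.
    print(parse_sales_date("05-02-2010").isoformat())
```

A value that does not match the pattern raises `ValueError`, which makes malformed rows easy to spot before they reach MySQL.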

### Sqoop Import

- Import SQL data using Sqoop
<code>docker exec -i -t ra_sqoop bash<br/>sqoop import \
 --connect jdbc:mysql://mysql:3306/retail \
 --username root \
 --password example \
 --table walmart_sales \
 --fields-terminated-by "," \
 --hive-import \
 --hive-table raw_sales \
 --hive-overwrite \
 --m 1 \
 --incremental append \
 --check-column Date \
 --last-value '1900-01-01'
</code>

- Job creation
<code>sqoop job --create retail_job \
 -- import \
 --connect jdbc:mysql://mysql:3306/retail \
 --username root \
 --password example \
 --table walmart_sales \
 --fields-terminated-by "," \
 --hive-import \
 --hive-table raw_sales \
 --hive-overwrite \
 --m 1 \
 --incremental append \
 --check-column Date \
 --last-value '1900-01-01'<br/>sqoop job --exec retail_job
</code>

- Incremental Load
<code>insert into walmart_sales values(45,'2012-11-03','760281.43','0','58.85','3.882','192.3088989','8.667');</code>

## <font color=blue>Transformation and Load</font>


- Table creation
<code>
beeline
!connect jdbc:hive2://127.0.0.1:10000 scott tiger
create database if not exists retail;
use retail;
create table walmart_sales as 
select 
    cast(Store as int) as store,
    cast(to_date(from_unixtime(unix_timestamp(`Date`, 'yyyy-MM-dd'))) as date) as date_of_entry,
    cast(Weekly_Sales as double) as weekly_sales,
    Holiday_Flag,
    cast(Temperature as float) as temperature,
    cast(fuel_price as float) as fuel_price,
    cast(cpi as double) as cpi,
    cast(Unemployment as float) as unemployment
from default.raw_sales;
</code>
- Queries to answer questions
 - Which store has minimum and maximum sales?
<code>
    with overall_sales as
(select Store,sum(Weekly_Sales) as total from walmart_sales group by Store)
select store,total,CAST(total AS DECIMAL(38,10)),ROUND(CAST(total AS DECIMAL(38,10)),2),dense_rank() over( order by total desc) from overall_sales;
    </code>
 - Which store has maximum standard deviation?<code>
select * from (select store,stddev(weekly_sales) as sales_stddev from walmart_sales group by store) s order by sales_stddev desc ;</code>
 - Coefficient of variation (standard deviation divided by mean)?<code>
 select store,stddev(weekly_sales)/avg(weekly_sales) from walmart_sales group by store;
</code>
 - Which store(s) had a good quarterly growth rate in Q3’2012?<code>
with Q3 as
(select store,sum(Weekly_Sales) as Quarter_Sales,'Q3' as Quarter from walmart_sales where date_of_entry &gt;= '2012-07-01' and date_of_entry &lt;= '2012-09-30' group by store)
,Q2 as
(select store,sum(Weekly_Sales) as Quarter_Sales,'Q2' as Quarter from walmart_sales where date_of_entry &gt;= '2012-04-01' and date_of_entry &lt;= '2012-06-30' group by store)
,Q2_Q3 as
(select * from Q3 union all select * from Q2)
,final as
(select store,Quarter,(Quarter_Sales - lag(Quarter_Sales) over (partition by store order by Quarter)) as difference,round(100 * (Quarter_Sales - lag(Quarter_Sales) over (partition by store order by Quarter))/ lag(Quarter_Sales) over (partition by store order by Quarter),2) as growth_rate from Q2_Q3)
select * from final where growth_rate > 0 order by growth_rate desc;
    </code>
 - Find out holidays which have higher sales than the mean sales in non-holiday season for all stores together.<code>
 set hive.strict.checks.cartesian.product=false;
select h.h_sales,h.date_of_entry from (select avg(Weekly_Sales) as h_sales,date_of_entry from walmart_sales where Holiday_Flag=1 group by date_of_entry) as h
cross join (select avg(Weekly_Sales) as nh_avg from walmart_sales where Holiday_Flag=0) as nh
where h.h_sales > nh.nh_avg;
</code>
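
The quarterly growth query computes, per store, the percentage change from Q2 sales to Q3 sales and keeps the stores where it is positive. The arithmetic can be checked with a small Python sketch. The sales figures below are invented for illustration, and `growth_rate` and `stores_with_growth` are hypothetical helpers, not part of the pipeline.

```python
def growth_rate(previous_sales, current_sales):
    """Percentage change from the previous quarter, rounded to 2 decimals."""
    return round(100 * (current_sales - previous_sales) / previous_sales, 2)


def stores_with_growth(q2_sales, q3_sales):
    """Return (store, growth_rate) pairs with positive growth, best first."""
    rates = {
        store: growth_rate(q2_sales[store], q3_sales[store])
        for store in q3_sales
        if store in q2_sales and q2_sales[store] != 0
    }
    positive = [(store, rate) for store, rate in rates.items() if rate > 0]
    return sorted(positive, key=lambda pair: pair[1], reverse=True)


if __name__ == "__main__":
    # Hypothetical quarterly totals per store.
    q2 = {1: 100.0, 2: 200.0, 3: 400.0}
    q3 = {1: 110.0, 2: 190.0, 3: 500.0}
    print(stores_with_growth(q2, q3))
```

With these made-up numbers, store 3 grows by 25% and store 1 by 10%, while store 2 shrinks and is filtered out, mirroring the `growth_rate > 0` condition and the descending sort.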

## <font color=blue>Project Overview</font>