In [1]:
spark.stop()

In [2]:
sc.stop()

In [3]:
from pyspark import SparkConf, SparkContext
# setMaster() - Set Spark Content Manager which is local[cpu_cores] 
config = SparkConf().setMaster("local[4]").setAppName("ETL Pipeline")
sc = SparkContext(conf=config)

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()

In [5]:
spark

In [8]:
hremployeeDF = spark.read.format("jdbc")\
.option("url", "jdbc:mysql://localhost:3306/hremployeeDB")\
.option("dbtable", "HR_Employee").option("user","root").option("password","hadoop@123")\
.option("driver", "com.mysql.cj.jdbc.Driver").load()

In [10]:
hremployeeDF.show(2)

+----------+--------------------+------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|EmployeeID|          Department|           JobRole|Attrition|Gender|Age|MaritalStatus|    Education|EducationField|   BusinessTravel|JobInvolvement|JobLevel|JobSatisfaction|Hourlyrate|Income|Salaryhike|OverTime|Workex|YearsSinceLastPromotion|EmpSatisfaction|TrainingTimesLastYear|WorkLifeBalance|Performance_Rating|
+----------+--------------------+------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|
|
+----------+--------------------+------------

In [11]:
# Show Physical plan of execution which is known as DAG.
hremployeeDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(HR_Employee) [numPartitions=1] [EmployeeID#0,Department#1,JobRole#2,Attrition#3,Gender#4,Age#5,MaritalStatus#6,Education#7,EducationField#8,BusinessTravel#9,JobInvolvement#10,JobLevel#11,JobSatisfaction#12,Hourlyrate#13,Income#14,Salaryhike#15,OverTime#16,Workex#17,YearsSinceLastPromotion#18,EmpSatisfaction#19,TrainingTimesLastYear#20,WorkLifeBalance#21,Performance_Rating#22] PushedFilters: [], ReadSchema: struct<EmployeeID:int,Department:string,JobRole:string,Attrition:string,Gender:string,Age:int,Mar...


#### Materialized View of Table

In [12]:
hremployeeDF.createOrReplaceTempView("hremployee")

#### 1. Display Shape of hremployee table 
    * Show number of rows & number of columns

In [23]:
spark.sql(f"""
select count(*)  as row_count, {num_of_cols} as columns_count from hremployee
""").show()

+---------+-------------+
|row_count|columns_count|
+---------+-------------+
|     1469|           23|
+---------+-------------+



In [19]:
num_of_cols = len(hremployeeDF.columns)

In [28]:
spark.sql("describe hremployee").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|          EmployeeID|      int|   null|
|          Department|   string|   null|
|             JobRole|   string|   null|
|           Attrition|   string|   null|
|              Gender|   string|   null|
|                 Age|      int|   null|
|       MaritalStatus|   string|   null|
|           Education|   string|   null|
|      EducationField|   string|   null|
|      BusinessTravel|   string|   null|
|      JobInvolvement|   string|   null|
|            JobLevel|      int|   null|
|     JobSatisfaction|   string|   null|
|          Hourlyrate|      int|   null|
|              Income|      int|   null|
|          Salaryhike|      int|   null|
|            OverTime|   string|   null|
|              Workex|      int|   null|
|YearsSinceLastPro...|      int|   null|
|     EmpSatisfaction|   string|   null|
+--------------------+---------+-------+
only showing top

#### 2. Write a Query to show first three employee from each Job Role to join the company.

In [32]:
spark.sql(""" select * from (
select EmployeeID, Department, JobRole, row_number() 
over(partition by JobRole order by EmployeeID) as row_num from hremployee)
where row_num < 4""").show(40)

+----------+--------------------+--------------------+-------+
|EmployeeID|          Department|             JobRole|row_num|
+----------+--------------------+--------------------+-------+
|         1|               Sales|     Sales Executive|      1|
|        28|               Sales|     Sales Executive|      2|
|        40|               Sales|     Sales Executive|      3|
|         9|Research & Develo...|Manufacturing Dir...|      1|
|        16|Research & Develo...|Manufacturing Dir...|      2|
|        21|Research & Develo...|Manufacturing Dir...|      3|
|         3|Research & Develo...|Laboratory Techni...|      1|
|         5|Research & Develo...|Laboratory Techni...|      2|
|         6|Research & Develo...|Laboratory Techni...|      3|
|        22|               Sales|Sales Representative|      1|
|        34|               Sales|Sales Representative|      2|
|        37|               Sales|Sales Representative|      3|
|        10|Research & Develo...|Healthcare Repres...| 

#### 3. Write a Query to show Top three Employee from each JobRole earning Highest Salary.

#### 4. Show Top 3 Highest Package from overall Job Role.

#### 5. Lag()
Write a Spark SQL query to show employee in Ascending Order with respect to employee income comapred to previous income for each job role.

In [38]:
spark.sql("""
select *, (Income - prev_income) as difference from (select EmployeeID, JobRole, Income, 
lag(Income, 1) over (partition by JobRole order by EmployeeID) as prev_income 
from hremployee) as _ order by JobRole, difference
""").show()

+----------+--------------------+------+-----------+----------+
|EmployeeID|             JobRole|Income|prev_income|difference|
+----------+--------------------+------+-----------+----------+
|        10|Healthcare Repres...|  5237|       null|      null|
|       285|Healthcare Repres...|  4741|      13496|     -8755|
|      1183|Healthcare Repres...|  6842|      13966|     -7124|
|      1157|Healthcare Repres...|  4148|      11245|     -7097|
|       205|Healthcare Repres...|  6673|      13734|     -7061|
|       677|Healthcare Repres...|  4014|      10552|     -6538|
|       397|Healthcare Repres...|  4522|      10965|     -6443|
|       833|Healthcare Repres...|  5731|      12169|     -6438|
|      1065|Healthcare Repres...|  4035|      10466|     -6431|
|       745|Healthcare Repres...|  4777|      10999|     -6222|
|       736|Healthcare Repres...|  4240|      10388|     -6148|
|      1098|Healthcare Repres...|  4069|      10124|     -6055|
|        89|Healthcare Repres...|  4152|

In [45]:
spark.sql("""
select EmployeeID, JobRole, Income, 
Income - lag(Income, 1) over (partition by JobRole order by EmployeeID) as difference
from hremployee
""").show()

+----------+---------------+------+----------+
|EmployeeID|        JobRole|Income|difference|
+----------+---------------+------+----------+
|         1|Sales Executive|  5993|      null|
|        28|Sales Executive|  6825|       832|
|        40|Sales Executive|  5376|     -1449|
|        44|Sales Executive|  8726|      3350|
|        47|Sales Executive|  4568|     -4158|
|        49|Sales Executive|  5772|      1204|
|        53|Sales Executive|  5454|      -318|
|        55|Sales Executive|  4157|     -1297|
|        57|Sales Executive|  9069|      4912|
|        64|Sales Executive|  7637|     -1432|
|        71|Sales Executive|  5473|     -2164|
|        77|Sales Executive|  4312|     -1161|
|        83|Sales Executive| 10239|      5927|
|        90|Sales Executive|  9619|      -620|
|        92|Sales Executive|  5441|     -4178|
|        93|Sales Executive|  5209|      -232|
|        95|Sales Executive|  5010|      -199|
|        97|Sales Executive|  4999|       -11|
|        98|S

#### Lead()
    
    * Row's Next Records.

In [54]:
spark.sql("""
select employeeid, department, jobrole, age, gender, income, workex, 
LEAD(income, 2) over(order by employeeid) as next_income
from hremployee
""").show()

+----------+--------------------+--------------------+---+------+------+------+-----------+
|employeeid|          department|             jobrole|age|gender|income|workex|next_income|
+----------+--------------------+--------------------+---+------+------+------+-----------+
|         1|               Sales|     Sales Executive| 41|Female|  5993|     8|       2090|
|         2|Research & Develo...|  Research Scientist| 49|  Male|  5130|    10|       2909|
|         3|Research & Develo...|Laboratory Techni...| 37|  Male|  2090|     7|       3468|
|         4|Research & Develo...|  Research Scientist| 33|Female|  2909|     8|       3068|
|         5|Research & Develo...|Laboratory Techni...| 27|  Male|  3468|     6|       2670|
|         6|Research & Develo...|Laboratory Techni...| 32|  Male|  3068|     8|       2693|
|         7|Research & Develo...|Laboratory Techni...| 59|Female|  2670|    12|       9526|
|         8|Research & Develo...|Laboratory Techni...| 30|  Male|  2693|     1| 

#### NTILE()
    * Dividing Records into percetiles.

In [58]:
spark.sql("""
select employeeid, department, jobrole, age, gender, 
income, NTILE(4) over(order by income) as salary_quartiles 
from hremployee""").show()

+----------+--------------------+--------------------+---+------+------+----------------+
|employeeid|          department|             jobrole|age|gender|income|salary_quartiles|
+----------+--------------------+--------------------+---+------+------+----------------+
|       514|Research & Develo...|  Research Scientist| 20|  Male|  1009|               1|
|       728|Research & Develo...|  Research Scientist| 18|  Male|  1051|               1|
|       765|               Sales|Sales Representative| 28|  Male|  1052|               1|
|      1338|               Sales|Sales Representative| 30|  Male|  1081|               1|
|      1365|               Sales|Sales Representative| 29|  Male|  1091|               1|
|       178|Research & Develo...|Laboratory Techni...| 19|  Male|  1102|               1|
|       912|               Sales|Sales Representative| 25|  Male|  1118|               1|
|      1402|Research & Develo...|Laboratory Techni...| 31|Female|  1129|               1|
|       30

#### Find the Number of Employee in each percentile_group 0-25th, 25-50th, 50-75th, 75-100th using percent_rank and create category percentile_group using case when.

In [64]:
spark.sql("""
select 
case 
    when percetile_rank < 0.25 then "0-25th"
    when percetile_rank < 0.5 then "25-50th"
    when percetile_rank < 0.75 then "50-75th"
    else '75th-100th'
END AS percentile_group,
count(*) as employee_count
from(
select employeeid, department,jobrole, age, gender, 
income, percent_rank() over(partition by department 
order by income) as percetile_rank 
from hremployee) 
group by percentile_group 
order by percentile_group""").show()

+----------------+--------------+
|percentile_group|employee_count|
+----------------+--------------+
|          0-25th|           367|
|         25-50th|           366|
|         50-75th|           367|
|      75th-100th|           369|
+----------------+--------------+



#### Hive Integration with PySpark.

In [1]:
spark.stop()

In [2]:
!jps

14433 ResourceManager
14274 SecondaryNameNode
14597 NodeManager
13848 NameNode
5385 Jps
14042 DataNode
5306 SparkSubmit


In [3]:
# Spark integration with Hive WareHouse.
# config for Hive-Integration property-name "spark.sql.warehouse.dir" 
# value = "/user/hive/warehouse"
spark = (SparkSession.builder.appName("pyspark-Hive-Integration")
        .config("spark.sql.warehouse.dir","/user/hive/warehouse")
        .enableHiveSupport().getOrCreate())

In [4]:
spark

In [5]:
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [6]:
spark.sql("""
create database if not exists airlines
""")

DataFrame[]

In [9]:
spark.sql("""show databases""").show()

+------------+
|databaseName|
+------------+
|    airlines|
|     default|
+------------+



In [7]:
spark.sql("""use airlines""")

DataFrame[]

In [10]:
spark.sql("""show tables""").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [12]:
spark.sql("""
create table if not exists flights(DayofMonth int, 
DayOfWeek int, Carrier varchar(10), OriginAirportID int,
DestAirportID int, DepDelay int, ArrDelay int)
row format delimited
fields terminated by ','
lines terminated by '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
""")

DataFrame[]

In [14]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines|  flights|      false|
+--------+---------+-----------+



In [22]:
spark.sql("""load data local inpath '/home/hadoop/Downloads/raw_flight_data1.csv'
overwrite into table flights""")

DataFrame[]

In [16]:
spark.sql("""
create table if not exists airports(airport_id int, 
city varchar(50), state varchar(50), name varchar(100))
row format delimited
fields terminated by ','
lines terminated by '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
""")

DataFrame[]

In [23]:
spark.sql("""load data local inpath '/home/hadoop/Downloads/airports1.csv'
overwrite into table airports""")

DataFrame[]

In [24]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [25]:
spark.sql("select * from airports").show()

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
|     12523|     Juneau|   AK|Juneau International|
|     12819|  Ketchikan|   AK|Ketchikan Interna...|
|     10245|King Salmon|   AK| King Salmon Airport|
|     10170|     Kodiak|   AK|      Kodiak Airport|
|     13970|   Kotzebue|   AK| Ralph Wien Memorial|
|     13873|       Nome|   AK|        Nome Airport|
|     14256|

#### 1. Extract

In [27]:
flights_df = spark.table("airlines.flights")
airports_df = spark.table("airlines.airports")

In [28]:
airports_df.show(2)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows



### 2. Transformation

In [30]:
flights_join = flights_df.join(airports_df, 
                   on = flights_df.OriginAirportID == airports_df.airport_id, how = "inner")

In [31]:
flights_join.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|          city|state|                name|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|        19|        5|     DL|          11433|        13303|      -3|       1|     11433|       Detroit|   MI|Detroit Metro Way...|
|        19|        5|     DL|          14869|        12478|       0|      -8|     14869|Salt Lake City|   UT|Salt Lake City In...|
|        19|        5|     DL|          14057|        14869|      -4|     -15|     14057|      Portland|   OR|Portland Internat...|
|        19|        5|     DL|          15016|        11433|      28|      24|     15016|     St. Louis|   MO|Lambert-St. Louis...|
|        19|        5|     DL|          11193|        12892|      -6|     -1

#### 3. Load

In [37]:
flights_join = flights_join.repartition(4)

In [39]:
flights_join.write.parquet("file:///home/hadoop/Downloads/flights/")

In [40]:
# Read a parquet file format
flights_parquet_df = spark.read.parquet("file:///home/hadoop/Downloads/flights/")

In [42]:
flights_join.write.parquet("/flights1")

In [43]:
flights_join.write.partitionBy("Carrier").parquet("/airlines")

In [45]:
flights_join.show()

+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|          city|state|                name|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|         4|        7|     UA|          13930|        12953|     115|      91|     13930|       Chicago|   IL|Chicago O'Hare In...|
|        28|        7|     AS|          14679|        13830|     -11|     -27|     14679|     San Diego|   CA|San Diego Interna...|
|         3|        4|     WN|          11259|        12191|      28|      29|     11259|        Dallas|   TX|   Dallas Love Field|
|        29|        6|     WN|          11292|        13204|       6|     -11|     11292|        Denver|   CO|Denver International|
|         1|        3|     B6|          13204|        12478|      -3|      -

In [53]:
flights_join.write.bucketBy(col = 'state', numBuckets = 50).format("csv")\
.saveAsTable("bucketed_table")

In [55]:
flights_join.write.partitionBy('Carrier').bucketBy(col = 'state', numBuckets = 30)\
.format("parquet").saveAsTable("part_bucket_table")

In [57]:
spark.sql("select carrier, count(*) from part_bucket_table group by carrier").show()

+-------+--------+
|carrier|count(1)|
+-------+--------+
|     UA|  287601|
|     AA|  291771|
|     EV|  158253|
|     B6|  122297|
|     DL|  385040|
|     OO|  161102|
|     F9|   35821|
|     YV|   53022|
|     US|  235031|
|     MQ|  113634|
|     HA|   18658|
|     AS|   69056|
|     FL|   93013|
|     VX|   34869|
|     WN|  580029|
|     9E|   80221|
+-------+--------+



#### Load on MySQL.

In [58]:
connection_properties = {
    'user':"root",
    'password':"hadoop@123",
    'driver':'com.mysql.cj.jdbc.Driver'
}


flights_join.write.jdbc(url="jdbc:mysql://localhost:3306/flights", table= "airlines",
                       mode="overwrite", properties=connection_properties)

In [59]:
spark.stop()

In [60]:
sc.stop()