In [2]:
# Stop the session currently bound to `spark` so a freshly configured one can be created.
# NOTE(review): `spark` is not defined in any earlier cell of this notebook — presumably it is
# injected by the PySpark launcher; confirm, otherwise this cell fails on a fresh kernel.
spark.stop()

In [3]:
# Stop the context currently bound to `sc` before a new SparkContext is constructed.
# NOTE(review): `sc` is not defined in any earlier cell of this notebook — presumably it is
# injected by the PySpark launcher; confirm, otherwise this cell fails on a fresh kernel.
sc.stop()

In [4]:
from pyspark import SparkConf, SparkContext

# Assemble the configuration one setting at a time: master "local[4]" and
# application name "ETL Pipeline", then start the context from it.
config = SparkConf()
config.setMaster("local[4]")
config.setAppName("ETL Pipeline")
sc = SparkContext(conf=config)

In [5]:
from pyspark.sql import SparkSession

# Name the application on the builder, then obtain the session from it.
session_builder = SparkSession.builder.appName("ETL Pipeline")
spark = session_builder.getOrCreate()

In [6]:
# Echo the session object as the cell output to confirm it was created.
spark

In [9]:
import os
from getpass import getpass

# Credentials are taken from the environment (MYSQL_USER / MYSQL_PASSWORD) so that
# no secret is stored in the notebook; the password falls back to an interactive
# prompt when the variable is unset or empty.
mysql_user = os.environ.get("MYSQL_USER", "root")
mysql_password = os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: ")

# Load the HR_Employee table of the hremployeeDB MySQL database through JDBC.
hremployeeDF = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/hremployeeDB")
    .option("dbtable", "HR_Employee")
    .option("user", mysql_user)
    .option("password", mysql_password)
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .load()
)

In [10]:
# Print only the first two rows as a quick sanity check of the JDBC read.
hremployeeDF.show(n=2)

+----------+--------------------+------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|EmployeeID|          Department|           JobRole|Attrition|Gender|Age|MaritalStatus|    Education|EducationField|   BusinessTravel|JobInvolvement|JobLevel|JobSatisfaction|Hourlyrate|Income|Salaryhike|OverTime|Workex|YearsSinceLastPromotion|EmpSatisfaction|TrainingTimesLastYear|WorkLifeBalance|Performance_Rating|
+----------+--------------------+------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|         1|               Sales|   Sales Executi

In [11]:
# Print the physical execution plan Spark produced for this DataFrame.
# (The original note called this the "DAB"; presumably "DAG" — directed acyclic graph — was meant.)
hremployeeDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(HR_Employee) [numPartitions=1] [EmployeeID#0,Department#1,JobRole#2,Attrition#3,Gender#4,Age#5,MaritalStatus#6,Education#7,EducationField#8,BusinessTravel#9,JobInvolvement#10,JobLevel#11,JobSatisfaction#12,Hourlyrate#13,Income#14,Salaryhike#15,OverTime#16,Workex#17,YearsSinceLastPromotion#18,EmpSatisfaction#19,TrainingTimesLastYear#20,WorkLifeBalance#21,Performance_Rating#22] PushedFilters: [], ReadSchema: struct<EmployeeID:int,Department:string,JobRole:string,Attrition:string,Gender:string,Age:int,Mar...


### Temporary view of the table

In [12]:
# Register the DataFrame under the view name "hremployee" so the SQL cells can query it.
hremployeeDF.createOrReplaceTempView(name="hremployee")

#### 1. Display the shape of the HR Employee table
- Show the number of rows and the number of columns

In [13]:
# Count the rows of the hremployee view.
# NOTE(review): the saved output of this cell is an AnalysisException (the Hive metastore
# client could not be instantiated) and the cells after it were never executed; the
# session's catalog / metastore setup needs checking before this section can run.
row_count_query = """SELECT COUNT(*) as No_of_rows FROM hremployee"""
spark.sql(row_count_query).show()

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

In [None]:
# Column count taken from the DataFrame's column list; reused by the shape query.
column_names = hremployeeDF.columns
no_of_cols = len(column_names)


In [None]:
# Report row and column counts side by side; the column count is interpolated
# from the Python variable computed beforehand.
shape_query = f"""SELECT COUNT(*) as No_of_rows, {no_of_cols} as No_of_cols  FROM hremployee"""
spark.sql(shape_query).show()

In [None]:
# Disabled experiment: an attempt to compute the column count inside SQL itself.
# NOTE(review): this cell contains no live code; COLLECT_LIST(*) looks unlikely to be
# valid here — confirm, and delete the cell if it is not needed.
# spark.sql(f"""
#     SELECT 
#         COUNT(*) as No_of_rows, 
#         SIZE(COLLECT_LIST(*)) as No_of_cols  
#     FROM 
#         hremployee""").show()

#### 2. Write a query to show the first 3 employees to join the company in each job role

In [None]:
# Rank employees within each JobRole by EmployeeID and keep ranks 1 to 3.
# (EmployeeID stands in for joining order here — presumably lower IDs joined earlier.)
first_three_per_role_sql = """
    SELECT EmployeeID, JobRole, rank
    FROM (SELECT *,
               DENSE_RANK() OVER (PARTITION BY JobRole ORDER BY EmployeeID) as rank
        FROM hremployee) as _
    WHERE rank <= 3
"""
first_three_per_role = spark.sql(first_three_per_role_sql)
first_three_per_role.show()

#### 3. Write a query to show the top 3 highest-earning employees in each job role

In [None]:
# Rank employees within each JobRole by Income (highest first) and keep ranks 1 to 3.
# DENSE_RANK gives tied incomes the same rank, so a role may return more than 3 rows.
top_income_per_role_sql = """
    SELECT EmployeeID, JobRole,Income, rank
    FROM (SELECT *,
               DENSE_RANK() OVER (PARTITION BY JobRole ORDER BY Income DESC) as rank
        FROM hremployee) as _
    WHERE rank <= 3
"""
top_income_per_role = spark.sql(top_income_per_role_sql)
top_income_per_role.show()

#### 4. Show the top 3 highest incomes across all job roles

In [None]:
# Same ranking as the per-role query but over a single, unpartitioned window,
# so the three highest income levels of the whole table are returned.
top_income_overall_sql = """
    SELECT EmployeeID, JobRole,Income,rank
    FROM (SELECT *,
               DENSE_RANK() OVER (ORDER BY Income DESC) as rank
        FROM hremployee) as _
    WHERE rank <= 3
"""
top_income_overall = spark.sql(top_income_overall_sql)
top_income_overall.show()

#### 5. Write a Spark SQL query that compares each employee's income with the previous employee's income within the same job role, listed in ascending order of the difference

In [None]:
# For every employee, fetch the income of the preceding row in the same JobRole
# (rows ordered by EmployeeID) and the difference to it, sorted by role and difference.
income_diff_sorted_sql = """
    SELECT  EmployeeID,JobRole, Income,
    LAG(Income, 1) OVER (PARTITION BY JobRole ORDER BY EmployeeID) AS prev,
    Income-LAG(Income, 1) OVER (PARTITION BY JobRole ORDER BY EmployeeID) as diff
    FROM hremployee
    ORDER BY JobRole, diff
"""
income_diff_sorted = spark.sql(income_diff_sorted_sql)
income_diff_sorted.show()



In [None]:
# Previous-row income and difference per JobRole (rows ordered by EmployeeID),
# this time without an explicit ORDER BY on the final result.
income_diff_sql = """
    SELECT  EmployeeID,JobRole, Income,
    LAG(Income, 1) OVER (PARTITION BY JobRole ORDER BY EmployeeID) AS prev,
    Income-LAG(Income, 1) OVER (PARTITION BY JobRole ORDER BY EmployeeID) as diff
    FROM hremployee
"""
income_diff = spark.sql(income_diff_sql)
income_diff.show()


#### Lead()
- Reads a value from one of the rows that follow the current row

In [None]:
# LEAD(Income, 2, 0): income of the row two positions ahead in the same JobRole
# (rows ordered by EmployeeID), with 0 when no such row exists.
# NOTE(review): the column is aliased "next" but the offset is 2, not 1 — confirm this is intended.
lead_income_sql = """
    SELECT  
        EmployeeID,
        Department,
        JobRole, 
        age, 
        gender, 
        Income, 
        Workex,
        LEAD(Income, 2,0) OVER(PARTITION BY JobRole ORDER BY EmployeeID) AS next
    FROM hremployee
"""
lead_income = spark.sql(lead_income_sql)
lead_income.show()

### NTILE()
- Divides the ordered records into n equal-sized buckets (quartiles when n = 4)

In [None]:
# Split all employees, ordered by Income, into four buckets labelled 1 to 4.
salary_quartiles_sql = """
    SELECT  
        EmployeeID,
        Department,
        JobRole, 
        age, 
        gender, 
        Income, 
        NTILE(4) OVER(ORDER BY Income) AS salary_quartiles
    FROM hremployee
"""
salary_quartiles = spark.sql(salary_quartiles_sql)
salary_quartiles.show()

#### Find the number of employees in each percentile group (0-25th, 25th-50th, 50th-75th, 75th-100th) using PERCENT_RANK, and create the group label with CASE WHEN.

In [None]:
# Step 1 (CTE): percent rank of each employee's Income within their Department.
# Step 2 (GROUPS): map that rank onto one of four percentile-band labels.
# Step 3: count the employees per band.
percentile_group_sql = """
    WITH CTE AS (
        SELECT  
            EmployeeID,
            Department,
            PERCENT_RANK() OVER(PARTITION BY Department ORDER BY Income) AS quartile
        FROM hremployee
    ), 
    GROUPS AS (
        SELECT
            CASE
                WHEN quartile < 0.25 THEN '0-25th'
                WHEN quartile >= 0.25 AND quartile < 0.50 THEN '25th-50th'
                WHEN quartile >= 0.50 AND quartile < 0.75 THEN '50th-75th'
                WHEN quartile >= 0.75 THEN '75th-100th'
                ELSE NULL
            END AS percent_group
        FROM CTE
    )
    SELECT 
        percent_group,
        COUNT(*) AS count
    FROM GROUPS
    GROUP BY percent_group
    ORDER BY percent_group
    
"""
percentile_group_counts = spark.sql(percentile_group_sql)
percentile_group_counts.show()


## Hive integration with PySpark

In [7]:
# Stop the current session so that a Hive-enabled session can be created in its place.
spark.stop()

In [1]:
# List the running JVM processes to check that the Hadoop daemons are up.
!jps

2722 DataNode
2547 NameNode
2084 NodeManager
3525 SparkSubmit
1894 ResourceManager
3593 Jps
3002 SecondaryNameNode


In [2]:
# Spark integration with the Hive warehouse.
# The property "spark.sql.warehouse.dir" is set to "/user/hive/warehouse" for the Hive integration.

spark = (SparkSession.builder.appName("pyspark-hive-integration")
         .config("spark.sql.warehouse.dir","/user/hive/warehouse") # warehouse directory used by this session
        .enableHiveSupport().getOrCreate())

In [3]:
# List the databases known to the session's catalog.
databases_df = spark.sql("show databases")
databases_df.show()

+------------+
|databaseName|
+------------+
|    airlines|
|     default|
+------------+



In [4]:
# Create the AIRLINES database unless it already exists.
create_database_sql = """
    CREATE DATABASE IF NOT EXISTS AIRLINES
"""
spark.sql(create_database_sql)

DataFrame[]

In [5]:
# Make airlines the current database for the unqualified table names used below.
use_database_sql = """
    use airlines
"""
spark.sql(use_database_sql)

DataFrame[]

In [6]:
# List the databases again after the CREATE DATABASE / USE statements.
databases_df = spark.sql("show databases")
databases_df.show()

+------------+
|databaseName|
+------------+
|    airlines|
|     default|
+------------+



In [7]:
# List the tables of the current database.
tables_df = spark.sql("show tables")
tables_df.show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [8]:
# Define the flights table as comma-delimited text; the table property asks for
# the first line of each file (the header) to be skipped.
create_flights_sql = """
    CREATE TABLE IF NOT EXISTS flights(DayofMonth int, 
    DayOfWeek int, 
    Carrier VARCHAR(10), 
    OriginAirportID int, 
    DestAirportID int, 
    DepDelay int, 
    ArrDelay int
) 
row format delimited
fields terminated by ','
lines terminated by '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
"""
spark.sql(create_flights_sql)

DataFrame[]

In [9]:
# List the tables of the current database after the flights DDL.
tables_df = spark.sql("show tables")
tables_df.show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [10]:
# Replace the contents of the flights table with the raw CSV from the local filesystem.
# NOTE(review): the path is absolute and machine-specific; adjust it for other environments.
load_flights_sql = """LOAD DATA LOCAL inpath '/home/hadoop/Downloads/hivedata/raw_flight_data.csv' overwrite into table flights"""
spark.sql(load_flights_sql)

DataFrame[]

In [11]:
# Preview the loaded flights table.
flights_preview = spark.sql("SELECT * FROM flights")
flights_preview.show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [12]:
# Define the airports table as comma-delimited text; the table property asks for
# the first line of each file (the header) to be skipped.
create_airports_sql = """
    CREATE TABLE IF NOT EXISTS airports(airport_id int,
    city varchar(50), state varchar(50),name varchar(100)
) 
row format delimited
fields terminated by ','
lines terminated by '\n'
STORED AS TEXTFILE
TBLPROPERTIES('skip.header.line.count'='1')
"""
spark.sql(create_airports_sql)

DataFrame[]

In [13]:
# List the tables of the current database after the airports DDL.
tables_df = spark.sql("show tables")
tables_df.show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|airlines| airports|      false|
|airlines|  flights|      false|
+--------+---------+-----------+



In [14]:
# Replace the contents of the airports table with the CSV from the local filesystem.
# NOTE(review): the path is absolute and machine-specific; adjust it for other environments.
load_airports_sql = """LOAD DATA LOCAL inpath '/home/hadoop/Downloads/hivedata/airports.csv' overwrite into table airports"""
spark.sql(load_airports_sql)

DataFrame[]

In [15]:
# Preview the loaded airports table.
airports_preview = spark.sql("SELECT * FROM airports")
airports_preview.show()

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
|     12523|     Juneau|   AK|Juneau International|
|     12819|  Ketchikan|   AK|Ketchikan Interna...|
|     10245|King Salmon|   AK| King Salmon Airport|
|     10170|     Kodiak|   AK|      Kodiak Airport|
|     13970|   Kotzebue|   AK| Ralph Wien Memorial|
|     13873|       Nome|   AK|        Nome Airport|
|     14256|

### 1.Extract

In [20]:
# Extract: obtain the flights table as a DataFrame via its fully qualified name.
flights_table_name = "airlines.flights"
flights_df = spark.table(flights_table_name)

In [21]:
# Extract: obtain the airports table as a DataFrame via its fully qualified name.
airports_table_name = "airlines.airports"
airports_df = spark.table(airports_table_name)

In [22]:
# Print the first two airport rows to verify the extract.
airports_df.show(n=2)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows



### 2.Transformation

In [23]:
# Inner join that attaches the origin airport's attributes to every flight:
# a flight row matches the airport whose airport_id equals its OriginAirportID.
origin_airport_match = flights_df.OriginAirportID == airports_df.airport_id
flights_join = flights_df.join(airports_df, on=origin_airport_match, how="inner")

In [24]:
# Print the first ten joined rows.
flights_join.show(n=10)

+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|          city|state|                name|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|        19|        5|     DL|          11433|        13303|      -3|       1|     11433|       Detroit|   MI|Detroit Metro Way...|
|        19|        5|     DL|          14869|        12478|       0|      -8|     14869|Salt Lake City|   UT|Salt Lake City In...|
|        19|        5|     DL|          14057|        14869|      -4|     -15|     14057|      Portland|   OR|Portland Internat...|
|        19|        5|     DL|          15016|        11433|      28|      24|     15016|     St. Louis|   MO|Lambert-St. Louis...|
|        19|        5|     DL|          11193|        12892|      -6|     -1

### 3. Load

In [25]:
# Redistribute the joined data across 4 partitions before writing it out.
flights_join = flights_join.repartition(numPartitions=4)

In [46]:
# Write the joined data as Parquet to the local filesystem.
# mode("overwrite") makes the cell re-runnable: without a save mode the write
# raises an error when the target directory already exists from a previous run.
flights_join.write.mode("overwrite").parquet("file:///home/hadoop/Downloads/flights")

In [47]:
# Load the Parquet output back from the local filesystem into a DataFrame.
local_parquet_path = "file:///home/hadoop/Downloads/flights"
flights_parquet_df = spark.read.parquet(local_parquet_path)

In [50]:
# Write the joined data as Parquet to /flights1 on the session's default filesystem.
# mode("overwrite") makes the cell re-runnable: without a save mode the write
# raises an error when the target directory already exists from a previous run.
flights_join.write.mode("overwrite").parquet("/flights1")

In [51]:
# Write the joined data as Parquet under /airlines, one sub-directory per Carrier value.
# mode("overwrite") makes the cell re-runnable: without a save mode the write
# raises an error when the target directory already exists from a previous run.
flights_join.write.mode("overwrite").partitionBy("Carrier").parquet("/airlines")

In [26]:
# Print the joined data using show()'s default row limit (20 rows).
flights_join.show(n=20)

+----------+---------+-------+---------------+-------------+--------+--------+----------+-----------------+-----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|             city|state|                name|
+----------+---------+-------+---------------+-------------+--------+--------+----------+-----------------+-----+--------------------+
|        20|        1|     WN|          14908|        13796|      -1|     -10|     14908|        Santa Ana|   CA|John Wayne Airpor...|
|         9|        7|     WN|          14831|        13891|      -1|      -3|     14831|         San Jose|   CA|Norman Y. Mineta ...|
|        31|        5|     WN|          10821|        12451|      22|      14|     10821|        Baltimore|   MD|Baltimore/Washing...|
|        30|        4|     F9|          11292|        12892|      74|      82|     11292|           Denver|   CO|Denver International|
|        20|        1|     WN|          13232|        1

In [27]:
# Persist the joined data as a CSV-backed table bucketed into 50 buckets on `state`.
# mode("overwrite") makes the cell re-runnable: without it saveAsTable fails
# when "bucketed_table" already exists from a previous run.
(
    flights_join.write
    .mode("overwrite")
    .bucketBy(col='state', numBuckets=50)
    .format('csv')
    .saveAsTable("bucketed_table")
)

In [29]:
# Persist the joined data as a Parquet-backed table partitioned by Carrier and,
# within each partition, bucketed into 50 buckets on `state`.
# mode("overwrite") makes the cell re-runnable: without it saveAsTable fails
# when "part_bucket_table" already exists from a previous run.
(
    flights_join.write
    .mode("overwrite")
    .partitionBy('Carrier')
    .bucketBy(col='state', numBuckets=50)
    .format('parquet')
    .saveAsTable("part_bucket_table")
)

In [30]:
# Count the rows per carrier in the partitioned and bucketed table.
carrier_counts_sql = "select carrier, count(*) from part_bucket_table group by carrier"
carrier_counts = spark.sql(carrier_counts_sql)
carrier_counts.show()

+-------+--------+
|carrier|count(1)|
+-------+--------+
|     UA|  122443|
|     AA|  124037|
|     EV|   46563|
|     B6|   51381|
|     DL|  134724|
|     OO|   69785|
|     F9|    9811|
|     YV|   14612|
|     US|  100668|
|     MQ|   45926|
|     HA|    4962|
|     AS|   28796|
|     FL|   28053|
|     VX|   14683|
|     WN|  216101|
|     9E|   36030|
+-------+--------+



+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|          DayofMonth|      int|   null|
|           DayOfWeek|      int|   null|
|     OriginAirportID|      int|   null|
|       DestAirportID|      int|   null|
|            DepDelay|      int|   null|
|            ArrDelay|      int|   null|
|          airport_id|      int|   null|
|                city|   string|   null|
|               state|   string|   null|
|                name|   string|   null|
|             Carrier|   string|   null|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|             Carrier|   string|   null|
+--------------------+---------+-------+



### Load into MySQL

In [34]:
import os
from getpass import getpass

# JDBC connection settings. Credentials are taken from the environment
# (MYSQL_USER / MYSQL_PASSWORD) so that no secret is stored in the notebook;
# the password falls back to an interactive prompt when the variable is unset or empty.
connection_properties = {
    'user': os.environ.get("MYSQL_USER", "root"),
    'password': os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: "),
    'driver': 'com.mysql.cj.jdbc.Driver'
}

# Write the joined data to the `airlines` table of the `flights` MySQL database,
# replacing the table's existing contents.
flights_join.write.jdbc(url="jdbc:mysql://localhost:3306/flights", table="airlines",
                        mode="overwrite", properties=connection_properties)

In [35]:
# Stop the Spark session now that the load step has finished.
spark.stop()

In [36]:
# Stop the SparkContext bound to `sc`.
sc.stop()