In [2]:
spark.stop()

In [9]:
from pyspark import SparkContext, SparkConf
config = SparkConf().setAppName("PySparkSession").setMaster("local[4]")
sc = SparkContext(conf = config)

In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySparkSession").getOrCreate()

In [2]:
sc

In [3]:
spark

In [4]:
from pyspark.sql import SparkSession
mysql_connector_path = "/home/hadoop/Downloads/mysql-connector-java-8.0.13.jar"

In [5]:
spark = SparkSession.builder.appName("ETL Pipeline").getOrCreate()

In [None]:
# pyspark --jars "Path of jar package"

## Spark Integration with MySQL

In [6]:
hremployeeDF = spark.read.format("jdbc")\
    .option("url", "jdbc:mysql://localhost:3306/hremployeeDB")\
    .option("dbtable", "hremployee").option("user", "root").option("password", "hadoop@123")\
    .option("driver", "com.mysql.cj.jdbc.Driver").load()

## Spark Integration with Hive

In [None]:
#spark integration with Hive 
spark = (SparkSession.builder.appName("pyspark-hive-intergraion")
        .config('spark.sql.warehouse.dir','/user/hive/warehouse')
        .enableHiveSupport().getOrCreate())

In [14]:
#hr_df = spark.read.csv("file:///home/hadoop/Downloads/HR_Employee.csv", inferSchema=True, header=True)

In [7]:
hremployeeDF.show()

+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|EmployeeID|          Department|             JobRole|Attrition|Gender|Age|MaritalStatus|    Education|EducationField|   BusinessTravel|JobInvolvement|JobLevel|JobSatisfaction|Hourlyrate|Income|Salaryhike|OverTime|Workex|YearsSinceLastPromotion|EmpSatisfaction|TrainingTimesLastYear|WorkLifeBalance|Performance_Rating|
+----------+--------------------+--------------------+---------+------+---+-------------+-------------+--------------+-----------------+--------------+--------+---------------+----------+------+----------+--------+------+-----------------------+---------------+---------------------+---------------+------------------+
|         1|               Sales|     Sales

In [8]:
hremployeeDF.createOrReplaceTempView('hremployee')

In [9]:
#show physical plan of execution ie. DAG
hremployeeDF.explain()

== Physical Plan ==
*(1) Scan JDBCRelation(hremployee) [numPartitions=1] [EmployeeID#0,Department#1,JobRole#2,Attrition#3,Gender#4,Age#5,MaritalStatus#6,Education#7,EducationField#8,BusinessTravel#9,JobInvolvement#10,JobLevel#11,JobSatisfaction#12,Hourlyrate#13,Income#14,Salaryhike#15,OverTime#16,Workex#17,YearsSinceLastPromotion#18,EmpSatisfaction#19,TrainingTimesLastYear#20,WorkLifeBalance#21,Performance_Rating#22] PushedFilters: [], ReadSchema: struct<EmployeeID:string,Department:string,JobRole:string,Attrition:string,Gender:string,Age:int,...


#### 2. Write a query to display max salary from each department

In [18]:
spark.sql("""
select Department, max(Income) from hremployee group by Department order by max(Income) Desc
""").show(truncate=False)

+----------------------+-----------+
|Department            |max(Income)|
+----------------------+-----------+
|Research & Development|19999      |
|Sales                 |19847      |
|Human Resources       |19717      |
+----------------------+-----------+



#### 3.write a query to show employee details (empid, age , department , jobrole) having highest salary from each department

In [20]:
spark.sql("""
select EmployeeID, age, Department, JobRole, Income from hremployee where Income in 
(select max(Income) from hremployee group by Department)
""").show()

+----------+---+--------------------+-------+------+
|EmployeeID|age|          Department|JobRole|Income|
+----------+---+--------------------+-------+------+
|       191| 52|Research & Develo...|Manager| 19999|
|       919| 51|               Sales|Manager| 19847|
|       957| 56|     Human Resources|Manager| 19717|
+----------+---+--------------------+-------+------+



#### 1. Display shape of hremployee table
* show number of records and number of columns

In [18]:
print(hr_df.count())

1469


In [23]:
print(len(hr_df.columns))

23


In [10]:
spark.sql("""select count(*) number_of_rows from hremployee""").show()

+--------------+
|number_of_rows|
+--------------+
|          1469|
+--------------+



In [34]:
spark.sql("""
select 
    count(*) number_of_rows
    --(select count(*) as count from hremployee.columns ) number_of_columns  
    from hremployee
""").show()

+--------------+
|number_of_rows|
+--------------+
|          1469|
+--------------+



In [11]:
spark.sql("""
select count(*) count from information_schema.columns where table_name = hremployee
""").show()

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

#### 2. write a query to show first three employees from each job role to join the company

In [11]:
spark.sql("""
select * from
    (select 
        EmployeeID, 
        Department,
        JobRole,
        row_number() over(partition by JobRole order by EmployeeID) row_num
        from hremployee)
where row_num < 4
""").show()

+----------+--------------------+--------------------+-------+
|EmployeeID|          Department|             JobRole|row_num|
+----------+--------------------+--------------------+-------+
|         1|               Sales|     Sales Executive|      1|
|      1012|               Sales|     Sales Executive|      2|
|      1014|               Sales|     Sales Executive|      3|
|      1003|Research & Develo...|Manufacturing Dir...|      1|
|      1034|Research & Develo...|Manufacturing Dir...|      2|
|      1038|Research & Develo...|Manufacturing Dir...|      3|
|       100|Research & Develo...|Laboratory Techni...|      1|
|      1001|Research & Develo...|Laboratory Techni...|      2|
|      1002|Research & Develo...|Laboratory Techni...|      3|
|      1013|               Sales|Sales Representative|      1|
|      1022|               Sales|Sales Representative|      2|
|      1057|               Sales|Sales Representative|      3|
|        10|Research & Develo...|Healthcare Repres...| 

In [12]:
#### 3. write a query to show top 3 employee from each job role earning max salary

In [26]:
spark.sql("""
select * from
    (select 
        Income, 
        Department,
        JobRole,
        rank() over(partition by JobRole order by Income desc) rank
        from hremployee)
where rank < 4
""").show(truncate = False)

+------+----------------------+-------------------------+----+
|Income|Department            |JobRole                  |rank|
+------+----------------------+-------------------------+----+
|13872 |Sales                 |Sales Executive          |1   |
|13770 |Sales                 |Sales Executive          |2   |
|13758 |Sales                 |Sales Executive          |3   |
|13973 |Research & Development|Manufacturing Director   |1   |
|13826 |Research & Development|Manufacturing Director   |2   |
|13726 |Research & Development|Manufacturing Director   |3   |
|7403  |Research & Development|Laboratory Technician    |1   |
|6782  |Research & Development|Laboratory Technician    |2   |
|6674  |Research & Development|Laboratory Technician    |3   |
|6632  |Sales                 |Sales Representative     |1   |
|5405  |Sales                 |Sales Representative     |2   |
|4502  |Sales                 |Sales Representative     |3   |
|13966 |Research & Development|Healthcare Representativ

In [14]:
#### 4. show top 3 highest package for all jobrole

In [25]:
spark.sql("""
select * from
    (select 
        Income, 
        Department,
        JobRole,
        rank() over(order by Income desc) rank
        from hremployee)
        limit 3
""").show()

+------+--------------------+-----------------+----+
|Income|          Department|          JobRole|rank|
+------+--------------------+-----------------+----+
| 19999|Research & Develo...|          Manager|   1|
| 19973|Research & Develo...|Research Director|   2|
| 19943|Research & Develo...|          Manager|   3|
+------+--------------------+-----------------+----+



In [23]:
spark.sql("select Income, JobRole from hremployee order by Income Desc limit 3").show()

+------+-----------------+
|Income|          JobRole|
+------+-----------------+
| 19999|          Manager|
| 19973|Research Director|
| 19943|          Manager|
+------+-----------------+



#### 5. Dense_rank()

In [34]:
spark.sql("""
select 
    employeeid, 
    department, 
    jobrole, 
    age, 
    gender, 
    income , 
    workex,
    dense_rank() over(partition by jobrole order by age DESC, workex DESC) as Rank
    from hremployee
""").show()

+----------+----------+---------------+---+------+------+------+----+
|employeeid|department|        jobrole|age|gender|income|workex|Rank|
+----------+----------+---------------+---+------+------+------+----+
|       428|     Sales|Sales Executive| 60|Female| 10266|    22|   1|
|       880|     Sales|Sales Executive| 60|  Male|  5220|    12|   2|
|       537|     Sales|Sales Executive| 60|  Male|  5405|    10|   3|
|        64|     Sales|Sales Executive| 59|Female|  7637|    28|   4|
|        71|     Sales|Sales Executive| 59|Female|  5473|    20|   5|
|       898|     Sales|Sales Executive| 59|Female|  5171|    13|   6|
|        99|     Sales|Sales Executive| 58|  Male| 13872|    38|   7|
|       402|     Sales|Sales Executive| 56|Female| 13212|    36|   8|
|      1371|     Sales|Sales Executive| 56|Female|  5380|     6|   9|
|        83|     Sales|Sales Executive| 55|  Male| 10239|    24|  10|
|       976|     Sales|Sales Executive| 55|  Male| 13695|    24|  10|
|       609|     Sal

#### lag()
* lag() is a window function that allows you access records from row
* it will be useful in comparing records between rows or calculating differences
* lag(columnname, offset, default) - lag(income, 2,0)

In [39]:
spark.sql("""
select 
    employeeid, 
    department, 
    jobrole, 
    age, 
    gender, 
    income , 
    workex,
    lag(income,1) over(partition by jobrole order by employeeid) as previous_income
    from hremployee
""").show()

+----------+----------+---------------+---+------+------+------+---------------+
|employeeid|department|        jobrole|age|gender|income|workex|previous_income|
+----------+----------+---------------+---+------+------+------+---------------+
|         1|     Sales|Sales Executive| 41|Female|  5993|     8|           null|
|      1012|     Sales|Sales Executive| 36|Female|  9278|    15|           5993|
|      1014|     Sales|Sales Executive| 30|Female|  4779|     8|           9278|
|      1020|     Sales|Sales Executive| 36|Female|  5647|    11|           4779|
|      1026|     Sales|Sales Executive| 24|Female|  4162|     5|           5647|
|      1027|     Sales|Sales Executive| 32|  Male|  9204|     7|           4162|
|      1031|     Sales|Sales Executive| 31|  Male| 10793|    13|           9204|
|      1032|     Sales|Sales Executive| 46|  Male| 10096|    28|          10793|
|      1039|     Sales|Sales Executive| 48|  Male|  5486|    15|          10096|
|      1042|     Sales|Sales

In [40]:
#### Lead()

In [41]:
spark.sql("""
select 
    employeeid, 
    department, 
    jobrole, 
    age, 
    gender, 
    income , 
    workex,
    lead(income,1) over(partition by jobrole order by employeeid) as next_income
    from hremployee
""").show()

+----------+----------+---------------+---+------+------+------+-----------+
|employeeid|department|        jobrole|age|gender|income|workex|next_income|
+----------+----------+---------------+---+------+------+------+-----------+
|         1|     Sales|Sales Executive| 41|Female|  5993|     8|       9278|
|      1012|     Sales|Sales Executive| 36|Female|  9278|    15|       4779|
|      1014|     Sales|Sales Executive| 30|Female|  4779|     8|       5647|
|      1020|     Sales|Sales Executive| 36|Female|  5647|    11|       4162|
|      1026|     Sales|Sales Executive| 24|Female|  4162|     5|       9204|
|      1027|     Sales|Sales Executive| 32|  Male|  9204|     7|      10793|
|      1031|     Sales|Sales Executive| 31|  Male| 10793|    13|      10096|
|      1032|     Sales|Sales Executive| 46|  Male| 10096|    28|       5486|
|      1039|     Sales|Sales Executive| 48|  Male|  5486|    15|       8463|
|      1042|     Sales|Sales Executive| 28|  Male|  8463|     6|       4373|

#### NTLE()
* Dividing records into quantiles

In [47]:
spark.sql("""
select 
    employeeid, 
    department, 
    jobrole,
    income , 
    workex,
    NTILE(2) over(order by income DESC) as salary_quantile
    from hremployee
""").show(1400)

+----------+--------------------+--------------------+------+------+---------------+
|employeeid|          department|             jobrole|income|workex|salary_quantile|
+----------+--------------------+--------------------+------+------+---------------+
|       191|Research & Develo...|             Manager| 19999|    34|              1|
|       747|Research & Develo...|   Research Director| 19973|    21|              1|
|       852|Research & Develo...|             Manager| 19943|    28|              1|
|       166|Research & Develo...|             Manager| 19926|    21|              1|
|       569|Research & Develo...|             Manager| 19859|    24|              1|
|       919|               Sales|             Manager| 19847|    31|              1|
|       750|               Sales|             Manager| 19845|    33|              1|
|      1242|               Sales|             Manager| 19833|    21|              1|
|       899|Research & Develo...|   Research Director| 19740|    

In [None]:
#### calculate max , mean, average income from each quantile

In [1]:
spark.sql("""
select mean(income) as Mean, max(income) as Maximum, percentile(income,0.5),salary_quantile as Median from
(select 
    employeeid, 
    department, 
    jobrole,
    income , 
    workex,
    NTILE(4) over(order by income DESC) as salary_quantile
    from hremployee)
    group by salary_quantile
""").show()

AnalysisException: 'Table or view not found: hremployee; line 10 pos 9'

#### percent_rank()
* window function that calculates relative rank() of row within a result set expressed as percentage

In [126]:
spark.sql("""
select 
    employeeid, 
    department, 
    jobrole,
    income , 
    workex,
    percent_rank() over(partition by department order by income Desc) as percent_rank
    from hremployee
""").show(hremployeeDF.count())

+----------+--------------------+--------------------+------+------+--------------------+
|employeeid|          department|             jobrole|income|workex|        percent_rank|
+----------+--------------------+--------------------+------+------+--------------------+
|       919|               Sales|             Manager| 19847|    31|                 0.0|
|       750|               Sales|             Manager| 19845|    33|0.002252252252252...|
|      1242|               Sales|             Manager| 19833|    21|0.004504504504504...|
|      1116|               Sales|             Manager| 19586|    36|0.006756756756756757|
|       234|               Sales|             Manager| 19517|    32|0.009009009009009009|
|      1126|               Sales|             Manager| 19331|    27| 0.01126126126126126|
|       238|               Sales|             Manager| 19068|    33|0.013513513513513514|
|        30|               Sales|             Manager| 18947|    22|0.015765765765765764|
|       91

#### find the total count of employee in each quartile 0-25, 25-50, 50-75, 75-100

+--------+---+---+---+---+
|count(1)| Q1| Q2| Q3| Q4|
+--------+---+---+---+---+
|     367|  0|  0|  0|  1|
|       2|  1|  1|  0|  0|
|     367|  1|  0|  0|  0|
|       3|  0|  1|  1|  0|
|       1|  0|  0|  1|  1|
|     365|  0|  0|  1|  0|
|     364|  0|  1|  0|  0|
+--------+---+---+---+---+



In [86]:
spark.sql("""
select count(*) as employee_count from
(select 
    employeeid, 
    department, 
    jobrole,
    income , 
    workex,
    NTILE(4) over(order by income DESC) as salary_quantile
    from hremployee)
    group by salary_quantile
""").show()

+--------------+
|employee_count|
+--------------+
|           368|
|           367|
|           367|
|           367|
+--------------+



In [119]:
spark.sql("""
select *, count(*) as Employee_count from (select 
case when pr <= 0.25 then 1 else 0 end  as Q1,
case when pr > 0.25 and pr<= 0.5 then 1 else 0 end as Q2 ,
case when pr > 0.50 and pr<= 0.75 then 1 else 0 end  as Q3,
case when pr > 0.75 and pr <=1 then 1 else 0 end as Q4
from
(select 
    employeeid, 
    department, 
    jobrole,
    income , 
    workex,
    percent_rank() over(partition by department order by income Desc) as pr
    from hremployee))
    group by Q1,Q2,Q3,Q4
    
""").show()

+---+---+---+---+--------------+
| Q1| Q2| Q3| Q4|Employee_count|
+---+---+---+---+--------------+
|  0|  0|  0|  1|           367|
|  1|  0|  0|  0|           369|
|  0|  0|  1|  0|           366|
|  0|  1|  0|  0|           367|
+---+---+---+---+--------------+



In [127]:
spark.sql("""
select count(*) , percent_category from (
select
case 
    when percent_rank <0.25 then 'Q1'
    when percent_rank <= 0.5 then 'Q2' 
    when percent_rank <= 0.75 then 'Q3'
    else 'Q4'
end as percent_category
from(
select 
    employeeid, 
    department, 
    jobrole,
    income , 
    workex,
    percent_rank() over(partition by department order by income) as percent_rank
    from hremployee))
    group by percent_category
    order by percent_category
""").show()

+--------+----------------+
|count(1)|percent_category|
+--------+----------------+
|     367|              Q1|
|     369|              Q2|
|     366|              Q3|
|     367|              Q4|
+--------+----------------+



In [128]:
#### Show jobrole with maximum attrition Rate (in percentage)

In [146]:
spark.sql("""
select 
    jobrole, 
    round((100*(sum(case when new_attrition = 1 then 1 else 0 end)/count(*))),3) as AttritionRate 
    from ( 
        select 
            jobrole,
            case 
            when attrition = 'Yes' then 1 else 0
            end as new_attrition 
            from hremployee)
    group by jobrole
    order by AttritionRate desc
""").show()

+--------------------+-------------+
|             jobrole|AttritionRate|
+--------------------+-------------+
|Sales Representative|       40.244|
|Laboratory Techni...|       23.938|
|     Human Resources|       23.077|
|     Sales Executive|       17.485|
|  Research Scientist|       16.096|
|Manufacturing Dir...|        6.897|
|Healthcare Repres...|         6.87|
|             Manager|        4.902|
|   Research Director|          2.5|
+--------------------+-------------+



In [149]:
spark.sql("""
select 
    jobrole,
    round((100*(sum(case when attrition = 'Yes' then 1 else 0 end)/count(*))),3) as AttritionRate 
    from hremployee
    group by jobrole
    order by AttritionRate desc
""").show()

+--------------------+-------------+
|             jobrole|AttritionRate|
+--------------------+-------------+
|Sales Representative|       40.244|
|Laboratory Techni...|       23.938|
|     Human Resources|       23.077|
|     Sales Executive|       17.485|
|  Research Scientist|       16.096|
|Manufacturing Dir...|        6.897|
|Healthcare Repres...|         6.87|
|             Manager|        4.902|
|   Research Director|          2.5|
+--------------------+-------------+



#### Hbase integration with Pyspark

In [1]:
spark.stop()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HbaseIntegration").getOrCreate()

In [3]:
hbase_conf = {
    "hbase.zookeeper.quorum": 'localhost',
    "hbase.mapreduce.inputtable": 'hcustomer',
    "hbase.client.scanner.timeout.period": "600000"
}

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38965)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38965)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38965)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception

In [None]:
df = spark.read.format("org.apache.hadoop.hbase.spark").options(**hbase_conf).load()

#### Hive integration with pyspark

In [3]:
#spark integration with Hive with spark session
spark = (SparkSession.builder.appName("pyspark-hive-intergraion")
        .config('spark.sql.warehouse.dir','/user/hive/warehouse')
        .enableHiveSupport().getOrCreate())

In [4]:
spark.sql("show databases").show()

+------------+
|databaseName|
+------------+
|    airlines|
|  banking_db|
|     default|
+------------+



In [5]:
spark.sql("drop table if exists airlines.flights")

DataFrame[]

In [6]:
spark.sql("create database if not exists airlines")

DataFrame[]

In [7]:
spark.sql("""create table if not exists airlines.flights(DayofMonth int, DayOfWeek int, Carrier varchar(10),
          OriginAirportID int, DestAirportID int, DepDelay int, ArrDelay int)
          ROW FORMAT DELIMITED
          FIELDS TERMINATED BY ','
          LINES TERMINATED BY '\n'
          STORED AS TEXTFILE
          """)
# tblproperties("skip.header.line.count" = 1)

DataFrame[]

In [8]:
spark.sql("use airlines")

DataFrame[]

In [9]:
spark.sql(" describe flights").show()

+---------------+---------+-------+
|       col_name|data_type|comment|
+---------------+---------+-------+
|     DayofMonth|      int|   null|
|      DayOfWeek|      int|   null|
|        Carrier|   string|   null|
|OriginAirportID|      int|   null|
|  DestAirportID|      int|   null|
|       DepDelay|      int|   null|
|       ArrDelay|      int|   null|
+---------------+---------+-------+



In [10]:
spark.sql("""
    load data local inpath '/home/hadoop/Downloads/raw_flight_data1.csv'
    overwrite into table flights
""")

DataFrame[]

In [11]:
# spark.sql("""LOAD DATA INPATH '/home/hadoop/Downloads/raw_flight_data.csv'
# INTO TABLE flights
# FIELDS TERMINATED BY ','
# ENCLOSED BY '"'
# LINES TERMINATED BY '\n'
# IGNORE 1 ROWS
# """)

In [12]:
spark.sql("""
select  * from flights
""").show()

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|      

In [21]:
spark.sql("drop table if exists airlines.airports")

DataFrame[]

In [22]:
spark.sql("""create table if not exists airlines.airports(airport_id int, city string, 
            state varchar(20), name string)
          ROW FORMAT DELIMITED
          FIELDS TERMINATED BY ','
          LINES TERMINATED BY '\n'
          STORED AS TEXTFILE
          """)

DataFrame[]

In [23]:
spark.sql("describe airports").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|airport_id|      int|   null|
|      city|   string|   null|
|     state|   string|   null|
|      name|   string|   null|
+----------+---------+-------+



In [24]:
spark.sql("""
    load data local inpath '/home/hadoop/Downloads/airports1.csv'
    overwrite into table airports
""")

DataFrame[]

In [25]:
spark.sql("""
select * from airports
""").show()

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
|     12523|     Juneau|   AK|Juneau International|
|     12819|  Ketchikan|   AK|Ketchikan Interna...|
|     10245|King Salmon|   AK| King Salmon Airport|
|     10170|     Kodiak|   AK|      Kodiak Airport|
|     13970|   Kotzebue|   AK| Ralph Wien Memorial|
|     13873|       Nome|   AK|        Nome Airport|
|     14256|

In [36]:
flights_df = spark.table("flights")
airports_df = spark.table("airports")

In [37]:
flights_df.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 5 rows



In [38]:
airports_df.show(5)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
+----------+-----------+-----+--------------------+
only showing top 5 rows



#### Transformations

In [39]:
flights_join = flights_df.join(airports_df, on = airports_df.airport_id == flights_df.OriginAirportID , how = "inner")
flights_join.show(3)

+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|          city|state|                name|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
|        19|        5|     DL|          11433|        13303|      -3|       1|     11433|       Detroit|   MI|Detroit Metro Way...|
|        19|        5|     DL|          14869|        12478|       0|      -8|     14869|Salt Lake City|   UT|Salt Lake City In...|
|        19|        5|     DL|          14057|        14869|      -4|     -15|     14057|      Portland|   OR|Portland Internat...|
+----------+---------+-------+---------------+-------------+--------+--------+----------+--------------+-----+--------------------+
only showing top 3 rows



In [40]:
flights_join = flights_df.join(airports_df, on = airports_df.airport_id == flights_df.OriginAirportID , how = "left")

In [41]:
flights_join = flights_df.join(airports_df, on = airports_df.airport_id == flights_df.OriginAirportID , how = "right")

In [42]:
flights_join = flights_df.join(airports_df, on = airports_df.airport_id == flights_df.OriginAirportID , how = "outer")

In [43]:
flights_join1 = flights_df.join(airports_df, on = airports_df.airport_id == flights_df.OriginAirportID , how = "leftanti")

In [47]:
flights_join1.show(5)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 5 rows



In [45]:
flights_join1 = flights_df.join(airports_df, on = airports_df.airport_id == flights_df.OriginAirportID , how = "leftsemi")

In [None]:
### partitioning by carrier into parquet

In [49]:
flights_join.write.partitionBy('Carrier').parquet("/airlines")

In [51]:
airlines_join = spark.read.parquet("/airlines")

In [55]:
airlines_join.show(10)

+----------+---------+---------------+-------------+--------+--------+----------+-----------------+-----+--------------------+-------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|airport_id|             city|state|                name|Carrier|
+----------+---------+---------------+-------------+--------+--------+----------+-----------------+-----+--------------------+-------+
|        17|        3|          11298|        11278|      59|      63|     11298|Dallas/Fort Worth|   TX|Dallas/Fort Worth...|     AA|
|        18|        4|          11298|        11278|       3|      -7|     11298|Dallas/Fort Worth|   TX|Dallas/Fort Worth...|     AA|
|        19|        5|          11298|        11278|      -3|       0|     11298|Dallas/Fort Worth|   TX|Dallas/Fort Worth...|     AA|
|        21|        7|          11298|        11278|      -4|     -13|     11298|Dallas/Fort Worth|   TX|Dallas/Fort Worth...|     AA|
|        22|        1|          11298|        11278|   

In [52]:
airlines_join.createOrReplaceTempView('airlines')

In [54]:
spark.sql("""select avg(ArrDelay) as arrival_delay from airlines where Carrier = 'AA'""").show()

+------------------+
|     arrival_delay|
+------------------+
|11.651327332054372|
+------------------+



In [59]:
print(airlines_join.columns)

['DayofMonth', 'DayOfWeek', 'OriginAirportID', 'DestAirportID', 'DepDelay', 'ArrDelay', 'airport_id', 'city', 'state', 'name', 'Carrier']


#### Load on mysql

In [61]:
connection_properties = {
    'user' : "root",
    'password' : "hadoop@123",
    'driver' : "com.mysql.cj.jdbc.Driver"
}
airlines_join.write.jdbc(url = "jdbc:mysql://localhost:3306/airlines", table = "airports",
                mode = "overwrite", properties = connection_properties)