<a href="https://colab.research.google.com/github/kaushik-yadav/ETL/blob/main/PySpark_ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import findspark
findspark.init()

In [51]:
from pyspark.sql import SparkSession
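# Note: the `round` imported here is Spark's column function and shadows Python's built-in round().
from pyspark.sql.functions import col, isnan, when, count, round, avg, mean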

In [None]:
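spark = SparkSession.builder.appName('intro').getOrCreate()
# SparkSession is already the entry point for SQL functionality, so reuse it
# instead of wrapping it in a second SparkSession.
sqlContext = spark
spark.sparkContext.setLogLevel("ERROR")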

If the data has to be created manually, a Spark DataFrame can be built directly from Python objects, for example with df = spark.createDataFrame(data, schema).

Loading data from a CSV file

In [17]:
csv_df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/airports.csv")

In [19]:
csv_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude_deg: double (nullable = true)
 |-- longitude_deg: double (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- scheduled_service: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- home_link: string (nullable = true)
 |-- wikipedia_link: string (nullable = true)
 |-- keywords: string (nullable = true)



In [20]:
csv_df.show()

+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+--------------------+--------------------+--------+
|    id|ident|         type|                name|      latitude_deg|      longitude_deg|elevation_ft|continent|iso_country|iso_region|municipality|scheduled_service|gps_code|iata_code|local_code|           home_link|      wikipedia_link|keywords|
+------+-----+-------------+--------------------+------------------+-------------------+------------+---------+-----------+----------+------------+-----------------+--------+---------+----------+--------------------+--------------------+--------+
|  6523|  00A|     heliport|   Total RF Heliport|         40.070985|         -74.933689|          11|       NA|         US|     US-PA|    Bensalem|               no|    K00A|     NULL|       00A|https://www.pennd...|                NULL|    NULL|
|323361| 00A

Counting null values in the elevation_ft column

filter keeps only the rows of a DataFrame that satisfy a condition; it is analogous to WHERE in SQL.

In [29]:
count_null = csv_df.filter(col("elevation_ft").isNull()).count()
count_null

14514

Sorting by elevation, highest first


In [35]:
sorted_df = csv_df.orderBy(col("elevation_ft").desc())
sorted_df.show(n=10)

+------+-------+-------------+--------------------+------------------+-----------------+------------+---------+-----------+----------+----------------+-----------------+--------+---------+----------+---------+--------------------+----------+
|    id|  ident|         type|                name|      latitude_deg|    longitude_deg|elevation_ft|continent|iso_country|iso_region|    municipality|scheduled_service|gps_code|iata_code|local_code|home_link|      wikipedia_link|  keywords|
+------+-------+-------------+--------------------+------------------+-----------------+------------+---------+-----------+----------+----------------+-----------------+--------+---------+----------+---------+--------------------+----------+
|350894|NP-0007|     heliport|Mount Everest Bas...|          28.00369|         86.85324|       17372|       AS|         NP|     NP-P1|        Khumjung|               no|    NULL|     NULL|      NULL|     NULL|                NULL|      NULL|
|350896|NP-0009|     heliport|Ka

In [None]:

ordered_by_id = csv_df.orderBy(col("id"))
ordered_by_id.show()

Adding a new column for latitude in radians


In [44]:
# Degrees-to-radians factor, approximately pi / 180
conversion_unit = 0.0174533
plane_details = ordered_by_id.select("id","ident","type","name",(round(col("latitude_deg")*conversion_unit,5)).alias("latitude_rad"))
plane_details.show()

+---+-------+--------------+--------------------+------------+
| id|  ident|          type|                name|latitude_rad|
+---+-------+--------------+--------------------+------------+
|  2|   OM11| small_airport|Abu Dhabi Northea...|     0.42807|
|  3|   AGGH| large_airport|Honiara Internati...|    -0.16455|
|  4|   AGGM|medium_airport|       Munda Airport|    -0.14535|
|  5|AL-LA10|        closed|Gjirokastër Airfield|     0.69962|
|  7|   UD21| small_airport|       Arzni Airport|     0.70327|
|  8|   ANYN|medium_airport|Nauru Internation...|    -0.00955|
|  9|   FN18| small_airport|      Matala Airport|    -0.25704|
| 10|   FN19| small_airport|  Cabo Ledo Air Base|    -0.16848|
| 11|   NZ12| small_airport|Palmer Station Ai...|    -1.13053|
| 12|   SA47| small_airport|      Petrel Airport|    -1.10792|
| 13|   SA01| small_airport|       Cachi Airport|    -0.43817|
| 14|   SA02| small_airport|    Cafayate Airport|    -0.45476|
| 15|   SA03| small_airport|Villa Minetti Air...|    -0
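
The constant 0.0174533 used in the cell above is a rounded value of pi / 180. The plain-Python check below is a small sketch of how close the approximation is, using the latitude of the first row shown by `csv_df.show()`. PySpark also provides a `radians` column function in `pyspark.sql.functions`, which would avoid the hand-written constant; using it here would be a change to the original cell, not something the notebook does.

```python
import math

conversion_unit = 0.0174533      # constant used in the notebook cell
exact_factor = math.pi / 180     # exact degrees-to-radians factor

# Latitude of the first row shown in the csv_df.show() output.
latitude_deg = 40.070985

approx_rad = round(latitude_deg * conversion_unit, 5)
exact_rad = round(math.radians(latitude_deg), 5)

print("factor error:", abs(conversion_unit - exact_factor))
print("approx:", approx_rad, "exact:", exact_rad)
```

For latitudes, which stay within ±90 degrees, the error introduced by the rounded constant is below the five decimal places kept by the notebook's `round(..., 5)`, apart from occasional last-digit rounding differences.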

Average elevation by continent and by country

In [69]:
elev_df = csv_df.select("iso_country","continent","elevation_ft")
continent_avg_elev_df = elev_df.groupBy("continent").agg(round(mean("elevation_ft"),4).alias("avg_continent_elevation"))
country_avg_elev_df = elev_df.groupBy("iso_country").agg(round(mean("elevation_ft"),4).alias("avg_country_elevation"))
avg_elev_df = elev_df.join(country_avg_elev_df, on = "iso_country",how = "left")\
                      .join(continent_avg_elev_df, on = "continent",how = "left")

avg_elev_df.show()


+---------+-----------+------------+---------------------+-----------------------+
|continent|iso_country|elevation_ft|avg_country_elevation|avg_continent_elevation|
+---------+-----------+------------+---------------------+-----------------------+
|       NA|         US|          11|            1275.2224|              1353.0655|
|       NA|         US|        3435|            1275.2224|              1353.0655|
|       NA|         US|         450|            1275.2224|              1353.0655|
|       NA|         US|         820|            1275.2224|              1353.0655|
|       NA|         US|          80|            1275.2224|              1353.0655|
|       NA|         US|         237|            1275.2224|              1353.0655|
|       NA|         US|        1100|            1275.2224|              1353.0655|
|       NA|         US|        3810|            1275.2224|              1353.0655|
|       NA|         US|        3038|            1275.2224|              1353.0655|
|   

Keeping only distinct countries using dropDuplicates