In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
file_path = "./work/data/airports.csv"
csv_df = spark.read.csv(file_path)
csv_df.printSchema()
csv_df.show(truncate=False, vertical=True)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)

-RECORD 0------------------------------------------
 _c0 | IATA_CODE                                   
 _c1 | AIRPORT                                     
 _c2 | CITY                                        
 _c3 | STATE                                       
 _c4 | COUNTRY                                     
 _c5 | LATITUDE                                    
 _c6 | LONGITUDE                                   
-RECORD 1------------------------------------------
 _c0 | ABE                                         
 _c1 | Lehigh Valley International Airport         
 _c2 | Allentown                                   
 _c3 | PA                                          
 _c4 | USA                                         
 _c5 | 40.65236      

Above we dont have the columns names, let's add them

In [3]:
csv_header_df = spark.read.option("header", True).csv(file_path)
csv_header_df.printSchema()
csv_header_df.show(truncate=False, vertical=True)

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)

-RECORD 0------------------------------------------------
 IATA_CODE | ABE                                         
 AIRPORT   | Lehigh Valley International Airport         
 CITY      | Allentown                                   
 STATE     | PA                                          
 COUNTRY   | USA                                         
 LATITUDE  | 40.65236                                    
 LONGITUDE | -75.44040                                   
-RECORD 1------------------------------------------------
 IATA_CODE | ABI                                         
 AIRPORT   | Abilene Regional Airport                    
 CITY      | Abilene                                     
 STATE     | TX          

In [4]:
csv_clean1_df = spark.read.option("header", True).option("inferschema", True).csv(file_path)
csv_clean1_df.printSchema()


root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)



In [6]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

schema = StructType([
 StructField("IATA_CODE", StringType(), False),
 StructField("AIRPORT", StringType(), False),
 StructField("CITY", StringType(), False),
 StructField("STATE", StringType(), False),
 StructField("COUNTRY", StringType(), False),
 StructField("LATITUDE", FloatType(), False),
 StructField("LONGITUDE", FloatType(), False)
])

csv_clean2_df = spark.read.option("header", True).schema(schema).csv(file_path)
csv_clean2_df.printSchema()


root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)



## Ajout de colonnes

In [8]:
new_df = csv_clean2_df.withColumn("NEWCOL", F.lit("Hello"))
new_df = csv_clean2_df.withColumn("NEWCOL", F.lit("123.456").cast(FloatType()))
new_df = csv_clean2_df.withColumn("NEWCOL", F.lower(F.col("IATA_CODE")))
new_df = csv_clean2_df.withColumn("NEWCOL", F.to_date(F.lit('2024-01-30 12:56:33'), 'yyyy-MM-dd HH:mm:ss'))
new_df.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- NEWCOL: date (nullable = true)



## Renommage et Drop

In [10]:
new_df = csv_clean2_df.withColumnRenamed("IATA_CODE", "NEW_IATA_CODE")
new_df = csv_clean2_df.drop("LATITUDE", "LONGITUDE")
new_df.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)



## Operations Courantes

### Select

In [12]:
new_df = csv_clean2_df.select("IATA_CODE", "AIRPORT")
new_df.head()

Row(IATA_CODE='ABE', AIRPORT='Lehigh Valley International Airport')

### Filtre

In [19]:
new_df = csv_clean2_df.where("IATA_CODE = 'LAX' or IATA_CODE == 'DFW'")
new_df.show()

+---------+--------------------+-----------------+-----+-------+--------+----------+
|IATA_CODE|             AIRPORT|             CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+---------+--------------------+-----------------+-----+-------+--------+----------+
|      DFW|Dallas/Fort Worth...|Dallas-Fort Worth|   TX|    USA|32.89595|  -97.0372|
|      LAX|Los Angeles Inter...|      Los Angeles|   CA|    USA|33.94254|-118.40807|
+---------+--------------------+-----------------+-----+-------+--------+----------+



### Tri

In [21]:
new_df = csv_clean2_df.orderBy("IATA_CODE")
new_df = csv_clean2_df.orderBy("IATA_CODE", ascending=False)
new_df = csv_clean2_df.orderBy("STATE", "IATA_CODE", ascending=[False, False])
new_df.show(5)

+---------+--------------------+------------+-----+-------+--------+----------+
|IATA_CODE|             AIRPORT|        CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|
+---------+--------------------+------------+-----+-------+--------+----------+
|      RKS|Rock Springs-Swee...|Rock Springs|   WY|    USA|41.59422|-109.06519|
|      LAR|Laramie Regional ...|     Laramie|   WY|    USA|41.31205|-105.67499|
|      JAC|Jackson Hole Airport|     Jackson|   WY|    USA|43.60732|-110.73774|
|      GCC|Gillette-Campbell...|    Gillette|   WY|    USA| 44.3489|-105.53936|
|      CPR|Natrona County In...|      Casper|   WY|    USA|42.90836|-106.46447|
+---------+--------------------+------------+-----+-------+--------+----------+
only showing top 5 rows



### Join

In [24]:
states = [
    ("TX", "Texas"),
    ("AK", "Alaska")
]

columns = ["STATE_CODE", "STATE_NAME"]
states_df = spark.createDataFrame(states, columns)

join_df = csv_clean2_df.join(states_df, [csv_clean2_df.STATE == states_df.STATE_CODE])
# join_df = csv_clean2_df.join(states_df, [csv_clean2_df.STATE == states_df.STATE_CODE], how="leftouter")
join_df.show(5)

+---------+--------------------+-------------+-----+-------+--------+----------+----------+----------+
|IATA_CODE|             AIRPORT|         CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|STATE_CODE|STATE_NAME|
+---------+--------------------+-------------+-----+-------+--------+----------+----------+----------+
|      TYR|Tyler Pounds Regi...|        Tyler|   TX|    USA|32.35414| -95.40239|        TX|     Texas|
|      SPS|Wichita Falls Mun...|Wichita Falls|   TX|    USA| 33.9888| -98.49189|        TX|     Texas|
|      SJT|San Angelo Region...|   San Angelo|   TX|    USA|31.35775|-100.49631|        TX|     Texas|
|      SAT|San Antonio Inter...|  San Antonio|   TX|    USA|29.53369| -98.46978|        TX|     Texas|
|      MFE|McAllen-Miller In...|      McAllen|   TX|    USA|26.17583| -98.23861|        TX|     Texas|
+---------+--------------------+-------------+-----+-------+--------+----------+----------+----------+
only showing top 5 rows



### Aggregations

In [28]:
airports_by_state_df = csv_clean2_df.groupBy(F.col("STATE")).agg(F.count("STATE").alias("NB_AIRPORTS"))
airports_stats_df = csv_clean2_df.groupBy(F.col("STATE")).agg(F.count("STATE").alias("NB_AIRPORTS"), F.min("LATITUDE").alias("LAT_MIN"))
airports_stats_df.show(10)
airports_by_state_df.show(5)

+-----+-----------+--------+
|STATE|NB_AIRPORTS| LAT_MIN|
+-----+-----------+--------+
|   SC|          4|32.89865|
|   AZ|          4|32.11608|
|   LA|          7|29.99339|
|   MN|          8|43.90883|
|   NJ|          3|39.45758|
|   OR|          5|42.37423|
|   VA|          7|36.89461|
|   RI|          1|  41.724|
|   WY|          6|41.31205|
|   KY|          4|37.06083|
+-----+-----------+--------+
only showing top 10 rows

+-----+-----------+
|STATE|NB_AIRPORTS|
+-----+-----------+
|   SC|          4|
|   AZ|          4|
|   LA|          7|
|   MN|          8|
|   NJ|          3|
+-----+-----------+
only showing top 5 rows



### Windowing

In [30]:
from pyspark.sql.window import Window
windowSpec = Window.partitionBy("STATE").orderBy("LATITUDE", "LONGITUDE")
window_df = csv_clean2_df.withColumn("rank", F.rank().over(windowSpec))
window_df.show()

+---------+--------------------+-----------+-----+-------+--------+----------+----+
|IATA_CODE|             AIRPORT|       CITY|STATE|COUNTRY|LATITUDE| LONGITUDE|rank|
+---------+--------------------+-----------+-----+-------+--------+----------+----+
|      ADK|        Adak Airport|       Adak|   AK|    USA|51.87796|-176.64603|   1|
|      KTN|Ketchikan Interna...|  Ketchikan|   AK|    USA|55.35557|-131.71375|   2|
|      WRG|    Wrangell Airport|   Wrangell|   AK|    USA|56.48433|-132.36983|   3|
|      PSG|Petersburg James ...| Petersburg|   AK|    USA|56.80165|-132.94528|   4|
|      SIT|Sitka Rocky Gutie...|      Sitka|   AK|    USA|57.04714| -135.3616|   5|
|      ADQ|      Kodiak Airport|     Kodiak|   AK|    USA|57.74997|-152.49387|   6|
|      JNU|Juneau Internatio...|     Juneau|   AK|    USA|58.35496|-134.57628|   7|
|      GST|    Gustavus Airport|   Gustavus|   AK|    USA|58.42438|-135.70738|   8|
|      AKN| King Salmon Airport|King Salmon|   AK|    USA| 58.6768|-156.6492

### Actions

In [31]:
window_df.take(3)
window_df.collect()


[Row(IATA_CODE='ADK', AIRPORT='Adak Airport', CITY='Adak', STATE='AK', COUNTRY='USA', LATITUDE=51.877960205078125, LONGITUDE=-176.64602661132812, rank=1),
 Row(IATA_CODE='KTN', AIRPORT='Ketchikan International Airport', CITY='Ketchikan', STATE='AK', COUNTRY='USA', LATITUDE=55.35557174682617, LONGITUDE=-131.7137451171875, rank=2),
 Row(IATA_CODE='WRG', AIRPORT='Wrangell Airport', CITY='Wrangell', STATE='AK', COUNTRY='USA', LATITUDE=56.48432922363281, LONGITUDE=-132.3698272705078, rank=3),
 Row(IATA_CODE='PSG', AIRPORT='Petersburg James A. Johnson Airport', CITY='Petersburg', STATE='AK', COUNTRY='USA', LATITUDE=56.80165100097656, LONGITUDE=-132.94528198242188, rank=4),
 Row(IATA_CODE='SIT', AIRPORT='Sitka Rocky Gutierrez Airport', CITY='Sitka', STATE='AK', COUNTRY='USA', LATITUDE=57.04713821411133, LONGITUDE=-135.36160278320312, rank=5),
 Row(IATA_CODE='ADQ', AIRPORT='Kodiak Airport', CITY='Kodiak', STATE='AK', COUNTRY='USA', LATITUDE=57.749969482421875, LONGITUDE=-152.49386596679688, ra

### Ecriture de fichiers

In [32]:
window_df.write.option("header", True).csv("./work/data/window_df.csv")
window_df.write.parquet("./work/data/window_df.parquet")
window_df.rdd.getNumPartitions()

window_df_repartitionned = window_df.repartition(4)
window_df_repartitionned.write.option("header", True).csv("./work/window/window_df_repartionned.csv")
