# Imports et configurations

In [0]:
# Imports
# ---------------------------
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Spark session
# ---------------------------
# getOrCreate() returns the session that is already running when there is one,
# otherwise it builds a new one.
spark = SparkSession.builder.getOrCreate()

# Ask Spark to evaluate DataFrames eagerly in the REPL/notebook so that a bare
# DataFrame expression is rendered as a table.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# Location of the input files
# ---------------------------
# Base directory prepended to every CSV file name read by the cells below.
file_location = "/FileStore/tables/"

# Initialisation de la base de données MySQL

In [0]:
# Connection test against the AlwaysData-hosted database.
#
# Defines the JDBC settings (driver, db_url, db_user, db_password) that the
# insertion cells below reuse, then reads the `airlines` table back and
# displays it to confirm that the connection works.

driver = "org.mariadb.jdbc.Driver"

# NOTE(review): the values below are placeholders. Real credentials should not
# be hardcoded in the notebook; prefer a secret store or environment variables.
db_host = "xxx.alwaysdata.net"
db_port = "xx" # update if you use a non-default port
db_name = "db_airlines"
db_user = "xxx"
db_password = "xxx"

# Build the URL from the db_* variables defined just above. The previous
# version interpolated database_host / database_port / database_name, which
# are never defined and raised a NameError on a fresh kernel.
db_url = f"jdbc:mysql://{db_host}:{db_port}/{db_name}"

# Table used for the round-trip check.
test_table = "airlines"
remote_table = (
    spark.read
    .format("jdbc")
    .option("driver", driver)
    .option("url", db_url)
    .option("dbtable", test_table)
    .option("user", db_user)
    .option("password", db_password)
    .load()
)

display(remote_table)

id,carrier,name


# Insertion des DataFrames dans la base de données MySQL

In [0]:
# Airlines
# Read airlines.csv (first row is the header, column types are inferred) and
# append its rows to the `airlines` table over JDBC.
airlines = spark.read.csv(file_location + "airlines.csv", header=True, inferSchema=True)

(
    airlines.write
    .format("jdbc")
    # Name the driver explicitly, as the connection-test read does, instead of
    # letting the JDBC DriverManager pick one for the jdbc:mysql:// URL.
    .option("driver", driver)
    .option("url", db_url)
    .option("dbtable", "airlines")
    .option("user", db_user)
    .option("password", db_password)
    # Append mode adds rows to the existing table; re-running this cell
    # inserts the same rows again.
    .mode("append")
    .save()
)

In [0]:
# Planes
# Read planes.csv, align column names and types, then append the rows to the
# `planes` table over JDBC.
planes = spark.read.csv(file_location + "planes.csv", header=True, inferSchema=True)
planes.printSchema()

# Rename three columns and cast `year` (inferred as a string) to an integer.
# NOTE(review): the new names presumably match the columns of the target
# `planes` table - confirm against the database schema.
planes = (
    planes
    .withColumnRenamed("type", "type_plane")
    .withColumnRenamed("engine", "engine_name")
    .withColumnRenamed("engines", "engine_nb")
    # Values that cannot be parsed as an integer become NULL after the cast.
    .withColumn("year", F.col("year").cast(IntegerType()))
)
planes.printSchema()

(
    planes.write
    .format("jdbc")
    # Name the driver explicitly, as the connection-test read does, instead of
    # letting the JDBC DriverManager pick one for the jdbc:mysql:// URL.
    .option("driver", driver)
    .option("url", db_url)
    .option("dbtable", "planes")
    .option("user", db_user)
    .option("password", db_password)
    # Append mode adds rows to the existing table; re-running this cell
    # inserts the same rows again.
    .mode("append")
    .save()
)

root
 |-- tailnum: string (nullable = true)
 |-- year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- type_plane: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engine_nb: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine_name: string (nullable = true)



In [0]:
# Airports
# Read airports.csv (first row is the header, column types are inferred) and
# append its rows to the `airports` table over JDBC.
airports = spark.read.csv(file_location + "airports.csv", header=True, inferSchema=True)

(
    airports.write
    .format("jdbc")
    # Name the driver explicitly, as the connection-test read does, instead of
    # letting the JDBC DriverManager pick one for the jdbc:mysql:// URL.
    .option("driver", driver)
    .option("url", db_url)
    .option("dbtable", "airports")
    .option("user", db_user)
    .option("password", db_password)
    # Append mode adds rows to the existing table; re-running this cell
    # inserts the same rows again.
    .mode("append")
    .save()
)

In [0]:
# Weather
# Read weather.csv, convert the measurement columns that were inferred as
# strings into numeric types, then append the rows to the `weather` table.
weather = spark.read.csv(file_location + "weather.csv", header=True, inferSchema=True)
weather.printSchema()

# Columns inferred as strings that must be stored as numbers.
# Values that cannot be parsed become NULL after the cast.
weather_int_columns = ["wind_dir"]
weather_float_columns = ["temp", "dewp", "humid", "wind_speed", "wind_gust", "pressure"]

# withColumn on an existing name replaces the column in place, so the column
# order of the DataFrame is unchanged.
for column_name in weather_int_columns:
    weather = weather.withColumn(column_name, F.col(column_name).cast(IntegerType()))
for column_name in weather_float_columns:
    weather = weather.withColumn(column_name, F.col(column_name).cast(FloatType()))
weather.printSchema()

(
    weather.write
    .format("jdbc")
    # Name the driver explicitly, as the connection-test read does, instead of
    # letting the JDBC DriverManager pick one for the jdbc:mysql:// URL.
    .option("driver", driver)
    .option("url", db_url)
    .option("dbtable", "weather")
    .option("user", db_user)
    .option("password", db_password)
    # Append mode adds rows to the existing table; re-running this cell
    # inserts the same rows again.
    .mode("append")
    .save()
)

root
 |-- origin: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- temp: string (nullable = true)
 |-- dewp: string (nullable = true)
 |-- humid: string (nullable = true)
 |-- wind_dir: string (nullable = true)
 |-- wind_speed: string (nullable = true)
 |-- wind_gust: string (nullable = true)
 |-- precip: double (nullable = true)
 |-- pressure: string (nullable = true)
 |-- visib: double (nullable = true)
 |-- time_hour: timestamp (nullable = true)

root
 |-- origin: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- temp: float (nullable = true)
 |-- dewp: float (nullable = true)
 |-- humid: float (nullable = true)
 |-- wind_dir: integer (nullable = true)
 |-- wind_speed: float (nullable = true)
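 |-- wind_gust: float (nullable = true)
 |-- precip: double (nullable = true)
 |-- pressure: float (nullable = true)
 |-- visib: double (nullable = true)
 |-- time_hour: timestamp (nullable = true)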

In [0]:
# Flights
# Read flights.csv without schema inference (every column is loaded as a
# string), cast selected columns to integers, then append the rows to the
# `flights` table over JDBC.
flights = spark.read.csv(file_location + "flights.csv", header=True)
flights.printSchema()

# Columns converted to integers before the insert; the remaining columns are
# written as strings. Values that cannot be parsed become NULL after the cast.
flights_int_columns = ["dep_time", "arr_delay", "dep_delay", "flight", "arr_time", "air_time"]

# withColumn on an existing name replaces the column in place, so the column
# order of the DataFrame is unchanged.
for column_name in flights_int_columns:
    flights = flights.withColumn(column_name, F.col(column_name).cast(IntegerType()))

(
    flights.write
    .format("jdbc")
    # Name the driver explicitly, as the connection-test read does, instead of
    # letting the JDBC DriverManager pick one for the jdbc:mysql:// URL.
    .option("driver", driver)
    .option("url", db_url)
    .option("dbtable", "flights")
    .option("user", db_user)
    .option("password", db_password)
    # Append mode adds rows to the existing table; re-running this cell
    # inserts the same rows again.
    .mode("append")
    .save()
)

root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- sched_dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- sched_arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- time_hour: string (nullable = true)



# Les missions

In [0]:
# Mission 1 : Se familiariser avec les données