In [1]:
import json
from datetime import datetime

In [2]:
with open ("secrets.json", 'rb') as file:
    SQL_CREDENTIALS = json.load(file)

USERNAME = SQL_CREDENTIALS['username']
PASSWORD = SQL_CREDENTIALS['password']
HOSTNAME = SQL_CREDENTIALS['ip']
SECONDS_IN_DAY = 24 * 60 * 60

In [10]:
# Decorator for timing command
def timer(func):
    def inner(*args, **kwargs):
        start = datetime.now()
        returned_value = func(*args, **kwargs)
        end = datetime.now()
        
        # Calculate runtime
        time_diff = end - start
        runtime=divmod(time_diff.days * SECONDS_IN_DAY + time_diff.seconds, 60)
        mins=runtime[0]
        secs=runtime[1]
        
        # Print runtime
        print(f"Execution time: {mins} min {secs} sec")
        
        return returned_value

    return inner

In [17]:
@timer
def show_dataframe(dataframe, no_of_rows=3):
    print(dataframe.head(no_of_rows))

In [12]:
QUERY = """
(SELECT 
    A.booking_id
    , B.flightno
    , D.airport_id AS origin_airport_id
    , D.icao AS origin_icao
    , D.name AS origin_name
    , F.city AS origin_city
    , F.country AS origin_country
    , F.latitude AS origin_lat
    , F.longitude AS origin_long
    , E.icao AS destination_icao
    , E.name AS destination_name
    , G.city AS destination_city
    , G.country AS destination_country
    , G.latitude AS destination_lat
    , G.longitude AS destination_long
    , B.departure
    , B.arrival
    , H.airlinename
    , J.identifier AS airplane_type
    , I.capacity AS airplane_capacity
    , C.firstname
    , C.lastname
    , A.seat
    , C.passportno
    , K.birthdate as passenger_birthdate
    , K.sex as passenger_sex
    , K.country as passenger_country
    , K.city as passenger_city
    , A.price
FROM booking A
LEFT JOIN flight B on A.flight_id = B.flight_id
LEFT JOIN passenger C on A.passenger_id = C.passenger_id
LEFT JOIN passengerdetails K on A.passenger_id = K.passenger_id
LEFT JOIN airport D on B.`from` = D.airport_id
LEFT JOIN airport E on B.`to` = E.airport_id
LEFT JOIN airport_geo F on B.`from` = F.airport_id
LEFT JOIN airport_geo G on B.`to` = G.airport_id
LEFT JOIN airline H on B.airline_id = H.airline_id
LEFT JOIN airplane I on B.airplane_id = I.airplane_id
LEFT JOIN airplane_type J on I.type_id = J.type_id
) X
"""

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Parquetfy") \
    .config("spark.jars", "/Users/shreyan/Downloads/mysql-connector-j-9.0.0/mysql-connector-j-9.0.0.jar") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

url = f"jdbc:mysql://{HOSTNAME}:3306/airportdb"
properties = {
    "user": USERNAME,
    "password": PASSWORD,
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [14]:
partition_column = "origin_airport_id"  # Adjust this based on your data
lowerbound=1
upperbound=10000
num_partitions = 16  # Adjust based on your hardware resources

In [15]:
spark_df = spark.read.jdbc(
    url=url,
    table=QUERY,
    properties=properties,
    column=partition_column,
    lowerBound=lowerbound,
    upperBound=upperbound,
    numPartitions=num_partitions
)

In [18]:
show_dataframe(dataframe=spark_df,no_of_rows=3)

[Stage 1:>                                                          (0 + 1) / 1]

[Row(booking_id=1191, flightno='SP5604  ', origin_airport_id=509, origin_icao='ESUB', origin_name='ARBRA\r', origin_city='ARBRA', origin_country='SWEDEN', origin_lat=Decimal('61.51250000'), origin_long=Decimal('16.37250000'), destination_icao='OAHN', destination_name='KHWAHAN\r', destination_city='KHWAHAN', destination_country='AFGHANISTAN', destination_lat=Decimal('37.88333300'), destination_long=Decimal('70.20000000'), departure=datetime.datetime(2015, 6, 10, 19, 39), arrival=datetime.datetime(2015, 6, 11, 1, 18), airlinename='Spain Airlines', airplane_type='Airbus-A320-Familie', airplane_capacity=150, firstname='Wade', lastname='Boggs\r', seat='25C ', passportno='P110193  ', passenger_birthdate=datetime.date(1958, 6, 22), passenger_sex='m', passenger_country='Mozambique', passenger_city='Zeltschach', price=Decimal('3.15')), Row(booking_id=1192, flightno='SP5604  ', origin_airport_id=509, origin_icao='ESUB', origin_name='ARBRA\r', origin_city='ARBRA', origin_country='SWEDEN', origin_

                                                                                