In [1]:
# library installation
!pip install -q kaggle
!pip install pyspark
!pip install psycopg2-binary



In [5]:
# imports
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from zipfile import ZipFile
import os
import psycopg2
import pandas as pd
import json

In [6]:
# create the folder /home/coder/.kaggle (-p: no error if it already exists);
# "kaggle.json" is copied into it in the next cell
!mkdir -p /home/coder/.kaggle

In [7]:
# copy "kaggle.json" from the current working directory into /home/coder/.kaggle/
!cp kaggle.json /home/coder/.kaggle/

In [8]:
# restrict "kaggle.json" to mode 600: read/write for the owner, no access for anyone else
!chmod 600 /home/coder/.kaggle/kaggle.json

In [9]:
# download the dataset archive (a .zip) from Kaggle into the working directory
# NOTE(review): --force presumably re-downloads even when the archive is already present — confirm with `kaggle datasets download -h`
!kaggle datasets download -d francuzovd/airplane-tickets-from-moscow-2022 --force

Downloading airplane-tickets-from-moscow-2022.zip to /home/coder/working_dir
 93%|███████████████████████████████████▎  | 2.00M/2.15M [00:01<00:00, 1.39MB/s]
100%|██████████████████████████████████████| 2.15M/2.15M [00:01<00:00, 1.36MB/s]


In [10]:
# file decompression
# The archive handle is bound to `archive`, not `zip`: this is top-level cell
# code, so a name `zip` would shadow the built-in zip() for every later cell
# of the kernel session.
file_name = "airplane-tickets-from-moscow-2022.zip"
with ZipFile(file_name, 'r') as archive:
    # extract every member into the current working directory
    archive.extractall()
    print("Done")

Done


In [11]:
# short alias for the process environment: later cells read the REDSHIFT_*
# settings from it and write the Spark-related variables into it
env = os.environ

In [12]:
# Postgres and Redshift JDBCs: path of the PostgreSQL JDBC driver jar
driver_path = "./spark_drivers/postgresql-42.5.2.jar"

# Expose the driver jar through the environment before the session is built
env['SPARK_CLASSPATH'] = driver_path
env['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'


# Create (or reuse) the local SparkSession with the driver jar configured
spark = (
    SparkSession.builder
    .master("local")
    .appName("Conexion entre Pyspark y Redshift")
    .config("spark.jars", driver_path)
    .config("spark.executor.extraClassPath", driver_path)
    .getOrCreate()
)

In [13]:
# Connect to Redshift using psycopg2; every connection setting is read from
# the corresponding REDSHIFT_* environment variable
conn = psycopg2.connect(**{
    option: env[variable]
    for option, variable in (
        ("host", "REDSHIFT_HOST"),
        ("port", "REDSHIFT_PORT"),
        ("dbname", "REDSHIFT_DB"),
        ("user", "REDSHIFT_USER"),
        ("password", "REDSHIFT_PASSWORD"),
    )
})

In [15]:
# Create table
# `with conn` runs the DDL as one transaction: commit on success, rollback on
# error, so a failed statement does not leave the connection stuck in an
# aborted transaction for the following cells. `with conn.cursor()` closes the
# cursor in both cases. The connection itself stays open.
# The schema name is an identifier, which cannot be sent as a bound parameter;
# it is taken from the REDSHIFT_SCHEMA environment variable.
with conn, conn.cursor() as cursor:
    cursor.execute(f"""
create table if not exists {env['REDSHIFT_SCHEMA']}.airplane_tickets (
    found_at timestamp distkey,
    class varchar(5),
    value decimal(10,2),
    number_of_changes integer,
    depart_date date,
    search_date date,
    airline_code varchar(12),
    airline_name_translations varchar(100),
    origin varchar(12),
    destination varchar(12),
    country_code varchar(12),
    time_zone varchar(50),
    flightable boolean,
    iata_type varchar(12),
    airport_name varchar(50),
    airport_name_translations varchar(100),
    airport_latitude decimal(10,2),
    airport_longitude decimal(10,2)
) sortkey(found_at);
""")
print("Table created!")

Table created!


In [16]:
# verify that the table is created: list the distinct table names that
# PG_TABLE_DEF reports for the target schema
# The schema name is passed as a bound parameter (%s) instead of being
# interpolated into the SQL text, and the cursor is closed by the `with`
# block even if the query fails.
with conn.cursor() as cursor:
    cursor.execute(
        """
SELECT
  distinct tablename
FROM
  PG_TABLE_DEF
WHERE
  schemaname = %s;
""",
        (env['REDSHIFT_SCHEMA'],),
    )
    # each fetched row is a 1-tuple holding the table name
    print(", ".join(row[0] for row in cursor.fetchall()))

airplane_tickets


In [17]:
# Create the Fact DataFrame
# The file is opened explicitly as UTF-8: the data contains non-ASCII text
# (e.g. Cyrillic destination codes) and the default encoding of open()
# depends on the locale of the machine.
with open(r"ticket_dataset_RusAirports.json", encoding="utf-8") as file:
    dic1 = json.load(file)

# flatten the JSON records into a pandas frame, then convert it to Spark
pandasdf1 = pd.json_normalize(dic1)
fact_df1 = spark.createDataFrame(pandasdf1)
fact_df1.show()

+-------+----------+------+-----------------+-------------------+-----------+-----------+-------+-----------+
|  value|trip_class|origin|number_of_changes|           found_at|destination|depart_date|airline|search_date|
+-------+----------+------+-----------------+-------------------+-----------+-----------+-------+-----------+
|43270.0|         0|   MOW|              2.0|2022-03-30T14:54:04|        SYZ| 2022-04-21|     TK| 2022-04-02|
|28166.0|         0|   MOW|              3.0|2022-04-02T08:08:06|        SPU| 2022-04-25|     W6| 2022-04-02|
|24159.0|         0|   MOW|              2.0|2022-04-01T09:23:26|        SZF| 2022-04-14|     TK| 2022-04-02|
|11160.0|         0|   MOW|              1.0|2022-04-02T12:09:50|        ABA| 2022-04-12|     S7| 2022-04-02|
|19920.0|         0|   MOW|              1.0|2022-04-02T12:44:34|        CAI| 2022-04-14|     SU| 2022-04-02|
|24260.0|         0|   MOW|              2.0|2022-04-01T23:54:23|        GZT| 2022-04-11|     TK| 2022-04-02|
|30395.0| 

In [18]:
# column types of the fact DataFrame as created from the pandas frame
fact_df1.printSchema()

root
 |-- value: double (nullable = true)
 |-- trip_class: long (nullable = true)
 |-- origin: string (nullable = true)
 |-- number_of_changes: double (nullable = true)
 |-- found_at: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- depart_date: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- search_date: string (nullable = true)



In [19]:
# data type change: the two date columns become DateType, "found_at" becomes
# TimestampType and "number_of_changes" becomes IntegerType
fact_df1 = (
    fact_df1
    .withColumn("search_date", f.col("search_date").cast(DateType()))
    .withColumn("depart_date", f.col("depart_date").cast(DateType()))
    .withColumn("found_at", f.col("found_at").cast(TimestampType()))
    .withColumn("number_of_changes", f.col("number_of_changes").cast(IntegerType()))
)

In [20]:
# column types after the casts
fact_df1.printSchema()

root
 |-- value: double (nullable = true)
 |-- trip_class: long (nullable = true)
 |-- origin: string (nullable = true)
 |-- number_of_changes: integer (nullable = true)
 |-- found_at: timestamp (nullable = true)
 |-- destination: string (nullable = true)
 |-- depart_date: date (nullable = true)
 |-- airline: string (nullable = true)
 |-- search_date: date (nullable = true)



In [21]:
# shape of the fact DataFrame, printed as (number of rows, number of columns)
print((fact_df1.count(), len(fact_df1.columns)))

(106348, 9)


In [22]:
# frequency of the distinct values in each of the listed columns
lista = ['trip_class', 'origin', 'number_of_changes', 'destination', 'airline']

for column_name in lista:
    # one frequency table (value, count) per column
    fact_df1.groupby(column_name).count().show()

+----------+------+
|trip_class| count|
+----------+------+
|         0|106348|
+----------+------+

+------+-----+
|origin|count|
+------+-----+
|   MOW|39355|
|   KRR|12822|
|   AER|14845|
|   LED|23663|
|   SVX|15663|
+------+-----+

+-----------------+-----+
|number_of_changes|count|
+-----------------+-----+
|                1|40937|
|                6|   24|
|                3|11796|
|                5|  262|
|                4| 1824|
|                2|33357|
|                0|18148|
+-----------------+-----+

+-----------+-----+
|destination|count|
+-----------+-----+
|        SCW|  293|
|        KGL|   69|
|        NBC|  302|
|        PMI|  137|
|        KKQ|    7|
|        NWI|    1|
|        FMY|   18|
|        CLQ|    4|
|        LLA|   25|
|        SXB|  127|
|        HEL|  225|
|        IKS|   70|
|        PUF|   14|
|        ZCO|    6|
|        AAT|    1|
|        NYM|  264|
|        БАТ|   24|
|        RJK|   36|
|        SZZ|   24|
|        MSY|   23|
+-----------+---

In [23]:
# drop column "trip_class": every one of the 106348 records has the value 0
# (see the frequency table above), so it carries no information
fact_df1 = fact_df1.drop('trip_class')

In [24]:
# null check: for every column, count the rows where the column is / is not null
lista1 = fact_df1.columns

for column_name in lista1:
    has_value = f.col(column_name).isNotNull()
    fact_df1.groupby(has_value).count().show()

+-------------------+------+
|(value IS NOT NULL)| count|
+-------------------+------+
|               true|106348|
+-------------------+------+

+--------------------+------+
|(origin IS NOT NULL)| count|
+--------------------+------+
|                true|106348|
+--------------------+------+

+-------------------------------+------+
|(number_of_changes IS NOT NULL)| count|
+-------------------------------+------+
|                           true|106348|
+-------------------------------+------+

+----------------------+------+
|(found_at IS NOT NULL)| count|
+----------------------+------+
|                  true|106348|
+----------------------+------+

+-------------------------+------+
|(destination IS NOT NULL)| count|
+-------------------------+------+
|                     true|106348|
+-------------------------+------+

+-------------------------+------+
|(depart_date IS NOT NULL)| count|
+-------------------------+------+
|                     true|106348|
+-------------------

In [25]:
# duplicate check: compare the total row count with the row count after
# dropDuplicates(); equal numbers mean there are no duplicate records.
# Nothing is removed here — fact_df1 is not reassigned.
print(fact_df1.count())
print(fact_df1.dropDuplicates().count())

106348
106348


In [26]:
# Create the Dim DataFrame (airports)
# The file is opened explicitly as UTF-8: it contains Cyrillic airport names
# and the default encoding of open() depends on the locale of the machine.
with open(r"IATA_Airports.json", encoding="utf-8") as file:
    dic2 = json.load(file)

# flatten the nested JSON (e.g. "coordinates.lat") into a pandas frame, then
# convert it to Spark
pandasdf2 = pd.json_normalize(dic2)
dim_df2 = spark.createDataFrame(pandasdf2)
dim_df2.show()

+---------+------------+--------------------+----------+--------------------+----+---------+--------------------+---------------+---------------+
|city_code|country_code|           time_zone|flightable|                name|code|iata_type|name_translations.en|coordinates.lat|coordinates.lon|
+---------+------------+--------------------+----------+--------------------+----+---------+--------------------+---------------+---------------+
|      PIT|          US|    America/New_York|      true|Питтсбург Интернэ...| PIT|  airport|Pittsburgh Intern...|       40.49585|      -80.25657|
|      GAL|          US|   America/Anchorage|      true|              Галена| GAL|  airport|Edward G. Pitka S...|       64.73798|     -156.94186|
|      YAG|          CA|    America/Winnipeg|      true|        Форт-Франсес| YAG|  airport|Fort Frances Muni...|       48.65278|      -93.44722|
|      ILD|          ES|       Europe/Madrid|      true|  Льейда (жд вокзал)| QLQ|  railway|Lleida Railway St...|       41.6

In [27]:
# drop the "city_code" column from the airport dimension
dim_df2 = dim_df2.drop("city_code")

In [28]:
# column types of the airport dimension after dropping "city_code"
dim_df2.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- time_zone: string (nullable = true)
 |-- flightable: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- code: string (nullable = true)
 |-- iata_type: string (nullable = true)
 |-- name_translations.en: string (nullable = true)
 |-- coordinates.lat: double (nullable = true)
 |-- coordinates.lon: double (nullable = true)



In [29]:
# column rename: prefix the airport attributes with "airport_" and replace
# the dotted names produced by the JSON flattening
dim_df2 = (
    dim_df2
    .withColumnRenamed("name", "airport_name")
    .withColumnRenamed("name_translations.en", "airport_name_translations")
    .withColumnRenamed("coordinates.lat", "airport_latitude")
    .withColumnRenamed("coordinates.lon", "airport_longitude")
)

In [30]:
# shape of the airport dimension, printed as (number of rows, number of columns)
print((dim_df2.count(), len(dim_df2.columns)))

(3547, 9)


In [31]:
# null check: for every column, count the rows where the column is / is not null
lista2 = dim_df2.columns

for column_name in lista2:
    has_value = f.col(column_name).isNotNull()
    dim_df2.groupby(has_value).count().show()

+--------------------------+-----+
|(country_code IS NOT NULL)|count|
+--------------------------+-----+
|                      true| 3547|
+--------------------------+-----+

+-----------------------+-----+
|(time_zone IS NOT NULL)|count|
+-----------------------+-----+
|                   true| 3547|
+-----------------------+-----+

+------------------------+-----+
|(flightable IS NOT NULL)|count|
+------------------------+-----+
|                    true| 3547|
+------------------------+-----+

+--------------------------+-----+
|(airport_name IS NOT NULL)|count|
+--------------------------+-----+
|                      true| 3390|
|                     false|  157|
+--------------------------+-----+

+------------------+-----+
|(code IS NOT NULL)|count|
+------------------+-----+
|              true| 3547|
+------------------+-----+

+-----------------------+-----+
|(iata_type IS NOT NULL)|count|
+-----------------------+-----+
|                   true| 3547|
+---------------------

In [32]:
# Create the Dim DataFrame (airlines)
# The file is opened explicitly as UTF-8: it contains Cyrillic airline names
# and the default encoding of open() depends on the locale of the machine.
with open(r"IATA_Airlines.json", encoding="utf-8") as file:
    dic3 = json.load(file)

# flatten the JSON records into a pandas frame, then convert it to Spark
pandasdf3 = pd.json_normalize(dic3)
dim_df3 = spark.createDataFrame(pandasdf3)
dim_df3.show(truncate = False)

+-------------------+----+------------------------------------+
|name               |code|name_translations.en                |
+-------------------+----+------------------------------------+
|null               |DN  |Norwegian Air                       |
|null               |AI  |Air India                           |
|null               |V1  |IBS Software Services Americas, Inc.|
|null               |DE  |Condor                              |
|Аврора             |HZ  |Aurora                              |
|null               |WX  |Cityjet                             |
|Air Mekong         |P8  |Air Mekong                          |
|Белавиа            |B2  |Belavia                             |
|null               |2D  |Eastern Airlines, LLC               |
|null               |2I  |Star Peru                           |
|null               |AF  |Air France                          |
|Ellinair           |EL  |Ellinair                            |
|North Flying       |M3  |North Flying  

In [33]:
# count the airline rows with (true) and without (false) a value in column "name"
dim_df3.groupby(f.col("name").isNotNull()).count().show()

+------------------+-----+
|(name IS NOT NULL)|count|
+------------------+-----+
|              true|  409|
|             false|  658|
+------------------+-----+



In [34]:
# drop the "name" column: 658 of its 1067 values are null (see the count
# above); "name_translations.en" is kept as the airline name
dim_df3 = dim_df3.drop("name")

In [35]:
# schema and untruncated contents of the airline dimension after dropping "name"
dim_df3.printSchema()
dim_df3.show(truncate = False)

root
 |-- code: string (nullable = true)
 |-- name_translations.en: string (nullable = true)

+----+------------------------------------+
|code|name_translations.en                |
+----+------------------------------------+
|DN  |Norwegian Air                       |
|AI  |Air India                           |
|V1  |IBS Software Services Americas, Inc.|
|DE  |Condor                              |
|HZ  |Aurora                              |
|WX  |Cityjet                             |
|P8  |Air Mekong                          |
|B2  |Belavia                             |
|2D  |Eastern Airlines, LLC               |
|2I  |Star Peru                           |
|AF  |Air France                          |
|EL  |Ellinair                            |
|M3  |North Flying                        |
|SV  |Saudia                              |
|FM  |Shanghai Airlines                   |
|B0  |La Compagnie                        |
|F2  |Safarilink Aviation                 |
|JQ  |Jetstar             

In [36]:
# shape of the airline dimension, printed as (number of rows, number of columns)
print((dim_df3.count(), len(dim_df3.columns)))

(1067, 2)


In [37]:
# inner join of "fact_df1" with "dim_df3": the ticket's "airline" must equal
# the airline dimension's "code"
merged_df = fact_df1.join(
    dim_df3,
    on=fact_df1["airline"] == dim_df3["code"],
    how="inner",
)
merged_df.show()

+-------+------+-----------------+-------------------+-----------+-----------+-------+-----------+----+--------------------+
|  value|origin|number_of_changes|           found_at|destination|depart_date|airline|search_date|code|name_translations.en|
+-------+------+-----------------+-------------------+-----------+-----------+-------+-----------+----+--------------------+
|18768.0|   MOW|                1|2022-02-03 12:15:36|        MED| 2022-10-15|     PC| 2022-02-10|  PC|    Pegasus Airlines|
|11270.0|   MOW|                1|2022-02-05 08:19:15|        SJJ| 2022-10-01|     PC| 2022-02-10|  PC|    Pegasus Airlines|
|11721.0|   MOW|                1|2022-02-07 09:00:09|        AJI| 2022-10-10|     PC| 2022-02-10|  PC|    Pegasus Airlines|
|10339.0|   MOW|                1|2022-02-03 15:01:46|        ECN| 2022-10-22|     PC| 2022-02-10|  PC|    Pegasus Airlines|
|16661.0|   MOW|                2|2022-02-07 14:22:59|        NAV| 2022-10-01|     PC| 2022-02-10|  PC|    Pegasus Airlines|


In [38]:
# keep a single airline key: drop the fact-side "airline" column and give the
# dimension columns airline-specific names
merged_df = (
    merged_df
    .drop("airline")
    .withColumnRenamed("code", "airline_code")
    .withColumnRenamed("name_translations.en", "airline_name_translations")
)

In [39]:
# inner join of "merged_df" with "dim_df2" (the ticket's "destination" must
# equal the airport dimension's "code"); the result is the final dataframe "sparkdf"
sparkdf = merged_df.join(
    dim_df2,
    on=merged_df["destination"] == dim_df2["code"],
    how="inner",
)
sparkdf.show()

+-------+------+-----------------+-------------------+-----------+-----------+-----------+------------+-------------------------+------------+-----------------+----------+--------------------+----+---------+-------------------------+----------------+-----------------+
|  value|origin|number_of_changes|           found_at|destination|depart_date|search_date|airline_code|airline_name_translations|country_code|        time_zone|flightable|        airport_name|code|iata_type|airport_name_translations|airport_latitude|airport_longitude|
+-------+------+-----------------+-------------------+-----------+-----------+-----------+------------+-------------------------+------------+-----------------+----------+--------------------+----+---------+-------------------------+----------------+-----------------+
|18768.0|   MOW|                1|2022-02-03 12:15:36|        MED| 2022-10-15| 2022-02-10|          PC|         Pegasus Airlines|          SA|      Asia/Riyadh|      true|              Медина| 

In [40]:
# drop "code", the airport join key from dim_df2 that the join above matched against "destination"
sparkdf = sparkdf.drop("code")

In [41]:
# column types of the joined dataframe
sparkdf.printSchema()

root
 |-- value: double (nullable = true)
 |-- origin: string (nullable = true)
 |-- number_of_changes: integer (nullable = true)
 |-- found_at: timestamp (nullable = true)
 |-- destination: string (nullable = true)
 |-- depart_date: date (nullable = true)
 |-- search_date: date (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- airline_name_translations: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- time_zone: string (nullable = true)
 |-- flightable: boolean (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- iata_type: string (nullable = true)
 |-- airport_name_translations: string (nullable = true)
 |-- airport_latitude: double (nullable = true)
 |-- airport_longitude: double (nullable = true)



In [42]:
# shape of the joined dataframe, printed as (number of rows, number of columns)
print((sparkdf.count(), len(sparkdf.columns)))

(97859, 17)


In [43]:
# null check: for every column, count the rows where the column is / is not null
lista3 = sparkdf.columns

for column_name in lista3:
    has_value = f.col(column_name).isNotNull()
    sparkdf.groupby(has_value).count().show()

+-------------------+-----+
|(value IS NOT NULL)|count|
+-------------------+-----+
|               true|97859|
+-------------------+-----+

+--------------------+-----+
|(origin IS NOT NULL)|count|
+--------------------+-----+
|                true|97859|
+--------------------+-----+

+-------------------------------+-----+
|(number_of_changes IS NOT NULL)|count|
+-------------------------------+-----+
|                           true|97859|
+-------------------------------+-----+

+----------------------+-----+
|(found_at IS NOT NULL)|count|
+----------------------+-----+
|                  true|97859|
+----------------------+-----+

+-------------------------+-----+
|(destination IS NOT NULL)|count|
+-------------------------+-----+
|                     true|97859|
+-------------------------+-----+

+-------------------------+-----+
|(depart_date IS NOT NULL)|count|
+-------------------------+-----+
|                     true|97859|
+-------------------------+-----+

+-------------

In [44]:
# remove the records whose "airport_name" is null, then print the resulting
# (number of rows, number of columns)
sparkdf = sparkdf.na.drop(subset=["airport_name"])
print((sparkdf.count(), len(sparkdf.columns)))

(97744, 17)


In [45]:
# summary statistics of the joined dataframe
sparkdf.describe().show()

+-------+------------------+------+------------------+-----------+------------------+-------------------------+------------+--------------+------------+---------+-------------------------+------------------+-----------------+
|summary|             value|origin| number_of_changes|destination|      airline_code|airline_name_translations|country_code|     time_zone|airport_name|iata_type|airport_name_translations|  airport_latitude|airport_longitude|
+-------+------------------+------+------------------+-----------+------------------+-------------------------+------------+--------------+------------+---------+-------------------------+------------------+-----------------+
|  count|             97744| 97744|             97744|      97744|             97744|                    97744|       97744|         97744|       97744|    97744|                    97744|             97744|            97744|
|   mean|29167.953981830087|  null|1.4081068914715993|        NaN| 4.792763157894737|           

In [46]:
# creation of column "class" as a function of column "value":
#   "A" when value > 100000
#   "B" when 15000 <= value <= 100000
#   "C" in every other case
sparkdf = sparkdf.withColumn(
    "class",
    f.when(f.col("value") > 100000, f.lit("A"))
    .when((f.col("value") >= 15000) & (f.col("value") <= 100000), f.lit("B"))
    .otherwise(f.lit("C")),
)

In [47]:
# user-defined function "replace_character", applied to column "time_zone"
def _slash_to_dash(text):
    """Return `text` with every "/" replaced by "-"; None is passed through.

    A null cell reaches a Python UDF as None, so the guard keeps a null
    "time_zone" from raising AttributeError on `.replace`.
    """
    if text is None:
        return None
    return text.replace("/", "-")


replace_character = f.udf(_slash_to_dash, StringType())
sparkdf = sparkdf.withColumn("time_zone", replace_character("time_zone"))

In [48]:
# keep only the records with fewer than 5 changes ("number_of_changes" < 5)
sparkdf = sparkdf.filter(f.col("number_of_changes") < 5)
sparkdf.show()

+-------+------+-----------------+-------------------+-----------+-----------+-----------+------------+-------------------------+------------+-----------------+----------+--------------------+---------+-------------------------+----------------+-----------------+-----+
|  value|origin|number_of_changes|           found_at|destination|depart_date|search_date|airline_code|airline_name_translations|country_code|        time_zone|flightable|        airport_name|iata_type|airport_name_translations|airport_latitude|airport_longitude|class|
+-------+------+-----------------+-------------------+-----------+-----------+-----------+------------+-------------------------+------------+-----------------+----------+--------------------+---------+-------------------------+----------------+-----------------+-----+
|18768.0|   MOW|                1|2022-02-03 12:15:36|        MED| 2022-10-15| 2022-02-10|          PC|         Pegasus Airlines|          SA|      Asia-Riyadh|      true|              Медин

In [49]:
# shape after applying the filter, printed as (number of rows, number of columns)
print((sparkdf.count(), len(sparkdf.columns)))

(97472, 18)


In [50]:
# column ordering: same column order as the "airplane_tickets" table definition
sparkdf = sparkdf.select([
    "found_at",
    "class",
    "value",
    "number_of_changes",
    "depart_date",
    "search_date",
    "airline_code",
    "airline_name_translations",
    "origin",
    "destination",
    "country_code",
    "time_zone",
    "flightable",
    "iata_type",
    "airport_name",
    "airport_name_translations",
    "airport_latitude",
    "airport_longitude",
])
sparkdf.show(n = 10, truncate = False)

+-------------------+-----+-------+-----------------+-----------+-----------+------------+-------------------------+------+-----------+------------+---------------+----------+---------+------------+---------------------------------------------------+----------------+-----------------+
|found_at           |class|value  |number_of_changes|depart_date|search_date|airline_code|airline_name_translations|origin|destination|country_code|time_zone      |flightable|iata_type|airport_name|airport_name_translations                          |airport_latitude|airport_longitude|
+-------------------+-----+-------+-----------------+-----------+-----------+------------+-------------------------+------+-----------+------------+---------------+----------+---------+------------+---------------------------------------------------+----------------+-----------------+
|2022-02-03 12:15:36|B    |18768.0|1                |2022-10-15 |2022-02-10 |PC          |Pegasus Airlines         |MOW   |MED        |SA     

In [51]:
# JDBC URL of the Redshift database, assembled from the REDSHIFT_* environment variables
redshift_url = f"jdbc:postgresql://{env['REDSHIFT_HOST']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DB']}"

# connection properties passed to the Spark JDBC writer
redshift_properties = dict(
    user=env['REDSHIFT_USER'],
    password=env['REDSHIFT_PASSWORD'],
    driver="org.postgresql.Driver",
    dbtable=f"{env['REDSHIFT_SCHEMA']}.airplane_tickets",
)

In [52]:
# write to DW Redshift
# The target is the schema-qualified name stored in
# redshift_properties["dbtable"] ("<REDSHIFT_SCHEMA>.airplane_tickets"), i.e.
# the same table that the create-table cell defines and the read-back cell
# queries. The `table` argument is what the writer uses as target, so the
# unqualified 'airplane_tickets' left the schema to be resolved by the database.
# NOTE(review): mode="overwrite" replaces the existing table by default —
# confirm whether the distkey/sortkey/decimal definition created earlier is
# meant to survive this write.
sparkdf.write.jdbc(
    url=redshift_url,
    table=redshift_properties["dbtable"],
    mode="overwrite",
    properties=redshift_properties,
)

In [60]:
# Query Redshift through the Spark JDBC reader: the select statement is
# wrapped as a sub-query and passed as "dbtable"
query = f"select * from {env['REDSHIFT_SCHEMA']}.airplane_tickets"
data = (
    spark.read
    .format("jdbc")
    .option("url", f"jdbc:postgresql://{env['REDSHIFT_HOST']}:{env['REDSHIFT_PORT']}/{env['REDSHIFT_DB']}")
    .option("dbtable", f"({query}) as tmp_table")
    .option("user", env['REDSHIFT_USER'])
    .option("password", env['REDSHIFT_PASSWORD'])
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [63]:
# schema and first 10 rows of the data read back from Redshift
data.printSchema()
data.show(n = 10)

root
 |-- found_at: timestamp (nullable = true)
 |-- class: string (nullable = true)
 |-- value: double (nullable = true)
 |-- number_of_changes: integer (nullable = true)
 |-- depart_date: date (nullable = true)
 |-- search_date: date (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- airline_name_translations: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- time_zone: string (nullable = true)
 |-- flightable: boolean (nullable = true)
 |-- iata_type: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- airport_name_translations: string (nullable = true)
 |-- airport_latitude: double (nullable = true)
 |-- airport_longitude: double (nullable = true)

+-------------------+-----+-------+-----------------+-----------+-----------+------------+-------------------------+------+-----------+------------+-----------------+----------+---------+---------

In [64]:
# close the psycopg2 connection to Redshift
conn.close()