In [2]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("TransformData").getOrCreate()

In [3]:
# Define the file path pattern
file_path_pattern = r"D:\GR1\dataset\price_kaggle\file_part_*.csv"

# Load and concatenate all CSV files matching the pattern
combined_data = spark.read.csv(file_path_pattern, header=True, inferSchema=True)

# Show a sample of the combined data to verify
combined_data.show()

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

In [4]:
columns_to_keep = [
    "legId", "flightDate", "startingAirport", "destinationAirport", 
    "travelDuration", "isBasicEconomy", "isRefundable", "isNonStop", 
    "baseFare", "totalFare", "seatsRemaining", "segmentsDepartureTimeRaw", 
    "segmentsArrivalTimeRaw", "segmentsArrivalAirportCode", 
    "segmentsDepartureAirportCode", "segmentsAirlineName", 
    "segmentsAirlineCode", "segmentsDurationInSeconds"
]

# Select only the desired columns
filtered_data = combined_data.select(columns_to_keep)

# Show a sample of the filtered data to verify
filtered_data.show()

+--------------------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+---------+--------------+------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+-------------------------+
|               legId|flightDate|startingAirport|destinationAirport|travelDuration|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|segmentsDepartureTimeRaw|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsDurationInSeconds|
+--------------------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+---------+--------------+------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+-------------------------+
|9ca0e81111c683bec.

In [5]:
from pyspark.sql.functions import col

# ============================================ 
# Chuyển đổi các cột sang kiểu boolean
# ============================================ 

filtered_data = filtered_data.withColumn("isRefundable", col("isRefundable").cast("boolean"))
filtered_data = filtered_data.withColumn("isNonStop", col("isNonStop").cast("boolean"))
filtered_data = filtered_data.withColumn("isBasicEconomy", col("isBasicEconomy").cast("boolean"))

In [6]:
# ============================================ 
# Chuyển đổi cột travelDuration từ PT2H29M sang số giờ
# ============================================ 

from pyspark.sql.functions import udf, round
from pyspark.sql.types import DoubleType
import re

# Hàm Python để chuyển đổi định dạng PT2H29M sang số giờ
def convert_duration_to_hours(duration):
    match = re.match(r'PT(\d+)H(\d+)M', duration)
    if match:
        hours = int(match.group(1))
        minutes = int(match.group(2))
        return hours + minutes / 60.0
    return None

# Đăng ký hàm UDF
convert_duration_udf = udf(convert_duration_to_hours, DoubleType())

# Áp dụng UDF lên cột travelDuration và làm tròn đến 2 chữ số thập phân
filtered_data = filtered_data.withColumn("travelDuration", round(convert_duration_udf(col("travelDuration")), 2))

filtered_data.show()

+--------------------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+---------+--------------+------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+-------------------------+
|               legId|flightDate|startingAirport|destinationAirport|travelDuration|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|segmentsDepartureTimeRaw|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsDurationInSeconds|
+--------------------+----------+---------------+------------------+--------------+--------------+------------+---------+--------+---------+--------------+------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+-------------------------+
|9ca0e81111c683bec.

In [7]:
# Print the data types of each column in the DataFrame
for column, dtype in filtered_data.dtypes:
    print(f"Column: {column}, Type: {dtype}")

Column: legId, Type: string
Column: flightDate, Type: date
Column: startingAirport, Type: string
Column: destinationAirport, Type: string
Column: travelDuration, Type: double
Column: isBasicEconomy, Type: boolean
Column: isRefundable, Type: boolean
Column: isNonStop, Type: boolean
Column: baseFare, Type: double
Column: totalFare, Type: double
Column: seatsRemaining, Type: int
Column: segmentsDepartureTimeRaw, Type: string
Column: segmentsArrivalTimeRaw, Type: string
Column: segmentsArrivalAirportCode, Type: string
Column: segmentsDepartureAirportCode, Type: string
Column: segmentsAirlineName, Type: string
Column: segmentsAirlineCode, Type: string
Column: segmentsDurationInSeconds, Type: string


In [8]:
# ============================================ 
# Tạo bảng Dim_Airline
# ============================================ 

from pyspark.sql import functions as F
from pyspark.sql import Window

# Tách các giá trị trong cột segmentsAirlineName và segmentsAirlineCode
exploded_data = filtered_data.select(
    F.explode(F.split(F.col("segmentsAirlineName"), r'\|\|')).alias("airline_name"),
    F.explode(F.split(F.col("segmentsAirlineCode"), r'\|\|')).alias("airline_code")
)

# Loại bỏ các hàng trùng lặp
distinct_airlines = exploded_data.dropDuplicates()

# Tạo khóa chính duy nhất cho mỗi hàng
window_spec = Window.orderBy(F.monotonically_increasing_id())
dim_airline = distinct_airlines.withColumn("airline_key", F.row_number().over(window_spec))

# Sắp xếp lại các cột theo thứ tự yêu cầu
dim_airline = dim_airline.select("airline_key", "airline_code", "airline_name")

# Hiển thị bảng Dim_Airline
dim_airline.show()

# In ra số bản ghi của dim_airline
record_count = dim_airline.count()
print(f"Số bản ghi của dim_airline: {record_count}")



+-----------+------------+--------------------+
|airline_key|airline_code|        airline_name|
+-----------+------------+--------------------+
|          1|          B6|            Cape Air|
|          2|          9K|     JetBlue Airways|
|          3|          9K|            Cape Air|
|          4|          AS|     Alaska Airlines|
|          5|          AS|              United|
|          6|          AA|            Cape Air|
|          7|          9K|   American Airlines|
|          8|          UA|            Cape Air|
|          9|          UA|Southern Airways ...|
|         10|          4B|   American Airlines|
|         11|          SY|Sun Country Airlines|
|         12|          9K|               Delta|
|         13|          UA|        Boutique Air|
|         14|          KG|   American Airlines|
|         15|          KG|        Key Lime Air|
|         16|          NK|     Spirit Airlines|
|         17|          9X|   American Airlines|
|         18|          DL|            Ca

In [10]:
# ============================================ 
# Tạo bảng Dim_FlightType
# ============================================ 

from pyspark.sql import functions as F
from pyspark.sql import Window

# Tạo bảng dim_FlightType từ các cột isBasicEconomy, isNonStop, isRefundable
flight_type_data = filtered_data.select("isBasicEconomy", "isNonStop", "isRefundable").dropDuplicates()

# Chuyển đổi các giá trị boolean thành 0 và 1
flight_type_data = flight_type_data.withColumn("isBasicEconomy", F.when(F.col("isBasicEconomy") == True, 1).otherwise(0))
flight_type_data = flight_type_data.withColumn("isNonStop", F.when(F.col("isNonStop") == True, 1).otherwise(0))
flight_type_data = flight_type_data.withColumn("isRefundable", F.when(F.col("isRefundable") == True, 1).otherwise(0))

# Tạo khóa chính duy nhất cho mỗi hàng
window_spec = Window.orderBy(F.monotonically_increasing_id())
dim_flight_type = flight_type_data.withColumn("flight_type_key", F.row_number().over(window_spec))

# Sắp xếp lại các cột theo thứ tự yêu cầu
dim_flight_type = dim_flight_type.select("flight_type_key", "isBasicEconomy", "isNonStop", "isRefundable")

# Hiển thị bảng Dim_FlightType
dim_flight_type.show()

+---------------+--------------+---------+------------+
|flight_type_key|isBasicEconomy|isNonStop|isRefundable|
+---------------+--------------+---------+------------+
|              1|             0|        0|           0|
|              2|             1|        0|           0|
|              3|             1|        1|           0|
|              4|             0|        1|           0|
|              5|             0|        0|           1|
+---------------+--------------+---------+------------+



In [11]:
# ============================================ 
# Tạo bảng Dim_Date
# ============================================ 

from pyspark.sql import functions as F
from pyspark.sql import Window

# Tạo bảng dim_date từ cột flightDate
dim_date = filtered_data.select("flightDate").dropDuplicates()

# Tách các thành phần year, month, day từ cột flightDate
dim_date = dim_date.withColumn("year", F.year("flightDate")) \
                   .withColumn("month", F.month("flightDate")) \
                   .withColumn("day", F.dayofmonth("flightDate"))

# Tạo khóa chính duy nhất cho mỗi ngày
window_spec = Window.orderBy("flightDate")
dim_date = dim_date.withColumn("date_key", F.row_number().over(window_spec))

# Sắp xếp lại các cột theo thứ tự yêu cầu
dim_date = dim_date.select("date_key", "flightDate", "year", "month", "day")

# Hiển thị bảng Dim_Date
dim_date.show()

+--------+----------+----+-----+---+
|date_key|flightDate|year|month|day|
+--------+----------+----+-----+---+
|       1|2022-04-17|2022|    4| 17|
|       2|2022-04-18|2022|    4| 18|
|       3|2022-04-19|2022|    4| 19|
|       4|2022-04-20|2022|    4| 20|
|       5|2022-04-21|2022|    4| 21|
|       6|2022-04-22|2022|    4| 22|
|       7|2022-04-23|2022|    4| 23|
|       8|2022-04-24|2022|    4| 24|
|       9|2022-04-25|2022|    4| 25|
|      10|2022-04-26|2022|    4| 26|
|      11|2022-04-27|2022|    4| 27|
|      12|2022-04-28|2022|    4| 28|
|      13|2022-04-29|2022|    4| 29|
|      14|2022-04-30|2022|    4| 30|
|      15|2022-05-01|2022|    5|  1|
|      16|2022-05-02|2022|    5|  2|
|      17|2022-05-03|2022|    5|  3|
|      18|2022-05-04|2022|    5|  4|
|      19|2022-05-05|2022|    5|  5|
|      20|2022-05-06|2022|    5|  6|
+--------+----------+----+-----+---+
only showing top 20 rows



In [15]:
# =====================================================
# Tạo bảng Dim_Flight từ các cột legId, startingAirport, destinationAirport, totalFare, seatsRemaining, travelDuration
# ======================================================

from pyspark.sql import functions as F
from pyspark.sql import Window

# Select necessary columns from the DataFrame, including flightDate
flight_data = filtered_data.select(
    "legId", "flightDate", "startingAirport", "destinationAirport", 
    "totalFare", "seatsRemaining", "travelDuration"
).dropDuplicates()

# Create a unique primary key for each row
window_spec = Window.orderBy(F.monotonically_increasing_id())
dim_flight = flight_data.withColumn("flight_key", F.row_number().over(window_spec))

# Rearrange columns to include flightDate
dim_flight = dim_flight.select("flight_key", "legId", "flightDate", "startingAirport", 
                               "destinationAirport", "totalFare", 
                               "seatsRemaining", "travelDuration")

# Display the Dim_Flight table
dim_flight.show()

+----------+--------------------+----------+---------------+------------------+---------+--------------+--------------+
|flight_key|               legId|flightDate|startingAirport|destinationAirport|totalFare|seatsRemaining|travelDuration|
+----------+--------------------+----------+---------------+------------------+---------+--------------+--------------+
|         1|03f618316e2382ae1...|2022-04-17|            ATL|               DTW|    430.2|             4|          9.75|
|         2|5bb138f39804320ec...|2022-04-17|            ATL|               EWR|    198.6|             7|          2.15|
|         3|cfea9da52b84b1735...|2022-04-17|            ATL|               EWR|    278.6|             9|           2.1|
|         4|e18561718207c3100...|2022-04-17|            ATL|               LGA|    377.6|             1|          5.58|
|         5|90261b51f671a2e11...|2022-04-17|            ATL|               ORD|    328.6|             1|          2.25|
|         6|fb20dcc8134409fcd...|2022-04