In [4]:
from pyspark.sql import SparkSession
# Entry point for all DataFrame work in this notebook. getOrCreate() hands back
# an already-running session when there is one instead of starting another.
spark = (
    SparkSession.builder
    .appName("GenerateDataset")
    .getOrCreate()
)

In [6]:
import pandas as pd

# Sample trips: one row per origin -> destination journey, together with the
# list of internal flight ids that make it up.
# The pandas frame gets its own name so that `trips` only ever refers to the
# Spark DataFrame (previously the same name was bound to both kinds of object).
trips_pdf = pd.DataFrame({
    "origin": ["PMI", "ATH", "JFK", "HND"],
    "destination": ["OPO", "BCN", "MAD", "LAX"],
    "internal_flight_ids": [
        [2, 1],
        [3],
        [5, 4, 6],
        [8, 9, 7, 0],
    ],
})
trips = spark.createDataFrame(trips_pdf)
trips.show()

+-----------+-------------------+------+
|destination|internal_flight_ids|origin|
+-----------+-------------------+------+
|        OPO|             [2, 1]|   PMI|
|        BCN|                [3]|   ATH|
|        MAD|          [5, 4, 6]|   JFK|
|        LAX|       [8, 9, 7, 0]|   HND|
+-----------+-------------------+------+



In [7]:

# Lookup table mapping each internal flight id to its public flight number.
# The pandas frame gets its own name so that `flights` only ever refers to the
# Spark DataFrame (previously the same name was bound to both kinds of object).
flights_pdf = pd.DataFrame({
    "internal_flight_id": list(range(10)),
    "public_flight_number": [
        "FR5763", "UT9586", "B4325", "RW35675", "LP656",
        "NB4321", "CX4599", "AZ8844", "KH8851", "OP8777",
    ],
})
flights = spark.createDataFrame(flights_pdf)
flights.show()


+------------------+--------------------+
|internal_flight_id|public_flight_number|
+------------------+--------------------+
|                 0|              FR5763|
|                 1|              UT9586|
|                 2|               B4325|
|                 3|             RW35675|
|                 4|               LP656|
|                 5|              NB4321|
|                 6|              CX4599|
|                 7|              AZ8844|
|                 8|              KH8851|
|                 9|              OP8777|
+------------------+--------------------+



In [8]:
from pyspark.sql.functions import col, explode, posexplode, collect_list, monotonically_increasing_id
from pyspark.sql.window import Window

In [13]:
# Tag every trip with a unique key so the exploded rows can later be grouped
# and joined back to the trip they came from. The generated ids are unique and
# increasing, but not consecutive (see the printed values).
trips = trips.withColumn("row_id", monotonically_increasing_id())
trips.select("row_id").show()

+-----------+
|     row_id|
+-----------+
|          0|
| 8589934592|
|17179869184|
|25769803776|
+-----------+



In [14]:
# Turn each element of the id array into its own row, keeping the trip key
# next to it: one output row per (trip, internal flight id).
exploded = trips.select(
    col("row_id"),
    explode(col("internal_flight_ids")).alias("internal_flight_id"),
)
exploded.show()

+-----------+------------------+
|     row_id|internal_flight_id|
+-----------+------------------+
|          0|                 2|
|          0|                 1|
| 8589934592|                 3|
|17179869184|                 5|
|17179869184|                 4|
|17179869184|                 6|
|25769803776|                 8|
|25769803776|                 9|
|25769803776|                 7|
|25769803776|                 0|
+-----------+------------------+



In [15]:
# Resolve every internal id to its public flight number. This is an inner join
# (the default), so an id with no match in `flights` would drop out.
exploded_with_flight_number = exploded.join(
    flights, on="internal_flight_id", how="inner"
)
exploded_with_flight_number.show()

+------------------+-----------+--------------------+
|internal_flight_id|     row_id|public_flight_number|
+------------------+-----------+--------------------+
|                 0|25769803776|              FR5763|
|                 7|25769803776|              AZ8844|
|                 6|17179869184|              CX4599|
|                 9|25769803776|              OP8777|
|                 5|17179869184|              NB4321|
|                 1|          0|              UT9586|
|                 3| 8589934592|             RW35675|
|                 8|25769803776|              KH8851|
|                 2|          0|               B4325|
|                 4|17179869184|               LP656|
+------------------+-----------+--------------------+



In [17]:
# Fold the per-flight rows back into one list per trip.
# NOTE: collect_list keeps the values in whatever order the rows reach the
# aggregation, which after the join is not the order of the original array.
# In the printed result, row_id 0 comes back as [UT9586, B4325] although its
# ids were [2, 1], i.e. B4325 should be first.
collected = (
    exploded_with_flight_number
    .groupBy("row_id")
    .agg(collect_list("public_flight_number").alias("public_flight_numbers"))
)

collected.show()

+-----------+---------------------+
|     row_id|public_flight_numbers|
+-----------+---------------------+
| 8589934592|            [RW35675]|
|          0|      [UT9586, B4325]|
|25769803776| [FR5763, AZ8844, ...|
|17179869184| [CX4599, NB4321, ...|
+-----------+---------------------+



In [18]:
# Re-attach origin/destination through the trip key, then discard the helper
# key and the internal ids that have now been translated.
trips_with_flight_numbers = (
    collected
    .join(trips, on="row_id")
    .drop("row_id", "internal_flight_ids")
)
trips_with_flight_numbers.show()

+---------------------+-----------+------+
|public_flight_numbers|destination|origin|
+---------------------+-----------+------+
|            [RW35675]|        BCN|   ATH|
|      [UT9586, B4325]|        OPO|   PMI|
| [FR5763, AZ8844, ...|        LAX|   HND|
| [CX4599, NB4321, ...|        MAD|   JFK|
+---------------------+-----------+------+



In [19]:
# Same explosion as before, but posexplode also emits each element's index in
# the array. The two generated columns (pos, col) are named directly here:
# `position` is what allows the original order to be restored later.
exploded = trips.select(
    col("row_id"),
    posexplode(col("internal_flight_ids")).alias("position", "internal_flight_id"),
)
exploded.show()

+-----------+--------+------------------+
|     row_id|position|internal_flight_id|
+-----------+--------+------------------+
|          0|       0|                 2|
|          0|       1|                 1|
| 8589934592|       0|                 3|
|17179869184|       0|                 5|
|17179869184|       1|                 4|
|17179869184|       2|                 6|
|25769803776|       0|                 8|
|25769803776|       1|                 9|
|25769803776|       2|                 7|
|25769803776|       3|                 0|
+-----------+--------+------------------+



In [20]:
# Resolve internal ids to public flight numbers; `position` travels along.
exploded_with_flight_number = exploded.join(
    flights, on="internal_flight_id", how="inner"
)

# One window per trip, ordered by the original array position. The frame is
# widened explicitly to the whole partition: with an orderBy the default frame
# ends at the current row, which would yield only a partial list on every row
# but the last.
trip_window = (
    Window
    .partitionBy("row_id")
    .orderBy("position")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Every row of a trip receives the same full, ordered list, so the result
# holds one identical row per flight of the trip (duplicates are visible in
# the printed output).
collected = (
    exploded_with_flight_number
    .withColumn(
        "public_flight_numbers",
        collect_list("public_flight_number").over(trip_window),
    )
    .select("row_id", "public_flight_numbers")
)
collected.show()

+-----------+---------------------+
|     row_id|public_flight_numbers|
+-----------+---------------------+
| 8589934592|            [RW35675]|
|          0|      [B4325, UT9586]|
|          0|      [B4325, UT9586]|
|25769803776| [KH8851, OP8777, ...|
|25769803776| [KH8851, OP8777, ...|
|25769803776| [KH8851, OP8777, ...|
|25769803776| [KH8851, OP8777, ...|
|17179869184| [NB4321, LP656, C...|
|17179869184| [NB4321, LP656, C...|
|17179869184| [NB4321, LP656, C...|
+-----------+---------------------+



In [21]:
# The windowed result repeats each trip's list once per flight, so collapse the
# duplicates first. Then re-attach origin/destination through the trip key and
# discard the helper key and the internal ids.
trips_with_flight_numbers = (
    collected
    .dropDuplicates()
    .join(trips, on="row_id")
    .drop("row_id", "internal_flight_ids")
)
trips_with_flight_numbers.show()

+---------------------+-----------+------+
|public_flight_numbers|destination|origin|
+---------------------+-----------+------+
|            [RW35675]|        BCN|   ATH|
|      [B4325, UT9586]|        OPO|   PMI|
| [KH8851, OP8777, ...|        LAX|   HND|
| [NB4321, LP656, C...|        MAD|   JFK|
+---------------------+-----------+------+

