In [9]:
import os

import findspark

# findspark.init() has to run before pyspark is imported (it is what makes
# the pyspark package importable). Honour SPARK_HOME when it is set so the
# notebook is not tied to a single machine; otherwise fall back to the
# original local install path.
findspark.init(os.environ.get('SPARK_HOME', 'C:\\spark\\spark-3.0.0-hadoop2.7'))

import pyspark
import pandas as pd

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build the Spark session used by every cell below (application name 'Basic');
# getOrCreate() hands back an already-running session instead of starting a second one.
spark = SparkSession.builder.appName('Basic').getOrCreate()

In [7]:
# Sample trips: origin, destination and the ordered list of internal flight ids.
trip = spark.createDataFrame(
    data=[
        ('PMI', 'OPO', [2, 1]),
        ('ATH', 'BCN', [3]),
        ('JKF', 'LAX', [5, 4, 6]),
        ('HND', 'OPO', [8, 9, 7, 0]),
    ],
    schema=['origin', 'destination', 'internal_flight'],
)

In [8]:
trip.show()

+------+-----------+---------------+
|origin|destination|internal_flight|
+------+-----------+---------------+
|   PMI|        OPO|         [2, 1]|
|   ATH|        BCN|            [3]|
|   JKF|        LAX|      [5, 4, 6]|
|   HND|        OPO|   [8, 9, 7, 0]|
+------+-----------+---------------+



In [30]:
# Lookup table: internal flight id 0..9 -> public flight number FR5760..FR5769.
flights_df = pd.DataFrame({
    'internal_flight': list(range(10)),
    'public_flight_no': ['FR576{}'.format(flight_id) for flight_id in range(10)],
})


In [31]:
type(flights_df)

pandas.core.frame.DataFrame

In [11]:
# Create a Spark DataFrame from the pandas DataFrame; the column names
# (internal_flight, public_flight_no) carry over unchanged.
flights = spark.createDataFrame(flights_df)

In [32]:
type(flights)

pyspark.sql.dataframe.DataFrame

In [13]:
flights.show()

+---------------+----------------+
|internal_flight|public_flight_no|
+---------------+----------------+
|              0|          FR5760|
|              1|          FR5761|
|              2|          FR5762|
|              3|          FR5763|
|              4|          FR5764|
|              5|          FR5765|
|              6|          FR5766|
|              7|          FR5767|
|              8|          FR5768|
|              9|          FR5769|
+---------------+----------------+



In [14]:
# describe() only builds a summary DataFrame; without .show() just its schema is echoed.
# Note that the array column internal_flight is not among the summarised columns.
trip.describe()

DataFrame[summary: string, origin: string, destination: string]

In [19]:
trip.select('origin').show()

+------+
|origin|
+------+
|   PMI|
|   ATH|
|   JKF|
|   HND|
+------+



In [20]:
trip.select('internal_flight')

DataFrame[internal_flight: array<bigint>]

In [22]:
# Describing only the array column leaves nothing but the 'summary' column,
# as the echoed schema below shows.
trip.select('internal_flight').describe()

DataFrame[summary: string]

In [23]:
# Indexing a DataFrame by column name yields a Column object, not the data itself
# (see the repr below); use select(...).show() to look at the values.
trip['internal_flight']

Column<b'internal_flight'>

In [27]:
trip.select('internal_flight').show()

+---------------+
|internal_flight|
+---------------+
|         [2, 1]|
|            [3]|
|      [5, 4, 6]|
|   [8, 9, 7, 0]|
+---------------+



In [35]:
trip.printSchema()

root
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- internal_flight: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [48]:
from pyspark.sql.types import ArrayType,StringType,IntegerType
from pyspark.sql.functions import udf

In [193]:
# Look up the public flight numbers for a hand-picked set of internal flight ids.
l = [1, 5]
flights.where(flights['internal_flight'].isin(l)).select('public_flight_no').collect()

[Row(public_flight_no='FR5761'), Row(public_flight_no='FR5765')]

In [211]:
def map_public_flight_name(internal_flight):
    """Return a new list holding the elements of ``internal_flight`` in order.

    Despite its name, this function currently performs no id -> public-name
    translation: every element is passed through unchanged.

    Args:
        internal_flight: iterable of internal flight ids, or ``None``.

    Returns:
        A fresh ``list`` with the same elements in the same order, or ``None``
        when ``internal_flight`` is ``None``.
    """
    # The internal_flight column is nullable; without this guard a null cell
    # would raise TypeError when the function is used as a UDF.
    if internal_flight is None:
        return None
    return list(internal_flight)

In [212]:
# Declare the return type explicitly: without it `udf` defaults to StringType and
# the result column would hold the list's string rendering instead of an array.
# 'array<bigint>' matches the type of the internal_flight column.
public_flight_udf = udf(map_public_flight_name, 'array<bigint>')

In [215]:
# Inline variant of the pass-through UDF. The return type is declared explicitly:
# without it `udf` defaults to StringType and the result column would hold the
# list's string rendering instead of an array. 'array<bigint>' matches the type
# of the internal_flight column.
public_flight_udf = udf(lambda ids: [flight for flight in ids], 'array<bigint>')

In [216]:
# Append the UDF result as a 'public_flight' column and display the frame.
trip.withColumn(
    'public_flight',
    public_flight_udf(trip.internal_flight),
).show()

+------+-----------+---------------+-------------+
|origin|destination|internal_flight|public_flight|
+------+-----------+---------------+-------------+
|   PMI|        OPO|         [2, 1]|       [2, 1]|
|   ATH|        BCN|            [3]|          [3]|
|   JKF|        LAX|      [5, 4, 6]|    [5, 4, 6]|
|   HND|        OPO|   [8, 9, 7, 0]| [8, 9, 7, 0]|
+------+-----------+---------------+-------------+



In [218]:
from pyspark.sql.functions import col,explode,posexplode,collect_list,monotonically_increasing_id
from pyspark.sql.window import Window

In [221]:
# Tag every trip with a unique row_id so the exploded rows can later be grouped
# back per trip. The ids are unique but not consecutive (see the values in the output below).
trip = trip.withColumn('row_id',monotonically_increasing_id())

In [222]:
trip.show()

+------+-----------+---------------+-----------+
|origin|destination|internal_flight|     row_id|
+------+-----------+---------------+-----------+
|   PMI|        OPO|         [2, 1]| 8589934592|
|   ATH|        BCN|            [3]|25769803776|
|   JKF|        LAX|      [5, 4, 6]|42949672960|
|   HND|        OPO|   [8, 9, 7, 0]|60129542144|
+------+-----------+---------------+-----------+



In [224]:
# Flatten the array: one output row per (trip, flight id), keyed by row_id.
exploded = trip.select(
    col('row_id'),
    explode(col('internal_flight')).alias('internal_flight'),
)

In [225]:
exploded.show()

+-----------+---------------+
|     row_id|internal_flight|
+-----------+---------------+
| 8589934592|              2|
| 8589934592|              1|
|25769803776|              3|
|42949672960|              5|
|42949672960|              4|
|42949672960|              6|
|60129542144|              8|
|60129542144|              9|
|60129542144|              7|
|60129542144|              0|
+-----------+---------------+



In [228]:
# Attach the public flight number to every exploded row via the lookup table.
exploded_with_flight_number = exploded.join(flights, 'internal_flight', 'inner')

In [229]:
exploded_with_flight_number.show()

+---------------+-----------+----------------+
|internal_flight|     row_id|public_flight_no|
+---------------+-----------+----------------+
|              0|60129542144|          FR5760|
|              7|60129542144|          FR5767|
|              6|42949672960|          FR5766|
|              9|60129542144|          FR5769|
|              5|42949672960|          FR5765|
|              1| 8589934592|          FR5761|
|              3|25769803776|          FR5763|
|              8|60129542144|          FR5768|
|              2| 8589934592|          FR5762|
|              4|42949672960|          FR5764|
+---------------+-----------+----------------+



In [230]:
# Rebuild one list of public flight numbers per trip.
# NOTE: a plain groupBy + collect_list does not keep the original array order;
# compare internal_flight [2, 1] with the [FR5761, FR5762] produced below.
collected = (
    exploded_with_flight_number
    .groupBy('row_id')
    .agg(collect_list('public_flight_no').alias('public_flight_number'))
)

In [231]:
collected.show()

+-----------+--------------------+
|     row_id|public_flight_number|
+-----------+--------------------+
| 8589934592|    [FR5761, FR5762]|
|60129542144|[FR5760, FR5767, ...|
|42949672960|[FR5766, FR5765, ...|
|25769803776|            [FR5763]|
+-----------+--------------------+



In [232]:
# Bring the collected public flight numbers back onto the trip rows via row_id.
trip = trip.join(collected, 'row_id', 'inner')

In [233]:
trip.show()

+-----------+------+-----------+---------------+--------------------+
|     row_id|origin|destination|internal_flight|public_flight_number|
+-----------+------+-----------+---------------+--------------------+
| 8589934592|   PMI|        OPO|         [2, 1]|    [FR5761, FR5762]|
|60129542144|   HND|        OPO|   [8, 9, 7, 0]|[FR5760, FR5767, ...|
|42949672960|   JKF|        LAX|      [5, 4, 6]|[FR5766, FR5765, ...|
|25769803776|   ATH|        BCN|            [3]|            [FR5763]|
+-----------+------+-----------+---------------+--------------------+



In [240]:
# Drop the helper columns; only origin, destination and the public flight list remain.
trip= trip.drop('internal_flight','row_id')

In [241]:
trip.show()

+------+-----------+--------------------+
|origin|destination|public_flight_number|
+------+-----------+--------------------+
|   PMI|        OPO|    [FR5761, FR5762]|
|   HND|        OPO|[FR5760, FR5767, ...|
|   JKF|        LAX|[FR5766, FR5765, ...|
|   ATH|        BCN|            [FR5763]|
+------+-----------+--------------------+



In [242]:
flights.show()

+---------------+----------------+
|internal_flight|public_flight_no|
+---------------+----------------+
|              0|          FR5760|
|              1|          FR5761|
|              2|          FR5762|
|              3|          FR5763|
|              4|          FR5764|
|              5|          FR5765|
|              6|          FR5766|
|              7|          FR5767|
|              8|          FR5768|
|              9|          FR5769|
+---------------+----------------+



In [None]:
# Note that in `trip` the order of the flights within each list has changed, whereas the order of the items in the list matters for this business scenario.

In [243]:
# The following is the correct solution: it keeps the flights in their original order.
# Start again from a fresh copy of the trip data.
trip_c = spark.createDataFrame(
    data=[
        ('PMI', 'OPO', [2, 1]),
        ('ATH', 'BCN', [3]),
        ('JKF', 'LAX', [5, 4, 6]),
        ('HND', 'OPO', [8, 9, 7, 0]),
    ],
    schema=['origin', 'destination', 'internal_flight'],
)

In [245]:
# Give every trip a unique row_id to group and join on later, then display the frame.
trip_c = trip_c.withColumn('row_id', monotonically_increasing_id())
trip_c.show()

+------+-----------+---------------+-----------+
|origin|destination|internal_flight|     row_id|
+------+-----------+---------------+-----------+
|   PMI|        OPO|         [2, 1]| 8589934592|
|   ATH|        BCN|            [3]|25769803776|
|   JKF|        LAX|      [5, 4, 6]|42949672960|
|   HND|        OPO|   [8, 9, 7, 0]|60129542144|
+------+-----------+---------------+-----------+



In [261]:
# posexplode emits each array element together with its index ('pos'), so the
# original position of every flight inside its trip is preserved.
exploded = trip_c.select(
    col('row_id'),
    posexplode(col('internal_flight')),
)

In [262]:
exploded.show()

+-----------+---+---+
|     row_id|pos|col|
+-----------+---+---+
| 8589934592|  0|  2|
| 8589934592|  1|  1|
|25769803776|  0|  3|
|42949672960|  0|  5|
|42949672960|  1|  4|
|42949672960|  2|  6|
|60129542144|  0|  8|
|60129542144|  1|  9|
|60129542144|  2|  7|
|60129542144|  3|  0|
+-----------+---+---+



In [263]:
# Replace posexplode's default column names ('pos', 'col') with descriptive ones.
exploded = (
    exploded
    .withColumnRenamed('pos', 'position')
    .withColumnRenamed('col', 'internal_flight')
)

In [264]:
exploded.show()

+-----------+--------+---------------+
|     row_id|position|internal_flight|
+-----------+--------+---------------+
| 8589934592|       0|              2|
| 8589934592|       1|              1|
|25769803776|       0|              3|
|42949672960|       0|              5|
|42949672960|       1|              4|
|42949672960|       2|              6|
|60129542144|       0|              8|
|60129542144|       1|              9|
|60129542144|       2|              7|
|60129542144|       3|              0|
+-----------+--------+---------------+



In [265]:
# Attach the public flight number to every exploded row; 'position' rides along.
exploded_with_flight_number = exploded.join(flights, 'internal_flight', 'inner')

In [266]:
exploded_with_flight_number.show()

+---------------+-----------+--------+----------------+
|internal_flight|     row_id|position|public_flight_no|
+---------------+-----------+--------+----------------+
|              0|60129542144|       3|          FR5760|
|              7|60129542144|       2|          FR5767|
|              6|42949672960|       2|          FR5766|
|              9|60129542144|       1|          FR5769|
|              5|42949672960|       0|          FR5765|
|              1| 8589934592|       1|          FR5761|
|              3|25769803776|       0|          FR5763|
|              8|60129542144|       0|          FR5768|
|              2| 8589934592|       0|          FR5762|
|              4|42949672960|       1|          FR5764|
+---------------+-----------+--------+----------------+



In [272]:
# Collect the public flight numbers per trip in their original order.
# The window is partitioned by trip, ordered by position, and its frame spans the
# whole partition, so every row receives the complete ordered list. That yields
# one identical row per flight in the trip (see the output below).
collected = (
    exploded_with_flight_number
    .withColumn(
        'public_flight_no',
        collect_list('public_flight_no').over(
            Window.partitionBy('row_id')
            .orderBy('position')
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        ),
    )
    .select('row_id', 'public_flight_no')
)

In [273]:
collected.show()

+-----------+--------------------+
|     row_id|    public_flight_no|
+-----------+--------------------+
| 8589934592|    [FR5762, FR5761]|
| 8589934592|    [FR5762, FR5761]|
|60129542144|[FR5768, FR5769, ...|
|60129542144|[FR5768, FR5769, ...|
|60129542144|[FR5768, FR5769, ...|
|60129542144|[FR5768, FR5769, ...|
|42949672960|[FR5765, FR5764, ...|
|42949672960|[FR5765, FR5764, ...|
|42949672960|[FR5765, FR5764, ...|
|25769803776|            [FR5763]|
+-----------+--------------------+



In [274]:
# Collapse the repeated per-flight rows so that one row per trip remains.
collected = collected.distinct()

In [275]:
collected.show()

+-----------+--------------------+
|     row_id|    public_flight_no|
+-----------+--------------------+
| 8589934592|    [FR5762, FR5761]|
|60129542144|[FR5768, FR5769, ...|
|42949672960|[FR5765, FR5764, ...|
|25769803776|            [FR5763]|
+-----------+--------------------+



In [277]:
# Join the ordered public flight lists back onto the trip attributes via row_id.
trips_with_flight_name = collected.join(trip_c, 'row_id', 'inner')

In [278]:
trips_with_flight_name.show()

+-----------+--------------------+------+-----------+---------------+
|     row_id|    public_flight_no|origin|destination|internal_flight|
+-----------+--------------------+------+-----------+---------------+
| 8589934592|    [FR5762, FR5761]|   PMI|        OPO|         [2, 1]|
|60129542144|[FR5768, FR5769, ...|   HND|        OPO|   [8, 9, 7, 0]|
|42949672960|[FR5765, FR5764, ...|   JKF|        LAX|      [5, 4, 6]|
|25769803776|            [FR5763]|   ATH|        BCN|            [3]|
+-----------+--------------------+------+-----------+---------------+



In [279]:
# Drop the helper columns; public_flight_no, origin and destination remain.
trips_with_flight_name = trips_with_flight_name.drop('row_id','internal_flight')

In [280]:
trips_with_flight_name.show()

+--------------------+------+-----------+
|    public_flight_no|origin|destination|
+--------------------+------+-----------+
|    [FR5762, FR5761]|   PMI|        OPO|
|[FR5768, FR5769, ...|   HND|        OPO|
|[FR5765, FR5764, ...|   JKF|        LAX|
|            [FR5763]|   ATH|        BCN|
+--------------------+------+-----------+

