Readme:

Purpose: Automation of Child Seating 
- Note: this code will not run as-is; data sources and options must be inserted in various places before it can run.

Current Version:
- 1/1/24 - code cleanup and optimization/ making generic for git
- 7/17/23 - through flight restructure and integration
- 6/20/23 - Initial Creation

Section Summary:
 - Imports and connections
 - Define Functions
 - Universal Datasets
    - Date Range Setup
 - Heracles Protocol
    - Query to find unassigned Children
    - Pull all PNR's Related to families
    - Seat Purchase Query
    - Identify Children in Exit Rows to Reassign
 - Through Flights
   - Identify all through flights
   - Create through flight seat groups
   - Create through flight passenger groups
   - Through Seat Assignment
      - Expand out through seat assignment to both legs
 - Single Leg Assignments
    - Create single leg seat groups (removing already assigned)
    - Create single leg pax groups
    - Single leg pax assignment
 - Seat Upsale
    - Pull in pax selected for upsale (remove all through assignments)
    - Split flights into test and control groups
    - Split those on test flights into test and control groups
 - Pax Number Assignment
    - Passenger Number query
 - Final Assembly
    - Query for passenger info and contact

 Inputs:
 - passenger import from sql query
 - airplane seatmap

 
 Outputs:
  - automated list for import to api for assignment

 Definitions:
  - PNR/Recordlocator - identifier for booking
  - Pax is passenger
  - Unit designator is a seat on an aircraft
  - LF or Loadfactor is how full the plane is

## Imports


In [0]:
#imports
from datetime import date, datetime, timedelta
from pyspark.sql import functions as F, Window
from pyspark.sql.types import IntegerType,BooleanType,DateType, StringType
from pyspark.sql.functions import dayofweek, concat_ws, array, sort_array, month, year,date_format, to_date, current_date
from pyspark.sql.functions import concat_ws, array, sort_array
from pyspark.sql.functions import month

'''insert database connector as well'''

## Define Functions

In [0]:
# Universal helper functions used throughout the seating algorithm.


def groupset(df, group, target, colname):
    """Collapse ``target`` into one list column per ``group``.

    Values are comma-joined and split back apart, so ``colname`` ends up as an
    array of strings (nulls are skipped by ``collect_list``; a value that itself
    contains a comma would be split into pieces).

    Args:
        df: input Spark DataFrame.
        group: grouping column(s), anything ``DataFrame.groupby`` accepts.
        target: column whose values are collected.
        colname: name of the resulting list column.

    Returns:
        DataFrame with the grouping column(s) followed by ``colname``.
    """
    joined = F.concat_ws(",", F.collect_list(target)).alias(colname)
    grouped = df.groupby(group).agg(joined)
    return grouped.withColumn(colname, F.split(colname, ','))

# Helper that numbers the passengers inside each PNR.
def paxnumbers(df):
    """Assign a zero-based passenger number to each passenger of a PNR.

    Passengers are numbered 0, 1, 2, ... within each RecordLocator in
    ascending (integer) PassengerID order. There is no cap on PNR size.

    A window ``row_number`` is used because a DataFrame ``sort`` before a
    ``groupby`` is not preserved through the shuffle, and pairing two
    separately exploded frames on ``monotonically_increasing_id()`` is not
    guaranteed to line up IDs with numbers.

    Args:
        df: DataFrame with at least ``PassengerID`` and ``RecordLocator``.

    Returns:
        DataFrame with ``RecordLocator``, ``PassengerID`` (int) and
        ``PassengerNumber`` (int).
    """
    pax = df.select('PassengerID', 'RecordLocator')
    # one row per passenger so nobody gets two numbers
    pax = pax.dropDuplicates(subset=['PassengerID'])
    # null IDs were skipped by the old collect_list-based numbering; keep skipping them
    pax = pax.filter(F.col('PassengerID').isNotNull())
    pax = pax.withColumn('PassengerID', F.col('PassengerID').cast(IntegerType()))
    by_pnr = Window.partitionBy('RecordLocator').orderBy(F.col('PassengerID'))
    pax = pax.withColumn('PassengerNumber', (F.row_number().over(by_pnr) - 1).cast(IntegerType()))
    return pax.select('RecordLocator', 'PassengerID', 'PassengerNumber')

# Runs of adjacent seat letters searched for by seqseats, longest first and in
# the same order as the original when-chain: the first run found in a row's
# priority-ordered seat string becomes that row's sequential block.
_SEQ_PATTERNS = (
    'A,B,C,D,E,F', 'F,E,D,C,B,A',
    'B,C,D,E,F', 'F,E,D,C,B', 'A,B,C,D,E', 'E,D,C,B,A',
    'C,D,E,F', 'F,E,D,C', 'A,B,C,D', 'D,C,B,A', 'B,C,D,E', 'E,D,C,B',
    'A,B,C', 'C,B,A', 'B,C,D', 'D,C,B', 'C,D,E', 'E,D,C', 'D,E,F', 'F,E,D',
    'E,F', 'F,E', 'D,E', 'E,D', 'A,B', 'B,A', 'B,C', 'C,B',
    'A', 'B', 'C', 'D', 'E', 'F',
)


def seqseats(df, seat_priority, type):
    """Build the per-row dataset of open seats and their sequential block.

    Args:
        df: open seats, one row per seat, with Equipment, LF, UnitDesignator
            and the flight key column.
        seat_priority: (Equipment, LF, UnitDesignator, Priority) lookup.
        type: 'through' keys rows by InventoryKey; anything else by
            InventoryLegID.

    Returns:
        DataFrame with [key, Row, EmptySeats, Seats, Priority, rowcap, seqcap,
        seqseats]:
          - EmptySeats: open seat designators of the row, ordered by seat
            Priority (ascending).
          - Seats: comma-joined seat letters in that same order.
          - Priority: average seat Priority of the row.
          - rowcap: number of open seats in the row.
          - seqseats / seqcap: the first matching run from _SEQ_PATTERNS
            (row number prefixed) and its length.
        Rows with no A-F run at all are dropped.
    """
    key = 'InventoryKey' if type == 'through' else 'InventoryLegID'
    groupby = [key, 'Row']
    cols = [key, 'Row', 'EmptySeats', 'Seats', 'Priority', 'rowcap', 'seqcap', 'seqseats']

    seats = df.join(seat_priority, ['Equipment', 'LF', 'UnitDesignator'])
    seats = (seats
             .withColumn('Row', F.expr("substring(UnitDesignator, 1, length(UnitDesignator)-1)"))  # row number
             .withColumn('Seats', F.col('UnitDesignator').substr(-1, 1)))  # seat letter

    # Sort inside the aggregate: a DataFrame sort before groupby is not kept
    # through the shuffle, and the run detection below depends on this order.
    ordered = F.sort_array(F.collect_list(F.struct('Priority', 'Seats', 'UnitDesignator')))
    rows = seats.groupby(groupby).agg(ordered.alias('ordered'), F.sum('Priority').alias('Priority'))
    rows = (rows
            .withColumn('EmptySeats', F.col('ordered.UnitDesignator'))
            .withColumn('Seats', F.concat_ws(',', F.col('ordered.Seats')))
            .drop('ordered'))
    rows = rows.withColumn('rowcap', F.size('EmptySeats'))
    rows = rows.withColumn('Priority', F.col('Priority') / F.col('rowcap'))

    seq = None
    for pattern in _SEQ_PATTERNS:
        hit = F.col('Seats').contains(pattern)
        seq = F.when(hit, pattern) if seq is None else seq.when(hit, pattern)
    # No match -> null. The old 'none' fallback produced a fake one-seat block
    # such as '12none' with seqcap 1, which the seqcap > 0 filter never removed.
    rows = rows.withColumn('seqseats', F.split(seq, ','))
    rows = rows.withColumn('seqcap', F.when(F.col('seqseats').isNull(), F.lit(0))
                                      .otherwise(F.size('seqseats')))
    # prefix each letter with the row number, e.g. ['12A', '12B']
    rows = rows.withColumn('seqseats', F.expr('transform(seqseats, x -> concat(Row, x))'))
    return rows.select(cols).filter(F.col('seqcap') > 0)

# Helpers that build passenger groups ("squads") for seat assignment.


def _ordered_ids(df, groupby, colname, ascending):
    """Collect PassengerIDs per group into an array<string> ordered by Age.

    The sort happens inside the aggregate; sorting the DataFrame before a
    groupby does not survive the shuffle.
    """
    pairs = F.collect_list(F.struct(F.col('Age'),
                                    F.col('PassengerID').cast(StringType()).alias('PassengerID')))
    return (df.groupby(groupby)
              .agg(F.sort_array(pairs, asc=ascending).alias(colname))
              .withColumn(colname, F.col(colname + '.PassengerID')))


def paxgroup(familyPNR, type):
    """Build seat-assignment squads: one adult plus the kids seated with them.

    Rules:
      - only PNRs with more than one passenger and at least one passenger
        older than 17 (checked before already-seated passengers are removed);
      - only passengers with an empty UnitDesignator are grouped;
      - kids are Age < 14, adults Age >= 18 (ages 14-17 are not grouped);
      - one unseated adult: that adult with all the kids;
      - several adults and one kid: the oldest adult with the kid;
      - several adults and 2-10 kids: two squads, the oldest adult with the
        kids at even positions (youngest first) and the second-oldest adult
        with the kids at odd positions;
      - squads of 7 or more are dropped.

    Args:
        familyPNR: passenger rows with PassengerID, RecordLocator, Age,
            UnitDesignator and the flight key column.
        type: 'through' groups by InventoryKey; anything else by
            InventoryLegID.

    Returns:
        DataFrame with [key, RecordLocator, the_squad, squadsize], where
        the_squad is [adult id, kid ids...] as strings.
    """
    key = 'InventoryKey' if type == 'through' else 'InventoryLegID'
    groupby = [key, 'RecordLocator']
    out_cols = [key, 'RecordLocator', 'SquadLeader', 'Hobbits']

    # drop single-passenger PNRs
    multi_pax = familyPNR.groupby(groupby).count().filter(F.col('count') > 1).drop('count')
    familyPNR = familyPNR.join(multi_pax, groupby)

    # drop kid-only PNRs
    has_adult = (familyPNR.groupby(groupby).agg(F.max('Age').alias('maxAge'))
                 .filter(F.col('maxAge') > 17).drop('maxAge'))
    familyPNR = familyPNR.join(has_adult, groupby)

    # only passengers without a seat are grouped
    familyPNR = familyPNR.filter(F.col('UnitDesignator') == '')
    familyPNR = (familyPNR
                 .withColumn('KidCount', F.when(F.col('Age') < 14, 1).otherwise(0))
                 .withColumn('Adults', F.when(F.col('Age') >= 18, 1).otherwise(0)))

    # oldest adults first so the oldest is most likely paired with a kid;
    # youngest kids first. Filtering on the flags avoids the old string-vs-int
    # comparison ('x' > 0), which only worked through a null cast.
    adultos = _ordered_ids(familyPNR.filter(F.col('Adults') == 1), groupby, 'Adultos', ascending=False)
    hobbitses = _ordered_ids(familyPNR.filter(F.col('KidCount') == 1), groupby, 'Hobbits', ascending=True)

    counts = familyPNR.groupby(groupby).agg(F.sum('KidCount').alias('KidCount'),
                                            F.sum('Adults').alias('GroupMax'))
    thegrid = counts.join(adultos, groupby).join(hobbitses, groupby)

    # one adult: that adult with every kid
    singleparent = (thegrid.filter(F.col('GroupMax') == 1)
                    .withColumn('SquadLeader', F.concat_ws(',', F.col('Adultos')))
                    .select(out_cols))

    # several adults, one kid: oldest adult with the kid
    pairedriders = (thegrid.filter((F.col('KidCount') == 1) & (F.col('GroupMax') > 1))
                    .withColumn('SquadLeader', F.col('Adultos')[0])
                    .select(out_cols))

    # several adults, 2-10 kids: split kids alternately between the two oldest adults
    groups = thegrid.filter((F.col('GroupMax') > 1) & F.col('KidCount').between(2, 10))
    groups1 = groups.select(key, 'RecordLocator',
                            F.col('Adultos')[0].alias('SquadLeader'),
                            F.expr('filter(Hobbits, (x, i) -> i % 2 = 0)').alias('Hobbits'))
    groups2 = groups.select(key, 'RecordLocator',
                            F.col('Adultos')[1].alias('SquadLeader'),
                            F.expr('filter(Hobbits, (x, i) -> i % 2 = 1)').alias('Hobbits'))

    squads = groups1.union(groups2).union(singleparent).union(pairedriders)
    squads = squads.withColumn('Hobbits', F.concat_ws(',', F.col('Hobbits')))
    # adult id followed by kid ids; this is the unit that gets seated together
    squads = squads.withColumn('the_squad',
                               F.split(F.concat(F.col('SquadLeader'), F.lit(','), F.col('Hobbits')), ','))
    squads = squads.withColumn('squadsize', F.size(F.col('the_squad')))
    squads = squads.drop('SquadLeader', 'Hobbits')
    return squads.filter(F.col('squadsize') < 7)

## Universal Datasets

In [0]:
# Assignment window parameters (string dates are 'YYYY-MM-DD').
# The window STARTS 2 days out, so seats are never assigned on flights whose
# passengers may already be checking in, and ends 14 days out.
Date = date.today()
Datestr = Date.isoformat()
MinDate = (Date + timedelta(days=2)).isoformat()
FutureDate = (Date + timedelta(days=14)).isoformat()
print(MinDate, "-", FutureDate)

2023-12-14 - 2023-12-26


## Family Seating

### Query For Unassigned Children

In [0]:
# Pull every child passenger (13 and younger, per the query placeholder).
# A later cell deduplicates these rows on RecordLocator and inner-joins them
# to the full passenger list, keeping only PNRs that have a child on them.
kid_check_PNR_query = """Insert Query for navitaire that pulls all passengers 13 and younger"""
kid_check = spark.read.jdbc(table=kid_check_PNR_query, url=jdbcUrl)
# stage in a Delta table so later cells stop hitting SQL Server
kid_check_writer = kid_check.write.mode("overwrite")
kid_check_writer.saveAsTable("child_seating_filter")

### Pull All PNR's Related to Families 

In [0]:
# Pull every passenger flying in the window, with their current seat
# assignment. The child-PNR filter is applied in the next cell.
familyPNR_query = """ Query for all passengers that are flying during that time period"""
familyPNR = spark.read.jdbc(table=familyPNR_query, url=jdbcUrl)
# stage in a Delta table so later cells stop hitting SQL Server
familyPNR_writer = familyPNR.write.mode("overwrite")
familyPNR_writer.saveAsTable("all_passengers")

In [0]:
# Narrow all_passengers down to PNRs that have at least one child on them.
filt = (spark.sql('select * from child_seating_filter')
        .select('RecordLocator')
        # one row per PNR so the inner join cannot duplicate passengers
        .dropDuplicates(subset=['RecordLocator']))
familyPNR = spark.sql('select * from all_passengers').join(filt, ['RecordLocator'])
familyPNR.write.mode("overwrite").saveAsTable("all_passengers")

### Seat Purchases Query

In [0]:
# A passenger who picked a seat already has an assignment, but so does every
# passenger this process assigned on an earlier run. To avoid moving anyone
# who paid for a seat (and owing them a reimbursement), pull the purchased
# seat charges; a later cell removes those passengers from the assign set.
core_query = """sql query to pull purchased seats"""

core = spark.read.jdbc(url=jdbcUrl, table=core_query)
# saveAsTable returns None, so don't rebind `core` to its result
core.write.mode("overwrite").saveAsTable("purchased")

In [0]:
# Clean the purchased-seat charges, then left-anti join the purchasers out of
# the assign dataset so nobody who bought a seat gets moved.

core = spark.sql('select * from purchased')
# missing InventoryLegID -> 0, i.e. not tied to a flight leg
core = core.fillna(0, subset='InventoryLegID')
# Discount "(D)" and "Fee OR" surcharge lines. coalesce treats a null
# ChargeDetail as a regular fee: without it, a null row failed BOTH filters
# and that purchaser was never excluded from reassignment.
is_adjustment = F.coalesce(
    F.col('ChargeDetail').contains("(D)") | F.col('ChargeDetail').contains("Fee OR"),
    F.lit(False),
)
d1 = core.filter(is_adjustment)
core2 = core.filter(~is_adjustment)
# discounts and surcharges carry the wrong sign; flip them
d1 = d1.withColumn('ChargeAmount', F.col('ChargeAmount') * -1)
# recombine core fees with the corrected adjustments (previously unioned an
# undefined `purchased`, leaving core2 unused)
purchased = core2.union(d1)
purchased = purchased.groupby('PassengerID', 'ChargeCode', 'FeeNumber', 'RecordLocator', 'BookingID',
                              'DepartureDate', 'SegmentNumber', 'DepartureStation', 'ArrivalStation',
                              'InventoryLegID').sum('ChargeAmount')
# NOTE(review): the summed charge is discarded by the projection below, so it
# does not affect who is excluded - confirm whether net-zero purchases were
# meant to be filtered out here.
purchased = purchased.select('PassengerID', 'RecordLocator', 'InventoryLegID').dropDuplicates()
familyPNR = spark.sql('select * from all_passengers')
# remove passengers who purchased their seat
familyPNR = familyPNR.join(purchased, ['PassengerID', 'RecordLocator', 'InventoryLegID'], "left_anti")
familyPNR.write.mode("overwrite").saveAsTable("all_passengers")

### Children in Exit Rows Reassign

In [0]:
# Passengers (who did not purchase a seat) sitting in an exit row - most often
# after an equipment change - have their seat cleared so they get reassigned,
# but only when a child from the same PNR sits in that exit row on that leg.

exit_row = spark.sql('select * from all_passengers')
exit_row_cols = exit_row.columns  # original column order, restored before the union
exit_row = exit_row.withColumn('Row', F.expr("substring(UnitDesignator, 1, length(UnitDesignator)-1)"))
# "<Capacity>-<Row>" key matched against the exit rows of each aircraft type
exit_row = exit_row.withColumn('ExitFilter', F.concat_ws("-", exit_row.Capacity, exit_row.Row))
exit_row = exit_row.filter(F.col('ExitFilter').isin('''<insert list of exit row seats and equipment types>'''))

# Keep only exit-row groups containing a child. InventoryLegID is part of the
# key so a child in exit row N on one leg does not pull in someone sitting in
# exit row N on the other leg.
# NOTE(review): "child" here is Age < 15, while paxgroup treats Age < 14 as a kid.
kids = (exit_row.filter(F.col('Age') < 15)
        .select('RecordLocator', 'InventoryLegID', 'Row')
        .dropDuplicates())
exit_row = exit_row.join(kids, ['RecordLocator', 'InventoryLegID', 'Row'])
# clear the seat so the passenger is reassigned like an unseated one
exit_row = exit_row.withColumn('UnitDesignator', F.lit(''))
exit_row = exit_row.select(exit_row_cols)

# swap the cleared rows in for the originals so nobody is duplicated
exit_row_peeps = exit_row.select('PassengerID', 'InventoryLegID')
familyPNR = spark.sql('select * from all_passengers')
familyPNR = familyPNR.join(exit_row_peeps, ['PassengerID', 'InventoryLegID'], 'left_anti')
familyPNR = familyPNR.select(exit_row_cols)
familyPNR = familyPNR.union(exit_row)
familyPNR.write.mode("overwrite").saveAsTable("all_passengers")

## Seat Assignment Dataset

In [0]:
# Seat maps: for each aircraft type (Equipment) and load-factor tier (LF), the
# assignable seats and their priority. seat_map keeps UnitDesignator/Priority
# as parallel lists for later cells; seat_priority is the one-row-per-seat form.

seat_map = spark.sql('select * from <insert seat_map dataset>')
seat_map = (seat_map
            .withColumn('UnitDesignator', F.split('UnitDesignator', ','))
            .withColumn('Priority', F.split('Priority', ',')))
# posexplode keeps each seat's list position, so the Priority at the same
# position can be read directly. The old code joined two separately exploded
# frames on monotonically_increasing_id(), which Spark does not guarantee to
# line up between DataFrames.
seat_priority = seat_map.select(
    'Equipment', 'LF', 'Priority',
    F.posexplode('UnitDesignator').alias('pos', 'UnitDesignator'),
)
seat_priority = seat_priority.withColumn('Priority', F.expr('Priority[pos]').cast(IntegerType()))
seat_priority = seat_priority.select('Equipment', 'LF', 'UnitDesignator', 'Priority')


In [0]:

# Pull every seat currently taken for any reason. Later cells use it to
# subtract occupied seats from the seat maps and to find each flight's legs.
# NOTE(review): saved as "empty_seats" even though it holds taken seats;
# later cells read it under that name.
takenseat_query = """ sql query to pull all seats that are currently taken for any reason"""
takenseat = spark.read.jdbc(table=takenseat_query, url=jdbcUrl)
takenseat_writer = takenseat.write.mode("overwrite")
takenseat_writer.saveAsTable("empty_seats")

## Through Flight

In [0]:
# Through flights share one flight number (InventoryKey) across more than one
# leg. Passengers on both legs must not be moved between legs, so they are
# only given seats that are open on BOTH legs (the seat map used for them is
# the Low LF one, so the emptier leg is not back-loaded). Passengers on only
# one leg are assigned like any other single-leg passenger.

through = spark.sql('select * from empty_seats')
# per InventoryKey: number of distinct legs and the list of its InventoryLegIDs
legs_per_key = (through
                .select('InventoryLegID', 'InventoryKey')
                .dropDuplicates()
                .groupby('InventoryKey')
                .agg(F.count('InventoryLegID').alias('count'),
                     F.collect_list('InventoryLegID').alias('legs')))
# per networking a through flight should never have more than 2 legs
do_not_assign = legs_per_key.filter(F.col('count') > 2)
through = legs_per_key.filter(F.col('count') == 2)
through_filt = through.select('InventoryKey').dropDuplicates()

### Through Flight Seat Group Setup

In [0]:
# Through-flight seat pool: each through flight's Low-LF seat map minus every
# seat taken on either leg, i.e. the seats open on both legs. Through
# passengers are only assigned these seats.

capacity = spark.sql('select * from empty_seats')

# seat map for each through flight
# NOTE(review): if the two legs report different Capacity values this yields
# two seat maps for one InventoryKey - confirm both legs share equipment.
t_flight_info = (capacity
                 .select('InventoryKey', 'Capacity')
                 .dropDuplicates()
                 .join(through_filt, ['InventoryKey'])
                 .withColumnRenamed('Capacity', 'Equipment')
                 .withColumn('LF', F.lit('Low'))
                 .join(seat_map, ['Equipment', 'LF']))
t_allseats = t_flight_info.select(
    'InventoryKey', 'Equipment', 'LF',
    F.explode('UnitDesignator').alias('UnitDesignator'),
)

# seats occupied on either leg of each InventoryKey
taken_seats = capacity.filter(F.col('UnitDesignator') != '').select('InventoryKey', 'UnitDesignator')
taken_seats = groupset(taken_seats, 'InventoryKey', 'UnitDesignator', 'Taken')
taken = taken_seats.select('InventoryKey', F.explode('Taken').alias('UnitDesignator'))

# keep seats free on both legs, then build sequential seat blocks per row
availableseats = t_allseats.join(taken, ['InventoryKey', 'UnitDesignator'], "left_anti")
availableseats = seqseats(availableseats, seat_priority, 'through')
availableseats.write.mode("overwrite").saveAsTable("seat_flights_through")

### Through Flight Pax Group Setup

In [0]:
# Through-flight passengers: those on BOTH legs of a two-leg through flight.
# Passengers on only one leg are assigned like any other single-leg passenger.

familyPNR = spark.sql('select * from all_passengers')
through_family = familyPNR.join(through_filt, ['InventoryKey'])
# Count distinct legs rather than rows, so duplicate rows for one leg cannot
# make a single-leg passenger look like a both-legs passenger.
through_family = (through_family
                  .groupby('PassengerID', 'InventoryKey')
                  .agg(F.countDistinct('InventoryLegID').alias('legcount'))
                  .filter(F.col('legcount') == 2)
                  .drop('legcount'))
familyPNR = familyPNR.join(through_family, ['PassengerID', 'InventoryKey'])
# one row per passenger per through flight (InventoryLegID is projected away)
# NOTE(review): if a passenger's UnitDesignator differs between the legs, two
# rows survive here - confirm that case is expected.
familyPNR = familyPNR.select('PassengerID', 'RecordLocator', 'InventoryKey', 'Age', 'UnitDesignator')
familyPNR = familyPNR.dropDuplicates()

df = paxgroup(familyPNR, 'through')  # build the seat squads
df.write.mode("overwrite").saveAsTable("seatgroups_through")

### Through Seat Assignment

#### Large Group

In [0]:
# Large squads (more than 3 people) on through flights: within each
# InventoryKey, pair the largest squads with the largest sequential seat
# blocks, rank for rank, and keep only pairs whose block is big enough.

df3 = spark.sql('select * from seatgroups_through')
df3 = df3.filter(F.col('squadsize') > 3)  # only groups large enough to go across the aisle
# smallest large squad per flight; smaller seat blocks cannot seat anyone here
paxlist = df3.groupby('InventoryKey').agg(F.min('squadsize').alias('mincap'))
flights = spark.sql('select * from seat_flights_through')
flights = flights.join(paxlist, ['InventoryKey'])  # only flights that have a large squad
flights = flights.filter(F.col('seqcap') >= F.col('mincap'))

# A DataFrame.sort() before a window does not affect row_number; only the
# window ordering does. The tie-breakers make the pairing deterministic.
# NOTE(review): assumes a lower average Priority is preferred, matching the
# ascending seat ordering in seqseats - confirm.
squad_rank = Window.partitionBy('InventoryKey').orderBy(
    F.col('squadsize').desc(), F.col('RecordLocator'), F.col('the_squad'))
block_rank = Window.partitionBy('InventoryKey').orderBy(
    F.col('seqcap').desc(), F.col('Priority'), F.col('Row'))
df3_indexed = df3.withColumn('id', F.row_number().over(squad_rank))
flights_indexed = flights.withColumn('id', F.row_number().over(block_rank))
# k-th largest squad gets the k-th largest block on the same flight
assigned = flights_indexed.join(df3_indexed, ['id', 'InventoryKey'])
assigned = assigned.filter(F.col('seqcap') >= F.col('squadsize'))
assigned.write.mode("overwrite").saveAsTable("seat_assign_through")

#### Data Clean for Main Assignment

In [0]:
# Lock in the large-group through-flight assignments, then remove their seats from the
# through-flight seat pool before the main (small-group) assignment.
testassign = spark.sql('select * from seat_assign_through')
testassign = testassign.withColumn('remainder', F.col('seqcap') - F.col('squadsize'))  # spare seats in the run
# lock in every group of 2+ whose run is long enough (remainder >= 0)
lockin = testassign.filter((F.col('remainder') >= 0) & (F.col('squadsize') >= 2))
# The group takes the first `squadsize` seats of its run. The old hard-coded branches only
# covered sizes 2-5 and handed a group of 6+ the whole run, reserving seats nobody sits in.
lockin = lockin.withColumn('Seats', F.expr('slice(seqseats, 1, CAST(squadsize AS INT))'))
lockin.write.mode("overwrite").saveAsTable("seat_lockin_through")

# All seats locked in per flight/row, in one array, so every run in a touched row is
# reduced once. Runs of the same row that were not used are kept rather than dropped.
used_seats = (lockin
              .groupby('InventoryKey', 'Row')
              .agg(F.flatten(F.collect_list('Seats')).alias('UsedSeats')))
allseats = spark.sql('select * from seat_flights_through')  # current through-flight seat pool
touched = allseats.join(used_seats, ['InventoryKey', 'Row'])
touched = touched.withColumn('unused', F.array_except('seqseats', 'UsedSeats'))  # keeps seqseats order
newseatlist = (touched
               .filter(F.size('unused') > 0)
               .select('InventoryKey', 'Row',
                       F.col('unused').alias('EmptySeats'),
                       # seat letters such as "A,B,C"; the main assignment splits runs on these
                       F.expr("concat_ws(',', transform(unused, s -> substring(s, -1, 1)))").alias('Seats'),
                       'Priority', 'rowcap',
                       F.size('unused').alias('seqcap'),
                       F.col('unused').alias('seqseats')))
# Every touched row is replaced. Derived from `touched` (not newseatlist) so a fully used
# run, which yields no newseatlist row, is removed instead of being restored from old_list.
filt = touched.select('InventoryKey', 'Row').distinct()
old_list = spark.sql('select * from seat_flights_through')  # pull seat list
old_list = old_list.join(filt, ['InventoryKey', 'Row'], how='left_anti')
# positional union: newseatlist columns follow the seat_flights_through layout used before
availableseats = newseatlist.union(old_list)
availableseats = availableseats.filter(F.col('seqcap') > 0)
availableseats.write.mode("overwrite").saveAsTable("seat_flights_through")

#### Main Assignment

In [0]:
''' this section of code take the remaining open seats from the large group assignments and breaks them 
along the center aisle. This is to prevent any small groups of 2 or 3 from being separated by the aisle 
even if the seats are technically sequential. It then assigns passengers to this new seat grouping'''

# Small groups (squadsize < 4) on through flights.
df3 = spark.sql('select * from seatgroups_through') #pull seat groups
df3 = df3.filter(F.col('squadsize')<4)
# smallest group size per through flight; runs shorter than this cannot seat anyone
paxlist = df3.groupby('InventoryKey').agg(F.min('squadsize').alias('mincap'))
flights = spark.sql('select * from seat_flights_through') #pull seat list
# Trim each run to one side of the aisle, keyed off the seat-letter string in `Seats`
# (branches are tested in order, first match wins):
#   'A,B,C' / 'F,E,D' -> first three seqseats;  'C,B,A' / 'D,E,F' -> last three;
#   'B,C' / 'E,D'     -> first two;             'C,B' / 'D,E'     -> last two;
#   'D,C' / 'C,D'     -> first seat only;       anything else     -> unchanged.
# NOTE(review): assumes the letter order in `Seats` mirrors the element order of seqseats — confirm.
flights = flights.withColumn("seqseats", \
    F.when(F.col('Seats').contains('A,B,C'),F.array([F.col("seqseats")[0], F.col("seqseats")[1],F.col("seqseats")[2]]))
    .when(F.col('Seats').contains('F,E,D'),F.array([F.col("seqseats")[0], F.col("seqseats")[1],F.col("seqseats")[2]]))
    .when(F.col('Seats').contains('C,B,A'),F.array([F.element_at(F.col('seqseats'), -3), F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('D,E,F'),F.array([F.element_at(F.col('seqseats'), -3), F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('B,C'),F.array([F.col("seqseats")[0], F.col("seqseats")[1]]))
    .when(F.col('Seats').contains('E,D'),F.array([F.col("seqseats")[0], F.col("seqseats")[1]]))
    .when(F.col('Seats').contains('C,B'),F.array([ F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('D,E'),F.array([ F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('D,C'),F.array([F.col("seqseats")[0]]))
    .when(F.col('Seats').contains('C,D'),F.array([F.col("seqseats")[0]]))
    .otherwise(F.col("seqseats")))
flights = flights.withColumn('seqcap',F.size(F.col('seqseats'))) #recompute run length after the aisle split
flights = flights.join(paxlist, ['InventoryKey'])
flights = flights.filter(F.col('seqcap')>=F.col('mincap'))
df3_window = Window.partitionBy("InventoryKey")
df3 = df3.sort(F.col('squadsize').desc())
# rank groups per flight, largest first
df3_indexed = df3.withColumn("count", F.count("*").over(df3_window)).withColumn("id", F.row_number().over(df3_window.orderBy(df3.squadsize.desc())))
flights_window = Window.partitionBy("InventoryKey")
flights = flights.sort(F.col('seqcap').desc())
# rank runs per flight by descending Priority (not by length, unlike the large-group cell)
flights_indexed = flights.withColumn("count", F.count("*").over(flights_window)).withColumn("id", F.row_number().over(flights_window.orderBy(flights.Priority.desc())))
# k-th largest group <-> k-th ranked run on the same flight
assigned = flights_indexed.join(df3_indexed, ['id','InventoryKey'])
assigned = assigned.drop('count')
# groups whose paired run is too short stay unassigned
assigned = assigned.filter(F.col('seqcap')>=F.col('squadsize'))
assigned.write.mode("overwrite").saveAsTable("seat_assign_through")

#### Expand Through Data to Leg Data

In [0]:
#Create final lockin for through flights
testassign = spark.sql('select * from seat_assign_through')
#calculate remainder number of seats
testassign = testassign.withColumn('remainder', F.col('seqcap')-F.col('squadsize')) 
#if seat remainder is >= 0 and squad size is >=2 lock in assignment
lockin = testassign.filter((F.col('remainder')>=0) & (F.col('squadsize')>=2)) 
# The group takes the first `squadsize` seats of its run. Only groups smaller than 4 reach
# this cell (main assignment), so the size-4/5 and otherwise branches are not expected to fire.
lockin = lockin.withColumn("Seats", F.when(F.col('squadsize')==2,F.array([F.col("seqseats")[0], F.col("seqseats")[1]]))
                           .when(F.col('squadsize')==3,F.array([F.col("seqseats")[0], F.col("seqseats")[1],F.col("seqseats")[2]]))
                           .when(F.col('squadsize')==4,F.array([F.col("seqseats")[0], F.col("seqseats")[1],F.col("seqseats")[2],F.col("seqseats")[3]]))
                           .when(F.col('squadsize')==5,F.array([F.col("seqseats")[0], F.col("seqseats")[1],F.col("seqseats")[2],F.col("seqseats")[3],F.col("seqseats")[4]]))
                           .otherwise(F.col('seqseats')))
# Append to the large-group lock-ins. This is a positional union; both sides derive from
# seat_assign_through plus remainder/Seats, so the layouts match.
prev_lockin = spark.sql('select * from seat_lockin_through')
running_lockin = prev_lockin.union(lockin)
running_lockin.write.mode("overwrite").saveAsTable("seat_lockin_through")

'''utilize the filter for through flights. This has both InventoryKey and InventoryLegID's assigned to them. 
By exploding out the list, you can do a simple merge to take the through flight lockin to the same format 
as the regular assignment lockin. This will then put it into the proper format for final assignment 
and allow us to subtract out these seats from the individual inventory legs in the next section.'''

# one row per (InventoryKey, InventoryLegID) of each through flight
through_merge = through.select(through.InventoryKey,F.explode(through.legs).alias('InventoryLegID'))
lockedin = spark.sql('select * from seat_lockin_through')
# copy every through-flight lock-in onto each of its legs
expanded = lockedin.join(through_merge, ['InventoryKey'])
# NOTE(review): seat_lockin is read here only for its column list, before this cell overwrites it.
# In the visible code it is first written below, so this presumably depends on a table created
# earlier (another cell or a previous run) — confirm on a clean workspace.
cols = spark.sql('select * from seat_lockin')
cols = cols.columns
# keep seat_lockin's columns in its order, so the later positional unions into seat_lockin line up
expanded = expanded[[cols]]
# start a fresh seat_lockin holding only the leg-level through-flight assignments
expanded.write.mode("overwrite").option('overwriteschema','true').saveAsTable("seat_lockin")

## Single Leg Assignments

In [0]:
# Section marker: the string below is the cell's result and is echoed as the cell output.
# The single-leg cells mirror the through-flight cells, keyed by InventoryLegID instead of InventoryKey.
'''This section is a repeat of the through flight family code. But is for passengers only travelling a single leg. 
Thus it repeats the code but substitues inventorylegid for inventorykey.'''

Out[19]: 'This section is a repeat of the through flight family code. But is for passengers only travelling a single leg. Thus it repeats the code but substitues inventorylegid for inventorykey.'

### Single Leg Flight Seat Group Setup

In [0]:
#quick clean and aggregation of the passenger info to calculate load factor
pax = spark.sql('select * from empty_seats')

lf = pax.dropDuplicates(subset = ['PassengerID','InventoryLegID']) #remove duplications
lf = lf.groupby('InventoryLegID').count() #group passengers by flight
lf = lf.withColumnRenamed('count','Pax') #rename of count to Pax

#isolation of flight info dataset
flight_info = pax[['DepartureDate','InventoryLegID','Capacity']] #pull inventory leg id and capacity
flight_info = flight_info.drop_duplicates(subset = ['InventoryLegID']) #remove duplicates to get unique flights
# DTD = days from today (driver-side date) until departure
flight_info = flight_info.withColumn('Date', F.lit(date.today())).withColumn('DTD',F.datediff(F.col("DepartureDate"),F.col('Date')))
# Capacity doubles as the equipment key used to look up the seat map below
flight_info = flight_info.withColumnRenamed('Capacity','Equipment')
flight_info = flight_info.drop('Date','DepartureDate')
flight_info = flight_info.join(lf, ['InventoryLegID']) #join with count of passengers
flight_info = flight_info.withColumn('Pax%',F.round(F.col('Pax')/F.col('Equipment'),2)) #raw % creation
# LF bucket: 'Low' if any of (<35% full and >10 days out), (<45% and >5 days out),
# (<51% and >2 days out) holds; otherwise 'High'.
flight_info = flight_info.withColumn('LF', F.when((F.col('Pax%')<.35) & (F.col('DTD')>10), 'Low').otherwise(F.lit('High')))
flight_info = flight_info.withColumn('LF', F.when((F.col('Pax%')<.45) & (F.col('DTD')>5), 'Low').otherwise(F.col('LF')))
flight_info = flight_info.withColumn('LF', F.when((F.col('Pax%')<.51) & (F.col('DTD')>2), 'Low').otherwise(F.col('LF')))
# seat_map (defined elsewhere) holds an array of UnitDesignators per Equipment/LF;
# presumably the seats eligible for assignment at that load factor — confirm.
flight_info = flight_info.join(seat_map, ['Equipment','LF']) #join seatmap to flights
allseats = flight_info.select(flight_info.DTD, flight_info.InventoryLegID,flight_info.Equipment,flight_info.LF,F.explode(flight_info.UnitDesignator)) #create unique row for each seat
allseats = allseats.withColumnRenamed('col','UnitDesignator')

taken_seats = pax.filter(F.col('UnitDesignator') != '') #filter out passengers without seats (null seats are dropped too)
taken_seats = taken_seats[['InventoryLegID','UnitDesignator']] #create dataset of flight and taken seats
# groupset (functions section) presumably collects UnitDesignator per leg into a 'Taken' array — confirm
taken_seats = groupset(taken_seats,taken_seats.InventoryLegID, taken_seats.UnitDesignator, 'Taken')
taken = taken_seats.select(taken_seats.InventoryLegID,F.explode(taken_seats.Taken)) #explode list into individual rows
taken = taken.withColumnRenamed('col','UnitDesignator')

'''Take all possible seats that can be assigned and subtract out taken seats. There will be a second layer of this to subtract out seats taken by the through seat assigments.'''

#remove taken seats from seat map on each flight
availableseats = allseats.join(taken, ['InventoryLegID','UnitDesignator'], "left_anti")
# seqseats (functions section) presumably builds the sequential seat runs read by later cells — confirm
availableseats = seqseats(availableseats,seat_priority,'single_leg')

availableseats.write.mode("overwrite").saveAsTable("seat_flights")

In [0]:
# Remove the seats already locked in by the through-flight assignment (expanded to individual
# legs in seat_lockin) from the single-leg seat pool. This is the last prep step for the
# available-seat side of the single-leg assignment.

lockin = spark.sql('select * from seat_lockin')  # locked-in seating assignments so far
# Collapse all lock-ins per leg/row into one array. A row can hold several locked-in squads;
# joining them one at a time produced one "unused" list per squad, and merging those lists
# put seats taken by one squad back into the pool (and listed other seats twice).
used_seats = (lockin
              .groupby('InventoryLegID', 'Row')
              .agg(F.flatten(F.collect_list('Seats')).alias('UsedSeats')))
allseats = spark.sql('select * from seat_flights')  # full seat list for each leg
touched = allseats.join(used_seats, ['InventoryLegID', 'Row'])
touched = touched.withColumn('unused', F.array_except('seqseats', 'UsedSeats'))  # keeps seqseats order
newseatlist = (touched
               .filter(F.size('unused') > 0)
               .select('InventoryLegID', 'Row',
                       F.col('unused').alias('EmptySeats'),
                       # seat letters such as "A,B,C"; the main assignment splits runs on these
                       F.expr("concat_ws(',', transform(unused, s -> substring(s, -1, 1)))").alias('Seats'),
                       'Priority', 'rowcap',
                       F.size('unused').alias('seqcap'),
                       F.col('unused').alias('seqseats')))
# Every touched row is replaced. Derived from `touched` (not newseatlist) so a fully used
# run, which yields no newseatlist row, is removed instead of being restored from old_list.
filt = touched.select('InventoryLegID', 'Row').distinct()
old_list = spark.sql('select * from seat_flights')  # pull seat list
old_list = old_list.join(filt, ['InventoryLegID', 'Row'], how='left_anti')
# positional union: newseatlist columns follow the seat_flights layout used before
availableseats = newseatlist.union(old_list)
availableseats = availableseats.filter(F.col('seqcap') > 0)
availableseats.write.mode("overwrite").saveAsTable("seat_flights")

### Single Leg Passenger Setup

In [0]:
#Create Family Seating Groups for passengers travelling a single leg
familyPNR = spark.sql('select * from all_passengers')
# Passengers on both legs of a through flight were seated by the through-flight assignment;
# identify them exactly as the through-flight setup cell does.
through_family = familyPNR.join(through_filt, ['InventoryKey'])
through_family = through_family.groupby('PassengerID','InventoryKey').count()
through_family = through_family.filter(F.col('count')==2).drop('count')
# remove all the through passengers so that they are not double assigned.
familyPNR = familyPNR.join(through_family, ['PassengerID','InventoryKey'], how = 'left_anti') 

df = paxgroup(familyPNR,'single_leg') # run defined function to build seat groups
# Persist the groups just built. This previously wrote `df3`, the stale through-flight small-group
# DataFrame left over from an earlier cell, so the single-leg groups were never saved.
# overwriteSchema lets the table move off any layout written by that stale DataFrame.
df.write.mode("overwrite").option('overwriteSchema', 'true').saveAsTable("seatgroups")

### Seat Assignment

#### Large Group

In [0]:
# Large-group single-leg assignment: same greedy pairing as the through-flight large-group
# cell, keyed by InventoryLegID.
df3 = spark.sql('select * from seatgroups') #pull seat groups
df3 = df3.filter(F.col('squadsize')>3) #groups of 4 or more only
paxlist = df3.groupby('InventoryLegID').agg(F.min('squadsize').alias('mincap')) #smallest large group per leg
flights = spark.sql('select * from seat_flights') #pull seat list
flights = flights.join(paxlist, ['InventoryLegID']) #keep legs that have a large group to seat
flights = flights.filter(F.col('seqcap')>=F.col('mincap')) #drop runs too short for any of those groups
df3_window = Window.partitionBy("InventoryLegID")
df3 = df3.sort(F.col('squadsize').desc())
# rank groups per leg, largest first
df3_indexed = df3.withColumn("count", F.count("*").over(df3_window)).withColumn("id", F.row_number().over(df3_window.orderBy(df3.squadsize.desc())))
flights_window = Window.partitionBy("InventoryLegID")
flights = flights.sort(F.col('seqcap').desc())
# rank seat runs per leg, longest first
flights_indexed = flights.withColumn("count", F.count("*").over(flights_window)).withColumn("id", F.row_number().over(flights_window.orderBy(flights.seqcap.desc())))
# k-th largest group <-> k-th longest run on the same leg
assigned = flights_indexed.join(df3_indexed, ['id','InventoryLegID'])
assigned = assigned.drop('count')
# groups whose paired run is too short stay unassigned this round
assigned = assigned.filter(F.col('seqcap')>=F.col('squadsize'))
assigned.write.mode("overwrite").option('overwriteschema', 'true').saveAsTable("seat_assign")

In [0]:
# Data clean: lock in the large-group single-leg assignments, then remove their seats from the
# pool before the next round of assignments.
testassign = spark.sql('select * from seat_assign')
testassign = testassign.withColumn('remainder', F.col('seqcap') - F.col('squadsize'))  # spare seats in the run
# lock in every group of 2+ whose run is long enough (remainder >= 0)
lockin = testassign.filter((F.col('remainder') >= 0) & (F.col('squadsize') >= 2))
# The group takes the first `squadsize` seats of its run. The old hard-coded branches only
# covered sizes 2-5 and handed a group of 6+ the whole run, reserving seats nobody sits in.
lockin = lockin.withColumn('Seats', F.expr('slice(seqseats, 1, CAST(squadsize AS INT))'))
prev_lockin = spark.sql('select * from seat_lockin')
# Positional union onto the running lock-in table; exact repeats are dropped, as the
# main-assignment clean-up already does.
running_lockin = prev_lockin.union(lockin).dropDuplicates()
running_lockin.write.mode("overwrite").saveAsTable("seat_lockin")

lockin = spark.sql('select * from seat_lockin')
# Collapse all lock-ins per leg/row into one array. Joining them one at a time produced one
# "unused" list per squad, and merging those lists put seats taken by one squad back into
# the pool (and listed other seats twice).
used_seats = (lockin
              .groupby('InventoryLegID', 'Row')
              .agg(F.flatten(F.collect_list('Seats')).alias('UsedSeats')))
allseats = spark.sql('select * from seat_flights')
touched = allseats.join(used_seats, ['InventoryLegID', 'Row'])
touched = touched.withColumn('unused', F.array_except('seqseats', 'UsedSeats'))  # keeps seqseats order
newseatlist = (touched
               .filter(F.size('unused') > 0)
               .select('InventoryLegID', 'Row',
                       F.col('unused').alias('EmptySeats'),
                       # seat letters such as "A,B,C"; the main assignment splits runs on these
                       F.expr("concat_ws(',', transform(unused, s -> substring(s, -1, 1)))").alias('Seats'),
                       'Priority', 'rowcap',
                       F.size('unused').alias('seqcap'),
                       F.col('unused').alias('seqseats')))
# Every touched row is replaced. Derived from `touched` (not newseatlist) so a fully used
# run, which yields no newseatlist row, is removed instead of being restored from old_list.
filt = touched.select('InventoryLegID', 'Row').distinct()
old_list = spark.sql('select * from seat_flights')  # pull seat list
old_list = old_list.join(filt, ['InventoryLegID', 'Row'], how='left_anti')
# positional union: newseatlist columns follow the seat_flights layout used before
availableseats = newseatlist.union(old_list)
availableseats = availableseats.filter(F.col('seqcap') > 0)
availableseats.write.mode("overwrite").saveAsTable("seat_flights")

#### Main Assignment

In [0]:
# Small groups (squadsize < 4) on single legs; runs are first split along the centre aisle.
df3 = spark.sql('select * from seatgroups') #pull seat groups
df3 = df3.filter(F.col('squadsize')<4)
# smallest group size per leg; runs shorter than this cannot seat anyone
paxlist = df3.groupby('InventoryLegID').agg(F.min('squadsize').alias('mincap'))
flights = spark.sql('select * from seat_flights') #pull seat list
# Trim each run to one side of the aisle, keyed off the seat-letter string in `Seats`
# (branches are tested in order, first match wins):
#   'A,B,C' / 'F,E,D' -> first three seqseats;  'C,B,A' / 'D,E,F' -> last three;
#   'B,C' / 'E,D'     -> first two;             'C,B' / 'D,E'     -> last two;
#   'D,C' / 'C,D'     -> first seat only;       anything else     -> unchanged.
# NOTE(review): assumes the letter order in `Seats` mirrors the element order of seqseats — confirm.
flights = flights.withColumn("seqseats", \
    F.when(F.col('Seats').contains('A,B,C'),F.array([F.col("seqseats")[0], F.col("seqseats")[1],F.col("seqseats")[2]]))
    .when(F.col('Seats').contains('F,E,D'),F.array([F.col("seqseats")[0], F.col("seqseats")[1],F.col("seqseats")[2]]))
    .when(F.col('Seats').contains('C,B,A'),F.array([F.element_at(F.col('seqseats'), -3), F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('D,E,F'),F.array([F.element_at(F.col('seqseats'), -3), F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('B,C'),F.array([F.col("seqseats")[0], F.col("seqseats")[1]]))
    .when(F.col('Seats').contains('E,D'),F.array([F.col("seqseats")[0], F.col("seqseats")[1]]))
    .when(F.col('Seats').contains('C,B'),F.array([ F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('D,E'),F.array([ F.element_at(F.col('seqseats'), -2),F.element_at(F.col('seqseats'), -1)]))
    .when(F.col('Seats').contains('D,C'),F.array([F.col("seqseats")[0]]))
    .when(F.col('Seats').contains('C,D'),F.array([F.col("seqseats")[0]]))
    .otherwise(F.col("seqseats")))
flights = flights.withColumn('seqcap',F.size(F.col('seqseats'))) #recompute run length after the aisle split
flights = flights.join(paxlist, ['InventoryLegID'])
flights = flights.filter(F.col('seqcap')>=F.col('mincap'))
df3_window = Window.partitionBy("InventoryLegID")
df3 = df3.sort(F.col('squadsize').desc())
# rank groups per leg, largest first
df3_indexed = df3.withColumn("count", F.count("*").over(df3_window)).withColumn("id", F.row_number().over(df3_window.orderBy(df3.squadsize.desc())))
flights_window = Window.partitionBy("InventoryLegID")
flights = flights.sort(F.col('seqcap').desc())
# rank runs per leg by descending Priority (not by length, unlike the large-group cell)
flights_indexed = flights.withColumn("count", F.count("*").over(flights_window)).withColumn("id", F.row_number().over(flights_window.orderBy(flights.Priority.desc())))
# k-th largest group <-> k-th ranked run on the same leg
assigned = flights_indexed.join(df3_indexed, ['id','InventoryLegID'])
assigned = assigned.drop('count')
# groups whose paired run is too short stay unassigned
assigned = assigned.filter(F.col('seqcap')>=F.col('squadsize'))
assigned.write.mode("overwrite").saveAsTable("seat_assign")

In [0]:
# Lock in the small-group single-leg assignments, then remove their seats from the pool.
testassign = spark.sql('select * from seat_assign')
testassign = testassign.withColumn('remainder', F.col('seqcap') - F.col('squadsize'))  # spare seats in the run
# lock in every group of 2+ whose run is long enough (remainder >= 0)
lockin = testassign.filter((F.col('remainder') >= 0) & (F.col('squadsize') >= 2))
# The group takes the first `squadsize` seats of its run. This matches the old per-size
# branches for sizes 2-5 and stays correct for any larger size.
lockin = lockin.withColumn('Seats', F.expr('slice(seqseats, 1, CAST(squadsize AS INT))'))
prev_lockin = spark.sql('select * from seat_lockin')
running_lockin = prev_lockin.union(lockin)  # positional union onto the running lock-in table
running_lockin = running_lockin.dropDuplicates()
running_lockin.write.mode("overwrite").saveAsTable("seat_lockin")

lockin = spark.sql('select * from seat_lockin')
# Collapse all lock-ins per leg/row into one array. Joining them one at a time produced one
# "unused" list per squad, and merging those lists put seats taken by one squad back into
# the pool (and listed other seats twice).
used_seats = (lockin
              .groupby('InventoryLegID', 'Row')
              .agg(F.flatten(F.collect_list('Seats')).alias('UsedSeats')))
allseats = spark.sql('select * from seat_flights')
touched = allseats.join(used_seats, ['InventoryLegID', 'Row'])
touched = touched.withColumn('unused', F.array_except('seqseats', 'UsedSeats'))  # keeps seqseats order
newseatlist = (touched
               .filter(F.size('unused') > 0)
               .select('InventoryLegID', 'Row',
                       F.col('unused').alias('EmptySeats'),
                       # seat letters such as "A,B,C", matching the Seats column layout
                       F.expr("concat_ws(',', transform(unused, s -> substring(s, -1, 1)))").alias('Seats'),
                       'Priority', 'rowcap',
                       F.size('unused').alias('seqcap'),
                       F.col('unused').alias('seqseats')))
# Every touched row is replaced. Derived from `touched` (not newseatlist) so a fully used
# run, which yields no newseatlist row, is removed instead of being restored from old_list.
filt = touched.select('InventoryLegID', 'Row').distinct()
old_list = spark.sql('select * from seat_flights')  # pull seat list
old_list = old_list.join(filt, ['InventoryLegID', 'Row'], how='left_anti')
# positional union: newseatlist columns follow the seat_flights layout used before
availableseats = newseatlist.union(old_list)
availableseats = availableseats.filter(F.col('seqcap') > 0)
availableseats.write.mode("overwrite").saveAsTable("seat_flights")

## Pax Number Assignment

In [0]:
# Distinct booking references (PNRs) that received at least one locked-in seat.
filt = (spark.sql('select * from seat_lockin')
        .select('RecordLocator')
        .dropDuplicates(['RecordLocator']))

In [0]:
#paxnumber query
# Placeholder: supply the SQL for the record locators being assigned. The next cell reads
# PassengerID and RecordLocator from the stored result.
# NOTE(review): when a query (not a table name) is passed as `table`, Spark JDBC expects a
# parenthesised subquery with an alias, e.g. "(select ...) q".
familyPNR_query = """ sql query for what record locators will be assigned"""
familyPNR = spark.read.jdbc(url=jdbcUrl, table=familyPNR_query)
familyPNR.write.mode("overwrite").saveAsTable("pax_number_raw")

In [0]:
# Restrict the raw pull to PNRs that actually received a locked-in seat (filt).
pax_numbers = spark.sql('select PassengerID,RecordLocator from pax_number_raw')
pax_numbers = pax_numbers.join(filt, ['RecordLocator'])
# NOTE(review): overwrites the table it was just read from; relies on a table format that
# tolerates read-then-overwrite (e.g. Delta) — confirm.
pax_numbers.write.mode("overwrite").saveAsTable("pax_number_raw")
# re-read the filtered table
pax_numbers = spark.sql('select PassengerID,RecordLocator from pax_number_raw')
# paxnumbers (functions section) presumably adds the PassengerNumber column read in final assembly — confirm
pax_numbers = paxnumbers(pax_numbers)
pax_numbers.write.mode("overwrite").saveAsTable("pax_numbers") #assign numbers to holdout dataset

## Final Assembly

In [0]:
running_lockin = spark.sql('select * from seat_lockin')
# One row per locked-in seat, keeping its position within the squad's seat list.
seats = running_lockin.select(
    running_lockin.InventoryLegID,
    running_lockin.RecordLocator,
    running_lockin.the_squad,
    F.posexplode(running_lockin.Seats).alias('pos', 'UnitDesignator'))
# Pair seat i with passenger i of the same lock-in row. The previous version exploded seats
# and passengers separately and joined them on monotonically_increasing_id(). That only lines
# up while every row has exactly as many seats as passengers; a single mismatch shifts the ids
# for the rest of the partition and pairs (or drops) the wrong people.
final = seats.withColumn(
    'PassengerID',
    F.expr('CASE WHEN pos < size(the_squad) THEN the_squad[pos] END'))  # guarded 0-based lookup
# seats with no matching passenger are dropped, as the old inner join did
final = final.filter(F.col('PassengerID').isNotNull()).drop('pos', 'the_squad')
final = final.dropDuplicates()  # drop duplications
numbers = spark.sql('select * from pax_numbers')  # pull in passenger number
numbers = numbers.withColumn('PassengerNumber', F.col('PassengerNumber').astype(StringType()))
final = final.join(numbers, ['RecordLocator', 'PassengerID'])  # merge dataset to add passenger number

In [0]:
# Placeholder: supply the SQL for flight identification info. The next cell reads InventoryLegID,
# DepartureDate, FlightNumber, DepartureStation, ArrivalStation, STD and Capacity from the result.
familyPNR_query = """ insert query for flight identification information"""

familyPNR = spark.read.jdbc(url=jdbcUrl, table=familyPNR_query)
familyPNR.write.mode("overwrite").option('overwriteschema','true').saveAsTable("pax_info_raw")

In [0]:
# Flight-level attributes, one row per inventory leg.
info_raw = spark.sql('select * from pax_info_raw')
flightdata = (info_raw
              .select('InventoryLegID', 'DepartureDate', 'FlightNumber', 'DepartureStation',
                      'ArrivalStation', 'STD', 'Capacity')
              .dropDuplicates(['InventoryLegID']))

# NOTE(review): seat_loc is loaded and renamed but not joined anywhere in the visible code.
seat_loc = (spark.sql('select * from seat_loc')
            .withColumnRenamed('LocationConcat', 'SeatLoc')
            .withColumnRenamed('Seat', 'UnitDesignator'))

# Scheduled departure rendered as '<date>T<HH:mm:ss>'.
std_iso = F.concat(
    F.to_date(F.col('STD')).cast(StringType()),
    F.lit('T'),
    F.date_format('STD', 'HH:mm:ss').cast(StringType()),
)

final = (final
         .join(flightdata, ['InventoryLegID'])  # join flight data
         .withColumn('STD', std_iso)
         .withColumn('FlightNumber', F.col('FlightNumber').cast(StringType()))
         .select(F.col('RecordLocator').alias('PNR'), 'PassengerID', 'DepartureDate',
                 'DepartureStation', 'STD', 'ArrivalStation', 'FlightNumber',
                 'InventoryLegID', 'UnitDesignator', 'PassengerNumber')
         .dropDuplicates())
final.write.mode("overwrite").saveAsTable("final_seat_assign")