In [None]:
'''
Given a dataset with columns PERSON, TYPE, and AGE,
create an output where the oldest adult is paired with the youngest child, producing pairs of ADULT and CHILD while ensuring appropriate data matching.

ðŸ’¡ Check out the input and output in the table below!

Input:--->

| PERSON | TYPE | AGE |
| ------ | ------ | --- |
| A1 | ADULT | 54 |
| A2 | ADULT | 53 |
| A3 | ADULT | 52 |
| A4 | ADULT | 58 |
| A5 | ADULT | 54 |
| C1 | CHILD | 20 |
| C2 | CHILD | 19 |
| C3 | CHILD | 22 |
| C4 | CHILD | 15 |


Expected Output:--->

| ADULT | CHILD |
| ----- | ----- |
| A4 | C4 |
| A5 | C2 |
| A1 | C1 |
| A2 | C3 |
| A3 | NULL |
'''

In [22]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, col, desc, asc
from pyspark.sql.window import Window

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder.master('local[*]').getOrCreate()

class AdultChildPair:
    
    def createData(self):
        column = ['person','type','age']
        data = [
            ("A1","ADULT",54),
            ("A2","ADULT",53),
            ("A3","ADULT",52),
            ("A4","ADULT",58),
            ("A5","ADULT",54),
            ("C1","CHILD",20),
            ("C2","CHILD",19),
            ("C3","CHILD",22),
            ("C4","CHILD",15)
        ]
        inputDf = spark.createDataFrame(data , column)
        return inputDf
    
    def adult_child_pair_sql(self, inputDf):
        inputDf.createOrReplaceTempView('adult_child')
        query = """
        WITH ADULT_CTE as (
        select person, row_number () over (order by age desc, person) as rnk
        from adult_child
        where type='ADULT'
        ),
        CHILD_CTE as (
        select person, row_number () over (order by age, person) as rnk 
        from adult_child
        where type='CHILD'
        )
        select 
        a1.person, c1.person
        from ADULT_CTE a1 full join CHILD_CTE c1
        on a1.rnk = c1.rnk
        """  
        return spark.sql(query)
    
    def adult_child_pair_pyspark(self, inputDf):
        adultWindowSpec = Window.orderBy(desc('age')).orderBy(asc('person'))
        
        adultDf = inputDf.filter(col('type').__eq__('ADULT')) \
            .withColumn('rnk', row_number().over(adultWindowSpec))
        
        adultDf.show()
                        
        childWindowSpec = Window.orderBy(asc('age')).orderBy(asc('person'))
        
        childDf = inputDf.filter(col('type').__eq__('CHILD'))\
                        .withColumn('rnk', row_number().over(childWindowSpec))
        childDf.show()

        resultDf = adultDf.alias('A').join(childDf.alias('C'), on='rnk', how='full')\
                        .select(col('A.person'),
                                col('C.person'))
        return resultDf
        
a1 = AdultChildPair()
inputDf = a1.createData()
inputDf.show(10)
#resultDf = a1.adult_child_pair_sql(inputDf)
#resultDf.show(10)

resultDf2 = a1.adult_child_pair_pyspark(inputDf)
resultDf2.show(10)

+------+-----+---+
|person| type|age|
+------+-----+---+
|    A1|ADULT| 54|
|    A2|ADULT| 53|
|    A3|ADULT| 52|
|    A4|ADULT| 58|
|    A5|ADULT| 54|
|    C1|CHILD| 20|
|    C2|CHILD| 19|
|    C3|CHILD| 22|
|    C4|CHILD| 15|
+------+-----+---+

+------+-----+---+---+
|person| type|age|rnk|
+------+-----+---+---+
|    A1|ADULT| 54|  1|
|    A2|ADULT| 53|  2|
|    A3|ADULT| 52|  3|
|    A4|ADULT| 58|  4|
|    A5|ADULT| 54|  5|
+------+-----+---+---+

+------+-----+---+---+
|person| type|age|rnk|
+------+-----+---+---+
|    C1|CHILD| 20|  1|
|    C2|CHILD| 19|  2|
|    C3|CHILD| 22|  3|
|    C4|CHILD| 15|  4|
+------+-----+---+---+

+------+------+
|person|person|
+------+------+
|    A1|    C1|
|    A2|    C2|
|    A3|    C3|
|    A4|    C4|
|    A5|  null|
+------+------+

