### DataFrame Transformations
---------
The material in this notebook was extracted from
* Spark The Definitive Guida Big Data Processing Made Simple (2018)
---------

When working with individual DataFrames, there are some fundamental objectives. These break down into several core operations:

- Add rows or columns
- Remove rows or columns
- Transform rows into columns (or vice versa)
- Sort data by values


#### Creating DataFrames

We can create DataFrames from raw data sources

In [4]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [1]:
df = spark.read\
.format('csv')\
.option('header', True)\
.option('inferSchema', True)\
.load('./data/flights.csv')
df.dtypes

                                                                                

[('YEAR', 'int'),
 ('MONTH', 'int'),
 ('DAY', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('AIRLINE', 'string'),
 ('FLIGHT_NUMBER', 'int'),
 ('TAIL_NUMBER', 'string'),
 ('ORIGIN_AIRPORT', 'string'),
 ('DESTINATION_AIRPORT', 'string'),
 ('SCHEDULED_DEPARTURE', 'int'),
 ('DEPARTURE_TIME', 'int'),
 ('DEPARTURE_DELAY', 'int'),
 ('TAXI_OUT', 'int'),
 ('WHEELS_OFF', 'int'),
 ('SCHEDULED_TIME', 'int'),
 ('ELAPSED_TIME', 'int'),
 ('AIR_TIME', 'int'),
 ('DISTANCE', 'int'),
 ('WHEELS_ON', 'int'),
 ('TAXI_IN', 'int'),
 ('SCHEDULED_ARRIVAL', 'int'),
 ('ARRIVAL_TIME', 'int'),
 ('ARRIVAL_DELAY', 'int'),
 ('DIVERTED', 'int'),
 ('CANCELLED', 'int'),
 ('CANCELLATION_REASON', 'string'),
 ('AIR_SYSTEM_DELAY', 'int'),
 ('SECURITY_DELAY', 'int'),
 ('AIRLINE_DELAY', 'int'),
 ('LATE_AIRCRAFT_DELAY', 'int'),
 ('WEATHER_DELAY', 'int')]

But we can also create DataFrames on the fly

In [6]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

mySchema = StructType([
    StructField('some', StringType(), True),
    StructField('col', StringType(), True),
    StructField('names', LongType(), False)
])

myRows = [Row('Hello', None, 1), 
          Row('World', '!', 2), 
          Row('HOW', None, 3)]


myDf = spark.createDataFrame(myRows, mySchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
|World|   !|    2|
|  HOW|null|    3|
+-----+----+-----+



##### Select and selectExpr

```select``` and ```selectExpr```allow you to do the DataFrame equivalent of SQL queries on a table

In [7]:
df.select('DESTINATION_AIRPORT').show(2)

+-------------------+
|DESTINATION_AIRPORT|
+-------------------+
|                SEA|
|                PBI|
+-------------------+
only showing top 2 rows



In [8]:
df.select('DESTINATION_AIRPORT', 'AIR_TIME').show(2)  

+-------------------+--------+
|DESTINATION_AIRPORT|AIR_TIME|
+-------------------+--------+
|                SEA|     169|
|                PBI|     263|
+-------------------+--------+
only showing top 2 rows



You can refer columns in multiple ways

In [9]:
from pyspark.sql.functions import expr, col, column, lit
df.select(
    expr('DESTINATION_AIRPORT'),
    col('ORIGIN_AIRPORT'),
    column('DISTANCE')).show(2)

+-------------------+--------------+--------+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|DISTANCE|
+-------------------+--------------+--------+
|                SEA|           ANC|    1448|
|                PBI|           LAX|    2330|
+-------------------+--------------+--------+
only showing top 2 rows



And with a little bit more flexibility

In [10]:
# df.select(expr(...))
df.selectExpr(
    '*', # Include all original columns
    'DESTINATION_AIRPORT as destiny',
    'DISTANCE*2 as round_trip').show(2)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+-------+----------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|destiny|round_trip|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+

22/05/03 13:38:17 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [11]:
df1 = df.cache()
df1.selectExpr(
    'DESTINATION_AIRPORT as destiny',
    'ORIGIN_AIRPORT',
    'DISTANCE',
    'DISTANCE < 1000 as near_by').filter(col('near_by')).show(5)

[Stage 11:>                                                         (0 + 1) / 1]

+-------+--------------+--------+-------+
|destiny|ORIGIN_AIRPORT|DISTANCE|near_by|
+-------+--------------+--------+-------+
|    DFW|           PHX|     868|   true|
|    DFW|           PHX|     868|   true|
|    SEA|           GEG|     224|   true|
|    ITO|           HNL|     216|   true|
|    SFO|           ONT|     363|   true|
+-------+--------------+--------+-------+
only showing top 5 rows



                                                                                

With ```selectExpr```we can also compute aggregates

In [12]:
df1.selectExpr('avg(DISTANCE) as mean_distance', 
               'count(distinct(ORIGIN_AIRPORT)) as dist_org', 
               'count(distinct(DESTINATION_AIRPORT)) as dist_dest').show()



+-----------------+--------+---------+
|    mean_distance|dist_org|dist_dest|
+-----------------+--------+---------+
|822.3564947305235|     628|      629|
+-----------------+--------+---------+



                                                                                

##### Adding Columns

We can add columns to a Spark DataFrame within the select statement:

In [13]:
df1.select(col('ORIGIN_AIRPORT'), 
           lit(33).alias('Two')).show(4)

+--------------+---+
|ORIGIN_AIRPORT|Two|
+--------------+---+
|           ANC| 33|
|           LAX| 33|
|           SFO| 33|
|           LAX| 33|
+--------------+---+
only showing top 4 rows



Or using the more formal function ```withColumn```

In [14]:
df.select(col('ORIGIN_AIRPORT')).withColumn('One', lit(1)).show(2)

+--------------+---+
|ORIGIN_AIRPORT|One|
+--------------+---+
|           ANC|  1|
|           LAX|  1|
+--------------+---+
only showing top 2 rows



The full functionality of ```withColumn```can be enhanced by using ```udfs```

##### Renaming columns

We can rename columns either with an alias, a ```withColumn``` or ```withColumnRenamed```

In [15]:
df.select(col('DESTINATION_AIRPORT').alias('destiny')).show(2)

+-------+
|destiny|
+-------+
|    SEA|
|    PBI|
+-------+
only showing top 2 rows



In [16]:
df.select('DESTINATION_AIRPORT').withColumn('destiny', col('DESTINATION_AIRPORT')).show(2)

+-------------------+-------+
|DESTINATION_AIRPORT|destiny|
+-------------------+-------+
|                SEA|    SEA|
|                PBI|    PBI|
+-------------------+-------+
only showing top 2 rows



In [17]:
df.select('DESTINATION_AIRPORT').withColumnRenamed('DESTINATION_AIRPORT', 'destiny').show(2)

+-------+
|destiny|
+-------+
|    SEA|
|    PBI|
+-------+
only showing top 2 rows



##### Removing columns

We can remove one or multiple columns with ```drop````


In [18]:
df.select(col('DESTINATION_AIRPORT').alias('dest'), 
          col('ORIGIN_AIRPORT').alias('origin'), 
          col('DISTANCE'), 
          expr('DISTANCE < 1000').alias('near_by')).drop('dest', 'origin').show(2)

+--------+-------+
|DISTANCE|near_by|
+--------+-------+
|    1448|  false|
|    2330|  false|
+--------+-------+
only showing top 2 rows



##### Changing a Column's type


In [19]:
df.select(col('DISTANCE').cast('float')).show(2)

+--------+
|DISTANCE|
+--------+
|  1448.0|
|  2330.0|
+--------+
only showing top 2 rows



##### Filtering rows

In [20]:
df.select(col('DESTINATION_AIRPORT'), 
          col('ORIGIN_AIRPORT'), 
          col('DISTANCE'), 
          expr('DISTANCE < 1000').alias('near_by'))\
.filter(col('near_by')).show(2)

+-------------------+--------------+--------+-------+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|DISTANCE|near_by|
+-------------------+--------------+--------+-------+
|                DFW|           PHX|     868|   true|
|                DFW|           PHX|     868|   true|
+-------------------+--------------+--------+-------+
only showing top 2 rows



In [21]:
df.groupBy(col('DESTINATION_AIRPORT'), 
           col('ORIGIN_AIRPORT')).count().filter(col('count') > 100).show(2)



+-------------------+--------------+-----+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|count|
+-------------------+--------------+-----+
|                ATL|           GSP| 2471|
|                ORD|           PDX| 2164|
+-------------------+--------------+-----+
only showing top 2 rows



                                                                                

In [22]:
df.groupBy(col('DESTINATION_AIRPORT'), 
           col('ORIGIN_AIRPORT')).count()\
.where(col('count') > 100).show(2)

+-------------------+--------------+-----+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|count|
+-------------------+--------------+-----+
|                ATL|           GSP| 2471|
|                ORD|           PDX| 2164|
+-------------------+--------------+-----+
only showing top 2 rows





In [26]:
(df.groupBy(col('DESTINATION_AIRPORT'), 
           col('ORIGIN_AIRPORT'))
 .count()
.where(col('count') > 100) 
.where(col('count') < 1000)).show(2)

+-------------------+--------------+-----+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|count|
+-------------------+--------------+-----+
|                PBI|           DCA|  978|
|                MDW|           MEM|  626|
+-------------------+--------------+-----+
only showing top 2 rows



##### Getting unique rows

In [27]:
df.select('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT').distinct().show(5)

+--------------+-------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|
+--------------+-------------------+
|           BQN|                MCO|
|           PHL|                MCO|
|           MCI|                IAH|
|           SPI|                ORD|
|           SNA|                PHX|
+--------------+-------------------+
only showing top 5 rows



##### Aggregations and Grouping with maps

In [4]:
from pyspark.sql.functions import col, count, countDistinct, approx_count_distinct, first, last, min, max, sum, sumDistinct, avg, collect_set, collect_list

The first function worth going over is **count**, with the sole exception that in this example it will perform as a transformation instead of an action. 

In [3]:
from pyspark.sql.functions import col, count, countDistinct, approx_count_distinct, first, last, min, max, sum, sumDistinct, avg, collect_set, collect_list

df.select(count('AIRLINE').alias('all_carriers')).show()


[Stage 2:>                                                        (0 + 12) / 12]

+------------+
|all_carriers|
+------------+
|     5819079|
+------------+



                                                                                

Sometimes we are not interested in the total count but in the number of different instances of the variable. 

In [30]:
df.select(countDistinct('AIRLINE').alias('unique_carriers')).show()
df.groupBy('AIRLINE').count().count()

                                                                                

+---------------+
|unique_carriers|
+---------------+
|             14|
+---------------+



14

If the dataset is very large, the exact count of different instances might be irrelevant, but an approximation with certain degreee of accuracy might be good enough

In [31]:
(df
 .select(approx_count_distinct('TAIL_NUMBER', 0.1)
         .alias('unique_approx')).show())

[Stage 53:>                                                       (0 + 12) / 12]

+-------------+
|unique_approx|
+-------------+
|         4819|
+-------------+



[Stage 53:====>                                                   (1 + 11) / 12]                                                                                

You can get the first and last values from a DataFrame

In [32]:
df.select(
    first('TAIL_NUMBER').alias('first_tailnum'),
    last('TAIL_NUMBER').alias('last_tailnum')
).show()

+-------------+------------+
|first_tailnum|last_tailnum|
+-------------+------------+
|       N407AS|      N534JB|
+-------------+------------+



[Stage 56:>                                                       (0 + 12) / 12]                                                                                

You can also compute the min, max, sum, avg values

In [33]:
df.select(
    min(col('DISTANCE')).alias('min_distance'),
    sum(col('DISTANCE')).alias('sum_distance'),
    avg(col('DISTANCE')).alias('avg_distance'),
    max(col('DISTANCE')).alias('max_distance'),
).show()

df.selectExpr('min(DISTANCE) as min_distance', 
              'sum(DISTANCE) as sum_distance', 
              'avg(DISTANCE) as avg_distance',
              'max(DISTANCE) as max_distance').show()

[Stage 59:>                                                       (0 + 12) / 12]                                                                                

+------------+------------+-----------------+------------+
|min_distance|sum_distance|     avg_distance|max_distance|
+------------+------------+-----------------+------------+
|          21|  4785357409|822.3564947305235|        4983|
+------------+------------+-----------------+------------+

+------------+------------+-----------------+------------+
|min_distance|sum_distance|     avg_distance|max_distance|
+------------+------------+-----------------+------------+
|          21|  4785357409|822.3564947305235|        4983|
+------------+------------+-----------------+------------+



And some more complex aggregates

In [5]:
df.select(
   collect_list('AIRLINE').alias('carriers'),
   collect_set('TAIL_NUMBER').alias('unique_planes')
).show()

                                                                                

+--------------------+--------------------+
|            carriers|       unique_planes|
+--------------------+--------------------+
|[AS, AA, US, AA, ...|[N557AS, N579JB, ...|
+--------------------+--------------------+



### Window Functions

Window functions allow you to apply aggregated computations through different data partitions and to interoperate them with individual registries. These are particularly useful since they allow you to get rid of loops (for, while,...).

Let's take a look at some examples:



In [6]:
from pyspark.sql.functions import col, avg, abs, monotonically_increasing_id, ntile, sum, lit, dense_rank, collect_list
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

So, there are multiple columns among which we could perform some interesting computations. For example, it might be of interest to know which planes tend to have larger delays than the average plane per carrier. In order to answer this question, we must perform the following steps:

* Define the fields among which we are going to partition the data in order to compute the aggregates. In this case, we are interested in the variable _carrier_

* Define the function to apply and how it is going to interplay with the individual entries. 

* Apply the function


##### Select variables of interest

In [7]:
ex1_data = df.select(col('MONTH'),
                     col('YEAR'),
                     col('TAIL_NUMBER'), 
                     col('AIRLINE'), 
                     col('ARRIVAL_DELAY'))\
.filter(col('TAIL_NUMBER').isNotNull())\
.filter(col('ARRIVAL_DELAY').isNotNull())

ex1_data.show(5)

+-----+----+-----------+-------+-------------+
|MONTH|YEAR|TAIL_NUMBER|AIRLINE|ARRIVAL_DELAY|
+-----+----+-----------+-------+-------------+
|    1|2015|     N407AS|     AS|          -22|
|    1|2015|     N3KUAA|     AA|           -9|
|    1|2015|     N171US|     US|            5|
|    1|2015|     N3HYAA|     AA|           -9|
|    1|2015|     N527AS|     AS|          -21|
+-----+----+-----------+-------+-------------+
only showing top 5 rows



##### Determine variables for partitioning the data

In [8]:
windowSpec = Window.partitionBy(ex1_data['AIRLINE']).orderBy(ex1_data['AIRLINE'])

##### Define function to apply


In this case we are interested in finding out how the delay of each plane compares with the average delay of the given airline, thus, we are going to compute the difference between the average and the particular delays. 


In [9]:
average_delay = avg(ex1_data['ARRIVAL_DELAY']).over(windowSpec) 
average_delay_diff = avg(ex1_data['ARRIVAL_DELAY']).over(windowSpec) - ex1_data['ARRIVAL_DELAY']

##### Apply function

In [10]:
ex1_data = ex1_data.select(col('MONTH'),
                         col('YEAR'),
                         col('TAIL_NUMBER'), 
                         col('AIRLINE'), 
                         col('ARRIVAL_DELAY'), 
                        average_delay.alias('AVERAGE_DELAY'),
                        average_delay_diff.alias('DELAY_DEVIATION'))
ex1_data.sort(col('AIRLINE'), 
              col('DELAY_DEVIATION')).show(10)



+-----+----+-----------+-------+-------------+------------------+-------------------+
|MONTH|YEAR|TAIL_NUMBER|AIRLINE|ARRIVAL_DELAY|     AVERAGE_DELAY|    DELAY_DEVIATION|
+-----+----+-----------+-------+-------------+------------------+-------------------+
|    1|2015|     N598AA|     AA|         1971|3.4513721447256764|-1967.5486278552744|
|    8|2015|     N479AA|     AA|         1898|3.4513721447256764|-1894.5486278552744|
|    9|2015|     N3CAAA|     AA|         1665|3.4513721447256764|-1661.5486278552744|
|   11|2015|     N489AA|     AA|         1638|3.4513721447256764|-1634.5486278552744|
|    7|2015|     N3LEAA|     AA|         1636|3.4513721447256764|-1632.5486278552744|
|   12|2015|     N4XKAA|     AA|         1636|3.4513721447256764|-1632.5486278552744|
|    2|2015|     N028AA|     AA|         1627|3.4513721447256764|-1623.5486278552744|
|    3|2015|     N559AA|     AA|         1598|3.4513721447256764|-1594.5486278552744|
|    1|2015|     N5DGAA|     AA|         1593|3.451372



Now we have an idea of which planes have the largest delays! Let us suppose that we are interested in comparing the top 5 planes with the largest delays between airlines. We can do this with another window function.

##### Determine variables for partitioning the data


In [11]:
windowSpec = Window.partitionBy(ex1_data['AIRLINE']).orderBy(ex1_data['AIRLINE'], 
                                                             ex1_data['DELAY_DEVIATION'])

##### Define function to apply

In this case, we are interested in computing a ranking

In [12]:
rank_func = dense_rank().over(windowSpec)

##### Apply function

In [13]:
top_delay = 5 # We are interested in the top 5

ex2_data = ex1_data.filter(col('DELAY_DEVIATION') < 0)\
.select(col('MONTH'),
        col('YEAR'),
        col('TAIL_NUMBER'), 
        col('AIRLINE'), 
        col('ARRIVAL_DELAY'), 
        col('DELAY_DEVIATION'), 
        rank_func.alias('DELAY_RANK'))\
.filter(col('DELAY_RANK') <= top_delay)\
.sort(col('AIRLINE'),  col('DELAY_RANK'))

ex2_data.show(30)



+-----+----+-----------+-------+-------------+-------------------+----------+
|MONTH|YEAR|TAIL_NUMBER|AIRLINE|ARRIVAL_DELAY|    DELAY_DEVIATION|DELAY_RANK|
+-----+----+-----------+-------+-------------+-------------------+----------+
|    1|2015|     N598AA|     AA|         1971|-1967.5486278552744|         1|
|    8|2015|     N479AA|     AA|         1898|-1894.5486278552744|         2|
|    9|2015|     N3CAAA|     AA|         1665|-1661.5486278552744|         3|
|   11|2015|     N489AA|     AA|         1638|-1634.5486278552744|         4|
|   12|2015|     N4XKAA|     AA|         1636|-1632.5486278552744|         5|
|    7|2015|     N3LEAA|     AA|         1636|-1632.5486278552744|         5|
|    6|2015|     N307AS|     AS|          950| -950.9765630924119|         1|
|    3|2015|     N760AS|     AS|          853| -853.9765630924119|         2|
|    4|2015|     N408AS|     AS|          820| -820.9765630924119|         3|
|    7|2015|     N708AS|     AS|          813| -813.976563092411

                                                                                

In [14]:
### Otras funciones
from pyspark.sql.functions import col, count, countDistinct, \
approx_count_distinct, first, last, min, max, sum, sumDistinct, avg,\
collect_set, collect_list, percent_rank, lag, lead, when, size, element_at


In [15]:
empsalary = spark.createDataFrame([
  ("sales",     1,  "Alice",  5000, ["game",  "ski"]),
  ("personnel", 2,  "Olivia", 3900, ["game",  "ski"]),
  ("sales",     3,  "Ella",   4800, ["skate", "ski"]),
  ("sales",     4,  "Ebba",   4800, ["game",  "ski"]),
  ("personnel", 5,  "Lilly",  3500, ["climb", "ski"]),
  ("develop",   7,  "Astrid", 4200, ["game",  "ski"]),
  ("develop",   8,  "Saga",   6000, ["kajak", "ski"]),
  ("develop",   9,  "Freja",  4500, ["game",  "kajak"]),
  ("develop",   10, "Wilma",  5200, ["game",  "ski"]),
  ("develop",   11, "Maja",   5200, ["game",  "farming"])
]).toDF("depName", "empNo", "name", "salary", "hobby")
  

Multiples funciones sobre una misma partición

In [16]:
overCategory = Window.partitionBy("depName")
df = empsalary\
.withColumn("salaries", collect_list("salary").over(overCategory))\
.withColumn("average_salary", (avg("salary").over(overCategory)).cast("int"))\
.withColumn("total_salary", sum("salary").over(overCategory))\
.select("depName", "empNo", "name", "salary", 
        "salaries", "average_salary", "total_salary")
df.show(20, False)

[Stage 15:>                                                       (0 + 12) / 12]

+---------+-----+------+------+------------------------------+--------------+------------+
|depName  |empNo|name  |salary|salaries                      |average_salary|total_salary|
+---------+-----+------+------+------------------------------+--------------+------------+
|develop  |7    |Astrid|4200  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |8    |Saga  |6000  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |9    |Freja |4500  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |10   |Wilma |5200  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |11   |Maja  |5200  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|personnel|2    |Olivia|3900  |[3900, 3500]                  |3700          |7400        |
|personnel|5    |Lilly |3500  |[3900, 3500]                  |3700          |7400        |
|sales    |1    |Alice |5000  |[5000, 4800, 4800]            |4866          |14600       |

                                                                                

Order by: Default acumulado hasta la fila actual

In [17]:
overCategory = Window.partitionBy("depName").orderBy(col("salary").desc())
df = empsalary\
.withColumn("salaries", collect_list("salary").over(overCategory))\
.withColumn("msalary", (avg("salary").over(overCategory)).cast("int"))\
.withColumn("tsalary", sum("salary").over(overCategory))\
.withColumn("ntile", ntile(3).over(overCategory))\
.withColumn("prank", percent_rank().over(overCategory))\
.withColumn("drank", dense_rank().over(overCategory))\
.select("depName", "empNo",  "salary", 
        "salaries", "msalary", "tsalary", 
        "ntile", "prank", "drank")
df.show(20, False)

+---------+-----+------+------------------------------+-------+-------+-----+-----+-----+
|depName  |empNo|salary|salaries                      |msalary|tsalary|ntile|prank|drank|
+---------+-----+------+------------------------------+-------+-------+-----+-----+-----+
|develop  |8    |6000  |[6000]                        |6000   |6000   |1    |0.0  |1    |
|develop  |10   |5200  |[6000, 5200, 5200]            |5466   |16400  |1    |0.25 |2    |
|develop  |11   |5200  |[6000, 5200, 5200]            |5466   |16400  |2    |0.25 |2    |
|develop  |9    |4500  |[6000, 5200, 5200, 4500]      |5225   |20900  |2    |0.75 |3    |
|develop  |7    |4200  |[6000, 5200, 5200, 4500, 4200]|5020   |25100  |3    |1.0  |4    |
|personnel|2    |3900  |[3900]                        |3900   |3900   |1    |0.0  |1    |
|personnel|5    |3500  |[3900, 3500]                  |3700   |7400   |2    |1.0  |2    |
|sales    |1    |5000  |[5000]                        |5000   |5000   |1    |0.0  |1    |
|sales    

Lag y lead para sacar interacción entre filas

In [18]:
overCategory = Window.partitionBy("depName").orderBy(col("salary").desc())
df = empsalary\
.withColumn("lead", lead("salary", 1).over(overCategory))\
.withColumn("lag", lag("salary", 1).over(overCategory))\
.select("depName", "empNo", "name", "salary", "lead", "lag")
df.show(20, False)

+---------+-----+------+------+----+----+
|depName  |empNo|name  |salary|lead|lag |
+---------+-----+------+------+----+----+
|develop  |8    |Saga  |6000  |5200|null|
|develop  |10   |Wilma |5200  |5200|6000|
|develop  |11   |Maja  |5200  |4500|5200|
|develop  |9    |Freja |4500  |4200|5200|
|develop  |7    |Astrid|4200  |null|4500|
|personnel|2    |Olivia|3900  |3500|null|
|personnel|5    |Lilly |3500  |null|3900|
|sales    |1    |Alice |5000  |4800|null|
|sales    |3    |Ella  |4800  |4800|5000|
|sales    |4    |Ebba  |4800  |null|4800|
+---------+-----+------+------+----+----+



In [19]:
overCategory = Window.partitionBy("depName").orderBy(col("salary").desc())
df = empsalary\
.withColumn("lead", lead("salary", 1).over(overCategory))\
.withColumn("lag", lag("salary", 1).over(overCategory))\
.withColumn("h_t_n", 
            when(col("lead").isNull(), 0).otherwise(col("salary") - col("lead")))\
.withColumn("l_t_p", 
            when(col("lag").isNull(), 0).otherwise(col("lag") - col("salary")))\
.select("depName", "empNo", "name", "salary", "lead", "lag", "h_t_n", "l_t_p")
df.show(20, False)

+---------+-----+------+------+----+----+-----+-----+
|depName  |empNo|name  |salary|lead|lag |h_t_n|l_t_p|
+---------+-----+------+------+----+----+-----+-----+
|develop  |8    |Saga  |6000  |5200|null|800  |0    |
|develop  |10   |Wilma |5200  |5200|6000|0    |800  |
|develop  |11   |Maja  |5200  |4500|5200|700  |0    |
|develop  |9    |Freja |4500  |4200|5200|300  |700  |
|develop  |7    |Astrid|4200  |null|4500|0    |300  |
|personnel|2    |Olivia|3900  |3500|null|400  |0    |
|personnel|5    |Lilly |3500  |null|3900|0    |400  |
|sales    |1    |Alice |5000  |4800|null|200  |0    |
|sales    |3    |Ella  |4800  |4800|5000|0    |200  |
|sales    |4    |Ebba  |4800  |null|4800|0    |0    |
+---------+-----+------+------+----+----+-----+-----+



Aplicación de funciones sobre rangos

In [0]:
overCategory = Window.partitionBy("depName").orderBy("salary")\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)