### DataFrame Transformations
---------
The material in this notebook was extracted from
* Spark: The Definitive Guide: Big Data Processing Made Simple (2018)
---------

When working with individual DataFrames, there are some fundamental objectives. These break down into several core operations:

- Add rows or columns
- Remove rows or columns
- Transform rows into columns (or vice versa)
- Sort data by values


#### Creating DataFrames

We can create DataFrames from raw data sources, for example a CSV file:

In [1]:
import os
from IPython.display import HTML, display

# Assumes a SparkSession named `spark` already exists (e.g. started by the pyspark kernel)
display(HTML("<style>pre { white-space: pre !important; }</style>"))
data_path = os.path.join('..', '..', '..', 'FuentesDeDatos', 'classes', 'class_notes', 'class_18', 'data')

In [2]:
# Equivalent shorthand: spark.read.csv(os.path.join(data_path, 'flights.csv'), sep=',', header=True, inferSchema=True)
df = (spark.read
.format('csv')
.option('header', True)
.option('inferSchema', True)
.load(os.path.join(data_path, 'flights.csv')))
df.dtypes


[('YEAR', 'int'),
 ('MONTH', 'int'),
 ('DAY', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('AIRLINE', 'string'),
 ('FLIGHT_NUMBER', 'int'),
 ('TAIL_NUMBER', 'string'),
 ('ORIGIN_AIRPORT', 'string'),
 ('DESTINATION_AIRPORT', 'string'),
 ('SCHEDULED_DEPARTURE', 'int'),
 ('DEPARTURE_TIME', 'int'),
 ('DEPARTURE_DELAY', 'int'),
 ('TAXI_OUT', 'int'),
 ('WHEELS_OFF', 'int'),
 ('SCHEDULED_TIME', 'int'),
 ('ELAPSED_TIME', 'int'),
 ('AIR_TIME', 'int'),
 ('DISTANCE', 'int'),
 ('WHEELS_ON', 'int'),
 ('TAXI_IN', 'int'),
 ('SCHEDULED_ARRIVAL', 'int'),
 ('ARRIVAL_TIME', 'int'),
 ('ARRIVAL_DELAY', 'int'),
 ('DIVERTED', 'int'),
 ('CANCELLED', 'int'),
 ('CANCELLATION_REASON', 'string'),
 ('AIR_SYSTEM_DELAY', 'int'),
 ('SECURITY_DELAY', 'int'),
 ('AIRLINE_DELAY', 'int'),
 ('LATE_AIRCRAFT_DELAY', 'int'),
 ('WEATHER_DELAY', 'int')]

In [3]:
df.show(5, truncate=False)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+----------

In [4]:
import pyspark.sql.functions as F

(df
 .select(F.col('YEAR').alias('year'), F.col('MONTH').alias('month'), F.col('DAY').alias('day'))
 # concat implicitly casts the integer columns to strings
 .select('*', F.concat(F.col('year'), F.lit('-'), F.col('month'), F.lit('-'), F.col('day')).alias('date'))
).show()

We can also create DataFrames on the fly, from local Python objects and an explicit schema.

In [None]:
import pyspark.sql.functions as F

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

mySchema = StructType([
    StructField('col_1', StringType(), True),
    StructField('col_2', StringType(), True),
    StructField('col_3', LongType(), False)
])

myRows = [Row('Hello', None, 1), 
          Row('World', '!', 2), 
          Row('HOW', None, 3)]


myDf = spark.createDataFrame(myRows, mySchema)
myDf.show()

In [9]:
myDf.toPandas().head()  # toPandas() collects every row to the driver

|   | col_1 | col_2 | col_3 |
|---|-------|-------|-------|
| 0 | Hello |       | 1     |
| 1 | World | !     | 2     |
| 2 | HOW   |       | 3     |


##### Select and selectExpr

```select``` and ```selectExpr``` let you do the DataFrame equivalent of SQL ```SELECT``` queries on a table.

In [10]:
df.select('DESTINATION_AIRPORT').show(2)

+-------------------+
|DESTINATION_AIRPORT|
+-------------------+
|                SEA|
|                PBI|
+-------------------+
only showing top 2 rows



In [15]:
df.dtypes

[('YEAR', 'int'),
 ('MONTH', 'int'),
 ('DAY', 'int'),
 ('DAY_OF_WEEK', 'int'),
 ('AIRLINE', 'string'),
 ('FLIGHT_NUMBER', 'int'),
 ('TAIL_NUMBER', 'string'),
 ('ORIGIN_AIRPORT', 'string'),
 ('DESTINATION_AIRPORT', 'string'),
 ('SCHEDULED_DEPARTURE', 'int'),
 ('DEPARTURE_TIME', 'int'),
 ('DEPARTURE_DELAY', 'int'),
 ('TAXI_OUT', 'int'),
 ('WHEELS_OFF', 'int'),
 ('SCHEDULED_TIME', 'int'),
 ('ELAPSED_TIME', 'int'),
 ('AIR_TIME', 'int'),
 ('DISTANCE', 'int'),
 ('WHEELS_ON', 'int'),
 ('TAXI_IN', 'int'),
 ('SCHEDULED_ARRIVAL', 'int'),
 ('ARRIVAL_TIME', 'int'),
 ('ARRIVAL_DELAY', 'int'),
 ('DIVERTED', 'int'),
 ('CANCELLED', 'int'),
 ('CANCELLATION_REASON', 'string'),
 ('AIR_SYSTEM_DELAY', 'int'),
 ('SECURITY_DELAY', 'int'),
 ('AIRLINE_DELAY', 'int'),
 ('LATE_AIRCRAFT_DELAY', 'int'),
 ('WEATHER_DELAY', 'int')]

In [23]:
# df.select(cols).show(5)
# Select every integer/double column, renaming it to lowercase.
df.select([F.col(c).alias(c.lower()) for c, t in df.dtypes if t in ('int', 'double')]).show(5)

+----+-----+---+-----------+-------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+----------------+--------------+-------------+-------------------+-------------+
|year|month|day|day_of_week|flight_number|scheduled_departure|departure_time|departure_delay|taxi_out|wheels_off|scheduled_time|elapsed_time|air_time|distance|wheels_on|taxi_in|scheduled_arrival|arrival_time|arrival_delay|diverted|cancelled|air_system_delay|security_delay|airline_delay|late_aircraft_delay|weather_delay|
+----+-----+---+-----------+-------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+----------------+--------------+-------------+-------------------+-------------+
|2015|    1|  1|          4|      

In [11]:
df.select('DESTINATION_AIRPORT', 'AIR_TIME').show(2)  

+-------------------+--------+
|DESTINATION_AIRPORT|AIR_TIME|
+-------------------+--------+
|                SEA|     169|
|                PBI|     263|
+-------------------+--------+
only showing top 2 rows



You can refer to columns in multiple ways:

In [5]:
from pyspark.sql.functions import expr, col, column, lit
# Also available as F.expr, F.col and F.column; col and column are interchangeable
df.select(
    expr('DESTINATION_AIRPORT'),
    col('ORIGIN_AIRPORT'),
    column('DISTANCE')).show(2)

+-------------------+--------------+--------+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|DISTANCE|
+-------------------+--------------+--------+
|                SEA|           ANC|    1448|
|                PBI|           LAX|    2330|
+-------------------+--------------+--------+
only showing top 2 rows



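```selectExpr``` offers a little more flexibility: each argument is a SQL expression string, so columns can be renamed and computed in place.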

In [27]:
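# Same as df.select(expr(...), ...). Besides SQL `as`, columns can also be
# renamed with .alias() or withColumnRenamed().
# .explain() prints the physical plan instead of the data.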
df.selectExpr(
    '*', # Include all original columns
    'DESTINATION_AIRPORT as destiny',
    'DISTANCE*2 as round_trip').explain()

== Physical Plan ==
*(1) Project [YEAR#17, MONTH#18, DAY#19, DAY_OF_WEEK#20, AIRLINE#21, FLIGHT_NUMBER#22, TAIL_NUMBER#23, ORIGIN_AIRPORT#24, DESTINATION_AIRPORT#25, SCHEDULED_DEPARTURE#26, DEPARTURE_TIME#27, DEPARTURE_DELAY#28, TAXI_OUT#29, WHEELS_OFF#30, SCHEDULED_TIME#31, ELAPSED_TIME#32, AIR_TIME#33, DISTANCE#34, WHEELS_ON#35, TAXI_IN#36, SCHEDULED_ARRIVAL#37, ARRIVAL_TIME#38, ARRIVAL_DELAY#39, DIVERTED#40, ... 9 more fields]
+- InMemoryTableScan [AIRLINE#21, AIRLINE_DELAY#45, AIR_SYSTEM_DELAY#43, AIR_TIME#33, ARRIVAL_DELAY#39, ARRIVAL_TIME#38, CANCELLATION_REASON#42, CANCELLED#41, DAY#19, DAY_OF_WEEK#20, DEPARTURE_DELAY#28, DEPARTURE_TIME#27, DESTINATION_AIRPORT#25, DISTANCE#34, DIVERTED#40, ELAPSED_TIME#32, FLIGHT_NUMBER#22, LATE_AIRCRAFT_DELAY#46, MONTH#18, ORIGIN_AIRPORT#24, SCHEDULED_ARRIVAL#37, SCHEDULED_DEPARTURE#26, SCHEDULED_TIME#31, SECURITY_DELAY#44, ... 7 more fields]
      +- InMemoryRelation [YEAR#17, MONTH#18, DAY#19, DAY_OF_WEEK#20, AIRLINE#21, FLIGHT_NUMBER#22, TAI

In [30]:
df1 = df.cache()
df1.selectExpr(
    'DESTINATION_AIRPORT as destiny',
    'ORIGIN_AIRPORT',
    'DISTANCE',
    'DISTANCE < 1000 as near_by').explain()
#.filter(col('near_by')).show(5)

22/11/22 18:24:17 WARN CacheManager: Asked to cache already cached data.
== Physical Plan ==
*(1) Project [DESTINATION_AIRPORT#25 AS destiny#3180, ORIGIN_AIRPORT#24, DISTANCE#34, (DISTANCE#34 < 1000) AS near_by#3181]
+- InMemoryTableScan [DESTINATION_AIRPORT#25, DISTANCE#34, ORIGIN_AIRPORT#24]
      +- InMemoryRelation [YEAR#17, MONTH#18, DAY#19, DAY_OF_WEEK#20, AIRLINE#21, FLIGHT_NUMBER#22, TAIL_NUMBER#23, ORIGIN_AIRPORT#24, DESTINATION_AIRPORT#25, SCHEDULED_DEPARTURE#26, DEPARTURE_TIME#27, DEPARTURE_DELAY#28, TAXI_OUT#29, WHEELS_OFF#30, SCHEDULED_TIME#31, ELAPSED_TIME#32, AIR_TIME#33, DISTANCE#34, WHEELS_ON#35, TAXI_IN#36, SCHEDULED_ARRIVAL#37, ARRIVAL_TIME#38, ARRIVAL_DELAY#39, DIVERTED#40, ... 7 more fields], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- FileScan csv [YEAR#17,MONTH#18,DAY#19,DAY_OF_WEEK#20,AIRLINE#21,FLIGHT_NUMBER#22,TAIL_NUMBER#23,ORIGIN_AIRPORT#24,DESTINATION_AIRPORT#25,SCHEDULED_DEPARTURE#26,DEPARTURE_TIME#27,DEPARTURE_DELAY#28,TAXI_OUT#29,

With ```selectExpr``` we can also compute aggregates, using SQL aggregate functions such as ```avg``` and ```count(distinct ...)```:

In [33]:
(df1.selectExpr('avg(DISTANCE) as mean_distance', 
               'count(distinct(ORIGIN_AIRPORT)) as dist_org', 
               'count(distinct(DESTINATION_AIRPORT)) as dist_dest')
 .select('*', F.round('mean_distance', 2).alias('round_distance'))
).show()

+-----------------+--------+---------+--------------+
|    mean_distance|dist_org|dist_dest|round_distance|
+-----------------+--------+---------+--------------+
|822.3564947305235|     628|      629|        822.36|
+-----------------+--------+---------+--------------+





##### Adding Columns

We can add columns to a Spark DataFrame within the select statement:

In [34]:
df1.select(col('ORIGIN_AIRPORT'), 
           lit(33).alias('col_2') 
           ).withColumn('pasted_together', 
                        F.concat('ORIGIN_AIRPORT', F.lit('-'), 'col_2')).show(4)

+--------------+-----+---------------+
|ORIGIN_AIRPORT|col_2|pasted_together|
+--------------+-----+---------------+
|           ANC|   33|         ANC-33|
|           LAX|   33|         LAX-33|
|           SFO|   33|         SFO-33|
|           LAX|   33|         LAX-33|
+--------------+-----+---------------+
only showing top 4 rows



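Or with the dedicated method ```withColumn```, which adds a column (or replaces an existing one with the same name). Each ```withColumn``` call adds a projection to the query plan, so when adding many columns it is better to use a single ```select``` (or ```withColumns```, available since Spark 3.3) than a long chain of ```withColumn``` calls.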

In [35]:
df.select(col('ORIGIN_AIRPORT')).withColumn('One', lit(1)).show(2)

+--------------+---+
|ORIGIN_AIRPORT|One|
+--------------+---+
|           ANC|  1|
|           LAX|  1|
+--------------+---+
only showing top 2 rows



```withColumn``` becomes even more flexible when combined with user-defined functions (```udf```s).
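
A UDF is an ordinary Python function that Spark calls once per row. The sketch below is a hypothetical example: the function name `distance_band` and its mile thresholds are assumptions, not part of the original notebook, and the `F.udf` wrapping appears only in comments because it needs a running Spark session. Spark passes SQL nulls to a Python UDF as `None`, so the function handles that case explicitly.

```python
from typing import Optional

# Hypothetical thresholds (in miles), chosen only for illustration.
SHORT_HAUL_MAX = 500
MEDIUM_HAUL_MAX = 1500


def distance_band(miles: Optional[int]) -> Optional[str]:
    """Classify a flight distance; Spark hands nulls to a Python UDF as None."""
    if miles is None:
        return None
    if miles <= SHORT_HAUL_MAX:
        return "short"
    if miles <= MEDIUM_HAUL_MAX:
        return "medium"
    return "long"


# In the notebook, the function would be wrapped and used with withColumn, e.g.:
#   from pyspark.sql.types import StringType
#   distance_band_udf = F.udf(distance_band, StringType())
#   df.withColumn('distance_band', distance_band_udf(F.col('DISTANCE')))

print(distance_band(1448), distance_band(224), distance_band(None))  # medium short None
```

Python UDFs ship every row between the JVM and a Python worker, so when the same logic can be expressed with built-in functions (for example `F.when`), the built-in version is usually faster.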

##### Renaming columns

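We can rename columns with an ```alias```, with ```withColumn```, or with ```withColumnRenamed```. Note that ```withColumn``` adds a renamed copy and keeps the original column, as the second example shows.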

In [17]:
df.select(col('DESTINATION_AIRPORT').alias('destiny')).show(2)

+-------+
|destiny|
+-------+
|    SEA|
|    PBI|
+-------+
only showing top 2 rows



In [18]:
df.select('DESTINATION_AIRPORT').withColumn('destiny', col('DESTINATION_AIRPORT')).show(2)

+-------------------+-------+
|DESTINATION_AIRPORT|destiny|
+-------------------+-------+
|                SEA|    SEA|
|                PBI|    PBI|
+-------------------+-------+
only showing top 2 rows



In [36]:
df.select('DESTINATION_AIRPORT').withColumnRenamed('DESTINATION_AIRPORT', 'destiny').show(2)

+-------+
|destiny|
+-------+
|    SEA|
|    PBI|
+-------+
only showing top 2 rows



##### Removing columns

We can remove one or more columns with ```drop```; names that do not exist in the schema are silently ignored.


In [5]:
df.select(col('DESTINATION_AIRPORT').alias('dest'), 
          col('ORIGIN_AIRPORT').alias('origin'), 
          col('DISTANCE'), 
          expr('DISTANCE < 1000').alias('near_by')).drop(*['dest', 'origin']).show(2)

+--------+-------+
|DISTANCE|near_by|
+--------+-------+
|    1448|  false|
|    2330|  false|
+--------+-------+
only showing top 2 rows



##### Changing a Column's type


In [6]:
df.select(col('DISTANCE').cast('float')).show(2)

+--------+
|DISTANCE|
+--------+
|  1448.0|
|  2330.0|
+--------+
only showing top 2 rows



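##### Filtering rows

```where``` is an alias of ```filter```: both keep the rows for which a boolean expression is true. Chaining several ```where``` calls is equivalent to combining their conditions with ```&```.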

In [8]:
df.select(col('DESTINATION_AIRPORT'), 
          col('ORIGIN_AIRPORT'), 
          col('DISTANCE'), 
          expr('DISTANCE < 1000').alias('near_by'))\
.where(col('near_by')).show(5)

+-------------------+--------------+--------+-------+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|DISTANCE|near_by|
+-------------------+--------------+--------+-------+
|                DFW|           PHX|     868|   true|
|                DFW|           PHX|     868|   true|
|                SEA|           GEG|     224|   true|
|                ITO|           HNL|     216|   true|
|                SFO|           ONT|     363|   true|
+-------------------+--------------+--------+-------+
only showing top 5 rows



In [8]:
import pyspark.sql.functions as F
(df
 .groupBy(col('DESTINATION_AIRPORT'), 
          col('ORIGIN_AIRPORT'))
 .agg(F.count('*').alias('all_trips'))
 .where(col('all_trips') > 100)).show(2)


+-------------------+--------------+---------+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|all_trips|
+-------------------+--------------+---------+
|                ATL|           GSP|     2471|
|                ORD|           PDX|     2164|
+-------------------+--------------+---------+
only showing top 2 rows




In [9]:
df.groupBy(col('DESTINATION_AIRPORT'), 
           col('ORIGIN_AIRPORT')).count()\
.where(col('count') > 100).show(2)


+-------------------+--------------+-----+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|count|
+-------------------+--------------+-----+
|                ATL|           GSP| 2471|
|                ORD|           PDX| 2164|
+-------------------+--------------+-----+
only showing top 2 rows





In [10]:
(df.groupBy(col('DESTINATION_AIRPORT'), 
           col('ORIGIN_AIRPORT'))
 .count()
.where((col('count') > 100)) 
.where(col('count') < 1000)).show(2)


+-------------------+--------------+-----+
|DESTINATION_AIRPORT|ORIGIN_AIRPORT|count|
+-------------------+--------------+-----+
|                PBI|           DCA|  978|
|                MDW|           MEM|  626|
+-------------------+--------------+-----+
only showing top 2 rows





##### Getting unique rows

In [11]:
df.select('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT').distinct().show(5) # coalesce


+--------------+-------------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|
+--------------+-------------------+
|           BQN|                MCO|
|           PHL|                MCO|
|           MCI|                IAH|
|           SPI|                ORD|
|           SNA|                PHX|
+--------------+-------------------+
only showing top 5 rows





##### Conditional columns

In [21]:
df.select(~(F.col('FLIGHT_NUMBER') % 2).cast('boolean')).show(5)

+------------------------------------------+
|(NOT CAST((FLIGHT_NUMBER % 2) AS BOOLEAN))|
+------------------------------------------+
|                                      true|
|                                      true|
|                                      true|
|                                      true|
|                                     false|
+------------------------------------------+
only showing top 5 rows



In [25]:
df.select(F.col('FLIGHT_NUMBER'), 
          (F.when(~(F.col('FLIGHT_NUMBER') % 2).cast('boolean'), 'div 2')
           .when(~(F.col('FLIGHT_NUMBER') % 3).cast('boolean'), 'div 3')
           .otherwise('not div by 2 or 3')
          ).alias('div_col')).show(30)

+-------------+-----------------+
|FLIGHT_NUMBER|          div_col|
+-------------+-----------------+
|           98|            div 2|
|         2336|            div 2|
|          840|            div 2|
|          258|            div 2|
|          135|            div 3|
|          806|            div 2|
|          612|            div 2|
|         2013|            div 3|
|         1112|            div 2|
|         1173|            div 3|
|         2336|            div 2|
|         1674|            div 2|
|         1434|            div 2|
|         2324|            div 2|
|         2440|            div 2|
|          108|            div 2|
|         1560|            div 2|
|         1197|            div 3|
|          122|            div 2|
|         1670|            div 2|
|          520|            div 2|
|          371|not div by 2 or 3|
|          214|            div 2|
|          115|not div by 2 or 3|
|         1450|            div 2|
|         1545|            div 3|
|          130
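
The `~(x % 2).cast('boolean')` idiom relies on how Spark casts integers to booleans: zero becomes `false` and any non-zero value becomes `true`, so negating the cast flags even numbers. `F.when` then checks its branches in order and returns the first match, which is why a number divisible by both 2 and 3 is labelled `div 2`. The block below is a plain-Python model of that logic, not Spark code; the sample numbers come from the output above, plus 6 as an added illustration.

```python
def is_even(n: int) -> bool:
    # Mirrors ~(F.col('FLIGHT_NUMBER') % 2).cast('boolean'):
    # bool(0) is False and bool(non-zero) is True, so the negation is True for even n.
    return not bool(n % 2)


def div_label(n: int) -> str:
    # Mirrors the F.when(...).when(...).otherwise(...) chain: the first matching branch wins.
    if not bool(n % 2):
        return "div 2"
    if not bool(n % 3):
        return "div 3"
    return "not div by 2 or 3"


for flight in (98, 135, 371, 6):
    print(flight, is_even(flight), div_label(flight))
```

In Spark itself, the same test reads more directly as `F.col('FLIGHT_NUMBER') % 2 == 0`.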

##### Aggregations and Grouping with maps

In [26]:
from pyspark.sql.functions import col, count, countDistinct, approx_count_distinct, first, last, min, max, sum, sumDistinct, avg, collect_set, collect_list

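The first function worth going over is **count**. Unlike ```DataFrame.count()```, which is an action that returns a number right away, ```count``` from ```pyspark.sql.functions``` is an aggregate expression: here it acts as a transformation, and nothing runs until ```show()``` is called.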

In [27]:
df.select(count('AIRLINE').alias('all_carriers')).show()



+------------+
|all_carriers|
+------------+
|     5819079|
+------------+
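
How nulls are treated is the main difference between the counting functions: `count('*')` counts every row, `count(column)` counts only the non-null values of that column, and `countDistinct` counts the distinct non-null values. A plain-Python model of those rules, on a small made-up column:

```python
# Hypothetical AIRLINE values for five rows; None plays the role of a SQL null.
airline = ["AA", "AS", None, "AA", "WN"]

count_star = len(airline)                                     # like F.count('*')
count_col = sum(1 for v in airline if v is not None)          # like F.count('AIRLINE')
count_distinct = len({v for v in airline if v is not None})   # like F.countDistinct('AIRLINE')

print(count_star, count_col, count_distinct)  # 5 4 3
```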




Sometimes we are not interested in the total count but in the number of distinct values of a column.

In [28]:
df.select(countDistinct('AIRLINE').alias('unique_carriers')).show()
df.groupBy('AIRLINE').count().count()  # same answer: the number of groups, returned as a plain int


+---------------+
|unique_carriers|
+---------------+
|             14|
+---------------+




14

For very large datasets an exact distinct count can be expensive, and an approximation with a known degree of accuracy is often good enough.

In [30]:
help(approx_count_distinct)

Help on function approx_count_distinct in module pyspark.sql.functions:

approx_count_distinct(col: 'ColumnOrName', rsd: Optional[float] = None) -> pyspark.sql.column.Column
    Aggregate function: returns a new :class:`~pyspark.sql.Column` for approximate distinct count
    of column `col`.
    
    .. versionadded:: 2.1.0
    
    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
    rsd : float, optional
        maximum relative standard deviation allowed (default = 0.05).
        For rsd < 0.01, it is more efficient to use :func:`count_distinct`
    
    Examples
    --------
    >>> df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect()
    [Row(distinct_ages=2)]



In [31]:
(df
 .select(approx_count_distinct('TAIL_NUMBER')
         .alias('unique_approx')).show())


+-------------+
|unique_approx|
+-------------+
|         4715|
+-------------+
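
Spark documents `approx_count_distinct` as being based on HyperLogLog++. The sketch below is the classic HyperLogLog estimator with a linear-counting correction for small cardinalities, written in plain Python to show the idea; it is not Spark's implementation (no bias-correction tables, no sparse mode, and a different hash function). Each value is hashed, the first `p` bits pick one of `m = 2**p` registers, and each register keeps the longest run of leading zeros it has seen. The relative standard error is roughly `1.04 / sqrt(m)`, which is the trade-off the `rsd` parameter controls.

```python
import hashlib
import math


def _hash64(value) -> int:
    # Stable 64-bit hash of the value's string form (Spark uses a different hash).
    digest = hashlib.sha1(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def hll_estimate(values, p: int = 12) -> float:
    """Classic HyperLogLog estimate of the number of distinct values."""
    m = 1 << p                      # number of registers
    width = 64 - p                  # bits left after choosing a register
    registers = [0] * m
    for v in values:
        h = _hash64(v)
        idx = h >> width            # first p bits select a register
        rest = h & ((1 << width) - 1)
        # Position (1-based) of the leftmost 1-bit in the remaining bits.
        rank = width - rest.bit_length() + 1
        registers[idx] = max(registers[idx], rank)

    alpha = 0.7213 / (1 + 1.079 / m)
    raw = alpha * m * m / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if raw <= 2.5 * m and zeros:
        # Small-range correction: fall back to linear counting.
        return m * math.log(m / zeros)
    return raw


# 20,000 values, but only 10,000 distinct ones.
data = [f"N{i}" for i in range(10_000)] * 2
print(len(set(data)), round(hll_estimate(data)))
```

The memory used is the register array, whose size does not grow with the number of rows, which is why the approximate count stays cheap on large data; a smaller `rsd` means more registers.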




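You can get the first and last values of a column. Both depend on row order, which Spark does not guarantee (for example after a shuffle), so the result may differ between runs.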

In [32]:
df.select(
    first('TAIL_NUMBER').alias('first_tailnum'),
    last('TAIL_NUMBER').alias('last_tailnum')
).show()


+-------------+------------+
|first_tailnum|last_tailnum|
+-------------+------------+
|       N407AS|      N534JB|
+-------------+------------+





You can also compute the minimum, maximum, sum, and average, either with column functions or with SQL strings in ```selectExpr```:

In [33]:
df.select(
    min(col('DISTANCE')).alias('min_distance'),
    sum(col('DISTANCE')).alias('sum_distance'),
    avg(col('DISTANCE')).alias('avg_distance'),
    max(col('DISTANCE')).alias('max_distance'),
).show()

df.selectExpr('min(DISTANCE) as min_distance', 
              'sum(DISTANCE) as sum_distance', 
              'avg(DISTANCE) as avg_distance',
              'max(DISTANCE) as max_distance').show()


+------------+------------+-----------------+------------+
|min_distance|sum_distance|     avg_distance|max_distance|
+------------+------------+-----------------+------------+
|          21|  4785357409|822.3564947305235|        4983|
+------------+------------+-----------------+------------+




+------------+------------+-----------------+------------+
|min_distance|sum_distance|     avg_distance|max_distance|
+------------+------------+-----------------+------------+
|          21|  4785357409|822.3564947305235|        4983|
+------------+------------+-----------------+------------+
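
As a quick consistency check on these results: `avg` divides the sum of the non-null values by their count. Dividing the reported sum by the reported average recovers the row count that `count('AIRLINE')` returned earlier, which is consistent with `DISTANCE` having no null values in this dataset.

```python
# Values copied from the outputs above.
sum_distance = 4785357409
avg_distance = 822.3564947305235
all_carriers = 5819079  # result of count('AIRLINE')

implied_count = sum_distance / avg_distance
print(round(implied_count))  # 5819079
```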




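And some more complex aggregates: ```collect_set``` gathers the distinct values of a column into a single array (here, every tail number in the dataset).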

In [50]:
test_data = df.select(
    collect_set('TAIL_NUMBER').alias('unique_planes')
    ).toPandas()
test_data.head()


|   | unique_planes |
|---|---------------|
| 0 | [N557AS, N579JB, N3AGAA, N533US, N612JB, N827U... |
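
`collect_list` is the sibling of `collect_set`: both skip nulls, `collect_list` keeps duplicates while `collect_set` removes them, and Spark does not guarantee the order of the elements in either array. A plain-Python model on a few made-up tail numbers:

```python
# Hypothetical TAIL_NUMBER values; None stands for a SQL null.
tail_numbers = ["N407AS", "N3KUAA", None, "N407AS", "N171US"]

as_list = [t for t in tail_numbers if t is not None]  # like collect_list
as_set = set(as_list)                                  # like collect_set

print(len(as_list), len(as_set))  # 4 3
```

Because the whole array ends up in a single row, and `toPandas()` then brings it to the driver, this pattern is only practical when the number of values is modest.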


### Window Functions

Window functions compute aggregates over groups of rows (partitions) while keeping every individual row, so each row can be compared directly with its group's aggregate. This makes them particularly useful for replacing explicit loops (```for```, ```while```, ...).

Let's take a look at some examples:



In [47]:
from pyspark.sql.functions import col, avg, abs, monotonically_increasing_id, ntile, sum, lit, dense_rank, collect_list
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

There are several columns on which we could perform interesting computations. For example, we might want to know which planes tend to have larger delays than the average for their carrier. To answer this question, we perform the following steps:

* Define the fields used to partition the data when computing the aggregates. Here, we partition by carrier (```AIRLINE```).

* Define the function to apply and how it combines with the individual rows.

* Apply the function


In [62]:
flights = spark.read.csv('../class_17/data/flights.csv', header=True)
flights.show(5)

+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+
|MONTH|DAY|WEEKDAY|AIRLINE|ORG_AIR|DEST_AIR|SCHED_DEP|DEP_DELAY|AIR_TIME|DIST|SCHED_ARR|ARR_DELAY|DIVERTED|CANCELLED|
+-----+---+-------+-------+-------+--------+---------+---------+--------+----+---------+---------+--------+---------+
|    1|  1|      4|     WN|    LAX|     SLC|     1625|     58.0|    94.0| 590|     1905|     65.0|       0|        0|
|    1|  1|      4|     UA|    DEN|     IAD|      823|      7.0|   154.0|1452|     1333|    -13.0|       0|        0|
|    1|  1|      4|     MQ|    DFW|     VPS|     1305|     36.0|    85.0| 641|     1453|     35.0|       0|        0|
|    1|  1|      4|     AA|    DFW|     DCA|     1555|      7.0|   126.0|1192|     1935|     -7.0|       0|        0|
|    1|  1|      4|     WN|    LAX|     MCI|     1720|     48.0|   166.0|1363|     2225|     39.0|       0|        0|
+-----+---+-------+-------+-------+--------+---------+--

In [104]:
# Rank each airline's routes by number of trips, most frequent first
w_rank = Window.partitionBy(F.col('airline')).orderBy(F.col('n_trips').desc())

In [107]:
# Top 5 origin-destination pairs by number of flights, per airline.
t_data = (flights
          .groupBy(
              F.col('AIRLINE').alias('airline'), 
              F.col('ORG_AIR').alias('origin'), 
              F.col('DEST_AIR').alias('destiny'))
          .agg(F.count('*').alias('n_trips'))
          .orderBy(F.col('airline'), F.col('n_trips').desc())
          .select('*', F.dense_rank().over(w_rank).alias('top_n'))
          .filter(F.col('top_n') <= 5)
)

t_data.show()

+-------+------+-------+-------+-----+
|airline|origin|destiny|n_trips|top_n|
+-------+------+-------+-------+-----+
|     AA|   DFW|    LAX|    175|    1|
|     AA|   LAX|    DFW|    175|    1|
|     AA|   DFW|    ORD|    163|    2|
|     AA|   ORD|    DFW|    160|    3|
|     AA|   ORD|    LGA|    152|    4|
|     AA|   DFW|    AUS|    146|    5|
|     AS|   LAX|    SEA|    157|    1|
|     AS|   LAS|    SEA|     80|    2|
|     AS|   LAX|    PDX|     76|    3|
|     AS|   SFO|    SEA|     74|    4|
|     AS|   DEN|    SEA|     50|    5|
|     B6|   LAX|    JFK|     71|    1|
|     B6|   SFO|    JFK|     50|    2|
|     B6|   SFO|    LGB|     36|    3|
|     B6|   LAS|    LGB|     35|    4|
|     B6|   ORD|    JFK|     34|    5|
|     DL|   ATL|    MCO|    187|    1|
|     DL|   ATL|    LGA|    154|    2|
|     DL|   ATL|    TPA|    147|    3|
|     DL|   ATL|    FLL|    141|    4|
+-------+------+-------+-------+-----+
only showing top 20 rows
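
`dense_rank` gives tied rows the same rank and does not skip numbers afterwards, which is why both AA routes with 175 trips get `top_n = 1` and the next route gets 2. `rank` would jump to 3 after the tie, and `row_number` would break the tie arbitrarily. It also means that `top_n <= 5` can return more than five rows per airline when there are ties (AA returns six). A plain-Python model of the three functions, using the AA values from the output:

```python
def ranks(values):
    """Return (value, rank, dense_rank, row_number) for values sorted in descending order."""
    ordered = sorted(values, reverse=True)
    out = []
    dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position   # rank jumps to the current position after a tie
            dense += 1        # dense_rank only moves up by one
            previous = value
        out.append((value, rank, dense, position))
    return out


# n_trips for American Airlines (AA) in the output above.
aa_trips = [175, 175, 163, 160, 152, 146]
for value, rank, dense, row_number in ranks(aa_trips):
    print(value, rank, dense, row_number)
```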



##### Select variables of interest

In [48]:
ex1_data = df.select(col('MONTH'),
                     col('YEAR'),
                     col('TAIL_NUMBER'), 
                     col('AIRLINE'), 
                     col('ARRIVAL_DELAY'))\
.filter(col('TAIL_NUMBER').isNotNull())\
.filter(col('ARRIVAL_DELAY').isNotNull())

ex1_data.show(5)

+-----+----+-----------+-------+-------------+
|MONTH|YEAR|TAIL_NUMBER|AIRLINE|ARRIVAL_DELAY|
+-----+----+-----------+-------+-------------+
|    1|2015|     N407AS|     AS|          -22|
|    1|2015|     N3KUAA|     AA|           -9|
|    1|2015|     N171US|     US|            5|
|    1|2015|     N3HYAA|     AA|           -9|
|    1|2015|     N527AS|     AS|          -21|
+-----+----+-----------+-------+-------------+
only showing top 5 rows



##### Determine variables for partitioning the data

In [80]:
# Without orderBy, the window frame is the whole airline partition
windowSpec = Window.partitionBy(ex1_data['AIRLINE'])

##### Define function to apply


In this case we want to know how each flight's delay compares with its airline's average delay, so we compute the difference between the airline average and each individual delay. A negative difference means the flight arrived later than the airline's average.


In [81]:
average_delay = avg(ex1_data['ARRIVAL_DELAY']).over(windowSpec) 
# Positive: less delay than the airline average; negative: more delay
average_delay_diff = average_delay - ex1_data['ARRIVAL_DELAY']
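
Whether a window has an `orderBy` matters for aggregates such as `avg`. Without one, the frame is the whole partition. With one, Spark's default frame runs from the start of the partition up to the current row and all of its peers (rows with the same ordering value), so ordering an `AIRLINE` partition by `AIRLINE` itself still covers the whole partition, while ordering by the delay would produce a running average. A plain-Python model of both cases; the function `window_avg` and the sample rows are hypothetical:

```python
from collections import defaultdict


def window_avg(rows, part_key, value_key, order_key=None):
    """Model of avg(value_key).over(Window.partitionBy(part_key)[.orderBy(order_key)])."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[part_key]].append(row)

    result = []
    for part in partitions.values():
        for row in part:
            if order_key is None:
                frame = part  # no orderBy: the frame is the whole partition
            else:
                # Default frame with orderBy: from the partition start to the current
                # row, including every peer with the same ordering value.
                frame = [r for r in part if r[order_key] <= row[order_key]]
            values = [r[value_key] for r in frame if r[value_key] is not None]
            avg = sum(values) / len(values) if values else None
            result.append({**row, "avg": avg})
    return result


sample_rows = [
    {"airline": "AA", "delay": 10},
    {"airline": "AA", "delay": 20},
    {"airline": "AA", "delay": 30},
    {"airline": "AS", "delay": 5},
]

for order in (None, "airline", "delay"):
    print(order, [r["avg"] for r in window_avg(sample_rows, "airline", "delay", order)])
```

The first two orderings give the full-partition average for every row, while ordering by `delay` gives 10.0, 15.0 and 20.0 for the three AA rows.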

##### Apply function

In [82]:
ex1_data = ex1_data.select(col('MONTH'),
                         col('YEAR'),
                         col('TAIL_NUMBER'), 
                         col('AIRLINE'), 
                         col('ARRIVAL_DELAY'), 
                        average_delay.alias('AVERAGE_DELAY'),
                        average_delay_diff.alias('DELAY_DEVIATION'))
ex1_data.sort(col('AIRLINE'), 
              col('DELAY_DEVIATION')).show(10)



+-----+----+-----------+-------+-------------+------------------+-------------------+
|MONTH|YEAR|TAIL_NUMBER|AIRLINE|ARRIVAL_DELAY|     AVERAGE_DELAY|    DELAY_DEVIATION|
+-----+----+-----------+-------+-------------+------------------+-------------------+
|    1|2015|     N598AA|     AA|         1971|3.4513721447256764|-1967.5486278552744|
|    8|2015|     N479AA|     AA|         1898|3.4513721447256764|-1894.5486278552744|
|    9|2015|     N3CAAA|     AA|         1665|3.4513721447256764|-1661.5486278552744|
|   11|2015|     N489AA|     AA|         1638|3.4513721447256764|-1634.5486278552744|
|    7|2015|     N3LEAA|     AA|         1636|3.4513721447256764|-1632.5486278552744|
|   12|2015|     N4XKAA|     AA|         1636|3.4513721447256764|-1632.5486278552744|
|    2|2015|     N028AA|     AA|         1627|3.4513721447256764|-1623.5486278552744|
|    3|2015|     N559AA|     AA|         1598|3.4513721447256764|-1594.5486278552744|
|    1|2015|     N5DGAA|     AA|         1593|3.451372



Now we have an idea of which flights have the largest delays. Suppose we now want to compare, across airlines, the five flights with the largest delays relative to each airline's average. We can do this with another window function.

##### Determine variables for partitioning the data


In [83]:
# AIRLINE is constant within each partition, so ordering by DELAY_DEVIATION alone is enough
windowSpec = Window.partitionBy(ex1_data['AIRLINE']).orderBy(ex1_data['DELAY_DEVIATION'])

##### Define function to apply

Here we compute a ranking with ```dense_rank```, which gives tied rows the same rank without skipping the following rank.

In [84]:
rank_func = dense_rank().over(windowSpec)

##### Apply function

In [85]:
top_delay = 5 # We are interested in the top 5

ex2_data = ex1_data.filter(col('DELAY_DEVIATION') < 0)\
.select(col('MONTH'),
        col('YEAR'),
        col('TAIL_NUMBER'), 
        col('AIRLINE'), 
        col('ARRIVAL_DELAY'), 
        col('DELAY_DEVIATION'), 
        rank_func.alias('DELAY_RANK'))\
.filter(col('DELAY_RANK') <= top_delay)\
.sort(col('AIRLINE'),  col('DELAY_RANK'))

ex2_data.show(30)



+-----+----+-----------+-------+-------------+-------------------+----------+
|MONTH|YEAR|TAIL_NUMBER|AIRLINE|ARRIVAL_DELAY|    DELAY_DEVIATION|DELAY_RANK|
+-----+----+-----------+-------+-------------+-------------------+----------+
|    1|2015|     N598AA|     AA|         1971|-1967.5486278552744|         1|
|    8|2015|     N479AA|     AA|         1898|-1894.5486278552744|         2|
|    9|2015|     N3CAAA|     AA|         1665|-1661.5486278552744|         3|
|   11|2015|     N489AA|     AA|         1638|-1634.5486278552744|         4|
|   12|2015|     N4XKAA|     AA|         1636|-1632.5486278552744|         5|
|    7|2015|     N3LEAA|     AA|         1636|-1632.5486278552744|         5|
|    6|2015|     N307AS|     AS|          950| -950.9765630924119|         1|
|    3|2015|     N760AS|     AS|          853| -853.9765630924119|         2|
|    4|2015|     N408AS|     AS|          820| -820.9765630924119|         3|
|    7|2015|     N708AS|     AS|          813| -813.976563092411


In [94]:
# df.groupby('AIRLINE').agg(F.expr('percentile_approx(DISTANCE, array(.25, .5, .75))')).show()
# [373, 647, 1065]
ws = Window.partitionBy(F.col('AIRLINE')).orderBy(F.col('DISTANCE').asc())

(df
 .select(F.col('TAIL_NUMBER'),
         F.col('AIRLINE'),
         F.col('DISTANCE'),
         F.percent_rank().over(ws).alias('percent_dist'))
 .filter(F.col('percent_dist') > .5)
).show()

+-----------+-------+--------+------------------+
|TAIL_NUMBER|AIRLINE|DISTANCE|      percent_dist|
+-----------+-------+--------+------------------+
|     N515NK|     NK|     965|0.5070456133176575|
|     N532NK|     NK|     965|0.5070456133176575|
|     N532NK|     NK|     965|0.5070456133176575|
|     N529NK|     NK|     965|0.5070456133176575|
|     N529NK|     NK|     965|0.5070456133176575|
|     N529NK|     NK|     965|0.5070456133176575|
|     N529NK|     NK|     965|0.5070456133176575|
|     N505NK|     NK|     965|0.5070456133176575|
|     N505NK|     NK|     965|0.5070456133176575|
|     N505NK|     NK|     965|0.5070456133176575|
|     N505NK|     NK|     965|0.5070456133176575|
|     N515NK|     NK|     965|0.5070456133176575|
|     N515NK|     NK|     965|0.5070456133176575|
|     N526NK|     NK|     965|0.5070456133176575|
|     N526NK|     NK|     965|0.5070456133176575|
|     N510NK|     NK|     965|0.5070456133176575|
|     N510NK|     NK|     965|0.5070456133176575|
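`percent_rank` places each row within its ordered partition as (rank - 1) / (n - 1), where `rank` is the standard rank (tied values share a rank) and `n` is the number of rows in the partition. That is why tied distances share one value above, and why `> .5` keeps only rows ranked strictly above the middle of each airline. The plain-Python sketch below reproduces the formula for a single partition; the helper name `percent_rank_sketch` is hypothetical and this is not Spark's implementation.

```python
def percent_rank_sketch(values):
    """Hypothetical helper: percent_rank of each value in one partition, ascending order.

    percent_rank = (rank - 1) / (n - 1), where rank is 1 + the number of strictly
    smaller values (ties share a rank, like Spark's rank()). A single-row
    partition gets 0.0.
    """
    ordered = sorted(values)
    n = len(ordered)
    result = []
    for v in ordered:
        rank = 1 + sum(1 for other in ordered if other < v)
        result.append((v, 0.0 if n == 1 else (rank - 1) / (n - 1)))
    return result


# Example: the "develop" salaries used later in this notebook
for value, pct in percent_rank_sketch([4200, 6000, 4500, 5200, 5200]):
    print(value, pct)
```

For these salaries the sketch gives the same values as the `prank` column in the ordered-window example further down.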


In [14]:
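### Other functions
# Note: importing min, max and sum this way shadows the Python built-ins of the same name.
# sumDistinct is deprecated since Spark 3.2 in favor of sum_distinct.
from pyspark.sql.functions import col, count, countDistinct, \
approx_count_distinct, first, last, min, max, sum, sumDistinct, avg,\
collect_set, collect_list, percent_rank, lag, lead, when, size, element_at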


In [95]:
empsalary = spark.createDataFrame([
  ("sales",     1,  "Alice",  5000, ["game",  "ski"]),
  ("personnel", 2,  "Olivia", 3900, ["game",  "ski"]),
  ("sales",     3,  "Ella",   4800, ["skate", "ski"]),
  ("sales",     4,  "Ebba",   4800, ["game",  "ski"]),
  ("personnel", 5,  "Lilly",  3500, ["climb", "ski"]),
  ("develop",   7,  "Astrid", 4200, ["game",  "ski"]),
  ("develop",   8,  "Saga",   6000, ["kajak", "ski"]),
  ("develop",   9,  "Freja",  4500, ["game",  "kajak"]),
  ("develop",   10, "Wilma",  5200, ["game",  "ski"]),
  ("develop",   11, "Maja",   5200, ["game",  "farming"])
]).toDF("depName", "empNo", "name", "salary", "hobby")
  

In [98]:
ws = Window.partitionBy(F.col('depName'))
empsalary.select(F.col('name'), 
                 F.col('depName'), 
                 F.col('salary'), 
                 F.mean(F.col('salary')).over(ws).alias('mean_dep_salary')).show()

+------+---------+------+-----------------+
|  name|  depName|salary|  mean_dep_salary|
+------+---------+------+-----------------+
|Astrid|  develop|  4200|           5020.0|
|  Saga|  develop|  6000|           5020.0|
| Freja|  develop|  4500|           5020.0|
| Wilma|  develop|  5200|           5020.0|
|  Maja|  develop|  5200|           5020.0|
|Olivia|personnel|  3900|           3700.0|
| Lilly|personnel|  3500|           3700.0|
| Alice|    sales|  5000|4866.666666666667|
|  Ella|    sales|  4800|4866.666666666667|
|  Ebba|    sales|  4800|4866.666666666667|
+------+---------+------+-----------------+
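Because `ws` has a partition but no ordering, its frame is the whole department, so every row receives the same department mean. Unlike `groupBy().agg()`, no rows are collapsed. A plain-Python sketch of that behavior, assuming rows arrive as `(name, depName, salary)` tuples (the helper name `mean_over_partition` and the tuple layout are hypothetical):

```python
from collections import defaultdict


def mean_over_partition(rows):
    """Hypothetical helper: append the mean salary of each row's department.

    rows: list of (name, dep_name, salary). Mirrors F.mean(...).over(
    Window.partitionBy('depName')) with no orderBy: the frame is the whole
    partition, and every input row is kept.
    """
    totals = defaultdict(int)
    counts = defaultdict(int)
    for _, dep, salary in rows:
        totals[dep] += salary
        counts[dep] += 1
    return [(name, dep, salary, totals[dep] / counts[dep])
            for name, dep, salary in rows]


rows = [("Alice", "sales", 5000), ("Olivia", "personnel", 3900),
        ("Ella", "sales", 4800), ("Ebba", "sales", 4800),
        ("Lilly", "personnel", 3500)]
for row in mean_over_partition(rows):
    print(row)
```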



Multiple functions over the same partition

In [99]:
overCategory = Window.partitionBy("depName")
df = empsalary\
.withColumn("salaries", collect_list("salary").over(overCategory))\
.withColumn("average_salary", (avg("salary").over(overCategory)).cast("int"))\
.withColumn("total_salary", sum("salary").over(overCategory))\
.select("depName", "empNo", "name", "salary", 
        "salaries", "average_salary", "total_salary")
df.show(20, False)

+---------+-----+------+------+------------------------------+--------------+------------+
|depName  |empNo|name  |salary|salaries                      |average_salary|total_salary|
+---------+-----+------+------+------------------------------+--------------+------------+
|develop  |7    |Astrid|4200  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |8    |Saga  |6000  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |9    |Freja |4500  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |10   |Wilma |5200  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|develop  |11   |Maja  |5200  |[4200, 6000, 4500, 5200, 5200]|5020          |25100       |
|personnel|2    |Olivia|3900  |[3900, 3500]                  |3700          |7400        |
|personnel|5    |Lilly |3500  |[3900, 3500]                  |3700          |7400        |
|sales    |1    |Alice |5000  |[5000, 4800, 4800]            |4866          |14600       |
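Reusing one window specification for several aggregates amounts to computing each statistic once per department and attaching it to all of that department's rows. Two details in the output are worth noting: `.cast("int")` truncates, so the sales average 4866.67 becomes 4866 rather than 4867; and without an `orderBy`, the order inside `collect_list` follows the row order within the partition, which Spark does not guarantee after a shuffle. The plain-Python sketch below (the helper name `department_stats` is hypothetical) reproduces the three derived columns for one department:

```python
def department_stats(salaries):
    """Hypothetical helper: (salaries, average_salary, total_salary) for one partition.

    Mirrors collect_list, avg(...).cast("int") and sum over a window that has
    no orderBy, so every row of the department gets the same three values.
    """
    total = sum(salaries)
    average = total / len(salaries)
    # int() truncates toward zero, matching the truncation seen in the
    # average_salary column (4866.67 -> 4866)
    return list(salaries), int(average), total


print(department_stats([5000, 4800, 4800]))
print(department_stats([4200, 6000, 4500, 5200, 5200]))
```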

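Order by: by default, an ordered window accumulates from the start of the partition up to the current row (rows tied on the ordering value are included together)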

In [101]:
overCategory = Window.partitionBy("depName").orderBy(col("salary").asc())
df = empsalary\
.withColumn("salaries", collect_list("salary").over(overCategory))\
.withColumn("msalary", (avg("salary").over(overCategory)).cast("int"))\
.withColumn("tsalary", sum("salary").over(overCategory))\
.withColumn("ntile", F.ntile(3).over(overCategory))\
.withColumn("prank", F.percent_rank().over(overCategory))\
.withColumn("drank", F.dense_rank().over(overCategory))\
.select("depName", "empNo",  "salary", 
        "salaries", "msalary", "tsalary", 
        "ntile", "prank", "drank")
df.show(20, False)

+---------+-----+------+------------------------------+-------+-------+-----+-----+-----+
|depName  |empNo|salary|salaries                      |msalary|tsalary|ntile|prank|drank|
+---------+-----+------+------------------------------+-------+-------+-----+-----+-----+
|develop  |7    |4200  |[4200]                        |4200   |4200   |1    |0.0  |1    |
|develop  |9    |4500  |[4200, 4500]                  |4350   |8700   |1    |0.25 |2    |
|develop  |10   |5200  |[4200, 4500, 5200, 5200]      |4775   |19100  |2    |0.5  |3    |
|develop  |11   |5200  |[4200, 4500, 5200, 5200]      |4775   |19100  |2    |0.5  |3    |
|develop  |8    |6000  |[4200, 4500, 5200, 5200, 6000]|5020   |25100  |3    |1.0  |4    |
|personnel|5    |3500  |[3500]                        |3500   |3500   |1    |0.0  |1    |
|personnel|2    |3900  |[3500, 3900]                  |3700   |7400   |2    |1.0  |2    |
|sales    |3    |4800  |[4800, 4800]                  |4800   |9600   |1    |0.0  |1    |
|sales    
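With an `orderBy` and no explicit frame, Spark's default frame runs from the start of the partition to the current row and is defined by value (a RANGE frame). Rows tied on `salary` are therefore peers and see the same running result, which is why employees 10 and 11 both get `[4200, 4500, 5200, 5200]`. `ntile(3)` splits the ordered rows into three buckets whose sizes differ by at most one, and `dense_rank` numbers distinct values without gaps. A plain-Python sketch of these semantics for one partition sorted ascending (helper names are hypothetical; this is not Spark's implementation):

```python
def running_range_frame(salaries):
    """Running list, int-truncated mean and sum with a RANGE frame
    (unbounded preceding to current row): tied values are included together."""
    ordered = sorted(salaries)
    out = []
    for s in ordered:
        frame = [v for v in ordered if v <= s]  # every peer of s is in the frame
        out.append((s, frame, int(sum(frame) / len(frame)), sum(frame)))
    return out


def ntile_sketch(n_rows, buckets):
    """Bucket number per row: the first n_rows % buckets buckets get one extra row."""
    base, extra = divmod(n_rows, buckets)
    labels = []
    for b in range(buckets):
        size = base + (1 if b < extra else 0)
        labels.extend([b + 1] * size)
    return labels


def dense_rank_sketch(values):
    """Dense rank of each value in ascending order: ties share a rank, no gaps."""
    ordered = sorted(values)
    distinct = sorted(set(ordered))
    return [distinct.index(v) + 1 for v in ordered]


develop = [4200, 6000, 4500, 5200, 5200]
for row in running_range_frame(develop):
    print(row)
print(ntile_sketch(len(develop), 3))
print(dense_rank_sketch(develop))
```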

Lag and lead to relate each row to its neighboring rows

In [103]:
# import pyspark.sql.functions as F
overCategory = Window.partitionBy("depName").orderBy(col("salary").asc())
df = empsalary\
.withColumn("lead", F.lead("salary", 1).over(overCategory))\
.withColumn("lag", F.lag("salary", 1).over(overCategory))\
.select("depName", "empNo", "name", "salary", "lead", "lag")
df.show(20, False)

+---------+-----+------+------+----+----+
|depName  |empNo|name  |salary|lead|lag |
+---------+-----+------+------+----+----+
|develop  |7    |Astrid|4200  |4500|null|
|develop  |9    |Freja |4500  |5200|4200|
|develop  |10   |Wilma |5200  |5200|4500|
|develop  |11   |Maja  |5200  |6000|5200|
|develop  |8    |Saga  |6000  |null|5200|
|personnel|5    |Lilly |3500  |3900|null|
|personnel|2    |Olivia|3900  |null|3500|
|sales    |3    |Ella  |4800  |4800|null|
|sales    |4    |Ebba  |4800  |5000|4800|
|sales    |1    |Alice |5000  |null|4800|
+---------+-----+------+------+----+----+
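`lead(col, 1)` reads the value one row ahead in the ordered partition and `lag(col, 1)` the value one row behind. At the partition edges there is no neighbor, so Spark returns null (shown as `None` below). A plain-Python sketch for one partition (the helper name `lag_lead_sketch` is hypothetical):

```python
def lag_lead_sketch(values, offset=1):
    """Pair each value (sorted ascending) with its lead and lag at the given offset.

    Positions outside the partition yield None, like Spark's null default.
    """
    ordered = sorted(values)
    n = len(ordered)
    result = []
    for i, v in enumerate(ordered):
        lead = ordered[i + offset] if i + offset < n else None
        lag = ordered[i - offset] if i - offset >= 0 else None
        result.append((v, lead, lag))
    return result


for row in lag_lead_sketch([4200, 6000, 4500, 5200, 5200]):
    print(row)
```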



In [19]:
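overCategory = Window.partitionBy("depName").orderBy(col("salary").desc())
# h_t_n: this salary minus the next salary in descending order (0 for the department's last row)
# l_t_p: the previous salary in descending order minus this salary (0 for the department's first row)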
df = empsalary\
.withColumn("lead", lead("salary", 1).over(overCategory))\
.withColumn("lag", lag("salary", 1).over(overCategory))\
.withColumn("h_t_n", 
            when(col("lead").isNull(), 0).otherwise(col("salary") - col("lead")))\
.withColumn("l_t_p", 
            when(col("lag").isNull(), 0).otherwise(col("lag") - col("salary")))\
.select("depName", "empNo", "name", "salary", "lead", "lag", "h_t_n", "l_t_p")
df.show(20, False)

+---------+-----+------+------+----+----+-----+-----+
|depName  |empNo|name  |salary|lead|lag |h_t_n|l_t_p|
+---------+-----+------+------+----+----+-----+-----+
|develop  |8    |Saga  |6000  |5200|null|800  |0    |
|develop  |10   |Wilma |5200  |5200|6000|0    |800  |
|develop  |11   |Maja  |5200  |4500|5200|700  |0    |
|develop  |9    |Freja |4500  |4200|5200|300  |700  |
|develop  |7    |Astrid|4200  |null|4500|0    |300  |
|personnel|2    |Olivia|3900  |3500|null|400  |0    |
|personnel|5    |Lilly |3500  |null|3900|0    |400  |
|sales    |1    |Alice |5000  |4800|null|200  |0    |
|sales    |3    |Ella  |4800  |4800|5000|0    |200  |
|sales    |4    |Ebba  |4800  |null|4800|0    |0    |
+---------+-----+------+------+----+----+-----+-----+
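The `when(...isNull(), 0).otherwise(...)` branches turn the missing neighbor at each end of a department into a gap of 0. Since arithmetic with null yields null in Spark, `F.coalesce(col("salary") - col("lead"), F.lit(0))` would be an equivalent formulation. The plain-Python sketch below computes the same two gaps for one department; the helper name `salary_gaps` is hypothetical.

```python
def salary_gaps(salaries):
    """Hypothetical helper: for salaries sorted descending, return
    (salary, gap_to_next, gap_to_previous) per row.

    gap_to_next mirrors h_t_n (salary - lead) and gap_to_previous mirrors
    l_t_p (lag - salary); a missing neighbor counts as 0, like the
    when(...isNull(), 0) branches in the notebook.
    """
    ordered = sorted(salaries, reverse=True)
    result = []
    for i, s in enumerate(ordered):
        nxt = ordered[i + 1] if i + 1 < len(ordered) else None
        prev = ordered[i - 1] if i > 0 else None
        result.append((s,
                       0 if nxt is None else s - nxt,
                       0 if prev is None else prev - s))
    return result


for row in salary_gaps([4200, 6000, 4500, 5200, 5200]):
    print(row)
```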



Applying functions over ranges (window frames)
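`rowsBetween(start, end)` replaces the default frame with an explicit one counted in physical rows relative to the current row. `Window.unboundedPreceding` and `Window.unboundedFollowing` stretch it to the partition edges, so every row sees the whole department even though the window is ordered. Because a ROWS frame counts rows rather than values, a running total over it gives the two 5200 rows different results (13900 and 19100), unlike the default frame in the `Order by` example, where both got 19100. The plain-Python sketch below models a ROWS frame for one ordered partition; the helper `rows_between_sum` is hypothetical, and `None` stands in for an unbounded side.

```python
def rows_between_sum(values, start, end):
    """Sum over a ROWS frame [current + start, current + end] for each row.

    values must already be in window order; start/end of None mean
    unbounded preceding/following. Offsets are clipped to the partition,
    and an empty frame sums to 0 here (Spark would return null).
    """
    n = len(values)
    sums = []
    for i in range(n):
        lo = 0 if start is None else max(0, i + start)
        hi = n - 1 if end is None else min(n - 1, i + end)
        sums.append(sum(values[lo:hi + 1]))
    return sums


develop = [4200, 4500, 5200, 5200, 6000]  # ordered by salary
print(rows_between_sum(develop, None, None))  # whole partition for every row
print(rows_between_sum(develop, None, 0))     # running total, row by row
print(rows_between_sum(develop, -1, 1))       # previous, current and next row
```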

In [0]:
overCategory = Window.partitionBy("depName").orderBy("salary")\
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)