In [1]:
from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .appName("PythonMnMCount")
        .getOrCreate())

In [4]:
from pyspark.sql import functions as F

flightsDF = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("C:/Users/laura.serrano/Desktop/flights-jan-apr-2018.csv") 
flightsDF = flightsDF.withColumn("ArrDelay",\
                                 F.when(F.rand(seed = 123) < 0.1, "NA").otherwise(F.col("ArrDelay")))

flightsDF.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCity: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCity: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)



In [5]:
# Extraemos los nombres de columna. Esto son solo metadatos de DataFrame, y están en el driver. No es necesaria ninguna
# operación sobre el cluster para recuperar la variable interna columns de cualquier DataFrame
print("Los datos tienen {0} columnas".format(len(flightsDF.columns)))

flightsDF.cache()        # Esta línea no hace cálculos, pero Spark anota que debe mantener este DF en memoria tras la primera vez que sea materializado
rows = flightsDF.count() # Esto es una acción que obligará a que flightsDF sea materializado. Para ello, habrá que llevar a cabo
                         # las transformaciones que lo generan en la celda anterior: read y withColumn, que están pendientes

print("Los datos tienen {0} filas".format(rows))

Los datos tienen 23 columnas
Los datos tienen 2503113 filas


In [6]:
#Seleccionar columnas por nombre

In [7]:
flightsDF.select("Month", "DayofMonth", "ArrTime")\
         .show() # los nombres son sensibles a mayúsculas

+-----+----------+-------+
|Month|DayofMonth|ArrTime|
+-----+----------+-------+
|    1|        14|   null|
|    1|         3|   1506|
|    1|         6|   1543|
|    1|         7|   1455|
|    1|         8|   1509|
|    1|         9|   1504|
|    1|        10|   1455|
|    1|        11|   1452|
|    1|        12|   1748|
|    1|        13|   1514|
|    1|        15|   1456|
|    1|        16|   1511|
|    1|        17|   1622|
|    1|        18|   1509|
|    1|        19|   1449|
|    1|        20|   1533|
|    1|        21|   1508|
|    1|        22|   1504|
|    1|        23|   1616|
|    1|        24|   1515|
+-----+----------+-------+
only showing top 20 rows



In [8]:
#Filtramos (retenemos) filas en base a los valores de una o varias columnas

In [9]:
# La función col se utiliza para decir que nos estamos refiriendo a la columna cuyo nombre se pasa como argumento

flightsJanuary20 = flightsDF\
                      .where((F.col("DayofMonth") == 20) & (F.col("Month") == 1))\
                      .select("Month", "ArrTime") # encadenamos dos transformaciones: esto no desencadena ninguna operación

In [11]:
# Cúantos vuelos hay el 20 de enero de 2018
# La operación count() es una acción, así que obliga a materializar flightsJanuary20. Para ello es necesario ejecutar
# las transformaciones where() y select() que pusimos en esta celda, aplicadas a flightsDF. De hecho, si no hubiésemos
# cacheado flightsDF en las celdas anteriores, también habría que materializarlo otra vez, y para eso se leería de 
# nuevo el CSV desde HDFS
rowsJanuary20 = flightsJanuary20.count()

print("Hubo {0} vuelos el 20 de enero de 2018".format(rowsJanuary20))

# Esto es otra acción aplicada sobre el DataFrame flightsJanuary. Como flightsJanuary NO ha sido cacheado,
# entonces las operaciones "where" y "select" se necesitan ejecutar DE NUEVO para poder hacer el show()
flightsJanuary20.show(3)

Hubo 16176 vuelos el 20 de enero de 2018
+-----+-------+
|Month|ArrTime|
+-----+-------+
|    1|   1533|
|    1|   1734|
|    1|   2025|
+-----+-------+
only showing top 3 rows



In [12]:
# También podemos indicar el filtrado como un string con un trozo de código SQL
# Recordemos que where y filter son exactamente equivalentes
flightsJanuary31 = flightsDF.filter("DayofMonth = 31 and Month = 1")
# transformación filter: Spark no ejecuta nada

flightsJanuary31.count() 
# acción count: obliga a materializar flightsJanuary31 para lo cual se tiene que ejecutar filter

20519

In [13]:
# muestra por pantalla el retraso en la llegada, el aeropuerto de origen y de destino de aquellos vuelos que tuvieron lugar en Domingo y con un retraso a la llegada mayor de 15 minutos. Muestra el esquema de dicho DataFrame resultante.

In [15]:
flightsDF.select("Origin", "Dest","ArrDelay")\
        .where((F.col("DayofWeek") == 7) & (F.col("ArrDelay") > 15)).show()

+------+----+--------+
|Origin|Dest|ArrDelay|
+------+----+--------+
|   BOS| BUF|    21.0|
|   BOS| JFK|    16.0|
|   LGA| MSN|    22.0|
|   BOS| JFK|   136.0|
|   DTW| JFK|   119.0|
|   CVG| DTW|    90.0|
|   GNV| ATL|    61.0|
|   ATL| SHV|    48.0|
|   MSP| EWR|   158.0|
|   LAS| FLL|    17.0|
|   MCO| BOS|   238.0|
|   BOS| FLL|    61.0|
|   SLC| JFK|    34.0|
|   JFK| PWM|    80.0|
|   MCO| EWR|    23.0|
|   OAK| JFK|    19.0|
|   SJU| BDL|    63.0|
|   LGB| LAS|    29.0|
|   LAX| JFK|    50.0|
|   JFK| MCO|    61.0|
+------+----+--------+
only showing top 20 rows



In [16]:
#SELECCIONAR FILAS ÚNICAS

In [17]:
distinctFlights = flightsDF.distinct()  
# distinct es una transformación que devuelve el DF sin las filas repetidas
distinctFlightsCount = distinctFlights.count() 
# count es una acción y provoca que se ejecute la transformación distinct

print("Hay {0} filas distintas".format(distinctFlightsCount))

Hay 2483880 filas distintas


In [18]:
# ¿cuántas combinaciones de Origin y Dest existen?

In [22]:
distinctFlights.select("Origin","Dest").distinct().count()

5795

In [None]:
#otra forma de hacerlo

In [32]:
from pyspark.sql.functions import countDistinct
distinctFlights.select(countDistinct("Origin","Dest")).show()

+----------------------------+
|count(DISTINCT Origin, Dest)|
+----------------------------+
|                        5795|
+----------------------------+



In [33]:
#¿cuántos aeropuertos de origen existen?

In [34]:
distinctFlights.select("Origin").distinct().count()

356

In [35]:
#vamos a hacer esto para cada columna, en un bucle, para hacernos a la idea de cuántos valores hay.
#Esto solo tiene sentido en realidad para columnas categóricas y no para numéricas donde casi todos los valores serán distintos.

In [36]:
#defino la funcion
def dfConteos(sparkDF):
    colsRecuento = [F.countDistinct(c).alias(c) for c in sparkDF.columns]
    return sparkDF.select(colsRecuento)

conteosDF = dfConteos(flightsDF)
conteosDF.show()

+-----+----------+---------+----------+------+----------+----+--------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+
|Month|DayofMonth|DayOfWeek|FlightDate|Origin|OriginCity|Dest|DestCity|DepTime|DepDelay|ArrTime|ArrDelay|Cancelled|CancellationCode|Diverted|ActualElapsedTime|AirTime|Distance|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+-----+----------+---------+----------+------+----------+----+--------+-------+--------+-------+--------+---------+----------------+--------+-----------------+-------+--------+------------+------------+--------+-------------+-----------------+
|    4|        31|        7|       120|   356|       350| 356|     350|   1440|    1378|   1440|    1379|        2|               5|       2|              705|    669|    1466|        1094|         748|     609|          128|              739|
+-----+----------+------

In [45]:
from pyspark.sql.functions import udf
for columnName in conteosDF.columns:

    distinctValues = conteosDF.select(columnName).first()
    
    # No olvidéis indentar este comando para indicar que está dentro del cuerpo del bucle
    print("Existen {0} valores distintos en la columna {1}".format(distinctValues, columnName))

Existen Row(Month=4) valores distintos en la columna Month
Existen Row(DayofMonth=31) valores distintos en la columna DayofMonth
Existen Row(DayOfWeek=7) valores distintos en la columna DayOfWeek
Existen Row(FlightDate=120) valores distintos en la columna FlightDate
Existen Row(Origin=356) valores distintos en la columna Origin
Existen Row(OriginCity=350) valores distintos en la columna OriginCity
Existen Row(Dest=356) valores distintos en la columna Dest
Existen Row(DestCity=350) valores distintos en la columna DestCity
Existen Row(DepTime=1440) valores distintos en la columna DepTime
Existen Row(DepDelay=1378) valores distintos en la columna DepDelay
Existen Row(ArrTime=1440) valores distintos en la columna ArrTime
Existen Row(ArrDelay=1379) valores distintos en la columna ArrDelay
Existen Row(Cancelled=2) valores distintos en la columna Cancelled
Existen Row(CancellationCode=5) valores distintos en la columna CancellationCode
Existen Row(Diverted=2) valores distintos en la columna D

In [43]:
for columnName in flightsDF.columns:

    distinctValues = flightsDF.select(columnName).distinct().count()
    
    # No olvidéis indentar este comando para indicar que está dentro del cuerpo del bucle
    print("Existen {0} valores distintos en la columna {1}".format(distinctValues, columnName))

Existen 4 valores distintos en la columna Month
Existen 31 valores distintos en la columna DayofMonth
Existen 7 valores distintos en la columna DayOfWeek
Existen 120 valores distintos en la columna FlightDate
Existen 356 valores distintos en la columna Origin
Existen 350 valores distintos en la columna OriginCity
Existen 356 valores distintos en la columna Dest
Existen 350 valores distintos en la columna DestCity
Existen 1441 valores distintos en la columna DepTime
Existen 1379 valores distintos en la columna DepDelay
Existen 1441 valores distintos en la columna ArrTime
Existen 1380 valores distintos en la columna ArrDelay
Existen 2 valores distintos en la columna Cancelled
Existen 6 valores distintos en la columna CancellationCode
Existen 2 valores distintos en la columna Diverted
Existen 706 valores distintos en la columna ActualElapsedTime
Existen 670 valores distintos en la columna AirTime
Existen 1466 valores distintos en la columna Distance
Existen 1095 valores distintos en la co

In [46]:
# Crear una nueva columna o reemplazar una existente por el resultado de operar con columnas existentes

In [47]:
# withColumn is a transformation returing a new DataFrame with one extra column appended on the right
flightsWithKm = flightsDF.withColumn("DistanceKm", F.col("Distance") * 1.61)

flightsWithKm.printSchema()

flightsWithKm.show(3)

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCity: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCity: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)
 |-- DistanceKm: double (nullable =

In [48]:
flightsCategoricalDay = flightsDF.withColumn("DayOfWeek", F.when(F.col("DayOfWeek") == 1, "Monday")\
                                                           .when(F.col("DayOfWeek") == 2, "Tuesday")\
                                                           .when(F.col("DayOfWeek") == 3, "Wednesday")\
                                                           .when(F.col("DayOfWeek") == 4, "Thursday")\
                                                           .when(F.col("DayOfWeek") == 5, "Friday")\
                                                           .when(F.col("DayOfWeek") == 6, "Saturday")\
                                                           .otherwise("Sunday"))

flightsCategoricalDay.printSchema() # the column is still in the same position but has now string type

flightsCategoricalDay.select("DayOfWeek", "DepTime", "ArrTime").show()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: string (nullable = false)
 |-- FlightDate: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCity: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCity: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)

+---------+-------+-------+
|DayOf

In [None]:
#SI EL NOMBRE DE LA COLUMNA PASADA COMO PRIEMR PARAMETRO EN WITHCOLUMN ES EL MISMO 
#DE UNA YA EXISTENTE REEMPALZA EL VALOR SIN CREA OTRO

In [49]:
flightsFindeLaborable = flightsDF.withColumn("Laborable", F.when(F.col("DayOfWeek")<=5, "Laborable")\
                                             .otherwise("Finde"))
flightsFindeLaborable.printSchema()
flightsFindeLaborable.select("DayOfWeek", "Laborable").show()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCity: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCity: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)
 |-- Laborable: string (nullable = 

In [50]:
#CREAR Y SELECCIONAR COLUMNAS AL VUELO

In [51]:
flightsDF.selectExpr("Origin", "Dest", "1.6*Distance AS DistanceKm").show()

+------+----+----------+
|Origin|Dest|DistanceKm|
+------+----+----------+
|   SYR| DTW|     598.4|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
|   SYR| LGA|     316.8|
+------+----+----------+
only showing top 20 rows



In [52]:
# crear un DataFrame con tres columnas seleccionando "Origin", 
# "OriginCity" y una nueva columna de tipo string creada concatenando Origin
#y OriginCity con un guión "-". Utilizar `withColumn` y dentro la función 
#`concat_ws` (concatenar con separador) con sintaxis 
#F.concat_ws("-", columna1, columna2) del paquete pyspark.sql.functions.

In [54]:
flightsConcat = flightsDF.withColumn("Concat",\
                                     F.concat_ws("-","Origin","OriginCity"))\
                         .select("Origin", "OriginCity", "Concat")
flightsConcat.show()

+------+------------+----------------+
|Origin|  OriginCity|          Concat|
+------+------------+----------------+
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
|   SYR|Syracuse, NY|SYR-Syracuse, NY|
+------+------------+----------------+
only showing top 20 rows



In [55]:
#Convertir el tipo de dato de una columna

In [56]:
naCount = flightsDF.where("ArrDelay = 'NA'").count()

# Esto es sintaxis SQL, pero también podríamos haberla llamado como .where(F.col("ArrDelay") == "NA"). Ambas son equivalentes.

print("Hay ", naCount, "filas con NA en ArrDelay")

Hay  250196 filas con NA en ArrDelay


In [57]:
flightsDF.where(F.col("ArrDelay") != "NA").count()

2196454

In [58]:
#from pyspark.sql.types import DoubleType
from pyspark.sql import types as T

flightsDF = flightsDF.where((F.col("ArrDelay") != "NA"))\
                     .withColumn("ArrDelay", F.col("ArrDelay").cast(T.DoubleType()))

In [59]:
flightsDF.is_cached

False

In [60]:
#Ordenacion respecto a una columna

In [61]:
# Ordenamos los vuelos según ArrDelay
sortedDF = flightsDF.orderBy("ArrDelay")  # equivalente: flightsDF.orderBy(F.col("ArrDelay"))

# Orden ascendentemente por aeropuerto de origen ("Origin") y deshago los empates por arr_delay descendentemente
sortedDF = flightsDF.orderBy(F.col("Origin"), F.col("ArrDelay").desc())  # equivalente: flightsDF.orderBy(F.col("ArrDelay"))

sortedDescDF = flightsDF.orderBy("ArrDelay", ascending = False)
sortedDescDF.select("ArrDelay", "Origin", "Dest").show(10)

+--------+------+----+
|ArrDelay|Origin|Dest|
+--------+------+----+
|  2475.0|   HNL| PPG|
|  2454.0|   PPG| HNL|
|  1778.0|   ORF| DFW|
|  1757.0|   SMF| DFW|
|  1717.0|   HNL| JFK|
|  1704.0|   EGE| DFW|
|  1685.0|   ORF| JFK|
|  1650.0|   ABQ| DFW|
|  1648.0|   SLC| DFW|
|  1576.0|   IAH| MIA|
+--------+------+----+
only showing top 10 rows



In [62]:
primeraFila = sortedDescDF.first()

In [63]:
primeraFila.ArrTime

1445

In [64]:

listaRows = sortedDescDF.take(20)  # devuelve una lista de Python, de objetos Row
distancias = [r.Distance for r in listaRows]  # usamos sintaxis de listas por comprensión
distancias

[2599.0,
 2599.0,
 1212.0,
 1431.0,
 4983.0,
 721.0,
 290.0,
 569.0,
 989.0,
 964.0,
 861.0,
 1810.0,
 282.0,
 1235.0,
 1363.0,
 228.0,
 1172.0,
 861.0,
 715.0,
 258.0]

In [65]:
flightsDF.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCity: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- DestCity: string (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)



In [66]:

flightsDF.count()

2196454

In [69]:
flightsDF.select(F.col("Arrdelay").alias("VarArrDelay")).where(F.col("ArrDelay")>15).show()

+-----------+
|VarArrDelay|
+-----------+
|       24.0|
|      149.0|
|       63.0|
|       57.0|
|       21.0|
|       61.0|
|       17.0|
|       25.0|
|      131.0|
|       25.0|
|       16.0|
|       58.0|
|       34.0|
|       43.0|
|       70.0|
|      194.0|
|       32.0|
|       16.0|
|       56.0|
|       61.0|
+-----------+
only showing top 20 rows

