## Leer archivo de datos raw-flight-data.csv y crear el data schema

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

#create session
appName = "Pre-procesamiento de datos en Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

flights = spark.read.csv('/FileStore/tables/raw_flight_data-eb9f8.csv', 
                         schema=flightSchema, header=True)
flights.show(2)

## Cargamos el archivo airports.csv en el dataframe airports

In [4]:
airportSchema = StructType([
  StructField("airport_id", IntegerType(), False),
  StructField("city", StringType(), False),
  StructField("state", StringType(), False),
  StructField("name", StringType(), False),
])

airports = spark.read.csv('dataset/airports.csv', header=True, 
                          schema=airportSchema)
airports.show(2)

## Cruzamos los 2 dataframes (flights and airports), y mostramos cuantos vuelos hay desde cada ciudad

In [6]:
flightsByOrigin = flights.join(airports,
                               flights.OriginAirportID == 
                               airports.airport_id).groupBy("city").count()
flightsByOrigin.show(10)

## Manipulando data duplicada
Eliminar data duplicada y calcular cuantos registros duplicados existen

In [8]:
#Cantidad original de registros
n1 = flights.count()
print("Número original de registros: ", n1)

#Cuenta los registros luego de eliminar la data duplicada
n2 = flights.dropDuplicates().count()
print("Número de registros luego de eliminar la data duplicadad: ", n2)

n3 = n1 - n2
print("Número de registros duplicados eliminados: ", n3)

Especificamos el criterio para registros duplicados

In [10]:
df = spark.createDataFrame([("Rony",27, 168), 
                            ("Rony",15, 165), 
                            ("Rony",27, 168)], 
                           ["name","age","height"])
df.show()
df.dropDuplicates().show()

Eliminamos duplicados por un solo campo

In [12]:
df.dropDuplicates(['name']).show()

## Manipulando data en missing

1. Elimina la fila si al menos existe una columna en missing

In [15]:
flightsNoMissingValue = flights.dropDuplicates().dropna(
    how="any", subset=["ArrDelay", "DepDelay"])
# usar how="all" si se busca que todas las columnas estén en missing
# usar how="any" si se busca que al menos una columna esté en missins

numberOfMissingValueAny = n1 - flightsNoMissingValue.count()

print("Número de filas con al menos un valor en missing: ", numberOfMissingValueAny)

2. Completar la data en missing usando el valor del Promedio de la columna respectiva

In [17]:
#Obtener el valor promedio
meanArrDelay = flights.groupBy().avg("ArrDelay").take(1)[0][0]

print("Promedio de ArrDelay: ", meanArrDelay)

meanDepDelay = flights.groupBy().avg("DepDelay").take(1)[0][0]

print("Promedio de DepDelay: ", meanDepDelay)

#Eliminar data duplicada y completarla con el valor promedio del campo
flightsCleanData=flights.fillna(
    {'ArrDelay': meanArrDelay, 'DepDelay': meanDepDelay})

#El promedio de ArrDelay se mantiene
flights.groupBy().avg("ArrDelay").show()

## Explorar las estadísticas de nuestra data

In [19]:
flightsCleanData.describe('DepDelay','ArrDelay').show()

También podemos calcular la correlación entre dos variables para sabes si una variable está relacionada a la otra

In [21]:
correlation = flightsCleanData.corr('DepDelay', 'ArrDelay')
print("correlation between departure delay and arrival delay: ", 
      correlation)