# Projeto inicial (Spark-Databricks) - Engenharia de Dados

* Convertendo os arquivos em csv para parquet e enviando para processing zone.
* base de dados: https://www.kaggle.com/nhs/general-practice-prescribing-data

In [0]:
# Lendo os arquivos .csv do dbfs com spark

df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema", "True")\
.load("/FileStore/tables/raw/*.csv")

In [0]:
df.printSchema()

root
 |-- practice: integer (nullable = true)
 |-- bnf_code: integer (nullable = true)
 |-- bnf_name: integer (nullable = true)
 |-- items: integer (nullable = true)
 |-- nic: double (nullable = true)
 |-- act_cost: double (nullable = true)
 |-- quantity: integer (nullable = true)



In [0]:
# Imprime as 10 primeiras linhas do dataframe
display(df.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
5668,8092,592,2,44.1,40.84,189
1596,17512,16983,2,1.64,1.64,35
1596,25587,16124,1,1.26,1.28,42
1596,12551,1282,2,0.86,1.02,42
1596,18938,10575,1,1.85,1.82,56
1596,8777,21507,1,3.31,3.18,56
1596,9369,12008,1,63.15,58.56,56
1596,27926,17643,2,158.66,147.07,56
1596,26148,10230,1,0.35,0.44,14
1596,9148,3381,1,0.26,0.35,7


In [0]:
# Conta a quantidade de linhas
df.count()

Out[6]: 131004583

## Leva todos os dados convertidos em parquet para a Processing Zone

In [0]:
# Converte para o formato parquet
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/processing/df-parquet-file-2.parquet")

In [0]:
# Lendo os arquivos parquet
df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/processing/df-parquet-file-2.parquet")

In [0]:
# Conta a quantidade de linhas do dataframe
df_parquet.count()

Out[9]: 131004583

In [0]:
# Imprime as 20 primeiras linhas
display(df_parquet.head(20))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
3626,12090,20521,3,8.4,7.82,168
3626,23511,11576,1,32.18,29.81,28
3626,14802,14672,162,141.13,133.93,4760
3626,14590,10011,17,15.01,14.12,532
3626,24483,13726,69,57.57,54.67,2121
3626,7768,22070,155,113.03,109.41,4144
3626,1877,13598,102,68.5,67.4,2370
3626,18110,3990,189,156.66,150.44,5222
3626,14058,2144,23,23.52,22.48,588
3626,4558,5695,32,116.64,109.21,756


### Criando um schema

In [0]:
# Visualizando datasets (exemplo Databricks)
display(dbutils.fs.ls("/databricks-datasets"))

path,name,size,modificationTime
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0


In [0]:
# Lendo o arquivo de dados
arquivo ="dbfs:/databricks-datasets/flights/"

In [0]:
# lendo automaticamente o inferSchema
df = spark \
.read \
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
# imprime o schema do dataframe (.option("inferSchema", "True")\)
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [0]:
display(df)

date,delay,distance,origin,destination
1011245,6,602,ABE,ATL
1020600,-8,369,ABE,DTW
1021245,-2,602,ABE,ATL
1020605,-4,602,ABE,ATL
1031245,-4,602,ABE,ATL
1030605,0,602,ABE,ATL
1041243,10,602,ABE,ATL
1040605,28,602,ABE,ATL
1051245,88,602,ABE,ATL
1050605,9,602,ABE,ATL


In [0]:
# usa o objeto StructType para definir o inferSchema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
schema_df = StructType([
    StructField("data", StringType()),
    StructField("delay", IntegerType()),
    StructField("distance", IntegerType()),
    StructField("origin", StringType()),
    StructField("deldestinationay", StringType()),
])

In [0]:
# verificando o tipo da variável schema_df
type(schema_df)

Out[17]: pyspark.sql.types.StructType

In [0]:
# usando o parâmetro StructType
df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.load(arquivo)

In [0]:
# imprime o schema do dataframe
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [0]:
df.show(10)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# imprime o tipo da variável schema_df-
type(df)

Out[21]: pyspark.sql.dataframe.DataFrame

In [0]:
# retorna as primeiras 10 linhas em formato array
df.take(10)

Out[22]: [Row(data='01011245', delay=6, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01020600', delay=-8, distance=369, origin='ABE', deldestinationay='DTW'),
 Row(data='01021245', delay=-2, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01020605', delay=-4, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01031245', delay=-4, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01030605', delay=0, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01041243', delay=10, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01040605', delay=28, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01051245', delay=88, distance=602, origin='ABE', deldestinationay='ATL'),
 Row(data='01050605', delay=9, distance=602, origin='ABE', deldestinationay='ATL')]

In [0]:
# imprime a quantidade de linhas
df.count()

Out[23]: 1392106

In [0]:
# função "max" traz o valor máximo
from pyspark.sql.functions import max
df.select(max("delay")).take(1)

Out[24]: [Row(max(delay)=1642)]

In [0]:
# filtrando linhas usando "filter"
df.filter("delay < 2").show(2)

+--------+-----+--------+------+----------------+
|    data|delay|distance|origin|deldestinationay|
+--------+-----+--------+------+----------------+
|01020600|   -8|     369|   ABE|             DTW|
|01021245|   -2|     602|   ABE|             ATL|
+--------+-----+--------+------+----------------+
only showing top 2 rows



In [0]:
# usando where para filtrar
df.where("delay < 2").show(2)

+--------+-----+--------+------+----------------+
|    data|delay|distance|origin|deldestinationay|
+--------+-----+--------+------+----------------+
|01020600|   -8|     369|   ABE|             DTW|
|01021245|   -2|     602|   ABE|             ATL|
+--------+-----+--------+------+----------------+
only showing top 2 rows



In [0]:
3 # ordena o df #sort
df.sort("delay").show(10)

+--------------------+-----+--------+------+----------------+
|                data|delay|distance|origin|deldestinationay|
+--------------------+-----+--------+------+----------------+
|Abbotsford\tBC\tC...| null|    null|  null|            null|
|Aberdeen\tSD\tUSA...| null|    null|  null|            null|
|Abilene\tTX\tUSA\...| null|    null|  null|            null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|            null|
|Alamosa\tCO\tUSA\...| null|    null|  null|            null|
|Albany\tGA\tUSA\tABY| null|    null|  null|            null|
|Albany\tNY\tUSA\tALB| null|    null|  null|            null|
|Albuquerque\tNM\t...| null|    null|  null|            null|
|Alexandria\tLA\tU...| null|    null|  null|            null|
|Allentown\tPA\tUS...| null|    null|  null|            null|
+--------------------+-----+--------+------+----------------+
only showing top 10 rows



In [0]:
# funções para ordenação
from pyspark.sql.functions import desc, asc, expr
df.orderBy(expr("delay desc")).show(10)

+--------------------+-----+--------+------+----------------+
|                data|delay|distance|origin|deldestinationay|
+--------------------+-----+--------+------+----------------+
|Abbotsford\tBC\tC...| null|    null|  null|            null|
|Aberdeen\tSD\tUSA...| null|    null|  null|            null|
|Abilene\tTX\tUSA\...| null|    null|  null|            null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|            null|
|Alamosa\tCO\tUSA\...| null|    null|  null|            null|
|Albany\tGA\tUSA\tABY| null|    null|  null|            null|
|Albany\tNY\tUSA\tALB| null|    null|  null|            null|
|Albuquerque\tNM\t...| null|    null|  null|            null|
|Alexandria\tLA\tU...| null|    null|  null|            null|
|Allentown\tPA\tUS...| null|    null|  null|            null|
+--------------------+-----+--------+------+----------------+
only showing top 10 rows



In [0]:
df.orderBy(expr("delay asc")).show(10)

+--------------------+-----+--------+------+----------------+
|                data|delay|distance|origin|deldestinationay|
+--------------------+-----+--------+------+----------------+
|Abbotsford\tBC\tC...| null|    null|  null|            null|
|Aberdeen\tSD\tUSA...| null|    null|  null|            null|
|Abilene\tTX\tUSA\...| null|    null|  null|            null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|            null|
|Alamosa\tCO\tUSA\...| null|    null|  null|            null|
|Albany\tGA\tUSA\tABY| null|    null|  null|            null|
|Albany\tNY\tUSA\tALB| null|    null|  null|            null|
|Albuquerque\tNM\t...| null|    null|  null|            null|
|Alexandria\tLA\tU...| null|    null|  null|            null|
|Allentown\tPA\tUS...| null|    null|  null|            null|
+--------------------+-----+--------+------+----------------+
only showing top 10 rows



In [0]:
# estatísticas descritivas
df.describe().show()

+-------+--------------------+------------------+-----------------+-------+----------------+
|summary|                data|             delay|         distance| origin|deldestinationay|
+-------+--------------------+------------------+-----------------+-------+----------------+
|  count|             1392106|           1391578|          1391578|1391578|         1391578|
|   mean|   2180446.584000322|12.079802928761449|690.5508264718184|   null|            null|
| stddev|   838031.1536741031| 38.80773374985648| 513.662815366331|   null|            null|
|    min|"Cap-aux-Meules, ...|              -112|               21|    ABE|             ABE|
|    max|  Yuma\tAZ\tUSA\tYUM|              1642|             4330|    YUM|             YUM|
+-------+--------------------+------------------+-----------------+-------+----------------+



In [0]:
# iterando sobre todas as linhas do df
for i in df.collect():
    #print(i)
    print(i[0], i[1], i[2] * 2)

01011245 6 1204
01020600 -8 738
01021245 -2 1204
01020605 -4 1204
01031245 -4 1204
01030605 0 1204
01041243 10 1204
01040605 28 1204
01051245 88 1204
01050605 9 1204
01061215 -6 1204
01061725 69 1204
01061230 0 738
01060625 -3 1204
01070600 0 738
01071725 0 1204
01071230 0 738
01070625 0 1204
01071219 0 1138
01080600 0 738
01081230 33 738
01080625 1 1204
01080607 5 1138
01081219 54 1138
01091215 43 1204
01090600 151 738
01091725 0 1204
01091230 -4 738
01090625 8 1204
01091219 83 1138
01101215 -5 1204
01100600 -5 738
01101725 7 1204
01101230 -8 738
01100625 52 1204
01101219 0 1138
01111215 127 1204
01110600 -9 738
01110625 -4 1204
01121215 -5 1204
01121725 -1 1204
01131215 14 1204
01130600 -7 738
01131725 -6 1204
01131230 -13 738
01130625 29 1204
01131219 -8 1138
01140600 -9 738
01141725 -9 1204
01141230 -8 738
01140625 -5 1204
01141219 -10 1138
01150600 0 738
01151725 -6 1204
01151230 0 738
01150625 0 1204
01150607 0 1138
01151219 0 1138
01161215 -10 1204
01160600 -1 738
01161725 -6 12

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<command-2192716957500650>[0m in [0;36m<cell line: 2>[0;34m()[0m
[1;32m      2[0m [0;32mfor[0m [0mi[0m [0;32min[0m [0mdf[0m[0;34m.[0m[0mcollect[0m[0;34m([0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m     [0;31m#print(i)[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 4[0;31m     [0mprint[0m[0;34m([0m[0mi[0m[0;34m[[0m[0;36m0[0m[0;34m][0m[0;34m,[0m [0mi[0m[0;34m[[0m[0;36m1[0m[0;34m][0m[0;34m,[0m [0mi[0m[0;34m[[0m[0;36m2[0m[0;34m][0m [0;34m*[0m [0;36m2[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mTypeError[0m: unsupported operand type(s) for *: 'NoneType' and 'int'

In [0]:
# adiciona uma coluna no df
df = df.withColumn('Nova_Coluna', df['delay']+2)
df.show()

+--------+-----+--------+------+----------------+-----------+
|    data|delay|distance|origin|deldestinationay|Nova_Coluna|
+--------+-----+--------+------+----------------+-----------+
|01011245|    6|     602|   ABE|             ATL|          8|
|01020600|   -8|     369|   ABE|             DTW|         -6|
|01021245|   -2|     602|   ABE|             ATL|          0|
|01020605|   -4|     602|   ABE|             ATL|         -2|
|01031245|   -4|     602|   ABE|             ATL|         -2|
|01030605|    0|     602|   ABE|             ATL|          2|
|01041243|   10|     602|   ABE|             ATL|         12|
|01040605|   28|     602|   ABE|             ATL|         30|
|01051245|   88|     602|   ABE|             ATL|         90|
|01050605|    9|     602|   ABE|             ATL|         11|
|01061215|   -6|     602|   ABE|             ATL|         -4|
|01061725|   69|     602|   ABE|             ATL|         71|
|01061230|    0|     369|   ABE|             DTW|          2|
|0106062

In [0]:
# removendo a coluna
df = df.drop('Nova_Coluna')
df.show(10)

+--------+-----+--------+------+----------------+
|    data|delay|distance|origin|deldestinationay|
+--------+-----+--------+------+----------------+
|01011245|    6|     602|   ABE|             ATL|
|01020600|   -8|     369|   ABE|             DTW|
|01021245|   -2|     602|   ABE|             ATL|
|01020605|   -4|     602|   ABE|             ATL|
|01031245|   -4|     602|   ABE|             ATL|
|01030605|    0|     602|   ABE|             ATL|
|01041243|   10|     602|   ABE|             ATL|
|01040605|   28|     602|   ABE|             ATL|
|01051245|   88|     602|   ABE|             ATL|
|01050605|    9|     602|   ABE|             ATL|
+--------+-----+--------+------+----------------+
only showing top 10 rows



In [0]:
# renomeando uma coluna
df.withColumnRenamed('Nova_Coluna', 'New_Column').show(10)

+--------+-----+--------+------+----------------+----------+
|    data|delay|distance|origin|deldestinationay|New_Column|
+--------+-----+--------+------+----------------+----------+
|01011245|    6|     602|   ABE|             ATL|         8|
|01020600|   -8|     369|   ABE|             DTW|        -6|
|01021245|   -2|     602|   ABE|             ATL|         0|
|01020605|   -4|     602|   ABE|             ATL|        -2|
|01031245|   -4|     602|   ABE|             ATL|        -2|
|01030605|    0|     602|   ABE|             ATL|         2|
|01041243|   10|     602|   ABE|             ATL|        12|
|01040605|   28|     602|   ABE|             ATL|        30|
|01051245|   88|     602|   ABE|             ATL|        90|
|01050605|    9|     602|   ABE|             ATL|        11|
+--------+-----+--------+------+----------------+----------+
only showing top 10 rows



### Trabalhando com missing values

In [0]:
# checa valores null
df.filter('delay is null').show(10)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|
+--------------------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# conta a quantidade de linhas nulas

delay_nulos = df.filter('delay is null').count()
date_nulos = df.filter("date is null").count()
distance_nulos = df.filter("distance is null").count()
origin_nulos = df.filter("origin is null").count()
destination_nulos = df.filter("destination is null").count()

print('Valores nulos coluna delay: {0}'.format(df.filter('delay is null').count()))
print(f'Valores nulos coluna date: {date_nulos}')
print(f'Valores nulos coluna distance: {distance_nulos}')
print(f'Valores nulos coluna origin: {origin_nulos}')
print(f'Valores nulos coluna deldestinationay: {destination_nulos}')

Valores nulos coluna delay: 526
Valores nulos coluna date: 0
Valores nulos coluna distance: 527
Valores nulos coluna origin: 528
Valores nulos coluna deldestinationay: 528


In [0]:
# completa os dados missing com '0'
df = df.na.fill(value = 0)

In [0]:
# checa valores null
df.filter('delay is null').show(10)

+----+-----+--------+------+----------------+-----------+
|data|delay|distance|origin|deldestinationay|Nova_Coluna|
+----+-----+--------+------+----------------+-----------+
+----+-----+--------+------+----------------+-----------+



In [0]:
# completa os dados missing com '0' na coluna delay
df.na.fill(value=0, subset=['delay']).show(20)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
df.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
df.na.fill("").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# remove linhas nulas do df
df.na.drop().show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# quantidade de linhas antes de apagar as linhas com valores null
df.count()

Out[69]: 1392106

In [0]:
# remove linhas nulas do df
df = df.na.drop()
df.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# quantidade de linhas após de apagar as linhas com valores null
df.count()

Out[72]: 1391578

In [0]:
# retorna o valor máximo da coluna 'delay'
from pyspark.sql.functions import max
df.select(max('delay')).take(1)

Out[73]: [Row(max(delay)='995')]

In [0]:
# filtra linhas
df.filter('delay > 2').show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01081230|   33|     369|   ABE|        DTW|
|01080607|    5|     569|   ABE|        ORD|
|01081219|   54|     569|   ABE|        ORD|
|01091215|   43|     602|   ABE|        ATL|
|01090600|  151|     369|   ABE|        DTW|
|01090625|    8|     602|   ABE|        ATL|
|01091219|   83|     569|   ABE|        ORD|
|01101725|    7|     602|   ABE|        ATL|
|01100625|   52|     602|   ABE|        ATL|
|01111215|  127|     602|   ABE|        ATL|
|01131215|   14|     602|   ABE|        ATL|
|01130625|   29|     602|   ABE|        ATL|
|01161219|   68|     569|   ABE|        ORD|
|01171725|