**Stack - Data Engineering**

Neste vídeo veremos sobre:
  - Conhecendo um mnotebook no databricks
  - PySpark Dataframe
  - Lendo um dataset
  - Checando os datatypes de algumas colunas de um dataframe pyspark
  - Selecionando colunas e índices
  - Visualizando estatísticas descritivas de dados de dataframes
  - Adicionando colunas em Dataframes
  - Removendo colunas em Dataframes
  - Renomeando colunas em Dataframes
  - Trabalhando com missing values

In [0]:
print("Olá Mundo")

Olá Mundo


In [0]:
# visualizando datasets de exemplos da databricks
display(dbutils.fs.ls("/databricks-datasets"))

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0
dbfs:/databricks-datasets/bikeSharing/,bikeSharing/,0,0


In [0]:
# lendo o arquivo de dados
arquivo = "dbfs:/databricks-datasets/flights/"

In [0]:
# lendo o arquivo de dados
# inferSchema = True
# header = True

df = spark \
.read.format("csv")\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
# imprime os datatypes das colunas do dataframe
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [0]:
# imprime o tipo da variável df
type(df)

pyspark.sql.dataframe.DataFrame

In [0]:
# retorna as primeiras 5 linhas do dataframe em formarto de array.
df.take(5)

[Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL'),
 Row(date='01020600', delay='-8', distance='369', origin='ABE', destination='DTW'),
 Row(date='01021245', delay='-2', distance='602', origin='ABE', destination='ATL'),
 Row(date='01020605', delay='-4', distance='602', origin='ABE', destination='ATL'),
 Row(date='01031245', delay='-4', distance='602', origin='ABE', destination='ATL')]

In [0]:
# usando o comando display
display(df.show(3))

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 3 rows



In [0]:
# imprime a quantidade de linhas no dataframe
df.count()

1392106

In [0]:
# lendo o arquivo previamente com a opção inferSchema desligada
df = spark \
  .read\
  .option("inferSchema", "False")\
  .option("header", "True")\
  .csv(arquivo)

# Consultando dados do dataframe

In [0]:
from pyspark.sql.functions import max

df.select(max("delay")).take(1)

[Row(max(delay)='995')]

In [0]:
# filtrando linhas de um dataframe usando o filter
df.filter("delay < 2").show(2)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 2 rows



In [0]:
# usando where (um alias para o método filter)
df.where("delay < 2").show(2)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 2 rows



In [0]:
# ordena o dataframe pela coluna delay
df.sort("delay").show(5)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| NULL|    NULL|  NULL|       NULL|
|Aberdeen\tSD\tUSA...| NULL|    NULL|  NULL|       NULL|
|Abilene\tTX\tUSA\...| NULL|    NULL|  NULL|       NULL|
| Akron\tOH\tUSA\tCAK| NULL|    NULL|  NULL|       NULL|
|Alamosa\tCO\tUSA\...| NULL|    NULL|  NULL|       NULL|
+--------------------+-----+--------+------+-----------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import desc, asc, expr

# ordenando por ordem crescente
df.orderBy(expr("delay desc")).show(10)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| NULL|    NULL|  NULL|       NULL|
|Aberdeen\tSD\tUSA...| NULL|    NULL|  NULL|       NULL|
|Abilene\tTX\tUSA\...| NULL|    NULL|  NULL|       NULL|
| Akron\tOH\tUSA\tCAK| NULL|    NULL|  NULL|       NULL|
|Alamosa\tCO\tUSA\...| NULL|    NULL|  NULL|       NULL|
|Albany\tGA\tUSA\tABY| NULL|    NULL|  NULL|       NULL|
|Albany\tNY\tUSA\tALB| NULL|    NULL|  NULL|       NULL|
|Albuquerque\tNM\t...| NULL|    NULL|  NULL|       NULL|
|Alexandria\tLA\tU...| NULL|    NULL|  NULL|       NULL|
|Allentown\tPA\tUS...| NULL|    NULL|  NULL|       NULL|
+--------------------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# visualizando estatísticas descritivas
df.describe().show()

+-------+--------------------+--------------------+--------------------+-------+-----------+
|summary|                date|               delay|            distance| origin|destination|
+-------+--------------------+--------------------+--------------------+-------+-----------+
|  count|             1392106|             1391580|             1391579|1391578|    1391578|
|   mean|   2180446.584000322|  12.079802928761449|   690.5508264718184|   NULL|       NULL|
| stddev|   838031.1536741031|   38.80773374985648|    513.662815366331|   NULL|       NULL|
|    min|"Cap-aux-Meules, ...| airline and rout...| dataset can be f...|    ABE|        ABE|
|    max|  Yuma\tAZ\tUSA\tYUM|                 995|                 999|    YUM|        YUM|
+-------+--------------------+--------------------+--------------------+-------+-----------+



In [0]:
# iterando sobre todas as linhas do dataframe
for i in df.collect():
  print(i)
  print(i[0], i[1] * 2)

Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL')
01011245 66
Row(date='01020600', delay='-8', distance='369', origin='ABE', destination='DTW')
01020600 -8-8
Row(date='01021245', delay='-2', distance='602', origin='ABE', destination='ATL')
01021245 -2-2
Row(date='01020605', delay='-4', distance='602', origin='ABE', destination='ATL')
01020605 -4-4
Row(date='01031245', delay='-4', distance='602', origin='ABE', destination='ATL')
01031245 -4-4
Row(date='01030605', delay='0', distance='602', origin='ABE', destination='ATL')
01030605 00
Row(date='01041243', delay='10', distance='602', origin='ABE', destination='ATL')
01041243 1010
Row(date='01040605', delay='28', distance='602', origin='ABE', destination='ATL')
01040605 2828
Row(date='01051245', delay='88', distance='602', origin='ABE', destination='ATL')
01051245 8888
Row(date='01050605', delay='9', distance='602', origin='ABE', destination='ATL')
01050605 99
Row(date='01061215', delay='-6', distance='602', 

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-1970076144345517>, line 4[0m
[1;32m      2[0m [38;5;28;01mfor[39;00m i [38;5;129;01min[39;00m df[38;5;241m.[39mcollect():
[1;32m      3[0m   [38;5;28mprint[39m(i)
[0;32m----> 4[0m   [38;5;28mprint[39m(i[[38;5;241m0[39m], [43mi[49m[43m[[49m[38;5;241;43m1[39;49m[43m][49m[43m [49m[38;5;241;43m*[39;49m[43m [49m[38;5;241;43m2[39;49m)

[0;31mTypeError[0m: unsupported operand type(s) for *: 'NoneType' and 'int'

In [0]:
# adicionando uma coluna ao dataframe
df = df.withColumn("Nova Coluna", df["delay"] + 2)
df.show(10)

+--------+-----+--------+------+-----------+-----------+
|    date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245|    6|     602|   ABE|        ATL|        8.0|
|01020600|   -8|     369|   ABE|        DTW|       -6.0|
|01021245|   -2|     602|   ABE|        ATL|        0.0|
|01020605|   -4|     602|   ABE|        ATL|       -2.0|
|01031245|   -4|     602|   ABE|        ATL|       -2.0|
|01030605|    0|     602|   ABE|        ATL|        2.0|
|01041243|   10|     602|   ABE|        ATL|       12.0|
|01040605|   28|     602|   ABE|        ATL|       30.0|
|01051245|   88|     602|   ABE|        ATL|       90.0|
|01050605|    9|     602|   ABE|        ATL|       11.0|
+--------+-----+--------+------+-----------+-----------+
only showing top 10 rows



In [0]:
# renomeando uma coluna no dataframe
df.withColumnRenamed("Nova Coluna", "New Column").show()

+--------+-----+--------+------+-----------+----------+
|    date|delay|distance|origin|destination|New Column|
+--------+-----+--------+------+-----------+----------+
|01011245|    6|     602|   ABE|        ATL|       8.0|
|01020600|   -8|     369|   ABE|        DTW|      -6.0|
|01021245|   -2|     602|   ABE|        ATL|       0.0|
|01020605|   -4|     602|   ABE|        ATL|      -2.0|
|01031245|   -4|     602|   ABE|        ATL|      -2.0|
|01030605|    0|     602|   ABE|        ATL|       2.0|
|01041243|   10|     602|   ABE|        ATL|      12.0|
|01040605|   28|     602|   ABE|        ATL|      30.0|
|01051245|   88|     602|   ABE|        ATL|      90.0|
|01050605|    9|     602|   ABE|        ATL|      11.0|
|01061215|   -6|     602|   ABE|        ATL|      -4.0|
|01061725|   69|     602|   ABE|        ATL|      71.0|
|01061230|    0|     369|   ABE|        DTW|       2.0|
|01060625|   -3|     602|   ABE|        ATL|      -1.0|
|01070600|    0|     369|   ABE|        DTW|    

In [0]:
# Removendo uma coluna
df = df.drop("Nova Coluna")
df.show(10)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows



# Trabalhando com Missing Values

In [0]:
# filtrando os valores missing
df.filter(df.delay.isNull()).show(10)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| NULL|    NULL|  NULL|       NULL|
|Aberdeen\tSD\tUSA...| NULL|    NULL|  NULL|       NULL|
|Abilene\tTX\tUSA\...| NULL|    NULL|  NULL|       NULL|
| Akron\tOH\tUSA\tCAK| NULL|    NULL|  NULL|       NULL|
|Alamosa\tCO\tUSA\...| NULL|    NULL|  NULL|       NULL|
|Albany\tGA\tUSA\tABY| NULL|    NULL|  NULL|       NULL|
|Albany\tNY\tUSA\tALB| NULL|    NULL|  NULL|       NULL|
|Albuquerque\tNM\t...| NULL|    NULL|  NULL|       NULL|
|Alexandria\tLA\tU...| NULL|    NULL|  NULL|       NULL|
|Allentown\tPA\tUS...| NULL|    NULL|  NULL|       NULL|
+--------------------+-----+--------+------+-----------+
only showing top 10 rows



# Preenchendo dados Missing

In [0]:
# preenche os dados missing com o valor 0
df.na.fill(value=0).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# preenche valores missing com valor 0 apenas da coluna delay
df.na.fill(value=0, subset=["delay"]).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# preenche os dados com valores de string vazia
df.na.fill("").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# remove qualquer linha nula de qualquer coluna
df.na.drop().show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|