# 1 - Criando o ambiente e importando as bibliotecas

In [None]:
# Importação da biblioteca pandas
import pandas as pd

In [None]:
# Instalação dos requisitos para o PySpark
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
! tar xf spark-3.1.1-bin-hadoop3.2.tgz
! pip install -q findspark

In [None]:
# Configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
# Torna o pyspark "importável"
import findspark
findspark.init()

In [None]:
# iniciar uma sessão local
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Introducao").getOrCreate()

In [None]:
# Verifica o SparkContext
print(spark)

# Exibe a Spark version
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x781cf2a72ce0>
3.1.1


In [None]:
# Definir quais funcões sql pyspark vamos importar
from pyspark.sql.functions import *

# 2 - Criando Dataframes

In [None]:
# Em Pandas
'''
Nesta apostila um objetivo com sufixo pd será um objeto pandas, por
exemplo dfpd
'''
nome = ['Douglas', 'Daniela', 'Pedro', 'Maria', 'Eduardo', 'Ester']
idade = [45, 7, 65, 64, 42, 37]
altura = [1.85, 1.23, 1.75, 1.67, 1.82, 1.73]
peso = [70, 22, 87, 64, 96, 68]
sexo = ['M', 'F', 'M', 'F', 'M', 'F']

dfpd = pd.DataFrame({'nome': nome, 'idade': idade, 'altura': altura,
                     'peso': peso, 'sexo': sexo})
display(dfpd)

Unnamed: 0,nome,idade,altura,peso,sexo
0,Douglas,45,1.85,70,M
1,Daniela,7,1.23,22,F
2,Pedro,65,1.75,87,M
3,Maria,64,1.67,64,F
4,Eduardo,42,1.82,96,M
5,Ester,37,1.73,68,F


In [None]:
# Em ambinete PySpark nosso df se chamará dfps
data = [("Douglas", 45, 1.85, 70, "M"),
        ("Daniela", 7, 1.23, 22, "F"),
        ("Pedro", 65, 1.75, 87, "M"),
        ("Maria", 64, 1.67, 64, "F"),
        ("Eduardo", 42, 1.82, 96, "M"),
        ("Ester", 37, 1.73, 68, "F")]
columns = ["nome", "idade", "altura", "peso", "sexo"]
dfps = spark.createDataFrame(data, columns)
dfps.show()

+-------+-----+------+----+----+
|   nome|idade|altura|peso|sexo|
+-------+-----+------+----+----+
|Douglas|   45|  1.85|  70|   M|
|Daniela|    7|  1.23|  22|   F|
|  Pedro|   65|  1.75|  87|   M|
|  Maria|   64|  1.67|  64|   F|
|Eduardo|   42|  1.82|  96|   M|
|  Ester|   37|  1.73|  68|   F|
+-------+-----+------+----+----+



# 3 - Exibindo linhas e colunas

In [None]:
#Pandas
dfpd.shape

(6, 5)

In [None]:
# PySpark
print(f"Quantidade de linhas e colunas: {dfps.count()}, {len(dfps.columns)}")

Quantidade de linhas e colunas: 6, 5


# 4 - Exibindo aleatoriamente uma linha do DF

In [None]:
# Pandas
dfpd.sample()

Unnamed: 0,nome,idade,altura,peso,sexo
2,Pedro,65,1.75,87,M


In [None]:
# PySpark
dfps.sample(False, 0.9999999999).show(1) # (probabilidade de vazio, chance de ser sorteado novamente a mesma linha)


+-------+-----+------+----+----+
|   nome|idade|altura|peso|sexo|
+-------+-----+------+----+----+
|Douglas|   45|  1.85|  70|   M|
+-------+-----+------+----+----+
only showing top 1 row



# 5 - Exibindo Informações sobre os Tipos de Cada Atributo

In [None]:
# Pandas
dfpd.dtypes

nome       object
idade       int64
altura    float64
peso        int64
sexo       object
dtype: object

In [None]:
# Pandas
dfpd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   nome    6 non-null      object 
 1   idade   6 non-null      int64  
 2   altura  6 non-null      float64
 3   peso    6 non-null      int64  
 4   sexo    6 non-null      object 
dtypes: float64(1), int64(2), object(2)
memory usage: 368.0+ bytes


In [None]:
# PySpark
dfps.printSchema()

root
 |-- nome: string (nullable = true)
 |-- idade: long (nullable = true)
 |-- altura: double (nullable = true)
 |-- peso: long (nullable = true)
 |-- sexo: string (nullable = true)



# 6 - Exibindo Informações do Índice das Linhas

In [None]:
# Pandas
dfpd.index

RangeIndex(start=0, stop=6, step=1)

In [None]:
# PySpark
dfps.rdd.map(lambda row: row).zipWithIndex().toDF().show()

+--------------------+---+
|                  _1| _2|
+--------------------+---+
|{Douglas, 45, 1.8...|  0|
|{Daniela, 7, 1.23...|  1|
|{Pedro, 65, 1.75,...|  2|
|{Maria, 64, 1.67,...|  3|
|{Eduardo, 42, 1.8...|  4|
|{Ester, 37, 1.73,...|  5|
+--------------------+---+



# 7 - Estatísticas Descritivas

In [None]:
# Pandas
medidas = dfpd.describe()
display(medidas)

Unnamed: 0,idade,altura,peso
count,6.0,6.0,6.0
mean,43.333333,1.675,67.833333
std,21.266562,0.22731,25.61575
min,7.0,1.23,22.0
25%,38.25,1.685,65.0
50%,43.5,1.74,69.0
75%,59.25,1.8025,82.75
max,65.0,1.85,96.0


In [None]:
# PySpark
dfps.describe().show()

+-------+-------+------------------+-------------------+------------------+----+
|summary|   nome|             idade|             altura|              peso|sexo|
+-------+-------+------------------+-------------------+------------------+----+
|  count|      6|                 6|                  6|                 6|   6|
|   mean|   null|43.333333333333336|              1.675| 67.83333333333333|null|
| stddev|   null| 21.26656217320201|0.22731036052058867|25.615750363139213|null|
|    min|Daniela|                 7|               1.23|                22|   F|
|    max|  Pedro|                65|               1.85|                96|   M|
+-------+-------+------------------+-------------------+------------------+----+



#  8 - Exibindo Primeiras e Últimas Linhas

In [None]:
# Primeiras Linhas em Pandas
dfpd.head()

Unnamed: 0,nome,idade,altura,peso,sexo
0,Douglas,45,1.85,70,M
1,Daniela,7,1.23,22,F
2,Pedro,65,1.75,87,M
3,Maria,64,1.67,64,F
4,Eduardo,42,1.82,96,M


In [None]:
# Últimas linhas em Pandas
dfpd.tail()

Unnamed: 0,nome,idade,altura,peso,sexo
1,Daniela,7,1.23,22,F
2,Pedro,65,1.75,87,M
3,Maria,64,1.67,64,F
4,Eduardo,42,1.82,96,M
5,Ester,37,1.73,68,F


In [None]:
# Primeiras 5 linhas em PySpark
dfps.show(5)

+-------+-----+------+----+----+
|   nome|idade|altura|peso|sexo|
+-------+-----+------+----+----+
|Douglas|   45|  1.85|  70|   M|
|Daniela|    7|  1.23|  22|   F|
|  Pedro|   65|  1.75|  87|   M|
|  Maria|   64|  1.67|  64|   F|
|Eduardo|   42|  1.82|  96|   M|
+-------+-----+------+----+----+
only showing top 5 rows



In [None]:
# Últimas 5 linhas em PySpark
dfps.orderBy(col("idade").desc()).show(5)

+-------+-----+------+----+----+
|   nome|idade|altura|peso|sexo|
+-------+-----+------+----+----+
|  Pedro|   65|  1.75|  87|   M|
|  Maria|   64|  1.67|  64|   F|
|Douglas|   45|  1.85|  70|   M|
|Eduardo|   42|  1.82|  96|   M|
|  Ester|   37|  1.73|  68|   F|
+-------+-----+------+----+----+
only showing top 5 rows



# 9 - Exibindo Valores de uma Linha pelo Rótulo e Índice

In [None]:
# Rótulo Pandas
dfpd.loc[0]

nome      Douglas
idade          45
altura       1.85
peso           70
sexo            M
Name: 0, dtype: object

In [None]:
# Rótulo Pandas
medidas.loc['count']

idade     6.0
altura    6.0
peso      6.0
Name: count, dtype: float64

In [None]:
# Índice Pandas
dfpd.iloc[0]

nome      Douglas
idade          45
altura       1.85
peso           70
sexo            M
Name: 0, dtype: object

In [None]:
# Índice Pandas
medidas.iloc[0]

idade     6.0
altura    6.0
peso      6.0
Name: count, dtype: float64

In [None]:
# Rótulo PySpark
dfps.where(col("nome") == "Douglas").show()

+-------+-----+------+----+----+
|   nome|idade|altura|peso|sexo|
+-------+-----+------+----+----+
|Douglas|   45|  1.85|  70|   M|
+-------+-----+------+----+----+



In [None]:
# Ìndice PySpark
'''
Em PySpark não temos suporte direto à índices, pois os dados
não estão alocados em disco, mas podemos fazer ajustes técnicos (gambiarra)
como veremos mais pra frente.
'''
dfps.select(dfps.columns[0]).show()

+-------+
|   nome|
+-------+
|Douglas|
|Daniela|
|  Pedro|
|  Maria|
|Eduardo|
|  Ester|
+-------+



# 10 - Exibindo Mais de uma Linha pelo Rótulo e Índice

In [None]:
# Mais de uma linha pelo rótulo Pandas
dfpd.loc[[0, 3, 5]]

Unnamed: 0,nome,idade,altura,peso,sexo
0,Douglas,45,1.85,70,M
3,Maria,64,1.67,64,F
5,Ester,37,1.73,68,F


In [None]:
# Mais de uma linha pelo índice Pandas
dfpd.iloc[[0, 3, 5]]

Unnamed: 0,nome,idade,altura,peso,sexo
0,Douglas,45,1.85,70,M
3,Maria,64,1.67,64,F
5,Ester,37,1.73,68,F


In [None]:
# Mais de uma linha pelo rótulo PySpark
dfps.filter(col("nome").isin("Douglas", "Maria", "Ester")).show()

+-------+-----+------+----+----+------------------+
|   nome|idade|altura|peso|sexo|               imc|
+-------+-----+------+----+----+------------------+
|Douglas|   45|  1.85|  70|   M| 20.45288531775018|
|  Maria|   64|  1.67|  64|   F|22.948115744558788|
|  Ester|   37|  1.73|  68|   F|22.720438370810918|
+-------+-----+------+----+----+------------------+



In [None]:
# Mais de uma linha pelo índice PySpark
'''
Na linha a seguir vamos utilizar a função collect do PySpark para
organizar todoo df em linhas em um objeto rdd
'''
linhas = dfps.collect()
'''
Na linha a seguir vamos pegar esse objeto rdd e iterar (compreensão de lista)
sobre ele para selecionar apenas as linhas 0, 3 e 5 do objeto rdd e armazená-las
em um objeto chamado linhas selecionadas
'''
linhas_selecionadas = [linhas[i] for i in [0, 3, 5]]
'''
Na linha a seguir vamos transformar o objeto linhas selecionadas em um df
pyspark chamado df_ps_linhas_selecionadas
'''
df_ps_linhas_selecionadas = spark.createDataFrame(linhas_selecionadas, dfps.schema)
'''
Agora é só exibí-lo
'''
df_ps_linhas_selecionadas.show()

+-------+-----+------+----+----+
|   nome|idade|altura|peso|sexo|
+-------+-----+------+----+----+
|Douglas|   45|  1.85|  70|   M|
|  Maria|   64|  1.67|  64|   F|
|  Ester|   37|  1.73|  68|   F|
+-------+-----+------+----+----+



# 11 - Fatiando Colunas pelo Rótulo e Índice

In [None]:
# Rótulo Pandas
dfpd.loc[:, ['nome']]

Unnamed: 0,nome
0,Douglas
1,Daniela
2,Pedro
3,Maria
4,Eduardo
5,Ester


In [None]:
# ìndice Pandas
dfpd.iloc[:, [0]]

Unnamed: 0,nome
0,Douglas
1,Daniela
2,Pedro
3,Maria
4,Eduardo
5,Ester


In [None]:
# Rótulo PySpark
dfps.select("nome").show()

+-------+
|   nome|
+-------+
|Douglas|
|Daniela|
|  Pedro|
|  Maria|
|Eduardo|
|  Ester|
+-------+



In [None]:
# Índice PySpark
dfps.select(dfps.columns[0]).show()

+-------+
|   nome|
+-------+
|Douglas|
|Daniela|
|  Pedro|
|  Maria|
|Eduardo|
|  Ester|
+-------+



# 12 - Exibindo Linhas Específicas de uma Coluna pelo Rótulo e Índice

In [None]:
# Rótulo Pandas
dfpd.loc[[0, 2, 4], ['nome']]

Unnamed: 0,nome
0,Douglas
2,Pedro
4,Eduardo


In [None]:
# Índice Pandas
dfpd.iloc[[0, 2, 4], [0]]

Unnamed: 0,nome
0,Douglas
2,Pedro
4,Eduardo


In [None]:
# Rótulo PySpark
dfps.filter(col("nome").isin("Douglas", "Pedro", "Eduardo")).select("nome").show()

+-------+
|   nome|
+-------+
|Douglas|
|  Pedro|
|Eduardo|
+-------+



13 - Agrupamentos e Cálculos Estatísticos

In [None]:
# Pandas
dfpd.groupby('sexo').count()

Unnamed: 0_level_0,nome,idade,altura,peso
sexo,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
F,3,3,3,3
M,3,3,3,3


In [None]:
# Pandas
dfpd[['sexo', 'idade']].groupby('sexo').mean()

Unnamed: 0_level_0,idade
sexo,Unnamed: 1_level_1
F,36.0
M,50.666667


In [None]:
# PySpark
dfps.groupBy("sexo").count().show()

+----+-----+
|sexo|count|
+----+-----+
|   F|    3|
|   M|    3|
+----+-----+



In [None]:
# PySpark
'''
objeto.groupby('coluna_dimensão').agg(tipo_de_agregação('coluna_métrica').alias('nome_a_exibir')).show()
'''
dfps.groupBy("sexo").agg(avg("idade").alias("idade_media")).show()

+----+------------------+
|sexo|       idade_media|
+----+------------------+
|   F|              36.0|
|   M|50.666666666666664|
+----+------------------+



In [None]:
# PySpark
'''
objeto.groupby('coluna_dimensão').agg(tipo_de_agregação('coluna_métrica').alias('nome_a_exibir')).show()
'''
dfps.groupBy("sexo").agg(stddev("idade").alias("idade_media")).show()

+----+------------------+
|sexo|       idade_media|
+----+------------------+
|   F|28.513154858766505|
|   M|12.503332889007368|
+----+------------------+



In [None]:
# PySpark
'''
objeto.groupby('coluna_dimensão').agg(tipo_de_agregação('coluna_métrica').alias('nome_a_exibir')).show()
'''
dfps.groupBy("sexo").agg(min("idade").alias("idade_media")).show()

+----+-----------+
|sexo|idade_media|
+----+-----------+
|   F|          7|
|   M|         42|
+----+-----------+



In [None]:
# PySpark
'''
objeto.groupby('coluna_dimensão').agg(tipo_de_agregação('coluna_métrica').alias('nome_a_exibir')).show()
'''
dfps.groupBy("sexo").agg(max("idade").alias("idade_media")).show()

+----+-----------+
|sexo|idade_media|
+----+-----------+
|   F|         64|
|   M|         65|
+----+-----------+



# 14 - Importando arquivos

In [None]:
from google.colab import files
files.upload()

In [None]:
arquivo = "flights_small.csv"
flights = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [None]:
#Verificando o shape do pyspark dataframe
print((flights.count(), len(flights.columns)))

(10000, 16)


In [None]:
# Exibindo as 5 primeiras linhas
flights.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+-----

In [None]:
# Exibindo o nome das colunas e seus respectivos tipos
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [None]:
#Retirando as datas e passando colunas para tipos corretos.
'''
Neste bloco vamos criar uma nova coluna com os valores de uma coluna em tipo
diferente, e vamos excluir a coluna com o tipo errado

Resumo:
withColumn("Nome da nova coluna", col("nome da coluna antiga"))
cast(tipo de dado a ser aplicado na nova coluna)
drop(coluna a ser excluída)
'''
flights = flights.\
        withColumn("new_air_time", col("air_time").cast("integer")).drop("air_time")

In [None]:
# Exibindo o nome das colunas e seus respectivos tipos
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- new_air_time: integer (nullable = true)



In [None]:
#renomeando colunas
'''
Renomeando uma coluna
objeto.withColumnRenamed("nome da coluna atual", "nome da nova coluna")
'''
flights = flights.withColumnRenamed("new_air_time","air_time")

In [None]:
# Exibindo o nome das colunas e seus respectivos tipos
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- air_time: integer (nullable = true)



In [None]:
'''
A Criação da view temporária é necessária para que as consultas sql posteriores
funcionem
'''
#Registrando o dataframe em uma view temporária
flights.createOrReplaceTempView("flights")

query = "FROM flights SELECT * LIMIT 10"

# Selecionando as 10 primeiras linhas do dataset
flights10 = spark.sql(query)

# Print o resultado
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
|2014|    1| 15|    1037|        7|    1

In [None]:
'''
Para executar essa query a view temporária do bloco anteiror deve ter sido
executada
'''
# Criando um df a partir de uma consulta em outro df
sqlDF = spark.sql("SELECT * FROM flights LIMIT 10")
sqlDF.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
|2014|    1| 15|    1037|        7|    1

In [None]:
# Criando um df a partir de uma consulta em outro df
sqlDF2 = spark.sql("SELECT arr_time FROM flights LIMIT 5")
sqlDF2.show()

+--------+
|arr_time|
+--------+
|     935|
|    1505|
|    1652|
|    1839|
|    1015|
+--------+



In [None]:
# Convertendo o resultado para pandas
flightspd = flights.toPandas()
display(flightspd)

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,distance,hour,minute,air_time
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,954,6,58,132.0
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,2677,10,40,360.0
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,679,14,43,111.0
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,569,17,5,83.0
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,937,7,54,127.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,2014,6,23,1806,-4,2104,-6,OO,N225AG,3458,SEA,SLC,689,18,6,89.0
9996,2014,8,31,2336,11,452,-13,AA,N3LEAA,1230,SEA,DFW,1660,23,36,178.0
9997,2014,8,8,904,-1,1042,-5,AS,N523AS,360,SEA,SMF,605,9,4,81.0
9998,2014,8,29,1441,26,1820,10,WN,N8647A,2857,SEA,ABQ,1180,14,41,133.0


In [None]:
# Criando pandas dataframe
arq = "airports.csv"
pd_temp = pd.read_csv(arq)
display(pd_temp)

Unnamed: 0,faa,name,lat,lon,alt,tz,dst
0,04G,Lansdowne Airport,41.130472,-80.619583,1044,-5,A
1,06A,Moton Field Municipal Airport,32.460572,-85.680028,264,-5,A
2,06C,Schaumburg Regional,41.989341,-88.101243,801,-6,A
3,06N,Randall Airport,41.431912,-74.391561,523,-5,A
4,09J,Jekyll Island Airport,31.074472,-81.427778,11,-4,A
...,...,...,...,...,...,...,...
1392,ZUN,Black Rock,35.083228,-108.791778,6454,-7,A
1393,ZVE,New Haven Rail Station,41.298669,-72.925992,7,-5,A
1394,ZWI,Wilmington Amtrak Station,39.736667,-75.551667,0,-5,A
1395,ZWU,Washington Union Station,38.897460,-77.006430,76,-5,A


In [None]:
# Cria spark_temp a partir de pd_temp
spark_temp = spark.createDataFrame(pd_temp.to_dict('records'))
spark_temp.show(5)

+----+---+---+----------+-----------+--------------------+---+
| alt|dst|faa|       lat|        lon|                name| tz|
+----+---+---+----------+-----------+--------------------+---+
|1044|  A|04G|41.1304722|-80.6195833|   Lansdowne Airport| -5|
| 264|  A|06A|32.4605722|-85.6800278|Moton Field Munic...| -5|
| 801|  A|06C|41.9893408|-88.1012428| Schaumburg Regional| -6|
| 523|  A|06N| 41.431912|-74.3915611|     Randall Airport| -5|
|  11|  A|09J|31.0744722|-81.4277778|Jekyll Island Air...| -4|
+----+---+---+----------+-----------+--------------------+---+
only showing top 5 rows



In [None]:
# Operações matemáticas com dados numéricos de colunas
'''
Neste bloco estamos fazendo uma pré-visualização de como ficaria uma eventual
nova coluna de duração do tempo de voo em minutos, no entanto da forma como o
código foi construído esta coluna não fica inclusa no df pysapark original
'''
flights.select(flights.air_time/60).show()

+------------------+
|   (air_time / 60)|
+------------------+
|               2.2|
|               6.0|
|              1.85|
|1.3833333333333333|
|2.1166666666666667|
|2.0166666666666666|
|               1.5|
|1.6333333333333333|
|              2.25|
|               3.3|
|2.1666666666666665|
| 2.566666666666667|
|2.1166666666666667|
|              3.05|
|              2.15|
|               1.5|
|1.2666666666666666|
|               3.6|
| 4.833333333333333|
|              1.85|
+------------------+
only showing top 20 rows



In [None]:
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
|2014|    1| 15|    1037|        7|    1

In [None]:
'''
Neste bloco estamos fazendo uma inseção de uma nova coluna de duração do tempo
de voo em minutos
'''
flights = flights.withColumn("duration_hrs", flights.air_time/60)
flights.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|1.3833333333333333|
|2014|    3|  9|     754|  

In [None]:
dfps.show()

+-------+-----+------+----+----+
|   nome|idade|altura|peso|sexo|
+-------+-----+------+----+----+
|Douglas|   45|  1.85|  70|   M|
|Daniela|    7|  1.23|  22|   F|
|  Pedro|   65|  1.75|  87|   M|
|  Maria|   64|  1.67|  64|   F|
|Eduardo|   42|  1.82|  96|   M|
|  Ester|   37|  1.73|  68|   F|
+-------+-----+------+----+----+



In [None]:
dfps.printSchema()

root
 |-- nome: string (nullable = true)
 |-- idade: long (nullable = true)
 |-- altura: double (nullable = true)
 |-- peso: long (nullable = true)
 |-- sexo: string (nullable = true)



In [None]:
dfps = dfps.withColumn("imc", dfps.peso/(dfps.altura**2))
dfps.show()

+-------+-----+------+----+----+------------------+
|   nome|idade|altura|peso|sexo|               imc|
+-------+-----+------+----+----+------------------+
|Douglas|   45|  1.85|  70|   M| 20.45288531775018|
|Daniela|    7|  1.23|  22|   F|14.541608830722454|
|  Pedro|   65|  1.75|  87|   M|28.408163265306122|
|  Maria|   64|  1.67|  64|   F|22.948115744558788|
|Eduardo|   42|  1.82|  96|   M|28.982007003985025|
|  Ester|   37|  1.73|  68|   F|22.720438370810918|
+-------+-----+------+----+----+------------------+



In [None]:
# Criando um novo df pyspar com apenas algumas colunas a partir do original
sub = dfps.select("nome","idade","imc")
sub.show()

+-------+-----+------------------+
|   nome|idade|               imc|
+-------+-----+------------------+
|Douglas|   45| 20.45288531775018|
|Daniela|    7|14.541608830722454|
|  Pedro|   65|28.408163265306122|
|  Maria|   64|22.948115744558788|
|Eduardo|   42|28.982007003985025|
|  Ester|   37|22.720438370810918|
+-------+-----+------------------+



In [None]:
# Filtrando todos os voos com duração de mais de 120 minutos
sub2 = flights.filter("air_time > 120")
sub2.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|2.1166666666666667|
|2014|    1| 15|    1037|        7|    1352|        2|     WN| N646SW|    48|   PDX| DEN|     991|  10|    37|     121|2.0166666666666666|
|2014|    4| 19|    1236|  

In [None]:
# Filtrando todos os voos com duração de mais de 120 minutos
'''
Aqui temos uma alternativa em relação ao filtro anterior para que não seja
preciso utilizar aspas no argumento da função filter
'''
sub3 = flights.filter(flights.air_time > 120)
sub3.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|2.1166666666666667|
|2014|    1| 15|    1037|        7|    1352|        2|     WN| N646SW|    48|   PDX| DEN|     991|  10|    37|     121|2.0166666666666666|
|2014|    4| 19|    1236|  

In [None]:
'''
Neste bloco vamos criar um novo df spark com as colunas tailnum
(número da aeronave), origin e dest
'''
voos = ["tailnum","origin","dest"]
voos = flights.select(voos)

'''
Agora vamos criar dois objetos que são apenas filtros que não herdam as funções
de um df, mas serão aplicados no df voos criando no bloco anterior
'''
filterA = flights.origin == "SEA"

filterB = flights.dest == "HNL"

'''
Agora vamos aplicar os filtros de origem e destino na tabela voos
'''
voos = voos.filter(filterA).filter(filterB)
voos.show(10)

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N559AS|   SEA| HNL|
| N597AS|   SEA| HNL|
| N396HA|   SEA| HNL|
| N531AS|   SEA| HNL|
| N592AS|   SEA| HNL|
| N590NW|   SEA| HNL|
| N386HA|   SEA| HNL|
| N569AS|   SEA| HNL|
| N589AS|   SEA| HNL|
| N393HA|   SEA| HNL|
+-------+------+----+
only showing top 10 rows



In [None]:
# Achar a maior tempo de voo de SEA para outras cidades em minutos
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()

+-------------+
|max(air_time)|
+-------------+
|          409|
+-------------+



In [None]:
# Achar a maior tempo de voo de SEA para outras cidades em horas
flights.filter(flights.origin == "SEA").groupBy().max("duration_hrs").show()

+-----------------+
|max(duration_hrs)|
+-----------------+
|6.816666666666666|
+-----------------+



In [None]:
# Achar o tempo médio de voo de SEA para outras cidades em minutos
flights.filter(flights.origin == "SEA").groupBy().avg("air_time").show()

+-----------------+
|    avg(air_time)|
+-----------------+
|160.4361496051259|
+-----------------+



In [None]:
# Achar o desvio padrão de voo de SEA para outras cidades em minutos
flights.filter(flights.origin == "SEA").groupBy().agg({"air_time": "stddev"}).show()

+-----------------+
| stddev(air_time)|
+-----------------+
|71.48445621179845|
+-----------------+



In [None]:
# Achar a menor distancia do voo de PDX para outras cidades
'''
Jeito informal de agregação em pyspark
'''
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



In [None]:
'''
Jeito formal de agregação em pyspark
'''
# Achar a menor distancia do voo de PDX para outras cidades
flights.filter(flights.origin == "PDX").groupBy().agg({"distance": "min"}).show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F
# GroupBy por Mes e destino
'''
Taanto em Pandas como em PySpark um agrupamento sem agregação não gera
visualização, assim, além de agruparmos por uma dimensão, precisão agragar por
alguma métrica
'''
by_month_dest = flights.groupBy("month", "dest")
by_month_dest.agg(F.avg("dep_delay")).show()

+-----+----+-------------------+
|month|dest|     avg(dep_delay)|
+-----+----+-------------------+
|    4| PHX| 1.6833333333333333|
|    1| RDM|             -1.625|
|    5| ONT| 3.5555555555555554|
|    7| OMA|               -6.5|
|    8| MDW|               7.45|
|    6| DEN|  5.418181818181818|
|    5| IAD|               -4.0|
|   12| COS|               -1.0|
|   11| ANC|  7.529411764705882|
|    5| AUS|              -0.75|
|    5| COS| 11.666666666666666|
|    2| PSP|                0.6|
|    4| ORD|0.14285714285714285|
|   10| DFW| 18.176470588235293|
|   10| DCA|               -1.5|
|    8| JNU|             18.125|
|   11| KOA|               -1.0|
|   10| OMA|-0.6666666666666666|
|    6| ONT|              9.625|
|    3| MSP|                3.2|
+-----+----+-------------------+
only showing top 20 rows



In [None]:
arquivo = "airports.csv"
airports = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [None]:
# Examine the data
airports.show(10)

+---+--------------------+----------+------------+----+---+---+
|faa|                name|       lat|         lon| alt| tz|dst|
+---+--------------------+----------+------------+----+---+---+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722| -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408| -88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912| -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722| -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593| -4|  A|
|0G6|Williams County A...|41.4673056| -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|42.8835647| -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|39.7948244| -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|48.0538086|-122.8106436| 108| -8|  A|
+---+--------------------+----------+------------+----+---+---+
only showing top 10 rows



In [None]:
# Renomeie a coluna faa
airports = airports.withColumnRenamed("faa", "dest")
airports.show()

+----+--------------------+----------------+-----------------+----+---+---+
|dest|                name|             lat|              lon| alt| tz|dst|
+----+--------------------+----------------+-----------------+----+---+---+
| 04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
| 06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
| 06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
| 06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
| 09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
| 0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
| 0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
| 0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
| 0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
| 0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
| 0W3|Harfor

In [None]:
flights.show(10)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|      duration_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|1.3833333333333333|
|2014|    3|  9|     754|  

In [None]:
# Join os DataFrames
flights_with_airports = flights.join(airports, on="dest", how="leftouter")
flights_with_airports.show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+----+------+--------+------------------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|distance|hour|minute|air_time|      duration_hrs|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+----+------+--------+------------------+--------------------+---------+-----------+----+---+---+
| LAX|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA|     954|   6|    58|     132|               2.2|    Los Angeles Intl|33.942536|-118.408075| 126| -8|  A|
| HNL|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA|    2677|  10|    40|     360|               6.0|       Honolulu Intl|21.318681|-157.922428|  13|-10|  N|
| SFO|2014|    3|  9|    

In [None]:
from google.colab import files
files.upload()

Saving base - base.csv to base - base.csv


{'base - base.csv': b'matricula,nome,cidade,estado,pais,idade,departamento,cargo,salario,escolaridade,nota\r\n1,Lucas,Atibaia,SP,Brasil,35,Compras,Gerente,25000,Superior,8\r\n2,Ana,S\xc3\xa3o Paulo,SP,Brasil,29,Vendas,Coordenador,12000,Superior,6\r\n3,Luiza,Santos,SP,Brasil,38,Finan\xc3\xa7as,Gerente,28000,MBA,9\r\n4,Fernando,Atibaia,SP,Brasil,36,Marketing,Diretor,40000,Mestrado,7\r\n5,Sandra,Atibaia,SP,Brasil,28,Produ\xc3\xa7\xc3\xa3o,Analista,23000,Superior,5\r\n6,Douglas,Bragan\xc3\xa7a,SP,Brasil,29,Finan\xc3\xa7as,Analista,11000,Superior,9\r\n7,Eduardo,Extrema,MG,Brasil,30,Marketing,Gerente,12000,MBA,4\r\n8,Ester,Itapeva,MG,Brasil,29,Compras,Analista,10000,Superior,2\r\n9,Pedro,Extrema,MG,Brasil,30,Marketing,Analista,13000,Superior,1\r\n10,Maria,Extrema,MG,Brasil,40,Produ\xc3\xa7\xc3\xa3o,Analista,12000,MBA,7'}

In [None]:
arquivo = "base - base.csv"
rh = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [None]:
rh.show()

+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|matricula|    nome|   cidade|estado|  pais|idade|departamento|      cargo|salario|escolaridade|nota|
+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|        1|   Lucas|  Atibaia|    SP|Brasil|   35|     Compras|    Gerente|  25000|    Superior|   8|
|        2|     Ana|São Paulo|    SP|Brasil|   29|      Vendas|Coordenador|  12000|    Superior|   6|
|        3|   Luiza|   Santos|    SP|Brasil|   38|    Finanças|    Gerente|  28000|         MBA|   9|
|        4|Fernando|  Atibaia|    SP|Brasil|   36|   Marketing|    Diretor|  40000|    Mestrado|   7|
|        5|  Sandra|  Atibaia|    SP|Brasil|   28|    Produção|   Analista|  23000|    Superior|   5|
|        6| Douglas| Bragança|    SP|Brasil|   29|    Finanças|   Analista|  11000|    Superior|   9|
|        7| Eduardo|  Extrema|    MG|Brasil|   30|   Marketing|    Gerente|  12000

In [None]:
'''
A Criação da view temporária é necessária para que as consultas sql posteriores
funcionem
'''
#Registrando o dataframe em uma view temporária
rh.createOrReplaceTempView("rh")

query = "FROM rh SELECT *"

# Selecionando as 10 primeiras linhas do dataset
rh1 = spark.sql(query)

# Print o resultado
rh1.show()

+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|matricula|    nome|   cidade|estado|  pais|idade|departamento|      cargo|salario|escolaridade|nota|
+---------+--------+---------+------+------+-----+------------+-----------+-------+------------+----+
|        1|   Lucas|  Atibaia|    SP|Brasil|   35|     Compras|    Gerente|  25000|    Superior|   8|
|        2|     Ana|São Paulo|    SP|Brasil|   29|      Vendas|Coordenador|  12000|    Superior|   6|
|        3|   Luiza|   Santos|    SP|Brasil|   38|    Finanças|    Gerente|  28000|         MBA|   9|
|        4|Fernando|  Atibaia|    SP|Brasil|   36|   Marketing|    Diretor|  40000|    Mestrado|   7|
|        5|  Sandra|  Atibaia|    SP|Brasil|   28|    Produção|   Analista|  23000|    Superior|   5|
|        6| Douglas| Bragança|    SP|Brasil|   29|    Finanças|   Analista|  11000|    Superior|   9|
|        7| Eduardo|  Extrema|    MG|Brasil|   30|   Marketing|    Gerente|  12000

In [None]:
sqlrh1 = spark.sql('SELECT cargo, AVG(salario) FROM rh GROUP BY cargo')
sqlrh1.show()

+-----------+------------------+
|      cargo|      avg(salario)|
+-----------+------------------+
|    Gerente|21666.666666666668|
|Coordenador|           12000.0|
|    Diretor|           40000.0|
|   Analista|           13800.0|
+-----------+------------------+



In [None]:
# Configurar a sessão Spark com o pacote spark-excel
spark = SparkSession.builder \
    .appName("Ler XLSX com PySpark") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7") \
    .getOrCreate()

# Caminho para o arquivo Excel
file_path = "/caminho/para/seu/arquivo.xlsx"

# Ler o arquivo Excel
df = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dataAddress", "'Planilha1'!A1") \
    .load(file_path)

# Exibir as primeiras linhas do DataFrame
df.show()