## Basic Imports

In [3]:
from pyspark.sql import SparkSession

## Build our Spark Session

A Spark Session can be described as the mediator between our Spark application and the Apache Spark APIs and libraries. It unifies the different contexts within Spark (such as SparkContext and SQLContext) into a single abstraction, which makes our work much easier.

In [15]:
spark = (SparkSession.builder
                     .appName("My First PySpark Application")
                     .master("local[2]") 
                     .getOrCreate())

In [13]:
circuits_df = (spark.read
                    .option("header", "True")
                    .option("delimiter", ",")
                    .csv("data/circuits.csv"))

In [17]:
circuits_df.printSchema()

root
 |-- circuitId: string (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- alt: string (nullable = true)
 |-- url: string (nullable = true)



In [23]:
from pyspark.sql.functions import col

select_with_select_method_df = circuits_df.select(
                                                col("circuitId").alias("ID_CIRCUITO"), 
                                                col("circuitRef").alias("NOME_CIRCUITO"), 
                                                col("country").alias("PAIS")
                                            )
select_with_select_method_df.show()

+-----------+--------------+---------+
|ID_CIRCUITO| NOME_CIRCUITO|     PAIS|
+-----------+--------------+---------+
|          1|   albert_park|Australia|
|          2|        sepang| Malaysia|
|          3|       bahrain|  Bahrain|
|          4|     catalunya|    Spain|
|          5|      istanbul|   Turkey|
|          6|        monaco|   Monaco|
|          7|    villeneuve|   Canada|
|          8|   magny_cours|   France|
|          9|   silverstone|       UK|
|         10|hockenheimring|  Germany|
|         11|   hungaroring|  Hungary|
|         12|      valencia|    Spain|
|         13|           spa|  Belgium|
|         14|         monza|    Italy|
|         15|    marina_bay|Singapore|
|         16|          fuji|    Japan|
|         17|      shanghai|    China|
|         18|    interlagos|   Brazil|
|         19|  indianapolis|      USA|
|         20|   nurburgring|  Germany|
+-----------+--------------+---------+
only showing top 20 rows



In [24]:
select_with_select_expr_method_df = circuits_df.selectExpr(
                                                "circuitId AS ID_CIRCUITO", 
                                                "circuitRef AS NOME_CIRCUITO", 
                                                "country AS PAIS"
                                            )
select_with_select_expr_method_df.show()

+-----------+--------------+---------+
|ID_CIRCUITO| NOME_CIRCUITO|     PAIS|
+-----------+--------------+---------+
|          1|   albert_park|Australia|
|          2|        sepang| Malaysia|
|          3|       bahrain|  Bahrain|
|          4|     catalunya|    Spain|
|          5|      istanbul|   Turkey|
|          6|        monaco|   Monaco|
|          7|    villeneuve|   Canada|
|          8|   magny_cours|   France|
|          9|   silverstone|       UK|
|         10|hockenheimring|  Germany|
|         11|   hungaroring|  Hungary|
|         12|      valencia|    Spain|
|         13|           spa|  Belgium|
|         14|         monza|    Italy|
|         15|    marina_bay|Singapore|
|         16|          fuji|    Japan|
|         17|      shanghai|    China|
|         18|    interlagos|   Brazil|
|         19|  indianapolis|      USA|
|         20|   nurburgring|  Germany|
+-----------+--------------+---------+
only showing top 20 rows



In [25]:
circuits_df.createOrReplaceTempView("VW_VIEW_SELECT_MODE")

view_select_mode_df = spark.sql("""
        SELECT V.circuitId AS ID_CIRCUITO, 
               V.circuitRef AS NOME_CIRCUITO, 
               V.country AS PAIS 
          FROM VW_VIEW_SELECT_MODE V
""")

view_select_mode_df.show()

+-----------+--------------+---------+
|ID_CIRCUITO| NOME_CIRCUITO|     PAIS|
+-----------+--------------+---------+
|          1|   albert_park|Australia|
|          2|        sepang| Malaysia|
|          3|       bahrain|  Bahrain|
|          4|     catalunya|    Spain|
|          5|      istanbul|   Turkey|
|          6|        monaco|   Monaco|
|          7|    villeneuve|   Canada|
|          8|   magny_cours|   France|
|          9|   silverstone|       UK|
|         10|hockenheimring|  Germany|
|         11|   hungaroring|  Hungary|
|         12|      valencia|    Spain|
|         13|           spa|  Belgium|
|         14|         monza|    Italy|
|         15|    marina_bay|Singapore|
|         16|          fuji|    Japan|
|         17|      shanghai|    China|
|         18|    interlagos|   Brazil|
|         19|  indianapolis|      USA|
|         20|   nurburgring|  Germany|
+-----------+--------------+---------+
only showing top 20 rows
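The three cells above (`select`, `selectExpr`, and `spark.sql` over a temporary view) print the same first 20 rows. To check that the full results match, and not just the rows displayed, a small helper along these lines could be used. This is a minimal sketch: `same_rows` is a hypothetical name introduced here, and it assumes the DataFrames are small enough to be collected to the driver.

```python
from collections import Counter


def same_rows(df_a, df_b):
    """Return True when both DataFrames have the same columns and the same rows.

    Rows are compared as a multiset, so their order does not matter.
    collect() brings every row to the driver, so use this only on small data.
    """
    if df_a.columns != df_b.columns:
        return False
    rows_a = Counter(tuple(row) for row in df_a.collect())
    rows_b = Counter(tuple(row) for row in df_b.collect())
    return rows_a == rows_b
```

If the three approaches are equivalent, `same_rows(select_with_select_method_df, select_with_select_expr_method_df)` and `same_rows(select_with_select_method_df, view_select_mode_df)` would both be expected to return `True`.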



## Filtering Data with the WHERE Clause

## In this example we keep only the circuits located in Brazil

In [26]:
from pyspark.sql.functions import col, lit

select_with_select_method_df = circuits_df.select(
                                                col("circuitId").alias("ID_CIRCUITO"), 
                                                col("circuitRef").alias("NOME_CIRCUITO"), 
                                                col("country").alias("PAIS")
                                            ).where(col("PAIS") == lit("Brazil"))
select_with_select_method_df.show()

+-----------+-------------+------+
|ID_CIRCUITO|NOME_CIRCUITO|  PAIS|
+-----------+-------------+------+
|         18|   interlagos|Brazil|
|         36|  jacarepagua|Brazil|
+-----------+-------------+------+



## Here is the same operation using Temporary Views

In [27]:
circuits_df.createOrReplaceTempView("VW_VIEW_SELECT_MODE_WHERE_CLAUSE")

view_select_mode_df = spark.sql("""
        SELECT V.circuitId AS ID_CIRCUITO, 
               V.circuitRef AS NOME_CIRCUITO, 
               V.country AS PAIS 
          FROM VW_VIEW_SELECT_MODE_WHERE_CLAUSE V
         WHERE V.country = "Brazil"
""")

view_select_mode_df.show()

+-----------+-------------+------+
|ID_CIRCUITO|NOME_CIRCUITO|  PAIS|
+-----------+-------------+------+
|         18|   interlagos|Brazil|
|         36|  jacarepagua|Brazil|
+-----------+-------------+------+



## Putting It into Practice

For our practical exercise, we will use a file called drivers.csv, which contains data on the drivers of the Formula 1 grid since the beginning of the sport. The exercise consists of the following steps:

- Create a Spark Session for our application named "Lendo Pilotos App"
- Read the drivers.csv file and load it into a DataFrame
- Select only the columns code, forename, surname, and nationality, renaming them as follows: code -> codigo, forename -> nome, surname -> sobrenome, nationality -> nacionalidade
- Keep only the drivers whose nationality is Brazilian
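
One possible way to approach the steps above is sketched below. It is not the only solution. The names `RENAMES`, `build_select_exprs`, `select_brazilian_drivers`, and `main` are hypothetical helpers introduced for illustration. The path `data/drivers.csv` and the `local[2]` master are assumptions carried over from the circuits example.

```python
# Mapping from the original column names to the names requested in the exercise.
RENAMES = {
    "code": "codigo",
    "forename": "nome",
    "surname": "sobrenome",
    "nationality": "nacionalidade",
}


def build_select_exprs(renames):
    """Turn {"old": "new"} pairs into "old AS new" strings for selectExpr."""
    return [f"{old} AS {new}" for old, new in renames.items()]


def select_brazilian_drivers(drivers_df):
    """Rename the four columns, then keep only the Brazilian drivers."""
    return (drivers_df
            .selectExpr(*build_select_exprs(RENAMES))
            # where() also accepts a SQL expression string; the filter runs
            # after the rename, so it refers to the new column name.
            .where("nacionalidade = 'Brazilian'"))


def main():
    # Imported here so the helpers above can be reused without starting Spark.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
                         .appName("Lendo Pilotos App")
                         .master("local[2]")  # assumption: same master as above
                         .getOrCreate())

    drivers_df = (spark.read
                       .option("header", "True")
                       .option("delimiter", ",")
                       .csv("data/drivers.csv"))  # assumption: same data/ folder

    select_brazilian_drivers(drivers_df).show()
```

Calling `main()` in a notebook cell runs the whole exercise. Keeping the rename mapping in one dictionary means the select step and any later reference to the new column names stay in sync.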