# PySpark #03

Link da Aula: [PySpark - Aula 03 - Union / Joins / When - Otherwise / Collect](https://www.youtube.com/watch?v=EoI3XwxCkfI)

**Indice:**

- [Importacão bibliotecas/funcões](#importacão-bibliotecasfuncões)
- [Iniciando sessão PySpark](#iniciando-sessão-pyspark)
- [Criar DF/ler arquivo](#criar-dfler-arquivo)
- [Alteracões Aula PySpark 01](#alteracões-aula-pyspark-01)
- [Alteracões Aula PySpark 02](#alteracões-aula-pyspark-02)
- [Distinct()](#distinct)
- [Collect()](#collect)
- [When() / Otherwise()](#when--otherwise)
- [Union (Concat)](#union-concat)
- [Preparando dados para fazer os Joins](#preparando-dados-para-fazer-os-joins)
- [Join - Simples](#join---simples)
- [Inner Join](#inner-join)
- [Left Join](#left-join)
- [Right Join](#right-join)
- [Full Join](#full-join)
- [Semi Join](#semi-join)
- [Anti Join](#anti-join)

##### Importacão bibliotecas/funcões

In [1]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

##### Iniciando sessão PySpark

In [2]:
spark = (
    SparkSession.builder
    .master('local')
    .appName('PySpark_03')
    .getOrCreate()
)

##### Criar DF/ler arquivo

In [3]:
df = spark.read.csv("wc2018-players.csv", header=True, inferSchema=True)

##### Alteracões Aula PySpark 01

In [4]:
#Renomeando colunas
df = df.withColumnRenamed('Team','Selecao').withColumnRenamed('#','Numero')\
.withColumnRenamed('Pos.','Posicao').withColumnRenamed('FIFA Popular Name','Nome_FIFA')\
.withColumnRenamed('Birth Date','Nascimento')\
.withColumnRenamed('Shirt Name','Nome Camiseta').withColumnRenamed('Club','Time')\
.withColumnRenamed('Height','Altura').withColumnRenamed('Weight','Peso')

#Extraindo dia, mes e ano
dia = udf(lambda data: data.split('.')[0])
mes = udf(lambda mes: mes.split('.')[1])
ano = udf(lambda ano: ano.split('.')[2])

#criando colunas com base nas funcoes, dia, mes e ano
df = df.withColumn('Dia', dia('Nascimento')).withColumn('Mes', mes('Nascimento')).withColumn('Ano', ano('Nascimento'))

#Criando coluna com dia, mes e ano concatenados e transofrmando no tipo data
df = df.withColumn('Data_Nascimento', concat_ws('-', 'Ano', 'Mes', 'Dia').cast(DateType()))

##### Alteracões Aula PySpark 02

In [5]:
#Removendo colunas
df = df.drop('Nascimento', 'Nome_FIFA')

##### Distinct()

Retorna valores distintos

In [6]:
df.select(col('Selecao')).distinct().show(10)

+---------+
|  Selecao|
+---------+
|   Russia|
|  Senegal|
|   Sweden|
|  IR Iran|
|  Germany|
|   France|
|Argentina|
|  Belgium|
|     Peru|
|  Croatia|
+---------+
only showing top 10 rows



##### Collect()

Retorna as linhas distintas em forma de lista

In [7]:
df.select(col('Selecao')).distinct().collect()

[Row(Selecao='Russia'),
 Row(Selecao='Senegal'),
 Row(Selecao='Sweden'),
 Row(Selecao='IR Iran'),
 Row(Selecao='Germany'),
 Row(Selecao='France'),
 Row(Selecao='Argentina'),
 Row(Selecao='Belgium'),
 Row(Selecao='Peru'),
 Row(Selecao='Croatia'),
 Row(Selecao='Nigeria'),
 Row(Selecao='Korea Republic'),
 Row(Selecao='Spain'),
 Row(Selecao='Denmark'),
 Row(Selecao='Morocco'),
 Row(Selecao='Panama'),
 Row(Selecao='Iceland'),
 Row(Selecao='Uruguay'),
 Row(Selecao='Mexico'),
 Row(Selecao='Tunisia'),
 Row(Selecao='Saudi Arabia'),
 Row(Selecao='Switzerland'),
 Row(Selecao='Brazil'),
 Row(Selecao='Japan'),
 Row(Selecao='England'),
 Row(Selecao='Poland'),
 Row(Selecao='Portugal'),
 Row(Selecao='Australia'),
 Row(Selecao='Costa Rica'),
 Row(Selecao='Egypt'),
 Row(Selecao='Serbia'),
 Row(Selecao='Colombia')]

In [8]:
#Adicionado valores do collect em uma lista
lista = df.select(col('Selecao')).distinct().collect()
paises = []

for pais in lista:
    paises.append(pais[0])
    
print(paises)

['Russia', 'Senegal', 'Sweden', 'IR Iran', 'Germany', 'France', 'Argentina', 'Belgium', 'Peru', 'Croatia', 'Nigeria', 'Korea Republic', 'Spain', 'Denmark', 'Morocco', 'Panama', 'Iceland', 'Uruguay', 'Mexico', 'Tunisia', 'Saudi Arabia', 'Switzerland', 'Brazil', 'Japan', 'England', 'Poland', 'Portugal', 'Australia', 'Costa Rica', 'Egypt', 'Serbia', 'Colombia']


##### When() / Otherwise()

Básicamente é o If/Else do pySpark

In [9]:
#When e otherwise, com 1 condicao
df.withColumn('Coluna_Nova', when(col('Selecao') == "Argentina", " Argentinos").otherwise("Não Argentinos")).show(5)

+---------+------+-------+-------------+--------------------+------+----+---+---+----+---------------+-----------+
|  Selecao|Numero|Posicao|Nome Camiseta|                Time|Altura|Peso|Dia|Mes| Ano|Data_Nascimento|Coluna_Nova|
+---------+------+-------+-------------+--------------------+------+----+---+---+----+---------------+-----------+
|Argentina|     3|     DF|   TAGLIAFICO|      AFC Ajax (NED)|   169|  65| 31| 08|1992|     1992-08-31| Argentinos|
|Argentina|    22|     MF|        PAVÓN|CA Boca Juniors (...|   169|  65| 21| 01|1996|     1996-01-21| Argentinos|
|Argentina|    15|     MF|      LANZINI|West Ham United F...|   167|  66| 15| 02|1993|     1993-02-15| Argentinos|
|Argentina|    18|     DF|       SALVIO|    SL Benfica (POR)|   167|  69| 13| 07|1990|     1990-07-13| Argentinos|
|Argentina|    10|     FW|        MESSI|  FC Barcelona (ESP)|   170|  72| 24| 06|1987|     1987-06-24| Argentinos|
+---------+------+-------+-------------+--------------------+------+----+---+---

In [12]:
#Criando coluna nova When e Otherwise com várias condicões

#Criando lista de continentes
europa = ['Russia','Sweden','Germany','France', 'Belgium','Croatia', 'Spain', 'Denmark', 'Iceland','Switzerland', 'England','Poland', 'Portugal','Serbia']
asia = ['IR Iran','Korea Republic','Saudi Arabia', 'Japan']
africa = ['Senegal','Nigeria','Morocco','Tunisia','Egypt']
oceania = ['Australia']
america_norte = ['Panama','Mexico','Costa Rica']
america_sul = ['Argentina','Peru','Uruguay', 'Brazil', 'Colombia']

#Criando coluna continente, baseada nos nomes dos paises cima
df = df.withColumn('Continente', when(col('Selecao').isin(europa), 'Europa')\
            .when(col('Selecao').isin(asia), 'Ásia')\
            .when(col('Selecao').isin(africa), 'Africa')\
            .when(col('Selecao').isin(oceania), 'Oceania')\
            .when(col('Selecao').isin(america_norte), 'América do Norte')\
            .when(col('Selecao').isin(america_sul), 'América do Sul')\
            .otherwise('Verificar'))

In [13]:
df.filter('Continente = "Verificar"').show()

+-------+------+-------+-------------+----+------+----+---+---+---+---------------+----------+
|Selecao|Numero|Posicao|Nome Camiseta|Time|Altura|Peso|Dia|Mes|Ano|Data_Nascimento|Continente|
+-------+------+-------+-------------+----+------+----+---+---+---+---------------+----------+
+-------+------+-------+-------------+----+------+----+---+---+---+---------------+----------+



##### Union (Concat)

Junta Data frames com mesmas colunas. (Um em cima do outro)

*OBs: as colunas tem que ser necessáriamente iguais

In [16]:
#Criando primeiro df
df_america_sul = df.filter('Continente = "América do Sul"')
df_america_sul.select('Selecao').distinct().show()


+---------+
|  Selecao|
+---------+
|Argentina|
|     Peru|
|  Uruguay|
|   Brazil|
| Colombia|
+---------+



In [17]:
#Criando segundo df
df_america_norte = df.filter('Continente = "América do Norte"')
df_america_norte.select('Selecao').distinct().show()

+----------+
|   Selecao|
+----------+
|    Panama|
|    Mexico|
|Costa Rica|
+----------+



In [18]:
#Juntando dfs um em baixo do outro
df_americas = df_america_sul.union(df_america_norte)
#Verificando se a uniao foi feita
df_americas.select('Selecao').distinct().show()

+----------+
|   Selecao|
+----------+
| Argentina|
|      Peru|
|   Uruguay|
|    Brazil|
|  Colombia|
|    Panama|
|    Mexico|
|Costa Rica|
+----------+



##### Preparando dados para fazer os Joins

In [26]:
#criando dfs separados para mostrar a funcao
arg = df.filter('Selecao = "Argentina"')
bra = df.filter('Selecao = "Brazil"')

In [27]:
#excluindo colunas desnescessárias
arg = arg.drop('Time','Dias','Mes','Ano','Continente','Peso', 'Data_Nascimento')
bra = bra.drop('Time','Dias','Mes','Ano','Continente','Peso', 'Data_Nascimento')

In [24]:
arg.show(5)

+---------+------+-------+-------------+------+---+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+---------+------+-------+-------------+------+---+
|Argentina|     3|     DF|   TAGLIAFICO|   169| 31|
|Argentina|    22|     MF|        PAVÓN|   169| 21|
|Argentina|    15|     MF|      LANZINI|   167| 15|
|Argentina|    18|     DF|       SALVIO|   167| 13|
|Argentina|    10|     FW|        MESSI|   170| 24|
+---------+------+-------+-------------+------+---+
only showing top 5 rows



In [28]:
bra.show(5)

+-------+------+-------+-------------+------+---+
|Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+-------+------+-------+-------------+------+---+
| Brazil|    18|     MF|         FRED|   169| 05|
| Brazil|    21|     FW|       TAISON|   172| 13|
| Brazil|    17|     MF|  FERNANDINHO|   179| 04|
| Brazil|    22|     DF|       FAGNER|   168| 11|
| Brazil|    10|     FW|    NEYMAR JR|   175| 05|
+-------+------+-------+-------------+------+---+
only showing top 5 rows



##### Join - Simples

In [29]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero)
dfnovo.show(5)

+---------+------+-------+-------------+------+---+-------+------+-------+-------------+------+---+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+---------+------+-------+-------------+------+---+-------+------+-------+-------------+------+---+
|Argentina|     3|     DF|   TAGLIAFICO|   169| 31| Brazil|     3|     DF|      MIRANDA|   186| 07|
|Argentina|    22|     MF|        PAVÓN|   169| 21| Brazil|    22|     DF|       FAGNER|   168| 11|
|Argentina|    15|     MF|      LANZINI|   167| 15| Brazil|    15|     MF|     PAULINHO|   181| 25|
|Argentina|    18|     DF|       SALVIO|   167| 13| Brazil|    18|     MF|         FRED|   169| 05|
|Argentina|    10|     FW|        MESSI|   170| 24| Brazil|    10|     FW|    NEYMAR JR|   175| 05|
+---------+------+-------+-------------+------+---+-------+------+-------+-------------+------+---+
only showing top 5 rows



##### Inner Join

Mostra apenas os dados que tem correspondencias

In [30]:
#preparando dados. Mudando os números da arg para fazer o inner join com a bra

arg = arg.withColumn('Numero', col('Numero') + 1)
arg.show(23)

+---------+------+-------+-------------+------+---+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+---------+------+-------+-------------+------+---+
|Argentina|     4|     DF|   TAGLIAFICO|   169| 31|
|Argentina|    23|     MF|        PAVÓN|   169| 21|
|Argentina|    16|     MF|      LANZINI|   167| 15|
|Argentina|    19|     DF|       SALVIO|   167| 13|
|Argentina|    11|     FW|        MESSI|   170| 24|
|Argentina|     5|     DF|      ANSALDI|   181| 20|
|Argentina|     6|     MF|       BIGLIA|   175| 30|
|Argentina|     8|     MF|       BANEGA|   175| 29|
|Argentina|    15|     DF|   MASCHERANO|   174| 08|
|Argentina|    22|     FW|       DYBALA|   177| 15|
|Argentina|    20|     FW|       AGÜERO|   172| 02|
|Argentina|    10|     FW|      HIGUAÍN|   184| 10|
|Argentina|    12|     MF|     DI MARÍA|   178| 14|
|Argentina|    21|     MF|     LO CELSO|   177| 09|
|Argentina|    14|     MF|         MEZA|   180| 15|
|Argentina|     9|     DF|        ACUÑA|   172| 28|
|Argentina| 

In [31]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero)
dfnovo.show(40)

+---------+------+-------+-------------+------+---+-------+------+-------+-------------+------+---+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+---------+------+-------+-------------+------+---+-------+------+-------+-------------+------+---+
|Argentina|     4|     DF|   TAGLIAFICO|   169| 31| Brazil|     4|     DF|      GEROMEL|   190| 21|
|Argentina|    23|     MF|        PAVÓN|   169| 21| Brazil|    23|     GK|      EDERSON|   188| 17|
|Argentina|    16|     MF|      LANZINI|   167| 15| Brazil|    16|     GK|       CASSIO|   195| 06|
|Argentina|    19|     DF|       SALVIO|   167| 13| Brazil|    19|     MF|      WILLIAN|   175| 09|
|Argentina|    11|     FW|        MESSI|   170| 24| Brazil|    11|     MF|  P. COUTINHO|   172| 12|
|Argentina|     5|     DF|      ANSALDI|   181| 20| Brazil|     5|     MF|     CASEMIRO|   185| 23|
|Argentina|     6|     MF|       BIGLIA|   175| 30| Brazil|     6|     DF|  FILIPE LUIS|   182| 09|


##### Left Join

Mostra todos que estao do lado esquerdo (arg)

In [32]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero, 'left')
dfnovo.show(40)

+---------+------+-------+-------------+------+---+-------+------+-------+-------------+------+----+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|Selecao|Numero|Posicao|Nome Camiseta|Altura| Dia|
+---------+------+-------+-------------+------+---+-------+------+-------+-------------+------+----+
|Argentina|     4|     DF|   TAGLIAFICO|   169| 31| Brazil|     4|     DF|      GEROMEL|   190|  21|
|Argentina|    23|     MF|        PAVÓN|   169| 21| Brazil|    23|     GK|      EDERSON|   188|  17|
|Argentina|    16|     MF|      LANZINI|   167| 15| Brazil|    16|     GK|       CASSIO|   195|  06|
|Argentina|    19|     DF|       SALVIO|   167| 13| Brazil|    19|     MF|      WILLIAN|   175|  09|
|Argentina|    11|     FW|        MESSI|   170| 24| Brazil|    11|     MF|  P. COUTINHO|   172|  12|
|Argentina|     5|     DF|      ANSALDI|   181| 20| Brazil|     5|     MF|     CASEMIRO|   185|  23|
|Argentina|     6|     MF|       BIGLIA|   175| 30| Brazil|     6|     DF|  FILIPE LUIS|   

##### Right Join

Mostra todos os valores do lado direito(bra)

In [33]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero, 'right')
dfnovo.show(40)

+---------+------+-------+-------------+------+----+-------+------+-------+-------------+------+---+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura| Dia|Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+---------+------+-------+-------------+------+----+-------+------+-------+-------------+------+---+
|Argentina|    18|     DF|     OTAMENDI|   181|  12| Brazil|    18|     MF|         FRED|   169| 05|
|Argentina|    21|     MF|     LO CELSO|   177|  09| Brazil|    21|     FW|       TAISON|   172| 13|
|Argentina|    17|     DF|         ROJO|   189|  20| Brazil|    17|     MF|  FERNANDINHO|   179| 04|
|Argentina|    22|     FW|       DYBALA|   177|  15| Brazil|    22|     DF|       FAGNER|   168| 11|
|Argentina|    10|     FW|      HIGUAÍN|   184|  10| Brazil|    10|     FW|    NEYMAR JR|   175| 05|
|Argentina|    11|     FW|        MESSI|   170|  24| Brazil|    11|     MF|  P. COUTINHO|   172| 12|
|Argentina|     7|     DF|        FAZIO|   199|  17| Brazil|     7|     FW|     D. COSTA|  

##### Full Join

In [34]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero,'full')
dfnovo.show(40)

+---------+------+-------+-------------+------+----+-------+------+-------+-------------+------+----+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura| Dia|Selecao|Numero|Posicao|Nome Camiseta|Altura| Dia|
+---------+------+-------+-------------+------+----+-------+------+-------+-------------+------+----+
|     null|  null|   null|         null|  null|null| Brazil|     1|     GK|    A. BECKER|   193|  02|
|Argentina|     2|     GK|       GUZMÁN|   192|  10| Brazil|     2|     DF|     T. SILVA|   183|  22|
|Argentina|     3|     DF|      MERCADO|   181|  18| Brazil|     3|     DF|      MIRANDA|   186|  07|
|Argentina|     4|     DF|   TAGLIAFICO|   169|  31| Brazil|     4|     DF|      GEROMEL|   190|  21|
|Argentina|     5|     DF|      ANSALDI|   181|  20| Brazil|     5|     MF|     CASEMIRO|   185|  23|
|Argentina|     6|     MF|       BIGLIA|   175|  30| Brazil|     6|     DF|  FILIPE LUIS|   182|  09|
|Argentina|     7|     DF|        FAZIO|   199|  17| Brazil|     7|     FW|     D.

##### Semi Join

parecido com o Inner, mas mostra só o que tem correspondencia

In [36]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero,'semi')
dfnovo.show(40)

+---------+------+-------+-------------+------+---+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+---------+------+-------+-------------+------+---+
|Argentina|     4|     DF|   TAGLIAFICO|   169| 31|
|Argentina|    23|     MF|        PAVÓN|   169| 21|
|Argentina|    16|     MF|      LANZINI|   167| 15|
|Argentina|    19|     DF|       SALVIO|   167| 13|
|Argentina|    11|     FW|        MESSI|   170| 24|
|Argentina|     5|     DF|      ANSALDI|   181| 20|
|Argentina|     6|     MF|       BIGLIA|   175| 30|
|Argentina|     8|     MF|       BANEGA|   175| 29|
|Argentina|    15|     DF|   MASCHERANO|   174| 08|
|Argentina|    22|     FW|       DYBALA|   177| 15|
|Argentina|    20|     FW|       AGÜERO|   172| 02|
|Argentina|    10|     FW|      HIGUAÍN|   184| 10|
|Argentina|    12|     MF|     DI MARÍA|   178| 14|
|Argentina|    21|     MF|     LO CELSO|   177| 09|
|Argentina|    14|     MF|         MEZA|   180| 15|
|Argentina|     9|     DF|        ACUÑA|   172| 28|
|Argentina| 

##### Anti Join

Ao contrario do Semi, mostra o que nao teve correspondencia

In [37]:
dfnovo = arg.join(bra, arg.Numero == bra.Numero, 'anti')
dfnovo.show(40)

+---------+------+-------+-------------+------+---+
|  Selecao|Numero|Posicao|Nome Camiseta|Altura|Dia|
+---------+------+-------+-------------+------+---+
|Argentina|    24|     GK|    CABALLERO|   186| 28|
+---------+------+-------+-------------+------+---+

