## UTILIZANDO OPERAÇOES DE JOIN, LEFT e RIGHT


In [5]:
import findspark

findspark.init()

from pyspark.sql import SparkSession

# Cria uma instância de SparkSession, que é a entrada para usar o Spark
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()
    
spark

In [6]:
#Leitura dos arquivos de profissionais e salarios
profissional=spark.sparkContext.textFile(r"C:\Users\Kaue\Documents\Cursos\Databricks e PySpark\PySpark\Folhapagamento\Profissionais.txt")
salario=spark.sparkContext.textFile(r"C:\Users\Kaue\Documents\Cursos\Databricks e PySpark\PySpark\Folhapagamento\Salario.txt")

In [7]:
# Exibindo os dados sobre os profissionais
profissional.collect()

['Carlos,oncologista,hospital',
 'Ana,dentista,clinica',
 'Fernanda,enfermeira,hospital',
 'Sandra,pediatra,clinica',
 'Fatima,dentista,clinica',
 'Gilmar,cardiologista,hospital',
 'Fabio,pediatra,clinica',
 'Hilton,enfermeiro,clinica',
 'Daiane,dentista,clinica',
 'Paulo,farmaceutico,clinica',
 'Gilberto,pediatra,hospital']

In [8]:
#Exibindo os dados sobre salario
salario.collect()

['Carlos,10000',
 'Ana,7000',
 'Fernanda,5000',
 'Sandra,6000',
 'Fatima,8500',
 'Gilmar,9000',
 'Fabio,12000',
 'Hilton,5000',
 'Jefferson,8000',
 'Antonio,3000',
 'Joaquim,5000']

In [9]:
# Aplica a transformação 'map' no RDD 'profissional'.
# A função lambda é utilizada para:
# 1. Dividir cada linha do arquivo (cada elemento do RDD) em uma lista usando a vírgula como delimitador.
# 2. Criar uma tupla onde o primeiro elemento é o primeiro item da lista (x.split(",")[0]),
# e o segundo elemento é outra tupla com o segundo e o terceiro itens da lista (x.split(",")[1], x.split(",")[2]).
profissional_ajuste = profissional.map(lambda x: (x.split(",")[0], (x.split(",")[1], x.split(",")[2])))

# Coleta os resultados do RDD transformado 'profissional_ajuste' e os retorna como uma lista.
# Isso significa que o processamento distribuído é executado e os resultados são trazidos de volta para o driver.
profissional_ajuste.collect()

[('Carlos', ('oncologista', 'hospital')),
 ('Ana', ('dentista', 'clinica')),
 ('Fernanda', ('enfermeira', 'hospital')),
 ('Sandra', ('pediatra', 'clinica')),
 ('Fatima', ('dentista', 'clinica')),
 ('Gilmar', ('cardiologista', 'hospital')),
 ('Fabio', ('pediatra', 'clinica')),
 ('Hilton', ('enfermeiro', 'clinica')),
 ('Daiane', ('dentista', 'clinica')),
 ('Paulo', ('farmaceutico', 'clinica')),
 ('Gilberto', ('pediatra', 'hospital'))]

In [10]:
#realizando o mapeamento dos campos sobre salario
salario_ajuste=salario.map(lambda x:(x.split(",")[0],x.split(",")[1]))

In [11]:
# Realizando a operacao de join, que significa a busca de todas as informacoes no RDD
# profissional e que exista no RDD salario
folha_pagamento_join=profissional_ajuste.join(salario_ajuste)
folha_pagamento_join.collect()

[('Carlos', (('oncologista', 'hospital'), '10000')),
 ('Ana', (('dentista', 'clinica'), '7000')),
 ('Fabio', (('pediatra', 'clinica'), '12000')),
 ('Hilton', (('enfermeiro', 'clinica'), '5000')),
 ('Sandra', (('pediatra', 'clinica'), '6000')),
 ('Fatima', (('dentista', 'clinica'), '8500')),
 ('Fernanda', (('enfermeira', 'hospital'), '5000')),
 ('Gilmar', (('cardiologista', 'hospital'), '9000'))]

In [12]:
# Realizando a operacao de left, que significa a busca de todas as informacoes no RDD
# profissional e que exista ou nao no RDD salario
folha_pagamento_join=profissional_ajuste.leftOuterJoin(salario_ajuste)
folha_pagamento_join.collect()

[('Paulo', (('farmaceutico', 'clinica'), None)),
 ('Carlos', (('oncologista', 'hospital'), '10000')),
 ('Ana', (('dentista', 'clinica'), '7000')),
 ('Fabio', (('pediatra', 'clinica'), '12000')),
 ('Hilton', (('enfermeiro', 'clinica'), '5000')),
 ('Gilberto', (('pediatra', 'hospital'), None)),
 ('Sandra', (('pediatra', 'clinica'), '6000')),
 ('Fatima', (('dentista', 'clinica'), '8500')),
 ('Fernanda', (('enfermeira', 'hospital'), '5000')),
 ('Gilmar', (('cardiologista', 'hospital'), '9000')),
 ('Daiane', (('dentista', 'clinica'), None))]

In [13]:
# Realizando a operacao de right, que significa a busca de todas as informacoes no RDD
# salario e que exista ou nao no RDD profissional
folha_pagamento_join=profissional_ajuste.rightOuterJoin(salario_ajuste)
folha_pagamento_join.collect()

[('Antonio', (None, '3000')),
 ('Carlos', (('oncologista', 'hospital'), '10000')),
 ('Ana', (('dentista', 'clinica'), '7000')),
 ('Fabio', (('pediatra', 'clinica'), '12000')),
 ('Hilton', (('enfermeiro', 'clinica'), '5000')),
 ('Jefferson', (None, '8000')),
 ('Joaquim', (None, '5000')),
 ('Sandra', (('pediatra', 'clinica'), '6000')),
 ('Fatima', (('dentista', 'clinica'), '8500')),
 ('Fernanda', (('enfermeira', 'hospital'), '5000')),
 ('Gilmar', (('cardiologista', 'hospital'), '9000'))]