# Como utilizar o PySpark?

- Conectar-se a um cluster Spark do Pyspark
- Realizar operações com Spark DataFrames

### Conectar-se a um cluster Spark do Pyspark

#### Inicializar o Findspark

In [22]:
import findspark
findspark.init()

#### Importar a biblioteca dos pyspark SparkContext

In [23]:
from pyspark import SparkContext

#### Instanciar um objeto do SparkContext

In [24]:
#spark_contexto = SparkContext()

#### Conferir o Objeto instanciado

In [25]:
#print(spark_contexto)
#print(spark_contexto.version)

### Realizar Operações com Spark DataFrames

#### Criando conexão com um Cluster

##### Importando a biblioteca do pyspark SparkSession

In [2]:
from pyspark.sql import SparkSession

##### Criando uma Seção do Spark

In [3]:
spark = SparkSession.builder.getOrCreate()

##### Imprimir/Verificar a Seção criada

In [28]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000022BC3267EF0>


#### Carregando Dados

##### Leitura dos Dados de um Arquivo

In [29]:
dataset = spark.read.csv('G:/Meu Drive/01 - Faculdade de Ciência de Dados/Uni-DS-Codes/Periodo 3/Tópicos de Big Data em Python/Tema 3 - Principios de Desenvolvimento de Spark com Python/california_housing_test.csv',inferSchema=True, header=True)

##### Exibindo os Cabeçalhos das Colunas e os Valores do Primeiro Registro

In [30]:
dataset.head()

Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY')

##### Obtendo a Quantidade de Linhas/Registros no Arquivo

In [31]:
dataset.count()

20640

#### Criando o DataFrame

##### Criando uma Tabela SQL Temporária

In [32]:
dataset.createOrReplaceTempView('tabela_temporaria')

##### Obtendo 3 Registros da Tabela Temporária

In [33]:
# Cria uma string com a instrução da consulta SQL que será feita
query = "SELECT longitude, latitude FROM tabela_temporaria LIMIT 3"

In [34]:
# O Spark processa a consulta e armazena o resultado em uma variável
saida = spark.sql(query)

In [35]:
# Exibição dos Dados obtidos com a Consulta
saida.show()

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.23|   37.88|
|  -122.22|   37.86|
|  -122.24|   37.85|
+---------+--------+



## Utilizando o PySpark com o Pandas

### Converter Spark SQL para Pandas

#### Obtendo o número máximo de quartos

##### 1-Implementar consulta SQL para retornar a quantidade máxima de quartos:

In [45]:
query1 = 'SELECT MAX(total_rooms) AS maximo_quartos FROM tabela_temporaria'

##### 2-Executar a consulta SQL no Spark para obter o DataFrame do Spark:

In [46]:
q_maximo_quartos = spark.sql(query1)

##### 3-Converter o DataFrame do Spark em um DataFrame dos Pandas:

In [48]:
import pandas as pd
pd_maximo_quartos = q_maximo_quartos.toPandas()

##### 4-Imprimir o Resultado da Consulta:

In [49]:
print(f'A quantidade máxima de quartos é: {pd_maximo_quartos['maximo_quartos']}')

A quantidade máxima de quartos é: 0    39320.0
Name: maximo_quartos, dtype: float64


##### 5-Converter o valor do DataFrame para um valor inteiro:

In [50]:
qtd_maximo_quartos = int(pd_maximo_quartos.loc[0,'maximo_quartos'])

#### Obtendo a Localização do número máximo de quartos

##### 1-Implementar a Consulta SQL para retornar a latitude e longitude da residência com mais quartos:

In [51]:
query2 = 'SELECT longitude, latitude FROM tabela_temporaria WHERE total_rooms = ' + str(qtd_maximo_quartos)

##### 2-Executar o SQL no Spark para obter o resultado no DataFrame do Spark:

In [52]:
localizacao_maximo_quartos = spark.sql(query2)

##### 3-Converter o DataFrame do Spark para DataFrame do Pandas:

In [53]:
pd_localizacao_maximo_quartos = localizacao_maximo_quartos.toPandas()

##### 4-Exibir o Resultado:

In [54]:
print(pd_localizacao_maximo_quartos.head())

   longitude  latitude
0    -121.44     38.43


### Fechando a Seção do Spark:

In [4]:
spark.stop()