# **Explorar as Funcionalidades do PySpark**

**prof: Sergio Assuncao Monteiro, DSc**

lattes: http://lattes.cnpq.br/9489191035734025

# **(0) Pré-requisitos**

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

# **(1) Configuração das Variáveis de Ambiente**

In [None]:
import os

# Export the Java and Spark installation directories through the process
# environment in a single update.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

# **Exemplo 01:**

In [None]:
import findspark

# Must run before importing pyspark: findspark locates the Spark installation
# and makes the pyspark package importable.
findspark.init()

from pyspark.sql import SparkSession

# Start (or reuse) a session with the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Eager evaluation makes DataFrames render as formatted tables in the notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Display the session summary as the cell output.
spark

In [None]:
# Load the sample CSV into a Spark DataFrame: the first row supplies the column
# names and the column types are inferred from the data.
dataset = spark.read.csv(
    path='/content/sample_data/california_housing_test.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Print the first three rows as a text table.
dataset.show(n=3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [None]:
# Return the first five rows to the driver as a list of Row objects.
dataset.head(n=5)

[Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0),
 Row(longitude=-118.3, latitude=34.26, housing_median_age=43.0, total_rooms=1510.0, total_bedrooms=310.0, population=809.0, households=277.0, median_income=3.599, median_house_value=176500.0),
 Row(longitude=-117.81, latitude=33.78, housing_median_age=27.0, total_rooms=3589.0, total_bedrooms=507.0, population=1484.0, households=495.0, median_income=5.7934, median_house_value=270500.0),
 Row(longitude=-118.36, latitude=33.82, housing_median_age=28.0, total_rooms=67.0, total_bedrooms=15.0, population=49.0, households=11.0, median_income=6.1359, median_house_value=330000.0),
 Row(longitude=-119.67, latitude=36.33, housing_median_age=19.0, total_rooms=1241.0, total_bedrooms=244.0, population=850.0, households=237.0, median_income=2.9375, median_house_value=81700.0)]

In [None]:
# Total number of rows in the DataFrame.
dataset.count()

3000

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL
dataset.createOrReplaceTempView("tabela_temporaria")
print(spark.catalog.listTables()) # List the catalog tables to confirm the view was registered

[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [None]:
#query = "FROM tabela_temporaria SELECT * LIMIT 3"  # Alternative: return every column
query = "FROM tabela_temporaria SELECT longitude, latitude LIMIT 3"  # Only the coordinates, limited to 3 rows

saida = spark.sql(query)  # Run the SQL against the temporary view; the result is a Spark DataFrame
saida.show() # Show the results

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
+---------+--------+



**Converter SQL para PANDAS**

In [None]:
# Compute the maximum in Spark, then bring the single-row result to the driver
# as a pandas DataFrame.
query1 = "SELECT MAX(total_rooms) as maximo_quartos FROM tabela_temporaria"

q_maximo_quartos = spark.sql(query1)

pd_maximo_quartos = q_maximo_quartos.toPandas()

# Extract the scalar from row 0 before printing: formatting the column itself
# prints the whole pandas Series (index, name and dtype) instead of the number.
qtd_maximo_quartos = int(pd_maximo_quartos.loc[0,'maximo_quartos'])

print('A quantidade máxima de quartos é: {}'.format(qtd_maximo_quartos))

A quantidade máxima de quartos é: 0    30450.0
Name: maximo_quartos, dtype: float64


**Query no Spark**

In [None]:
# Look up the coordinates of the row(s) whose room count equals the maximum
# computed earlier.
query2 = f"SELECT longitude, latitude FROM tabela_temporaria WHERE total_rooms = {qtd_maximo_quartos}"
localizacao_maximo_quartos = spark.sql(query2)

# Bring the matches to the driver as a pandas DataFrame and print its first rows.
pd_localizacao_maximo_quartos = localizacao_maximo_quartos.toPandas()
print(pd_localizacao_maximo_quartos.head())

   longitude  latitude
0     -117.2     33.58


**Converter PANDAS DataFrame para Spark DataFrame**

In [None]:
import pandas as pd
import numpy as np

# pandas 2.0 removed DataFrame.iteritems, which this Spark release still calls
# inside createDataFrame (see the warning this cell emitted). Alias it to the
# equivalent DataFrame.items when it is missing; a no-op on older pandas.
if not hasattr(pd.DataFrame, "iteritems"):
    pd.DataFrame.iteritems = pd.DataFrame.items

media = 0
desvio_padrao = 0.1
semente = 42                                  # Fixed seed so the sample is identical on every run
gerador = np.random.default_rng(semente)

pd_temporario = pd.DataFrame(gerador.normal(media, desvio_padrao, 100))  # 100 draws from a normal distribution
spark_temporario = spark.createDataFrame(pd_temporario)             # Convert the pandas DataFrame into a Spark DataFrame
print(spark.catalog.listTables())                                   # Catalog before registering the new view
spark_temporario.createOrReplaceTempView("nova_tabela_temporaria")  # Register the Spark DataFrame as a temporary view
print(spark.catalog.listTables())                                   # Catalog after: the new view is now listed

  for column, series in pdf.iteritems():


[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='nova_tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [None]:
# Shut down the Spark session now that the examples are finished.
spark.stop()