# **Introduction to the Knowledge Track**

**Case Study: PySpark**

**Prof. Sérgio Assunção Monteiro, DSc**


https://www.linkedin.com/in/sergio-assun%C3%A7%C3%A3o-monteiro-b781897b/

# **Installing Spark**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [3]:
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
!pip install -q pyspark

# **Set Environment Variables**

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
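
A wrong path in either variable tends to surface only later, as a less obvious error when Spark starts. A minimal sketch of an early check, assuming the Colab paths set above; `missing_env_paths` is a hypothetical helper, not part of Spark or findspark:

```python
import os

def missing_env_paths(names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables that are unset or do not point to an existing directory."""
    missing = []
    for name in names:
        path = os.environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# Hypothetical usage: report problems before starting Spark.
problems = missing_env_paths()
if problems:
    print("Check these variables:", ", ".join(problems))
```

An empty list means both directories exist; it does not prove that they contain a working Java or Spark installation.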

# **Initialize Spark**

In [7]:
import findspark
findspark.init()

# **PySpark**

**Instantiate the SparkContext**

In [8]:
from pyspark import SparkContext
spark_contexto = SparkContext() # Instantiate SparkContext

**Inspect the SparkContext**

In [9]:
print(spark_contexto)      

<SparkContext master=local[*] appName=pyspark-shell>


**Spark Version**

In [10]:
print(spark_contexto.version)

3.1.2


**Create a SparkSession**

In [11]:
from pyspark.sql import SparkSession 
spark = SparkSession.builder.master("local[*]").getOrCreate()

**Print the SparkSession**

In [12]:
print(spark) 

<pyspark.sql.session.SparkSession object at 0x7f63c6546890>


**Load the File into a Spark DataFrame**

In [13]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True)

**Print the Schema**

In [14]:
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
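
`inferSchema=True` makes Spark take an extra pass over the file to guess the column types. When the types are already known, as in the schema printed above, they can be declared up front instead. A sketch under that assumption: `HOUSING_COLUMNS`, `HOUSING_DDL`, and `load_housing` are illustrative names, and the schema is passed as a DDL-formatted string, which `spark.read.csv` accepts in place of a `StructType`.

```python
# Column names copied from the printSchema() output; every column is a double.
HOUSING_COLUMNS = [
    "longitude", "latitude", "housing_median_age", "total_rooms",
    "total_bedrooms", "population", "households", "median_income",
    "median_house_value",
]

# DDL-formatted schema string: "longitude DOUBLE, latitude DOUBLE, ..."
HOUSING_DDL = ", ".join(f"{name} DOUBLE" for name in HOUSING_COLUMNS)

def load_housing(spark, path):
    """Read the CSV with a declared schema instead of inferSchema=True."""
    return spark.read.csv(path, schema=HOUSING_DDL, header=True)
```

With an active session, `load_housing(spark, '/content/sample_data/california_housing_test.csv')` would be the counterpart of the loading cell above.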



**View a Subset of the Data**

In [15]:
dataset.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [16]:
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

**Total Number of Records**

In [17]:
dataset.count()

3000

**Create a Temporary SQL View**

In [18]:
dataset.createOrReplaceTempView("tabela_temporaria")

**List the Tables in the Catalog**

In [19]:
print(spark.catalog.listTables()) 

[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
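
The list returned by `listTables()` can also be checked programmatically, for example to confirm that the view exists before querying it. A minimal sketch; `has_table` is a hypothetical helper that relies only on the `name` attribute shown in the output above:

```python
def has_table(spark, name):
    """Return True if a table or view called `name` is listed in the session catalog."""
    return any(table.name == name for table in spark.catalog.listTables())
```

With the session from this notebook it would be called as `has_table(spark, "tabela_temporaria")`.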


**Write SQL Queries**

In [20]:
query = "SELECT longitude, latitude FROM tabela_temporaria LIMIT 3"  # First 3 longitude/latitude pairs

**Run the Query**

In [21]:
saida = spark.sql(query)  # Run the query; the result is a DataFrame with 3 rows

**Show the Query Result**

In [22]:
saida.show() # Show the results

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
+---------+--------+
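
The same selection can also be written without SQL text, through the DataFrame API. A minimal sketch; `first_coordinates` is a hypothetical helper name, and it assumes a DataFrame with `longitude` and `latitude` columns such as `dataset`:

```python
def first_coordinates(df, n=3):
    """Select longitude and latitude and keep at most n rows, mirroring the SQL query."""
    return df.select("longitude", "latitude").limit(n)
```

In this notebook, `first_coordinates(dataset).show()` would be the counterpart of `saida.show()`. As with `LIMIT` in SQL, no ordering is requested, so which rows come back is not guaranteed in general.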



**Stop Spark**

In [23]:
spark.stop()
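
If a cell raises before `spark.stop()` is reached, the session keeps running. One way to guard against that is to wrap the work in a context manager. A sketch only: `stopping` is a hypothetical helper, and it assumes nothing about the session beyond a `stop()` method.

```python
from contextlib import contextmanager

@contextmanager
def stopping(session):
    """Yield the session and call its stop() on exit, even if the body raises."""
    try:
        yield session
    finally:
        session.stop()

# Hypothetical usage with the objects from this notebook:
# with stopping(spark) as s:
#     s.sql(query).show()
```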