# Installing **Spark**

In [35]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [36]:
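# If this URL returns 404, the release has most likely been moved to https://archive.apache.org/dist/spark/
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz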

In [37]:
# Extract the downloaded archive
!tar xf spark-3.2.4-bin-hadoop3.2.tgz

In [38]:
!pip install -q findspark

In [39]:
!pip install -q pyspark

# Configure **Environment** Variables

In [40]:
import os
# Set the Java environment variable (JAVA_HOME)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Set the Spark environment variable (SPARK_HOME)
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"
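Before starting Spark, it can help to confirm that both paths actually exist, since a wrong `JAVA_HOME` or `SPARK_HOME` usually surfaces later as a much less readable error. The helper below, `check_spark_env`, is hypothetical (it is not part of Spark or findspark) and only checks that each variable is set and points to an existing directory.

```python
import os

def check_spark_env(required=("JAVA_HOME", "SPARK_HOME")):
    """Return a list of problems found with the given environment variables.

    Hypothetical helper: it only checks that each variable is set and points
    to an existing directory; it does not validate the directory contents.
    """
    problems = []
    for name in required:
        path = os.environ.get(name)
        if not path:
            problems.append(f"{name} is not set")
        elif not os.path.isdir(path):
            problems.append(f"{name} points to a missing directory: {path}")
    return problems

# In the notebook, an empty list means both directories exist:
# print(check_spark_env())
```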

# **Start Spark**

In [41]:
import findspark

In [42]:
findspark.init()
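Roughly speaking, `findspark.init()` makes the PySpark copy bundled in `SPARK_HOME` importable by putting `$SPARK_HOME/python` and the bundled `py4j-*.zip` at the front of `sys.path`. The function below is a simplified, hypothetical sketch of that idea, not findspark's actual code; the real `init()` also accepts extra options and sets other environment variables.

```python
import glob
import os
import sys

def add_spark_to_path(spark_home=None):
    """Simplified sketch of what findspark.init() does (hypothetical helper).

    Prepends $SPARK_HOME/python and the bundled py4j-*.zip to sys.path so
    that `import pyspark` resolves to the copy shipped with the Spark binaries.
    """
    spark_home = spark_home or os.environ["SPARK_HOME"]
    spark_python = os.path.join(spark_home, "python")
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip found under {spark_python}/lib")
    new_entries = [spark_python, py4j_zips[0]]
    # Prepend so these entries take precedence over any pip-installed pyspark.
    sys.path[:0] = new_entries
    return new_entries
```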

# **PySpark**

**Instantiate the SparkContext**

In [43]:
from pyspark import SparkContext
spark_contexto = SparkContext() # Instantiate SparkContext

**Check the SparkContext**

In [44]:
print(spark_contexto)

<SparkContext master=local[*] appName=pyspark-shell>


**Spark Version**

In [46]:
print(spark_contexto.version)

3.2.4
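Because the notebook installs PySpark with pip and also downloads the Spark 3.2.4 binaries, the two versions can drift apart. A hypothetical helper that compares the major and minor version numbers is sketched below; treating a matching `major.minor` as compatible is an assumption made for illustration, not an official compatibility rule.

```python
def same_minor_version(v1: str, v2: str) -> bool:
    """Return True if two version strings share the major and minor numbers.

    Hypothetical check: it assumes versions are dotted strings like "3.2.4".
    """
    return v1.split(".")[:2] == v2.split(".")[:2]

# Possible usage in the notebook (not executed here):
# import pyspark
# same_minor_version(pyspark.__version__, spark_contexto.version)
```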


**Create a Spark Session (SparkSession)**

In [19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
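`getOrCreate()` returns the already active session if there is one and builds a new one otherwise; here it also reuses the `SparkContext` created earlier instead of starting a second one. The class below only illustrates that get-or-create pattern in plain Python; `FakeSession` is hypothetical and is not how Spark implements it.

```python
class FakeSession:
    """Toy stand-in for a session object (hypothetical, for illustration)."""

    _active = None  # the single shared instance, if any

    def __init__(self, master):
        self.master = master
        self.stopped = False

    @classmethod
    def get_or_create(cls, master="local[*]"):
        # Reuse the active session if it exists and is still running.
        if cls._active is None or cls._active.stopped:
            cls._active = cls(master)
        return cls._active

    def stop(self):
        self.stopped = True
```

Calling `FakeSession.get_or_create()` twice returns the same object; after `stop()`, the next call creates a fresh one, which mirrors why the notebook can call `SparkContext()` again after stopping the previous context.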

**Print the Spark Session**

In [20]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fa17712a3b0>


**Load the File Data into a Spark DataFrame**

In [21]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True)
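With `inferSchema=True`, Spark makes an extra pass over the data to guess each column's type; without it, every CSV column is read as a string. The sketch below mimics the idea with the standard `csv` module: for each value it tries integer, then double, and otherwise falls back to string, widening a column's type when rows disagree. It is a simplified illustration, not Spark's inference algorithm, which also recognizes other types such as booleans and timestamps and treats empty fields as nulls.

```python
import csv
import io

# Ordered from narrowest to widest; a column takes the widest type seen.
TYPE_ORDER = ["integer", "double", "string"]

def infer_value_type(value: str) -> str:
    """Guess the type of a single CSV field (simplified)."""
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"

def infer_schema(csv_text: str) -> dict:
    """Return {column: type} for CSV text with a header row (simplified)."""
    schema = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        for column, value in row.items():
            seen = infer_value_type(value)
            current = schema.get(column, "integer")
            schema[column] = max(current, seen, key=TYPE_ORDER.index)
    return schema
```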

**Print the Schema**

In [22]:
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



**View a Subset of the Data**

In [23]:
dataset.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [24]:
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

**Total Number of Records**

In [25]:
dataset.count()

3000

**Create a Temporary SQL View**

In [28]:
dataset.createOrReplaceTempView("tabela_temporaria")

**Print the Tables in the Catalog**

In [29]:
print(spark.catalog.listTables())

[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


**Create an SQL Query**

In [30]:
query = "SELECT longitude, latitude FROM tabela_temporaria LIMIT 3"

**Run the Query**

In [31]:
saida = spark.sql(query)  # Returns a DataFrame with the first 3 rows (longitude, latitude)

**Display the Query Result**

In [32]:
saida.show() # Show the results

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
+---------+--------+
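A temporary view only gives the DataFrame a name that SQL can refer to; the query itself is ordinary SQL. To illustrate that outside Spark, the sketch below loads the three rows shown above into an in-memory SQLite table with the standard `sqlite3` module and runs an equivalent `SELECT longitude, latitude ... LIMIT 3` query. SQLite is only a stand-in here and says nothing about how Spark executes the query. Note also that without an `ORDER BY`, neither engine guarantees which rows `LIMIT` returns.

```python
import sqlite3

# The three (longitude, latitude) pairs displayed by the notebook above.
rows = [(-122.05, 37.37), (-118.3, 34.26), (-117.81, 33.78)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tabela_temporaria (longitude REAL, latitude REAL)")
conn.executemany("INSERT INTO tabela_temporaria VALUES (?, ?)", rows)

# Same projection and limit as the Spark SQL query.
result = conn.execute(
    "SELECT longitude, latitude FROM tabela_temporaria LIMIT 3"
).fetchall()
conn.close()

for longitude, latitude in result:
    print(longitude, latitude)
```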



**Stop Spark**

In [50]:
spark.stop()

**MapReduce Practice**

**Example 01**

In [51]:
import numpy as np

In [52]:
from pyspark import SparkContext
spark_contexto = SparkContext() # Instantiate a SparkContext

In [53]:
vetor = np.array([10, 20, 30, 40, 50])

In [54]:
paralelo = spark_contexto.parallelize(vetor)

In [55]:
mapa = paralelo.map(lambda x : x**2+x)

In [56]:
mapa.collect()

[110, 420, 930, 1640, 2550]
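`parallelize` splits the collection into partitions, `map` applies the function to every element of each partition independently, and `collect` brings the results back to the driver, concatenated in partition order. The plain-Python sketch below imitates that flow; the helpers `split_into_partitions`, `map_partitions`, and `collect` are hypothetical, run sequentially, and only illustrate the semantics, not Spark's task scheduling.

```python
def split_into_partitions(data, num_partitions):
    """Hypothetical helper: split a list into contiguous, near-equal chunks."""
    size, extra = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        partitions.append(data[start:end])
        start = end
    return partitions

def map_partitions(partitions, func):
    # Each partition is processed on its own, as a separate task would be.
    return [[func(x) for x in part] for part in partitions]

def collect(partitions):
    # Concatenate the partition results in partition order.
    return [x for part in partitions for x in part]

vetor = [10, 20, 30, 40, 50]
partitions = split_into_partitions(vetor, 2)  # [[10, 20, 30], [40, 50]]
print(collect(map_partitions(partitions, lambda x: x**2 + x)))
# [110, 420, 930, 1640, 2550]
```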

**Example 02**

In [57]:
paralelo = spark_contexto.parallelize(["distribuida", "distribuida", "spark", "rdd", "spark", "spark"])

In [58]:
funcao_lambda = lambda x: (x, 1)

In [59]:
from operator import add
mapa = paralelo.map(funcao_lambda).reduceByKey(add).collect()

In [60]:
for (w, c) in mapa:
    print("{}: {}".format(w, c))

distribuida: 2
spark: 3
rdd: 1
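`reduceByKey(add)` merges all values that share a key using the given function: first inside each partition, then across partitions after the shuffle. The sketch below reproduces that two-step combine in plain Python with a hypothetical `reduce_by_key` helper and an arbitrary split into two partitions. Because `add` is associative and commutative, the final counts do not depend on how the pairs were partitioned.

```python
from operator import add

def reduce_by_key(partitions, func):
    """Hypothetical sketch of reduceByKey over already-partitioned pairs."""
    # Step 1: combine values locally inside each partition (map-side combine).
    local_results = []
    for part in partitions:
        combined = {}
        for key, value in part:
            combined[key] = func(combined[key], value) if key in combined else value
        local_results.append(combined)
    # Step 2: merge the per-partition results key by key (after the "shuffle").
    merged = {}
    for combined in local_results:
        for key, value in combined.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

words = ["distribuida", "distribuida", "spark", "rdd", "spark", "spark"]
pairs = [(w, 1) for w in words]      # same as map(lambda x: (x, 1))
partitions = [pairs[:3], pairs[3:]]  # two partitions, chosen arbitrarily
print(reduce_by_key(partitions, add))
# {'distribuida': 2, 'spark': 3, 'rdd': 1}
```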


In [61]:
spark_contexto.stop()

**Example 03**

In [62]:
from pyspark import SparkContext
spark_contexto = SparkContext()

In [63]:
lista = [1, 2, 3, 4, 5, 3]
lista_rdd = spark_contexto.parallelize(lista)

In [64]:
lista_rdd.count()

6

In [65]:
par_ordenado = lambda numero: (numero, numero*10)

In [66]:
lista_rdd.flatMap(par_ordenado).collect()

[1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 3, 30]

In [67]:
lista_rdd.map(par_ordenado).collect()

[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (3, 30)]
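The two outputs differ because `map` produces exactly one output element per input (here, a tuple), while `flatMap` expects the function to return an iterable and flattens it, so each tuple contributes two separate elements. In plain Python, the same contrast can be written with a list comprehension and `itertools.chain.from_iterable`:

```python
from itertools import chain

lista = [1, 2, 3, 4, 5, 3]
par_ordenado = lambda numero: (numero, numero * 10)

# One tuple per input element, like rdd.map
mapped = [par_ordenado(n) for n in lista]
# Each tuple is unpacked into the result, like rdd.flatMap
flat_mapped = list(chain.from_iterable(map(par_ordenado, lista)))

print(mapped)       # [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (3, 30)]
print(flat_mapped)  # [1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 3, 30]
```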

In [68]:
spark_contexto.stop()