<a href="https://colab.research.google.com/github/leomoritz/bigdata-pyspark/blob/master/pythonspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Prática 01: Configuração do Ambiente**

Instale a linguagem Java 8, que é o pré-requisito para usar o Apache Spark:

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Instale o Apache Spark com Hadoop, executando o seguinte comando:

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz

In [3]:
!tar xf spark-3.3.3-bin-hadoop3.tgz

Para instalar o FindSpark (adiciona o PySpark no caminho do sistema sys.path em tempo de execução), use o seguinte comando:

In [4]:
!pip install -q findspark

Verificando se o findspark foi instalado com sucesso:

In [None]:
help()

Instalando o PySpark:

In [5]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


Salvando as variáveis de ambiente

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

# **Prática 02: Começando a trabalhar com Spark**

Importando e iniciando o framework do findspark

In [7]:
import findspark as fs
fs.init()

In [8]:
from pyspark import SparkContext
spark_context = SparkContext()

Iniciando sessão do spark local para criar uma instância caso não exista.

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Salvando os dados de um CSV dentro de um dataset.

In [10]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True)

Visualização dos dados

In [11]:
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [12]:
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

In [13]:
dataset.count()

3000

Encerrando o spark

In [14]:
spark.stop()

# Prática 3 - Spark no Python

In [15]:
import findspark as fs
fs.init()

In [82]:
from pyspark import SparkContext
spark_context = SparkContext()
print(spark_context)
print(spark_context.version)

<SparkContext master=local[*] appName=pyspark-shell>
3.3.3


In [17]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7dad2414d450>


In [18]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True)

In [19]:
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

In [20]:
dataset.createOrReplaceTempView("tabela_temporaria")
print(spark.catalog.listTables())

[Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


In [21]:
query = "FROM tabela_temporaria SELECT longitude, latitude LIMIT 3"
saida = spark.sql(query)
saida.show()

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
+---------+--------+



# Prática 4 - Spark com Pandas

Implementar a consulta SQL na tabela chamada de tabela_temporaria, que já carregamos no Spark para retornar a quantidade máxima de quartos;

In [22]:
query1 = "SELECT MAX(total_rooms) as maximo_quartos FROM tabela_temporaria"

Executar a consulta SQL no Spark e, assim, obter um DataFrame do Spark;

In [23]:
q_maximo_quartos = spark.sql(query1)

Converter o resultado da etapa anterior para um DataFrame do Pandas;

In [24]:
pd_maximo_quartos = q_maximo_quartos.toPandas()

Imprimir o resultado da consulta;

In [25]:
print('A quantidade máxima de quartos é: {}'.format(pd_maximo_quartos))

A quantidade máxima de quartos é:    maximo_quartos
0         30450.0


Converter o valor do DataFrame para um valor inteiro.

In [26]:
qtd_maximo_quartos = int(pd_maximo_quartos.loc[0,'maximo_quartos'])

Implementar a consulta SQL para retornar a latitude e a longitude da residência com a quantidade máxima de quartos que obtivemos na execução do programa anterior;

In [27]:
query2 = "SELECT longitude, latitude FROM tabela_temporaria WHERE total_rooms="+str(qtd_maximo_quartos)

Executar SQL no Spark e obter o resultado no DataFrame do Spark

In [28]:
localizacao_maximo_quartos = spark.sql(query2)

Converter o DataFrame do Spark para o DataFrame do Pandas

In [29]:
pd_localizacao_maximo_quartos = localizacao_maximo_quartos.toPandas()

Exibir resultado

In [30]:
print(pd_localizacao_maximo_quartos)

   longitude  latitude
0     -117.2     33.58


# Prática 5 - Convertendo Pandas DataFrame para Spark DataFrame

Importando as bibliotecas numpy para gerar os dados e pandas para organizá-los em um dataframe.

In [32]:
import pandas as pd
import numpy as np

Gerando os dados com numpy de forma randomica e armazenando em um dataframe do pandas.

In [33]:
media = 0
desvio_padrao = 0.1
pd_temporario = pd.DataFrame(np.random.normal(media, desvio_padrao, 100))

Convertendo o dataframe do pandas em dataframe do spark e printando resultado

In [35]:
spark_temporario = spark.createDataFrame(pd_temporario);
print(spark.catalog.listTables())

[Table(name='nova_tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


Aqui, listamos apenas as tabelas que já estavam no catálogo. Agora listaremos as informações sobre a última tabela que criamos através da função createOrReplaceTempView:

In [36]:
spark_temporario.createOrReplaceTempView("nova_tabela_temporaria")
print(spark.catalog.listTables())

[Table(name='nova_tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='tabela_temporaria', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


Por fim, encerraremos a sessão:

In [37]:
spark.stop()

# Prática 6 - MapReduce

Considerando que o contexto já foi instanciado e a biblioteca numpy importada anteriormente, então o primeiro passo aqui é criar um vetor de dados

In [58]:
vetor = np.array([10, 20, 30, 40, 50])

Feito isso, agora o próximo passo a criar um RDD por meio de um SparkContext

In [59]:
paralelo = spark_context.parallelize(vetor)
print(paralelo)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274


O próximo passo é aplicar o mapeamento através de uma função lambda que pegará cada elemento do vetor (x) e irá elevá-lo ao quadrado e por fim somar a ele mesmo. Ex: 10² + 10 = 110.

In [65]:
mapa = paralelo.map(lambda x : x**2 + x)
outro_mapa = paralelo.map(lambda x : x**4-10*x**2+3)

Agora é possível coletar os dados para serem verificados

In [63]:
mapa.collect()

[110, 420, 930, 1640, 2550]

In [72]:
somatorio = 0
for x in outro_mapa.collect():
  somatorio += x
print(somatorio)

9735015


Podemos agora utilizar um outro conjunto de dados formado por palavras. Com isso, variável paralelo agora faz referência ao RDD:

In [48]:
paralelo = spark_context.parallelize(["distribuida", "distribuida", "spark", "rdd", "spark", "spark"])

O próximo passo é implementar uma função lambda para processar este conjunto de dados. Neste caso, iremos associar o número 1 a uma palavra, formando um conjunto de chave;valor:

In [49]:
funcao_lambda = lambda x:(x, 1)

Feito isso, agora podemos aplicar o MapReduce para organizar os dados e depois reduzir eles por chave, somando a quantidade de vezes que esta chave é encontrada no conjunto inicial. Resumidamente a função reduceByKey soma as ocorrência e as agrupa pela chave - no caso, pelas palavras. Por fim, coletamos um novo conjunto de dados onde teremos também uma chave(nome);valor(vezes em que a chave aparece no conjunto que foi processado).

In [50]:
from operator import add
mapa = paralelo.map(funcao_lambda).reduceByKey(add).collect()

Agora podemos percorrer a lista mapa para visualizar cada par formado pela palavra e sua respectiva ocorrência:

In [51]:
for (w, c) in mapa:
  print("{}: {}".format(w,c))

distribuida: 2
spark: 3
rdd: 1


Por fim, então finalizamos o contexto do spark

In [52]:
spark_context.stop()

# Prática 7 - MapReduce com transformação e ação

Considerando que o SparkContext já foi instanciado anteriormente, então o próximo passo é fornecer uma lista de entrada e transformá-la em um RDD do spark:

In [83]:
lista = [1,2,3,4,5,3]
lista_rdd = spark_context.parallelize(lista)

Agora podemos executar uma ação, como por exemplo de contar os elementos do RDD:

In [84]:
lista_rdd.count()

6

Além disso, podemos criar uma função lambda que recebe um número como parâmetro e retorna um par formado pelo número do parâmetro e pelo mesmo número multiplicado por 10:

In [85]:
func_parOrdenado = lambda numero: (numero, numero*10)
func_parOrdenadoQuadrado = lambda numero: (numero, numero**2)

Com a função criada, podemos aplicar a transformação flatMap com a ação collect da função lambda para a lista_rdd:

In [86]:
lista_rdd.flatMap(func_parOrdenado).collect()

[1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 3, 30]

Além disso, podemos ainda aplicar a transformação map com a ação collect da função lambda para a lista_rdd e desta forma produzir uma saída de conjunto de chave;valor dentro do vetor principal:

In [87]:
lista_rdd.map(func_parOrdenado).collect()

[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (3, 30)]

In [88]:
lista_rdd.map(func_parOrdenadoQuadrado).collect()

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (3, 9)]

Por fim, encerramos o contexto do spark:

In [89]:
spark_context.stop()