<a href="https://colab.research.google.com/github/mat-garcia/PySpark-GColab/blob/main/Spark_Init.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Setting Up the Environment**

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

In [3]:
!pip install -q findspark

In [5]:
!pip install -q pyspark

In [4]:
!pip install -q pandas

In [6]:
!pip install -q numpy

ERROR: Operation cancelled by user

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.10/dist-packages/pyspark"
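
The `SPARK_HOME` path above is tied to Python 3.10, so it may stop working if the Colab runtime moves to another Python version. One possible alternative, sketched here under the assumption that `pyspark` was installed with pip, is to look up the package directory at run time. The helper name `find_pyspark_home` is hypothetical and not part of the original notebook.

```python
import importlib.util
import os


def find_pyspark_home():
    """Return the directory of the installed pyspark package, or None if it is absent."""
    spec = importlib.util.find_spec("pyspark")
    if spec is None or spec.origin is None:
        return None
    # spec.origin points at pyspark/__init__.py; its parent is the package directory.
    return os.path.dirname(spec.origin)


spark_home = find_pyspark_home()
if spark_home is not None:
    # Only override SPARK_HOME when the package was actually found.
    os.environ["SPARK_HOME"] = spark_home
print(spark_home)
```

If the function returns `None`, the hard-coded path from the cell above remains in effect.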


# **Running Spark**

In [8]:
import findspark
findspark.init()

In [9]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [10]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True)

In [11]:
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [12]:
dataset.head()

Row(longitude=-122.05, latitude=37.37, housing_median_age=27.0, total_rooms=3885.0, total_bedrooms=661.0, population=1537.0, households=606.0, median_income=6.6085, median_house_value=344700.0)

In [13]:
spark.stop()

## **Pandas Examples with Spark**
- Converting a Spark SQL result to a pandas DataFrame
- Converting a pandas DataFrame to a Spark DataFrame

In [14]:
from pyspark import SparkContext
spark_contexto = SparkContext()
print(spark_contexto)
print(spark_contexto.version)

<SparkContext master=local[*] appName=pyspark-shell>
3.4.1


In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7a11440de140>


In [16]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True)

In [17]:
dataset.createOrReplaceTempView("tabela_temporaria")
print(spark.catalog.listTables())


[Table(name='tabela_temporaria', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [18]:
query = "SELECT longitude, latitude FROM tabela_temporaria LIMIT 3"
saida = spark.sql(query)
saida.show()

+---------+--------+
|longitude|latitude|
+---------+--------+
|  -122.05|   37.37|
|   -118.3|   34.26|
|  -117.81|   33.78|
+---------+--------+



**Converting Spark to pandas**

In [20]:
query1 = "SELECT MAX(total_rooms) AS maximo_quartos FROM tabela_temporaria"
q_maximo_quartos = spark.sql(query1)
pd_maximo_quartos = q_maximo_quartos.toPandas()
# Extract the scalar from the one-row result so the message shows the value, not the whole Series
qtd_maximo_quartos = int(pd_maximo_quartos.loc[0, 'maximo_quartos'])
print('The maximum number of rooms is: {}'.format(qtd_maximo_quartos))


The maximum number of rooms is: 30450


In [21]:

query2 = f"SELECT longitude, latitude FROM tabela_temporaria WHERE total_rooms = {qtd_maximo_quartos}"
localizacao_maximo_quartos = spark.sql(query2)
pd_localizacao_maximo_quartos = localizacao_maximo_quartos.toPandas()
print(pd_localizacao_maximo_quartos.head())

   longitude  latitude
0     -117.2     33.58


**Converting a pandas DataFrame to a Spark DataFrame**

In [23]:
import pandas as pd
import numpy as np

In [24]:
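media = 0
desvio_padrao = 0.1
# 100 samples from a normal distribution, held in a single-column pandas DataFrame
pd_temporario = pd.DataFrame(np.random.normal(media, desvio_padrao, 100))
spark_temporario = spark.createDataFrame(pd_temporario)
# The new DataFrame does not appear in the catalog until it is registered as a temp view
print(spark.catalog.listTables())
spark_temporario.createOrReplaceTempView("nova_tabela_temporaria")
print(spark.catalog.listTables())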

[Table(name='tabela_temporaria', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
[Table(name='nova_tabela_temporaria', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='tabela_temporaria', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [26]:
spark.stop()

## **MapReduce Practice**

**Example 1**

In [27]:
from pyspark import SparkContext
spark_contexto = SparkContext()

In [28]:
import numpy as np

In [29]:
vetor = np.array([10, 20, 30, 40, 50])

In [31]:
paralelo = spark_contexto.parallelize(vetor)

In [32]:
print(paralelo)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287


In [33]:
mapa = paralelo.map(lambda x : x**2+x)

In [34]:
mapa.collect()

[110, 420, 930, 1640, 2550]
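
The `map` transformation above applies the lambda to each element independently. A minimal pure-Python sketch of the same computation, with no Spark involved, can serve as a sanity check on the result. The names `vetor_local` and `resultado_local` are assumptions made for this illustration.

```python
# Plain-Python analogue of paralelo.map(lambda x: x**2 + x).collect().
# vetor_local is a hypothetical stand-in for the distributed RDD.
vetor_local = [10, 20, 30, 40, 50]

# The built-in map is lazy, much like an RDD transformation;
# list() plays the role of the collect() action that forces evaluation.
resultado_local = list(map(lambda x: x**2 + x, vetor_local))
print(resultado_local)
```

The difference in Spark is that the elements are split across partitions and the lambda runs on the workers, but the per-element logic is the same.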

**Example 2**

In [35]:
paralelo = spark_contexto.parallelize(["distribuida", "distribuida", "spark", "rdd", "spark", "spark"])

In [36]:
funcao_lambda = lambda x:(x,1)

In [37]:
from operator import add
mapa = paralelo.map(funcao_lambda).reduceByKey(add).collect()

In [39]:
for w, c in mapa:
    print("{}: {}".format(w, c))

distribuida: 2
spark: 3
rdd: 1
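
To make the two steps of this word count explicit, the following is a rough pure-Python sketch of what `map` followed by `reduceByKey(add)` computes. It is only an analogy: Spark merges values per partition and then across partitions, and the order of the collected pairs is not guaranteed. The names `pares` and `contagem` are hypothetical.

```python
from collections import defaultdict
from operator import add

palavras = ["distribuida", "distribuida", "spark", "rdd", "spark", "spark"]

# Map step: pair every word with the count 1, as funcao_lambda does.
pares = [(palavra, 1) for palavra in palavras]

# Reduce-by-key step: fold the values of each key together with add.
contagem = defaultdict(int)
for chave, valor in pares:
    contagem[chave] = add(contagem[chave], valor)

for chave, total in contagem.items():
    print("{}: {}".format(chave, total))
```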


In [40]:
spark_contexto.stop()

## **Transformation and Action Example**

In [41]:
from pyspark import SparkContext

spark_contexto = SparkContext()

In [42]:
lista = [1, 2, 3, 4, 5, 3]
lista_rdd = spark_contexto.parallelize(lista)

In [43]:
lista_rdd.count()

6

In [44]:
par_ordenado = lambda numero: (numero, numero*10)

In [45]:
lista_rdd.flatMap(par_ordenado).collect()

[1, 10, 2, 20, 3, 30, 4, 40, 5, 50, 3, 30]

In [46]:
lista_rdd.map(par_ordenado).collect()

[(1, 10), (2, 20), (3, 30), (4, 40), (5, 50), (3, 30)]
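
The two outputs above differ only in flattening: `map` emits one tuple per input element, while `flatMap` unpacks each returned tuple into the result. A small pure-Python sketch of that difference, assuming nothing beyond the standard library (the names `mapeado` and `achatado` are hypothetical), could look like this:

```python
from itertools import chain

lista = [1, 2, 3, 4, 5, 3]
par_ordenado = lambda numero: (numero, numero * 10)

# map keeps one output element per input element: a list of tuples.
mapeado = [par_ordenado(n) for n in lista]

# flatMap applies the same function, then flattens every returned tuple
# into a single output sequence.
achatado = list(chain.from_iterable(par_ordenado(n) for n in lista))

print(mapeado)
print(achatado)
```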

In [47]:
spark_contexto.stop()