# Introdução Teórica

PySpark é uma biblioteca de processamento distribuído de dados em larga escala. Ele fornece uma interface em Python para a estrutura de processamento de dados distribuída Apache Spark, permitindo que usuários executem operações de processamento de grandes conjuntos de dados em clusters distribuídos.

# Cheat Sheet

<p align = "center">
<img src = "https://res.cloudinary.com/dyd911kmh/image/upload/f_auto,q_auto:best/v1625837623/PySpark_Cheat_Sheet-_Spark_in_Python_owornh.png" width = 1400>
</p>

# Primeiros passos com PySpark

In [1]:
import pyspark as spark
import pandas as pd
from pyspark.sql import SparkSession

print(f"A versão atual do PySpark é: {spark.__version__}")

A versão atual do PySpark é: 3.4.0


In [2]:
# Cria ou recupera uma sessão Spark
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x000001F96B3B6350>


In [3]:
# Retorna o nome de todas as tabelas dentro do clusters em uma lista
print(f"Lista de todas as tabelas no cluster atual: {my_spark.catalog.listTables()}")

Lista de todas as tabelas no cluster atual: []


In [4]:
# Vamos carregar o DataFrame Iris:
iris_df = pd.read_csv(
    r"G:\Meu Drive\Data Science\Dados\Classificação\Iris\Iris.csv"
)

iris_df.drop('Id', axis = 1, inplace = True)

iris_df.head(3)

Unnamed: 0,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa


In [33]:
# Caso queiramos carregar utilizando PySpark, podemos fazer da seguinte forma:

iris_spark = my_spark.createDataFrame(iris_df) # Cria um DataFrame Spark a partir de um DataFrame do Pandas

# ou

iris_spark = my_spark.read.csv(
    r"G:\Meu Drive\Data Science\Dados\Classificação\Iris\Iris.csv",
    header = True
)

# Podemos ver o tipo de dados de cada coluna:
print(iris_spark)

DataFrame[Id: string, SepalLengthCm: string, SepalWidthCm: string, PetalLengthCm: string, PetalWidthCm: string, Species: string]


In [34]:
# Para visualizar as primeiras linhas do DataFrame, podemos utilizar o método show():
iris_spark.show(5)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [35]:
# Adiciona a tabela ao catálogo do cluster atual:
iris_spark.createOrReplaceTempView("iris")

print(f"Lista de todas as tabelas no cluster atual: {my_spark.catalog.listTables()}")

Lista de todas as tabelas no cluster atual: [Table(name='iris', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [36]:
# Realiza uma QUERY SQL em uma tabela do cluster.
query = "SELECT * FROM iris"
query_df = my_spark.sql(query)
query_df.show()

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|  6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|  7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|  9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
| 10|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
| 11|          5.4|         3.7|          1.5|         0.2|Iris-

# Manipulando Colunas

In [38]:
# Cria uma nova coluna no DataFrame:
iris_spark.withColumn("new_column", iris_spark.Species)

DataFrame[Id: string, SepalLengthCm: string, SepalWidthCm: string, PetalLengthCm: string, PetalWidthCm: string, Species: string, new_column: string]

In [57]:
# Renomeia uma coluna no DataFrame:
iris_spark.withColumnRenamed("Id", "ColunaInutil").show()

+------------+-------------+------------+-------------+------------+-----------+
|ColunaInutil|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+------------+-------------+------------+-------------+------------+-----------+
|           1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|           2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|           3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|           4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|           5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|           6|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|           7|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|           8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|           9|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
|          10|          4.9|

In [46]:
# Seleciona colunas do DataFrame PySpark:
iris_spark.select("Id", "Species")
# Seleciona colunas com possibilidade de definir os alias de forma prática:
iris_spark.selectExpr("Id as ColunaDesnecessaria", "Species as Especies").show(3)

+-------------------+-----------+
|ColunaDesnecessaria|   Especies|
+-------------------+-----------+
|                  1|Iris-setosa|
|                  2|Iris-setosa|
|                  3|Iris-setosa|
+-------------------+-----------+
only showing top 3 rows



In [42]:
# Filtra uma coluna no DataFrame utilizando SQL:
iris_spark.filter("Species == 'Iris-setosa'").show(3)
# Outra forma de utilizar o filtro:
iris_spark.filter(iris_spark.Species == "Iris-setosa").show(3)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 3 rows

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
+---+-------------+------------+-------

In [52]:
# É possível realizar GroupBy também dentro do PySpark:
iris_spark.groupBy("Species").count().show() # Retorna a quantidade de cada espécie, funções max, min, avg, sum e outras também são possíveis.

+---------------+-----+
|        Species|count|
+---------------+-----+
| Iris-virginica|   50|
|    Iris-setosa|   50|
|Iris-versicolor|   50|
+---------------+-----+



In [55]:
import pyspark.sql.functions as F # Módulo de funções do PySpark utilizadas para sumarizar dados/operações matemáticas
iris_spark.groupBy("Species").agg(F.max("SepalWidthCm"), F.min("SepalWidthCm")).show(3)

+---------------+-----------------+-----------------+
|        Species|max(SepalWidthCm)|min(SepalWidthCm)|
+---------------+-----------------+-----------------+
|    Iris-setosa|              4.4|              2.3|
|Iris-versicolor|              3.4|              2.0|
| Iris-virginica|              3.8|              2.2|
+---------------+-----------------+-----------------+

