<a href="https://colab.research.google.com/github/mthmadrid/mthmadrid/blob/main/aula_spark_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **PySpark**

Hands-on com Apache Spark e a utilização do cluster Spark para processamento e análise de grandes volumes de dados

In [124]:
 # Instalando o Spark
try:
    !pip install pyspark
    print("Pyspark instalado com sucesso!")
except Exception as e:
    print(f"Erro ao instalar Pyspark: {e}")

Pyspark instalado com sucesso!


In [125]:
# Importanto as classes do PySpark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [126]:
# Criando o método de inicialização (main)
try:
    spark = SparkSession.builder \
    .master('local[*]') \
    .appName('Iniciando com Spark') \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

    print("Spark foi iniciado com sucesso!")
except Exception as e:
    print(f"Erro ao iniciar Spark: {e}")

Spark foi iniciado com sucesso!


In [127]:
# Criando o primeiro RDD - Resilient Distributed Dataset
rdd = spark.sparkContext.parallelize([1,2,3,4,5])

In [128]:
# Aplicando uma transformação de elevação ao quadrado no RDD
rdd_squared = rdd.map(lambda x: x**2)
rdd_squared.collect()

[1, 4, 9, 16, 25]

In [129]:
# Aplicando uma ação de soma dos elementos do RDD transformados
soma = rdd_squared.reduce(lambda a, b: a+b)
print("A soma dos elementos no RDD é:", soma)

A soma dos elementos no RDD é: 55


In [130]:
# Realizando uma multiplicação
rdd = spark.sparkContext.parallelize([10, 5])
mult = rdd.reduce(lambda x, y: x*y)
print("A multiplicação dos elementos no RDD é:", soma)

A multiplicação dos elementos no RDD é: 55


In [131]:
# Criando outro RDD
rdd = spark.sparkContext.parallelize([
  (1, "Alice"),
  (2, "Bob"),
  (3, "Charlie")
])

In [132]:
# Criando um data frame a partir do rdd anterior
df1 = spark.createDataFrame(rdd)
df1.show()

+---+-------+
| _1|     _2|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



In [133]:
# Definindo o schema para o DF
schema = ["id", "nome"]

In [134]:
# Criando o DF a partir do RDD e do schema
df2 = spark.createDataFrame(rdd, schema=schema)
df2.show()

+---+-------+
| id|   nome|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+



In [135]:
# Comando para baixar um arquivo diretamente pelo S.O. do cluster!
!wget "https://raw.githubusercontent.com/alura-cursos/introducao-a-data-science/master/aula0/ml-latest-small/movies.csv"
csv_file_path = "/content/movies.csv"
print(csv_file_path)
csv_rdd = spark.sparkContext.textFile(csv_file_path)

--2024-09-17 15:57:07--  https://raw.githubusercontent.com/alura-cursos/introducao-a-data-science/master/aula0/ml-latest-small/movies.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 494431 (483K) [text/plain]
Saving to: ‘movies.csv.6’


2024-09-17 15:57:07 (33.9 MB/s) - ‘movies.csv.6’ saved [494431/494431]

/content/movies.csv


In [136]:
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load(csv_file_path)

In [137]:
df.show(10)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [138]:
# Trasnformando em Pandas Data Frame
pandasDF = df.toPandas()
pandasDF

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
...,...,...,...
9737,193581,Black Butler: Book of the Atlantic (2017),Action|Animation|Comedy|Fantasy
9738,193583,No Game No Life: Zero (2017),Animation|Comedy|Fantasy
9739,193585,Flint (2017),Drama
9740,193587,Bungo Stray Dogs: Dead Apple (2018),Action|Animation


In [139]:
# importar a biblioteca pandas
import pandas as pd

In [140]:
filmes = pandasDF
filmes.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [141]:
#Renomeando colunas
filmes.columns = ["filmeId" , "titulo" , "genero"]
filmes.head()

Unnamed: 0,filmeId,titulo,genero
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [142]:
# Encerre a sessão Spark
try:
  spark.stop()
  print("SparkSession encerrada com sucesso!")
except Exception as e:
  print(f"Erro ao encerrar SparkSession: {e}")

SparkSession encerrada com sucesso!
