# Introduction to spark 
<a href="https://spark.apache.org/docs/latest/quick-start.html">Spark Documentation</a>

In [246]:
import sys
import pandas as pd
from pyspark.sql import functions as sf
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType
from decimal import Decimal
from datetime import datetime
from pyspark.sql import functions as sf
from pyspark import SparkContext
# Create a SparkContext object
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Introduction to spark").getOrCreate()
# Versão do pyspark usada neste jupyter
print(f"Versão do pyspark usada neste jupyter: {spark.version}")
# Versão do python usada neste jupyter
print(f"Versão do python usada neste jupyter: {sys.version}")

Versão do pyspark usada neste jupyter: 3.5.4
Versão do python usada neste jupyter: 3.11.5 (main, Sep 11 2023, 13:54:46) [GCC 11.2.0]


In [225]:
# Dados criando um array de dados
dados = [
    ("mauro lucio", "pereira da silva",55, "mauro@email.com","Rua Leila Diniz, 798","Independencia","Petropolis","25645410","RJ","Brasil"),
    ("ethelyn helena", "c. pereira da silva", 19, "ethelyn@email.com","Estr. Luis Gomes da Silva, 714","Correas","Petropolis","25720020","RJ","Brasil"),
    ("evelin", "c. pereira da silva", 15, "evelin@email.com","Estr. Luis Gomes da Silva, 714","Correas","Petropolis","25720020","RJ","Brasil"),
    ("everton cristian",  "c. pereira da silva", 22, "everton@email.com","Estr. Luis Gomes da Silva, 714","Correas","Petropolis","25720020","RJ","Brasil"),
    ("andreia cristina",  "de moares", 29, "andreia@email.com","Estr. Luis Gomes da Silva, 714","Correas","Petropolis","25720020","RJ","Brasil"),
]

# Definição do Schema
schema = StructType([
    StructField("nome", StringType(), False),
    StructField("sobrenome", StringType(), False),
    StructField("idade",IntegerType(), False),
    StructField("e-mail", StringType(), False),
    StructField("Rua", StringType(), False),
    StructField("Bairro",StringType(), False),
    StructField("Cidade",StringType(), False),
    StructField("Cep",StringType(), False),
    StructField("Estado",StringType(), False),
    StructField("Pais",StringType(), False)
])

# Criando um DataFrame com o arry de dados
df = spark.createDataFrame(dados,schema)

# Mostrando o DataFrame
df.show()

+----------------+-------------------+-----+-----------------+--------------------+-------------+----------+--------+------+------+
|            nome|          sobrenome|idade|           e-mail|                 Rua|       Bairro|    Cidade|     Cep|Estado|  Pais|
+----------------+-------------------+-----+-----------------+--------------------+-------------+----------+--------+------+------+
|     mauro lucio|   pereira da silva|   55|  mauro@email.com|Rua Leila Diniz, 798|Independencia|Petropolis|25645410|    RJ|Brasil|
|  ethelyn helena|c. pereira da silva|   19|ethelyn@email.com|Estr. Luis Gomes ...|      Correas|Petropolis|25720020|    RJ|Brasil|
|          evelin|c. pereira da silva|   15| evelin@email.com|Estr. Luis Gomes ...|      Correas|Petropolis|25720020|    RJ|Brasil|
|everton cristian|c. pereira da silva|   22|everton@email.com|Estr. Luis Gomes ...|      Correas|Petropolis|25720020|    RJ|Brasil|
|andreia cristina|          de moares|   29|andreia@email.com|Estr. Luis Gom

In [226]:
# Filtrando colunas
filtered_df = df.filter(df['nome'] == "mauro lucio")
filtered_df.show()

+-----------+----------------+-----+---------------+--------------------+-------------+----------+--------+------+------+
|       nome|       sobrenome|idade|         e-mail|                 Rua|       Bairro|    Cidade|     Cep|Estado|  Pais|
+-----------+----------------+-----+---------------+--------------------+-------------+----------+--------+------+------+
|mauro lucio|pereira da silva|   55|mauro@email.com|Rua Leila Diniz, 798|Independencia|Petropolis|25645410|    RJ|Brasil|
+-----------+----------------+-----+---------------+--------------------+-------------+----------+--------+------+------+



In [227]:
# Agrupar e calcular a média agregada
grouped_df = df.groupBy("nome").agg({"idade":"avg"})
grouped_df.show()

+----------------+----------+
|            nome|avg(idade)|
+----------------+----------+
|     mauro lucio|      55.0|
|  ethelyn helena|      19.0|
|          evelin|      15.0|
|andreia cristina|      29.0|
|everton cristian|      22.0|
+----------------+----------+



In [228]:
# Número de registros do DataFrame
print(f'Número de registros do DataFame: {df.count()}')

Número de registros do DataFame: 5


In [229]:
# Primeira linha do DataFrame
df.first()

Row(nome='mauro lucio', sobrenome='pereira da silva', idade=55, e-mail='mauro@email.com', Rua='Rua Leila Diniz, 798', Bairro='Independencia', Cidade='Petropolis', Cep='25645410', Estado='RJ', Pais='Brasil')

In [230]:
# Filtrando onde tem uma palavra
linesWithSpark = df.filter(df["sobrenome"].contains("pereira"))
linesWithSpark.show()

+----------------+-------------------+-----+-----------------+--------------------+-------------+----------+--------+------+------+
|            nome|          sobrenome|idade|           e-mail|                 Rua|       Bairro|    Cidade|     Cep|Estado|  Pais|
+----------------+-------------------+-----+-----------------+--------------------+-------------+----------+--------+------+------+
|     mauro lucio|   pereira da silva|   55|  mauro@email.com|Rua Leila Diniz, 798|Independencia|Petropolis|25645410|    RJ|Brasil|
|  ethelyn helena|c. pereira da silva|   19|ethelyn@email.com|Estr. Luis Gomes ...|      Correas|Petropolis|25720020|    RJ|Brasil|
|          evelin|c. pereira da silva|   15| evelin@email.com|Estr. Luis Gomes ...|      Correas|Petropolis|25720020|    RJ|Brasil|
|everton cristian|c. pereira da silva|   22|everton@email.com|Estr. Luis Gomes ...|      Correas|Petropolis|25720020|    RJ|Brasil|
+----------------+-------------------+-----+-----------------+--------------

In [231]:
# Contando o núemro de ocorrências de uma palavra
countText= df.filter(df["sobrenome"].contains("pereira")).count()
print(f"O número de ocorrências encontradas foram: {countText}")

O número de ocorrências encontradas foram: 4


In [232]:
# Encontrar a linha com mais palavras
# df.select(sf.size(sf.split(df.value,"\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()

# Encontrar a linha com mais palavras na coluna 'sobrenome'
df.select(sf.size(sf.split(df["sobrenome"], "\s+")).alias("numWords")).agg(sf.max(sf.col("numWords"))).collect()

[Row(max(numWords)=4)]

# Products, Sales and customer

In [233]:
# DataFrame customer
customer_data = [(
    1,
    "ruan jose",
    datetime(1965, 10, 15, 00, 00, 00),
    "ruan_jose@gmail.com",
    datetime(2025, 3, 2, 17, 10, 00),
)]

In [234]:
schema_customer = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("customer_birthday", TimestampType(), False),
    StructField("email", StringType(), False),
    StructField("created_at", TimestampType(), False)
])

In [235]:
# Criando DataFrame com array de dados
df_customers = spark.createDataFrame(customer_data, schema_customer)
# Convertendo strings de data/hora para TimestampType
df_customer = df_customers.withColumn("created_at", sf.to_timestamp("created_at", "dd-MM-yyyy HH:mm:ss"))
df_customer = df_customers.withColumn("customer_birthday", sf.to_timestamp("customer_birthday", "dd-MM-yyyy HH:mm:ss"))
# Mostrando o DataFrame
df_customers.show(truncate=False)
df_customer.printSchema()

+-----------+---------+-------------------+-------------------+-------------------+
|customer_id|name     |customer_birthday  |email              |created_at         |
+-----------+---------+-------------------+-------------------+-------------------+
|1          |ruan jose|1965-10-15 00:00:00|ruan_jose@gmail.com|2025-03-02 17:10:00|
+-----------+---------+-------------------+-------------------+-------------------+

root
 |-- customer_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- customer_birthday: timestamp (nullable = true)
 |-- email: string (nullable = false)
 |-- created_at: timestamp (nullable = false)



In [236]:
# Criando dados do DataFrame
products_data = [(1,"notebook sony vaio 15 fz","Notebook VAIO® FE15 Intel® Core™", Decimal("3799.00"), 38, datetime(2025, 3, 2, 21, 38, 0))]

In [237]:
# Criando DataFrame de itens
schema_products = StructType([
   StructField("product_id", IntegerType(), False),
   StructField("name", StringType(), False),
   StructField("description", StringType(),False),
   StructField("price", DecimalType(10,2), False),
   StructField("stock", IntegerType(), False),
   StructField("created_at", TimestampType(), False)  
])

In [238]:
# Criando um DataFrame com array de dados
df_products = spark.createDataFrame(products_data, schema_products)
# Convertendo strings de data/hora para TimestampType
df_product = df_products.withColumn("created_at", sf.to_timestamp("created_at", "dd-MM-yyyy HH:mm:ss") )
# Mostrando o DataFrame
df_products.show(truncate=False)
df_product.printSchema()

+----------+------------------------+--------------------------------+-------+-----+-------------------+
|product_id|name                    |description                     |price  |stock|created_at         |
+----------+------------------------+--------------------------------+-------+-----+-------------------+
|1         |notebook sony vaio 15 fz|Notebook VAIO® FE15 Intel® Core™|3799.00|38   |2025-03-02 21:38:00|
+----------+------------------------+--------------------------------+-------+-----+-------------------+

root
 |-- product_id: integer (nullable = false)
 |-- name: string (nullable = false)
 |-- description: string (nullable = false)
 |-- price: decimal(10,2) (nullable = false)
 |-- stock: integer (nullable = false)
 |-- created_at: timestamp (nullable = true)



In [None]:
# Schema orders
schema_orders = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("order_date", TimestampType(), False),
    StructField("total_price",DecimalType(10,2), True),
    
])

In [240]:
# Schema
schema_sales = StructType([
    StructField("sale_id", IntegerType(), False),
    StructField("order_id", IntegerType(), False),
    StructField("item_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("unit_price", DecimalType(10,2), False),
    StructField("date_sale", TimestampType(), False),
    StructField("status", StringType(), False)
])

In [239]:
# DataFrame sales data
sales_data =[(1, 1,1,1,10, Decimal("3799.00"), datetime(2025, 2, 27, 10, 25, 0),"delivered")]

In [241]:
# Criando um DataFrame com o array de dados
df_sales = spark.createDataFrame(sales_data,schema_sales)
# Convertendo strings de data/hora para TimestampType
df_sale = df_sales.withColumn("date_sale", sf.to_timestamp("date_sale", "dd-MM-yyyy HH:mm:ss") )
# Mostrando o DataFrame
df_sales.show(truncate=False)
df_sale.printSchema()

+-------+--------+-------+-----------+--------+----------+-------------------+---------+
|sale_id|order_id|item_id|customer_id|quantity|unit_price|date_sale          |status   |
+-------+--------+-------+-----------+--------+----------+-------------------+---------+
|1      |1       |1      |1          |10      |3799.00   |2025-02-27 10:25:00|delivered|
+-------+--------+-------+-----------+--------+----------+-------------------+---------+

root
 |-- sale_id: integer (nullable = false)
 |-- order_id: integer (nullable = false)
 |-- item_id: integer (nullable = false)
 |-- customer_id: integer (nullable = false)
 |-- quantity: integer (nullable = false)
 |-- unit_price: decimal(10,2) (nullable = false)
 |-- date_sale: timestamp (nullable = true)
 |-- status: string (nullable = false)



In [248]:
# Schema de orders
schema_orders = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("order_date", TimestampType(), False),
    StructField("total_price", DecimalType(10,2), False),
    StructField("status", StringType(), False)
])

In [251]:
# DataFrame de orders
orders_data =[(1, 1, datetime(2025, 3,2, 22, 36, 00),Decimal("3799.00"),"delivered")]
# Convertendo strings de data/hora para TimestampType
df_order = df_sales.withColumn("date_sale", sf.to_timestamp("date_sale", "dd-MM-yyyy HH:mm:ss") )
# Mostrando o DataFrame
df_sales.show(truncate=False)
df_sale.printSchema()

+-------+--------+-------+-----------+--------+----------+-------------------+---------+
|sale_id|order_id|item_id|customer_id|quantity|unit_price|date_sale          |status   |
+-------+--------+-------+-----------+--------+----------+-------------------+---------+
|1      |1       |1      |1          |10      |3799.00   |2025-02-27 10:25:00|delivered|
+-------+--------+-------+-----------+--------+----------+-------------------+---------+

root
 |-- sale_id: integer (nullable = false)
 |-- order_id: integer (nullable = false)
 |-- item_id: integer (nullable = false)
 |-- customer_id: integer (nullable = false)
 |-- quantity: integer (nullable = false)
 |-- unit_price: decimal(10,2) (nullable = false)
 |-- date_sale: timestamp (nullable = true)
 |-- status: string (nullable = false)



In [252]:
# Criando um DataFrame com o array de dados
df_orders = spark.createDataFrame(orders_data,schema_orders)
# Convertendo strings de data/hora para TimestampType
df_order = df_orders.withColumn("order_date", sf.to_timestamp("order_date", "dd-MM-yyyy HH:mm:ss") )
# Mostrando o DataFrame
df_orders.show(truncate=False)
df_order.printSchema()

+--------+-----------+-------------------+-----------+---------+
|order_id|customer_id|order_date         |total_price|status   |
+--------+-----------+-------------------+-----------+---------+
|1       |1          |2025-03-02 22:36:00|3799.00    |delivered|
+--------+-----------+-------------------+-----------+---------+

root
 |-- order_id: integer (nullable = false)
 |-- customer_id: integer (nullable = false)
 |-- order_date: timestamp (nullable = true)
 |-- total_price: decimal(10,2) (nullable = false)
 |-- status: string (nullable = false)

