<a href="https://colab.research.google.com/github/tathycosta/PySparkSQL/blob/main/Atividade_PySparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Importação da biblioteca pandas
import pandas as pd

In [None]:
# Instalação dos requisitos para o PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Instala o OpenJDK 8 (necessário para o Spark rodar no ambiente). A opção -qq reduz a saída do comando, e > /dev/null silencia os logs.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz #Baixa a versão 3.1.1 do Apache Spark com suporte ao Hadoop 3.2 do repositório oficial.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz #Descompacta o arquivo .tgz para acesso aos binários do Spark.
!pip install -q findspark #findspark é uma biblioteca que facilita a inicialização do Spark em ambientes como o Google Colab.

In [None]:
# Importa o módulo 'os' para interagir com variáveis de ambiente do sistema operacional
import os

# Define a variável de ambiente JAVA_HOME, indicando o caminho para o Java 8
# O Spark precisa do Java para ser executado, e aqui especificamos onde ele está instalado
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Define a variável de ambiente SPARK_HOME, indicando o caminho de instalação do Spark
# Isso é necessário para que o Python saiba onde encontrar os binários e bibliotecas do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Importa a biblioteca 'findspark', que ajuda a localizar e configurar o Spark no ambiente Python
import findspark

# Inicializa o findspark, permitindo que possamos importar e utilizar o PySpark no código
findspark.init()


In [None]:
# Importa a classe SparkSession do módulo pyspark.sql
# SparkSession é o ponto de entrada principal para trabalhar com DataFrames no PySpark
from pyspark.sql import SparkSession

# Inicializar a SparkSession com suporte ao Hive
spark = SparkSession.builder \
    .appName("Spark with Hive on Colab") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.warehouse.dir", "/content/spark-warehouse") \
    .config("hive.metastore.warehouse.dir", "/content/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()
# Define o nome da aplicação Spark (aparece nos logs)
# Configura o Spark para usar o catálogo do Hive
# Define o diretório do warehouse para o Hive
# Criar diretório para o warehouse
# Habilita o suporte ao Hive, permitindo consultas SQL compatíveis com Hive
# Cria a SparkSession ou reutiliza uma existente
# Cria o diretório para armazenar os metadados e tabelas gerenciadas pelo Hive
!mkdir -p /content/spark-warehouse



In [None]:
# Verifica o SparkContext
print(spark)

# Exibe a Spark version
print(spark.version)

<pyspark.sql.session.SparkSession object at 0x7f7600a20750>
3.1.1


In [None]:
aluguel = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=0&single=true&output=csv')
cliente = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=1573858125&single=true&output=csv')
carro = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=2131324504&single=true&output=csv')
marca = pd.read_csv('https://docs.google.com/spreadsheets/d/e/2PACX-1vRBm8orBNbKLqHPtSSrZCLJrduyM_lI-4ZfVkmRqkqK7PvqnzkvKV0mJbRCiHH6IYVpcMXeefCqQsW2/pub?gid=1957306968&single=true&output=csv')


In [None]:
cliente.to_csv('cliente.csv',index=False)
aluguel.to_csv('aluguel.csv',index=False)
carro.to_csv('carro.csv',index=False)
marca.to_csv('marca.csv',index=False)

In [None]:
spark.sql('''
CREATE TABLE IF NOT EXISTS cliente (
  codcliente INT,
  nome STRING,
  cidade STRING,
  sexo STRING,
  estado STRING,
  estadocivil STRING
)

USING CSV
OPTIONS (path '/content/cliente.csv', header 'true', inferSchema 'true')

''')



DataFrame[]

In [None]:
spark.sql('''
SELECT * FROM cliente
''').show()

+----------+----------------+---------------+----+------+-----------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|
+----------+----------------+---------------+----+------+-----------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|
|         4|  Fernando Souza|       Campinas|   M|    SP|          S|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|
+----------+----------------+---------------+----+------+-----------+



In [None]:
spark.sql('''
CREATE TABLE IF NOT EXISTS aluguel (
  codaluguel INT,
  codcliente INT,
  codcarro INT,
  data_aluguel DATE
)

USING CSV
OPTIONS (path '/content/aluguel.csv', header 'true', inferSchema 'true')

''')

DataFrame[]

In [None]:
spark.sql('''
SELECT * FROM aluguel
''').show()

+----------+----------+--------+------------+
|codaluguel|codcliente|codcarro|data_aluguel|
+----------+----------+--------+------------+
|         1|         3|       2|  2023-04-01|
|         2|         2|       1|  2023-04-02|
|         3|         2|       1|  2023-04-03|
|         4|         2|       3|  2023-04-04|
|         5|         1|       4|  2023-04-05|
|         6|         1|       4|  2023-04-13|
|         7|         1|       1|  2023-04-15|
|         8|         5|       2|  2023-04-19|
|         9|         5|       2|  2023-04-21|
|        10|         3|       1|  2023-04-25|
+----------+----------+--------+------------+



In [None]:
spark.sql('''
CREATE TABLE IF NOT EXISTS carro(
    codcarro INT,
    codmarca INT,
    modelo STRING,
    valor DOUBLE
)
USING CSV
OPTIONS (path '/content/carro.csv', header 'true', inferSchema 'true')
''')

DataFrame[]

In [None]:
spark.sql('''
SELECT * FROM carro
''').show()

+--------+--------+------+-----+
|codcarro|codmarca|modelo|valor|
+--------+--------+------+-----+
|       1|       1|    Ka|100.0|
|       2|       2|  Argo|150.0|
|       3|       3|  Onix|170.0|
|       4|       4|  Polo|150.0|
|       5|       5|  Kwid|120.0|
+--------+--------+------+-----+



In [None]:
spark.sql('''CREATE TABLE IF NOT EXISTS marca (
  codmarca INT,
  marca STRING
)
USING CSV
OPTIONS (path '/content/marca.csv', header 'true', inferSchema 'true')
''')

DataFrame[]

In [None]:
spark.sql('''
SELECT * FROM marca
''').show()

+--------+----------+
|codmarca|     marca|
+--------+----------+
|       1|      Ford|
|       2|      Fiat|
|       3| Chevrolet|
|       4|Volkswagen|
|       5|   Renault|
+--------+----------+



01 - Carregar as quatro tabelas do banco locadora no PySpark como DataFrames

In [None]:
spark.sql('''
CREATE TABLE IF NOT EXISTS cliente
USING csv
OPTIONS (
    path "cliente.csv",
    header "true",
    inferSchema "true"
)
''')

spark.sql('''
CREATE TABLE IF NOT EXISTS carro
USING csv
OPTIONS (
    path "carro.csv",
    header "true",
    inferSchema "true"
)
''')

spark.sql('''
CREATE TABLE IF NOT EXISTS marca
USING csv
OPTIONS (
    path "marca.csv",
    header "true",
    inferSchema "true"
)
''')

spark.sql('''
CREATE TABLE IF NOT EXISTS aluguel
USING csv
OPTIONS (
    path "aluguel.csv",
    header "true",
    inferSchema "true"
)
''')

print("Clientes:")
spark.sql("SELECT * FROM cliente").show()

print("Carros:")
spark.sql("SELECT * FROM carro").show()

print("Marcas:")
spark.sql("SELECT * FROM marca").show()

print("Aluguéis:")
spark.sql("SELECT * FROM aluguel").show()


Clientes:
+----------+----------------+---------------+----+------+-----------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|
+----------+----------------+---------------+----+------+-----------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|
|         4|  Fernando Souza|       Campinas|   M|    SP|          S|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|
+----------+----------------+---------------+----+------+-----------+

Carros:
+--------+--------+------+-----+
|codcarro|codmarca|modelo|valor|
+--------+--------+------+-----+
|       1|       1|    Ka|100.0|
|       2|       2|  Argo|150.0|
|       3|       3|  Onix|170.0|
|       4|       4|  Polo|150.0|
|       5|       5|  Kwid|120.0|
+--------+--------+------+-----+

Marcas:
+--------+----------+
|codmarca|     marca|
+

02 - Exibir as cinco primeiras linhas de cada DataFrame


In [None]:
print("Clientes:")
spark.sql("SELECT * FROM cliente").show(5)

print("Carros:")
spark.sql("SELECT * FROM carro").show(5)

print("Marcas:")
spark.sql("SELECT * FROM marca").show(5)

print("Aluguéis:")
spark.sql("SELECT * FROM aluguel").show(5)


Clientes:
+----------+----------------+---------------+----+------+-----------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|
+----------+----------------+---------------+----+------+-----------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|
|         4|  Fernando Souza|       Campinas|   M|    SP|          S|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|
+----------+----------------+---------------+----+------+-----------+

Carros:
+--------+--------+------+-----+
|codcarro|codmarca|modelo|valor|
+--------+--------+------+-----+
|       1|       1|    Ka|100.0|
|       2|       2|  Argo|150.0|
|       3|       3|  Onix|170.0|
|       4|       4|  Polo|150.0|
|       5|       5|  Kwid|120.0|
+--------+--------+------+-----+

Marcas:
+--------+----------+
|codmarca|     marca|
+

03 - Contar o número de linhas e colunas de cada tabela


In [None]:
def contar_tabela(nome_tabela):
    num_linhas = spark.sql(f"SELECT COUNT(*) FROM {nome_tabela}").collect()[0][0]
    num_colunas = len(spark.sql(f"DESCRIBE {nome_tabela}").collect())
    print(f"Tabela '{nome_tabela}': {num_linhas} linhas, {num_colunas} colunas")

contar_tabela("cliente")
contar_tabela("carro")
contar_tabela("marca")
contar_tabela("aluguel")


Tabela 'cliente': 5 linhas, 6 colunas
Tabela 'carro': 5 linhas, 4 colunas
Tabela 'marca': 5 linhas, 2 colunas
Tabela 'aluguel': 10 linhas, 4 colunas


04 - Exibir o esquema (schema) de cada DataFrame


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExibirEsquema").getOrCreate()


df_clientes = spark.read.option("header", "true").csv("cliente.csv")
df_carros = spark.read.option("header", "true").csv("carro.csv")
df_marcas = spark.read.option("header", "true").csv("marca.csv")
df_alugueis = spark.read.option("header", "true").csv("aluguel.csv")


print("Esquema da tabela Clientes:")
df_clientes.printSchema()

print("Esquema da tabela Carros:")
df_carros.printSchema()

print("Esquema da tabela Marcas:")
df_marcas.printSchema()

print("Esquema da tabela Aluguéis:")
df_alugueis.printSchema()



Esquema da tabela Clientes:
root
 |-- codcliente: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- estadocivil: string (nullable = true)

Esquema da tabela Carros:
root
 |-- codcarro: string (nullable = true)
 |-- codmarca: string (nullable = true)
 |-- modelo: string (nullable = true)
 |-- valor: string (nullable = true)

Esquema da tabela Marcas:
root
 |-- codmarca: string (nullable = true)
 |-- marca: string (nullable = true)

Esquema da tabela Aluguéis:
root
 |-- codaluguel: string (nullable = true)
 |-- codcliente: string (nullable = true)
 |-- codcarro: string (nullable = true)
 |-- data_aluguel: string (nullable = true)



05 - Renomear as colunas dos DataFrames para ter nomes mais amigáveis


In [None]:
from pyspark.sql.functions import col

df_clientes = df_clientes \
    .withColumnRenamed("codcliente", "ID_Cliente") \
    .withColumnRenamed("nome", "Nome") \
    .withColumnRenamed("cidade", "Cidade") \
    .withColumnRenamed("sexo", "Sexo") \
    .withColumnRenamed("estado", "Estado") \
    .withColumnRenamed("estadocivil", "Estado_Civil")

df_carros = df_carros \
    .withColumnRenamed("codcarro", "ID_Carro") \
    .withColumnRenamed("codmarca", "ID_Marca") \
    .withColumnRenamed("modelo", "Modelo") \
    .withColumnRenamed("valor", "Valor_Diaria")

df_marcas = df_marcas \
    .withColumnRenamed("codmarca", "ID_Marca") \
    .withColumnRenamed("marca", "Nome_Marca")

df_alugueis = df_alugueis \
    .withColumnRenamed("codaluguel", "ID_Aluguel") \
    .withColumnRenamed("codcliente", "ID_Cliente") \
    .withColumnRenamed("codcarro", "ID_Carro") \
    .withColumnRenamed("data_aluguel", "Data_Aluguel")

print("Esquema atualizado da tabela Clientes:")
df_clientes.printSchema()

print("\n Esquema atualizado da tabela Carros:")
df_carros.printSchema()

print("\n Esquema atualizado da tabela Marcas:")
df_marcas.printSchema()

print("\n Esquema atualizado da tabela Aluguéis:")
df_alugueis.printSchema()


Esquema atualizado da tabela Clientes:
root
 |-- ID_Cliente: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cidade: string (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Estado_Civil: string (nullable = true)


 Esquema atualizado da tabela Carros:
root
 |-- ID_Carro: string (nullable = true)
 |-- ID_Marca: string (nullable = true)
 |-- Modelo: string (nullable = true)
 |-- Valor_Diaria: string (nullable = true)


 Esquema atualizado da tabela Marcas:
root
 |-- ID_Marca: string (nullable = true)
 |-- Nome_Marca: string (nullable = true)


 Esquema atualizado da tabela Aluguéis:
root
 |-- ID_Aluguel: string (nullable = true)
 |-- ID_Cliente: string (nullable = true)
 |-- ID_Carro: string (nullable = true)
 |-- Data_Aluguel: string (nullable = true)



06 - Selecionar apenas os aluguéis realizados após uma data específica


In [None]:
data_corte = '2023-01-01'

df_alugueis_filtrados = spark.sql(f"""
    SELECT *
    FROM aluguel
    WHERE CAST(Data_Aluguel AS DATE) > '{data_corte}'
""")

df_alugueis_filtrados.show()


+----------+----------+--------+------------+
|codaluguel|codcliente|codcarro|data_aluguel|
+----------+----------+--------+------------+
|         1|         3|       2|  2023-04-01|
|         2|         2|       1|  2023-04-02|
|         3|         2|       1|  2023-04-03|
|         4|         2|       3|  2023-04-04|
|         5|         1|       4|  2023-04-05|
|         6|         1|       4|  2023-04-13|
|         7|         1|       1|  2023-04-15|
|         8|         5|       2|  2023-04-19|
|         9|         5|       2|  2023-04-21|
|        10|         3|       1|  2023-04-25|
+----------+----------+--------+------------+



07 - Encontrar clientes que residem no estado de "RJ"


In [None]:
df_clientes_rj = spark.sql("""
    SELECT *
    FROM cliente
    WHERE Estado = 'RJ'
""")

df_clientes_rj.show()


+----------+----------------+---------------+----+------+-----------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|
+----------+----------------+---------------+----+------+-----------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|
+----------+----------------+---------------+----+------+-----------+



08 - Filtrar carros com valor de aluguel maior que 150

In [None]:
df_carros_filtrados = spark.sql("""
    SELECT *
    FROM carro
    WHERE valor > 150
""")

df_carros_filtrados.show()


+--------+--------+------+-----+
|codcarro|codmarca|modelo|valor|
+--------+--------+------+-----+
|       3|       3|  Onix|170.0|
+--------+--------+------+-----+



09 - Selecionar aluguéis onde o cliente é do sexo feminino


In [None]:
df_alugueis_mulheres = spark.sql("""
    SELECT a.*
    FROM aluguel a
    INNER JOIN cliente c ON a.codcliente = c.codcliente
    WHERE c.sexo = 'F'
""")

df_alugueis_mulheres.show()


+----------+----------+--------+------------+
|codaluguel|codcliente|codcarro|data_aluguel|
+----------+----------+--------+------------+
|         2|         2|       1|  2023-04-02|
|         3|         2|       1|  2023-04-03|
|         4|         2|       3|  2023-04-04|
|         5|         1|       4|  2023-04-05|
|         6|         1|       4|  2023-04-13|
|         7|         1|       1|  2023-04-15|
|         8|         5|       2|  2023-04-19|
|         9|         5|       2|  2023-04-21|
+----------+----------+--------+------------+



10 - Identificar clientes solteiros


In [None]:
df_clientes_solteiros = spark.sql("""
    SELECT * FROM cliente
    WHERE estadocivil = 'S'
""")

df_clientes_solteiros.show()


+----------+----------------+---------------+----+------+-----------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|
+----------+----------------+---------------+----+------+-----------+
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|
|         4|  Fernando Souza|       Campinas|   M|    SP|          S|
+----------+----------------+---------------+----+------+-----------+



11 - Realizar um join entre "Aluguel" e "Cliente" para adicionar informações do cliente ao DataFrame de aluguéis


In [None]:
df_aluguel_cliente = spark.sql("""
    SELECT a.*, c.nome, c.cidade, c.estado, c.sexo, c.estadocivil
    FROM aluguel a
    INNER JOIN cliente c ON a.codcliente = c.codcliente
""")

df_aluguel_cliente.show()


+----------+----------+--------+------------+----------------+---------------+------+----+-----------+
|codaluguel|codcliente|codcarro|data_aluguel|            nome|         cidade|estado|sexo|estadocivil|
+----------+----------+--------+------------+----------------+---------------+------+----+-----------+
|         7|         1|       1|  2023-04-15|       Ana Silva|Duque de Caxias|    RJ|   F|          C|
|         6|         1|       4|  2023-04-13|       Ana Silva|Duque de Caxias|    RJ|   F|          C|
|         5|         1|       4|  2023-04-05|       Ana Silva|Duque de Caxias|    RJ|   F|          C|
|         4|         2|       3|  2023-04-04|   Bruna Pereira|        Niterói|    RJ|   F|          C|
|         3|         2|       1|  2023-04-03|   Bruna Pereira|        Niterói|    RJ|   F|          C|
|         2|         2|       1|  2023-04-02|   Bruna Pereira|        Niterói|    RJ|   F|          C|
|        10|         3|       1|  2023-04-25|Túlio Nascimento|Duque de Ca

12 - Juntar "Carro" e "Marca" para incluir o nome da marca no DataFrame de carros


In [None]:
df_carro_marca = spark.sql("""
    SELECT c.*, m.marca
    FROM carro c
    INNER JOIN marca m ON c.codmarca = m.codmarca
""")

df_carro_marca.show()


+--------+--------+------+-----+----------+
|codcarro|codmarca|modelo|valor|     marca|
+--------+--------+------+-----+----------+
|       1|       1|    Ka|100.0|      Ford|
|       2|       2|  Argo|150.0|      Fiat|
|       3|       3|  Onix|170.0| Chevrolet|
|       4|       4|  Polo|150.0|Volkswagen|
|       5|       5|  Kwid|120.0|   Renault|
+--------+--------+------+-----+----------+







13 - Criar um DataFrame combinando "Aluguel", "Carro" e "Cliente"


In [None]:
df_aluguel_carro_cliente = spark.sql("""
    SELECT a.*, c.nome, c.cidade, c.estado, c.sexo, c.estadocivil,
           car.modelo, car.valor
    FROM aluguel a
    INNER JOIN cliente c ON a.codcliente = c.codcliente
    INNER JOIN carro car ON a.codcarro = car.codcarro
""")

df_aluguel_carro_cliente.show()


+----------+----------+--------+------------+----------------+---------------+------+----+-----------+------+-----+
|codaluguel|codcliente|codcarro|data_aluguel|            nome|         cidade|estado|sexo|estadocivil|modelo|valor|
+----------+----------+--------+------------+----------------+---------------+------+----+-----------+------+-----+
|         7|         1|       1|  2023-04-15|       Ana Silva|Duque de Caxias|    RJ|   F|          C|    Ka|100.0|
|         6|         1|       4|  2023-04-13|       Ana Silva|Duque de Caxias|    RJ|   F|          C|  Polo|150.0|
|         5|         1|       4|  2023-04-05|       Ana Silva|Duque de Caxias|    RJ|   F|          C|  Polo|150.0|
|         4|         2|       3|  2023-04-04|   Bruna Pereira|        Niterói|    RJ|   F|          C|  Onix|170.0|
|         3|         2|       1|  2023-04-03|   Bruna Pereira|        Niterói|    RJ|   F|          C|    Ka|100.0|
|         2|         2|       1|  2023-04-02|   Bruna Pereira|        Ni

14 - Realizar um join entre "Cliente" e "Carro" com uma condição específica

In [None]:
df_cliente_carro_filtrado = spark.sql("""
    SELECT c.*, car.modelo, car.valor
    FROM cliente c
    INNER JOIN aluguel a ON c.codcliente = a.codcliente
    INNER JOIN carro car ON a.codcarro = car.codcarro
    WHERE car.valor > 100
""")

df_cliente_carro_filtrado.show()


+----------+----------------+---------------+----+------+-----------+------+-----+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|modelo|valor|
+----------+----------------+---------------+----+------+-----------+------+-----+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|  Polo|150.0|
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|  Polo|150.0|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|  Onix|170.0|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|  Argo|150.0|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|  Argo|150.0|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|  Argo|150.0|
+----------+----------------+---------------+----+------+-----------+------+-----+



15 - Encontrar o valor médio dos carros alugados


In [None]:
df_valor_medio_carros = spark.sql("""
    SELECT AVG(car.valor) AS valor_medio
    FROM aluguel a
    INNER JOIN carro car ON a.codcarro = car.codcarro
""")

df_valor_medio_carros.show()


+-----------+
|valor_medio|
+-----------+
|      132.0|
+-----------+



16 - Calcular o número total de clientes por estado


In [None]:
df_clientes_por_estado = spark.sql("""
    SELECT estado, COUNT(*) AS total_clientes
    FROM cliente
    GROUP BY estado
""")

df_clientes_por_estado.show()


+------+--------------+
|estado|total_clientes|
+------+--------------+
|    SP|             2|
|    RJ|             3|
+------+--------------+



17 - Identificar a marca mais popular com base nos aluguéis

In [None]:
df_marca_popular = spark.sql("""
    SELECT m.marca, COUNT(*) AS total_alugueis
    FROM aluguel a
    INNER JOIN carro c ON a.codcarro = c.codcarro
    INNER JOIN marca m ON c.codmarca = m.codmarca
    GROUP BY m.marca
    ORDER BY total_alugueis DESC
    LIMIT 1
""")

df_marca_popular.show()


+-----+--------------+
|marca|total_alugueis|
+-----+--------------+
| Ford|             4|
+-----+--------------+



18 - Determinar o maior e menor valor de aluguel entre os carros


In [None]:
df_valores_aluguel = spark.sql("""
    SELECT
        MAX(valor) AS valor_maximo,
        MIN(valor) AS valor_minimo
    FROM carro
""")

df_valores_aluguel.show()


+------------+------------+
|valor_maximo|valor_minimo|
+------------+------------+
|       170.0|       100.0|
+------------+------------+



19 - Classificar os carros pelo valor do aluguel em ordem decrescente


In [None]:
df_carros_ordenados = spark.sql("""
    SELECT *
    FROM carro
    ORDER BY valor DESC
""")

df_carros_ordenados.show()


+--------+--------+------+-----+
|codcarro|codmarca|modelo|valor|
+--------+--------+------+-----+
|       3|       3|  Onix|170.0|
|       4|       4|  Polo|150.0|
|       2|       2|  Argo|150.0|
|       5|       5|  Kwid|120.0|
|       1|       1|    Ka|100.0|
+--------+--------+------+-----+



20 - Calcular a diferença em dias entre o aluguel mais recente e o mais antigo


In [None]:
df_diferenca = spark.sql("""
    SELECT
        MAX(data_aluguel) AS data_recente,
        MIN(data_aluguel) AS data_antiga,
        DATEDIFF(MAX(data_aluguel), MIN(data_aluguel)) AS diferenca_dias
    FROM aluguel
""")

df_diferenca.show()


+------------+-----------+--------------+
|data_recente|data_antiga|diferenca_dias|
+------------+-----------+--------------+
|  2023-04-25| 2023-04-01|            24|
+------------+-----------+--------------+



21 - Criar uma coluna no DataFrame "Carro" para categorizar os valores de aluguel


In [None]:
df_carros_categorizados = spark.sql("""
    SELECT *,
        CASE
            WHEN valor <= 100 THEN 'Econômico'
            WHEN valor > 100 AND valor <= 200 THEN 'Intermediário'
            ELSE 'Luxo'
        END AS categoria_preco
    FROM carro
""")

df_carros_categorizados.show()


+--------+--------+------+-----+---------------+
|codcarro|codmarca|modelo|valor|categoria_preco|
+--------+--------+------+-----+---------------+
|       1|       1|    Ka|100.0|      Econômico|
|       2|       2|  Argo|150.0|  Intermediário|
|       3|       3|  Onix|170.0|  Intermediário|
|       4|       4|  Polo|150.0|  Intermediário|
|       5|       5|  Kwid|120.0|  Intermediário|
+--------+--------+------+-----+---------------+



22 - Criar uma coluna no DataFrame "Cliente" para indicar se a cidade é a capital do estado


In [None]:
df_clientes_capital = spark.sql("""
    SELECT *,
        CASE
            WHEN (Estado = 'AC' AND cidade = 'Rio Branco') OR
                 (Estado = 'AL' AND cidade = 'Maceió') OR
                 (Estado = 'AP' AND cidade = 'Macapá') OR
                 (Estado = 'AM' AND cidade = 'Manaus') OR
                 (Estado = 'BA' AND cidade = 'Salvador') OR
                 (Estado = 'CE' AND cidade = 'Fortaleza') OR
                 (Estado = 'DF' AND cidade = 'Brasília') OR
                 (Estado = 'ES' AND cidade = 'Vitória') OR
                 (Estado = 'GO' AND cidade = 'Goiânia') OR
                 (Estado = 'MA' AND cidade = 'São Luís') OR
                 (Estado = 'MT' AND cidade = 'Cuiabá') OR
                 (Estado = 'MS' AND cidade = 'Campo Grande') OR
                 (Estado = 'MG' AND cidade = 'Belo Horizonte') OR
                 (Estado = 'PA' AND cidade = 'Belém') OR
                 (Estado = 'PB' AND cidade = 'João Pessoa') OR
                 (Estado = 'PR' AND cidade = 'Curitiba') OR
                 (Estado = 'PE' AND cidade = 'Recife') OR
                 (Estado = 'PI' AND cidade = 'Teresina') OR
                 (Estado = 'RJ' AND cidade = 'Rio de Janeiro') OR
                 (Estado = 'RN' AND cidade = 'Natal') OR
                 (Estado = 'RS' AND cidade = 'Porto Alegre') OR
                 (Estado = 'RO' AND cidade = 'Porto Velho') OR
                 (Estado = 'RR' AND cidade = 'Boa Vista') OR
                 (Estado = 'SC' AND cidade = 'Florianópolis') OR
                 (Estado = 'SP' AND cidade = 'São Paulo') OR
                 (Estado = 'SE' AND cidade = 'Aracaju') OR
                 (Estado = 'TO' AND cidade = 'Palmas')
            THEN 'Sim'
            ELSE 'Não'
        END AS e_capital
    FROM cliente
""")

df_clientes_capital.show()


+----------+----------------+---------------+----+------+-----------+---------+
|codcliente|            nome|         cidade|sexo|estado|estadocivil|e_capital|
+----------+----------------+---------------+----+------+-----------+---------+
|         1|       Ana Silva|Duque de Caxias|   F|    RJ|          C|      Não|
|         2|   Bruna Pereira|        Niterói|   F|    RJ|          C|      Não|
|         3|Túlio Nascimento|Duque de Caxias|   M|    RJ|          S|      Não|
|         4|  Fernando Souza|       Campinas|   M|    SP|          S|      Não|
|         5|   Lúcia Andrade|      São Paulo|   F|    SP|          C|      Sim|
+----------+----------------+---------------+----+------+-----------+---------+



23 - Adicionar uma coluna em "Aluguel" com o valor total do aluguel, considerando uma taxa fixa de 10%

In [None]:
df_alugueis.createOrReplaceTempView("aluguel")
df_carros.createOrReplaceTempView("carro")


df_alugueis_com_valor_total = spark.sql("""
    SELECT a.*,
           c.Valor_Diaria * 1.10 AS valor_total
    FROM aluguel a
    INNER JOIN carro c ON a.ID_Carro = c.ID_Carro
""")

df_alugueis_com_valor_total.show()


+----------+----------+--------+------------+------------------+
|ID_Aluguel|ID_Cliente|ID_Carro|Data_Aluguel|       valor_total|
+----------+----------+--------+------------+------------------+
|         1|         3|       2|  2023-04-01|             165.0|
|         2|         2|       1|  2023-04-02|110.00000000000001|
|         3|         2|       1|  2023-04-03|110.00000000000001|
|         4|         2|       3|  2023-04-04|187.00000000000003|
|         5|         1|       4|  2023-04-05|             165.0|
|         6|         1|       4|  2023-04-13|             165.0|
|         7|         1|       1|  2023-04-15|110.00000000000001|
|         8|         5|       2|  2023-04-19|             165.0|
|         9|         5|       2|  2023-04-21|             165.0|
|        10|         3|       1|  2023-04-25|110.00000000000001|
+----------+----------+--------+------------+------------------+



24 - Agrupar os aluguéis por cliente e contar o número de carros alugados

In [None]:
df_alugueis_por_cliente = spark.sql("""
    SELECT codcliente, COUNT(codcarro) AS num_carros_alugados
    FROM aluguel
    GROUP BY codcliente
""")

df_alugueis_por_cliente.show()


+----------+-------------------+
|codcliente|num_carros_alugados|
+----------+-------------------+
|         1|                  3|
|         3|                  2|
|         5|                  2|
|         2|                  3|
+----------+-------------------+



25 - Criar um script PySpark para agendar a execução automática das transformações


In [None]:
!pip install apache-airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from datetime import datetime

# Função que será chamada para realizar as transformações no PySpark
def executar_transformacoes_pyspark():
    # Iniciar uma sessão Spark
    spark = SparkSession.builder.appName("TransformacoesLocadora").getOrCreate()

    # Leitura dos dados
    df_carros = spark.read.option("header", "true").csv("carros.csv")
    df_clientes = spark.read.option("header", "true").csv("clientes.csv")
    df_alugueis = spark.read.option("header", "true").csv("alugueis.csv")

    # Realizar as transformações nos dados
    # Exemplo de transformação: Categorizar os carros por preço
    df_carros = df_carros.withColumn(
        "categoria_preco",
        when(col("valor") <= 100, "Econômico")
        .when((col("valor") > 100) & (col("valor") <= 200), "Intermediário")
        .otherwise("Luxo")
    )

    # Exemplo de transformação: Filtrar aluguéis realizados após uma data específica
    data_corte = "2025-01-01"
    df_alugueis_filtrados = df_alugueis.filter(col("data_aluguel") > data_corte)

    # Salvar os resultados transformados em arquivos CSV
    df_carros.write.mode("overwrite").csv("carros_transformados.csv")
    df_alugueis_filtrados.write.mode("overwrite").csv("alugueis_filtrados.csv")

    # Fechar a sessão Spark
    spark.stop()

# Definir o DAG (Directed Acyclic Graph)
dag = DAG(
    "transformacoes_locadora",  # Nome do DAG
    description="Agendamento de transformações no PySpark",  # Descrição do DAG
    schedule_interval="0 * * * *",  # Agendar para rodar a cada hora
    start_date=datetime(2025, 4, 1),  # Data de início das execuções
    catchup=False,  # Evitar execução retroativa
)

# Definir a tarefa dentro do DAG
tarefa_transformacoes = PythonOperator(
    task_id="executar_transformacoes_pyspark",  # Nome da tarefa
    python_callable=executar_transformacoes_pyspark,  # Função a ser chamada
    dag=dag,
)

# Rodar a tarefa
tarefa_transformacoes




<Task(PythonOperator): executar_transformacoes_pyspark>