In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, FloatType
from pyspark.sql.functions import *
from pyspark.sql.window import *
import random
from pyspark.sql import Row
# Criando uma sessão Spark
spark = SparkSession.builder.appName("SQL").getOrCreate()

# Criando DataFrame de clientes
schema_clientes = StructType([
    StructField('cliente_id', IntegerType(), True),
    StructField('nome', StringType(), True),
    StructField('idade', IntegerType(), True),
    StructField('cidade', StringType(), True),
    StructField('genero', StringType(), True)
])

clientes_data = [
    (i, 'Cliente' + str(i), random.randint(18, 65),
     random.choice(['São Paulo', 'Rio de Janeiro', 'Belo Horizonte', 'Brasília']),
     random.choice(['M', 'F']))
    for i in range(1, 10000)
]

df_clientes = spark.createDataFrame(clientes_data, schema=schema_clientes)

# Criando DataFrame de produtos
schema_produtos = StructType([
    StructField('produto_id', IntegerType(), True),
    StructField('nome_produto', StringType(), True),
    StructField('categoria', StringType(), True),
    StructField('preco', FloatType(), True)
   
])

produtos_data = [
    Row(
        produto_id=i,
        nome_produto='Produto' + str(i),
        categoria=random.choice(['Eletrônicos', 'Roupas', 'Alimentos', 'Acessórios']),
        preco=random.uniform(10, 10000),  # Convertendo para float
        disponibilidade=random.choice([True, False])
    )
    for i in range(1, 10000)
]

# Criando o DataFrame de produtos
df_produtos = spark.createDataFrame(produtos_data)

# # Criando DataFrame de vendas
schema_vendas = StructType([
    StructField('venda_id', IntegerType(), True),
    StructField('cliente_id', IntegerType(), True),
    StructField('produto_id', IntegerType(), True),
    StructField('quantidade', IntegerType(), True),
    StructField('data_venda', StringType(), True)
])

vendas_data = [
    (i, random.randint(1, 10000), random.randint(1, 50), random.randint(1, 5),  random.choice(['2023-01-01', '2022-01-01', '2021-01-01']))
    for i in range(1, 10000)
]

df_vendas = spark.createDataFrame(vendas_data, schema=schema_vendas)

# Criando DataFrame de transações
schema_transacoes = StructType([
    StructField('transacao_id', IntegerType(), True),
    StructField('produto_id', IntegerType(), True),
    StructField('valor', FloatType(), True),
    StructField('tipo', StringType(), True),
    StructField('status', StringType(), True)
])

transacoes_data = [
    (i, random.randint(1, 50), random.uniform(50, 500),
     random.choice(['Débito', 'Crédito']), random.choice(['Aprovado', 'Pendente', 'Rejeitado']))
    for i in range(1, 10000)
]

df_transacoes = spark.createDataFrame(transacoes_data, schema=schema_transacoes)

In [52]:
# Criando visões temporárias para os DataFrames
df_clientes.createOrReplaceTempView("clientes")
df_vendas.createOrReplaceTempView("vendas")
df_produtos.createOrReplaceTempView("produtos")
df_transacoes.createOrReplaceTempView("transacoes")

In [35]:
from pyspark.sql.window import *

In [137]:
query = """
   WITH VendasValidadas AS (
    SELECT
        v.venda_id,
        v.cliente_id,
        v.produto_id,
        v.quantidade,
        v.data_venda,
        t.status AS status_transacao,
        ROW_NUMBER() OVER(PARTITION BY v.cliente_id ORDER BY v.data_venda DESC) AS rn
    FROM
        vendas v
    LEFT JOIN
        transacoes t ON v.produto_id = t.produto_id
    WHERE
        v.data_venda LIKE '2023%'
        AND t.status IN ('Aprovado', 'Pendente')
        AND t.status NOT IN ('Rejeitado')
),
VendasClientes AS (
    SELECT
        c.cliente_id,
        c.nome AS nome_cliente,
        c.idade,
        COUNT(vv.venda_id) AS total_vendas,
        CASE
            WHEN c.idade > 30 THEN 'Maior que 30'
            ELSE '30 ou Menos'
        END AS faixa_etaria
    FROM
        clientes c
    LEFT JOIN
        VendasValidadas vv ON c.cliente_id = vv.cliente_id
    WHERE
        c.cidade LIKE 'São%'
    GROUP BY
        c.cliente_id, c.nome, c.idade
    HAVING
        COUNT(vv.venda_id) > 2
),
TopClientes AS (
    SELECT
        cliente_id,
        nome_cliente,
        idade,
        total_vendas,
        faixa_etaria,
        ROW_NUMBER() OVER(PARTITION BY cliente_id, nome_cliente, idade, total_vendas ORDER BY total_vendas DESC) AS ranking
    FROM
        VendasClientes
)
SELECT
    tc.cliente_id,
    tc.nome_cliente,
    tc.idade,
    tc.total_vendas,
    tc.faixa_etaria,
    tc.ranking,
    SUM(p.preco) AS valor_total_compras
FROM
    TopClientes tc
JOIN
    VendasValidadas vv ON tc.cliente_id = vv.cliente_id
JOIN
    produtos p ON vv.produto_id = p.produto_id
WHERE
    vv.rn = 1
GROUP BY
    tc.cliente_id, tc.nome_cliente, tc.idade, tc.total_vendas, tc.faixa_etaria, tc.ranking
HAVING
    SUM(p.preco) > 1
ORDER BY
    tc.nome_cliente DESC, tc.ranking;

"""
result = spark.sql(query)
result.show()

+----------+------------+-----+------------+------------+-------+-------------------+
|cliente_id|nome_cliente|idade|total_vendas|faixa_etaria|ranking|valor_total_compras|
+----------+------------+-----+------------+------------+-------+-------------------+
|      9997| Cliente9997|   54|         125|Maior que 30|      1| 3051.5832139488152|
|      9989| Cliente9989|   53|         126|Maior que 30|      1| 1208.1925963414535|
|      9977| Cliente9977|   27|         137| 30 ou Menos|      1|  8952.833799778895|
|      9970| Cliente9970|   42|         147|Maior que 30|      1| 1085.7269217761611|
|      9960| Cliente9960|   51|         130|Maior que 30|      1| 2729.0503313397444|
|      9952| Cliente9952|   47|         145|Maior que 30|      1| 2567.1457045712677|
|      9933| Cliente9933|   33|         131|Maior que 30|      1|  9970.298775861726|
|      9908| Cliente9908|   60|         114|Maior que 30|      1| 1463.8195504942883|
|      9865| Cliente9865|   63|         260|Maior que 

##### Valid Sales

In [66]:
spec_windown = Window.partitionBy(col("cliente_id")).orderBy(col("data_venda").desc())
df_sales_2023 = df_vendas.filter(col("data_venda").like("2023%"))
df_valid_transactions = df_transacoes.filter((col("status").isin(["Aprovado", "Pendente"])) & (~col("status").isin(["Rejeitado"])))

df_valid_sales = df_sales_2023.alias("v").join(
    df_valid_transactions.alias("t"), on=col("v.produto_id") == col("t.produto_id"), how="left")\
    .select(
        "v.venda_id",
        "v.cliente_id",
        "v.produto_id",
        "v.quantidade",
        "v.data_venda",
        col("t.status").alias("status_transacao"),
        row_number().over(spec_windown).alias("rn")
    )

##### Customer Sales

In [169]:
df_customers_sp = df_clientes\
    .filter(col("cidade").like("São%"))\
    .withColumn("faixa_etaria", when(col("idade") > 30, "Maior que 30").otherwise("30 ou menos"))

df_customer_sales = df_customers_sp.alias("c").join(
    df_valid_sales.alias("v"), on=col("c.cliente_id") == col("v.cliente_id"), how="left")\
    .groupBy("c.cliente_id", "c.nome", "c.idade")\
    .agg(count("v.venda_id").alias("total_vendas"))\
    .filter("total_vendas > 2")

In [167]:
query = """
   WITH VendasValidadas AS (
    SELECT
        v.venda_id,
        v.cliente_id,
        v.produto_id,
        v.quantidade,
        v.data_venda,
        t.status AS status_transacao,
        ROW_NUMBER() OVER(PARTITION BY v.cliente_id ORDER BY v.data_venda DESC) AS rn
    FROM
        vendas v
    LEFT JOIN
        transacoes t ON v.produto_id = t.produto_id
    WHERE
        v.data_venda LIKE '2023%'
        AND t.status IN ('Aprovado', 'Pendente')
        AND t.status NOT IN ('Rejeitado')
),
VendasClientes AS (
    SELECT
        c.cliente_id,
        c.nome AS nome_cliente,
        c.idade,
        COUNT(vv.venda_id) AS total_vendas,
        CASE
            WHEN c.idade > 30 THEN 'Maior que 30'
            ELSE '30 ou Menos'
        END AS faixa_etaria
    FROM
        clientes c
    LEFT JOIN
        VendasValidadas vv ON c.cliente_id = vv.cliente_id
    WHERE
        c.cidade LIKE 'São%'
    GROUP BY
        c.cliente_id, c.nome, c.idade
    HAVING
        COUNT(vv.venda_id) > 2
)
select * from VendasClientes

"""
result = spark.sql(query)
result.count()

675