### Imports

In [0]:
# Módulo no PySpark que fornece classes para definir os tipos de dados das colunas de um DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType

#  Função que converte uma coluna de string em um tipo de dado DateType
from pyspark.sql.functions import to_date

# Fornece uma interface para definir janelas de agregação ou funções
from pyspark.sql.window import Window

# Importa todas as funções disponíveis no módulo e as renomeia como F, facilitando o acesso a funções como sum() e avg()
from pyspark.sql import functions as F

# Importa a função dayofmonth (que extrai o dia do mês a partir de uma data/timestamp e col (que cria uma coluna a partir de uma expressão, utilizada para fazer referência a colunas de um DataFrame em operações)
from pyspark.sql.functions import dayofmonth, col

# Módulo que fornece classes para manipulação de data e hora em Python
from datetime import datetime

### Criação das tabelas

In [0]:
# Definindo os esquemas paras os DataFrames
# StructType é um tipo de estrutura que contém uma lista de campos (colunas).
# StructField define cada campo.

schema_clientes = StructType([
    StructField("CD_CLIENTE ", IntegerType(), True),
    StructField("NM_CLIENTE ", StringType(), True)
])

schema_transacoes = StructType([
    StructField("CD_CLIENTE", IntegerType(), False),
    StructField("DT_TRANSACAO", StringType(), False),  # Vou converter para Data mais para frente!
    StructField("CD_TRANSACAO", StringType(), False),
    StructField("VR_TRANSACAO", DoubleType(), False)
])

# 
# Tabela de Clientes (TbCliente)
clientes = [
    (1, "João"),
    (2, "Maria"),
    (3, "José"),
    (4, "Adilson"),
    (5, "Cleber")
]

# Tabela de Transações (TbTransacoes)
transacoes = [
    (1, "2021-08-28", "000", 20.00),
    (1, "2021-09-09", "110", 78.90),
    (1, "2021-09-17", "220", 58.00),
    (1, "2021-11-15", "110", 178.90),
    (1, "2021-12-24", "220", 110.37),
    (5, "2021-10-28", "110", 220.00),
    (5, "2021-11-07", "110", 380.00),
    (5, "2021-12-05", "220", 398.86),
    (5, "2021-12-14", "220", 33.90),
    (5, "2021-12-21", "220", 16.90),
    (3, "2021-10-05", "110", 720.90),
    (3, "2021-11-05", "110", 720.90),
    (3, "2021-12-05", "110", 720.90),
    (4, "2021-10-09", "000", 50.00)
]

# Criando os DataFrames através dos esquemas e dos dados
df_clientes = spark.createDataFrame(clientes, schema_clientes)
df_transacoes = spark.createDataFrame(transacoes, schema_transacoes)

# Convertendo DT_TRANSACAO para DateType(), já que PySpark não converte automaticamente strings para datas.
df_transacoes = df_transacoes.withColumn("DT_TRANSACAO", to_date(df_transacoes["DT_TRANSACAO"], "yyyy-MM-dd"))


### 01. Qual cliente teve o maior saldo médio no mês 11? 

In [0]:
'''
Vou considerar que o cálculo do saldo médio de um saldo bancário é o resultado da soma dos saldos diários dividido pelo número de dias de observação. Como não há informações quanto ao saldo e apenas quanto aos valores de transação, vou considerar que todos começaram com um saldo de R$0,00.
'''

In [0]:
# Criando a coluna 'VR_AJUSTADO' para determinar, através dos códigos, quais valores serão acrescentados ou diminuidos
# Estrutura: F.when(condição, valor) | Ou seja: quanto acontecer x, faça y
df_transacoes = df_transacoes.withColumn(
    "VR_AJUSTADO", 
    F.when(df_transacoes.CD_TRANSACAO == "000", df_transacoes.VR_TRANSACAO)
    .when(df_transacoes.CD_TRANSACAO == "110", df_transacoes.VR_TRANSACAO)
    .when(df_transacoes.CD_TRANSACAO == "220", -df_transacoes.VR_TRANSACAO)
    .otherwise(0)  # Caso algum valor não seja 000, 110 ou 220, assume 0
)

# Criando a coluna 'SALDO_ACUMULADO' que vai somar os valores de 'VR_AJUSTADO' com base na janela.
# A janela define que a soma acumulada será calculada separadamente para cada 'CD_CLIENTE'. Além disso, dentro de cada 'CD_CLIENTE', a soma será acumulada de acordo com a ordenação da coluna 'DT_TRANSACAO'. A soma será acumulada desde o começo até a transação atual.
df_transacoes = df_transacoes.withColumn(
    "SALDO_ACUMULADO",
    F.sum("VR_AJUSTADO").over(Window.partitionBy("CD_CLIENTE").orderBy("DT_TRANSACAO").rowsBetween(Window.unboundedPreceding, Window.currentRow))
)

In [0]:
# Printando o DataFrame para visualização das alterações:
df_transacoes.show()

+----------+------------+------------+------------+-----------+------------------+
|CD_CLIENTE|DT_TRANSACAO|CD_TRANSACAO|VR_TRANSACAO|VR_AJUSTADO|   SALDO_ACUMULADO|
+----------+------------+------------+------------+-----------+------------------+
|         1|  2021-08-28|         000|        20.0|       20.0|              20.0|
|         1|  2021-09-09|         110|        78.9|       78.9|              98.9|
|         1|  2021-09-17|         220|        58.0|      -58.0|40.900000000000006|
|         1|  2021-11-15|         110|       178.9|      178.9|             219.8|
|         1|  2021-12-24|         220|      110.37|    -110.37|            109.43|
|         3|  2021-10-05|         110|       720.9|      720.9|             720.9|
|         3|  2021-11-05|         110|       720.9|      720.9|            1441.8|
|         3|  2021-12-05|         110|       720.9|      720.9|            2162.7|
|         4|  2021-10-09|         000|        50.0|       50.0|              50.0|
|   

In [0]:
# Explorando o DataFrame, é possível perceber que cada usuário tem apenas uma transação em novembro:
november_data = df_transacoes.filter((col("DT_TRANSACAO") >= "2021-11-01") & (col("DT_TRANSACAO") <= "2021-11-30"))
df_transacoes_count = november_data.groupBy("CD_CLIENTE").agg(
    F.count("CD_TRANSACAO").alias("NUMERO_DE_TRANSACOES")
)

df_transacoes_unique = df_transacoes_count.select("NUMERO_DE_TRANSACOES").distinct()

df_transacoes_unique.show()

# Com base nisso, eu vou fazer o cálculo do saldo médio em novembro dividindo o mês em dois.


+--------------------+
|NUMERO_DE_TRANSACOES|
+--------------------+
|                   1|
+--------------------+



In [0]:
# Selecionando apenas os IDs únicos
# Eu transformo para RDD para usar o método flatMap, uma operação disponível apenas para RDDs no Spark e assim eu consigo transformar cada linha (que é uma tupla) de volta para o valor da coluna. Ou seja, cada linha do RDD vai gerar um único valor do tipo CD_CLIENTE em vez de uma lista ou tupla de valores.
ids = df_transacoes.select("CD_CLIENTE").distinct().rdd.flatMap(lambda x: x).collect()

In [0]:
# Definindo um dicionário para armazenar o saldo médio de cada cliente
resultados = {}

for id in ids: # filtrando apenas as transações de novembro do usuário em questão
    transacoes_novembro = df_transacoes.filter(
        (col("DT_TRANSACAO") >= "2021-11-01") & 
        (col("DT_TRANSACAO") <= "2021-11-30") &
        (col("CD_CLIENTE") == id)
    )

    if transacoes_novembro.count() > 0: # Se tiver alguma transação em novembro, eu armazeno a data
        data_transacao_novembro = transacoes_novembro.select("DT_TRANSACAO").first()[0]

        # Pego a última transação antes da de novembro
        transacao_anterior = df_transacoes.filter(
            (col("DT_TRANSACAO") < data_transacao_novembro) & 
            (col("CD_CLIENTE") == id)
        )

        # Pego a data da última transação antes da de novembro
        data_transacao_anterior = transacao_anterior.select("DT_TRANSACAO").agg(F.last("DT_TRANSACAO")).collect()[0][0]

        # Pego o saldo do usuário a partir da transação que aconteceu no mês
        saldo_novembro = transacoes_novembro.select("SALDO_ACUMULADO").first()[0]

        # Pego o saldo do usuário quando ele "entrou" em novembro
        saldo_anterior = df_transacoes.filter(
            (col("DT_TRANSACAO") == data_transacao_anterior)
        ).select("SALDO_ACUMULADO").first()[0]

        # Multiplico o saldo com o qual o usuário "entrou" em novembro pelos dias até que a transação ocorreu
        # Multiplico o saldo a partir da transação pelos dias restantes até o fim de novembro 
        # Divido pelo número total de dias em novembro
        valor = (((data_transacao_novembro.day + 1) * saldo_novembro) + ((data_transacao_novembro.day - 1) * saldo_anterior))/30
        resultados[id] = valor # Atribuo o resultado ao ID do usuário
    else: # Se não tem transação em novembro, o saldo médio será aquele a partir da última transação antes de novembro
        ultima_transacao_anterior_a_novembro = df_transacoes.filter(
            (col("DT_TRANSACAO") < "2021-11-01") & 
            (col("CD_CLIENTE") == id)
        ).orderBy(col("DT_TRANSACAO").desc()).first()

        valor = ultima_transacao_anterior_a_novembro["SALDO_ACUMULADO"]
        resultados[id] = valor



In [0]:
# Visualizando os resultados de cada cliente:
print(resultados)

{1: 136.31333333333336, 5: 204.0, 3: 384.47999999999996, 4: 50.0}


In [0]:
# Imprimindo o resultado!
maior_saldo = max(resultados.values()) 
id_maior_saldo = max(resultados, key=resultados.get)
nome_maior_saldo = df_clientes.filter(
    (col("CD_CLIENTE ") == id_maior_saldo)
).select("NM_CLIENTE ").first()

print(f'O cliente com maior saldo médio em novembro é o {nome_maior_saldo[0]} com R${maior_saldo}!')

O cliente com maior saldo médio em novembro é o José com R$384.47999999999996!


### 02. Qual é o saldo de cada cliente?

In [0]:
# Definindo a janela para cada cliente (ordenado pela data)
window_spec = Window.partitionBy("CD_CLIENTE").orderBy(F.col("DT_TRANSACAO").desc())

# Adicionando uma coluna com o último saldo acumulado de cada cliente
df_last_saldo = df_transacoes.withColumn("LAST_SALDO_ACUMULADO", F.first("SALDO_ACUMULADO").over(window_spec))

# Selecionando os últimos saldos acumulados por cliente
df_last_saldo = df_last_saldo.select("CD_CLIENTE", "LAST_SALDO_ACUMULADO").distinct()

# Resultado:
df_last_saldo.show()

+----------+--------------------+
|CD_CLIENTE|LAST_SALDO_ACUMULADO|
+----------+--------------------+
|         1|              109.43|
|         3|              2162.7|
|         4|                50.0|
|         5|  150.33999999999997|
+----------+--------------------+



### 03. Qual é o saldo médio de clientes que receberam CashBack?

In [0]:
# Verificando quais clientes receberam cashback
df_ids_cashback = df_transacoes.filter(
    (col("CD_TRANSACAO") == 000) 
).select("CD_CLIENTE")

# Selecionando apenas os ids e formando uma lista
ids_cashback = df_ids_cashback.rdd.flatMap(lambda row: row).collect() 

# Filtrando apenas os saldos dos ids selecionados
saldo_ids_cashback = df_last_saldo.filter(
    (col("CD_CLIENTE").isin(ids_cashback))
)

# Visualizar:
saldo_ids_cashback.show()


+----------+--------------------+
|CD_CLIENTE|LAST_SALDO_ACUMULADO|
+----------+--------------------+
|         1|              109.43|
|         4|                50.0|
+----------+--------------------+



### 04. Qual o ticket médio das quatro últimas movimentações dos usuários?

Normalmente o ticket médio é calculado com base em vendas, mas o enunciado diz para utilizar movimentações, então eu vou usar as transações Cash In, Out e Cashback.

In [0]:
# Particionando os dados por CD_CLIENTE e ordenando as transações de cada cliente pela data da transação em ordem decrescente (transações mais recentes primeiro).
window_spec = Window.partitionBy("CD_CLIENTE").orderBy(F.col("DT_TRANSACAO").desc())

# Para cada CD_CLIENTE, as transações serão numeradas com base na data da transação, da mais recente para a mais antiga.
df_ranked = df_transacoes.withColumn("rank", F.row_number().over(window_spec))

# Filtrando apenas as 4 últimas transações de cada cliente
df_last_4 = df_ranked.filter(F.col("rank") <= 4)

# Calculando o ticket médio
df_ticket_medio = df_last_4.groupBy("CD_CLIENTE").agg(
    F.sum("VR_TRANSACAO").alias("SOMA_TRANSACOES"),
    F.count("VR_TRANSACAO").alias("NUM_TRANSACOES")  # Conta quantas transações cada cliente tem
).withColumn(
    "TICKET_MEDIO", F.col("SOMA_TRANSACOES") / F.col("NUM_TRANSACOES")  # Divide pelo número real de transações
).select("CD_CLIENTE", "TICKET_MEDIO")

# Resultado
df_ticket_medio.show()

+----------+------------------+
|CD_CLIENTE|      TICKET_MEDIO|
+----------+------------------+
|         1|106.54249999999999|
|         3|             720.9|
|         4|              50.0|
|         5|207.41500000000002|
+----------+------------------+



### 05. Qual é a proporção entre Cash In/Out mensal?

In [0]:
# Criando um novo DataFrame com uma coluna de mês
df_transacoes_com_mes = df_transacoes.withColumn("ANO_MES", F.date_format("DT_TRANSACAO", "yyyy-MM"))

df_cash_in = df_transacoes_com_mes.filter(col("CD_TRANSACAO") == 110) # Todas as transações de Cash In
df_cash_out = df_transacoes_com_mes.filter(col("CD_TRANSACAO") == 220) # Todas as transações de Cash Out

# Agrupando por mês e calcular a soma de cada categoria
df_soma_cash_in = df_cash_in.groupBy("ANO_MES").agg(F.sum("VR_TRANSACAO").alias("SOMA_CASH_IN"))
df_soma_cash_out = df_cash_out.groupBy("ANO_MES").agg(F.sum("VR_TRANSACAO").alias("SOMA_CASH_OUT"))

# Juntando os dois DataFrames pelo mês
df_resultado = df_soma_cash_in.join(df_soma_cash_out, "ANO_MES", "outer")

# Calculando o total por mês
df_resultado = df_resultado.withColumn(
    "TOTAL", F.coalesce(col("SOMA_CASH_IN"), F.lit(0)) + F.coalesce(col("SOMA_CASH_OUT"), F.lit(0))
)

# Calculando o percentual de cada categoria no mês
df_resultado = df_resultado.withColumn(
    "PERCENTUAL_CASH_IN",
    (F.col("SOMA_CASH_IN") / F.col("TOTAL") * 100).cast("double")
).withColumn(
    "PERCENTUAL_CASH_OUT",
    (F.col("SOMA_CASH_OUT") / F.col("TOTAL") * 100).cast("double")
)

# Substituir valores nulos por 0, se tiver
df_resultado = df_resultado.fillna(0)

# Mostrar o resultado
df_resultado.show()

+-------+------------+-------------+------------------+------------------+-------------------+
|ANO_MES|SOMA_CASH_IN|SOMA_CASH_OUT|             TOTAL|PERCENTUAL_CASH_IN|PERCENTUAL_CASH_OUT|
+-------+------------+-------------+------------------+------------------+-------------------+
|2021-09|        78.9|         58.0|             136.9| 57.63330898466034|  42.36669101533966|
|2021-10|       940.9|          0.0|             940.9|             100.0|                0.0|
|2021-11|      1279.8|          0.0|            1279.8|             100.0|                0.0|
|2021-12|       720.9|       560.03|1280.9299999999998| 56.27942198246587|  43.72057801753414|
+-------+------------+-------------+------------------+------------------+-------------------+



### 06. Qual a última transação de cada tipo para cada usuário?

In [0]:

# Agrupando as transações por cliente e código de transação.
# Em seguida, obtém a última data e o último valor de transação dentro de cada grupo usando a função last().
df_ultima_transacao = df_transacoes.groupBy("CD_CLIENTE", "CD_TRANSACAO").agg(
    F.last("DT_TRANSACAO").alias("ULTIMA_DT_TRANSACAO"),
    F.last("VR_TRANSACAO").alias("ULTIMO_VALOR_TRANSACAO")
)

df_ultima_transacao.show()

+----------+------------+-------------------+----------------------+
|CD_CLIENTE|CD_TRANSACAO|ULTIMA_DT_TRANSACAO|ULTIMO_VALOR_TRANSACAO|
+----------+------------+-------------------+----------------------+
|         1|         000|         2021-08-28|                  20.0|
|         1|         110|         2021-11-15|                 178.9|
|         1|         220|         2021-12-24|                110.37|
|         5|         110|         2021-11-07|                 380.0|
|         5|         220|         2021-12-21|                  16.9|
|         3|         110|         2021-12-05|                 720.9|
|         4|         000|         2021-10-09|                  50.0|
+----------+------------+-------------------+----------------------+



### 07. Qual a última transação de cada tipo para cada usuário por mês?

In [0]:
# Criando uma nova coluna "MES_ANO" no DataFrame df_transacoes formatando a coluna "DT_TRANSACAO" para o formato "yyyy-MM", e depois agrupando os dados por cliente, transação e mês/ano para obter a última data e o último valor de transação em cada grupo.
df_transacoes_mes = df_transacoes.withColumn("MES_ANO", F.date_format("DT_TRANSACAO", "yyyy-MM"))

df_ultima_transacao_mes = df_transacoes_mes.groupBy("CD_CLIENTE", "CD_TRANSACAO", "MES_ANO").agg(
    F.last("DT_TRANSACAO").alias("ULTIMA_DT_TRANSACAO"),
    F.last("VR_TRANSACAO").alias("ULTIMO_VALOR_TRANSACAO")
)

df_ultima_transacao_mes.show()


+----------+------------+-------+-------------------+----------------------+
|CD_CLIENTE|CD_TRANSACAO|MES_ANO|ULTIMA_DT_TRANSACAO|ULTIMO_VALOR_TRANSACAO|
+----------+------------+-------+-------------------+----------------------+
|         1|         000|2021-08|         2021-08-28|                  20.0|
|         1|         110|2021-09|         2021-09-09|                  78.9|
|         1|         220|2021-09|         2021-09-17|                  58.0|
|         1|         220|2021-12|         2021-12-24|                110.37|
|         1|         110|2021-11|         2021-11-15|                 178.9|
|         5|         110|2021-11|         2021-11-07|                 380.0|
|         5|         110|2021-10|         2021-10-28|                 220.0|
|         5|         220|2021-12|         2021-12-21|                  16.9|
|         3|         110|2021-10|         2021-10-05|                 720.9|
|         3|         110|2021-11|         2021-11-05|                 720.9|

### 08. Qual quantidade de usuários que movimentaram a conta?

In [0]:
# Contando quantos ids distintos tem na taela de transações
qtd_usuarios = df_transacoes.select("CD_CLIENTE").distinct().count()

print(f"Quantidade de usuários que movimentaram a conta: {qtd_usuarios}")


Quantidade de usuários que movimentaram a conta: 4


### 09. Qual o balanço do final de 2021?

Para esse cálculo, vou considerar com balanço  diferença entre entradas (cash-in + cashback) e saídas (cash-out).

In [0]:
# Somando cada categoria
soma_cash_in = df_transacoes.filter(col("CD_TRANSACAO") == 110).agg(F.sum("VR_TRANSACAO")).collect()[0][0]
soma_cash_out = df_transacoes.filter(col("CD_TRANSACAO") == 220).agg(F.sum("VR_TRANSACAO")).collect()[0][0]
soma_cashback = df_transacoes.filter(col("CD_TRANSACAO") == 000).agg(F.sum("VR_TRANSACAO")).collect()[0][0]

balanço = soma_cash_in + soma_cashback - soma_cash_out
print(f"O balanço é de R${balanço}!")


O balanço é de R$2472.4700000000003!


### 10. Quantos usuários que receberam CashBack continuaram interagindo com este banco?

In [0]:
# Filtrando as transações de cashback 
cashback_transacoes = df_transacoes.filter(col("CD_TRANSACAO") == 000)

# Seleciona o CD_CLIENTE e a data da transação de cashback 
cashback_usuarios = cashback_transacoes.select("CD_CLIENTE", "DT_TRANSACAO").distinct()

# Para cada usuário que recebeu cashback, verifica se há transações posteriores
usuarios_com_movimentacao_pos_cashback = cashback_usuarios.alias('cb').join(
    df_transacoes.alias('dt'),
    (col('cb.CD_CLIENTE') == col('dt.CD_CLIENTE')) & 
    (col('dt.DT_TRANSACAO') > col('cb.DT_TRANSACAO')),
    how='inner'
).select('cb.CD_CLIENTE').distinct()

# Contabiliza quantos usuários tiveram transações após o cashback
quantidade_usuarios = usuarios_com_movimentacao_pos_cashback.count()

print("Quantidade de usuários que continuaram interagindo após o cashback:", quantidade_usuarios)


Quantidade de usuários que continuaram interagindo após o cashback: 1


### 11. Qual a primeira e a última movimentação dos usuários com saldo maior que R$100?


In [0]:
# Filtra os clientes com saldo maior que 100
clientes_com_saldo_maior_que_100 = df_last_saldo.filter(df_last_saldo["LAST_SALDO_ACUMULADO"] > 100)

# Pega os IDs dos clientes
ids_com_saldo_maior_que_100 = clientes_com_saldo_maior_que_100.select("CD_CLIENTE").distinct()

# Pega a transação mais antiga e mais recente de cada cliente
transacoes_mais_antiga_mais_recente = df_transacoes.join(
    ids_com_saldo_maior_que_100, 
    "CD_CLIENTE", 
    "inner"
).groupBy("CD_CLIENTE").agg(
    F.min("DT_TRANSACAO").alias("DATA_TRANSACAO_ANTIGA"),
    F.max("DT_TRANSACAO").alias("DATA_TRANSACAO_RECENTE")
)

# Coleta todas as informações da transação mais antiga e mais recente para cada cliente
transacoes_completas = df_transacoes.join(
    transacoes_mais_antiga_mais_recente,
    (df_transacoes["CD_CLIENTE"] == transacoes_mais_antiga_mais_recente["CD_CLIENTE"]) &
    ((df_transacoes["DT_TRANSACAO"] == transacoes_mais_antiga_mais_recente["DATA_TRANSACAO_ANTIGA"]) |
     (df_transacoes["DT_TRANSACAO"] == transacoes_mais_antiga_mais_recente["DATA_TRANSACAO_RECENTE"])),
    "inner"
)

# Adiciona uma coluna para indicar se a transação é a mais antiga ou mais recente
transacoes_completas = transacoes_completas.withColumn(
    "TIPO_TRANSACAO",
    F.when(df_transacoes["DT_TRANSACAO"] == transacoes_completas["DATA_TRANSACAO_ANTIGA"], "Mais Antiga")
     .when(df_transacoes["DT_TRANSACAO"] == transacoes_completas["DATA_TRANSACAO_RECENTE"], "Mais Recente")
     .otherwise("Outro")
)

# Remove as colunas extras
transacoes_completas = transacoes_completas.drop("DATA_TRANSACAO_ANTIGA", "DATA_TRANSACAO_RECENTE")

# Exibe o resultado
transacoes_completas.show()


+----------+------------+------------+------------+-----------+------------------+----------+--------------+
|CD_CLIENTE|DT_TRANSACAO|CD_TRANSACAO|VR_TRANSACAO|VR_AJUSTADO|   SALDO_ACUMULADO|CD_CLIENTE|TIPO_TRANSACAO|
+----------+------------+------------+------------+-----------+------------------+----------+--------------+
|         1|  2021-12-24|         220|      110.37|    -110.37|            109.43|         1|  Mais Recente|
|         1|  2021-08-28|         000|        20.0|       20.0|              20.0|         1|   Mais Antiga|
|         5|  2021-12-21|         220|        16.9|      -16.9|150.33999999999997|         5|  Mais Recente|
|         5|  2021-10-28|         110|       220.0|      220.0|             220.0|         5|   Mais Antiga|
|         3|  2021-12-05|         110|       720.9|      720.9|            2162.7|         3|  Mais Recente|
|         3|  2021-10-05|         110|       720.9|      720.9|             720.9|         3|   Mais Antiga|
+----------+-------

### 12. Qual o balanço das últimas quatro movimentações de cada usuário?


In [0]:
# Definir uma janela para cada cliente, ordenando pelas transações mais recentes
windowSpec = Window.partitionBy("CD_CLIENTE").orderBy(F.col("DT_TRANSACAO").desc())

# Selecionar as últimas 4 transações de cada cliente
df_ultimas_4 = df_transacoes.withColumn("row_number", F.row_number().over(windowSpec)) \
                            .filter(F.col("row_number") <= 4)

# Calcular o balanço (cash in + cashback - cashout)
df_balanco = df_ultimas_4.withColumn(
    "BALANCO",
    F.when(F.col("CD_TRANSACAO") == 110, F.col("VR_TRANSACAO"))  # Cash in (110) - Somar
    .when(F.col("CD_TRANSACAO") == 220, -F.col("VR_TRANSACAO"))  # Cashout (220) - Subtrair
    .when(F.col("CD_TRANSACAO") == 0, F.col("VR_TRANSACAO"))   # Cashback (000) - Somar
    .otherwise(0)  # Para outras transações, não afeta o balanço
)

# Agrupar por cliente e somar o saldo de todas as transações
df_balanco_final = df_balanco.groupBy("CD_CLIENTE").agg(
    F.sum("BALANCO").alias("BALANCO_ULTIMAS_4")
)

# Exibir os resultados
df_balanco_final.show(truncate=False)

+----------+------------------+
|CD_CLIENTE|BALANCO_ULTIMAS_4 |
+----------+------------------+
|1         |89.43             |
|3         |2162.7            |
|4         |50.0              |
|5         |-69.66000000000003|
+----------+------------------+

