In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json

In [3]:
spark = SparkSession.builder.appName('Rateio Etapa 1').getOrCreate()

In [4]:
spark

## Funções auxiliares

In [5]:
@F.udf('string')
def formato_data(data):
  if data:
    if '/' in data:
      data_ = data.split('/')
      return data_[-1] + '-' + data_[1] + '-' + data_[0]
  return data

@F.udf('string')
def flag_rateio(rateio):
  if rateio:
    return 'S'
  return 'N'

@F.udf('double')
def valor_pos_rateio(valor, pos_rateio):
  if pos_rateio is None:
    return valor
  return pos_rateio

## Leitura dos dados

In [6]:
json_lancamentos = spark.read.json('/content/data/lancamentos.json')

In [7]:
lancamentos = json_lancamentos.withColumn('array_columns', F.array(*[json_lancamentos.columns]))\
                              .select('array_columns')\
                              .withColumn('columns', F.explode('array_columns'))\
                              .select('columns')\
                              .select("columns.ds_periodicidade","columns.dt_competencia","columns.dt_pagamento","columns.dt_vencimento","columns.id_centro_custo",
"columns.id_centro_resultado","columns.id_forma_pagamento","columns.id_lancamento","columns.valor")

In [8]:
lancamentos.show(2,False)

+----------------+--------------+------------+-------------+---------------+-------------------+------------------+-------------+-------+
|ds_periodicidade|dt_competencia|dt_pagamento|dt_vencimento|id_centro_custo|id_centro_resultado|id_forma_pagamento|id_lancamento|valor  |
+----------------+--------------+------------+-------------+---------------+-------------------+------------------+-------------+-------+
|NULL            |30/11/2024    |NULL        |2024-10-31   |125737         |288                |236252            |47152863     |-125.49|
|NULL            |31/10/2024    |NULL        |2024-10-31   |32             |902                |236252            |46975400     |-209.13|
+----------------+--------------+------------+-------------+---------------+-------------------+------------------+-------------+-------+
only showing top 2 rows



In [9]:
lancamentos.count()

1312

In [11]:
json_metricas = spark.read.json('/content/data/metricas.json')

In [13]:
metricas = json_metricas.withColumn('array_columns', F.array(*[json_metricas.columns]))\
                              .select('array_columns')\
                              .withColumn('columns', F.explode('array_columns'))\
                              .select('columns')\
                              .select("columns.ds_canal_aquisicao","columns.ds_metrica","columns.ds_segmento","columns.dt_referencia","columns.total")

In [14]:
metricas.show(5,False)

+------------------+----------+-----------+-------------+--------+
|ds_canal_aquisicao|ds_metrica|ds_segmento|dt_referencia|total   |
+------------------+----------+-----------+-------------+--------+
|canalA            |metrica_1 |segmento-S |2024-10-31   |10906.27|
|canalA            |metrica_1 |segmento-M |2024-11-30   |21518.88|
|canalB            |metrica_2 |segmento-R |2022-09-30   |11646.6 |
|canalB            |metrica_3 |segmento-M |2023-06-30   |21305.87|
|canalB            |metrica_3 |segmento-S |2023-06-30   |26511.85|
+------------------+----------+-----------+-------------+--------+
only showing top 5 rows



In [15]:
metricas.count()

573

## Rateio 1 - CENTROS (100 | 204) - Canais (A e B) por segmentos
1. Realizar o cast de todos os campos de datas e o campo de valor
1. Filtro dos lançamentos dos meses de OUTUBRO e NOVEMBRO
1. Separar o dataset em 2:
  - Filtro dos centros 100 (centro custo - 0) e 204 (centro de custo 81044)
  - Restando do dataset fora os centros (100 e 204)
1. Preparar o dataset de métricas, agrupando o valores totais por CANAL (ANO,MES) | SEGMENTO (ANO, MES) | TOTAL (CANAL + SEGMENTO)(ANO, MES) | TOTAL_METRICA_2 (ANO, MES)
1. Calculo do valor (metrica do canal+segmento / total metrica 2)
1. Join com a tabela de LANÇAMENTOS e calculo com o Valor de Lançamento
1. Criação de colunas de FLAG_RATEIO indicando se houve rateio para aquele dado e coluna VALOR_POS_RATEIO
1. Select e ordenaçao de colunas para uma melhor visualização dos dados
1. Union com os dados que não foram analisados para o rateio
1. Salvamento dos dados .parquet

In [16]:
lanc = lancamentos.withColumn('dt_vencimento',F.col('dt_vencimento').cast('timestamp'))\
                  .withColumn('dt_pagamento',F.col('dt_pagamento').cast('timestamp'))\
                  .withColumn('dt_competencia',formato_data(F.col('dt_competencia')).cast('timestamp'))\
                  .withColumn('valor',F.col('valor').cast('double'))

### Separando os DATASETS

In [17]:
## Parte do dataset para RATEIO
lanc_raterio = lanc.where('month(dt_competencia) = 10 or month(dt_competencia) = 11')\
                   .where('id_centro_resultado = 100 or id_centro_resultado = 204')

In [18]:
## Parte do dataset que NAO sera feito RATEIO
lanc_nao_raterio = lanc.where('(month(dt_competencia) != 10 and month(dt_competencia) != 11) or (id_centro_resultado != 100 and id_centro_resultado != 204)')\
                       .withColumn('flag_rateio', F.lit('N').cast('string'))\
                       .withColumn('valor_pos_rateio', F.col('valor'))\
                       .withColumn('ds_canal_aquisicao', F.lit(None))\
                       .withColumn('ds_segmento', F.lit(None))\
                       .select(["id_lancamento", "id_centro_custo", "id_centro_resultado", "id_forma_pagamento","ds_periodicidade", "dt_vencimento", "dt_pagamento", "dt_competencia", "valor","ds_canal_aquisicao","ds_segmento", "flag_rateio", "valor_pos_rateio"])

In [19]:
# Count total da tabela
#1312

In [20]:
# Count dos valores para RATEIO
lanc_raterio.count()

172

In [21]:
# Count dos valores para NAO RATEIO
lanc_nao_raterio.count()

1140

### Preparando o dataset de metricas

In [22]:
metr = metricas.where("ds_metrica like 'metrica_2'")\
               .withColumn('dt_referencia', F.col('dt_referencia').cast('timestamp'))\
               .withColumn('total',F.col('total').cast('double'))\
               .withColumn('year', F.year('dt_referencia'))\
               .withColumn('month', F.month('dt_referencia'))\
               .where('month = 10 or month = 11')\
               .where('(ds_canal_aquisicao like "canalA" or ds_canal_aquisicao like "canalB") and ds_segmento like "%segmento%"')

In [23]:
metr.count()

37

In [24]:
metr.show(5,False)

+------------------+----------+-----------+-------------------+-----------------+----+-----+
|ds_canal_aquisicao|ds_metrica|ds_segmento|dt_referencia      |total            |year|month|
+------------------+----------+-----------+-------------------+-----------------+----+-----+
|canalA            |metrica_2 |segmento-S |2022-11-30 00:00:00|509890.3099999999|2022|11   |
|canalB            |metrica_2 |segmento-S |2022-11-30 00:00:00|484163.4900000003|2022|11   |
|canalA            |metrica_2 |segmento-M |2022-10-31 00:00:00|803912.6300000001|2022|10   |
|canalA            |metrica_2 |segmento-R |2022-10-31 00:00:00|15843.29         |2022|10   |
|canalA            |metrica_2 |segmento-S |2022-10-31 00:00:00|506732.48        |2022|10   |
+------------------+----------+-----------+-------------------+-----------------+----+-----+
only showing top 5 rows



In [25]:
metrica_2_mes = metr.select('year','month','total')\
                    .groupBy(['year','month',])\
                    .agg(F.sum("total").alias("total_aquisicao_seguimento_mes"))

In [26]:
metrica_2_mes.show(5,False)

+----+-----+------------------------------+
|year|month|total_aquisicao_seguimento_mes|
+----+-----+------------------------------+
|2022|10   |2389182.9500000007            |
|2024|10   |5069966.460000005             |
|2022|11   |2409471.19                    |
|2023|11   |4382979.360000007             |
|2024|11   |5131834.320000003             |
+----+-----+------------------------------+
only showing top 5 rows



### Calculando -> Valor Rateado para Canal = (Métrica do Canal / Total da Métrica)

In [27]:
rateio_canal_segmento = metr.join(metrica_2_mes, ['year','month'])\
                            .withColumn('canal_segmento_por_total_metrica',F.col('total')/F.col('total_aquisicao_seguimento_mes'))\
                            .withColumnRenamed('dt_referencia','dt_competencia')\
                            .select('dt_competencia','ds_canal_aquisicao','ds_segmento','total','total_aquisicao_seguimento_mes','canal_segmento_por_total_metrica')

In [28]:
rateio_canal_segmento.orderBy('dt_competencia').show(5,False)

+-------------------+------------------+-----------+-----------------+------------------------------+--------------------------------+
|dt_competencia     |ds_canal_aquisicao|ds_segmento|total            |total_aquisicao_seguimento_mes|canal_segmento_por_total_metrica|
+-------------------+------------------+-----------+-----------------+------------------------------+--------------------------------+
|2022-10-31 00:00:00|canalB            |segmento-R |11646.6          |2389182.9500000007            |0.004874720874765994            |
|2022-10-31 00:00:00|canalB            |segmento-S |468160.9000000006|2389182.9500000007            |0.1959502096731439              |
|2022-10-31 00:00:00|canalB            |segmento-M |582887.0499999999|2389182.9500000007            |0.24396919875893128             |
|2022-10-31 00:00:00|canalA            |segmento-S |506732.48        |2389182.9500000007            |0.21209446518107783             |
|2022-10-31 00:00:00|canalA            |segmento-R |158

In [29]:
rateio = lanc_raterio.join(rateio_canal_segmento, ['dt_competencia'],'left')

In [30]:
# Alguns valores ficaram duplicados, devido a sua condição de ter relação com mais de um canal (A/B) e segmento.
rateio.count()

262

### Calculo final de RATEIO - relação das métricas * Valor do Lançamento

In [31]:
rateio_final = rateio.withColumn('valor_pos_rateio',F.col('valor')*F.col('canal_segmento_por_total_metrica'))\
                     .withColumn('flag_rateio', flag_rateio(F.col('canal_segmento_por_total_metrica')))\
                     .withColumn('valor_pos_rateio', valor_pos_rateio(F.col('valor'), F.col('valor_pos_rateio')))\
                     .select(["id_lancamento", "id_centro_custo", "id_centro_resultado", "id_forma_pagamento","ds_periodicidade", "dt_vencimento", "dt_pagamento", "dt_competencia", "valor","ds_canal_aquisicao","ds_segmento", "flag_rateio", "valor_pos_rateio"])

In [32]:
rateio_final.orderBy('dt_competencia').where('flag_rateio like "S"').show(15,False)

+-------------+---------------+-------------------+------------------+----------------+-------------------+-------------------+-------------------+--------+------------------+-----------+-----------+-------------------+
|id_lancamento|id_centro_custo|id_centro_resultado|id_forma_pagamento|ds_periodicidade|dt_vencimento      |dt_pagamento       |dt_competencia     |valor   |ds_canal_aquisicao|ds_segmento|flag_rateio|valor_pos_rateio   |
+-------------+---------------+-------------------+------------------+----------------+-------------------+-------------------+-------------------+--------+------------------+-----------+-----------+-------------------+
|47475185     |0              |100                |402853            |NULL            |2024-10-31 00:00:00|2024-10-31 00:00:00|2024-10-31 00:00:00|-4544.0 |canalA            |segmento-M |S          |-1231.8146963678341|
|47475174     |0              |100                |402853            |NULL            |2024-10-31 00:00:00|2024-10-31 00

### Union com a outra parte do dataset que não estava dentro das exigências para o RATEAMENTO

In [33]:
save = rateio_final.union(lanc_nao_raterio)

In [34]:
save.count()

1402

In [35]:
save.coalesce(1).write.mode('overwrite').parquet('/content/save/etapa1')