
## Overview

Esse notebook cria uma tabela permanente no DBFS com os dados de Cartão Corporativo do Governo Federal. Esta tabela é considerada a tabela 'fato', pois contém os valores das transações. A tabela também foi normalizada para compor o modelo 'estrela'. 

In [0]:
from datetime import datetime
from pyspark.sql.functions import col, date_format

In [0]:
# File location and type
file_location = "/FileStore/tables/202401_CPGF-1.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ";"
encode = 'latin1'

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .option("encoding", encode)  \
  .load(file_location)

# display(df)

In [0]:
cols_to_drop = ["NOME ÓRGÃO SUPERIOR", "NOME ÓRGÃO", "NOME UNIDADE GESTORA"]
df = df.drop(*cols_to_drop)


In [0]:
df = df.withColumn("DATA TRANSAÇÃO", date_format(col("DATA TRANSAÇÃO"), "dd/MM/yyyy"))

In [0]:
df = df.to_pandas_on_spark()

In [0]:
# df["DATA TRANSAÇÃO"] = df["DATA TRANSAÇÃO"].apply(lambda x: datetime.strptime(str(x), "%d/%m/%Y") if x not null else str(x))

In [0]:
df["VALOR TRANSAÇÃO"] = df["VALOR TRANSAÇÃO"].apply(lambda x: float(str(x).replace(",", ".")))

In [0]:
df = df[(df["CNPJ OU CPF FAVORECIDO"] != -1) & (df["CNPJ OU CPF FAVORECIDO"] != -2)]

In [0]:
column_names = ["cod_org_sup", "cod_org", "unid_gestora", 
                "ano_extrato", "mes_extrato", "cpf_portador", 
                "nome_portador", "cnpj_cpf_fav", "nome_fav",
                "transaction", "dt_trans", "vlr_trans"]

df.columns = column_names

In [0]:
df = df.to_spark()

In [0]:
# cpgf -> cartão de pagamentos do governo federal
permanent_table_name = "fato_cpgf"
# df.write.format('parquet').saveAsTable(permanent_table_name)
df.write.mode("overwrite").saveAsTable(permanent_table_name)

In [0]:
%sql
SELECT
    fc.cod_org, ol.ORG_LOTACAO,     
    ROUND(SUM(fc.vlr_trans), 2) AS total_gasto
FROM
    fato_cpgf fc
INNER JOIN 
    org_lotacao ol 
ON
    fc.cod_org = ol.COD_ORG_LOTACAO
GROUP BY
    fc.cod_org, ol.ORG_LOTACAO
ORDER BY
    total_gasto DESC
LIMIT 10

In [0]:
%sql
SELECT 
  fc.nome_portador, ug.UORG_LOTACAO, 
  round(sum(fc.vlr_trans), 2) AS total_gasto
FROM 
  fato_cpgf fc
INNER JOIN
  cadastro_servidores cs
ON
  fc.cpf_portador = cs.CPF
INNER JOIN
  qualificacao_servidores qs
ON
  qs.Id_SERVIDOR_PORTAL = cs.id_SERVIDOR_PORTAL
INNER JOIN
  unid_org ug
ON
  qs.COD_UORG_LOTACAO = ug.COD_UORG_LOTACAO
GROUP BY
  fc.nome_portador, ug.UORG_LOTACAO
ORDER BY
  total_gasto DESC
LIMIT 10

In [0]:
%sql
SELECT
    fc.cod_org, ol.ORG_LOTACAO, 
    ROUND(SUM(fc.vlr_trans), 2) AS total_gasto
FROM
    fato_cpgf fc
INNER JOIN 
    org_lotacao ol 
ON
    fc.cod_org = ol.COD_ORG_LOTACAO
GROUP BY
    fc.cod_org, ol.ORG_LOTACAO
ORDER BY
    total_gasto DESC

In [0]:
%sql
WITH avgspending AS (
    SELECT
        ROUND(AVG(total_gasto), 2) AS avg_gasto
    FROM (
        SELECT
            fc.cod_org,
            ol.ORG_LOTACAO,
            SUM(fc.vlr_trans) AS total_gasto
        FROM
            fato_cpgf fc
        INNER JOIN 
            org_lotacao ol 
        ON
            fc.cod_org = ol.COD_ORG_LOTACAO
        GROUP BY
            fc.cod_org,
            ol.ORG_LOTACAO
  )
)

SELECT
    fc.cod_org,
    ol.ORG_LOTACAO,
    ROUND(SUM(fc.vlr_trans), 2) AS total_gasto
FROM
    fato_cpgf fc
INNER JOIN 
    org_lotacao ol 
ON
    fc.cod_org = ol.COD_ORG_LOTACAO
GROUP BY
    fc.cod_org,
    ol.ORG_LOTACAO
HAVING
    SUM(fc.vlr_trans) > (SELECT avg_gasto FROM avgspending)
ORDER BY
    total_gasto DESC