# Aplicação para desnormalizar as dimensões e fato

In [1]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [311]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as f
from pyspark.sql.types import *
from pyspark.sql import Window

In [3]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions",spark.sparkContext.defaultParallelism)
spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInRead', 'LEGACY')
spark.conf.set('spark.sql.legacy.parquet.int96RebaseModeInWrite', 'LEGACY')
spark.conf.set('spark.sql.legacy.parquet.datetimeRebaseModeInWrite', 'LEGACY')

In [4]:
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '37027'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', 'localhost'),
 ('spark.driver.memory', '16g'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.sql.warehouse.dir', 'file:/home/bosco/spark-etl/spark-warehouse'),
 ('spark.app.startTime', '1621172441864'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1621172442606'),
 ('spark.ui.showConsoleProgress', 'true')]

In [255]:
dwd_calendario = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_CALENDARIO.parquet')
dwd_edital = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_EDITAL.parquet')
dwd_elemento_despesa = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_ELEMENTO_DESPESA.parquet')
dwd_geografia = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_GEOGRAFIA.parquet')
dwd_instituicao_ensino = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_INSTITUICAO_ENSINO.parquet')
dwd_modalidade_bolsa = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_MODALIDADE_BOLSA.parquet')
dwd_moeda = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_MOEDA.parquet')
dwd_pessoa = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_PESSOA.parquet')
dwd_processo_projeto = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_PROCESSO_PROJETO.parquet')
dwd_processo_bolsa = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_PROCESSO_BOLSA.parquet')
dwd_processo_auxpe = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_PROCESSO_AUXPE.parquet')
dwd_programa = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_PROGRAMA.parquet')
dwd_rubrica = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_RUBRICA.parquet')
dwd_situacao_prestacao = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_SITUACAO_PRESTACAO.parquet')
dwd_situacao_processo = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_SITUACAO_PROCESSO.parquet')
dwd_tipo_beneficio = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_TIPO_BENEFICIO.parquet')
dwd_tipo_solicitacao = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_TIPO_SOLICITACAO.parquet')
dwd_unidade_organizacional = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWD_UNIDADE_ORGANIZACIONAL.parquet')
dwf_extrato_cartao_pesquisador = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWF_EXTRATO_CARTAO_PESQUISADOR.parquet')
dwf_pagamento = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWF_PAGAMENTO.parquet')
dwf_prestacao_conta = spark.read.parquet('/home/bosco/dw_dri_parquet/DW_DRI.DWF_PRESTACAO_CONTA.parquet')
dwd_beneficio_bolsa = spark.read.parquet('/home/bosco/dw_dri_parquet/dwd_beneficio_bolsa.parquet')

### Função para fazer cache

In [6]:
##Faz o cache da tabela reparticionada
def _cache(df):
    return df.sort(df.columns[0]).repartition(6, df.columns[0]).cache().count()

## Tabelas fato

### Pagamento

In [7]:
##Prefixo
pref_dwf_pagamento = 'pagamento_'
pref_dwd_rubrica_pgt = 'rubrica_'
pref_dwd_tipo_solicitacao_pgt = 'tp_solicitacao_'
pref_dwd_moeda_pgt = 'moeda_'

In [8]:
dwf_pagamento_pagamento = dwf_pagamento.select([f.col(c) for c in dwf_pagamento.columns if 'SK' in c] + \
    [f.col(c).alias(c.lower()) for c in dwf_pagamento.columns if 'SK' not in c])
dwd_rubrica_pgt = dwd_rubrica.select(['SK_RUBRICA']+[f.col(c).alias(pref_dwd_rubrica_pgt+c.lower()) for c in dwd_rubrica.columns])
dwd_tipo_solicitacao_pgt = dwd_tipo_solicitacao.select(['SK_TIPO_SOLICITACAO']+[f.col(c).alias(pref_dwd_tipo_solicitacao_pgt+c.lower()) for c in dwd_tipo_solicitacao.columns])
dwd_moeda_pgt = dwd_moeda.select(['SK_MOEDA']+[f.col(c).alias(pref_dwd_moeda_pgt+c.lower()) for c in dwd_moeda.columns])

In [9]:
pagamento = \
dwf_pagamento_pagamento \
.join(dwd_rubrica_pgt, 'SK_RUBRICA') \
.join(dwd_tipo_solicitacao_pgt, 'SK_TIPO_SOLICITACAO') \
.join(dwd_moeda_pgt, 'SK_MOEDA') \
.withColumn('dt_pagamento', f.to_date(f.col('SK_DATA_PAGAMENTO').cast('string'), 'yyyyMMdd')) \
.withColumn('dt_referencia', f.to_date(f.col('SK_DATA_REFERENCIA').cast('string'), 'yyyyMMdd')) \
.withColumn('processo_sk_processo', f.when(f.col('SK_PROCESSO_BOLSA')>0, f.col('SK_PROCESSO_BOLSA')).otherwise(f.lit(-1))) \
.withColumn('processo_sk_processo', f.when(f.col('SK_PROCESSO_AUXPE')>0, f.col('SK_PROCESSO_AUXPE')).otherwise(f.col('processo_sk_processo'))) \
.withColumnRenamed('SK_PROCESSO_BOLSA', 'bolsa_sk_processo') \
.withColumnRenamed('SK_PROCESSO_AUXPE', 'auxpe_sk_processo')
## Remove as SKs
pagamento = \
pagamento \
.select(['processo_sk_processo', 'bolsa_sk_processo', 'auxpe_sk_processo']+[f.col(c) for c in pagamento.columns if 'sk' not in c.lower()])
## Adiciona o prefixo 'pagamento_' as colunas
pagamento = \
pagamento \
.select([f.col(c).alias(pref_dwf_pagamento+c) for c in pagamento.columns])
## Transforma as colunas 'dt' em data
pagamento_columns = []
for i in [f.col(c) for c in pagamento.columns if 'sk' not in c.lower()]:
    if '_dt_' in str(i).lower():
        pagamento_columns.append(i.cast('date'))
    else:
        pagamento_columns.append(i)
pagamento = pagamento.select(\
                             [f.col(pref_dwf_pagamento+'processo_sk_processo').alias('processo_sk_processo'), \
                              f.col(pref_dwf_pagamento+'bolsa_sk_processo').alias('bolsa_sk_processo'), \
                              f.col(pref_dwf_pagamento+'auxpe_sk_processo').alias('auxpe_sk_processo')] \
                             + pagamento_columns)

#### Colunas calculadas

In [10]:
pagamento = pagamento \
                .withColumn(pref_dwf_pagamento+'vl_pago_origem', f.when(f.col(pref_dwf_pagamento+'tp_solicitacao_id_tipo_solicitacao').isin(1, 4), f.col(pref_dwf_pagamento+'vl_pagamento_origem')).otherwise(f.lit(0))) \
                .withColumn(pref_dwf_pagamento+'vl_empenhado_origem', f.when(f.col(pref_dwf_pagamento+'tp_solicitacao_id_tipo_solicitacao').isin(0), f.col(pref_dwf_pagamento+'vl_pagamento_origem')).otherwise(f.lit(0))) \
                .withColumn(pref_dwf_pagamento+'vl_pago_brl', f.when(f.col(pref_dwf_pagamento+'tp_solicitacao_id_tipo_solicitacao').isin(1, 4), f.col(pref_dwf_pagamento+'vl_pagamento_brl')).otherwise(f.lit(0))) \
                .withColumn(pref_dwf_pagamento+'vl_empenhado_brl', f.when(f.col(pref_dwf_pagamento+'tp_solicitacao_id_tipo_solicitacao').isin(0), f.col(pref_dwf_pagamento+'vl_pagamento_brl')).otherwise(f.lit(0))) \
                .withColumn(pref_dwf_pagamento+'vl_pago_usd', f.when(f.col(pref_dwf_pagamento+'tp_solicitacao_id_tipo_solicitacao').isin(1, 4), f.col(pref_dwf_pagamento+'vl_pagamento_usd')).otherwise(f.lit(0))) \
                .withColumn(pref_dwf_pagamento+'vl_empenhado_usd', f.when(f.col(pref_dwf_pagamento+'tp_solicitacao_id_tipo_solicitacao').isin(0), f.col(pref_dwf_pagamento+'vl_pagamento_usd')).otherwise(f.lit(0)))

In [11]:
%%time
_cache(pagamento)

CPU times: user 8.95 ms, sys: 3.45 ms, total: 12.4 ms
Wall time: 21.3 s


1507942

### Benefício concedido

In [86]:
##Prefixo
pref_dwd_beneficio_bolsa = 'beneficio_'
pref_dwd_rubrica_bb = 'rubrica_'
pref_dwd_moeda_bb = 'moeda_'

In [292]:
dwd_beneficio_bolsa_bb = dwd_beneficio_bolsa.select([f.col(c) for c in dwd_beneficio_bolsa.columns if 'ID' in c] + \
    [f.col(c).alias(c.lower()) for c in dwd_beneficio_bolsa.columns if 'ID' not in c]) \
    .drop('CD_MOEDA', 'VL_TOTAL')
dwd_rubrica_bb = dwd_rubrica.filter(f.col('in_registro_ativo')=='S').select(['ID_RUBRICA']+[f.col(c).alias(pref_dwd_rubrica_bb+c.lower()) for c in dwd_rubrica.columns])
dwd_moeda_bb = dwd_moeda.filter(f.col('in_registro_ativo')=='S').select(['ID_MOEDA']+[f.col(c).alias(pref_dwd_moeda_bb+c.lower()) for c in dwd_moeda.columns])
dwd_processo_bolsa_bb = dwd_processo_bolsa.filter(f.col('in_registro_ativo')=='S').select('SK_PROCESSO_BOLSA', f.col('ID_PROCESSO_BOLSA').alias('ID_PROCESSO'))

In [298]:
beneficio_bolsa = \
dwd_beneficio_bolsa_bb \
.join(dwd_rubrica_bb, 'ID_RUBRICA') \
.join(dwd_processo_bolsa_bb, 'ID_PROCESSO') \
.join(dwd_moeda_bb, 'ID_MOEDA') \
.withColumn('processo_sk_processo', f.col('SK_PROCESSO_BOLSA')) \
.withColumnRenamed('SK_PROCESSO_BOLSA', 'bolsa_sk_processo') \
## Remove as SKs
beneficio_bolsa = \
beneficio_bolsa \
.select(['processo_sk_processo', 'bolsa_sk_processo']+[f.col(c) for c in beneficio_bolsa.columns if 'sk' not in c.lower()]) \
.drop('ID_PROCESSO', 'ID_RUBRICA', 'ID_MOEDA')
## Adiciona o prefixo 'pagamento_' as colunas
beneficio_bolsa = \
beneficio_bolsa \
.select([f.col(c).alias(pref_dwd_beneficio_bolsa+c) for c in beneficio_bolsa.columns])
## Transforma as colunas 'dt' em data
beneficio_bolsa_columns = []
for i in [f.col(c) for c in beneficio_bolsa.columns if 'sk' not in c.lower()]:
    if '_dt_' in str(i).lower():
        beneficio_bolsa_columns.append(i.cast('date'))
    else:
        beneficio_bolsa_columns.append(i)
beneficio_bolsa = beneficio_bolsa.select(\
                             [f.col(pref_dwd_beneficio_bolsa+'processo_sk_processo').alias('processo_sk_processo'), \
                              f.col(pref_dwd_beneficio_bolsa+'bolsa_sk_processo').alias('bolsa_sk_processo')] \
                             + beneficio_bolsa_columns)
## UDF
def nr_parcelas(x):
    return [i for i in range(1, x+1)]
nr_parcelas_udf = f.udf(nr_parcelas, ArrayType(StringType()))
## Explode a lista
beneficio_bolsa = \
    beneficio_bolsa \
    .withColumn('beneficio_qt_rubrica', nr_parcelas_udf(f.col('beneficio_qt_rubrica'))) \
    .withColumn('beneficio_qt_rubrica', f.explode('beneficio_qt_rubrica'))

In [304]:
pagamento.columns

['processo_sk_processo',
 'bolsa_sk_processo',
 'auxpe_sk_processo',
 'pagamento_nr_solicitacao',
 'pagamento_vl_pagamento_origem',
 'pagamento_vl_pagamento_usd',
 'pagamento_vl_pagamento_brl',
 'pagamento_dh_carga',
 'pagamento___index_level_0__',
 'pagamento_rubrica_id_rubrica',
 'pagamento_rubrica_nm_rubrica',
 'pagamento_rubrica_cd_natureza_despesa',
 'pagamento_rubrica_ds_natureza_despesa',
 'pagamento_rubrica_in_mensalidade',
 'pagamento_rubrica_tp_frequencia_pagamento',
 'pagamento_rubrica_tp_natureza',
 'pagamento_rubrica_in_somente_dependente',
 'pagamento_rubrica_in_bolsa_exterior',
 'pagamento_rubrica_in_bolsa_pais',
 'pagamento_rubrica_in_auxilio_pais',
 'pagamento_rubrica_in_auxilio_exterior',
 'pagamento_rubrica_in_rubrica_ativa',
 'pagamento_rubrica_in_registro_ausente',
 'pagamento_rubrica_dt_carga',
 'pagamento_rubrica_dt_versionamento',
 'pagamento_rubrica_in_registro_ativo',
 'pagamento_tp_solicitacao_id_tipo_solicitacao',
 'pagamento_tp_solicitacao_ds_tipo_solicitac

In [324]:
window_pagamento = Window.partitionBy('processo_sk_processo', 'pagamento_rubrica_id_rubrica').orderBy('pagamento_dt_referencia')

pagamento\
.filter(f.col(pref_dwf_pagamento+'tp_solicitacao_id_tipo_solicitacao').isin(1, 4))\
.filter(f.col('bolsa_sk_processo').isin(-1, -2)==False)\
.filter(f.col('bolsa_sk_processo')==637584)\
.select('processo_sk_processo', 'bolsa_sk_processo', 'pagamento_rubrica_id_rubrica', 'pagamento_vl_pago_origem', 'pagamento_dt_pagamento', 'pagamento_dt_referencia') \
.sort('processo_sk_processo', 'pagamento_dt_referencia') \
.withColumn('rank', f.rank().over(window_pagamento)) \
.show()

+--------------------+-----------------+----------------------------+------------------------+----------------------+-----------------------+----+
|processo_sk_processo|bolsa_sk_processo|pagamento_rubrica_id_rubrica|pagamento_vl_pago_origem|pagamento_dt_pagamento|pagamento_dt_referencia|rank|
+--------------------+-----------------+----------------------------+------------------------+----------------------+-----------------------+----+
|              637584|           637584|                          38|                  1300.0|            2019-11-07|             2019-12-01|   1|
|              637584|           637584|                          37|                   665.0|            2020-07-08|             2020-05-01|   1|
|              637584|           637584|                          41|                  1300.0|            2019-11-07|             2019-12-01|   1|
|              637584|           637584|                          41|                  1300.0|            2019-11-07| 

### Prestação de contas

In [12]:
##Prefixo
pref_dwf_prestacao_conta = 'prestacao_'
pref_dwd_tipo_solicitacao_pc = 'tp_solicitacao_'
pref_dwd_elemento_despesa_pc = 'elem_despesa_'
pref_dwd_situacao_prestacao_pc = 'st_prestacao_'
pref_dwd_rubrica_pc = 'rubrica_'
pref_dwd_moeda_pc = 'moeda_'

In [13]:
dwf_prestacao_conta_pc = dwf_prestacao_conta.select([f.col(c) for c in dwf_prestacao_conta.columns if 'SK' in c] + \
    [f.col(c).alias(c.lower()) for c in dwf_prestacao_conta.columns if 'SK' not in c]) \
    .withColumnRenamed('SK_SITUACAO_PRESTACO', 'SK_SITUACAO_PRESTACAO') \
    .withColumn('SK_MOEDA', f.lit(1))
dwd_rubrica_pc = dwd_rubrica.select(['SK_RUBRICA']+[f.col(c).alias(pref_dwd_rubrica_pgt+c.lower()) for c in dwd_rubrica.columns])
dwd_tipo_solicitacao_pc = dwd_tipo_solicitacao.select(['SK_TIPO_SOLICITACAO']+[f.col(c).alias(pref_dwd_tipo_solicitacao_pc+c.lower()) for c in dwd_tipo_solicitacao.columns])
dwd_elemento_despesa_pc = dwd_elemento_despesa.select(['SK_ELEMENTO_DESPESA']+[f.col(c).alias(pref_dwd_elemento_despesa_pc+c.lower()) for c in dwd_elemento_despesa.columns])
dwd_situacao_prestacao_pc = dwd_situacao_prestacao.select(['SK_SITUACAO_PRESTACAO']+[f.col(c).alias(pref_dwd_situacao_prestacao_pc+c.lower()) for c in dwd_situacao_prestacao.columns])
dwd_moeda_pc = dwd_moeda.select(['SK_MOEDA']+[f.col(c).alias(pref_dwd_moeda_pgt+c.lower()) for c in dwd_moeda.columns])

In [14]:
prestacao_conta = \
dwf_prestacao_conta_pc \
.join(dwd_rubrica_pc, 'SK_RUBRICA') \
.join(dwd_tipo_solicitacao_pc, 'SK_TIPO_SOLICITACAO') \
.join(dwd_elemento_despesa_pc, 'SK_ELEMENTO_DESPESA') \
.join(dwd_situacao_prestacao_pc, 'SK_SITUACAO_PRESTACAO') \
.join(dwd_moeda_pgt, 'SK_MOEDA') \
.withColumn('dt_pagamento', f.to_date(f.col('SK_DATA_LANCAMENTO').cast('string'), 'yyyyMMdd')) \
.withColumn('dt_referencia', f.to_date(f.col('SK_DATA_LANCAMENTO').cast('string'), 'yyyyMMdd')) \
.withColumn('processo_sk_processo', f.col('SK_PROCESSO_AUXPE')) \
.withColumnRenamed('SK_PROCESSO_AUXPE', 'auxpe_sk_processo')
## Remove as SKs
prestacao_conta = \
prestacao_conta \
.select(['processo_sk_processo', 'auxpe_sk_processo']+[f.col(c) for c in prestacao_conta.columns if 'sk' not in c.lower()])
## Adiciona o prefixo 'pagamento_' as colunas
prestacao_conta = \
prestacao_conta \
.select([f.col(c).alias(pref_dwf_prestacao_conta+c) for c in prestacao_conta.columns])
## Transforma as colunas 'dt' em data
prestacao_conta_columns = []
for i in [f.col(c) for c in prestacao_conta.columns if 'sk' not in c.lower()]:
    if '_dt_' in str(i).lower():
        prestacao_conta_columns.append(i.cast('date'))
    else:
        prestacao_conta_columns.append(i)
prestacao_conta = prestacao_conta.select(\
                             [f.col(pref_dwf_prestacao_conta+'processo_sk_processo').alias('processo_sk_processo'), \
                              f.col(pref_dwf_prestacao_conta+'auxpe_sk_processo').alias('auxpe_sk_processo')] \
                             + prestacao_conta_columns)

#### Colunas calculadas

In [15]:
prestacao_conta = prestacao_conta \
                    .withColumn('vl_prestado_conta', \
                                f.when((f.col('prestacao_tp_solicitacao_id_tipo_solicitacao').isin(5) | f.col('prestacao_st_prestacao_id_situacao_prestacao').isin(0)), \
                                       f.col('prestacao_vl_lancado')).otherwise(f.lit(0))) \
                    .withColumn('vl_devolvido', \
                                f.when((f.col('prestacao_tp_solicitacao_id_tipo_solicitacao').isin(4) | f.col('prestacao_st_prestacao_id_situacao_prestacao').isin(0)), \
                                       f.col('prestacao_vl_lancado')).otherwise(f.lit(0))) 


### Gasto no cartão pesquisador

In [16]:
##Prefixo
pref_dwf_extrato_cartao_pesquisador = 'cartao_'
pref_dwd_moeda_pc = 'moeda_'

In [17]:
dwf_extrato_cartao_pesquisador_cartao = dwf_extrato_cartao_pesquisador.select([f.col(c) for c in dwf_extrato_cartao_pesquisador.columns if 'SK' in c] \
    + [f.lit(1).alias('SK_MOEDA')] \
    + [f.col(c).alias(c.lower()) for c in dwf_extrato_cartao_pesquisador.columns if 'SK' not in c])
dwd_moeda_cartao = dwd_moeda.select(['SK_MOEDA']+[f.col(c).alias(pref_dwd_moeda_pgt+c.lower()) for c in dwd_moeda.columns])

In [18]:
cartao = \
dwf_extrato_cartao_pesquisador_cartao \
.join(dwd_moeda_pgt, 'SK_MOEDA') \
.withColumn('dt_pagamento', f.to_date(f.col('SK_DATA_TRANSACAO').cast('string'), 'yyyyMMdd')) \
.withColumn('dt_referencia', f.to_date(f.col('SK_DATA_TRANSACAO').cast('string'), 'yyyyMMdd')) \
.withColumn('processo_sk_processo', f.col('SK_PROCESSO_AUXPE')) \
.withColumnRenamed('SK_PROCESSO_AUXPE', 'auxpe_sk_processo')
## Remove as SKs
cartao = \
cartao \
.select(['processo_sk_processo', 'auxpe_sk_processo']+[f.col(c) for c in cartao.columns if 'sk' not in c.lower()])
## Adiciona o prefixo 'cartao_' as colunas
cartao = \
cartao \
.select([f.col(c).alias(pref_dwf_extrato_cartao_pesquisador+c) for c in cartao.columns])
## Transforma as colunas 'dt' em data
cartao_columns = []
for i in [f.col(c) for c in cartao.columns if 'sk' not in c.lower()]:
    if '_dt_' in str(i).lower():
        cartao_columns.append(i.cast('date'))
    else:
        cartao_columns.append(i)
cartao = cartao.select(\
                        [f.col(pref_dwf_extrato_cartao_pesquisador+'processo_sk_processo').alias('processo_sk_processo'), \
                         f.col(pref_dwf_extrato_cartao_pesquisador+'auxpe_sk_processo').alias('auxpe_sk_processo')] \
                         + cartao_columns)

## Union tabelas fato

In [163]:
pagamento_union = pagamento.select([f.col(c).alias(str(c).replace(pref_dwf_pagamento, "")) for c in pagamento.columns]) \
                   .withColumn('nm_tabela', f.lit('pagamento'))
prestacao_conta_union = prestacao_conta.select([f.col(c).alias(str(c).replace(pref_dwf_prestacao_conta, "")) for c in prestacao_conta.columns]) \
                   .withColumn('nm_tabela', f.lit('prestacao'))
cartao_union = cartao.select([f.col(c).alias(str(c).replace(pref_dwf_extrato_cartao_pesquisador, "")) for c in cartao.columns]) \
                   .withColumn('nm_tabela', f.lit('cartao'))
beneficio_bolsa_union = beneficio_bolsa.select([f.col(c).alias(str(c).replace(pref_dwd_beneficio_bolsa, "")) for c in beneficio_bolsa.columns]) \
                           .withColumn('nm_tabela', f.lit('beneficio'))
## Prefixo das colunas em comum
pref_fatos = 'fato_'

## Renomeia as colunas exclusivas Pagamento
colunas_fato = []
colunas_pagamento = []
for i in pagamento_union.columns:
    if i not in prestacao_conta_union.columns+cartao_union.columns:
        colunas_pagamento.append(f.col(i).alias(pref_dwf_pagamento+i))
    if i in prestacao_conta_union.columns+cartao_union.columns:
        colunas_fato.append(f.col(i).alias(pref_fatos+i))
colunas_fato_pagamento = colunas_fato+colunas_pagamento

## Renomeia as colunas exclusivas Prestação
colunas_fato = []
colunas_prestacao = []
for i in prestacao_conta_union.columns:
    if i not in pagamento_union.columns+cartao_union.columns:
        colunas_prestacao.append(f.col(i).alias(pref_dwf_prestacao_conta+i))
    if i in pagamento_union.columns+cartao_union.columns:
        colunas_fato.append(f.col(i).alias(pref_fatos+i))
colunas_fato_prestacao = colunas_fato+colunas_prestacao

## Renomeia as colunas exclusivas Cartão
colunas_fato = []
colunas_cartao = []
for i in cartao_union.columns:
    if i not in prestacao_conta_union.columns+pagamento_union.columns:
        colunas_cartao.append(f.col(i).alias(pref_dwf_extrato_cartao_pesquisador+i))
    if i in prestacao_conta_union.columns+pagamento_union.columns:
        colunas_fato.append(f.col(i).alias(pref_fatos+i))
colunas_fato_cartao = colunas_fato+colunas_cartao

## Renomeia as colunas exclusivas Cartão
colunas_fato = []
colunas_beneficio = []
for i in beneficio_bolsa_union.columns:
    if i not in prestacao_conta_union.columns+pagamento_union.columns+cartao_union.columns:
        colunas_beneficio.append(f.col(i).alias(pref_dwd_beneficio_bolsa+i))
    if i in prestacao_conta_union.columns+pagamento_union.columns+cartao_union.columns:
        colunas_fato.append(f.col(i).alias(pref_fatos+i))
colunas_fato_beneficio = colunas_fato+colunas_beneficio

## Tabelas renomeadas
pagamento_union = pagamento_union.select(colunas_fato_pagamento) \
                    .withColumnRenamed('fato_processo_sk_processo', 'processo_sk_processo') \
                    .withColumnRenamed('fato_auxpe_sk_processo', 'auxpe_sk_processo') \
                    .withColumnRenamed('pagamento_bolsa_sk_processo', 'bolsa_sk_processo')
prestacao_conta_union = prestacao_conta_union.select(colunas_fato_prestacao) \
                    .withColumnRenamed('fato_processo_sk_processo', 'processo_sk_processo') \
                    .withColumnRenamed('fato_auxpe_sk_processo', 'auxpe_sk_processo')
cartao_union = cartao_union.select(colunas_fato_cartao) \
                    .withColumnRenamed('fato_processo_sk_processo', 'processo_sk_processo') \
                    .withColumnRenamed('fato_auxpe_sk_processo', 'auxpe_sk_processo')
beneficio_bolsa_union = beneficio_bolsa_union.select(colunas_fato_beneficio) \
                        .withColumnRenamed('fato_processo_sk_processo', 'processo_sk_processo') \
                        .withColumnRenamed('fato_bolsa_sk_processo', 'bolsa_sk_processo')

In [164]:
pagamento_prestacao_cartao = \
    pagamento_union \
    .unionByName(prestacao_conta_union, allowMissingColumns=True) \
    .unionByName(cartao_union, allowMissingColumns=True)
pagamento_beneficio = \
    pagamento_union \
    .unionByName(beneficio_bolsa_union, allowMissingColumns=True)

#### Colunas calculadas

In [165]:
pagamento_prestacao_cartao = pagamento_prestacao_cartao_beneficio \
    .withColumn(pref_fatos+'vl_pago_origem', f.col('pagamento_vl_pago_origem')) \
    .withColumn(pref_fatos+'vl_empenhado_origem', f.col('pagamento_vl_empenhado_origem')) \
    .withColumn(pref_fatos+'vl_pago_brl', f.col('pagamento_vl_pago_brl')) \
    .withColumn(pref_fatos+'vl_empenhado_brl', f.col('pagamento_vl_empenhado_brl')) \
    .withColumn(pref_fatos+'vl_pago_usd', f.col('pagamento_vl_pago_usd')) \
    .withColumn(pref_fatos+'vl_empenhado_usd', f.col('pagamento_vl_empenhado_usd')) \
    .withColumn(pref_fatos+'vl_prestado_conta', f.col('prestacao_vl_prestado_conta')) \
    .withColumn(pref_fatos+'vl_devolvido', f.col('prestacao_vl_devolvido')) \
    .withColumn(pref_fatos+'vl_gasto_cartao', f.col('cartao_vl_transacao_brl')) \
    .na.fill({pref_fatos+'vl_pago_origem': 0, pref_fatos+'vl_empenhado_origem': 0, pref_fatos+'vl_pago_brl': 0, pref_fatos+'vl_empenhado_brl': 0,
             pref_fatos+'vl_pago_usd': 0, pref_fatos+'vl_empenhado_usd': 0, pref_fatos+'vl_prestado_conta': 0, pref_fatos+'vl_devolvido': 0,
             pref_fatos+'vl_gasto_cartao': 0})

In [166]:
pagamento_beneficio = pagamento_beneficio \
    .withColumn(pref_fatos+'vl_pago_origem', f.col('pagamento_vl_pago_origem')) \
    .withColumn(pref_fatos+'vl_pago_brl', f.col('pagamento_vl_pago_brl')) \
    .withColumn(pref_fatos+'vl_pago_usd', f.col('pagamento_vl_pago_usd')) \
    .withColumn(pref_fatos+'vl_beneficio_bolsa', f.col('beneficio_vl_rubrica')) \
    .na.fill({pref_fatos+'vl_pago_origem': 0, pref_fatos+'vl_pago_brl': 0,
             pref_fatos+'vl_pago_usd': 0, pref_fatos+'vl_beneficio_bolsa': 0})

In [22]:
%%time
_cache(pagamento_prestacao_cartao)

CPU times: user 13.8 ms, sys: 5.59 ms, total: 19.4 ms
Wall time: 23.8 s


1646279

## Tabelas dimensão

### Projeto

In [23]:
##Prefixo
pref_dwd_processo_projeto_proj = 'proj_'
pref_dwd_unidade_organizacional_proj = 'uo_'
pref_dwd_programa_proj = 'programa_'
pref_dwd_edital = 'edital_'
pref_dwd_tipo_beneficio = 'tp_beneficio_'
pref_ies_principal = 'ies_princip_'
pref_geografia_resp_principal = 'geo_resp_princp_'
pref_responsavel_principal = 'resp_princp_'
pref_dwd_situacao_processo_proj = 'st_proc_'

In [24]:
dwd_processo_projeto_proj = dwd_processo_projeto.select([f.col(c) for c in dwd_processo_projeto.columns if 'SK' in c] + \
    [f.col(c).alias(c.lower()) for c in dwd_processo_projeto.columns if 'SK' not in c]) \
    .filter(f.col('in_registro_ativo')=='S')
dwd_unidade_organizacional_proj = dwd_unidade_organizacional.select(['SK_UNIDADE_ORGANIZACIONAL']+[f.col(c).alias(pref_dwd_unidade_organizacional_proj+c.lower()) for c in dwd_unidade_organizacional.columns])
dwd_programa_proj = dwd_programa.select(['SK_PROGRAMA']+[f.col(c).alias(pref_dwd_programa_proj+c.lower()) for c in dwd_programa.columns])
dwd_edital_proj = dwd_edital.select(['SK_EDITAL']+[f.col(c).alias(pref_dwd_edital+c.lower()) for c in dwd_edital.columns])
dwd_tipo_beneficio_proj = dwd_tipo_beneficio.select(['SK_TIPO_BENEFICIO']+[f.col(c).alias(pref_dwd_tipo_beneficio+c.lower()) for c in dwd_tipo_beneficio.columns])
ies_principal = dwd_instituicao_ensino.withColumnRenamed('SK_INSTITUICAO_ENSINO', 'SK_IES_PRINCIPAL')
ies_principal = ies_principal.select(['SK_IES_PRINCIPAL']+[f.col(c).alias(pref_ies_principal+c.lower()) for c in ies_principal.columns])
geografia_resp_principal = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEOGRAFIA_RESP_PRINCIPAL')
geografia_resp_principal = geografia_resp_principal.select(['SK_GEOGRAFIA_RESP_PRINCIPAL']+[f.col(c).alias(pref_geografia_resp_principal+c.lower()) for c in geografia_resp_principal.columns])
responsavel_principal = dwd_pessoa.withColumnRenamed('SK_PESSOA', 'SK_RESPONSAVEL_PRINCIPAL')
responsavel_principal = responsavel_principal.select(['SK_RESPONSAVEL_PRINCIPAL']+[f.col(c).alias(pref_responsavel_principal+c.lower()) for c in responsavel_principal.columns])
dwd_situacao_processo_proj = dwd_situacao_processo.select(['SK_SITUACAO_PROCESSO']+[f.col(c).alias(pref_dwd_situacao_processo_proj+c.lower()) for c in dwd_situacao_processo.columns])

In [25]:
projeto = \
dwd_processo_projeto_proj \
.join(dwd_unidade_organizacional_proj, 'SK_UNIDADE_ORGANIZACIONAL') \
.join(dwd_programa_proj, 'SK_PROGRAMA') \
.join(dwd_edital_proj, 'SK_EDITAL') \
.join(dwd_tipo_beneficio_proj, 'SK_TIPO_BENEFICIO') \
.join(ies_principal, 'SK_IES_PRINCIPAL') \
.join(geografia_resp_principal, 'SK_GEOGRAFIA_RESP_PRINCIPAL') \
.join(responsavel_principal, 'SK_RESPONSAVEL_PRINCIPAL') \
.join(dwd_situacao_processo_proj, 'SK_SITUACAO_PROCESSO')
## Remove as SKs
projeto = \
projeto \
.select(['sk_processo_projeto']+[f.col(c) for c in projeto.columns if 'sk' not in c.lower()])
## Adiciona o prefixo 'proj_' as colunas
projeto = \
projeto \
.select([f.col(c).alias(pref_dwd_processo_projeto_proj+c) for c in projeto.columns])
## Transforma as colunas 'dt' em data
proj_columns = []
for i in [f.col(c) for c in projeto.columns]:
    if '_dt_' in str(i).lower():
        proj_columns.append(i.cast('date'))
    else:
        proj_columns.append(i)
projeto = \
projeto \
.select(proj_columns)

### Bolsas

In [26]:
##Prefixo
pref_dwd_processo_bolsa_bolsa = 'bolsa_'
pref_dwd_unidade_organizacional_bolsa = 'uo_'
pref_dwd_programa_bolsa = 'programa_'
pref_dwd_edital_bolsa = 'edital_'
pref_dwd_tipo_beneficio_bolsa = 'tp_beneficio_'
pref_bolsista_bolsa = 'beneficiario_'
pref_geografia_exterior_bolsa = 'geo_exterior_'
pref_geografia_brasil_bolsa = 'geo_brasil_'
pref_ies_origem_bolsa = 'ies_ori_'
pref_geografia_ies_origem_bolsa = 'geo_ies_ori_'
pref_ies_estudo_bolsa = 'ies_estu_'
pref_geografia_ies_estudo_bolsa = 'geo_ies_estu_'
pref_dwd_modalidade_bolsa = 'modalidade_'
pref_dwd_situacao_processo_bolsa = 'st_proc_'

In [27]:
dwd_processo_bolsa_bolsa = dwd_processo_bolsa.select([f.col(c) for c in dwd_processo_bolsa.columns if 'SK' in c] + \
    [f.col(c).alias(c.lower()) for c in dwd_processo_bolsa.columns if 'SK' not in c]) \
    .withColumnRenamed('SK_PROCESSO_PROJETO', 'proj_sk_processo_projeto') \
    .filter(f.col('in_registro_ativo')=='S')
dwd_unidade_organizacional_bolsa = dwd_unidade_organizacional.select(['SK_UNIDADE_ORGANIZACIONAL']+[f.col(c).alias(pref_dwd_unidade_organizacional_bolsa+c.lower()) for c in dwd_unidade_organizacional.columns])
dwd_programa_bolsa = dwd_programa.select(['SK_PROGRAMA']+[f.col(c).alias(pref_dwd_programa_bolsa+c.lower()) for c in dwd_programa.columns])
dwd_edital_bolsa = dwd_edital.select(['SK_EDITAL']+[f.col(c).alias(pref_dwd_edital_bolsa+c.lower()) for c in dwd_edital.columns])
dwd_tipo_beneficio_bolsa = dwd_tipo_beneficio.select(['SK_TIPO_BENEFICIO']+[f.col(c).alias(pref_dwd_tipo_beneficio_bolsa+c.lower()) for c in dwd_tipo_beneficio.columns])
bolsista = dwd_pessoa.withColumnRenamed('SK_PESSOA', 'SK_BOLSISTA')
bolsista = bolsista.select(['SK_BOLSISTA']+[f.col(c).alias(pref_bolsista_bolsa+c.lower()) for c in bolsista.columns])
geografia_exterior = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEO_EXTERIOR')
geografia_exterior = geografia_exterior.select(['SK_GEO_EXTERIOR']+[f.col(c).alias(pref_geografia_exterior_bolsa+c.lower()) for c in geografia_exterior.columns])
geografia_brasil = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEO_BRASIL')
geografia_brasil = geografia_brasil.select(['SK_GEO_BRASIL']+[f.col(c).alias(pref_geografia_brasil_bolsa+c.lower()) for c in geografia_brasil.columns])
ies_origem = dwd_instituicao_ensino.withColumnRenamed('SK_INSTITUICAO_ENSINO', 'SK_IES_ORIGEM')
ies_origem = ies_origem.select(['SK_IES_ORIGEM']+[f.col(c).alias(pref_ies_origem_bolsa+c.lower()) for c in ies_origem.columns])
geografia_ies_origem = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEOGRAFIA_IES_ORIGEM')
geografia_ies_origem = geografia_ies_origem.select(['SK_GEOGRAFIA_IES_ORIGEM']+[f.col(c).alias(pref_geografia_ies_origem_bolsa+c.lower()) for c in geografia_ies_origem.columns])
ies_estudo = dwd_instituicao_ensino.withColumnRenamed('SK_INSTITUICAO_ENSINO', 'SK_IES_ESTUDO')
ies_estudo = ies_estudo.select(['SK_IES_ESTUDO']+[f.col(c).alias(pref_ies_estudo_bolsa+c.lower()) for c in ies_estudo.columns])
geografia_ies_estudo = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEOGRAFIA_IES_ESTUDO')
geografia_ies_estudo = geografia_ies_estudo.select(['SK_GEOGRAFIA_IES_ESTUDO']+[f.col(c).alias(pref_geografia_ies_estudo_bolsa+c.lower()) for c in geografia_ies_estudo.columns])
dwd_modalidade_bolsa_bolsa = dwd_modalidade_bolsa.select(['SK_MODALIDADE_BOLSA']+[f.col(c).alias(pref_dwd_modalidade_bolsa+c.lower()) for c in dwd_modalidade_bolsa.columns])
dwd_situacao_processo_bolsa = dwd_situacao_processo.select(['SK_SITUACAO_PROCESSO']+[f.col(c).alias(pref_dwd_situacao_processo_bolsa+c.lower()) for c in dwd_situacao_processo.columns])

In [28]:
bolsa = \
dwd_processo_bolsa_bolsa \
.join(projeto, 'proj_sk_processo_projeto') \
.join(dwd_unidade_organizacional_bolsa, 'SK_UNIDADE_ORGANIZACIONAL') \
.join(dwd_programa_bolsa, 'SK_PROGRAMA') \
.join(dwd_edital_bolsa, 'SK_EDITAL') \
.join(dwd_tipo_beneficio_bolsa, 'SK_TIPO_BENEFICIO') \
.join(bolsista, 'SK_BOLSISTA') \
.join(geografia_exterior, 'SK_GEO_EXTERIOR') \
.join(geografia_brasil, 'SK_GEO_BRASIL') \
.join(ies_origem, 'SK_IES_ORIGEM') \
.join(geografia_ies_origem, 'SK_GEOGRAFIA_IES_ORIGEM') \
.join(ies_estudo, 'SK_IES_ESTUDO') \
.join(dwd_modalidade_bolsa_bolsa, 'SK_MODALIDADE_BOLSA') \
.join(dwd_situacao_processo_bolsa, 'SK_SITUACAO_PROCESSO')
#.join(geografia_ies_estudo, 'SK_GEOGRAFIA_IES_ESTUDO') \
## Remove as SKs
bolsa = \
bolsa \
.select([f.col('sk_processo_bolsa').alias('sk_processo')]+[f.col(c) for c in bolsa.columns if 'sk' not in c.lower()])
## Adiciona o prefixo 'bolsa_' as colunas
bolsa = \
bolsa \
.select([f.col(c).alias(pref_dwd_processo_bolsa_bolsa+c) for c in bolsa.columns if 'proj_' not in c]+[f.col(c) for c in bolsa.columns if 'proj_' in c])
## Transforma as colunas 'dt' em data
bolsa_columns = []
for i in [f.col(c) for c in bolsa.columns]:
    if '_dt_' in str(i).lower():
        bolsa_columns.append(i.cast('date'))
    else:
        bolsa_columns.append(i)
bolsa = \
bolsa \
.select(bolsa_columns)

### AUXPE

In [29]:
##Prefixo
pref_dwd_processo_auxpe_auxpe = 'auxpe_'
pref_dwd_unidade_organizacional_auxpe = 'uo_'
pref_dwd_programa_auxpe = 'programa_'
pref_dwd_edital_auxpe = 'edital_'
pref_dwd_tipo_beneficio_auxpe = 'tp_beneficio_'
pref_beneficiario_auxpe = 'beneficiario_'
pref_geografia_exterior_auxpe = 'geo_exterior_'
pref_geografia_brasil_auxpe = 'geo_brasil_'
pref_ies_origem_auxpe = 'ies_ori_'
pref_geografia_ies_origem_auxpe = 'geo_ies_ori_'
pref_ies_estudo_auxpe = 'ies_estu_'
pref_geografia_ies_estudo_auxpe = 'geo_ies_estu_'
pref_dwd_situacao_processo_auxpe = 'st_proc_'

In [30]:
dwd_processo_auxpe_auxpe = dwd_processo_auxpe.select([f.col(c) for c in dwd_processo_auxpe.columns if 'SK' in c] + \
    [f.col(c).alias(c.lower()) for c in dwd_processo_auxpe.columns if 'SK' not in c]) \
    .withColumnRenamed('SK_PROCESSO_PROJETO', 'proj_sk_processo_projeto') \
    .filter(f.col('in_registro_ativo')=='S')
dwd_unidade_organizacional_auxpe = dwd_unidade_organizacional.select(['SK_UNIDADE_ORGANIZACIONAL']+[f.col(c).alias(pref_dwd_unidade_organizacional_auxpe+c.lower()) for c in dwd_unidade_organizacional.columns])
dwd_programa_auxpe = dwd_programa.select(['SK_PROGRAMA']+[f.col(c).alias(pref_dwd_programa_auxpe+c.lower()) for c in dwd_programa.columns])
dwd_edital_auxpe = dwd_edital.select(['SK_EDITAL']+[f.col(c).alias(pref_dwd_edital_auxpe+c.lower()) for c in dwd_edital.columns])
dwd_tipo_beneficio_auxpe = dwd_tipo_beneficio.select(['SK_TIPO_BENEFICIO']+[f.col(c).alias(pref_dwd_tipo_beneficio_auxpe+c.lower()) for c in dwd_tipo_beneficio.columns])
beneficiario = dwd_pessoa.withColumnRenamed('SK_PESSOA', 'SK_BENEFICIARIO')
beneficiario = beneficiario.select(['SK_BENEFICIARIO']+[f.col(c).alias(pref_beneficiario_auxpe+c.lower()) for c in beneficiario.columns])
geografia_exterior = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEO_EXTERIOR')
geografia_exterior = geografia_exterior.select(['SK_GEO_EXTERIOR']+[f.col(c).alias(pref_geografia_exterior_auxpe+c.lower()) for c in geografia_exterior.columns])
geografia_brasil = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEO_BRASIL')
geografia_brasil = geografia_brasil.select(['SK_GEO_BRASIL']+[f.col(c).alias(pref_geografia_brasil_auxpe+c.lower()) for c in geografia_brasil.columns])
ies_origem = dwd_instituicao_ensino.withColumnRenamed('SK_INSTITUICAO_ENSINO', 'SK_IES_ORIGEM')
ies_origem = ies_origem.select(['SK_IES_ORIGEM']+[f.col(c).alias(pref_ies_origem_auxpe+c.lower()) for c in ies_origem.columns])
geografia_ies_origem = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEOGRAFIA_IES_ORIGEM')
geografia_ies_origem = geografia_ies_origem.select(['SK_GEOGRAFIA_IES_ORIGEM']+[f.col(c).alias(pref_geografia_ies_origem_auxpe+c.lower()) for c in geografia_ies_origem.columns])
ies_estudo = dwd_instituicao_ensino.withColumnRenamed('SK_INSTITUICAO_ENSINO', 'SK_IES_ESTUDO')
ies_estudo = ies_estudo.select(['SK_IES_ESTUDO']+[f.col(c).alias(pref_ies_estudo_auxpe+c.lower()) for c in ies_estudo.columns])
geografia_ies_estudo = dwd_geografia.withColumnRenamed('SK_GEOGRAFIA', 'SK_GEOGRAFIA_IES_ESTUDO')
geografia_ies_estudo = geografia_ies_estudo.select(['SK_GEOGRAFIA_IES_ESTUDO']+[f.col(c).alias(pref_geografia_ies_estudo_auxpe+c.lower()) for c in geografia_ies_estudo.columns])
dwd_situacao_processo_auxpe = dwd_situacao_processo.select(['SK_SITUACAO_PROCESSO']+[f.col(c).alias(pref_dwd_situacao_processo_auxpe+c.lower()) for c in dwd_situacao_processo.columns])

In [31]:
auxpe = \
dwd_processo_auxpe_auxpe \
.join(projeto, 'proj_sk_processo_projeto') \
.join(dwd_unidade_organizacional_auxpe, 'SK_UNIDADE_ORGANIZACIONAL') \
.join(dwd_programa_auxpe, 'SK_PROGRAMA') \
.join(dwd_edital_auxpe, 'SK_EDITAL') \
.join(dwd_tipo_beneficio_auxpe, 'SK_TIPO_BENEFICIO') \
.join(beneficiario, 'SK_BENEFICIARIO') \
.join(geografia_exterior, 'SK_GEO_EXTERIOR') \
.join(geografia_brasil, 'SK_GEO_BRASIL') \
.join(ies_origem, 'SK_IES_ORIGEM') \
.join(geografia_ies_origem, 'SK_GEOGRAFIA_IES_ORIGEM') \
.join(ies_estudo, 'SK_IES_ESTUDO') \
.join(dwd_situacao_processo_auxpe, 'SK_SITUACAO_PROCESSO')
## Remove as SKs
auxpe = \
auxpe \
.select([f.col('sk_processo_auxpe').alias('sk_processo')]+[f.col(c) for c in auxpe.columns if 'sk' not in c.lower()])
## Adiciona o prefixo 'auxpe_' as colunas
auxpe = \
auxpe \
.select([f.col(c).alias(pref_dwd_processo_auxpe_auxpe+c) for c in auxpe.columns if 'proj_' not in c]+[f.col(c) for c in auxpe.columns if 'proj_' in c])
## Transforma as colunas 'dt' em data
auxpe_columns = []
for i in [f.col(c) for c in auxpe.columns]:
    if '_dt_' in str(i).lower():
        auxpe_columns.append(i.cast('date'))
    else:
        auxpe_columns.append(i)
auxpe = \
auxpe \
.select(auxpe_columns)

## Union Bolsa e AUXPE

In [32]:
auxpe_union = auxpe.select([f.col(c).alias(str(c).replace("auxpe_", "")) for c in auxpe.columns]) \
                   .withColumnRenamed('id_processo_auxpe', 'id_processo')
bolsa_union = bolsa.select([f.col(c).alias(str(c).replace("bolsa_", "")) for c in bolsa.columns]) \
                   .withColumnRenamed('id_processo_bolsa', 'id_processo') \
                   .withColumn('dt_termino', f.col('dt_termino_acompanhamento'))
## Renomeia as colunas exclusivas AUXPE
colunas_processo = []
colunas_auxpe = []
for i in auxpe_union.columns:
    if i not in bolsa_union.columns:
        colunas_auxpe.append(f.col(i).alias('auxpe_'+i))
    if i in bolsa_union.columns:
        colunas_processo.append(f.col(i).alias('processo_'+i))
colunas_processo_auxpe = colunas_processo+colunas_auxpe
## Renomeia as colunas exclusivas Bolsa
colunas_processo = []
colunas_bolsa = []
for i in bolsa_union.columns:
    if i not in auxpe_union.columns:
        colunas_bolsa.append(f.col(i).alias('bolsa_'+i))
    if i in auxpe_union.columns:
        colunas_processo.append(f.col(i).alias('processo_'+i))
colunas_processo_bolsa = colunas_processo+colunas_bolsa
auxpe_union = auxpe_union.select(colunas_processo_auxpe)
bolsa_union = bolsa_union.select(colunas_processo_bolsa)

In [33]:
bolsa_auxpe = bolsa_union.unionByName(auxpe_union, allowMissingColumns=True)
bolsa_auxpe = bolsa_auxpe.select([f.col(c) for c in bolsa_auxpe.columns if 'proj_' not in c]+[f.col(c).alias(c.replace('processo_', '')) for c in bolsa_auxpe.columns if 'proj_' in c])

### Junção pagamento e processo

#### Pagamento AUXPE

In [34]:
pagamento_auxpe = pagamento_prestacao_cartao.join(f.broadcast(auxpe), 'auxpe_sk_processo', 'right')

#### Pagamento Bolsa

In [149]:
pagamento_bolsa = pagamento_beneficio.join(f.broadcast(bolsa), 'bolsa_sk_processo', 'right')

#### Pagamento Bolsa AUXPE

In [36]:
pagamento_bolsa_auxpe = pagamento_prestacao_cartao.join(f.broadcast(bolsa_auxpe), 'processo_sk_processo', 'right')

### Salva df.csv.zip

In [45]:
##Salva CSV
def salva_csv(df, nm):
    compression_opts = dict(method='zip', archive_name=f'{nm}.csv')
    df.toPandas().to_csv(path_or_buf=f'/home/bosco/{nm}.zip', sep=';', na_rep='', float_format=None, columns=None, header=True, index=False, index_label=None, mode='w', encoding=None, compression=compression_opts, quoting=None, quotechar='"', line_terminator=None, chunksize=200000, date_format=None, doublequote=True, escapechar=None, decimal=',', errors='strict', storage_options=None)

##Salva CSV
def salva_csv(df, nm):
    df.repartition(6).write.csv(f'/home/bosco/{nm}', sep=';', header=True, mode='overwrite')

In [37]:
##Salva parquet
def salva_csv(df, nm):
    df.write.parquet(f'/home/bosco/dw_desnormalizado/{nm}', mode='overwrite')

In [38]:
%%time
salva_csv(projeto, 'projeto')

CPU times: user 2.9 ms, sys: 1.19 ms, total: 4.09 ms
Wall time: 3.96 s


In [46]:
%%time
salva_csv(bolsa, 'bolsa')

CPU times: user 43.6 s, sys: 270 ms, total: 43.9 s
Wall time: 1min 14s


In [47]:
%%time
salva_csv(auxpe, 'auxpe')

CPU times: user 1.58 s, sys: 7.93 ms, total: 1.59 s
Wall time: 7.88 s


In [49]:
%%time
salva_csv(bolsa_auxpe, 'bolsa_auxpe')

CPU times: user 44.9 s, sys: 1.3 s, total: 46.2 s
Wall time: 1min 17s


In [42]:
%%time
salva_csv(pagamento_auxpe, 'pagamento_auxpe')

CPU times: user 35.6 ms, sys: 0 ns, total: 35.6 ms
Wall time: 2min 9s


In [43]:
%%time
salva_csv(pagamento_bolsa, 'pagamento_bolsa')

CPU times: user 17.3 ms, sys: 6.31 ms, total: 23.6 ms
Wall time: 1min 10s


In [44]:
%%time
salva_csv(pagamento_bolsa_auxpe, 'pagamento_bolsa_auxpe')

CPU times: user 15.1 ms, sys: 6.23 ms, total: 21.3 ms
Wall time: 1min 13s
