In [0]:
from pyspark.sql.functions import lit, round, concat, col, sum, explode, split, regexp_replace, when, upper
from small_files.small_files import SmallFiles
from pyspark.sql import SparkSession
from datetime import datetime
from sys import argv
import os


class Conciliacao_Contabil:
    global spark
    try:
        spark = SparkSession.builder \
            .appName('CRA_Conciliacao_Contabil') \
            .enableHiveSupport() \
            .config('spark.sql.hive.convertMetastoreParquet', 'False') \
            .config('spark.debug.maxToStringFields', '100') \
            .config('spark.sql.crossJoin.enabled', 'True') \
            .config('hive.exec.dynamic.partition', 'true') \
            .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
            .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
            .config("spark.sql.filesmaxPartitionBytes", "128mb") \
            .config('hive.exec.parallel', 'true') \
            .config('hive.execution.engine', 'spark') \
            .getOrCreate()
    except ConnectionError as e1:
        print("ERRO: FALHA AO INICIAR O SPARK")

    def __init__(self, sistema, odate):
        self.sistema = sistema
        self.odate = odate
        spark.catalog.setCurrentDatabase('cra')

    def import_contrato(self):
        df = spark.sql("""SELECT co.cod_sistema sistema,
                                 co.cd_emp empresa,
                                 co.cosif,
                                 co.cd_pad_ctb,
                                 co.valor_pad_ctb,
                                 co.data_base,
                                 conta, 
                                 saldo_cra
       
                          FROM tb_in_contrato_m co
                          lateral VIEW posexplode(split(co.cd_pad_ctb,'#')) col1 AS pos3, conta
                          lateral VIEW posexplode(split(co.valor_pad_ctb,'#')) col2 AS pos4, saldo_cra
                          WHERE pos3 = pos4
                                AND cod_sistema = '{0}'
                                AND dat_ref_carga = '{1}'"""
                       .format(self.sistema, '20211231'))
        return df

    def import_expressinho(self):
        df = spark.sql("""SELECT ex.dt_processamento dt_proc,
                                 trim(ex.empresa) emp_ex,
                                 trim(ex.conta) conta_ex,
                                 ex.nome_conta,
                                 replace(ex.saldo_ha, '+', '') saldo_ha
                          FROM Expressinho ex""")
        return df

    def import_lh(self):
        df = spark.sql("""SELECT '033' lh_empresa,
                                 lh.conta_contabil_1 conta_lh_1,
                                 lh.valor_contabil_1 saldo_lh_1,
                                 lh.conta_contabil_2 conta_lh_2,
                                 lh.valor_contabil_2 saldo_lh_2,
                                 lh.conta_contabil_3 conta_lh_3,
                                 lh.valor_contabil_3 saldo_lh_3,
                                 lh.conta_contabil_4 conta_lh_4,
                                 lh.valor_contabil_4 saldo_lh_4,
                                 lh.conta_contabil_5 conta_lh_5,
                                 lh.valor_contabil_5 saldo_lh_5,
                                 lh.data_contabil
                          FROM tb_lh_leo_m_temp lh"""
                       )
        return df

    def small_files(self, dir_parquet, tableName, odate):
        sistema = argv[1]
        df_small = SmallFiles(spark, "{0}{1}/dat_ref_carga={2}/sistema={3}"
                              .format(dir_parquet, tableName, odate, sistema))
        df_small.small_files()

    def process_data(self):
        df_contrato = self.import_contrato()
        df_lh = self.import_lh()
        df_expressinho = self.import_expressinho()
        origem = "/sistemas/cra/conciliacao_contabil/bst_Conciliacao_Contabil_DG.txt"

        data_atual = datetime.now().strftime("%d%m%Y")

        df_contrato = df_contrato.select('data_base', 'empresa', 'sistema', 'conta', 'saldo_cra',
                                         explode(split(col('cosif'), '#')).alias('cosif'))

        df_contrato = df_contrato.filter((col('conta') != '') & (col('saldo_cra') != '') & (col('cosif') != ''))
        df_valor_positivo = df_contrato.filter(col('saldo_cra') > 0)
        df_valor_negativo = df_contrato.filter(col('saldo_cra').like('-%'))
        df_valor_positivo = df_valor_positivo.dropDuplicates()
        df_valor_negativo = df_valor_negativo.dropDuplicates()

        df_valor_positivo = df_valor_positivo.groupBy([df_contrato.empresa, df_contrato.conta, df_contrato.sistema,
                                                       df_contrato.data_base, df_contrato.cosif]).agg(sum('saldo_cra'))

        df_valor_positivo = df_valor_positivo.withColumnRenamed('sum(saldo_cra)', 'saldo_cra_posit')

        # df_valor_negativo = df_valor_negativo.withColumn('saldo_cra', regexp_replace('saldo_cra', '-', ''))
        df_valor_negativo = df_valor_negativo.groupBy([df_contrato.empresa, df_contrato.conta, df_contrato.sistema,
                                                       df_contrato.data_base, df_contrato.cosif]).agg(sum('saldo_cra'))

        df_valor_negativo = df_valor_negativo \
            .withColumnRenamed('sum(saldo_cra)', 'saldo_cra_negativ') \
            .withColumnRenamed('data_base', 'data_base_neg') \
            .withColumnRenamed('empresa', 'empresa_neg') \
            .withColumnRenamed('conta', 'conta_neg') \
            .withColumnRenamed('cosif', 'cosif_neg') \
            .withColumnRenamed('sistema', 'sistema_neg')

        df_contrato_join = (df_valor_positivo
                            .join(df_valor_negativo,
                                  (df_valor_positivo.conta == df_valor_negativo.conta_neg) &
                                  (df_valor_positivo.empresa == df_valor_negativo.empresa_neg) &
                                  (df_valor_positivo.data_base == df_valor_negativo.data_base_neg), how='inner')
                            )

        df_contrato_join = df_contrato_join \
            .withColumn('saldo_cra', col('saldo_cra_posit') - col('saldo_cra_negativ'))

        df_contrato_join = df_contrato_join.select('sistema', 'empresa', 'cosif', 'conta', 'data_base', 'saldo_cra')
        df_contrato_join = df_contrato_join.dropDuplicates(['sistema', 'empresa', 'cosif', 'conta', 'data_base'])

        df_contrato_not_join = (df_contrato
                                .join(df_contrato_join,
                                      (df_contrato.conta == df_contrato_join.conta) &
                                      (df_contrato.empresa == df_contrato_join.empresa), how='leftanti'))

        df_contrato_not_join = df_contrato_not_join.select('sistema', 'empresa', 'cosif', 'conta', 'data_base',
                                                           'saldo_cra')
        df_contrato_not_join = df_contrato_not_join \
            .dropDuplicates(['sistema', 'empresa', 'conta', 'data_base', 'saldo_cra'])
        df_contrato_unificado = df_contrato_not_join.union(df_contrato_join)
        df_contrato_unificado = df_contrato_unificado.withColumn('saldo_cra', col('saldo_cra').cast('float'))

        # -------------------------Fim do Contrato---------------------------#

        df_valor_pos_exp = df_expressinho.filter(~col('saldo_ha').like('-%'))
        df_valor_neg_exp = df_expressinho.filter(col('saldo_ha').like('-%'))
        df_valor_neg_exp = df_valor_neg_exp.withColumn('saldo_ha', regexp_replace('saldo_ha', '-', ''))
        df_valor_pos_exp = df_valor_pos_exp.dropDuplicates()
        df_valor_neg_exp = df_valor_neg_exp.dropDuplicates()

        df_valor_pos_exp = df_valor_pos_exp.select('emp_ex', 'conta_ex', 'nome_conta', 'saldo_ha', 'dt_proc')
        df_valor_neg_exp = df_valor_neg_exp.select('emp_ex', 'conta_ex', 'nome_conta', 'saldo_ha', 'dt_proc')

        df_valor_neg_exp = df_valor_neg_exp \
            .withColumnRenamed('emp_ex', 'emp_ex_neg') \
            .withColumnRenamed('conta_ex', 'conta_neg') \
            .withColumnRenamed('nome_conta', 'nome_conta_neg') \
            .withColumnRenamed('saldo_ha', 'saldo_ha_neg') \
            .withColumnRenamed('dt_proc', 'dt_proc_neg')

        df_valor_neg_exp = df_valor_neg_exp.withColumn('saldo_ha_neg', regexp_replace('saldo_ha_neg', ',', '.'))
        df_valor_neg_exp = df_valor_neg_exp.groupBy([df_valor_neg_exp.emp_ex_neg, df_valor_neg_exp.conta_neg,
                                                     df_valor_neg_exp.nome_conta_neg, df_valor_neg_exp.dt_proc_neg]) \
            .agg(sum('saldo_ha_neg'))
        df_valor_neg_exp = df_valor_neg_exp.withColumnRenamed('sum(saldo_ha_neg)', 'saldo_ha_neg')

        df_valor_pos_exp = df_valor_pos_exp \
            .withColumnRenamed('emp_ex', 'emp_ex_pos') \
            .withColumnRenamed('conta_ex', 'conta_pos') \
            .withColumnRenamed('nome_conta', 'nome_conta_pos') \
            .withColumnRenamed('saldo_ha', 'saldo_ha_pos') \
            .withColumnRenamed('dt_proc', 'dt_proc_pos')

        df_valor_pos_exp = df_valor_pos_exp.withColumn('saldo_ha_pos', regexp_replace('saldo_ha_pos', ',', '.'))
        df_valor_pos_exp = df_valor_pos_exp.groupBy([df_valor_pos_exp.emp_ex_pos, df_valor_pos_exp.conta_pos,
                                                     df_valor_pos_exp.nome_conta_pos, df_valor_pos_exp.dt_proc_pos]) \
            .agg(sum('saldo_ha_pos'))
        df_valor_pos_exp = df_valor_pos_exp.withColumnRenamed('sum(saldo_ha_pos)', 'saldo_ha_pos')

        df_expressinho_join = (df_valor_pos_exp
                               .join(df_valor_neg_exp,
                                     (df_valor_pos_exp.conta_pos == df_valor_neg_exp.conta_neg) &
                                     (df_valor_pos_exp.emp_ex_pos == df_valor_neg_exp.emp_ex_neg), how='inner')
                               )

        df_expressinho_join = df_expressinho_join.withColumn('saldo_ha', col('saldo_ha_pos') - col('saldo_ha_neg'))

        df_expressinho_join = df_expressinho_join \
            .select('emp_ex_pos', 'conta_pos', 'nome_conta_pos', 'dt_proc_pos', 'saldo_ha')
        df_expressinho_join = df_expressinho_join \
            .dropDuplicates(['emp_ex_pos', 'conta_pos', 'nome_conta_pos', 'dt_proc_pos', 'saldo_ha'])

        df_expressinho_not_join = (df_expressinho
                                   .join(df_expressinho_join,
                                         (df_expressinho.conta_ex == df_expressinho_join.conta_pos) &
                                         (df_expressinho.emp_ex == df_expressinho_join.emp_ex_pos), how='leftanti'))

        df_expressinho_not_join = df_expressinho_not_join.withColumn('saldo_ha', regexp_replace('saldo_ha', ',', '.'))
        df_expressinho_not_join = df_expressinho_not_join \
            .dropDuplicates(['emp_ex', 'conta_ex', 'nome_conta', 'dt_proc', 'saldo_ha'])
        df_expressinho_unificado = df_expressinho_not_join.union(df_expressinho_join)
        df_expressinho_unificado = df_expressinho_unificado.withColumn('saldo_ha', col('saldo_ha').cast('float'))

        # -------------------------Fim do Expressinho---------------------------#

        df_valor_pos_lh = df_lh.filter(~col('saldo_lh_1').like('-%'))
        df_valor_neg_lh = df_lh.filter(col('saldo_lh_1').like('-%'))
        df_valor_neg_lh = df_valor_neg_lh.withColumn('saldo_lh_1', regexp_replace('saldo_lh_1', '-', ''))
        df_valor_pos_lh = df_valor_pos_lh.dropDuplicates()
        df_valor_neg_lh = df_valor_neg_lh.dropDuplicates()

        df_valor_pos_lh = df_valor_pos_lh \
            .groupBy([df_valor_pos_lh.lh_empresa, df_valor_pos_lh.conta_lh_1]).agg(sum('saldo_lh_1'))

        df_valor_neg_lh = df_valor_neg_lh \
            .groupBy([df_valor_neg_lh.lh_empresa, df_valor_neg_lh.conta_lh_1]).agg(sum('saldo_lh_1'))

        df_valor_pos_lh = df_valor_pos_lh.select('lh_empresa', 'conta_lh_1', 'sum(saldo_lh_1)')
        df_valor_neg_lh = df_valor_neg_lh.select('lh_empresa', 'conta_lh_1', 'sum(saldo_lh_1)')

        df_valor_neg_lh = df_valor_neg_lh \
            .withColumnRenamed('lh_empresa', 'lh_empresa_neg') \
            .withColumnRenamed('conta_lh_1', 'conta_lh_1_neg') \
            .withColumnRenamed('sum(saldo_lh_1)', 'saldo_lh_1_negativ')

        df_valor_pos_lh = df_valor_pos_lh.withColumnRenamed('sum(saldo_lh_1)', 'saldo_lh_1_posit')

        df_lh_join = (df_valor_pos_lh
                      .join(df_valor_neg_lh,
                            (df_valor_pos_lh.lh_empresa == df_valor_neg_lh.lh_empresa_neg) &
                            (df_valor_pos_lh.conta_lh_1 == df_valor_neg_lh.conta_lh_1_neg), how='inner')
                      )

        df_lh_join = df_lh_join.withColumn('saldo_lh', col('saldo_lh_1_posit') - col('saldo_lh_1_negativ'))
        df_lh_not_join = (df_lh
                          .join(df_lh_join,
                                (df_lh.lh_empresa == df_lh_join.lh_empresa_neg) &
                                (df_lh.conta_lh_1 == df_lh_join.conta_lh_1_neg), how='leftanti'))

        df_lh_join = df_lh_join.select('lh_empresa', 'conta_lh_1', 'saldo_lh')
        df_lh_not_join = df_lh_not_join.select('lh_empresa', 'conta_lh_1', 'saldo_lh_1')

        df_lh_not_join = df_lh_not_join.dropDuplicates(['lh_empresa', 'conta_lh_1', 'saldo_lh_1'])
        df_lh_unificado = df_lh_join.union(df_lh_not_join)
        df_lh_unificado = df_lh_unificado.withColumn('saldo_lh', col('saldo_lh').cast('float'))

        # -------------------------Fim do LH---------------------------#

        df_conciliacao_contabil = (df_contrato_unificado
                                   .join(df_expressinho_unificado,
                                         (df_contrato_unificado.empresa == df_expressinho_unificado.emp_ex) &
                                         (df_contrato_unificado.conta == df_expressinho_unificado.conta_ex), how='left')
                                   .join(df_lh_unificado,
                                         (df_contrato_unificado.empresa == df_lh_unificado.lh_empresa) &
                                         (df_contrato_unificado.conta == df_lh_unificado.conta_lh_1), how='left'))

        df_conciliacao_contabil = df_conciliacao_contabil.select('sistema', 'empresa', 'cosif', 'conta', 'data_base',
                                                                 'saldo_cra', 'dt_proc', 'nome_conta', 'saldo_ha',
                                                                 'saldo_lh')

        df_conciliacao_contabil = (df_conciliacao_contabil
                                   .withColumn('dif_cra_ha',
                                               df_conciliacao_contabil.saldo_cra - df_conciliacao_contabil.saldo_ha)
                                   .withColumn('dif_lh_ha',
                                               df_conciliacao_contabil.saldo_lh - df_conciliacao_contabil.saldo_ha)
                                   .withColumn('dif_lh_cra',
                                               df_conciliacao_contabil.saldo_lh - df_conciliacao_contabil.saldo_cra))

        df_conciliacao_contabil = df_conciliacao_contabil \
            .groupBy([df_conciliacao_contabil.sistema, df_conciliacao_contabil.empresa, df_conciliacao_contabil.cosif,
                      df_conciliacao_contabil.conta, df_conciliacao_contabil.data_base, df_conciliacao_contabil.dt_proc,
                      df_conciliacao_contabil.nome_conta]) \
            .agg({'saldo_cra': 'sum', 'saldo_ha': 'sum', 'saldo_lh': 'sum', 'dif_cra_ha': 'sum', 'dif_lh_ha': 'sum',
                  'dif_lh_cra': 'sum'})

        df_conciliacao_contabil = df_conciliacao_contabil \
            .withColumnRenamed('sum(saldo_cra)', 'saldo_cra') \
            .withColumnRenamed('sum(saldo_ha)', 'saldo_ha') \
            .withColumnRenamed('sum(saldo_lh)', 'saldo_lh') \
            .withColumnRenamed('sum(dif_cra_ha)', 'dif_cra_ha') \
            .withColumnRenamed('sum(dif_lh_ha)', 'dif_lh_ha') \
            .withColumnRenamed('sum(dif_lh_cra)', 'dif_lh_cra')

        df_conciliacao_contabil = (df_conciliacao_contabil
                                   .withColumn('perc_dif_cra_ha', (df_conciliacao_contabil.saldo_cra /
                                                                   df_conciliacao_contabil.saldo_ha) * 100)
                                   .withColumn('perc_dif_lh_ha', (df_conciliacao_contabil.saldo_lh /
                                                                  df_conciliacao_contabil.saldo_ha) * 100)
                                   .withColumn('dat_ref_carga', lit(data_atual)))

        df_conciliacao_contabil = df_conciliacao_contabil \
            .na.fill(value=0.00, subset=['saldo_ha', 'saldo_lh', 'dif_cra_ha', 'dif_lh_ha', 'dif_lh_cra',
                                         'perc_dif_cra_ha', 'perc_dif_lh_ha'])

        df_conciliacao_contabil = (df_conciliacao_contabil
                                   .withColumn("dif_cra_ha", when(df_conciliacao_contabil['dif_cra_ha'] == 0.0,
                                                                  df_conciliacao_contabil.saldo_cra)
                                               .otherwise(df_conciliacao_contabil.dif_cra_ha))
                                   .withColumn("dif_lh_ha", when(df_conciliacao_contabil['dif_lh_ha'] == 0.0,
                                                                 df_conciliacao_contabil.saldo_lh)
                                               .otherwise(df_conciliacao_contabil.dif_lh_ha))
                                   .withColumn("dif_lh_cra", when(df_conciliacao_contabil['dif_lh_cra'] == 0.0,
                                                                  df_conciliacao_contabil.saldo_lh)
                                               .otherwise(df_conciliacao_contabil.dif_lh_cra)))

        df_conciliacao_contabil = df_conciliacao_contabil \
            .select('empresa', 'cosif', 'conta', 'data_base', round('saldo_cra', 2).alias('saldo_cra'),
                    'dt_proc', 'nome_conta', round('saldo_ha', 2).alias('saldo_ha'),
                    round('saldo_lh', 2).alias('saldo_lh'), round('dif_cra_ha', 2).alias('dif_cra_ha'),
                    round('dif_lh_ha', 2).alias('dif_lh_ha'), round('dif_lh_cra', 2).alias('dif_lh_cra'),
                    round('perc_dif_cra_ha', 2).alias('perc_dif_cra_ha'),
                    round('perc_dif_lh_ha', 2).alias('perc_dif_lh_ha'), 'dat_ref_carga', 'sistema')

        df_conciliacao_contabil = df_conciliacao_contabil \
            .withColumn('perc_dif_cra_ha', regexp_replace('perc_dif_cra_ha', '-', '')) \
            .withColumn('perc_dif_lh_ha', regexp_replace('perc_dif_lh_ha', '-', ''))

        df_conciliacao_contabil = df_conciliacao_contabil \
            .sort(df_conciliacao_contabil.data_base.desc(), df_conciliacao_contabil.dat_ref_carga.desc(),
                  df_conciliacao_contabil.sistema.desc(), df_conciliacao_contabil.empresa.desc(),
                  df_conciliacao_contabil.cosif.desc(), df_conciliacao_contabil.conta.desc())

        df_conciliacao_contabil.write.insertInto("tb_conciliacao_contabil", overwrite=True)
        self.small_files('/sistemas/cra/conciliacao_contabil/', 'tb_conciliacao_contabil/', data_atual)

        os.system(
            'hdfs dfs -rm -R /sistemas/cra/conciliacao_contabil/bst_Conciliacao_Contabil_' + self.sistema + '.txt')

        os.system(
            'hdfs dfs -touchz /sistemas/cra/conciliacao_contabil/bst_Conciliacao_Contabil_' + self.sistema + '.txt')

        spark.stop()


conciliacao_contabil = Conciliacao_Contabil(argv[1], argv[2])
conciliacao_contabil.process_data()