In [None]:

from delta.tables import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql.utils import *
from pyspark.sql.window import *

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
def calcular_digitos_verificadores(df):
    # Remove caracteres não numéricos do CNPJ
    df = df.withColumn("cnpj_base", expr("regexp_replace(cnpj, '[^0-9]', '')"))

    # Verifica se o CNPJ tem 12 dígitos
    df = df.filter(length(col("cnpj_base")) == 12)

    # Cálculo do primeiro dígito verificador
    df = df.withColumn("soma_1", col("cnpj_base").substr(1, 1).cast("int") * 5 +
                       col("cnpj_base").substr(2, 1).cast("int") * 4 +
                       col("cnpj_base").substr(3, 1).cast("int") * 3 +
                       col("cnpj_base").substr(4, 1).cast("int") * 2 +
                       col("cnpj_base").substr(5, 1).cast("int") * 9 +
                       col("cnpj_base").substr(6, 1).cast("int") * 8 +
                       col("cnpj_base").substr(7, 1).cast("int") * 7 +
                       col("cnpj_base").substr(8, 1).cast("int") * 6 +
                       col("cnpj_base").substr(9, 1).cast("int") * 5 +
                       col("cnpj_base").substr(10, 1).cast("int") * 4 +
                       col("cnpj_base").substr(11, 1).cast("int") * 3 +
                       col("cnpj_base").substr(12, 1).cast("int") * 2)

    df = df.withColumn("resto_1", col("soma_1") % 11)
    df = df.withColumn("digito_1", when(col("resto_1") < 2, 0).otherwise(11 - col("resto_1")))

    # Adiciona o primeiro dígito verificador ao CNPJ
    df = df.withColumn("cnpj_com_digito_1", concat(col("cnpj_base"), col("digito_1").cast(StringType())))

    # Cálculo do segundo dígito verificador
    df = df.withColumn("soma_2", col("cnpj_com_digito_1").substr(1, 1).cast("int") * 6 +
                       col("cnpj_com_digito_1").substr(2, 1).cast("int") * 5 +
                       col("cnpj_com_digito_1").substr(3, 1).cast("int") * 4 +
                       col("cnpj_com_digito_1").substr(4, 1).cast("int") * 3 +
                       col("cnpj_com_digito_1").substr(5, 1).cast("int") * 2 +
                       col("cnpj_com_digito_1").substr(6, 1).cast("int") * 9 +
                       col("cnpj_com_digito_1").substr(7, 1).cast("int") * 8 +
                       col("cnpj_com_digito_1").substr(8, 1).cast("int") * 7 +
                       col("cnpj_com_digito_1").substr(9, 1).cast("int") * 6 +
                       col("cnpj_com_digito_1").substr(10, 1).cast("int") * 5 +
                       col("cnpj_com_digito_1").substr(11, 1).cast("int") * 4 +
                       col("cnpj_com_digito_1").substr(12, 1).cast("int") * 3 +
                       col("cnpj_com_digito_1").substr(13, 1).cast("int") * 2)

    df = df.withColumn("resto_2", col("soma_2") % 11)
    df = df.withColumn("digito_2", when(col("resto_2") < 2, 0).otherwise(11 - col("resto_2")))

    # Combina os dígitos verificadores
    df = df.withColumn("digitos_verificadores",
                       concat(col("digito_1").cast(StringType()), col("digito_2").cast(StringType())))

    return df.select("cnpj", "identificador_socio", "nome_socio", "cnpj_cpf_socio", "qualificacao_socio",
                     "data_entrada_sociedade", "pais", "representante_legal", "nome_representante",
                     "qualificacao_representante_legal", "faixa_etaria", "digitos_verificadores")


In [None]:
##READ FILES

manual_columns = [
    "cnpj_basico", "identificador_socio", "nome_socio", "cnpj_cpf_socio", "qualificacao_socio",
    "data_entrada_sociedade", "pais", "representante_legal", "nome_representante", "qualificacao_representante_legal",
    "faixa_etaria"
]

df_socios = spark.read.format("csv").option("header", "false").option("sep", ";").load('./Socio/*.csv')
df_socios = df_socios.toDF(*manual_columns)
df_socios.createOrReplaceTempView("socios")

In [None]:
df_socios.printSchema()

In [None]:
df_socios_ajustado = spark.sql('''
    SELECT 
        CONCAT(cnpj_basico, '0001') AS cnpj,
        identificador_socio,
        nome_socio,
        cnpj_cpf_socio,
        qualificacao_socio,
        data_entrada_sociedade,
        pais,
        representante_legal,
        nome_representante,
        qualificacao_representante_legal,
        faixa_etaria
    FROM
        socios
''')
df_socios_ajustado.show()

In [None]:
resultado = calcular_digitos_verificadores(df_socios_ajustado)
resultado.createOrReplaceTempView('resultado')

In [None]:
df_final = spark.sql('''
    SELECT 
        CONCAT(cnpj, digitos_verificadores) as cnpj,
        identificador_socio,
        nome_socio,
        cnpj_cpf_socio,
        qualificacao_socio,
        data_entrada_sociedade,
        pais,
        representante_legal,
        nome_representante,
        qualificacao_representante_legal,
        faixa_etaria
    FROM
        resultado
''')
df_final.count()

In [None]:
df_final.coalesce(1).write.mode('overwrite').parquet('./Socio/socios_parquet')