In [1]:
import sys
import uuid
from pyspark.sql.functions import struct, col, to_json
from lib import ConfigLoader, Utils, DataLoader #,Transformations
from lib.logger import Log4j

In [2]:
from pyspark.sql.functions import struct, lit, col, array, when, isnull, filter, current_timestamp, date_format, expr, \
    collect_list

In [3]:
    # Execution environment and load date for this notebook run.
    # The environment is fixed to "LOCAL" and the load date is pinned.
    job_run_env = "LOCAL"
    # NOTE(review): load_date is not referenced by any of the visible cells.
    load_date = "2022-08-02"

    # Unique identifier for this job run.
    job_run_id = "SBDL-" + str(uuid.uuid4())

    # Announce the start of the run on the console (the logger does not exist yet).
    print(f"Initializing SBDL Job in {job_run_env} Job ID: {job_run_id}")

    # Load the settings for this environment (per the original notes, e.g. conf/local.conf).
    conf = ConfigLoader.get_config(job_run_env)

    # Hive is used only when the config flag reads "true". Comparison ignores case and
    # surrounding whitespace so values such as "True" or " true " are also honoured.
    enable_hive = str(conf["enable.hive"]).strip().lower() == "true"
    hive_db = conf["hive.database"]  # Hive database name from the config

    # Create the Spark session appropriate for this environment.
    print("Creating Spark Session")
    spark = Utils.get_spark_session(job_run_env)

    # Logger bound to the Spark session; used by the following cells.
    logger = Log4j(spark)

Initializing SBDL Job in LOCAL Job ID: SBDL-1f7233ff-8a99-4ff4-875b-f3d4c4a8f40d
Creating Spark Session
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-656f6d05-dda6-493b-833a-aafeae3c871e;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report 

24/10/05 13:40:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the account records that feed the contract section of each event.
logger.info("Reading SBDL Account DF")
accounts_df = DataLoader.read_accounts(spark, job_run_env, enable_hive, hive_db)

# Preview the first five rows and the schema.
accounts_df.show(n=5)
accounts_df.printSchema()

24/10/05 13:40:18 INFO sbdl: Reading SBDL Account DF
+----------+----------+----------+----------+-------------------+---------------+--------------+-----------+------------------+-----------+-------------+
| load_date|active_ind|account_id|source_sys| account_start_date|  legal_title_1| legal_title_2|tax_id_type|            tax_id|branch_code|      country|
+----------+----------+----------+----------+-------------------+---------------+--------------+-----------+------------------+-----------+-------------+
|2022-08-02|         1|6982391060|       COH|2018-03-24 08:26:45|  Tiffany Riley|Matthew Davies|        EIN|ZLCK91795330413525|   ACXMGBA5|       Mexico|
|2022-08-02|         1|6982391061|       ADS|2018-07-19 05:54:49|Garcia and Sons| Taylor Guzman|        SSP|CADU39916151090321|   SHJFGBML|United States|
|2022-08-02|         1|6982391067|       BDL|2018-08-29 11:48:59|     Acosta Inc|  David Walker|        SSP|UJLN20870916345792|   WZTEGBTG|       Canada|
|2022-08-02|         1|

                                                                                

In [5]:
def get_contract(df):
    """Build the contract-level columns of the SBDL event from the accounts DataFrame.

    Every contract attribute is wrapped by ``get_insert_operation`` into an
    ``{operation, newValue, oldValue}`` INSERT struct; ``account_id`` is also
    kept as a plain column so later cells can join on it.

    :param df: accounts DataFrame with columns account_id, source_sys,
        account_start_date, legal_title_1, legal_title_2, tax_id_type, tax_id,
        branch_code and country.
    :return: DataFrame with ``account_id`` plus one INSERT-wrapped struct per
        contract attribute.
    """
    # One optional title line per legal-title column: a null source value produces
    # a null array entry, which is removed just below.
    title_entries = []
    for source_column, line_type in (("legal_title_1", "lgl_ttl_ln_1"),
                                     ("legal_title_2", "lgl_ttl_ln_2")):
        title_struct = struct(lit(line_type).alias("contractTitleLineType"),
                              col(source_column).alias("contractTitleLine")).alias("contractTitle")
        title_entries.append(when(col(source_column).isNotNull(), title_struct))

    # Keep only the title lines that are actually present.
    present_titles = filter(array(*title_entries), lambda entry: entry.isNotNull())

    tax_identifier = struct(col("tax_id_type").alias("taxIdType"),
                            col("tax_id").alias("taxId")).alias("taxIdentifier")

    # NOTE: "contactStartDateTime" (sic) is the field name that apply_header selects,
    # so the spelling must stay as-is.
    wrapped_attributes = [
        get_insert_operation(col("account_id"), "contractIdentifier"),
        get_insert_operation(col("source_sys"), "sourceSystemIdentifier"),
        get_insert_operation(col("account_start_date"), "contactStartDateTime"),
        get_insert_operation(present_titles, "contractTitle"),
        get_insert_operation(tax_identifier, "taxIdentifier"),
        get_insert_operation(col("branch_code"), "contractBranchCode"),
        get_insert_operation(col("country"), "contractCountry"),
    ]
    return df.select("account_id", *wrapped_attributes)

def get_insert_operation(column, alias):
    """Wrap ``column`` in an INSERT change-record struct named ``alias``.

    The struct has three fields: ``operation`` (always the literal "INSERT"),
    ``newValue`` (the given column) and ``oldValue`` (a null literal, which
    Spark types as ``void``).

    :param column: Spark Column holding the new value.
    :param alias: name given to the resulting struct column.
    :return: the aliased struct Column.
    """
    change_fields = [
        lit("INSERT").alias("operation"),
        column.alias("newValue"),
        lit(None).alias("oldValue"),  # an insert has no previous value
    ]
    return struct(*change_fields).alias(alias)

# Turn the accounts into INSERT-wrapped contract attributes, then preview the result.
contract_df = get_contract(accounts_df)
contract_df.show(n=5)
contract_df.printSchema()

+----------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|account_id|  contractIdentifier|sourceSystemIdentifier|contactStartDateTime|       contractTitle|       taxIdentifier|  contractBranchCode|     contractCountry|
+----------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|6982391060|{INSERT, 69823910...|   {INSERT, COH, null}|{INSERT, 2018-03-...|{INSERT, [{lgl_tt...|{INSERT, {EIN, ZL...|{INSERT, ACXMGBA5...|{INSERT, Mexico, ...|
|6982391061|{INSERT, 69823910...|   {INSERT, ADS, null}|{INSERT, 2018-07-...|{INSERT, [{lgl_tt...|{INSERT, {SSP, CA...|{INSERT, SHJFGBML...|{INSERT, United S...|
|6982391067|{INSERT, 69823910...|   {INSERT, BDL, null}|{INSERT, 2018-08-...|{INSERT, [{lgl_tt...|{INSERT, {SSP, UJ...|{INSERT, WZTEGBTG...|{INSERT, Canada, ...|
|6982391064|{INSERT, 6982391

In [6]:
    # Load the party records (account_id -> party_id relations).
    logger.info("Reading SBDL Party DF")
    parties_df = DataLoader.read_parties(spark, job_run_env, enable_hive, hive_db)

    # Preview the first five rows and the schema.
    parties_df.show(n=5)
    parties_df.printSchema()

24/10/05 13:40:46 INFO sbdl: Reading SBDL Party DF
+----------+----------+----------+-------------+-------------------+
| load_date|account_id|  party_id|relation_type|relation_start_date|
+----------+----------+----------+-------------+-------------------+
|2022-08-02|6982391060|9823462810|          F-N|2019-07-29 00:51:32|
|2022-08-02|6982391061|9823462811|          F-N|2018-08-30 23:57:22|
|2022-08-02|6982391062|9823462812|          F-N|2018-08-25 10:20:29|
|2022-08-02|6982391063|9823462813|          F-N|2018-05-11 01:53:28|
|2022-08-02|6982391064|9823462814|          F-N|2019-06-06 08:48:12|
+----------+----------+----------+-------------+-------------------+
only showing top 5 rows

root
 |-- load_date: date (nullable = true)
 |-- account_id: string (nullable = true)
 |-- party_id: string (nullable = true)
 |-- relation_type: string (nullable = true)
 |-- relation_start_date: timestamp (nullable = true)



In [7]:
def get_relations(df):
    """Wrap each party-relation attribute as an INSERT change record.

    :param df: parties DataFrame with account_id, party_id, relation_type and
        relation_start_date columns.
    :return: DataFrame with the plain ``account_id`` and ``party_id`` keys followed
        by partyIdentifier, partyRelationshipType and partyRelationStartDateTime structs.
    """
    # Output struct name -> source column (dict order fixes the output column order).
    relation_columns = {
        "partyIdentifier": "party_id",
        "partyRelationshipType": "relation_type",
        "partyRelationStartDateTime": "relation_start_date",
    }
    wrapped = [get_insert_operation(col(source), target)
               for target, source in relation_columns.items()]
    return df.select("account_id", "party_id", *wrapped)
# Turn the party relations into INSERT-wrapped attributes, then preview the result.
relations_df = get_relations(parties_df)
relations_df.show(n=5)
relations_df.printSchema()

+----------+----------+--------------------+---------------------+--------------------------+
|account_id|  party_id|     partyIdentifier|partyRelationshipType|partyRelationStartDateTime|
+----------+----------+--------------------+---------------------+--------------------------+
|6982391060|9823462810|{INSERT, 98234628...|  {INSERT, F-N, null}|      {INSERT, 2019-07-...|
|6982391061|9823462811|{INSERT, 98234628...|  {INSERT, F-N, null}|      {INSERT, 2018-08-...|
|6982391062|9823462812|{INSERT, 98234628...|  {INSERT, F-N, null}|      {INSERT, 2018-08-...|
|6982391063|9823462813|{INSERT, 98234628...|  {INSERT, F-N, null}|      {INSERT, 2018-05-...|
|6982391064|9823462814|{INSERT, 98234628...|  {INSERT, F-N, null}|      {INSERT, 2019-06-...|
+----------+----------+--------------------+---------------------+--------------------------+
only showing top 5 rows

root
 |-- account_id: string (nullable = true)
 |-- party_id: string (nullable = true)
 |-- partyIdentifier: struct (nullable = f

In [8]:
# Load the party address records.
logger.info("Reading SBDL Address DF")
address_df = DataLoader.read_address(spark, job_run_env, enable_hive, hive_db)

# Preview the first five rows and the schema.
address_df.show(n=5)
address_df.printSchema()

24/10/05 13:41:59 INFO sbdl: Reading SBDL Address DF
+----------+----------+------------------+--------------------+--------------+-----------+------------------+------------------+
| load_date|  party_id|    address_line_1|      address_line_2|          city|postal_code|country_of_address|address_start_date|
+----------+----------+------------------+--------------------+--------------+-----------+------------------+------------------+
|2022-08-02|9823462810| 45229 Drake Route|   13306 Corey Point|     Shanefort|      77163|            Canada|        2019-02-26|
|2022-08-02|9823462811|361 Robinson Green|3511 Rebecca Mission|   North Tyler|      34118|            Canada|        2018-01-28|
|2022-08-02|9823462812|  039 Daniel Mount|8219 Hernandez Lo...| Boltonborough|      71648|            Mexico|        2018-12-07|
|2022-08-02|9823462813|05550 Nancy Rapids| 9471 Zachary Canyon|East Davidport|      02504|     United States|        2019-04-02|
|2022-08-02|9823462814| 5227 Wagner Pines|18

In [11]:
def get_address(df):
    """Nest each party's address columns into one INSERT-wrapped ``partyAddress`` struct.

    :param df: address DataFrame with party_id, address_line_1, address_line_2,
        city, postal_code, country_of_address and address_start_date columns.
    :return: DataFrame with ``party_id`` and the ``partyAddress`` struct.
    """
    # (source column, field name inside the address struct), in output order.
    field_mapping = (
        ("address_line_1", "addressLine1"),
        ("address_line_2", "addressLine2"),
        ("city", "addressCity"),
        ("postal_code", "addressPostalCode"),
        ("country_of_address", "addressCountry"),
        ("address_start_date", "addressStartDate"),
    )
    address_struct = struct(*[col(source).alias(field) for source, field in field_mapping])
    return df.select("party_id", get_insert_operation(address_struct, "partyAddress"))

# Build one INSERT-wrapped address struct per party, then preview the result.
relation_address_df = get_address(address_df)
relation_address_df.show(n=5)
relation_address_df.printSchema()

+----------+--------------------+
|  party_id|        partyAddress|
+----------+--------------------+
|9823462810|{INSERT, {45229 D...|
|9823462811|{INSERT, {361 Rob...|
|9823462812|{INSERT, {039 Dan...|
|9823462813|{INSERT, {05550 N...|
|9823462814|{INSERT, {5227 Wa...|
+----------+--------------------+
only showing top 5 rows

root
 |-- party_id: string (nullable = true)
 |-- partyAddress: struct (nullable = false)
 |    |-- operation: string (nullable = false)
 |    |-- newValue: struct (nullable = false)
 |    |    |-- addressLine1: string (nullable = true)
 |    |    |-- addressLine2: string (nullable = true)
 |    |    |-- addressCity: string (nullable = true)
 |    |    |-- addressPostalCode: string (nullable = true)
 |    |    |-- addressCountry: string (nullable = true)
 |    |    |-- addressStartDate: date (nullable = true)
 |    |-- oldValue: void (nullable = true)



In [13]:
def join_party_address(p_df, a_df):
    """Attach addresses to party relations and collect them per account.

    Parties without an address are kept (left outer join on ``party_id``) with a
    null ``partyAddress``. The per-party structs are then gathered into one
    ``partyRelations`` array per ``account_id``.

    :param p_df: party relations DataFrame (account_id, party_id and the three
        party* structs).
    :param a_df: address DataFrame (party_id, partyAddress).
    :return: DataFrame with ``account_id`` and the ``partyRelations`` array.
    """
    party_details = struct("partyIdentifier",
                           "partyRelationshipType",
                           "partyRelationStartDateTime",
                           "partyAddress").alias("partyDetails")

    joined_df = p_df.join(a_df, on="party_id", how="left_outer")
    return joined_df.groupBy("account_id") \
        .agg(collect_list(party_details).alias("partyRelations"))

# Combine party relations with their addresses into one row per account.
logger.info("Join Party Relations and Address")
party_address_df = join_party_address(relations_df, relation_address_df)

# Preview the first five rows and the schema.
party_address_df.show(n=5)
party_address_df.printSchema()

24/10/05 13:45:25 INFO sbdl: Join Party Relations and Address




+----------+--------------------+
|account_id|      partyRelations|
+----------+--------------------+
|6982391067|[{{INSERT, 982346...|
|6982391064|[{{INSERT, 982346...|
|6982391066|[{{INSERT, 982346...|
|6982391061|[{{INSERT, 982346...|
|6982391060|[{{INSERT, 982346...|
+----------+--------------------+
only showing top 5 rows

root
 |-- account_id: string (nullable = true)
 |-- partyRelations: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- partyIdentifier: struct (nullable = false)
 |    |    |    |-- operation: string (nullable = false)
 |    |    |    |-- newValue: string (nullable = true)
 |    |    |    |-- oldValue: void (nullable = true)
 |    |    |-- partyRelationshipType: struct (nullable = false)
 |    |    |    |-- operation: string (nullable = false)
 |    |    |    |-- newValue: string (nullable = true)
 |    |    |    |-- oldValue: void (nullable = true)
 |    |    |-- partyRelationStartDateTime: struct (nullable = false)
 |    

                                                                                

In [14]:
def join_contract_party(c_df, p_df):
    """Left-outer-join the contract rows with the per-account party data on ``account_id``.

    Every contract is kept. Accounts with no matching party row get a null
    ``partyRelations``.

    :param c_df: contract DataFrame keyed by ``account_id``.
    :param p_df: party/address DataFrame keyed by ``account_id``.
    :return: the joined DataFrame.
    """
    joined_df = c_df.join(p_df, on="account_id", how="left_outer")
    return joined_df

# Combine the contracts with their party and address data.
logger.info("Join Account and Parties")
data_df = join_contract_party(contract_df, party_address_df)

# Preview the first five rows and the schema.
data_df.show(n=5)
data_df.printSchema()

24/10/05 13:46:13 INFO sbdl: Join Account and Parties


                                                                                

+----------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|account_id|  contractIdentifier|sourceSystemIdentifier|contactStartDateTime|       contractTitle|       taxIdentifier|  contractBranchCode|     contractCountry|      partyRelations|
+----------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|6982391067|{INSERT, 69823910...|   {INSERT, BDL, null}|{INSERT, 2018-08-...|{INSERT, [{lgl_tt...|{INSERT, {SSP, UJ...|{INSERT, WZTEGBTG...|{INSERT, Canada, ...|[{{INSERT, 982346...|
|6982391064|{INSERT, 69823910...|   {INSERT, ADS, null}|{INSERT, 2018-03-...|{INSERT, [{lgl_tt...|{INSERT, {CPR, WJ...|{INSERT, OCCKGB65...|{INSERT, Canada, ...|[{{INSERT, 982346...|
|6982391066|{INSERT, 69823910...|   {INSERT, BDL, null}|{INSERT, 2017-08-...|{INSERT,

In [16]:
def apply_header(spark, df, event_type="SBDL-Contract", major_version=1, minor_version=0):
    """Wrap each contract row in an SBDL event envelope.

    Every output row has three columns:

    * ``eventHeader``: a fresh ``uuid()`` event identifier, the event type and
      schema versions (copied from a one-row header DataFrame), and the current
      timestamp formatted as ``yyyy-MM-dd'T'HH:mm:ssZ``.
    * ``keys``: a one-element array ``[{keyField: "contractIdentifier",
      keyValue: account_id}]``.
    * ``payload``: the contract attribute structs plus ``partyRelations``.

    :param spark: active SparkSession, used to build the one-row header DataFrame.
    :param df: joined contract/party DataFrame. It must contain ``account_id``,
        the seven contract attribute columns and ``partyRelations``.
    :param event_type: value for ``eventHeader.eventType`` (default "SBDL-Contract").
    :param major_version: value for ``eventHeader.majorSchemaVersion`` (default 1).
    :param minor_version: value for ``eventHeader.minorSchemaVersion`` (default 0).
    :return: DataFrame with columns ``eventHeader``, ``keys`` and ``payload``.
    """
    # One-row header. Cross-joining it (with a broadcast hint) copies the constants
    # onto every row. Python ints are inferred as long, as the schema output shows.
    header_df = spark.createDataFrame([(event_type, major_version, minor_version)]) \
        .toDF("eventType", "majorSchemaVersion", "minorSchemaVersion")

    # date_format() already returns a Column, so it needs no lit() wrapper.
    event_header = struct(expr("uuid()").alias("eventIdentifier"),
                          col("eventType"), col("majorSchemaVersion"), col("minorSchemaVersion"),
                          date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ssZ").alias("eventDateTime")
                          ).alias("eventHeader")

    event_keys = array(struct(lit("contractIdentifier").alias("keyField"),
                              col("account_id").alias("keyValue")
                              )).alias("keys")

    # NOTE: "contactStartDateTime" (sic) is the column name produced upstream; keep the spelling.
    payload = struct(col("contractIdentifier"),
                     col("sourceSystemIdentifier"),
                     col("contactStartDateTime"),
                     col("contractTitle"),
                     col("taxIdentifier"),
                     col("contractBranchCode"),
                     col("contractCountry"),
                     col("partyRelations")
                     ).alias("payload")

    event_df = header_df.hint("broadcast").crossJoin(df) \
        .select(event_header, event_keys, payload)
    return event_df
    
# Wrap every joined contract row in the event envelope (header, keys, payload).
logger.info("Apply Header and create Event")
final_df = apply_header(spark, data_df)

# Preview the first five events and the schema.
final_df.show(n=5)
final_df.printSchema()

24/10/05 13:46:58 INFO sbdl: Apply Header and create Event




+--------------------+--------------------+--------------------+
|         eventHeader|                keys|             payload|
+--------------------+--------------------+--------------------+
|{3e82cdba-2b2e-4d...|[{contractIdentif...|{{INSERT, 6982391...|
|{cb834f58-94d6-4a...|[{contractIdentif...|{{INSERT, 6982391...|
|{a52fcdb6-34d2-42...|[{contractIdentif...|{{INSERT, 6982391...|
|{7e136f59-a40e-44...|[{contractIdentif...|{{INSERT, 6982391...|
|{93e53d9a-07a5-4d...|[{contractIdentif...|{{INSERT, 6982391...|
+--------------------+--------------------+--------------------+
only showing top 5 rows

root
 |-- eventHeader: struct (nullable = false)
 |    |-- eventIdentifier: string (nullable = false)
 |    |-- eventType: string (nullable = true)
 |    |-- majorSchemaVersion: long (nullable = true)
 |    |-- minorSchemaVersion: long (nullable = true)
 |    |-- eventDateTime: string (nullable = false)
 |-- keys: array (nullable = false)
 |    |-- element: struct (containsNull = false)


                                                                                