# Loading Data To Bronze Layer 

1. The table dados_arquivo originates from files and is loaded in batch using Spark.
2. The table dados_api originates from the sales API and is loaded as a stream using Auto Loader.


* Batch Data: Sales made by sales consultants (B2B)
* Streaming Data: API Sales

## 1.0 Initial Setup 

In [0]:
%run "/Users/cabreirajm@gmail.com/DataPipelineCabreira/Helpers/data_generator"



The Data
1. **generate_api_data** - This function generates the Sales API streaming data in JSON format and stores it in the **Landing Zone**: `dbfs:/FileStore/landing/stream/`


* The payload struct holds information about the user registration, the sale itself and the payment method. If a user just visits the website without any other action (registration or purchase), all of its elements are set to null.

An example API file is shown below:

```
    {
        "access_date":"2024-06-02T19:01:09.000Z",
        "ip_address":"207.198.60.166",
        "access_point":"chrome",
        "payload":{
            "info_usuario":{
                "nome":"Usuario c3e5d305e1",
                "idade":"44",
                "sexo":"F",
                "email":"usuario_c3e5d305e1@outlook.com",
                "profissao":"Desenvolvedor de ETL",
                "estado":"TO"
            },
            "info_produto":{
                "product_uuid":"f260cd97c6c9813b01601e834a2added",
                "valor":"R$ 589,90"
            },
            "info_pagamento":{
                "valor":"589.90",
                "forma_pagamento":"credito",
                "quantidade_parcelas":"2",
                "valor_parcelas":"294.95"
            }
        }
    }
```
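The null-payload rule described above can be illustrated with a small standard-library sketch. It is not part of the pipeline: the helper name `is_visit_only` and the record values are assumptions made for illustration, and the check runs on a plain Python dict rather than on a Spark DataFrame.

```python
import json

# A record shaped like the sample above, for a visit with no registration
# and no sale. The values are illustrative.
raw_event = """
{
    "access_date": "2024-06-02T04:53:07.000Z",
    "ip_address": "168.18.37.100",
    "access_point": "android",
    "payload": {
        "info_usuario": null,
        "info_produto": null,
        "info_pagamento": null
    }
}
"""

PAYLOAD_KEYS = ("info_usuario", "info_produto", "info_pagamento")


def is_visit_only(event: dict) -> bool:
    """Return True when every payload element is null (or missing)."""
    payload = event.get("payload") or {}
    return all(payload.get(key) is None for key in PAYLOAD_KEYS)


event = json.loads(raw_event)
print(is_visit_only(event))
```

A record that carries at least one non-null payload element, such as the sample file above, would make the helper return `False`.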

2. **generate_files_data** - This function generates the batch data in CSV format. As mentioned above, this data comes from sales consultants who sell the products to businesses (B2B). All the data is stored in the **Landing Zone**: `dbfs:/FileStore/landing/files/`.

An example batch file is shown below:

```
    data_venda,nome_empresa,sexo,nome_funcionario,email_functionario,profissao,idade,estado,curso,valor,disconto
    2025-02-26,Empresa A,M,Funcionario 3a0f99b401,funcionario_3a0f99b401@empresaa.com.br,Cientista de Dados,44,RO,Construindo o seu Primeiro Pipeline de Dados com o Databricks,"R$ 789,90",5%


```
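Note that the `valor` field contains a comma and is therefore wrapped in double quotes. A comma-separated reader has to honor those quotes, otherwise the row would split into twelve fields instead of eleven. The sketch below shows this with Python's standard `csv` module on the sample row above. It is only an illustration and does not use the Spark reader.

```python
import csv
import io

# Header and row copied from the sample file above.
sample = (
    "data_venda,nome_empresa,sexo,nome_funcionario,email_functionario,"
    "profissao,idade,estado,curso,valor,disconto\n"
    "2025-02-26,Empresa A,M,Funcionario 3a0f99b401,"
    "funcionario_3a0f99b401@empresaa.com.br,Cientista de Dados,44,RO,"
    "Construindo o seu Primeiro Pipeline de Dados com o Databricks,"
    '"R$ 789,90",5%\n'
)

# DictReader uses the first line as the header and respects double quotes.
rows = list(csv.DictReader(io.StringIO(sample)))
row = rows[0]

print(len(row))        # number of columns parsed
print(row["valor"])    # the quoted value is kept in one piece
```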

In [0]:
# generate api data
query = generate_api_data()

# generate file data
generate_files_data(100000)

generate_api_data-inicio-2024-11-23 09:35:24.478801
generate_api_data-fim-2024-11-23 09:35:28.542894
              
generate_files_data-inicio-2024-11-23 09:35:28.543169
O arquivo .csv com 100000 registros foi gerado no diretorio 'dbfs:/FileStore/landing/files'.
generate_files_data-fim-2024-11-23 09:35:45.524093
              


## 2.0 Create Bronze Schema

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")

DataFrame[]

## 3.0 Streaming Data Ingestion to Bronze Layer 

### 3.1 Read Stream Data as DataFrame

Using **spark.readStream** with the Auto Loader format (**.format('cloudFiles')**), we can read the API streaming data without a predefined schema.

* `option('cloudFiles.format', 'json')`: Identifies the format of the files Auto Loader will process, JSON in this case.
* `option('cloudFiles.schemaLocation', schema_location_api)`: The location where the inferred schema is stored and versioned.
* `option('cloudFiles.inferColumnTypes', True)`: Lets Auto Loader infer the data type of each column instead of reading every column as a string.
* `load(stream_lading_path)`: Creates the streaming read pointing to the location where the API delivers the streaming data (the origin path).

The `auto_loader_df` DataFrame is created below following the requirements above. In addition, we create two new columns:
1. `source_file_name`: Identifies the file the record came from.
2. `processing_timestamp`: The timestamp at which the data is ingested and processed.

Note that these two columns are added to help with debugging.

In [0]:

from pyspark.sql.functions import col, current_timestamp

# Reading the API streaming data
schema_location_api = 'dbfs:/user/hive/warehouse/bronze.db/_schemas/load_api_raw_data'
stream_lading_path  = "dbfs:/FileStore/landing/stream/"
auto_loader_df = (
  spark.readStream
       .format('cloudFiles')
       .option('cloudFiles.format','json')
       .option('cloudFiles.schemaLocation', schema_location_api)
       .option('cloudFiles.inferColumnTypes', True)
       .load(stream_lading_path)
)

auto_loader_df = (
  auto_loader_df.withColumn("source_file_name", col("_metadata.file_name"))
                .withColumn("processing_timestamp", current_timestamp() )
)

auto_loader_df.limit(5).display()

access_date,access_point,ip_address,payload,_rescued_data,source_file_name,processing_timestamp
2024-06-02T04:20:11.000Z,safari,69.127.75.83,"List(null, null, List(usuario_022d744bd9@hotmail.com, PI, 20, Usuario 022d744bd9, Arquiteto de Dados, F))",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:36:08.714Z
2024-06-02T04:53:07.000Z,android,168.18.37.100,"List(null, null, null)",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:36:08.714Z
2024-06-02T05:26:03.000Z,firefox,113.109.66.208,"List(null, null, null)",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:36:08.714Z
2024-06-02T05:58:59.000Z,chrome,87.241.252.59,"List(null, null, List(usuario_17e7c1bc35@hotmail.com, AP, 20, Usuario 17e7c1bc35, Analista de Dados, F))",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:36:08.714Z
2024-06-02T06:31:55.000Z,iphone,188.111.120.11,"List(null, null, null)",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:36:08.714Z


* The `_rescued_data` column holds data that Auto Loader could not fit into the inferred schema. Instead of dropping such values, Auto Loader stores them in this column.
* Example: a value whose data type differs from the inferred column type may end up here, since Auto Loader may not treat the change as schema evolution.
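The idea behind a rescued-data column can be sketched in plain Python. This is a toy model, not Auto Loader's implementation: the function `split_record`, the type-based schema and the extra `referrer` field are assumptions made for illustration, and what Auto Loader actually rescues depends on its schema evolution settings.

```python
import json

# Hypothetical inferred schema: column name -> expected Python type.
INFERRED_SCHEMA = {"access_date": str, "access_point": str, "ip_address": str}


def split_record(record: dict, schema: dict) -> dict:
    """Keep fields that match the schema; move the rest to _rescued_data."""
    row = {name: None for name in schema}
    rescued = {}
    for key, value in record.items():
        expected = schema.get(key)
        if expected is not None and isinstance(value, expected):
            row[key] = value
        else:
            rescued[key] = value  # unknown column or type mismatch
    # Rescued values are kept as a JSON string so nothing is silently lost.
    row["_rescued_data"] = json.dumps(rescued) if rescued else None
    return row


record = {
    "access_date": "2024-06-02T19:01:09.000Z",
    "access_point": "chrome",
    "ip_address": 12345,          # wrong type: number instead of string
    "referrer": "newsletter",     # field absent from the schema
}
row = split_record(record, INFERRED_SCHEMA)
print(row["ip_address"], row["_rescued_data"])
```

In this sketch the mismatched `ip_address` becomes null in its own column while its original value survives inside `_rescued_data`, which is the property that makes the column useful for debugging.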

### 3.2 Loading API Stream Data into Bronze Layer

We will use the `.writeStream` method to write `auto_loader_df` into the **bronze.api_data** table, which is created on the fly by the `table()` method.

`outputMode('append')` means that new data is appended to the bronze layer table.

`.option('checkpointLocation', api_data_checkpoint_path)` specifies the location of the log that Spark uses to track progress and provide exactly-once semantics.

Note that the process specified below is asynchronous and keeps running as a stream until we stop it.
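To see why a checkpoint prevents duplicate loads after a restart, consider the toy sketch below. It is a simplified illustration in plain Python, not how Structured Streaming stores its checkpoint: the function `process_new_files`, the JSON checkpoint file and the file names are all assumptions. A real checkpoint also tracks offsets and state, and commits them together with the sink.

```python
import json
import tempfile
from pathlib import Path


def process_new_files(landing, checkpoint, sink):
    """Append unseen files to the sink and record them in the checkpoint."""
    done = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()
    for name in landing:
        if name in done:
            continue  # already committed in an earlier run
        sink.append(name)
        done.add(name)
        # Persist progress so a later run knows what was already handled.
        checkpoint.write_text(json.dumps(sorted(done)))


with tempfile.TemporaryDirectory() as tmp:
    checkpoint_file = Path(tmp) / "checkpoint.json"
    table = []
    process_new_files(["a.json", "b.json"], checkpoint_file, table)
    # Simulated restart: the old files are still in the landing zone,
    # plus one new file.
    process_new_files(["a.json", "b.json", "c.json"], checkpoint_file, table)
    print(table)
```

Each file reaches the sink once even though the second run sees the first two files again.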

In [0]:
%fs rm -r dbfs:/user/hive/warehouse/bronze.db/api_data

In [0]:
api_data_checkpoint_path = 'dbfs:/user/hive/warehouse/bronze.db/_checkpoint/api_data'
(
    auto_loader_df
        .writeStream
        .format('delta')
        .outputMode('append')
        .option('checkpointLocation',api_data_checkpoint_path)
        .table('bronze.api_data')
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff7ece9e6e0>

### 3.3 Check the bronze Layer Table 

* The data is being stored in real time

In [0]:
spark.sql("SELECT * FROM bronze.api_data LIMIT 5").display()

access_date,access_point,ip_address,payload,_rescued_data,source_file_name,processing_timestamp
2024-06-02T04:20:11.000Z,safari,69.127.75.83,"List(null, null, List(usuario_022d744bd9@hotmail.com, PI, 20, Usuario 022d744bd9, Arquiteto de Dados, F))",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:37:34.673Z
2024-06-02T04:53:07.000Z,android,168.18.37.100,"List(null, null, null)",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:37:34.673Z
2024-06-02T05:26:03.000Z,firefox,113.109.66.208,"List(null, null, null)",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:37:34.673Z
2024-06-02T05:58:59.000Z,chrome,87.241.252.59,"List(null, null, List(usuario_17e7c1bc35@hotmail.com, AP, 20, Usuario 17e7c1bc35, Analista de Dados, F))",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:37:34.673Z
2024-06-02T06:31:55.000Z,iphone,188.111.120.11,"List(null, null, null)",,part-00004-a3c1111d-bc3c-4dda-9b95-fc4fb36260cd-c000.json,2024-11-23T09:37:34.673Z


## 4.0 Batch Data Ingestion to Bronze Layer 

We will use the `.read` method to read the batch CSV data.

* `.option('header',True)`: States that the first row of the CSV file is a header.
* `.option('sep',",")`: States that the columns are comma-separated.
* `.load(batch_landing_path)`: Points to the batch data path.

In addition, we create two new columns:
1. `source_file_name`: Identifies the file the record came from.
2. `processing_timestamp`: The timestamp at which the data is ingested and processed.



### 4.1 Read batch Data as DataFrame

In [0]:
batch_landing_path = 'dbfs:/FileStore/landing/files'
file_data = (
    spark.read
         .format('csv')
         .option('header',True)
         .option('sep',",")
         .load(batch_landing_path)
    )

file_data = (
    file_data.withColumn("source_file_name", col("_metadata.file_name"))
             .withColumn("processing_timestamp", current_timestamp())
)




### 4.2 Loading csv Batch Data into Bronze Layer


* `write`: Used to write the csv batch data in the `bronze.file_data` table 
* `saveAsTable('bronze.file_data')` : Used to save the data in the `bronze.file_data` table 
* `mode('append')`: States that the data will be appended in the `bronze.file_data` table 

In [0]:
%fs rm -r dbfs:/user/hive/warehouse/bronze.db/file_data

In [0]:
(
  file_data
    .write
    .format("csv")
    .mode('append')
    .saveAsTable('bronze.file_data')
)

spark.sql('SELECT * FROM bronze.file_data LIMIT 5').display()

data_venda,nome_empresa,sexo,nome_funcionario,email_functionario,profissao,idade,estado,curso,valor,disconto,source_file_name,processing_timestamp
2025-02-26,Empresa A,M,Funcionario 3284054f49,funcionario_3284054f49@empresaa.com.br,Cientista de Dados,44,RO,Construindo o seu Primeiro Pipeline de Dados com o Databricks,"R$ 789,90",5%,part-00000-tid-4386786483019997920-d94540fd-8cb4-4fe4-ac45-e5e62d250e31-377-1-c000.csv,2024-11-23T09:39:35.95Z
2024-07-01,Empresa A,F,Funcionario 85502baecc,funcionario_85502baecc@empresaa.com.br,Desenvolvedor de ETL,31,AC,Do Primeiro Pipeline ao Data Lakehouse com o Databricks,"R$ 689,90",5%,part-00000-tid-4386786483019997920-d94540fd-8cb4-4fe4-ac45-e5e62d250e31-377-1-c000.csv,2024-11-23T09:39:35.95Z
2025-11-23,Empresa A,M,Funcionario cbdc66550c,funcionario_cbdc66550c@empresaa.com.br,Desenvolvedor de ETL,19,AM,Construindo Pipelines de Dados usando o Spark Structured Streaming,"R$ 549,90",5%,part-00000-tid-4386786483019997920-d94540fd-8cb4-4fe4-ac45-e5e62d250e31-377-1-c000.csv,2024-11-23T09:39:35.95Z
2025-04-07,Empresa A,F,Funcionario 348609e786,funcionario_348609e786@empresaa.com.br,Analista de Dados,21,RR,Construindo o seu Primeiro Pipeline de Dados com o Databricks,"R$ 789,90",5%,part-00000-tid-4386786483019997920-d94540fd-8cb4-4fe4-ac45-e5e62d250e31-377-1-c000.csv,2024-11-23T09:39:35.95Z
2024-08-10,Empresa A,M,Funcionario cfe808cdc0,funcionario_cfe808cdc0@empresaa.com.br,Arquiteto de Dados,44,PA,Do Primeiro Pipeline ao Data Lakehouse com o Databricks,"R$ 689,90",5%,part-00000-tid-4386786483019997920-d94540fd-8cb4-4fe4-ac45-e5e62d250e31-377-1-c000.csv,2024-11-23T09:39:35.95Z


Two variables are used in the process:
1. **source_file**: Stores the name of the processed file.
2. **qtt_rows_files**: Stores the number of rows read from the file.


We will use these variables to identify the files that have already been stored in the table and to run a completeness check, verifying whether the data from the origin file is stored in the bronze table.

If the data has been stored in the bronze table, the file is deleted from the landing zone to avoid loading the same data twice.

In [0]:
source_file = file_data.select("source_file_name").distinct().collect()[0]['source_file_name']
qtt_rows_files = file_data.count()

print('Total of rows:',qtt_rows_files)
print('File Name:',source_file)

Total of rows: 200000
File Name: part-00000-tid-4386786483019997920-d94540fd-8cb4-4fe4-ac45-e5e62d250e31-377-1-c000.csv


* Now we can check whether the row counts match (file versus table).
* Since the batch ingestion has no **checkpoint**, we have to delete the file loaded into bronze.file_data from the landing zone ourselves.
* Before doing that, we must make sure that the number of rows in the file and in bronze.file_data is the same.
* If so, we delete the file. Otherwise, we have to investigate the process.

Note: This is done to avoid duplicated data in the bronze layer.
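The decision rule above can be sketched as a small pure-Python function that also reports the mismatch case instead of silently skipping it. The names `can_delete_source` and `validate_and_clean` are hypothetical, and the delete action is passed in as a callable so the sketch runs without `dbutils`.

```python
def can_delete_source(rows_in_file: int, rows_in_table: int) -> bool:
    """The source file may be removed only when every row reached the table."""
    return rows_in_file == rows_in_table


def validate_and_clean(rows_in_file, rows_in_table, delete_file):
    """Delete the source file when the counts match, otherwise report why not."""
    if can_delete_source(rows_in_file, rows_in_table):
        delete_file()
        return "deleted"
    # Counts differ: keep the file and surface the mismatch for investigation.
    return f"kept: file has {rows_in_file} rows, table has {rows_in_table}"


deleted = []
ok = validate_and_clean(100000, 100000, lambda: deleted.append("file.csv"))
mismatch = validate_and_clean(200000, 100000, lambda: deleted.append("other.csv"))
print(ok)
print(mismatch)
```

In the notebook, the callable would wrap the `dbutils.fs.rm` call, and the returned message could be logged so that a failed completeness check is visible.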

In [0]:
# Count the rows in bronze.file_data that came from the processed file
qnt_rows_table = (
    spark.sql(f"""
        SELECT COUNT(*) qtde_rows
        FROM bronze.file_data
        WHERE source_file_name = '{source_file}'
    """)
).collect()[0]['qtde_rows']
print('Qnt of rows in bronze.file_data table:', qnt_rows_table)

Qnt of rows in bronze.file_data table: 100000


In [0]:
if qnt_rows_table == qtt_rows_files:
  dbutils.fs.rm(f'dbfs:/FileStore/landing/files/{source_file}')

The command below stops the streaming processes and deletes the landing zone directory to avoid extra cost.

In [0]:
stop_all_streams()
clean_up_landing_dir()


stop_all_streams-inicio-2024-11-23 09:40:33.446990
O stream None fui finalizado com sucesso.
O stream display_query_2 fui finalizado com sucesso.
O stream generate_api_stream_data fui finalizado com sucesso.
stop_all_streams-fim-2024-11-23 09:40:34.699965
              
clean_up_landing_dir-inicio-2024-11-23 09:40:34.700087
Todos os arquivos e diretórios dentro de 'dbfs:/FileStore/landing/' foram excluidos com sucesso.
clean_up_landing_dir-fim-2024-11-23 09:40:36.235517
              
