# Spark notebook

This notebook requests contract data from the Portal da Transparência API and prepares it for loading into Cassandra.

## Make the request to the API

This section calls the Portal da Transparência API. Once you have an API key (issued after registering on the portal), the code sends GET requests to the "contratos" endpoint. The call takes three variables: "organization_code" (the agency's SIAFI code, "código do órgão (SIAFI)"), "initial_date", and "number_of_pages", the maximum number of pages to fetch. The loop stops early if the API returns an empty page. For this case, I requested up to 500 pages.

The collected records are then written as a single JSON file to a GCS bucket for further processing.

In [2]:
import requests
import json
from google.cloud import storage

def get_api_data(organization_code, initial_date, number_of_pages):
    url = "https://api.portaldatransparencia.gov.br/api-de-dados/contratos"
    key = "<insert_key_here>" # This is where you insert the key acquired after registration in Portal da Transparência

    params = {"codigoOrgao": organization_code, "quantidade": 100, "dataInicial": initial_date, "page": 1}
    headers = {"accept": "*/*", "chave-api-dados": key}

    all_data = []  # Create a single list to store all data

    while params["page"] <= number_of_pages:
        try:
            # A timeout keeps a stalled connection from hanging the loop forever
            response = requests.get(url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            json_data = response.json()

            if not json_data:
                break

            all_data.extend(json_data)  # Append data to the single list
            params["page"] += 1

        except requests.exceptions.RequestException as e:
            print("Error making the request:", e)
            break

    return all_data
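
The stopping rule used above (quit on the first empty page, or once the page limit is reached) can be tried without network access. The following is a minimal sketch, not the notebook's code: `collect_pages`, `fetch_page`, and `fake_pages` are hypothetical names, and the in-memory dictionary stands in for the real API.

```python
from typing import Callable, List


def collect_pages(fetch_page: Callable[[int], List[dict]], max_pages: int) -> List[dict]:
    """Collect records page by page until a page is empty or max_pages is reached."""
    records: List[dict] = []
    for page in range(1, max_pages + 1):
        batch = fetch_page(page)
        if not batch:  # an empty page means there is nothing left to read
            break
        records.extend(batch)
    return records


# Hypothetical in-memory "API" with two pages of data, used only for illustration
fake_pages = {1: [{"id": 1}, {"id": 2}], 2: [{"id": 3}]}
result = collect_pages(lambda page: fake_pages.get(page, []), max_pages=500)
print(len(result))
```

Passing the fetch function as an argument keeps the loop independent of `requests`, which makes the early-exit behavior easy to check in isolation.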

In [3]:
# Save the raw files in a GCS bucket
def save_in_gcs(data, bucket_name, file_name):
    client = storage.Client()
    bucket = client.bucket(bucket_name)

    blob = bucket.blob(file_name)
    blob.upload_from_string(json.dumps(data, ensure_ascii=False, indent=2), content_type='application/json')
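
The `ensure_ascii=False` argument matters for Portuguese text. As a small illustration (the record below is made up, not taken from the API), this is the difference it makes to the serialized JSON:

```python
import json

record = {"orgao": "Ministério da Educação"}  # made-up sample record

# Default behavior: non-ASCII characters are escaped as \uXXXX sequences
escaped = json.dumps(record)

# With ensure_ascii=False the accented characters are written as-is
readable = json.dumps(record, ensure_ascii=False)

print(escaped)
print(readable)
```

Both strings decode to the same data; the second is simply easier to read when inspecting the raw file in the bucket.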

In [4]:
# Call the function with the given parameters
def main():
    organization_code = "52111"
    initial_date = "01/01/2018"
    number_of_pages = 500
    bucket_name = "cassandra-project-leorickli"
    file_name = "raw/contratos_raw.json"

    page_data = get_api_data(organization_code, initial_date, number_of_pages)
    save_in_gcs(page_data, bucket_name, file_name)

if __name__ == "__main__":
    main()

## Clean, transform, and convert the JSON file to CSV

This section flattens the nested JSON file, cleans its text columns, converts the result to CSV, and writes it to the "curated" folder of the bucket so that Cassandra can load it correctly.
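
Before the Spark version, the naming idea behind flattening can be shown on plain Python dictionaries. This is only a sketch under simplifying assumptions: `flatten_record` is a hypothetical helper, the sample record is made up, and lists (which the Spark code explodes into rows) are not handled.

```python
from typing import Any, Dict


def flatten_record(record: Dict[str, Any], parent_name: str = "") -> Dict[str, Any]:
    """Flatten nested dicts, joining parent and child keys with an underscore."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        full_name = f"{parent_name}_{key}" if parent_name else key
        if isinstance(value, dict):
            # Recurse into the nested object, carrying the prefix along
            flat.update(flatten_record(value, full_name))
        else:
            flat[full_name] = value
    return flat


# Made-up record shaped loosely like a nested API response
sample = {"id": 1, "unidadeGestora": {"orgaoMaximo": {"codigo": "000", "sigla": "ABC"}}}
flat_sample = flatten_record(sample)
print(flat_sample)
```

Each nested key ends up as a top-level name such as `unidadeGestora_orgaoMaximo_sigla`, which is the column naming style the CSV uses.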

In [5]:
# Import only the names the notebook uses; a wildcard import from
# pyspark.sql.functions would shadow Python built-ins such as sum and max
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer, regexp_replace, when
from pyspark.sql.types import ArrayType, StructType

In [6]:
def flatten_json(df_arg, parent_name=""):
    """Flatten struct and array columns into top-level columns, then clean string values."""
    df = df_arg
    stack = [(df, "")]

    while stack:
        current_df, parent_name = stack.pop()

        for field in current_df.schema.fields:
            data_type = str(field.dataType)
            column_name = field.name
            full_column_name = f"{parent_name}_{column_name}" if parent_name else column_name

            if data_type.startswith('ArrayType'):
                current_df = current_df.withColumn(full_column_name, explode_outer(col(column_name)))

            elif data_type.startswith('StructType'):
                struct_fields = [f"{column_name}.{subfield.name} AS {full_column_name}_{subfield.name}" for subfield in field.dataType.fields]
                current_df = current_df.selectExpr("*", *struct_fields).drop(column_name)

        stack.extend([(current_df, full_column_name) for field in current_df.schema.fields if str(field.dataType).startswith('StructType')])

    # Rename specific columns: drop the stray "valorInicialCompra_" prefix
    # that the flattening loop attaches to these nested fields
    stray_prefix = "valorInicialCompra_"
    columns_to_rename = [
        "unidadeGestoraCompras_orgaoVinculado_codigoSIAFI",
        "unidadeGestoraCompras_orgaoVinculado_cnpj",
        "unidadeGestoraCompras_orgaoVinculado_sigla",
        "unidadeGestoraCompras_orgaoVinculado_nome",
        "unidadeGestoraCompras_orgaoMaximo_codigo",
        "unidadeGestoraCompras_orgaoMaximo_sigla",
        "unidadeGestoraCompras_orgaoMaximo_nome",
        "unidadeGestora_orgaoVinculado_codigoSIAFI",
        "unidadeGestora_orgaoVinculado_cnpj",
        "unidadeGestora_orgaoVinculado_sigla",
        "unidadeGestora_orgaoVinculado_nome",
        "unidadeGestora_orgaoMaximo_codigo",
        "unidadeGestora_orgaoMaximo_sigla",
        "unidadeGestora_orgaoMaximo_nome",
    ]
    for target_name in columns_to_rename:
        current_df = current_df.withColumnRenamed(stray_prefix + target_name, target_name)

    # Handle empty strings
    for field in current_df.schema.fields:
        data_type = str(field.dataType)
        column_name = field.name

        if 'StringType' in data_type:
            current_df = current_df.withColumn(column_name, when(col(column_name) == '""', None).otherwise(col(column_name)))

    # Replace \r\n with a space and strip double quotes in text columns
    for field in current_df.schema.fields:
        data_type = str(field.dataType)
        column_name = field.name

        if 'StringType' in data_type:
            current_df = current_df.withColumn(column_name, regexp_replace(col(column_name), r'\r\n', ' '))
            current_df = current_df.withColumn(column_name, regexp_replace(col(column_name), r'\"', ''))


    return current_df
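
The string rules at the end of `flatten_json` can be checked on ordinary Python strings. The sketch below mirrors them with the standard library only; `clean_text` is a hypothetical helper and is not part of the notebook. Presumably the point of these rules is that embedded line breaks and quotes make a CSV harder for downstream loaders to parse.

```python
import re
from typing import Optional


def clean_text(value: Optional[str]) -> Optional[str]:
    """Mirror the string rules: '""' becomes None, CRLF becomes a space, quotes are dropped."""
    if value is None or value == '""':
        return None
    value = re.sub(r"\r\n", " ", value)  # join lines that were split by CRLF
    return value.replace('"', "")        # remove every double quote


print(clean_text('linha 1\r\nlinha 2'))
print(clean_text('contrato "A"'))
print(clean_text('""'))
```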

In [7]:
if __name__ == "__main__":
    # Create a Spark session
    spark = SparkSession.builder.appName("FlatJson").config("spark.driver.memory", "4g").master("local[*]").getOrCreate()

    # Your GCS bucket and JSON file path
    gcs_bucket = "cassandra-project-leorickli"
    json_file_path = f"gs://{gcs_bucket}/raw/contratos_raw.json"

    # Read the JSON file into a DataFrame with UTF-8 encoding
    df = spark.read.option("multiline", "true").option("encoding", "UTF-8").json(json_file_path)

    # Flatten the DataFrame with nested fields named using the parent object's name as a prefix
    flattened_df = flatten_json(df)

    # Define the output CSV file path in GCS
    gcs_output_path = f"gs://{gcs_bucket}/curated/contratos_curated"

    # Write the CSV file to GCS with UTF-8 encoding
    flattened_df.coalesce(1).write.option("encoding", "UTF-8").csv(gcs_output_path, header=True, mode='overwrite')

    # Show the flattened DataFrame
    flattened_df.show(20, False)

23/12/24 12:06:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/12/24 12:07:09 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.