In [0]:
# import necessary libs
from pyspark.sql.functions import col, current_date, to_date
from pyspark.sql.types     import StringType, DoubleType
import requests
import subprocess
import json

In [0]:
## parameters
# AWS S3 credentials are read from the Databricks secret scope "aws".
access_key = dbutils.secrets.get(scope="aws", key="aws-access-key")
secret_key = dbutils.secrets.get(scope="aws", key="aws-secret-key")
# The secret key is embedded in the s3a:// mount URL, so "/" must be percent-encoded.
encoded_secret_key = secret_key.replace("/", "%2F")
aws_bucket_name = "stonedatalake"
mount_name = "stonedatalake"


def _layer_path(layer, dataset):
    """Return the DBFS path dbfs:/mnt/<mount_name>/<layer>/<dataset>."""
    return f"dbfs:/mnt/{mount_name}/{layer}/{dataset}"


# datalake paths: one directory per (layer, dataset) pair
landing_path = _layer_path("landing", "divida-ativa")
bronze_path = _layer_path("bronze", "divida-ativa")
silver_path = _layer_path("silver", "divida-ativa")
gold_path = _layer_path("gold", "divida-ativa")

bronze_path_21388 = _layer_path("bronze", "serie-21388")
silver_path_21388 = _layer_path("silver", "serie-21388")
gold_path_21388 = _layer_path("gold", "serie-21388")

bronze_path_21395 = _layer_path("bronze", "serie-21395")
silver_path_21395 = _layer_path("silver", "serie-21395")
gold_path_21395 = _layer_path("gold", "serie-21395")

In [0]:
# auxiliary functions
def download_file(url, path, chunk_size=1024 * 1024, timeout=60):
    """Stream the resource at `url` into the local file `path`.

    Args:
        url: HTTP(S) URL to download.
        path: local destination file; created or truncated.
        chunk_size: bytes read per iteration while streaming the body.
        timeout: seconds passed to requests as connect/read timeout.

    Raises:
        requests.HTTPError: for 4xx/5xx responses (via raise_for_status);
            the destination file is not created in that case.
    """
    # `with` guarantees the streamed connection is released even if writing fails.
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Check status before touching the file so an error never leaves a partial file.
        response.raise_for_status()
        with open(path, 'wb') as new_file:
            for part in response.iter_content(chunk_size=chunk_size):
                new_file.write(part)
    print(f"Download {path}")

def execute(command):
    """Run `command` through the shell and capture its output.

    Returns a tuple ``(returncode, stdout, stderr)``. The return code is a
    *string* (e.g. ``"0"``), and both streams are decoded as UTF-8.
    NOTE: the command runs with ``shell=True`` — only pass trusted commands.
    """
    completed = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return (
        str(completed.returncode),
        completed.stdout.decode('utf-8'),
        completed.stderr.decode('utf-8'),
    )

def parse_json_dataframe(json_list):
    """Build a Spark DataFrame from a list of JSON-serialisable records.

    Each record is serialised with json.dumps. The strings are distributed with
    sc.parallelize and parsed by spark.read.json, which infers the schema.
    """
    rdd_of_json = sc.parallelize(list(map(json.dumps, json_list)))
    return spark.read.json(rdd_of_json)
  
def download_api_to_dataframe(url, timeout=60):
    """Fetch a JSON list from `url` and turn it into a Spark DataFrame.

    Args:
        url: endpoint returning a JSON array of records.
        timeout: seconds passed to requests as connect/read timeout.

    Returns:
        The DataFrame built by parse_json_dataframe from the decoded payload.

    Raises:
        requests.HTTPError: for 4xx/5xx responses. Checking the status first
            avoids an opaque JSON decode error on an HTML error page.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return parse_json_dataframe(response.json())

In [0]:
# mount s3 bucket
# dbutils.fs.mount raises if the mount point is already in use, which breaks
# "Run All" on a cluster where this notebook has run before, so skip in that case.
# NOTE(review): an existing mount is reused as-is, even if it points at another bucket.
mount_point = f"/mnt/{mount_name}"
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    print(f"{mount_point} is already mounted; skipping mount")
else:
    dbutils.fs.mount(f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", mount_point)

In [0]:
#### Divida ativa data ####

In [0]:
# extract "divida ativa" data from Procuradoria Geral da Fazenda
url_base = 'http://dadosabertos.pgfn.gov.br/'
zip_name = 'Dados_abertos_Nao_Previdenciario.zip'
zip_path = f"/tmp/{zip_name}"
# Extract into a dedicated, freshly emptied directory instead of the driver's
# working directory, so only CSVs from this archive are moved to landing.
extract_dir = "/tmp/divida-ativa"

download_file(url_base + zip_name, zip_path)

# -o: never prompt before overwriting; -d: extract into extract_dir
ret, out, err = execute(f"rm -rf {extract_dir} && unzip -o {zip_path} -d {extract_dir}")
print(out)
# execute() returns the exit code as a string
if ret != "0":
    raise RuntimeError(f"unzip of {zip_path} failed (exit code {ret}): {err}")

lista_csv = [i for i in dbutils.fs.ls(f"file:{extract_dir}/") if i.name.lower().endswith(".csv")]
if not lista_csv:
    raise RuntimeError(f"no CSV files found after extracting {zip_path}")

for i in lista_csv:
    dbutils.fs.mv(i.path, f"{landing_path}/{i.name}")

In [0]:
# bronze layer

In [0]:
## reading raw data divida-ativa and sink in bronze layer (delta format)
# reading: ';'-separated CSVs decoded as ISO-8859-1
df_divida_ativa_raw = spark.read.csv(landing_path, sep=";", header=True, encoding="ISO-8859-1")
# drop sensitive data
# NOTE(review): CPF_CNPJ is still kept (the silver table schema expects it) — confirm that is intended.
df_divida_ativa_raw = df_divida_ativa_raw.drop('NOME_DEVEDOR', 'NUMERO_INSCRICAO')
# adding date_load (partition column identifying the load day)
df_divida_ativa_raw = df_divida_ativa_raw.withColumn('date_load', current_date().cast(StringType()))
# sink: append a new date_load partition. The default "errorifexists" mode made
# every run after the first fail because bronze_path already existed.
# NOTE: re-running on the same day appends that day's load a second time.
df_divida_ativa_raw.write.format("delta").mode("append").partitionBy('date_load').save(bronze_path)

dbutils.fs.rm(landing_path, True)

In [0]:
# silver layer

In [0]:
## data handling and sink to silver layer
df_divida_ativa_bronze = spark.read.format("delta").load(bronze_path)
# cast correct data type
# NOTE(review): if VALOR_CONSOLIDADO uses a decimal comma in the source CSV, the
# (non-ANSI) double cast yields null and na.drop() below discards the row — confirm the format.
df_divida_ativa_bronze = (
    df_divida_ativa_bronze
    .withColumn('VALOR_CONSOLIDADO', col('VALOR_CONSOLIDADO').cast(DoubleType()))
    .withColumn('DATA_INSCRICAO', to_date(col('DATA_INSCRICAO'), "dd/MM/yyyy"))
)

# drop rows that have a null in any column
df_divida_ativa_bronze = df_divida_ativa_bronze.na.drop()
# sink: silver is rebuilt from the whole bronze table on every run, so overwrite it.
# The default "errorifexists" mode failed on every run after the first.
df_divida_ativa_bronze.write.format("delta").mode("overwrite").partitionBy('date_load').save(silver_path)

In [0]:
# gold layer
# TODO: make the necessary aggregations and make the results available to users

In [0]:
#### Banco Central do Brasil data ####

In [0]:
## get api data and sink to bronze layer
# reading
url_serie_21388 = 'http://api.bcb.gov.br/dados/serie/bcdata.sgs.21388/dados?formato=json'
df_serie_21388_raw = download_api_to_dataframe(url_serie_21388)
# adding date_load (partition column identifying the load day)
df_serie_21388_raw = df_serie_21388_raw.withColumn('date_load', current_date().cast(StringType()))
# sink: append a new date_load partition. The default "errorifexists" mode made
# every run after the first fail. Re-running on the same day appends that day twice.
df_serie_21388_raw.write.format("delta").mode("append").partitionBy('date_load').save(bronze_path_21388)

In [0]:
## data handling and sink to silver layer
df_serie_21388 = spark.read.format("delta").load(bronze_path_21388)
# cast correct data type
df_serie_21388 = (
    df_serie_21388
    .withColumn('valor', col('valor').cast(DoubleType()))
    .withColumn('data', to_date(col('data'), "dd/MM/yyyy"))
)
# drop rows that have a null in any column
df_serie_21388 = df_serie_21388.na.drop()
# sink: silver is rebuilt from the whole bronze table each run, so overwrite it
# (the default "errorifexists" mode failed on every run after the first).
df_serie_21388.write.format("delta").mode("overwrite").partitionBy('date_load').save(silver_path_21388)

In [0]:
# gold layer
# TODO: make the necessary aggregations and make the results available to users

In [0]:
## get api data and sink to bronze layer
# reading
url_serie_21395 = 'http://api.bcb.gov.br/dados/serie/bcdata.sgs.21395/dados?formato=json'
df_serie_21395_raw = download_api_to_dataframe(url_serie_21395)
# adding date_load — derived from the 21395 frame itself (deriving it from the
# 21388 frame would silently store series 21388's data under serie-21395)
df_serie_21395_raw = df_serie_21395_raw.withColumn('date_load', current_date().cast(StringType()))
# sink: append a new date_load partition. The default "errorifexists" mode made
# every run after the first fail. Re-running on the same day appends that day twice.
df_serie_21395_raw.write.format("delta").mode("append").partitionBy('date_load').save(bronze_path_21395)

In [0]:
## data handling and sink to silver layer
df_serie_21395 = spark.read.format("delta").load(bronze_path_21395)
# cast correct data type
df_serie_21395 = (
    df_serie_21395
    .withColumn('valor', col('valor').cast(DoubleType()))
    .withColumn('data', to_date(col('data'), "dd/MM/yyyy"))
)
# drop rows that have a null in any column
df_serie_21395 = df_serie_21395.na.drop()
# sink: silver is rebuilt from the whole bronze table each run, so overwrite it
# (the default "errorifexists" mode failed on every run after the first).
df_serie_21395.write.format("delta").mode("overwrite").partitionBy('date_load').save(silver_path_21395)

In [0]:
# gold layer
# TODO: make the necessary aggregations and make the results available to users

In [0]:
#### Creating metastore ####

In [0]:
# Create the metastore database that the silver tables are registered in (no-op if it exists).
metastore_db = 'stone'
spark.sql(f'CREATE DATABASE IF NOT EXISTS {metastore_db}')

In [0]:
def create_hive_store(db, table, schema, partition, location):
    """Register a Delta directory as a partitioned metastore table.

    Issues ``CREATE TABLE IF NOT EXISTS <db>.<table> (<schema>) USING DELTA
    PARTITIONED BY (<partition>) LOCATION '<location>'`` through spark.sql, so
    calling it again for an existing table is a no-op.
    All arguments are interpolated verbatim into the SQL — pass trusted values only.
    """
    ddl = f'''
              CREATE TABLE IF NOT EXISTS {db}.{table}
              (
              {schema}
              )
              USING DELTA
              PARTITIONED BY ({partition})
              LOCATION '{location}'
              '''
    spark.sql(ddl)

In [0]:
# divida_ativa_silver: register the silver "divida ativa" Delta table in the metastore
create_hive_store(
    db='stone',
    table='divida_ativa_silver',
    schema='''
                  CPF_CNPJ string,
                  TIPO_PESSOA string,
                  TIPO_DEVEDOR string,
                  UF_UNIDADE_RESPONSAVEL string,
                  UNIDADE_RESPONSAVEL string,
                  TIPO_SITUACAO_INSCRICAO string,
                  SITUACAO_INSCRICAO string,
                  RECEITA_PRINCIPAL string,
                  DATA_INSCRICAO date,
                  INDICADOR_AJUIZADO string,
                  VALOR_CONSOLIDADO double,
                  date_load string
                  ''',
    partition='date_load',
    location=silver_path,
)

In [0]:
# serie_21388_silver: register the silver BCB series 21388 Delta table in the metastore
create_hive_store(
    db='stone',
    table='serie_21388_silver',
    schema='''
                  data date,
                  valor double,
                  date_load string
                  ''',
    partition='date_load',
    location=silver_path_21388,
)

In [0]:
# serie_21395_silver: register the silver BCB series 21395 Delta table in the metastore
create_hive_store(
    db='stone',
    table='serie_21395_silver',
    schema='''
                  data date,
                  valor double,
                  date_load string
                  ''',
    partition='date_load',
    location=silver_path_21395,
)