<img src="https://raw.githubusercontent.com/lcpassuncao/public/main/images/vibra.png">

%md
### Version Code Control

| versão | data | autor | e-mail | alterações |
| --- | --- | --- | --- | --- |
| 1.0 | 20-JUN-2024 | Rafael Santos | rafaelsantos@vibraenergia.com.br | Primeira versão  |

%md
### Descrição

| projeto | aplicação | módulo | tabela | objetivo |
| --- | --- | --- | --- | --- |
| Acadêmico | Laboratório 2 | ETL Landing | Diversas JSON | Ingestão de arquivos publicos com gasto dos senadores em JSON - bases de teste para o Treinamento e conhecimento da solução do Databricks |

In [0]:
import requests
import datetime
import json
from pyspark.dbutils import DBUtils

dbutils = DBUtils(spark)

#obtem o ano atual
ano_atual = datetime.datetime.now().year

for year in range(2008, ano_atual+1):
    url = f'https://adm.senado.gov.br/adm-dadosabertos/api/v1/senadores/despesas_ceaps/{year}'
    response = requests.get(url)
    
    if response.status_code == 200:
        df = response.json()
        directory_path = '/mnt/stedlk01databrickspoc/general/landing/senadores/depesas/'
        file_path = f'{directory_path}despesas_ceaps_{year}.json'
        
        print(f'\n{year}')
        # Ensure the directory exists
        dbutils.fs.mkdirs(directory_path)
        
        # Use dbutils.fs.put to write the file to DBFS
        dbutils.fs.put(file_path, json.dumps(df), overwrite=True)
    else:
        print(f'Falha para pegar o dado de {year}. Erro: {response.status_code}')


2008
Wrote 933962 bytes.


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:103)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:103)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio

In [0]:
#Diretório da landing
input_landing = '/mnt/stedlk01databrickspoc/general/landing/senadores/depesas/'
# Diretório para salvar as tabelas Delta na camada bronze
output_bronze = '/mnt/stedlk01databrickspoc/general/bronze/senadores/depesas/'

In [0]:
%python
dir = dbutils.fs.ls(directory_path)
arquivos = [dir_name.name for dir_name in dir]

for x in arquivos:
    # Ler o arquivo JSON e salvar como delta parquet
    file_path = input_landing + x
    df = spark.read.json(file_path)
    df.write \
    .format('delta') \
    .mode('overwrite') \
    .save(output_bronze+x.replace('.json', ''))

In [0]:
from functools import reduce
from pyspark.sql import DataFrame

# Diretório base onde os arquivos Delta estão armazenados na camada bronze
output_bronze = '/mnt/stedlk01databrickspoc/general/bronze/senadores/depesas/'

# Lista os diretórios no diretório base
dir = dbutils.fs.ls(output_bronze)
arquivos = [dir_name.name for dir_name in dir]

# Inicializa uma lista para armazenar os DataFrames lidos
dfs = []

# Itera sobre cada diretório e lê o arquivo Delta
for x in arquivos:
    file_path = output_bronze + x
    df = spark.read.format('delta').load(file_path)
    dfs.append(df)

# Combina todos os DataFrames em um único DataFrame
df_gastos_senadores = reduce(DataFrame.unionAll, dfs)

# Defina o nome do catálogo e esquema
catalog_name = "rafaelsantos"
schema_name = "gastos_senadores"
table_name = "senadores_bronze_data"

# Cria o esquema se não existir
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")

# Salva o DataFrame combinado em uma tabela Delta no Unity Catalog
df_gastos_senadores.write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

# Exibir a tabela criada para verificação
display(spark.sql(f"SELECT * FROM {catalog_name}.{schema_name}.{table_name} LIMIT 5"))

ano,cpfCnpj,data,detalhamento,documento,fornecedor,id,mes,nomeSenador,tipoDespesa,tipoDocumento,valorReembolsado
2008,,,,,,2008050352202,5,FÁTIMA CLEIDE,"Aquisição de material de consumo para uso no escritório político, inclusive aquisição ou locação de software, despesas postais, aquisição de publicações, locação de móveis e de equipamentos.",,8529.78
2008,,,,,,2008050352203,5,FÁTIMA CLEIDE,"Locomoção, hospedagem, alimentação, combustíveis e lubrificantes",,4543.25
2008,,,,,,2008060352202,6,FÁTIMA CLEIDE,"Aquisição de material de consumo para uso no escritório político, inclusive aquisição ou locação de software, despesas postais, aquisição de publicações, locação de móveis e de equipamentos.",,11940.0
2008,,,,,,2008060352204,6,FÁTIMA CLEIDE,"Contratação de consultorias, assessorias, pesquisas, trabalhos técnicos e outros serviços de apoio ao exercício do mandato parlamentar",,460.0
2008,,,,,,2008060352203,6,FÁTIMA CLEIDE,"Locomoção, hospedagem, alimentação, combustíveis e lubrificantes",,2791.44
