In [1]:
#ESSE SCRIPT TEM COMO OBJETIVO DE EXTRAIR TODOS OS DADOS DOS DEPUTADOS E DOS PARTIDOS CRIANDO ARQUIVO deputadosCompleto e partidosCompleto na bronze
# 
# 
import pandas as pd
import requests
import numpy as np
import re
import pyarrow as pa
import io
import os
import pyarrow.parquet as pq
from concurrent.futures import ThreadPoolExecutor, as_completed
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient


# Substitua pelos valores reais obtidos no Passo 1
account_name = 'uvabr'
account_key = os.environ['ACCOUNT_KEY']
container_name = 'bronze'

# Conecte-se ao serviço de armazenamento de blob
connect_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

# Acesse o contêiner
container_client = blob_service_client.get_container_client(container_name)


# Função para extrair dados da API
def extrair_dados_api(url):
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data['dados']
    else:
        print(f"Erro ao acessar a API: {response.status_code}")
        return None

# Função para transformar dados em dataframe
def transformar_dados_para_dataframe(dados):
    df = pd.json_normalize(dados)
    return df

# Função para extrair dados das URIs em paralelo
def extrair_dados_uris(uris):
    dados_completos = []
    contagem = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_url = {executor.submit(extrair_dados_api, uri): uri for uri in uris}
        for future in as_completed(future_to_url):
            uri = future_to_url[future]
            try:
                dados_api = future.result()
                if dados_api:
                    dados_completos.append(dados_api)
                contagem += 1
                print(f"Processando {contagem}: {uri}")
            except Exception as e:
                print(f"Erro ao processar {uri}: {e}")

    if dados_completos:
        df_completo = pd.json_normalize(dados_completos)
        return df_completo
    else:
        return pd.DataFrame()
    
def enviar_parquet_azure(account_name, account_key, container_name_send, file_name, dataframe):
    # Conecte-se ao serviço de armazenamento de blob
    connect_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
    blob_service_client = BlobServiceClient.from_connection_string(connect_str)

    # Acesse o contêiner
    container_client = blob_service_client.get_container_client(container_name_send)

    # Converta DataFrame para tabela Parquet
    table = pa.Table.from_pandas(dataframe)

    # Crie um buffer de memória para escrever os dados Parquet
    parquet_buffer = io.BytesIO()
    pq.write_table(table, parquet_buffer)

    # Envie os dados para o Azure Blob Storage
    blob_client = container_client.get_blob_client(file_name)
    blob_client.upload_blob(parquet_buffer.getvalue(), overwrite=True)

    print(f"Arquivo Parquet '{file_name}' enviado com sucesso para o Azure Blob Storage.")

def transformar_coluna_data(dfdata):
    for coluna in dfdata.columns:
        if 'data' in coluna.lower():  # Verifica se o nome da coluna contém 'data'
            try:
                dfdata[coluna] = dfdata[coluna].apply(lambda x: '' if (x == None or x < ('1800-01-01') or x > ('2099-12-31') ) else x)
                dfdata[coluna] = pd.to_datetime(dfdata[coluna], errors='coerce')  # Converte para datetime
            except ValueError:
                print(f"Erro ao converter coluna {coluna} para datetime.")
    return dfdata

def transform_id_cod_to_string(df):
    for coluna in df.columns:
        if 'id' in coluna.lower() or 'cod' in coluna.lower():
            df[coluna] = df[coluna].astype(str)
    return df

def transformar_valores(df):
    for column in df.columns:
        if 'url' not in column.lower():  # Verifica se "url" NÃO está no nome da coluna
            if df[column].dtype != 'datetime64[ns]' and df[column].dtype != 'float64':
                df[column] = df[column].replace(['', '<NA>', '[]', pd.NaT, np.nan, None], 'N/A')
            elif df[column].dtype == 'float64':
                print("nada funciona aqui")
                df[column] = df[column].fillna('N/A').astype('str')
    return df

def transformar_valores_float(df):
    for column in df.columns:
        if 'valor' in column.lower():
            print(column)# Verifica se "url" NÃO está no nome da coluna
            if df[column].dtype == 'object' or df[column].dtype == 'string':
                df[column] = df[column].astype(float)
                print("transformacao concluida com sucessso")  
            else:
                print("Deu ruim")             
    return df

def converter_float64_to_int(df):
    float_cols = df.select_dtypes(include=['float64']).columns
    df[float_cols] = df[float_cols].astype('int')
    return df

# Nova função para substituir pontos por sublinhados nos nomes das colunas
def substituir_pontos_por_sublinhados(df):
    df.columns = [col.replace('.', '_') for col in df.columns]
    return df


# URL para extrair dados iniciais dos deputados
url = "https://dadosabertos.camara.leg.br/arquivos/deputados/json/deputados.json"
dados = extrair_dados_api(url)
df = transformar_dados_para_dataframe(dados)

# Extrair dados das URIs dos deputados e criar o dataframe df_deputados
deputados = extrair_dados_uris(df['uri'])

deputados = substituir_pontos_por_sublinhados(deputados)
deputados = transformar_coluna_data(deputados)
deputados = transform_id_cod_to_string(deputados)
deputados = transformar_valores(deputados)
deputados = transformar_valores_float(deputados)

enviar_parquet_azure(account_name, account_key, 'bronze', f'deputadosCompleto.parquet', deputados)
enviar_parquet_azure(account_name, account_key, 'silver', f'deputadosCompleto.parquet', deputados)
enviar_parquet_azure(account_name, account_key, 'gold', f'deputadosCompleto.parquet', deputados)

# Remover duplicatas do campo 'ultimoStatus.uriPartido' em df_deputados
uris_partidos_unicos = deputados['ultimoStatus_uriPartido'].drop_duplicates()

# Extrair dados das URIs dos partidos e criar um novo dataframe
partidos = extrair_dados_uris(uris_partidos_unicos)
partidos = substituir_pontos_por_sublinhados(partidos)
partidos = transformar_coluna_data(partidos)
partidos = transform_id_cod_to_string(partidos)
partidos = transformar_valores(partidos)
partidos = transformar_valores_float(partidos)
partidos=partidos[['id','sigla','nome','uri','urlLogo','status_lider_uri']]
enviar_parquet_azure(account_name, account_key, 'bronze', f'partidosCompleto.parquet', partidos)
enviar_parquet_azure(account_name, account_key, 'silver', f'partidosCompleto.parquet', partidos)
enviar_parquet_azure(account_name, account_key, 'gold', f'partidosCompleto.parquet', partidos)

Processando 1: https://dadosabertos.camara.leg.br/api/v2/deputados/220593
Processando 2: https://dadosabertos.camara.leg.br/api/v2/deputados/204560
Processando 3: https://dadosabertos.camara.leg.br/api/v2/deputados/220714
Processando 4: https://dadosabertos.camara.leg.br/api/v2/deputados/204379
Processando 5: https://dadosabertos.camara.leg.br/api/v2/deputados/204528
Processando 6: https://dadosabertos.camara.leg.br/api/v2/deputados/221328
Processando 7: https://dadosabertos.camara.leg.br/api/v2/deputados/74646
Processando 8: https://dadosabertos.camara.leg.br/api/v2/deputados/121948
Processando 9: https://dadosabertos.camara.leg.br/api/v2/deputados/160508
Processando 10: https://dadosabertos.camara.leg.br/api/v2/deputados/160527
Processando 11: https://dadosabertos.camara.leg.br/api/v2/deputados/204495
Processando 12: https://dadosabertos.camara.leg.br/api/v2/deputados/178835
Processando 13: https://dadosabertos.camara.leg.br/api/v2/deputados/204549
Processando 14: https://dadosaberto