In [None]:
# Importando as bibliotecas necessárias
import requests  # Para fazer solicitações HTTP
import pandas as pd  # Para manipulação de dados em formato de tabela
from azure.storage.blob import BlobServiceClient  # Para acessar o Azure Blob Storage
import csv  # Para trabalhar com arquivos CSV
from io import StringIO  # Para trabalhar com dados de texto em memória

from pyspark.sql import SparkSession  # Para trabalhar com Spark SQL

# Crie uma instância da sessão Spark
spark = SparkSession.builder.appName("MinhaAplicacao").getOrCreate()

# Fontes externas de dados

# Capturando dados do IBGE através de sua API
url = "https://servicodados.ibge.gov.br/api/v1/localidades/distritos"
response = requests.get(url)

# Capturando dados do Azure Blob Storage

# Definindo a string de conexão do Azure Blob Storage
connection_string = "------"  # Aqui deveria estar a string de conexão real
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Nome do container onde está o arquivo CSV
container_name = "neydados"
blob_name = "tbfuncionario.csv"

# Obtendo uma referência para o blob no Azure Blob Storage
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

# Baixando o conteúdo do blob
blob_data = blob_client.download_blob()

# Lendo o conteúdo do blob como texto e convertendo para CSV
conteudo = blob_data.content_as_text()
dados_csv = list(csv.reader(StringIO(conteudo)))

# Imprimindo os dados CSV obtidos do blob
print(dados_csv)

In [87]:
# Criando um DataFrame a partir dos dados CSV obtidos do Azure Blob Storage.
# Os dados estão contidos em 'dados_csv', onde 'dados_csv[0]' contém os cabeçalhos das colunas e 'dados_csv[1:]' contém os dados.
# O método 'spark.createDataFrame()' do Spark cria um DataFrame a partir de uma lista de linhas e um esquema opcional.
df_funcionario_blob = spark.createDataFrame(data=dados_csv[1:], schema=dados_csv[0])

# Criando um DataFrame a partir da resposta JSON obtida da API do IBGE.
# A resposta JSON contém dados sobre distritos, que serão convertidos em um DataFrame.
# O método 'response.json()' retorna os dados JSON da resposta HTTP.
df_ibge = spark.createDataFrame(response.json())

In [88]:
# Mostra o conteúdo do DataFrame df_funcionario_blob.
# O método .show() é usado para exibir as primeiras linhas do DataFrame na saída padrão.

df_funcionario_blob.show()

+--------------+--------+---------------+------------+-------------+-------+------------+
|id_funcionario|    nome|          cargo|departamento|data_admissao|salario|id_municipio|
+--------------+--------+---------------+------------+-------------+-------+------------+
|             1|    João|Sales Associate|      Vendas|   2020-01-01|2500.00|   211400705|
|             2|   Maria|  Sales Manager|      Vendas|   2018-05-15|3500.00|   310020305|
|             3|  Carlos|     Accountant|    Finanças|   2019-03-10|3000.00|   211400705|
|             4|  Marcos|     Accountant|    Finanças|   2019-03-10|3000.00|   310020305|
|             5|Danielly|      Associate|      Vendas|   2019-03-10|3000.00|   310020305|
+--------------+--------+---------------+------------+-------------+-------+------------+



In [89]:
# Mostra o conteúdo do DataFrame df_ibge.
# O método .show() é usado para exibir as primeiras linhas do DataFrame na saída padrão.


df_ibge.show()

+---------+--------------------+--------------------+
|       id|           municipio|                nome|
+---------+--------------------+--------------------+
|520005005|{microrregiao -> ...|     Abadia de Goiás|
|310010405|{microrregiao -> ...| Abadia dos Dourados|
|520010005|{microrregiao -> ...|           Abadiânia|
|520010010|{microrregiao -> ...|      Posse d'Abadia|
|310020305|{microrregiao -> ...|              Abaeté|
|150010705|{microrregiao -> ...|          Abaetetuba|
|150010710|{microrregiao -> ...|                Beja|
|230010105|{microrregiao -> ...|             Abaiara|
|230010120|{microrregiao -> ...|            São José|
|230010125|{microrregiao -> ...|               Oitis|
|230010130|{microrregiao -> ...|           Triângulo|
|230010135|{microrregiao -> ...|Olho D'Água de Pe...|
|290010805|{microrregiao -> ...|              Abaíra|
|290010810|{microrregiao -> ...|             Catolés|
|290020705|{microrregiao -> ...|               Abaré|
|290020710|{microrregiao -> 

In [90]:
# Tratamento do DataFrame df_ibge

# Renomeando a coluna 'nome' para 'nome_municipio' no DataFrame df_ibge.
df_ibge = df_ibge.withColumnRenamed('nome', 'nome_municipio')

# Selecionando apenas as colunas 'nome_municipio' e 'id' no DataFrame df_ibge.
df_ibge = df_ibge.select('nome_municipio', 'id')


# Tratamento do DataFrame df_funcionario_blob

# Selecionando apenas as colunas 'id_municipio', 'nome', 'cargo' e 'id_funcionario' no DataFrame df_funcionario_blob.
df_funcionario_blob = df_funcionario_blob.select('id_municipio', 'nome', 'cargo', 'id_funcionario')

In [91]:
# Mostra o conteúdo do DataFrame apos o tratamento
df_funcionario_blob.show()


+------------+--------+---------------+--------------+
|id_municipio|    nome|          cargo|id_funcionario|
+------------+--------+---------------+--------------+
|   211400705|    João|Sales Associate|             1|
|   310020305|   Maria|  Sales Manager|             2|
|   211400705|  Carlos|     Accountant|             3|
|   310020305|  Marcos|     Accountant|             4|
|   310020305|Danielly|      Associate|             5|
+------------+--------+---------------+--------------+



In [92]:
df_ibge.show()

+--------------------+---------+
|      nome_municipio|       id|
+--------------------+---------+
|     Abadia de Goiás|520005005|
| Abadia dos Dourados|310010405|
|           Abadiânia|520010005|
|      Posse d'Abadia|520010010|
|              Abaeté|310020305|
|          Abaetetuba|150010705|
|                Beja|150010710|
|             Abaiara|230010105|
|            São José|230010120|
|               Oitis|230010125|
|           Triângulo|230010130|
|Olho D'Água de Pe...|230010135|
|              Abaíra|290010805|
|             Catolés|290010810|
|               Abaré|290020705|
|                 Ibó|290020710|
|              Abatiá|410010305|
|       Abdon Batista|420005105|
|     Abel Figueiredo|150013105|
|        Abelardo Luz|420010105|
+--------------------+---------+
only showing top 20 rows



In [93]:
# Junção entre os DataFrames df_ibge e df_funcionario_blob
# Utilizando a coluna 'id' do df_ibge e a coluna 'id_municipio' do df_funcionario_blob como chaves de junção.
# A junção é do tipo 'inner', ou seja, apenas as linhas que possuem correspondência em ambos os DataFrames serão mantidas.


df_relacion=df_ibge.join(df_funcionario_blob,df_ibge.id==df_funcionario_blob.id_municipio,'inner').drop('id')


In [94]:
df_relacion.show()

+--------------+------------+--------+---------------+--------------+
|nome_municipio|id_municipio|    nome|          cargo|id_funcionario|
+--------------+------------+--------+---------------+--------------+
|       Zé Doca|   211400705|    João|Sales Associate|             1|
|        Abaeté|   310020305|   Maria|  Sales Manager|             2|
|       Zé Doca|   211400705|  Carlos|     Accountant|             3|
|        Abaeté|   310020305|  Marcos|     Accountant|             4|
|        Abaeté|   310020305|Danielly|      Associate|             5|
+--------------+------------+--------+---------------+--------------+



In [95]:
for dado in df_relacion.collect():
    
    dados = {
    "id":dado[4],
    "nome_municipio": dado[0],
    "id_municipio": dado[1],
    "nome": dado[2],
    "cargo": dado[3],
    }
    print(dados)


{'id': '1', 'nome_municipio': 'Zé Doca', 'id_municipio': '211400705', 'nome': 'João', 'cargo': 'Sales Associate'}
{'id': '2', 'nome_municipio': 'Abaeté', 'id_municipio': '310020305', 'nome': 'Maria', 'cargo': 'Sales Manager'}
{'id': '3', 'nome_municipio': 'Zé Doca', 'id_municipio': '211400705', 'nome': 'Carlos', 'cargo': 'Accountant'}
{'id': '4', 'nome_municipio': 'Abaeté', 'id_municipio': '310020305', 'nome': 'Marcos', 'cargo': 'Accountant'}
{'id': '5', 'nome_municipio': 'Abaeté', 'id_municipio': '310020305', 'nome': 'Danielly', 'cargo': 'Associate'}


In [96]:
# Defina a string de conexão para o seu banco de dados Cosmos
connection_string =                                                                                             ""

# Crie uma instância do cliente Cosmos utilizando a string de conexão
client = CosmosClient.from_connection_string(connection_string)

# Acesse ou crie um banco de dados e um contêiner no Cosmos
database_name =                                                                                                       ""
container_name = ""
database = client.get_database_client(database_name)
container = database.get_container_client(container_name)

# Dados de exemplo para inserir no contêiner
for dado in df_relacion.collect():
    # Cria um dicionário 'dados' contendo os valores de cada linha do DataFrame
    dados = {
        "id": dado[4],
        "nome_municipio": dado[0],
        "id_municipio": dado[1],
        "nome": dado[2],
        "cargo": dado[3],
    }
    # Imprime os dados de cada linha
    print(dados)
    # Insere os dados no contêiner do Cosmos
    container.create_item(body=dados)

# Mensagem indicando que os dados foram inseridos com sucesso
print("Dados inseridos com sucesso!")

{'id': '1', 'nome_municipio': 'Zé Doca', 'id_municipio': '211400705', 'nome': 'João', 'cargo': 'Sales Associate'}
{'id': '2', 'nome_municipio': 'Abaeté', 'id_municipio': '310020305', 'nome': 'Maria', 'cargo': 'Sales Manager'}
{'id': '3', 'nome_municipio': 'Zé Doca', 'id_municipio': '211400705', 'nome': 'Carlos', 'cargo': 'Accountant'}
{'id': '4', 'nome_municipio': 'Abaeté', 'id_municipio': '310020305', 'nome': 'Marcos', 'cargo': 'Accountant'}
{'id': '5', 'nome_municipio': 'Abaeté', 'id_municipio': '310020305', 'nome': 'Danielly', 'cargo': 'Associate'}
Dados inseridos com sucesso!
