In [56]:
# config.py
ATLAS_CONFIG = {
    "url": "http://localhost:21000",
    "username": "admin", 
    "password": "admin"
}

POSTGRES_CONFIG = {
    "host": "localhost",
    "port": 2001,
    "database": "northwind",
    "user": "postgres",
    "password": "postgres"
}

In [57]:
# atlas_client.py
import sys
import os
import requests
import json
from requests.auth import HTTPBasicAuth

class AtlasClient:
    """Cliente para interagir com a API REST do Apache Atlas."""
    
    def __init__(self, url, username, password):
        self.base_url = f"{url}/api/atlas/v2"
        self.auth = HTTPBasicAuth(username, password)
        self.headers = {"Content-Type": "application/json"}
        print(f"AtlasClient inicializado para {self.base_url}")

    def _make_request(self, method, endpoint, **kwargs):
        """Método helper para fazer requisições e tratar erros."""
        try:
            url = f"{self.base_url}{endpoint}"
            response = requests.request(method, url, auth=self.auth, headers=self.headers, **kwargs)
            response.raise_for_status()  
            return response.json()
        except requests.exceptions.HTTPError as e:
            print(f"Erro HTTP {e.response.status_code} no endpoint {endpoint}: {e.response.text}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Erro de conexão com Atlas: {e}")
            return None
            
    
    def search_entities(self, query):
        """Busca entidades usando a busca básica (GET /search/basic)."""
        endpoint = f"/search/basic?query={query}&limit=10"
        return self._make_request("GET", endpoint)

    def create_entity(self, entity_data):
        """Cria uma ou mais entidades (POST /entity/bulk)."""
        endpoint = "/entity/bulk"
        payload = {"entities": entity_data if isinstance(entity_data, list) else [entity_data]}
        return self._make_request("POST", endpoint, data=json.dumps(payload))
        
    def get_entity(self, guid):
        """Obtém uma entidade pelo seu GUID (GET /entity/guid/{guid})."""
        endpoint = f"/entity/guid/{guid}"
        return self._make_request("GET", endpoint)
    
    def get_lineage(self, guid):
        """Obtém a linhagem de uma entidade (GET /lineage/{guid})."""
        endpoint = f"/lineage/{guid}"
        return self._make_request("GET", endpoint)

In [58]:
# postgres_extractor.py
import sys
import os
import psycopg2
import pandas as pd

class PostgreSQLExtractor:
    """Extrai metadados de tabelas do banco de dados PostgreSQL Northwind."""

    def __init__(self, host, port, database, user, password):
        self.conn_params = f"host='{host}' port='{port}' dbname='{database}' user='{user}' password='{password}'"
        self.database_name = database
        
    def _execute_query(self, query):
        """Conecta e executa uma query, retornando resultados como lista de tuplas."""
        conn = None
        try:
            conn = psycopg2.connect(self.conn_params)
            cur = conn.cursor()
            cur.execute(query)
            return cur.fetchall()
        except Exception as e:
            print(f"Erro ao conectar ou executar query: {e}")
            return []
        finally:
            if conn:
                conn.close()

    def extract_table_and_column_metadata(self, schema='public'):
        """Extrai nome da tabela, colunas (nome, tipo, nullable, posição) e PKs."""
        query = f"""
        SELECT 
            c.table_name, 
            c.column_name, 
            c.data_type, 
            CASE WHEN c.is_nullable = 'YES' THEN TRUE ELSE FALSE END AS is_nullable,
            c.ordinal_position,
            CASE WHEN tco.constraint_type = 'PRIMARY KEY' THEN TRUE ELSE FALSE END AS is_pk
        FROM 
            information_schema.columns c
        LEFT JOIN 
            information_schema.key_column_usage kcu 
            ON c.table_schema = kcu.table_schema 
            AND c.table_name = kcu.table_name 
            AND c.column_name = kcu.column_name
        LEFT JOIN 
            information_schema.table_constraints tco 
            ON kcu.constraint_name = tco.constraint_name 
            AND kcu.table_schema = tco.table_schema 
            AND kcu.table_name = tco.table_name
        WHERE 
            c.table_schema = '{schema}'
        ORDER BY 
            c.table_name, c.ordinal_position;
        """
        results = self._execute_query(query)
        columns = ['table_name', 'column_name', 'data_type', 'is_nullable', 'position', 'is_pk']
        df = pd.DataFrame(results, columns=columns)
        
        # Estrutura os dados por tabela
        metadata = {}
        for table_name, group in df.groupby('table_name'):
            metadata[table_name] = {
                'database': self.database_name,
                'columns': group[['column_name', 'data_type', 'is_nullable', 'position', 'is_pk']].to_dict('records')
            }
        return metadata

    def extract_relationships(self, schema='public'):
        """Extrai relacionamentos (Foreign Keys) para linhagem."""
        query = f"""
        SELECT 
            tc.table_name AS from_table,
            kcu.column_name AS from_column,
            ccu.table_name AS to_table,
            ccu.column_name AS to_column
        FROM 
            information_schema.table_constraints AS tc 
        JOIN 
            information_schema.key_column_usage AS kcu
            ON tc.constraint_name = kcu.constraint_name
        JOIN 
            information_schema.constraint_column_usage AS ccu
            ON ccu.constraint_name = tc.constraint_name
        WHERE 
            tc.constraint_type = 'FOREIGN KEY' AND tc.table_schema = '{schema}';
        """
        results = self._execute_query(query)
        columns = ['from_table', 'from_column', 'to_table', 'to_column']
        return pd.DataFrame(results, columns=columns).to_dict('records')

In [59]:
# data_catalogger.py
class DataCatalogger:
    """Gerencia a catalogação de metadados no Apache Atlas."""
    
    def __init__(self, atlas_client, extractor):
        self.atlas = atlas_client
        self.extractor = extractor
        self.cluster_name = "cluster1" 
        self.database_entity_guid = None
        self.table_guids = {} 

    def _get_qualified_name(self, entity_type, **kwargs):
        """Gera o qualifiedName (QN) único para entidades do Atlas."""
        db_name = self.extractor.database_name
        
        if entity_type == "hive_db":
            return f"{db_name}@{self.cluster_name}"
        elif entity_type == "hive_table":
            table_name = kwargs['table_name']
            return f"{db_name}.{table_name}@{self.cluster_name}"
        elif entity_type == "hive_column":
            table_name = kwargs['table_name']
            column_name = kwargs['column_name']
            return f"{db_name}.{table_name}.{column_name}@{self.cluster_name}"
        return None

    def _create_database_entity(self):
        """Cria ou obtém o GUID da entidade Database (northwind_postgres)."""
        db_qn = self._get_qualified_name("hive_db")
        
        # 1. Tentar buscar se já existe (simplificado)
        search_result = self.atlas.search_entities(db_qn)
        if search_result and search_result.get('entities'):
            self.database_entity_guid = search_result['entities'][0]['guid']
            print(f"Database '{db_qn}' já catalogada. GUID: {self.database_entity_guid}")
            return

        # 2. Criar se não existir
        db_entity = {
            "typeName": "hive_db",
            "attributes": {
                "name": self.extractor.database_name,
                "qualifiedName": db_qn,
                "clusterName": self.cluster_name,
                "owner": "postgres"
            }
        }
        
        print(f"Catalogando Database: {self.extractor.database_name}")
        response = self.atlas.create_entity(db_entity)
        if response and response.get('entities'):
            self.database_entity_guid = response['entities']['hive_db'][0]['guid']
            print(f"Database catalogada com sucesso. GUID: {self.database_entity_guid}")
            
    def _create_table_and_column_entities(self, table_name, table_data):
        """Cria as entidades Tabela e suas Colunas filhas no Atlas."""
        if not self.database_entity_guid:
            print("Erro: Database GUID não encontrado.")
            return

        column_entities = []
        for col_data in table_data['columns']:
            col_qn = self._get_qualified_name("hive_column", table_name=table_name, column_name=col_data['column_name'])
            column_entities.append({
                "typeName": "hive_column",
                "attributes": {
                    "name": col_data['column_name'],
                    "qualifiedName": col_qn,
                    "type": col_data['data_type'],
                    "position": col_data['position'],
                    "isPrimaryKey": col_data['is_pk'],
                    "isNullable": col_data['is_nullable']
                },
                "status": "ACTIVE",
                "relationshipAttributes": {
                    "table": {"typeName": "hive_table", "uniqueAttributes": {"qualifiedName": self._get_qualified_name("hive_table", table_name=table_name)}}
                }
            })
            
        table_qn = self._get_qualified_name("hive_table", table_name=table_name)
        table_entity = {
            "typeName": "hive_table",
            "attributes": {
                "name": table_name,
                "qualifiedName": table_qn,
                "owner": "postgres",
                "description": f"Tabela '{table_name}' do banco Northwind.",
                "db": {"guid": self.database_entity_guid}, 
                "columns": [{"typeName": "hive_column", "uniqueAttributes": {"qualifiedName": c['attributes']['qualifiedName']}} for c in column_entities]
            }
        }
        
        entities_to_create = [table_entity] + column_entities
        print(f"Catalogando Tabela: {table_name} e {len(column_entities)} colunas...")
        response = self.atlas.create_entity(entities_to_create)
        
        if response and response.get('entities'):
            guid = response['entities']['hive_table'][0]['guid']
            self.table_guids[table_name] = guid
            print(f"Tabela '{table_name}' catalogada. GUID: {guid}")
            return True
        else:
            print(f"Falha ao catalogar tabela {table_name}.")
            return False

    def _create_lineage_entities(self, relationships):
        """Cria entidades de Processo para representar a linhagem via FKs."""
        
       
        processes_created = 0
        
        for rel in relationships:
            from_table = rel['from_table']
            to_table = rel['to_table']
            
            if from_table not in self.table_guids or to_table not in self.table_guids:
                print(f"Pulando linhagem: Tabela não catalogada ({from_table} -> {to_table})")
                continue

            process_name = f"fk_link_{from_table}_to_{to_table}"
            process_qn = f"{process_name}@{self.cluster_name}"
            
            process_entity = {
                "typeName": "Process", # Tipo base que suporta inputs/outputs
                "attributes": {
                    "name": process_name,
                    "qualifiedName": process_qn,
                    "description": f"Relacionamento FK de {from_table} para {to_table}",
                    "inputs": [
                        {"guid": self.table_guids[from_table]} # Tabela de onde vem o dado (from)
                    ],
                    "outputs": [
                        {"guid": self.table_guids[to_table]} # Tabela para onde vai o dado (to)
                    ],
                    "operationType": "FK_RELATIONSHIP", 
                    "owner": "system"
                }
            }
            
            response = self.atlas.create_entity(process_entity)
            if response and response.get('entities'):
                processes_created += 1
            else:
                print(f"Falha ao criar linhagem para {from_table} -> {to_table}")
                
        print(f"Criados {processes_created} processos de linhagem (FKs).")
        return processes_created

    def catalog_all_tables(self):
        """Executa o pipeline completo de extração e catalogação."""
        
        self._create_database_entity()
        if not self.database_entity_guid:
            return {"status": "Erro", "message": "Falha ao catalogar Database."}
        
        table_metadata = self.extractor.extract_table_and_column_metadata()
        
        tables_created = 0
        for table_name, data in table_metadata.items():
            if self._create_table_and_column_entities(table_name, data):
                tables_created += 1

        relationships = self.extractor.extract_relationships()
        processes_created = self._create_lineage_entities(relationships)

        return {
            "status": "Sucesso", 
            "tables_created": tables_created, 
            "lineage_processes": processes_created
        }

In [61]:
# discovery_report.py
import sys
import os
import json
import csv
import pandas as pd
from collections import defaultdict

class DiscoveryReport:
    """Gera um relatório das entidades catalogadas no Atlas."""
    
    def __init__(self, atlas_client):
        self.atlas = atlas_client
        self.report_data = {}

    def _get_all_entities_data(self):
        """Busca todas as entidades catalogadas do tipo hive_table e hive_db."""
        query = "hive_db OR hive_table OR Process" 
        search_result = self.atlas.search_entities(query)
        
        if not search_result or not search_result.get('entities'):
            return []
            
        guids = [e['guid'] for e in search_result['entities']]
        
        # Para um relatório real, o ideal seria usar a API /entity/bulk/list?guid=...
        # Simplificando, vamos processar a lista da busca e fazer contagens.
        return search_result['entities']

    def generate_report(self, filename_base):
        """Gera o relatório, calcula estatísticas e exporta para JSON/CSV."""
        entities = self._get_all_entities_data()
        
        # Estatísticas
        stats = defaultdict(int)
        table_column_counts = {}
        relationships_count = 0
        
        for entity in entities:
            type_name = entity.get('typeName')
            stats[type_name] += 1
            
            if type_name == 'hive_table':
              # stats['total_columns'] += entity['attributes'].get('numCols', 0)
                pass
            
            elif type_name == 'Process':
                relationships_count += 1 # Cada Processo criado representa uma FK/Linhagem
        
        stats['total_columns'] = stats.get('hive_column', 0)
        
        self.report_data = {
            "summary": {
                "total_databases": stats['hive_db'],
                "total_tables": stats['hive_table'],
                "total_columns": stats['total_columns'],
                "total_relationships": relationships_count,
            },
            "tables_by_type": dict(stats)
               }
        
        
        self._export_json(f"{filename_base}.json")
        self._export_csv(f"{filename_base}.csv")

    def _export_json(self, filename):
        """Exporta o relatório para JSON."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.report_data, f, indent=4)
        print(f"Relatório JSON exportado para {filename}")

    def _export_csv(self, filename):
        """Exporta o resumo principal para CSV."""
        summary = self.report_data.get('summary', {})
        
        
        df_summary = pd.DataFrame(summary.items(), columns=['Métrica', 'Valor'])
        
        df_summary.to_csv(filename, index=False)
        print(f"Relatório CSV exportado para {filename}")

In [62]:
import sys
sys.path.append("/home/jovyan/src")
from config import ATLAS_CONFIG



def main():
    print("--- Inicializando o Catálogo de Dados Apache Atlas ---")
    
    
    print("Aguardando 10 segundos para garantir que o Atlas esteja online...")
    time.sleep(10)
    
    try:
        atlas = AtlasClient(**ATLAS_CONFIG)
        extractor = PostgreSQLExtractor(**POSTGRES_CONFIG)
        catalogger = DataCatalogger(atlas, extractor)
        
        print("\n--- Iniciando catalogação de Database, Tabelas e Colunas ---")
        results = catalogger.catalog_all_tables()
        print(f"\nStatus: {results['status']}")
        print(f"{results['tables_created']} tabelas catalogadas")
        print(f"{results['lineage_processes']} processos de linhagem criados")
        
        print("\n--- Gerando Relatório de Descoberta ---")
        report = DiscoveryReport(atlas)
        time.sleep(5)
        report.generate_report("discovery_report")
        print("Relatório gerado com sucesso!")
        
    except Exception as e:
        print(f"\nOcorreu um erro fatal no processo: {e}")

if __name__ == "__main__":
    main()

ModuleNotFoundError: No module named 'config'