In [None]:
#OBS: 
# Precisa do azure configurado
# Precisa do purview configurado no azure

In [None]:
# Importando as bibliotecas
import requests
import json
import pandas as pd
import datetime
from datetime import timedelta
import numpy as np

from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess, TypeCategory
from pyapacheatlas.core.typedef import *
pd.options.mode.chained_assignment = None

In [None]:
# Função para autenticar a entidade de serviço para o URL de recurso fornecido e retorna o token oauth2 de acesso
def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
    
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    payload= f'grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&resource={resource_url}'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    
    response = requests.request("POST", url, headers=headers, data=payload)
    access_token = json.loads(response.text)['access_token']
    
    return access_token

In [None]:
# Função para autenticar no Atlas Endpoint e retorna um objeto Client
def purview_auth(tenant_id: str, client_id: str, client_secret: str, data_catalog_name: str):
  
    oauth2 = ServicePrincipalAuthentication(
        tenant_id = tenant_id,
        client_id = client_id,
        client_secret = client_secret
    )
    client = PurviewClient(
        account_name = data_catalog_name,
        authentication = oauth2
    )
    return client

In [None]:
## Para utilizar no Jupyter

tenant_id = ""
client_id = ""
client_secret = ""
resource_url = "https://purview.azure.net"
data_catalog_name = ""

#pegando data atual e formatando
data_today = datetime.datetime.today() - timedelta(hours=3, minutes=0)
data_now_str = data_today.strftime("%A %d %B %y %H:%M")
date_now = datetime.datetime.strptime(data_now_str, "%A %d %B %y %H:%M")

## OBS: REMOVA Todo CONTEÚDO DO EX: SPARK

In [None]:
## Para utilizar com Spark

# Abrir conexão com o datalake
spark.conf.set(
  "", # Endpoint do datalake
  "" # Acess Key
)

In [None]:
#criar json com as chaves de acesso do azure e purview

# Recuperar caminho até o json de parâmetros da Autentificação do Purview
path_params = dbutils.fs.ls('''path do json no azure''')

# Recuperar dados do json de parâmetros da Autentificação do Purview
parameters = spark.read.format("json").load(path_params)

client_id = parameters.select('client_id').collect()[0][0]
client_secret = parameters.select('client_secret').collect()[0][0]
data_catalog_name = parameters.select('data_catalog_name').collect()[0][0]
resource_url = parameters.select('resource_url').collect()[0][0]
tenant_id = parameters.select('tenant_id').collect()[0][0]

#pegando data atual e formatando
data_today = datetime.datetime.today() - timedelta(hours=3, minutes=0)
data_now_str = data_today.strftime("%A %d %B %y %H:%M")
date_now = datetime.datetime.strptime(data_now_str, "%A %d %B %y %H:%M")

# Recuperar objetos de autenticação
azuread_access_token = azuread_auth(tenant_id, client_id, client_secret, resource_url)
purview_client = purview_auth(tenant_id, client_id, client_secret, data_catalog_name)

# OBS: REMOVA TODO O CONTEÚDO DO EX: JUPYTER

In [None]:
# função para fazer requisição de dados de acordo com a coleção no Purview
def requestCollection(endpoint: str, url: str, response: any, payload: any, headers: any):
    endpoint = endpoint
    url = url
    payload = payload
    headers = headers
    response = response
        
    return response

In [None]:
## função para fazer requisição de dados classificados
def requestId(collectionId: str):
    endpoint = "https://"+data_catalog_name+".purview.azure.com/"
    url = f"{endpoint}/catalog/api/search/query?api-version=2021-05-01-preview"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {azuread_access_token}'
       }
    payload = json.dumps({
            "orderby":["name"],
            "limit": 1000,
            "keywords": None,
            "filter": {
                "and": [
                        {
                    "or": [
                      {
                        "collectionId": collectionId
                      },
                    ]
                  },
                  {
                    "or": [
                      {
                        "classification": "sua classificação",
                        "includeSubClassifications": True
                      },
                      {
                        "classification": "sua classificação",
                        "includeSubClassifications": True
                      },
                      {
                        "classification": "MICROSOFT.SYSTEM.TEMP_FILE",
                        "includeSubClassifications": True
                      }
                    ]
                  }
                ]
              }
    })
    
    response = requests.post(url, headers=headers, data=payload)
    
    if(response.status_code != 200):
      print("Status:",response.status_code, "Erro no código")
    else:
      print("Status:",response.status_code, "Ok")
    
    return response.text

In [None]:
# Função para exportar na pasta temporária databricks e depois mover para pasta no azure
def export_file(dataframe, ext_target, target, new_name):
  df_res = dataframe
  if ext_target == 'csv':
    df_res.to_csv('/dbfs/tmp/meuCaminho/'+new_name+'.'+ext_target, sep= ';', index=False)
  if ext_target == 'parquet':
    df_res.to_parquet('/dbfs/tmp/meuCaminho/'+new_name+'.'+ext_target, index=False)
  dbutils.fs.mv("/tmp/purview/"+new_name+'.'+ext_target, target+'/'+new_name+'.'+ext_target)
  print ('Novo arquivo {} salvo em {}'.format(new_name, target))

In [None]:
# Trazendo todos os dados das coleções do Purview
endpoint = "https://"+data_catalog_name+".purview.azure.com/"
url = f"{endpoint}/account/collections?api-version=2019-11-01-preview"
headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {azuread_access_token}'
       }

payload = azuread_access_token

response = requests.get(url, headers=headers, data=payload)

if(response.status_code != 200):
    print("Status:",response.status_code, "Erro no código", response.text)
else:
    print("Status:",response.status_code, "Ok", response.text)
    
requestCollection(endpoint, url, headers, payload, response)

In [None]:
# dataframe com todos os dados
df = pd.DataFrame(json.loads(response.text)['value'])

# listando somente as coleções que vamos utilizar
list_colection = ((df['friendlyName'] == 'nomeColeção') | (df['friendlyName'] == 'nomeColeção') | (df['friendlyName'] == 'nomeColeção'))
df_colection = df[list_colection]
df_colection

In [None]:
## requisição em cada coleção para trazer os dados
list_collections = df_colection.loc[:, 'name']

dict_results = {}

for list_db in list_collections:    
    db_result = requestId(list_db)
    dict_results['db_id_'+str(list_db)] = db_result
    
list_results = list(dict_results.values())

## dataframe com as tabelas de cada coleção
df_results = pd.DataFrame()

for x in list_results:
    df_value = pd.DataFrame(json.loads(x)['value'])
    df_results = pd.concat([df_results, df_value])

In [None]:
## Todas as tabelas e colunas das coleções
list_response = []
for id_ in df_results['id']:
    endpoint = "https://"+data_catalog_name+".purview.azure.com/"
    url = f"{endpoint}/catalog/api/atlas/v2/entity/bulk?excludeRelationshipTypes=dataset_process_inputs&excludeRelationshipTypes=process_parent&excludeRelationshipTypes=direct_lineage_dataset_dataset&guid={id_}&includeTermsInMinExtInfo=false&minExtInfo=false&ignoreRelationships=false"
    headers = {
              'Content-Type': 'application/json',
              'Authorization': f'Bearer {azuread_access_token}'
             }
    payload = azuread_access_token

    response = requests.get(url, headers=headers, data=payload)
    requestCollection(endpoint, url, response, headers, payload)
    list_response.append(response.text)

In [None]:
# dataframe com todos os dados
df_list_ = pd.DataFrame(list_response)

## percorrendo e listando cada coluna no dataframe para buscar nomes e classificações
df_list_res_referredEntities = pd.DataFrame()
for x in list_response:
    df_list = pd.DataFrame(json.loads(x)['referredEntities']).reindex(['collectionId','attributes','classifications']).transpose().dropna()
    df_list_res_referredEntities = pd.concat([df_list_res_referredEntities, df_list])

df_list_res_referredEntities.reset_index(inplace=True)
df_list_res_referredEntities = df_list_res_referredEntities.rename(columns = {'index':'IDs'})

## listando dataframe com nome das colunas e criando nova coluna para filtrar as classificações
df_columns = df_list_res_referredEntities
df_columns['classificação'] = ''
df_columns['data'] = date_now

for i in range(df_columns.shape[0]):
    list_=[]
    df_columns.loc[i,'attributes'] = df_columns.loc[i,'attributes']['name']
    list_.append([item_['typeName'] for item_ in df_columns.classifications[i]])
    df_columns.loc[:,'classificação'].iloc[i] = list_[0]  

## dataframe com colunas e suas classificações
df_col = df_columns.loc[:, ['IDs','collectionId','attributes', 'classificação', 'data']]

## dataframe com tabelas e suas classificações
df_tables = df_results.loc[:, ['id','collectionId','name', 'classification', 'qualifiedName']]
df_tables.reset_index(drop=True, inplace=True)

In [None]:
# dataframe com nome das tabelas e filtrando as classificações para nova tabela
df_tables['data2'] = date_now

## dataframe com tabelas e suas classificações
df_tab = df_tables.loc[:, ['id', 'collectionId', 'name', 'classification', 'data2', 'qualifiedName']]

# Igualando os IDs das tabelas com os das colunas
df_tab.loc[:,'Id_simplificado'] = ''
for count, item in enumerate(df_tab.id):
    df_tab.loc[:,'Id_simplificado'].iloc[count] = item[:33]
    
df_col.loc[:,'Id_simplificado'] = ''    
for count, item in enumerate(df_col.IDs):
    df_col.loc[:,'Id_simplificado'].iloc[count] = item[:33]

# Tratando a URL dos bancos para fazer o agrupamento  
df_tab.loc[:,'Owner'] = ''    
for count, item in enumerate(df_tab.qualifiedName):
    df_tab.loc[:,'Owner'].iloc[count] = item[:47]  

## dataframe final com todas as tabelas e suas colunas com id, nome e classificação
tb_merge = df_tab.merge(df_col, left_on=['Id_simplificado', 'collectionId'], right_on=['Id_simplificado', 'collectionId'])

## dataframe final com tabelas e colunas classificadas
df = tb_merge[['collectionId', 'name', 'classification', 'attributes', 'classificação', 'data', 'Owner']]\
.rename(columns={'name':'vc_nm_tabela', 
                 'classification':'vc_nm_grupoclassificacao', 
                 'attributes':'vc_nm_coluna', 
                 'classificação':'vc_ds_classificacao', 
                 'data': 'sd_dt_carga',
                 'Owner': 'vc_nm_owner'
                }
       )

#convertendo data para string
df.sd_dt_carga = df.sd_dt_carga.astype(str)

In [None]:
# agrupando os dados de acordo com a coleção
grouped_df = df.groupby('vc_nm_owner')

df_1 = grouped_df.get_group('mssql://path_do_banco_purview').drop(['collectionId', 'vc_nm_owner'], axis=1)

df_2 = grouped_df.get_group('mssql://path_do_banco_purview').drop(['collectionId', 'vc_nm_owner'], axis=1)

df_3 = grouped_df.get_group('mssql://path_do_banco_purview').drop(['collectionId', 'vc_nm_owner'], axis=1)

df_4 = grouped_df.get_group('mssql://path_do_banco_purview').drop(['collectionId', 'vc_nm_owner'], axis=1)

In [None]:
# Criar arquivo parquet ou csv no container landing
df_list_.columns = df_list_.columns.astype(str)
dataframe = df_list_
ext_target="parquet"
path_raw = 'path_landing do azure'
new_name = f"nome_do_arquivo"
export_file(dataframe, ext_target, path_raw, new_name)

In [None]:
## exportando os df csv ou parquet no container raw
# Defina os dados e os nomes dos arquivos em uma lista
datasets = [
    (df_1, 'df_1'),
    (df_2, 'df_2'),
    (df_3, 'df_3'),
    (df_4, 'df_4')
]

In [None]:
# Loop através dos datasets
for dataset, filename in datasets:
    dataset.reset_index(inplace=True, drop=True)
    dataset['vc_nm_grupoclassificacao'] = dataset['vc_nm_grupoclassificacao'].map(str)
    dataset['vc_ds_classificacao'] = dataset['vc_ds_classificacao'].map(str)

    # Criar arquivo parquet ou csv no container raw
    dataframe = dataset
    ext_target = "parquet"
    path_raw = 'path_raw do azure'
    new_name = f"{filename}"
    export_file(dataframe, ext_target, path_raw + filename, new_name)