# Config

> Esta seção contém as configurações gerais do projeto, como a importação de pacotes e o carregamento de variáveis de ambiente, entre outras.

In [1]:
# Importação de módulos.
import requests
import pandas as pd
import warnings
import json
import yaml
import gzip
import os
import glob
import importlib
from datetime import datetime, timedelta
from work.bdt_data_integration.src.utils import Utils, WebhookNotifier, DiscordNotifier
from work.bdt_data_integration.src.writers import DataWriter
from work.bdt_data_integration.src.extractors import NotionDatabaseAPIExtractor
from work.bdt_data_integration.src.loaders import PostgresLoader
from work.bdt_data_integration.src.transformers import NotionTransformer

ImportError: cannot import name 'NotionDatabaseApiExtractor' from 'work.bdt_data_integration.src.extractors' (/work/bdt_data_integration/src/extractors.py)

In [None]:
# Load config.yaml; PRODUCTION_SCHEMA is the target schema used by the Load cells.
config = Utils.load_config()
schema = config.get("PRODUCTION_SCHEMA")

# Required environment variables.
# Read with os.environ[...] so a missing variable fails fast here with a KeyError
# naming it, instead of flowing into the extractor/loader as None (e.g. a Notion
# query URL built from database_id=None).
api_key = os.environ['NOTION_APIKEY']
database_id = os.environ['UTD_DATABASE_ID']
host = os.environ["NEON_HOST"]
user = os.environ["NEON_ROOT_USER"]
password = os.environ["NEON_ROOT_PASSWORD"]
db_name = os.environ["NEON_DATABASE_NAME"]

# Optional environment variables: these are None when unset.
discord_token = os.getenv('DISCORD_TOKEN')
discord_channel = os.getenv('DISCORD_CHANNEL')
notifier_url = os.getenv('MAKE_NOTIFICATION_WEBHOOK')

In [None]:
# Identifiers shared by every stream in this notebook
# (source_prefix matches the 'ntn__' prefix of the target table names).
pipeline, source, source_prefix = 'notion_pipeline', 'notion', 'ntn'

In [None]:
# Webhook notifier used by every cell below: pipeline_error(...) on failures,
# pipeline_end() in the Callbacks section.
notifier = WebhookNotifier(pipeline=pipeline, url=notifier_url)
# NOTE(review): pipeline_start() is disabled while pipeline_end() is still called — confirm intended.
# notifier.pipeline_start()

# ntn_utd_pages

In [None]:
# Notion stream handled in this section and the table it is staged/loaded into.
stream_name = 'universal_task_database'
# Built from source_prefix (Config section) instead of a placeholder-less f-string; value is 'ntn__utd_pages'.
stream_table_name = f'{source_prefix}__utd_pages'

## Extract

In [None]:
# DataWriter for this source/stream; it is handed to the extractor and its
# get_output_file_path(...) builds the processing/staging paths used later.
try:
    writer = DataWriter(
        config=config,
        source=source,
        stream=stream_name,
        compression=True,
    )
except Exception as exc:
    # Report to the webhook, then propagate so the run stops.
    notifier.pipeline_error(exc)
    raise

In [None]:
# Instanciar Stream para a extração dos dados
try:
    extractor = NotionDatabaseAPIExtractor(
        identifier = source,
        base_endpoint = 'https://api.notion.com/v1',
        token = api_key,
        auth_method = 'bearer',
        database_id = database_id,
        writer = writer)
except Exception as e:
    notifier.pipeline_error(e)
    raise e

In [None]:
#Executar a extração dos dados
try:
    records, time = extractor.run()
except Exception as e:
    notifier.pipeline_error(e)
    raise e

INFO:work.bdt_data_integration.src.extractors:Attempting scroll fetch from https://api.notion.com/v1/databases/3290a9e9f0bf4d84bc57aae53f635e7e/query
INFO:work.bdt_data_integration.src.extractors:Gettinng data from endpoint https://api.notion.com/v1/databases/3290a9e9f0bf4d84bc57aae53f635e7e/query
INFO:work.bdt_data_integration.src.extractors:Successfully fetched page 1
INFO:work.bdt_data_integration.src.extractors:Gettinng data from endpoint https://api.notion.com/v1/databases/3290a9e9f0bf4d84bc57aae53f635e7e/query
INFO:work.bdt_data_integration.src.extractors:Successfully fetched page 2
INFO:work.bdt_data_integration.src.extractors:Gettinng data from endpoint https://api.notion.com/v1/databases/3290a9e9f0bf4d84bc57aae53f635e7e/query
INFO:work.bdt_data_integration.src.extractors:Successfully fetched page 3
INFO:work.bdt_data_integration.src.extractors:Gettinng data from endpoint https://api.notion.com/v1/databases/3290a9e9f0bf4d84bc57aae53f635e7e/query
INFO:work.bdt_data_integration.s

## Transform

In [None]:
# Instanciar tranformador de dados
try:
    transformer = NotionTransformer()
except Exception as e:
    notifier.pipeline_error(e)
    raise e

### processing

In [None]:
# Re-read the most recent raw .txt.gz file for this stream. If none exists,
# `records` keeps the value returned by extractor.run().
try:
    file_path = Utils.get_latest_file(f'/work/data/raw/{source}/{stream_name}', '.txt.gz')
    if not file_path:
        print('No files found in the specified directory')
    else:
        records = Utils.read_records(file_path)
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

In [None]:
try:
    # Build a DataFrame with the properties extracted from the Notion page records.
    processed_data = transformer.extract_pages_from_records(records)

    # Convert every list-valued column into a comma-separated string.
    # NOTE(review): the return value is discarded, so this presumably mutates
    # processed_data in place — confirm against NotionTransformer.
    transformer.process_list_columns(processed_data)

    # Drop the first 4 characters of every 'Etapa' (stage) value.
    # NOTE(review): presumably strips a fixed-width label prefix — confirm against the Notion data.
    processed_data['Etapa'] = processed_data['Etapa'].str[4:]

    # Keep only the 'start' of the 'Task Interval' object; non-dict values become None.
    processed_data['Task Interval'] = processed_data['Task Interval'].apply(
        lambda interval: interval.get('start') if isinstance(interval, dict) else None
    )

    # Timestamp of this processing run, plus the value returned by extractor.run().
    processed_data['loaded_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    processed_data['extracted_at'] = time
except Exception as e:
    notifier.pipeline_error(e)
    raise

In [None]:
# Persist the transformed pages to the 'processing' layer as CSV.
try:
    output = writer.get_output_file_path(
        source, stream_table_name, target_layer='processing'
    ) + '.csv'
    os.makedirs(os.path.dirname(output), exist_ok=True)
    processed_data.to_csv(output, index=False)
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

### staging

In [None]:
# Staging: rename the processed Notion columns to snake_case with a Deepnote SQL
# query over the in-memory `processed_data` DataFrame.
# NOTE(review): extracted_at / loaded_at are commented out in the query, so the
# timestamps added during processing are not carried into staging — confirm intended.
_staging_sql = (
    'SELECT id AS page_id,\n'
    '       "Task ID" as task_id,\n'
    '--       extracted_at as __deepnote_extracted_at,\n'
    '--       loaded_at as __deepnote_loaded_at,\n'
    '       Branch as branch,\n'
    '       archived,\n'
    '       in_trash,\n'
    '       last_edited_time,\n'
    '       created_time,\n'
    '       "Título" as titulo,\n'
    '       "Created At" as created_at,\n'
    '       "Created By" as created_by,\n'
    '       "Updated At" as updated_at,\n'
    '       "Updated By" as updated_by,\n'
    '       "Responsável" as responsavel,\n'
    '       "Solicitante" as solicitante,\n'
    '       "Tester" as tester,\n'
    '       "Área" as area,\n'
    '       "Ambiente" as ambiente,\n'
    '       "Prioridade" as prioridade,\n'
    '       "SLA" as sla,\n'
    '       "Etapa" as etapa,\n'
    '       "Dias na Etapa" as dias_na_etapa,\n'
    '       "Dias até o SLA" as dias_ate_o_sla,\n'
    '       "Dias até o Desenvolvimento" as dias_ate_o_desenvolvimento,\n'
    '       "Dias de Desenvolvimento" as dias_de_desenvolvimento,\n'
    '       "Customer ID" as customer_id,\n'
    '       "Nome do Cliente" as nome_do_cliente,\n'
    '       "Justificativa" as justificativa,\n'
    '       "Sprint" as sprint,\n'
    '       "Bloqueando" as bloqueando,\n'
    '       "Bloqueado por" as bloqueado_por,\n'
    '       "Integração" as integracao,\n'
    '       "Projetos" as projetos,\n'
    '       "Stage Updated At" as stage_updated_at,\n'
    '       "Entered Approved At" as entered_approved_at,\n'
    '       "Entered Awaiting Version At" as entered_awaiting_version_at,\n'
    '       "Entered Backlog At" as entered_backlog_at,\n'
    '       "Entered Correction At" as entered_correction_at,\n'
    '       "Entered Doing At" as entered_doing_at,\n'
    '       "Entered Done At" as entered_done_at,\n'
    '       "Entered Testing At" as entered_testing_at,\n'
    '       "Entered To-Do At" as entered_to_do_at,\n'
    '       "Entered To-Test At" as entered_to_test_at,\n'
    '       "OKR" as okr\n'
    'FROM processed_data;'
)
# Use Deepnote's in-kernel SQL runner when present, otherwise the toolkit's.
if '_deepnote_execute_sql' in globals():
    _run_sql = _deepnote_execute_sql
else:
    _run_sql = _dntk.execute_sql
staged_data = _run_sql(
    _staging_sql,
    'SQL_DEEPNOTE_DATAFRAME_SQL',
    audit_sql_comment='',
    sql_cache_mode='cache_disabled',
)
staged_data

Unnamed: 0,page_id,task_id,branch,archived,in_trash,last_edited_time,created_time,titulo,created_at,created_by,...,entered_approved_at,entered_awaiting_version_at,entered_backlog_at,entered_correction_at,entered_doing_at,entered_done_at,entered_testing_at,entered_to_do_at,entered_to_test_at,okr
0,714ad377-1b3a-4c0f-b6e6-8cd575be3423,HF-1016,,False,False,2024-09-12T13:33:00.000Z,2024-09-12T13:31:00.000Z,#1016 - Correções no estoque de variações da g...,2024-09-12T13:31:00.000Z,Cauê Ausec,...,,,,,,,,,,
1,5cb5ad4f-001a-4b3e-91c6-2cd8b3d34390,PRD-1393,,False,False,2024-09-11T20:04:00.000Z,2024-09-11T19:50:00.000Z,Refatoração: Algumas melhorias na usabilidade ...,2024-09-11T19:50:00.000Z,Evelin Szymanski,...,,,,,2024-09-11T16:50:00.000-03:00,,,,,
2,b9cb5a46-f182-46b4-a733-386d0f9618d7,PRD-1391,PRD-1391,False,False,2024-09-11T15:56:00.000Z,2024-09-11T15:46:00.000Z,Implementar filtro por tag na importação de pr...,2024-09-11T15:46:00.000Z,Cauê Ausec,...,,,,,,,,2024-09-11T12:46:00.000-03:00,,
3,1ab43c66-5a91-40be-a44a-4078192cb42b,PRD-1390,PRD-1390,False,False,2024-09-11T15:53:00.000Z,2024-09-11T15:44:00.000Z,Exibir data da última atualização do produto n...,2024-09-11T15:44:00.000Z,Cauê Ausec,...,,,2024-09-11T12:52:00.000-03:00,,,,,2024-09-11T12:45:00.000-03:00,,
4,df559ffe-26f0-464d-9891-b9b7fea94190,HF-1015,,False,False,2024-09-12T13:19:00.000Z,2024-09-09T18:54:00.000Z,#1015 - Loading infinito APP,2024-09-09T18:54:00.000Z,Cristian Kestring,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1192,d38a422c-b44d-4027-b208-d4e7d6cacf7d,HF-264,,False,False,2024-05-14T13:31:00.000Z,2024-03-01T18:26:00.000Z,#264 Não está aplicando valor de desconto por ...,2024-03-01T18:26:00.000Z,Cauê Ausec,...,,,,,,,,,,
1193,dd81b65a-c1c5-4e9e-83d5-7a812132e81e,HF-352,,False,False,2024-04-11T18:30:00.000Z,2024-03-01T18:26:00.000Z,#352 - Erro na importação do Bling,2024-03-01T18:26:00.000Z,Cauê Ausec,...,,,,,,,,,,
1194,e45cd20b-8bfb-4eec-9637-8231ca90dbba,HF-606,,False,False,2024-04-11T18:30:00.000Z,2024-03-01T18:26:00.000Z,#606 - Problemas na importação de clientes Omie,2024-03-01T18:26:00.000Z,Cauê Ausec,...,,,,,,,,,,
1195,31a3ea18-814b-4e00-91da-02770e84df1d,HF-523,,False,False,2024-04-11T18:30:00.000Z,2024-03-01T18:26:00.000Z,#523 - APP criando pedidos duplicados e subind...,2024-03-01T18:26:00.000Z,Cauê Ausec,...,,,,,,,,,,


In [None]:
# Persist the staged pages to the 'staging' layer as CSV.
try:
    output = writer.get_output_file_path(
        source, stream_table_name, target_layer='staging'
    ) + '.csv'
    os.makedirs(os.path.dirname(output), exist_ok=True)
    staged_data.to_csv(output, index=False)
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

## Load

In [None]:
# Postgres loader built from the NEON_* credentials read in the Config section.
try:
    loader = PostgresLoader(host=host, db_name=db_name, user=user, password=password)
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

In [None]:
# Read the pages staging CSV back and load it into the target schema, replacing the table.
try:
    staged_data_path = writer.get_output_file_path(
        source, stream_table_name, target_layer='staging'
    ) + '.csv'
    staged_data = pd.read_csv(staged_data_path)
    loader.load_data(
        dataframe=staged_data,
        target_table=stream_table_name,
        target_schema=schema,
        mode='replace',
    )
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

# ntn_utd_users

In [None]:
# Users stream handled in this section and the table it is staged/loaded into.
stream_name = 'utd_users'
# Built from source_prefix (Config section) instead of a placeholder-less f-string; value is 'ntn__utd_users'.
stream_table_name = f'{source_prefix}__utd_users'

### processing

In [None]:
# Read the most recent raw .txt.gz file for the users stream.
# NOTE(review): if no file exists, `records` still holds the pages records from the
# earlier cells, and the next cell extracts users from those — confirm intended.
try:
    file_path = Utils.get_latest_file(f'/work/data/raw/{source}/{stream_name}', '.txt.gz')
    if not file_path:
        print('No files found in the specified directory')
    else:
        records = Utils.read_records(file_path)
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

No files found in the specified directory


In [None]:
# Build the users DataFrame from `records` and write it to the 'processing' layer.
# NOTE(review): this path is keyed by stream_name ('utd_users'), while the pages
# cells key processing files by stream_table_name.
try:
    processed_data = transformer._extract_users_list(records)
    processed_data_path = writer.get_output_file_path(
        source, stream_name, target_layer='processing'
    ) + '.csv'
    os.makedirs(os.path.dirname(processed_data_path), exist_ok=True)
    processed_data.to_csv(processed_data_path, index=False)
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

### staging

In [None]:
try:
    # Read the users 'processing' CSV back into a DataFrame.
    processed_data_path = writer.get_output_file_path(source, stream_name, target_layer='processing') + '.csv'
    processed_data = pd.read_csv(processed_data_path)
except Exception as e:
    notifier.pipeline_error(e)
    raise

In [None]:
try:
    # Write the users data to the 'staging' layer under this stream's table name.
    staging_data_path = writer.get_output_file_path(source, stream_table_name, target_layer='staging') + '.csv'
    os.makedirs(os.path.dirname(staging_data_path), exist_ok=True)
    # Write to staging_data_path, not the module-level `output`. `output` still holds
    # the pages staging path from the pages staging cell, so writing there would
    # overwrite the ntn__utd_pages file and never create the users file read by Load.
    processed_data.to_csv(staging_data_path, index=False)
except Exception as e:
    notifier.pipeline_error(e)
    raise

## Load

In [None]:
try:
    # Read the users 'staging' CSV back into a DataFrame.
    # NOTE(review): the next cell reads this same file again before loading.
    staged_data_path = writer.get_output_file_path(source, stream_table_name, target_layer='staging') + '.csv'
    staged_data = pd.read_csv(staged_data_path)
except Exception as e:
    notifier.pipeline_error(e)
    raise

In [None]:
# Read the ntn utd users staging CSV and load it into PostgreSQL, replacing the table.
try:
    staged_data_path = writer.get_output_file_path(
        source, stream_table_name, target_layer='staging'
    ) + '.csv'
    staged_data = pd.read_csv(staged_data_path)
    loader.load_data(
        dataframe=staged_data,
        target_table=stream_table_name,
        target_schema=schema,
        mode='replace',
    )
except Exception as exc:
    notifier.pipeline_error(exc)
    raise

# Callbacks

In [None]:
# Signal the end of the pipeline run through the webhook notifier.
notifier.pipeline_end()

Accepted


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=022588a7-e3ac-4acd-8e50-3c39b9590c40' target="_blank">
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>