In [None]:
# autotime is an IPython extension that prints the execution time of each cell.
%load_ext autotime
# autoreload in mode 2 reloads imported modules before each cell is executed,
# so edits to the imported project code are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

In [None]:
# Configure the root logger (getLogger() with no name) at INFO level so that
# log records emitted by other modules are piped to the notebook output.
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [None]:
from pprint import pprint

import pandas as pd
import requests
from apache_beam.options.pipeline_options import PipelineOptions
from IPython.display import SVG, Image, display
from vulkan.beam.local.convert import build_beam_policy
from vulkan.core.policy import Policy

from vulkan_public.spec.dependency import INPUT_NODE

In [None]:
import os
import platform
import signal
import socket
import subprocess


def is_server_running(host: str, port: int, timeout: float = 1.0) -> bool:
    """Check whether something accepts TCP connections on ``host:port``.

    The probe uses ``socket.create_connection``, which resolves ``host`` and
    tries every returned address (IPv4 and IPv6) in turn. A server that is
    bound only to the IPv6 loopback is therefore detected as well, instead of
    being missed by an IPv4-only socket.

    Args:
        host (str): The hostname or IP address of the server.
        port (int): The port number to check.
        timeout (float): The timeout for each connection attempt in seconds
            (default is 1.0).

    Returns:
        bool: True if a connection could be established, False otherwise
            (connection refused, timeout, unresolvable host or any other
            ``OSError``).
    """
    try:
        # The connection is only a probe: the context manager closes it as
        # soon as it has been established.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # socket.timeout, ConnectionRefusedError and socket.gaierror are all
        # subclasses of OSError, so a single clause covers every failure.
        return False


def render_pipeline(
    pipeline_builder,
    output_image_path: str,
    render_port: int = -1,
):
    """Render the builder's single-run pipeline graph and return it as an SVG.

    The pipeline is built with empty input data and executed with Beam's
    ``RenderRunner``, which is asked to write the graph to
    ``output_image_path``.

    Args:
        pipeline_builder: Object exposing ``build_single_run_pipeline``.
        output_image_path (str): File the rendered graph is written to.
        render_port (int): Port offered to the render server. When it is
            positive and something is already listening on it on localhost,
            a message is printed and ``-1`` is passed to the runner instead.

    Returns:
        IPython.display.SVG: Display object created from ``output_image_path``.
    """
    # Only probe the port when the caller actually asked for one.
    port_taken = render_port > 0 and is_server_running(
        "localhost", render_port, timeout=1
    )
    if port_taken:
        print(
            f"There is already a server running on port {render_port}. \n"
            "Skipping starting a server for this render to avoid conflict."
        )
        render_port = -1

    render_options = PipelineOptions(
        [
            "--runner=apache_beam.runners.render.RenderRunner",
            f"--render_output={output_image_path}",
            f"--render_port={render_port}",
        ]
    )

    pipeline = pipeline_builder.build_single_run_pipeline(
        input_data={}, pipeline_options=render_options
    )
    pipeline.run()

    return SVG(output_image_path)

In [None]:
import json

from vulkan.beam.pipeline import LOCAL_RESULTS_FILE_NAME


def run_batch(pipeline_builder, input_data_path: str, run_id: str = "test-id"):
    """Run the policy as a batch pipeline on the DirectRunner and load its output.

    Args:
        pipeline_builder: Object exposing ``build_batch_pipeline`` and an
            ``output_path`` attribute.
        input_data_path (str): Path of the input data handed to the builder.
        run_id (str): Identifier passed to the builder as ``backfill_id``.

    Returns:
        pandas.DataFrame: The parquet data read from
            ``pipeline_builder.output_path`` once the pipeline has finished.
    """
    options = PipelineOptions(["--runner=DirectRunner"])

    pipeline = pipeline_builder.build_batch_pipeline(
        backfill_id=run_id,
        input_data_path=input_data_path,
        pipeline_options=options,
    )

    # run() only returns a result handle; block until the pipeline is done so
    # the output is completely written before it is read back.
    pipeline.run().wait_until_finish()

    return pd.read_parquet(pipeline_builder.output_path)


def run_local(pipeline_builder, data: dict):
    """Run the policy once on the DirectRunner for a single input record.

    Args:
        pipeline_builder: Object exposing ``build_single_run_pipeline`` and
            an ``output_path`` attribute.
        data (dict): Input record passed to the builder as ``input_data``.

    Returns:
        The element at index 1 of the JSON document stored in the
        ``LOCAL_RESULTS_FILE_NAME`` file under ``pipeline_builder.output_path``.
    """
    options = PipelineOptions(["--runner=DirectRunner"])

    pipeline = pipeline_builder.build_single_run_pipeline(
        input_data=data, pipeline_options=options
    )

    # run() only returns a result handle; block until the pipeline is done so
    # the results file is completely written before it is read back.
    pipeline.run().wait_until_finish()

    results_path = os.path.join(pipeline_builder.output_path, LOCAL_RESULTS_FILE_NAME)
    with open(results_path, "r") as fp:
        results = json.load(fp)

    # NOTE(review): index 1 presumably selects the result payload of the
    # written document - confirm against the writer in vulkan.beam.pipeline.
    return results[1]

In [None]:
# Use the first five rows of the sample file, with `month` and `tax_id`
# stored as strings, as the batch input.
df = (
    pd.read_csv("simple_bkt_lg.csv")
    .iloc[:5]
    .astype({"month": str, "tax_id": str})
)

df.to_parquet("input.parquet")

# Build the lookup table as a new frame with `.assign` instead of assigning a
# column into a subset of `df`, which triggers pandas' SettingWithCopyWarning.
lookup_df = df[["tax_id"]].assign(squared=df["tax_id"].astype(int) ** 2)
lookup_df.to_parquet("file_data_source.parquet")

## Define the Policy

In [None]:
from enum import Enum

from vulkan_public.spec.dependency import INPUT_NODE, Dependency
from vulkan_public.spec.nodes import BranchNode, DataInputNode, TerminateNode
from vulkan_public.spec.policy import PolicyDefinition


class Status(Enum):
    """Statuses used as `return_status` by the terminate nodes of the demo policy."""

    APPROVED = "APPROVED"
    DENIED = "DENIED"

# Data input node whose `source` string matches the `name` of
# `test_url_schema` defined further down; it depends only on the policy input.
sample_api = DataInputNode(
    name="sample_api",
    description="DataInputNode data",
    source="vendor-name:api-name:v0.0.1",
    dependencies={"inputs": Dependency(INPUT_NODE)},
)

# Data input node whose `source` string matches the `name` of
# `test_file_schema` defined further down; it also depends only on the policy input.
sample_file_input = DataInputNode(
    name="sample_file_input",
    description="DataInputNode with File Input",
    source="file-input:api-name:v0.0.2",
    dependencies={"inputs": Dependency(INPUT_NODE)},
)


# Branching node
def branch_condition_1(context, scores, file_inputs, **kwargs):
    """Choose the branch output by comparing the score against the cutoff.

    Both inputs are logged first. The result is "approved" when
    ``scores["score"]`` is strictly greater than the ``SCORE_CUTOFF`` entry of
    ``context.env`` (500 when that entry is missing) and "denied" otherwise.
    """
    log = context.log
    log.info(f"BranchNode data: {scores}")
    log.info(f"File Input data: {file_inputs}")

    score = scores["score"]
    cutoff = context.env.get("SCORE_CUTOFF", 500)
    return "approved" if score > cutoff else "denied"


# Branch node wrapping `branch_condition_1`. The dependency keys (`scores`,
# `file_inputs`) match the function's parameter names, and `outputs` lists the
# two strings the function can return.
branch_1 = BranchNode(
    func=branch_condition_1,
    name="branch_1",
    description="BranchNode data",
    dependencies={
        "scores": Dependency(sample_api.name),
        "file_inputs": Dependency(sample_file_input.name),
    },
    outputs=["approved", "denied"],
)


# Terminate node attached to the "approved" output of `branch_1`.
approved = TerminateNode(
    name="approved",
    description="TerminateNode data branch",
    return_status=Status.APPROVED,
    dependencies={"condition": Dependency("branch_1", "approved")},
)


# Terminate node attached to the "denied" output of `branch_1`.
denied = TerminateNode(
    name="denied",
    description="TerminateNode data branch",
    return_status=Status.DENIED,
    dependencies={"condition": Dependency("branch_1", "denied")},
)


# Policy definition gathering every node declared above. `SCORE_CUTOFF` is the
# configuration variable that `branch_condition_1` reads from `context.env`.
demo_policy = PolicyDefinition(
    nodes=[
        sample_api,
        sample_file_input,
        branch_1,
        approved,
        denied,
    ],
    components=[],
    config_variables=["SCORE_CUTOFF"],
    input_schema={"tax_id": str, "score": int},
)

In [None]:
# Build the core Policy object from the PolicyDefinition declared above; it is
# what `build_beam_policy` receives further down.
policy = Policy.from_definition(demo_policy)

## Run the Policy Locally 

In [None]:
from vulkan_public.schemas import DataSourceSpec

# Data source configuration for the URL-backed source. Its `name` matches the
# `source` of the `sample_api` node; `keys` lists `tax_id` and caching is
# disabled. The URL points at a server on localhost:5000.
test_url_schema = {
    "name": "vendor-name:api-name:v0.0.1",
    "keys": ["tax_id"],
    "source": {
        "url": "http://localhost:5000/serasa",
    },
    "caching": {
        "enabled": False,
    },
}

# Data source configuration for the file-backed source. Its `name` matches the
# `source` of the `sample_file_input` node and its `path` is the parquet file
# written by the data preparation cell.
# TODO: file inputs should be equivalent locally and remote so we can run on DataFlow
test_file_schema = {
    "name": "file-input:api-name:v0.0.2",
    "keys": ["tax_id"],
    "source": {
        # "id": "..."
        "path": "file_data_source.parquet",
    },
    "caching": {
        "enabled": False,
    },
}

# Validate both configuration dicts into DataSourceSpec models.
data_sources = [
    DataSourceSpec.model_validate(test_url_schema),
    DataSourceSpec.model_validate(test_file_schema),
]

# TODO: The current implementation assumes a single "backfill" job, ie.
#       a single set of parameters.
#       We still need to support the "backtest" interface for testing multiple
#       configurations for the same pipeline.
config_variables = {"SCORE_CUTOFF": 500}

# Builder used by render_pipeline, run_local and run_batch below; those helpers
# read results back from its `output_path`.
builder = build_beam_policy(
    policy,
    data_sources=data_sources,
    output_path="./output/",
    config_variables=config_variables,
)

In [None]:
output_image_path = "dag.svg"

# Render the pipeline graph to `dag.svg` and display it. Port 21111 is offered
# to the render server; render_pipeline skips the server if the port is busy.
render_pipeline(builder, output_image_path, render_port=21111)

In [None]:
# Run the policy once for a single input record and display what run_local returns.
result = run_local(builder, {"tax_id": "0", "score": 100})

result

In [None]:
# Run the policy in batch mode over the parquet file written by the data
# preparation cell and preview the first rows of the output.
batch_results = run_batch(builder, input_data_path="input.parquet")
batch_results.head()

1. Suite de desenvolvimento atrativa o suficiente
    - Fácil de escrever
    - Fácil de ler
    - Fácil de testar e iterar
    - Fácil de levar pra produção
        - Em uma solução self-service, conseguir colocar em produção em 1 linha
        - Em uma empresa menor, precisa ser fácil de passar pra equipe que roda em produção
        - Em uma empresa que já tem uma solução, precisa ser fácil integrar com o que já existe
            - Fácil explicar
            - Talvez gerar um fluxograma pra implementação?
            - Integrar via API com soluções existentes
2. Produto usado em Desenvolvimento
    1. Qual o trigger pra testar?
3. Vantagens de usar a aplicação?
    1. Na etapa de análise
        - Escalabilidade -> Plataforma de desenvolvimento: Jupyter-like? Databricks?
        - Rodar múltiplos testes / backtests simultaneamente
    2. Na etapa de servir
        - Automação
        - Experimentação fácil
4. Produto usado em Execução e Monitoramento 

## Running remotely in Vulkan Engine

In [None]:
from vulkan_public.cli import client
from vulkan_public.cli.context import Context

In [None]:
# CLI context object passed as the first argument to every `client` call below.
ctx = Context()

In [None]:
# Create the URL-backed data source through the client, reusing the same
# configuration dict that was validated for the local run.
api_data_source_id = client.data.create_data_source(
    ctx,
    config=test_url_schema,
)

In [None]:
# Create the file-backed data source through the client.
# NOTE(review): this configuration points at a local parquet path (see the
# TODO on `test_file_schema`) - confirm the remote side can resolve it.
file_data_source_id = client.data.create_data_source(
    ctx,
    config=test_file_schema,
)

In [None]:
# Absolute path of the parent of the current working directory.
# NOTE(review): presumably the root of the repository holding the policy
# code - confirm before running from a different directory.
policy_path = os.path.abspath("..")

# Create the policy, then a version "v0" of it built from `policy_path`.
policy_id = client.policy.create_policy(ctx, "Test Beam Policy")
policy_version = client.policy.create_policy_version(
    ctx, policy_id=policy_id, version_name="v0", repository_path=policy_path
)