Skip to content

Commit

Permalink
metadata: create specs secret mask (airbytehq#25201)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored and marcosmarxm committed Jun 8, 2023
1 parent c44d18b commit bbd48e5
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from google.cloud import storage
from typing import List
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
from dataclasses import dataclass
from typing import List

from google.cloud import storage

SPEC_CACHE_BUCKET_NAME = "io-airbyte-cloud-spec-cache"
CACHE_FOLDER = "specs"
Expand Down Expand Up @@ -56,3 +61,9 @@ def list_cached_specs() -> List[CachedSpec]:
blobs = bucket.list_blobs(prefix=CACHE_FOLDER)

return [get_docker_info_from_spec_cache_path(blob.name) for blob in blobs]


def get_cached_spec(spec_cache_path: str) -> dict:
client = storage.Client.create_anonymous_client()
bucket = client.bucket(SPEC_CACHE_BUCKET_NAME)
return json.loads(bucket.blob(spec_cache_path).download_as_string())
Original file line number Diff line number Diff line change
@@ -1,50 +1,45 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dagster import Definitions

from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_file_manager, gcs_file_blob, gcs_directory_blobs
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory
from orchestrator.resources.local import simple_local_file_manager

from orchestrator.assets.github import github_connector_folders
from orchestrator.assets.spec_cache import cached_specs
from orchestrator.assets.catalog_report import (
all_sources_dataframe,
all_destinations_dataframe,
connector_catalog_location_markdown,
connector_catalog_location_html,
)
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER
from orchestrator.assets.catalog import (
oss_destinations_dataframe,
cloud_catalog_from_metadata,
cloud_destinations_dataframe,
oss_sources_dataframe,
cloud_sources_dataframe,
latest_oss_catalog_dict,
latest_cloud_catalog_dict,
latest_oss_catalog_dict,
oss_catalog_from_metadata,
cloud_catalog_from_metadata,
oss_destinations_dataframe,
oss_sources_dataframe,
)
from orchestrator.assets.metadata import (
catalog_derived_metadata_definitions,
valid_metadata_report_dataframe,
metadata_definitions,
from orchestrator.assets.catalog_report import (
all_destinations_dataframe,
all_sources_dataframe,
connector_catalog_location_html,
connector_catalog_location_markdown,
)

from orchestrator.assets.dev import (
persist_metadata_definitions,
overrode_metadata_definitions,
oss_catalog_diff,
cloud_catalog_diff,
cloud_catalog_diff_dataframe,
oss_catalog_diff_dataframe,
metadata_directory_report,
oss_catalog_diff,
oss_catalog_diff_dataframe,
overrode_metadata_definitions,
persist_metadata_definitions,
)

from orchestrator.jobs.catalog import generate_catalog_markdown, generate_local_metadata_files, generate_catalog
from orchestrator.assets.github import github_connector_folders
from orchestrator.assets.metadata import catalog_derived_metadata_definitions, metadata_definitions, valid_metadata_report_dataframe
from orchestrator.assets.spec_cache import cached_specs
from orchestrator.assets.specs_secrets_mask import all_specs_secrets, specs_secrets_mask_yaml
from orchestrator.config import CATALOG_FOLDER, CONNECTOR_REPO_NAME, CONNECTORS_PATH, REPORT_FOLDER
from orchestrator.jobs.catalog import generate_catalog, generate_catalog_markdown, generate_local_metadata_files
from orchestrator.resources.gcp import gcp_gcs_client, gcs_bucket_manager, gcs_directory_blobs, gcs_file_blob, gcs_file_manager
from orchestrator.resources.github import github_client, github_connector_repo, github_connectors_directory
from orchestrator.resources.local import simple_local_file_manager
from orchestrator.sensors.catalog import catalog_updated_sensor
from orchestrator.sensors.metadata import metadata_updated_sensor

from orchestrator.config import REPORT_FOLDER, CATALOG_FOLDER, CONNECTORS_PATH, CONNECTOR_REPO_NAME
from metadata_service.constants import METADATA_FILE_NAME, METADATA_FOLDER

ASSETS = [
oss_destinations_dataframe,
cloud_destinations_dataframe,
Expand All @@ -68,6 +63,8 @@
cloud_catalog_from_metadata,
cloud_catalog_diff_dataframe,
oss_catalog_diff_dataframe,
all_specs_secrets,
specs_secrets_mask_yaml,
metadata_directory_report,
metadata_definitions,
]
Expand All @@ -83,6 +80,7 @@
}
),
"gcs_bucket_manager": gcs_bucket_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}}),
"catalog_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": CATALOG_FOLDER}),
"catalog_report_directory_manager": gcs_file_manager.configured({"gcs_bucket": {"env": "METADATA_BUCKET"}, "prefix": REPORT_FOLDER}),
"metadata_file_blobs": gcs_directory_blobs.configured({"prefix": METADATA_FOLDER, "suffix": METADATA_FILE_NAME}),
"latest_oss_catalog_gcs_file": gcs_file_blob.configured({"prefix": CATALOG_FOLDER, "gcs_filename": "oss_catalog.json"}),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
import pandas as pd
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import copy
import json
from typing import List

from dagster import asset, OpExecutionContext

import pandas as pd
from dagster import OpExecutionContext, asset
from metadata_service.spec_cache import get_cached_spec
from orchestrator.models.metadata import PartialMetadataDefinition
from orchestrator.utils.dagster_helpers import OutputDataFrame, output_dataframe
from orchestrator.utils.object_helpers import deep_copy_params
from orchestrator.models.metadata import PartialMetadataDefinition


GROUP_NAME = "catalog"

# ERRORS


class MissingCachedSpecError(Exception):
pass


# HELPERS


Expand Down Expand Up @@ -116,27 +126,55 @@ def construct_catalog_from_metadata(catalog_derived_metadata_definitions: List[P
return catalog


def construct_registry_with_spec_from_registry(registry: dict, cached_specs: OutputDataFrame) -> dict:
entries = [("source", entry) for entry in registry["sources"]] + [("destinations", entry) for entry in registry["destinations"]]

cached_connector_version = {
(cached_spec["docker_repository"], cached_spec["docker_image_tag"]): cached_spec["spec_cache_path"]
for cached_spec in cached_specs.to_dict(orient="records")
}
registry_with_specs = {"sources": [], "destinations": []}
for connector_type, entry in entries:
try:
spec_path = cached_connector_version[(entry["dockerRepository"], entry["dockerImageTag"])]
entry_with_spec = copy.deepcopy(entry)
entry_with_spec["spec"] = get_cached_spec(spec_path)
if connector_type == "source":
registry_with_specs["sources"].append(entry_with_spec)
else:
registry_with_specs["destinations"].append(entry_with_spec)
except KeyError:
raise MissingCachedSpecError(f"No cached spec found for {entry['dockerRegistry']:{entry['dockerImageTag']}}")
return registry_with_specs


# ASSETS


@asset(group_name=GROUP_NAME)
def cloud_catalog_from_metadata(catalog_derived_metadata_definitions: List[PartialMetadataDefinition]) -> dict:
def cloud_catalog_from_metadata(
catalog_derived_metadata_definitions: List[PartialMetadataDefinition], cached_specs: OutputDataFrame
) -> dict:
"""
This asset is used to generate the cloud catalog from the metadata definitions.
TODO (ben): This asset should be updated to use the GCS metadata definitions once available.
"""
return construct_catalog_from_metadata(catalog_derived_metadata_definitions, "cloud")
from_metadata = construct_catalog_from_metadata(catalog_derived_metadata_definitions, "cloud")
from_metadata_and_spec = construct_registry_with_spec_from_registry(from_metadata, cached_specs)
return from_metadata_and_spec


@asset(group_name=GROUP_NAME)
def oss_catalog_from_metadata(catalog_derived_metadata_definitions: List[PartialMetadataDefinition]) -> dict:
def oss_catalog_from_metadata(catalog_derived_metadata_definitions: List[PartialMetadataDefinition], cached_specs: OutputDataFrame) -> dict:
"""
This asset is used to generate the oss catalog from the metadata definitions.
TODO (ben): This asset should be updated to use the GCS metadata definitions once available.
"""
return construct_catalog_from_metadata(catalog_derived_metadata_definitions, "oss")
from_metadata = construct_catalog_from_metadata(catalog_derived_metadata_definitions, "oss")
from_metadata_and_spec = construct_registry_with_spec_from_registry(from_metadata, cached_specs)
return from_metadata_and_spec


@asset(group_name=GROUP_NAME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import List, Set

import dpath.util
import yaml
from dagster import MetadataValue, Output, asset

GROUP_NAME = "specs_secrets_mask"

# HELPERS


def get_secrets_properties_from_registry_entry(registry_entry: dict) -> List[str]:
"""Traverse a registry entry to spot properties in a spec that have the "airbyte_secret" field set to true.
This function assumes all the properties have a "type" field that we can use to find all the nested properties in a spec.
Args:
registry_entry (dict): An entry in the registry with a spec field.
Returns:
List[str]: List of property names marked as airbyte_secret.
"""
secret_properties = []
spec_properties = registry_entry["spec"]["connectionSpecification"].get("properties")
if spec_properties is None:
return []
for type_path, _ in dpath.util.search(spec_properties, "**/type", yielded=True):
absolute_path = f"/{type_path}"
if "/" in type_path:
property_path, _ = absolute_path.rsplit(sep="/", maxsplit=1)
else:
property_path = absolute_path
property_definition = dpath.util.get(spec_properties, property_path)
marked_as_secret = property_definition.get("airbyte_secret", False)
if marked_as_secret:
secret_properties.append(property_path.split("/")[-1])
return secret_properties


# ASSETS


@asset(group_name=GROUP_NAME)
def all_specs_secrets(oss_catalog_from_metadata: dict, cloud_catalog_from_metadata: dict) -> Set[str]:
all_secret_properties = []
all_entries = (
oss_catalog_from_metadata["sources"]
+ cloud_catalog_from_metadata["sources"]
+ oss_catalog_from_metadata["destinations"]
+ cloud_catalog_from_metadata["destinations"]
)
for registry_entry in all_entries:
all_secret_properties += get_secrets_properties_from_registry_entry(registry_entry)
return set(all_secret_properties)


@asset(required_resource_keys={"catalog_directory_manager"}, group_name=GROUP_NAME)
def specs_secrets_mask_yaml(context, all_specs_secrets: Set[str]) -> Output:
yaml_string = yaml.dump({"properties": list(all_specs_secrets)})
catalog_directory_manager = context.resources.catalog_directory_manager
file_handle = catalog_directory_manager.write_data(yaml_string.encode(), ext="yaml", key="specs_secrets_mask")
metadata = {
"preview": yaml_string,
"gcs_path": MetadataValue.url(file_handle.gcs_path),
}
return Output(metadata=metadata, value=file_handle)
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from dagster import define_asset_job


generate_catalog = define_asset_job(name="generate_catalog", selection=["metadata_directory_report", "metadata_definitions"])
generate_catalog_markdown = define_asset_job(
name="generate_catalog_markdown", selection=["connector_catalog_location_html", "connector_catalog_location_markdown"]
)
generate_local_metadata_files = define_asset_job(name="generate_local_metadata_files", selection=["persist_metadata_definitions"])
generate_specs_secrets_mask_file = define_asset_job(name="generate_specs_secrets_mask_file", selection=["persist_specs_secrets_mask"])
18 changes: 15 additions & 3 deletions airbyte-ci/connectors/metadata_service/orchestrator/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ metadata-service = {path = "../lib", develop = true}
deepdiff = "^6.3.0"
mergedeep = "^1.3.4"
pydash = "^6.0.2"
dpath = "^2.1.5"


[tool.poetry.group.dev.dependencies]
Expand Down
Loading

0 comments on commit bbd48e5

Please sign in to comment.