fix(ingest): remove get_platform_instance_id from stateful ingestion (datahub-project#7572)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
2 people authored and shirshanka committed Mar 22, 2023
1 parent c85ce31 commit 9bdc94d
Showing 19 changed files with 28 additions and 140 deletions.
17 changes: 1 addition & 16 deletions metadata-ingestion/src/datahub/cli/state_cli.py
@@ -1,6 +1,5 @@
import json
import logging
from typing import Optional

import click
from click_default_group import DefaultGroup
@@ -29,34 +28,20 @@ def state() -> None:
@state.command()
@click.option("--pipeline-name", required=True, type=str)
@click.option("--platform", required=True, type=str)
@click.option("--platform-instance", required=False, type=str)
@upgrade.check_upgrade
@telemetry.with_telemetry()
def inspect(
pipeline_name: str, platform: str, platform_instance: Optional[str]
) -> None:
def inspect(pipeline_name: str, platform: str) -> None:
"""
Get the latest stateful ingestion state for a given pipeline.
Only works for state entity removal for now.
"""

# Note that the platform-instance argument is not generated consistently,
# and is not always equal to the platform_instance config.

datahub_graph = get_default_graph()
checkpoint_provider = DatahubIngestionCheckpointingProvider(datahub_graph, "cli")

job_name = StaleEntityRemovalHandler.compute_job_id(platform)

raw_checkpoint = checkpoint_provider.get_latest_checkpoint(pipeline_name, job_name)
if raw_checkpoint is None and platform_instance is not None:
logger.info(
"Failed to fetch state, but trying legacy URN format because platform_instance is provided."
)
raw_checkpoint = checkpoint_provider.get_latest_checkpoint(
pipeline_name, job_name, platform_instance_id=platform_instance
)

if not raw_checkpoint:
click.secho("No ingestion state found.", fg="red")
exit(1)
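With the legacy fallback removed, the lookup that `datahub state inspect` performs reduces to a single call. Below is a minimal sketch of that flow using only names visible in this diff; the import paths are assumptions (the diff does not show them), and a reachable DataHub instance is required.

```python
# Sketch of the simplified lookup in `datahub state inspect`.
# Import paths are assumed; the calls mirror state_cli.py above.
from typing import Optional

from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider import (
    DatahubIngestionCheckpointingProvider,
)
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass


def fetch_latest_state(
    graph: DataHubGraph, pipeline_name: str, platform: str
) -> Optional[DatahubIngestionCheckpointClass]:
    """Fetch the latest stale-entity-removal checkpoint, or None."""
    checkpoint_provider = DatahubIngestionCheckpointingProvider(graph, "cli")
    job_name = StaleEntityRemovalHandler.compute_job_id(platform)
    # One URN format only: no retry with a platform_instance-based URN.
    return checkpoint_provider.get_latest_checkpoint(pipeline_name, job_name)
```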
@@ -1,6 +1,6 @@
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, NewType, Type, TypeVar
from typing import Any, Dict, NewType, Optional, Type, TypeVar

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
@@ -43,6 +43,14 @@ def create(
def commit(self) -> None:
pass

@abstractmethod
def get_latest_checkpoint(
self,
pipeline_name: str,
job_name: JobId,
) -> Optional[DatahubIngestionCheckpointClass]:
pass

@staticmethod
def get_data_job_urn(
orchestrator: str,
@@ -53,14 +61,3 @@ def get_data_job_urn(
Standardizes datajob urn minting for all ingestion job state providers.
"""
return builder.make_data_job_urn(orchestrator, pipeline_name, job_name)

@staticmethod
def get_data_job_legacy_urn(
orchestrator: str,
pipeline_name: str,
job_name: JobId,
platform_instance_id: str,
) -> str:
return IngestionCheckpointingProviderBase.get_data_job_urn(
orchestrator, f"{pipeline_name}_{platform_instance_id}", job_name
)
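For reference, the two URN formats involved: `get_data_job_urn` delegates to `builder.make_data_job_urn`, while the deleted legacy helper folded the platform instance id into the flow name. A hedged illustration with hypothetical values, assuming the DataHub provider's orchestrator name is `datahub`:

```python
import datahub.emitter.mce_builder as builder

pipeline_name = "my_pipeline"  # hypothetical
job_name = "my_platform_stale_entity_removal"  # hypothetical JobId

# Standard format: the only one used after this commit.
current_urn = builder.make_data_job_urn("datahub", pipeline_name, job_name)
# urn:li:dataJob:(urn:li:dataFlow:(datahub,my_pipeline,prod),my_platform_stale_entity_removal)

# Removed legacy format, shown for comparison: the flow id was
# f"{pipeline_name}_{platform_instance_id}".
legacy_urn = builder.make_data_job_urn(
    "datahub", f"{pipeline_name}_some_instance_id", job_name
)
```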
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -1240,6 +1240,3 @@ def get_data_platform_instance() -> DataPlatformInstanceClass:

def get_report(self):
return self.report

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform
@@ -434,13 +434,6 @@ def get_dataplatform_instance_aspect(
else:
return None

def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
"""
return f"{self.platform}"

def gen_dataset_key(self, db_name: str, schema: str) -> PlatformKey:
return BigQueryDatasetKey(
project_id=db_name,
@@ -418,8 +418,3 @@ def _parse_into_dbt_column(
def get_external_url(self, node: DBTNode) -> Optional[str]:
# TODO: Once dbt Cloud supports deep linking to specific files, we can use that.
return f"https://cloud.getdbt.com/next/accounts/{self.config.account_id}/projects/{self.config.project_id}/develop"

def get_platform_instance_id(self) -> Optional[str]:
"""The DBT project identifier is used as platform instance."""

return f"{self.platform}_{self.config.project_id}"
13 changes: 0 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -488,16 +488,3 @@ def get_external_url(self, node: DBTNode) -> Optional[str]:
if self.config.git_info and node.dbt_file_path:
return self.config.git_info.get_url_for_file_path(node.dbt_file_path)
return None

def get_platform_instance_id(self) -> Optional[str]:
"""The DBT project identifier is used as platform instance."""

project_id = (
self.load_file_as_json(self.config.manifest_path)
.get("metadata", {})
.get("project_id")
)
if project_id is None:
raise ValueError("DBT project identifier is not found in manifest")

return f"{self.platform}_{project_id}"
@@ -318,9 +318,6 @@ def _get_avro_schema_from_data_type(self, column: NestedField) -> Dict[str, Any]:
],
}

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance

def get_report(self) -> SourceReport:
return self.report

3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -187,9 +187,6 @@ def init_kafka_admin_client(self) -> None:
f"Failed to create Kafka Admin Client due to error {e}.",
)

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance

@classmethod
def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":
config: KafkaSourceConfig = KafkaSourceConfig.parse_obj(config_dict)
7 changes: 0 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/ldap.py
@@ -288,13 +288,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

cookie = set_cookie(self.lc, pctrls)

def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
"""
return self.config.ldap_server

def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
"""
Handle a DN and attributes by adding manager info and constructing a
@@ -1357,8 +1357,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
def get_report(self) -> SourceReport:
return self.reporter

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform

def close(self):
self.prepare_for_commit()
@@ -1778,8 +1778,5 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
def get_report(self):
return self.reporter

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform

def close(self):
self.prepare_for_commit()
@@ -917,9 +917,6 @@ def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext):
run_id=ctx.run_id,
)

def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_name

@classmethod
def create(cls, config_dict, ctx):
config = PowerBiDashboardSourceConfig.parse_obj(config_dict)
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -224,9 +224,6 @@ def _get_pulsar_metadata(self, url):
f"An ambiguous exception occurred while handling the request: {e}"
)

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance

@classmethod
def create(cls, config_dict, ctx):
config = PulsarSourceConfig.parse_obj(config_dict)
@@ -1403,10 +1403,6 @@ def inspect_session_metadata(self) -> None:
except Exception:
self.report.edition = None

# Stateful Ingestion Overrides.
def get_platform_instance_id(self) -> Optional[str]:
return self.config.get_account()

# Ideally we do not want null values in sample data for a column.
# However that would require separate query per column and
# that would be expensive, hence not done.
10 changes: 0 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -392,16 +392,6 @@ def get_db_name(self, inspector: Inspector) -> str:
def get_schema_names(self, inspector):
return inspector.get_schema_names()

def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
"""
config_dict = self.config.dict()
host_port = config_dict.get("host_port", "no_host_port")
database = config_dict.get("database", "no_database")
return f"{self.platform}_{host_port}_{database}"

def get_allowed_schemas(self, inspector: Inspector, db_name: str) -> Iterable[str]:
# this function returns the schema names which are filtered by schema_pattern.
for schema in self.get_schema_names(inspector):
@@ -166,7 +166,9 @@ class StatefulIngestionSourceBase(Source):
"""

def __init__(
self, config: StatefulIngestionConfigBase, ctx: PipelineContext
self,
config: StatefulIngestionConfigBase[StatefulIngestionConfig],
ctx: PipelineContext,
) -> None:
super().__init__(ctx)
self.stateful_ingestion_config = config.stateful_ingestion
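The constructor now requires the parameterized `StatefulIngestionConfigBase[StatefulIngestionConfig]` rather than the bare class. A partial sketch of a hypothetical source passing that through (source-specific methods omitted; no `get_platform_instance_id` override is needed any longer):

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfig,
    StatefulIngestionConfigBase,
    StatefulIngestionSourceBase,
)


class MyStatefulSource(StatefulIngestionSourceBase):  # hypothetical source
    def __init__(
        self,
        config: StatefulIngestionConfigBase[StatefulIngestionConfig],
        ctx: PipelineContext,
    ) -> None:
        # Matches the tightened base signature above.
        super().__init__(config, ctx)
```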
@@ -278,12 +280,6 @@ def is_checkpointing_enabled(self, job_id: JobId) -> bool:
raise ValueError(f"No use-case handler for job_id{job_id}")
return self._usecase_handlers[job_id].is_checkpointing_enabled()

def get_platform_instance_id(self) -> Optional[str]:
# This method is retained for backwards compatibility, but it is not
# required that new sources implement it. We mainly need it for the
# fallback logic in _get_last_checkpoint.
raise NotImplementedError("no platform_instance_id configured")

def _get_last_checkpoint(
self, job_id: JobId, checkpoint_state_class: Type[StateType]
) -> Optional[Checkpoint]:
@@ -292,28 +288,15 @@ def _get_last_checkpoint(
"""
last_checkpoint: Optional[Checkpoint] = None
if self.is_stateful_ingestion_configured():
# TRICKY: We currently don't include the platform_instance_id in the
# checkpoint urn, but we previously did. As such, we need to fallback
# and try the old urn format if the new format doesn't return anything.

# Obtain the latest checkpoint from GMS for this job.
assert self.ctx.pipeline_name
last_checkpoint_aspect = self.ingestion_checkpointing_state_provider.get_latest_checkpoint( # type: ignore
pipeline_name=self.ctx.pipeline_name,
job_name=job_id,
assert self.ingestion_checkpointing_state_provider
last_checkpoint_aspect = (
self.ingestion_checkpointing_state_provider.get_latest_checkpoint(
pipeline_name=self.ctx.pipeline_name,
job_name=job_id,
)
)
if last_checkpoint_aspect is None:
# Try again with the platform_instance_id, if implemented.
try:
platform_instance_id = self.get_platform_instance_id()
except NotImplementedError:
pass
else:
last_checkpoint_aspect = self.ingestion_checkpointing_state_provider.get_latest_checkpoint( # type: ignore
pipeline_name=self.ctx.pipeline_name,
job_name=job_id,
platform_instance_id=platform_instance_id,
)

# Convert it to a first-class Checkpoint object.
last_checkpoint = Checkpoint[StateType].create_from_checkpoint_aspect(
@@ -355,6 +338,8 @@ def _prepare_checkpoint_states_for_commit(self) -> None:
# Perform validations
if not self.is_stateful_ingestion_configured():
return None
assert self.stateful_ingestion_config

if (
self.stateful_ingestion_config
and self.stateful_ingestion_config.ignore_new_state
@@ -378,7 +363,7 @@ def _prepare_checkpoint_states_for_commit(self) -> None:
job_checkpoint.prepare_for_commit()
try:
checkpoint_aspect = job_checkpoint.to_checkpoint_aspect(
self.stateful_ingestion_config.max_checkpoint_state_size # type: ignore
self.stateful_ingestion_config.max_checkpoint_state_size
)
except Exception as e:
logger.error(
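The `assert` statements added above replace the earlier `# type: ignore` comments: both the checkpointing provider and the stateful ingestion config are `Optional`, and asserting them non-None lets mypy narrow the type instead of suppressing the error. A generic illustration of the pattern (names hypothetical):

```python
from typing import Optional


class Provider:
    def commit(self) -> None:
        ...


class Holder:
    provider: Optional[Provider] = None

    def with_ignore(self) -> None:
        self.provider.commit()  # type: ignore  # silences mypy, hides None bugs

    def with_assert(self) -> None:
        assert self.provider  # narrows Optional[Provider] to Provider
        self.provider.commit()  # type-checks cleanly; fails loudly if unset
```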
@@ -64,21 +64,15 @@ def get_latest_checkpoint(
self,
pipeline_name: str,
job_name: JobId,
platform_instance_id: Optional[str] = None,
) -> Optional[DatahubIngestionCheckpointClass]:
logger.debug(
f"Querying for the latest ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" platformInstanceId:'{platform_instance_id}', job_name:'{job_name}'"
f" job_name:'{job_name}'"
)

if platform_instance_id is None:
data_job_urn = self.get_data_job_urn(
self.orchestrator_name, pipeline_name, job_name
)
else:
data_job_urn = self.get_data_job_legacy_urn(
self.orchestrator_name, pipeline_name, job_name, platform_instance_id
)
data_job_urn = self.get_data_job_urn(
self.orchestrator_name, pipeline_name, job_name
)

latest_checkpoint: Optional[
DatahubIngestionCheckpointClass
@@ -92,14 +86,14 @@ if latest_checkpoint:
if latest_checkpoint:
logger.debug(
f"The last committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" platformInstanceId:'{platform_instance_id}', job_name:'{job_name}' found with start_time:"
f" job_name:'{job_name}' found with start_time:"
f" {datetime.utcfromtimestamp(latest_checkpoint.timestampMillis/1000)}"
)
return latest_checkpoint
else:
logger.debug(
f"No committed ingestion checkpoint for pipelineName:'{pipeline_name}',"
f" platformInstanceId:'{platform_instance_id}', job_name:'{job_name}' found"
f" job_name:'{job_name}' found"
)

return None
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -2264,6 +2264,3 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance or self.platform
@@ -160,9 +160,6 @@ def create(cls, config_dict, ctx):
config = UnityCatalogSourceConfig.parse_obj(config_dict)
return cls(ctx=ctx, config=config)

def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance or self.platform

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
