Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #2894 - Profiler Processor & Metrics #2900

Merged
merged 24 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 42 additions & 1 deletion ingestion/src/metadata/config/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Type, TypeVar

from metadata.ingestion.api.common import DynamicTypedConfig, WorkflowContext
from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.api.source import Source
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
Expand Down Expand Up @@ -89,7 +90,7 @@ def get_sink(
_from: str = "ingestion",
) -> Sink:
"""
Helps us fetching and importing the sink class.
Helps us to fetch and import the sink class.

By default, we will pick it up from `ingestion`.

Expand All @@ -114,3 +115,43 @@ def get_sink(
logger.debug(f"Sink type: {sink_type}, {sink_class} configured")

return sink


def get_processor(
    processor_type: str,
    context: WorkflowContext,
    processor_config: DynamicTypedConfig,
    metadata_config: MetadataServerConfig,
    _from: str = "ingestion",
    **kwargs,
) -> Processor:
    """
    Helps us to fetch and import the Processor class.

    By default, we will pick it up from `ingestion`.

    Any extra keyword argument is forwarded untouched to the
    Processor's `create` method, so callers can pass any other
    specific object a Processor may require. E.g., for the ORM
    Profiler we need a Session to reach the source tables.

    :param processor_type: Type specified in the config. It is passed
        through `fetch_type_class` to fill in both the module and the
        class-name parts of the import path.
    :param context: Workflow related information
    :param processor_config: Specific Processor configurations, such as the profiler and tests
    :param metadata_config: Metadata server configurations
    :param _from: From where do we load the processor class. Ingestion by default.
    :param kwargs: Extra arguments handed over to the Processor's `create`
    :return: The Processor built by the loaded class' `create` method
    """
    # TODO(review): the import path below depends on the packaging
    # convention `metadata.<_from>.processor.<module>.<Class>Processor`.
    # It should become more flexible so that users can write their own
    # sources and processors. To be tackled together with #2848.
    processor_class = get_class(
        "metadata.{}.processor.{}.{}Processor".format(
            _from,
            fetch_type_class(processor_type, is_file=True),
            fetch_type_class(processor_type, is_file=False),
        )
    )

    # Only the "config" section of the processor definition is handed to
    # the Processor; a missing section falls back to an empty dict.
    processor: Processor = processor_class.create(
        processor_config.dict().get("config", {}), metadata_config, context, **kwargs
    )

    # The message names the Processor (it previously said "Sink type").
    logger.debug(f"Processor type: {processor_type}, {processor_class} configured")

    return processor
6 changes: 5 additions & 1 deletion ingestion/src/metadata/ingestion/api/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ class Processor(Closeable, Generic[Entity], metaclass=ABCMeta):
@classmethod
@abstractmethod
def create(
cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext
cls,
config_dict: dict,
metadata_config_dict: dict,
ctx: WorkflowContext,
**kwargs
Copy link
Collaborator Author

@pmbrull pmbrull Feb 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allows us to pass more parameters to the processor, such as the session in the ORM Processor. It should be transparent for the existing Processors.

) -> "Processor":
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ def __init__(

@classmethod
def create(
cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext
cls,
config_dict: dict,
metadata_config_dict: dict,
ctx: WorkflowContext,
**kwargs
):
config = QueryParserProcessorConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
Expand Down
26 changes: 25 additions & 1 deletion ingestion/src/metadata/orm_profiler/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
We need to define this class as we end up having
multiple profilers per table and columns.
"""
from typing import List
from typing import List, Optional

from pydantic import BaseModel

from metadata.generated.schema.entity.data.table import Table
from metadata.orm_profiler.profiles.core import Profiler
from metadata.orm_profiler.profiles.models import ProfilerDef
from metadata.orm_profiler.validations.models import TestDef


class WorkflowResult(BaseModel):
Expand All @@ -37,3 +39,25 @@ class ProfilerResult(WorkflowResult):
table: Table # Table Entity
table_profiler: Profiler # Profiler with table results
column_profilers: List[ColumnProfiler] # Profiler with col results


class ProfilerProcessorConfig(BaseModel):
    """
    Shape of the processor section as it is read
    from the workflow JSON definition.

    Both entries are optional and default to None
    when they are not provided.
    """

    # Profiler definition, if any
    profiler: Optional[ProfilerDef] = None
    # Tests definition, if any
    tests: Optional[TestDef] = None


class ProfileAndTests(BaseModel):
    """
    Response of the ORM Profiler processor.

    Bundles, for a given table, all of its profilers
    together with the tests that were run, if any.
    """

    # Table and column profiler results (required)
    profile: ProfilerResult
    # Tests that were run; None when there are none
    tests: Optional[TestDef] = None