Merge pull request #23 from opendatadiscovery/feature/adf-data-flows
Add metadata regarding DataFlows to the adapter
Vixtir committed Oct 20, 2023
2 parents 6fdf56b + 22bd58a commit f0be599
Showing 6 changed files with 137 additions and 6 deletions.
13 changes: 8 additions & 5 deletions odd_collector_azure/adapters/azure_data_factory/adapter.py
@@ -8,7 +8,7 @@
from odd_collector_azure.domain.plugin import DataFactoryPlugin

from .client import DataFactoryClient
from .domain import ADFActivity
from .domain import ADFActivity, ADFDataFlow
from .mapper.activity import map_activity
from .mapper.activity_run import map_activity_run
from .mapper.factory import map_factory
@@ -50,10 +50,13 @@ def get_data_entity_list(self) -> DataEntityList:
[map_pipeline_run(self.generator, run) for run in pipelines_runs]
)

activities = [
ADFActivity(act, all_activities=pipeline.activities)
for act in pipeline.activities
]
activities = []
for act in pipeline.activities:
activity = ADFActivity(act, all_activities=pipeline.activities)
if activity.type == "ExecuteDataFlow":
activity.dataflow = self.client.get_data_flow(activity.name)
activities.append(activity)

activities_runs = self.client.get_activity_runs(pipeline.name)

for activity in activities:
17 changes: 16 additions & 1 deletion odd_collector_azure/adapters/azure_data_factory/client.py
@@ -9,7 +9,13 @@

from odd_collector_azure.domain.plugin import DataFactoryPlugin

from .domain import ADFActivityRun, ADFPipeline, ADFPipelineRun, DataFactory
from .domain import (
ADFActivityRun,
ADFDataFlow,
ADFPipeline,
ADFPipelineRun,
DataFactory,
)


def handle_errors(func):
@@ -99,3 +105,12 @@ def get_activity_runs(
activity_runs[run.activity_name].append(ADFActivityRun(run))

return activity_runs

@handle_errors
def get_data_flow(self, data_flow_name: str) -> ADFDataFlow:
df = self.client.data_flows.get(
resource_group_name=self.resource_group,
factory_name=self.factory,
data_flow_name=data_flow_name,
)
return ADFDataFlow(df)
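
The body of the handle_errors decorator is collapsed in this view. As a rough assumption about its shape (not the committed implementation), such a wrapper typically logs and re-raises SDK errors:

import logging
from functools import wraps


def handle_errors(func):
    # Hypothetical sketch; the actual decorator body is hidden in this diff.
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logging.error("%s failed: %s", func.__name__, e)
            raise
    return wrapper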
38 changes: 38 additions & 0 deletions odd_collector_azure/adapters/azure_data_factory/domain/models.py
@@ -1,9 +1,12 @@
import re
from collections import defaultdict
from dataclasses import dataclass, field

from azure.mgmt.datafactory.models import (
Activity,
ActivityRun,
DataFlow,
DataFlowResource,
Factory,
PipelineResource,
PipelineRun,
@@ -13,6 +16,8 @@
from odd_collector_sdk.utils.metadata import HasMetadata
from odd_models import JobRunStatus

from ..utils import get_properties


class MetadataMixin:
resource: Resource
@@ -45,6 +50,7 @@ def activities(self) -> list[Activity]:
class ADFActivity(MetadataMixin, HasMetadata):
resource: Activity
all_activities: list[Activity] = field(default_factory=list)
dataflow: DataFlow = None

@property
def inputs(self) -> list[str]:
@@ -55,13 +61,25 @@ def outputs(self) -> list[str]:
dependency_map = self._build_dependency_map()
return dependency_map.get(self.resource.name, [])

@property
def type(self) -> str:
return self.resource.type

@property
def activities(self):
activities = (
self.resource.activities if hasattr(self.resource, "activities") else None
)
return activities

@property
def odd_metadata(self) -> dict:
act_metadata = omit(self.resource.__dict__, self.excluded_properties)
if self.dataflow:
data_flow_metadata = get_properties(self.dataflow)
return act_metadata | data_flow_metadata
return act_metadata

def _build_dependency_map(self):
dependency_map = defaultdict(list)
for activity in self.all_activities:
@@ -127,3 +145,23 @@ def status(self):
if self.resource.status == "Succeeded"
else JobRunStatus.FAILED
)


@dataclass
class ADFDataFlow:
resource: DataFlowResource

@property
def name(self) -> str:
return self.resource.name

@property
def data_flow_properties(self) -> DataFlow:
return self.resource.properties

@property
def sql_script(self) -> str:
joined_str = "".join(self.resource.properties.script_lines)
cleaned_str = re.sub(r"\s+", " ", joined_str).strip()

return cleaned_str
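
For illustration, the sql_script property above joins the script lines and collapses runs of whitespace; the sample script_lines below are hypothetical, only the regex comes from the committed code:

import re

# Hypothetical script_lines; real values come from
# DataFlowResource.properties.script_lines.
script_lines = ["source(output(", "    id as integer", ")) ~> Source1"]

joined_str = "".join(script_lines)
print(re.sub(r"\s+", " ", joined_str).strip())
# source(output( id as integer)) ~> Source1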
odd_collector_azure/adapters/azure_data_factory/img.png (binary image referenced by the readme; it cannot be displayed in the diff)
57 changes: 57 additions & 0 deletions odd_collector_azure/adapters/azure_data_factory/readme.md
@@ -0,0 +1,57 @@
# Usage of Purview to generate pipeline lineage

## Status

Rejected.

## Context

We analyzed whether Microsoft Purview would be useful for gathering metadata for the ODD platform.

## Decision

Currently, it looks like Purview would not add value to the ADF adapter.
Purview will not be used in ODD, at least for now.

### Reasons

* It shows lineage only for single activities, not for whole pipelines.
* It's based on pipeline runs, not on the pipeline definition itself, so it shows lineage for each run. Because of that,
the container or directory could change in each run.
* Lineage is shown "from the file perspective": activities performed on a single file by different pipelines are shown in
a single lineage.
* Purview brings additional cost for the user, and most of the metadata it shows can be retrieved directly from the
ADF API, so using Purview doesn't look profitable.

## Consequences

We will not get additional metadata from Purview.

# Adding datasets to the lineage

## Status

Proposed.

## Context

In its current shape, the Azure Data Factory (ADF) adapter shows only the lineage between activities. For example:
![img.png](img.png)
The platform doesn't show the input and output files for each activity.
It was developed that way because inputs and outputs can be parameterized, so the files can differ in each run, and it
isn't obvious how to show that in the lineage.

## Decision

The proposal is to show datasets between the activities. For each activity, we would show its input and output, so the
lineage would look like this:
`input1 -> task1 -> output1 -> task2 -> output2 -> ...`
If file directories are hardcoded, they can be shown directly in the lineage.
It's yet to be decided how to show datasets that can change in each run; the sketch below illustrates the interleaving
itself.
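
A minimal sketch of the proposed interleaving, assuming each activity has hardcoded input and output names; all names
and the `Activity` helper below are illustrative, not part of the adapter:

```python
from dataclasses import dataclass


@dataclass
class Activity:
    name: str
    input: str
    output: str


def lineage_chain(activities: list[Activity]) -> str:
    # Start from the first activity's input, then alternate activity -> output.
    parts = [activities[0].input]
    for act in activities:
        parts += [act.name, act.output]
    return " -> ".join(parts)


print(lineage_chain([
    Activity("task1", "input1", "output1"),
    Activity("task2", "output1", "output2"),
]))
# input1 -> task1 -> output1 -> task2 -> output2
```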

## Consequences

Implementing this solution would greatly enhance the capabilities of ODD and its advantages over using ADF Studio
directly. It would also make it possible to show the integration between ADF and other services (S3, Google Cloud
Storage, etc.) in the platform.
18 changes: 18 additions & 0 deletions odd_collector_azure/adapters/azure_data_factory/utils.py
@@ -1,5 +1,9 @@
import json
from datetime import datetime
from inspect import getmembers
from typing import Any

from funcy import omit


class ADFMetadataEncoder(json.JSONEncoder):
@@ -8,3 +12,17 @@ def default(self, obj):
return obj.isoformat()
else:
return obj.__dict__.copy()


def is_property(parameter: Any) -> bool:
return isinstance(parameter, property)


def get_properties(instance: Any, excluded_properties=None) -> dict[str, Any]:
properties = {
prop_name: value.fget(instance)
for prop_name, value in getmembers(instance.__class__, is_property)
if value.fget(instance) is not None
}

return omit(properties, excluded_properties) if excluded_properties else properties
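
A usage sketch for get_properties with a made-up class (ExampleFlow and its values are illustrative; ADFDataFlow's
name and sql_script properties would be collected the same way):

class ExampleFlow:
    @property
    def name(self) -> str:
        return "flow1"

    @property
    def description(self):
        return None  # None-valued properties are filtered out


print(get_properties(ExampleFlow()))
# {'name': 'flow1'}
print(get_properties(ExampleFlow(), excluded_properties=["name"]))
# {}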
