Skip to content

Commit

Permalink
Fix #3037: metadata --version doesn't work (#3038)
Browse files Browse the repository at this point in the history
  • Loading branch information
amiorin committed Mar 1, 2022
1 parent 777c6c3 commit fe5618c
Show file tree
Hide file tree
Showing 16 changed files with 87 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ def openmetadata_airflow_lineage_example():
"bigquery_gcp.shopify.raw_customer",
],
},
outlets={
"tables": ["bigquery_gcp.shopify.fact_order"]
},
outlets={"tables": ["bigquery_gcp.shopify.fact_order"]},
)
def generate_data():
"""write your query to generate ETL"""
Expand Down
1 change: 0 additions & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def get_long_description():
"pyarrow~=6.0.1",
"google-cloud-datacatalog==3.6.2",
},

"bigquery-usage": {"google-cloud-logging", "cachetools"},
"docker": {"python_on_whales==0.34.0"},
"backup": {"boto3~=1.19.12"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"""
from typing import Optional

import numpy as np
from sqlalchemy import and_, func
from sqlalchemy.orm import Session

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
from airflow_provider_openmetadata.lineage.openmetadata import (
OpenMetadataLineageBackend,
)
from airflow_provider_openmetadata.lineage.utils import (
get_xlets,
)
from airflow_provider_openmetadata.lineage.utils import get_xlets
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
Expand Down
4 changes: 3 additions & 1 deletion ingestion/tests/integration/ometa/test_ometa_lineage_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def setUpClass(cls) -> None:

cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db)

cls.db_reference = EntityReference(id=cls.create_db_entity.id, name="test-db", type="database")
cls.db_reference = EntityReference(
id=cls.create_db_entity.id, name="test-db", type="database"
)

cls.table = CreateTableRequest(
name="test",
Expand Down
4 changes: 3 additions & 1 deletion ingestion/tests/integration/ometa/test_ometa_model_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ def test_mlmodel_properties(self):
)
create_db_entity = self.metadata.create_or_update(data=create_db)

db_reference = EntityReference(id=create_db_entity.id, name="test-db-ml", type="database")
db_reference = EntityReference(
id=create_db_entity.id, name="test-db-ml", type="database"
)

create_table1 = CreateTableRequest(
name="test-ml",
Expand Down
46 changes: 25 additions & 21 deletions ingestion/tests/integration/ometa/test_ometa_pipeline_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@
OpenMetadata high-level API Pipeline test
"""
import uuid
from unittest import TestCase
from datetime import datetime
from unittest import TestCase

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.services.createPipelineService import (
CreatePipelineServiceRequest,
)
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus, StatusType, Task, TaskStatus
from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.services.pipelineService import (
PipelineService,
PipelineServiceType,
Expand Down Expand Up @@ -209,7 +215,7 @@ def test_add_status(self):
tasks=[
Task(name="task1"),
Task(name="task2"),
]
],
)

pipeline = self.metadata.create_or_update(data=create_pipeline)
Expand All @@ -222,8 +228,8 @@ def test_add_status(self):
executionStatus=StatusType.Successful,
taskStatus=[
TaskStatus(name="task1", executionStatus=StatusType.Successful),
]
)
],
),
)

# We get a list of status
Expand All @@ -239,8 +245,8 @@ def test_add_status(self):
taskStatus=[
TaskStatus(name="task1", executionStatus=StatusType.Successful),
TaskStatus(name="task2", executionStatus=StatusType.Successful),
]
)
],
),
)

assert updated.pipelineStatus[0].executionDate.__root__ == execution_ts
Expand All @@ -260,27 +266,30 @@ def test_add_tasks(self):
tasks=[
Task(name="task1"),
Task(name="task2"),
]
],
)

pipeline = self.metadata.create_or_update(data=create_pipeline)

# Add new tasks
updated_pipeline = self.metadata.add_task_to_pipeline(
pipeline, Task(name="task3"),
pipeline,
Task(name="task3"),
)

assert len(updated_pipeline.tasks) == 3

# Update a task already added
updated_pipeline = self.metadata.add_task_to_pipeline(
pipeline, Task(name="task3", displayName="TaskDisplay"),
pipeline,
Task(name="task3", displayName="TaskDisplay"),
)

assert len(updated_pipeline.tasks) == 3
assert next(
iter(
task for task in updated_pipeline.tasks
task
for task in updated_pipeline.tasks
if task.displayName == "TaskDisplay"
)
)
Expand All @@ -290,9 +299,7 @@ def test_add_tasks(self):
Task(name="task3"),
Task(name="task4"),
]
updated_pipeline = self.metadata.add_task_to_pipeline(
pipeline, *new_tasks
)
updated_pipeline = self.metadata.add_task_to_pipeline(pipeline, *new_tasks)

assert len(updated_pipeline.tasks) == 4

Expand All @@ -307,7 +314,8 @@ def test_add_tasks_to_empty_pipeline(self):
pipeline = self.metadata.create_or_update(data=self.create)

updated_pipeline = self.metadata.add_task_to_pipeline(
pipeline, Task(name="task", displayName="TaskDisplay"),
pipeline,
Task(name="task", displayName="TaskDisplay"),
)

assert len(updated_pipeline.tasks) == 1
Expand All @@ -326,17 +334,13 @@ def test_clean_tasks(self):
Task(name="task2"),
Task(name="task3"),
Task(name="task4"),
]
],
)

pipeline = self.metadata.create_or_update(data=create_pipeline)

updated_pipeline = self.metadata.clean_pipeline_tasks(
pipeline=pipeline,
tasks=[
Task(name="task3"),
Task(name="task4")
]
pipeline=pipeline, tasks=[Task(name="task3"), Task(name="task4")]
)

assert len(updated_pipeline.tasks) == 2
Expand Down
4 changes: 3 additions & 1 deletion ingestion/tests/integration/ometa/test_ometa_table_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ def setUpClass(cls) -> None:

cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db)

cls.db_reference = EntityReference(id=cls.create_db_entity.id, name="test-db", type="database")
cls.db_reference = EntityReference(
id=cls.create_db_entity.id, name="test-db", type="database"
)

cls.entity = Table(
id=uuid.uuid4(),
Expand Down
12 changes: 9 additions & 3 deletions ingestion/tests/integration/ometa/test_ometa_tags_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from unittest import TestCase

from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import CreateTagCategoryRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.entity.tags.tagCategory import Tag, TagCategory
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
Expand Down Expand Up @@ -130,7 +132,9 @@ def test_b_update_primary_tag(self):
"""Test put tag category"""

rand_name = random.getrandbits(64)
updated_primary_tag = CreateTagRequest(description="test tag", name=f"{rand_name}")
updated_primary_tag = CreateTagRequest(
description="test tag", name=f"{rand_name}"
)

self.metadata.update_primary_tag(
CATEGORY_NAME, PRIMARY_TAG_NAME, updated_primary_tag
Expand All @@ -142,7 +146,9 @@ def test_a_update_secondary_tag(self):
"""Test put tag category"""

rand_name = random.getrandbits(64)
updated_secondary_tag = CreateTagRequest(description="test tag", name=f"{rand_name}")
updated_secondary_tag = CreateTagRequest(
description="test tag", name=f"{rand_name}"
)

self.metadata.update_secondary_tag(
CATEGORY_NAME, PRIMARY_TAG_NAME, SECONDARY_TAG_NAME, updated_secondary_tag
Expand Down
46 changes: 23 additions & 23 deletions ingestion/tests/integration/orm_profiler/test_orm_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,26 @@
from unittest import TestCase

import pytest
from metadata.config.common import WorkflowExecutionError

from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.data.table import Table

from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.ometa.ometa_api import OpenMetadata

from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig

from metadata.ingestion.api.workflow import Workflow
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base

from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.orm_profiler.engines import create_and_bind_session


sqlite_shared = "file:cachedb?mode=memory&cache=shared"

ingestion_config = {
"source": {
"type": "sqlite",
"config": {
"service_name": "test_sqlite",
"database": sqlite_shared # We need this to share the session
}
"database": sqlite_shared, # We need this to share the session
},
},
"sink": {"type": "metadata-rest", "config": {}},
"metadata_server": {
Expand Down Expand Up @@ -74,7 +68,9 @@ class ProfilerWorkflowTest(TestCase):
Run the end to end workflow and validate
"""

engine = create_engine(f"sqlite+pysqlite:///{sqlite_shared}", echo=True, future=True)
engine = create_engine(
f"sqlite+pysqlite:///{sqlite_shared}", echo=True, future=True
)
session = create_and_bind_session(engine)

server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")
Expand Down Expand Up @@ -106,7 +102,9 @@ def test_ingestion(self):
Validate that the ingestion ran correctly
"""

table_entity: Table = self.metadata.get_by_name(entity=Table, fqdn="test_sqlite.main.users")
table_entity: Table = self.metadata.get_by_name(
entity=Table, fqdn="test_sqlite.main.users"
)
assert table_entity.fullyQualifiedName == "test_sqlite.main.users"

def test_profiler_workflow(self):
Expand Down Expand Up @@ -139,26 +137,28 @@ def test_profiler_workflow(self):
{
"name": "check name count",
"column": "name",
"expression": "count < 10"
"expression": "count < 10",
},
{
"name": "check null count",
"column": "nickname",
"expression": "null_count == 0"
}
]
"expression": "null_count == 0",
},
],
}
]
}
}
],
},
},
}

profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
status = profiler_workflow.print_status()
profiler_workflow.stop()

assert status == 1 # We have a test error, so we get a failure with exit status 1
assert (
status == 1
) # We have a test error, so we get a failure with exit status 1

with pytest.raises(WorkflowExecutionError):
profiler_workflow.raise_from_status()
4 changes: 3 additions & 1 deletion ingestion/tests/integration/source/hive/test_hive_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]):
Column(name="id", dataType="INT", dataLength=1),
Column(name="name", dataType="VARCHAR", dataLength=1),
]
db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database")
db_ref = EntityReference(
id=databases[0].id, name=databases[0].name.__root__, type="database"
)
table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
created_table = client.create_or_update(table)
if table.name.__root__ == created_table.name.__root__:
Expand Down
4 changes: 3 additions & 1 deletion ingestion/tests/integration/source/mssql/test_mssql_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def create_delete_table(client):
Column(name="id", columnDataType="INT"),
Column(name="name", columnDataType="VARCHAR"),
]
db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database")
db_ref = EntityReference(
id=databases[0].id, name=databases[0].name.__root__, type="database"
)
table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
created_table = client.create_or_update_table(table)
if table.name.__root__ == created_table.name.__root__:
Expand Down
4 changes: 3 additions & 1 deletion ingestion/tests/integration/source/mysql/test_mysql_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def create_delete_table(client: OpenMetadata):
Column(name="id", dataType="INT", dataLength=1),
Column(name="name", dataType="VARCHAR", dataLength=1),
]
db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database")
db_ref = EntityReference(
id=databases[0].id, name=databases[0].name.__root__, type="database"
)
table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
created_table = client.create_or_update(table)
if table.name.__root__ == created_table.name.__root__:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ def test_create_table_service(catalog_service):
service=EntityReference(id=postgres_dbservice.id, type="databaseService"),
)
created_database = client.create_database(create_database_request)
db_ref = EntityReference(id=created_database.id.__root__, name=created_database.name.__root__, type="database")
table = CreateTableRequest(
name=table_name, columns=columns, database=db_ref
db_ref = EntityReference(
id=created_database.id.__root__,
name=created_database.name.__root__,
type="database",
)
table = CreateTableRequest(name=table_name, columns=columns, database=db_ref)
created_table = client.create_or_update_table(table)
if created_database and created_table:
assert 1
Expand Down
6 changes: 3 additions & 3 deletions ingestion/tests/integration/source/trino/test_trino_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]):
Column(name="name", dataType="VARCHAR", dataLength=1),
]
print(databases[0])
db_ref = EntityReference(id=databases[0].id.__root__, name=databases[0].name.__root__, type="database")
table = CreateTableRequest(
name="test1", columns=columns, database=db_ref
db_ref = EntityReference(
id=databases[0].id.__root__, name=databases[0].name.__root__, type="database"
)
table = CreateTableRequest(name="test1", columns=columns, database=db_ref)
created_table = client.create_or_update(table)
if table.name.__root__ == created_table.name.__root__:
client.delete(entity=Table, entity_id=str(created_table.id.__root__))
Expand Down

0 comments on commit fe5618c

Please sign in to comment.