From 371b96cfc82e78d4f838cf43a600a539c85205d0 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Thu, 2 Sep 2021 20:18:35 +0530 Subject: [PATCH 1/2] MSSQL sample-data query fix --- ingestion/src/metadata/ingestion/source/mssql.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/mssql.py b/ingestion/src/metadata/ingestion/source/mssql.py index d2e3286489e3..b961e4aafeeb 100644 --- a/ingestion/src/metadata/ingestion/source/mssql.py +++ b/ingestion/src/metadata/ingestion/source/mssql.py @@ -14,12 +14,12 @@ # limitations under the License. # This import verifies that the dependencies are available. +from metadata.generated.schema.entity.data.table import TableData import sqlalchemy_pytds # noqa: F401 from .sql_source import SQLConnectionConfig, SQLSource from ..ometa.openmetadata_rest import MetadataServerConfig - class MssqlConfig(SQLConnectionConfig): host_port = "localhost:1433" scheme = "mssql+pytds" @@ -28,13 +28,21 @@ class MssqlConfig(SQLConnectionConfig): def get_connection_url(self): return super().get_connection_url() - - class MssqlSource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) + def fetch_sample_data(self, schema: str, table: str): + query = f"select top 50 * from {schema}.{table}" + results = self.connection.execute(query) + cols = list(results.keys()) + rows = [] + for r in results: + row = list(r) + rows.append(row) + return TableData(columns=cols, rows=rows) + @classmethod def create(cls, config_dict, metadata_config_dict, ctx): config = MssqlConfig.parse_obj(config_dict) From 72363facba1810a605005ea4a9e06abda084cfa7 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Tue, 7 Sep 2021 00:26:58 +0530 Subject: [PATCH 2/2] Query Format as per Database implemented --- ingestion/examples/workflows/bigquery.json | 4 +- .../examples/workflows/confluent_kafka.json | 2 +- ingestion/examples/workflows/hive.json | 2 +- ingestion/examples/workflows/looker.json | 2 +- ingestion/examples/workflows/mssql.json | 13 +- ingestion/examples/workflows/postgres.json | 2 +- ingestion/examples/workflows/presto.json | 2 +- ingestion/examples/workflows/redshift.json | 4 +- ingestion/examples/workflows/snowflake.json | 3 +- ingestion/examples/workflows/superset.json | 2 +- ingestion/pipelines/mysql.json | 7 +- ingestion/setup.py | 4 +- .../sink/metadata_rest_dashboards.py | 139 ------------------ .../ingestion/sink/metadata_rest_tables.py | 89 ----------- .../ingestion/sink/metadata_rest_topics.py | 68 --------- .../src/metadata/ingestion/source/bigquery.py | 10 -- .../src/metadata/ingestion/source/mssql.py | 10 -- .../src/metadata/ingestion/source/redshift.py | 3 - .../metadata/ingestion/source/sql_source.py | 6 +- 19 files changed, 25 insertions(+), 347 deletions(-) delete mode 100644 ingestion/src/metadata/ingestion/sink/metadata_rest_dashboards.py delete mode 100644 ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py delete mode 100644 ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py diff --git a/ingestion/examples/workflows/bigquery.json b/ingestion/examples/workflows/bigquery.json index 26fe007a603f..a96880a3da8c 100644 --- a/ingestion/examples/workflows/bigquery.json +++ b/ingestion/examples/workflows/bigquery.json @@ -3,7 +3,7 @@ "type": "bigquery", "config": { "project_id": "project_id", - "host_port": "https://bigquery.googleapis.com", + "host_port": "bigquery.googleapis.com", "username": "gcpuser@project_id.iam.gserviceaccount.com", "service_name": "gcp_bigquery", "options": { @@ -25,7 +25,7 @@ } }, "sink": { - "type": "metadata-rest-tables", + "type": "metadata-rest", "config": { "api_endpoint": "http://localhost:8585/api" } diff --git a/ingestion/examples/workflows/confluent_kafka.json b/ingestion/examples/workflows/confluent_kafka.json index a62ec61b7213..4066f444a3ee 100644 --- a/ingestion/examples/workflows/confluent_kafka.json +++ b/ingestion/examples/workflows/confluent_kafka.json @@ -11,7 +11,7 @@ } }, "sink": { - "type": "metadata-rest-topics", + "type": "metadata-rest", "config": { } }, diff --git a/ingestion/examples/workflows/hive.json b/ingestion/examples/workflows/hive.json index 04684fa6f70a..284077df8e71 100644 --- a/ingestion/examples/workflows/hive.json +++ b/ingestion/examples/workflows/hive.json @@ -11,7 +11,7 @@ "config": {} }, "sink": { - "type": "metadata-rest-tables", + "type": "metadata-rest", "config": {} }, "metadata_server": { diff --git a/ingestion/examples/workflows/looker.json b/ingestion/examples/workflows/looker.json index 5a1e5884142c..4307a4588b2d 100644 --- a/ingestion/examples/workflows/looker.json +++ b/ingestion/examples/workflows/looker.json @@ -10,7 +10,7 @@ } }, "sink": { - "type": "metadata-rest-dashboards", + "type": "metadata-rest", "config": {} }, "metadata_server": { diff --git a/ingestion/examples/workflows/mssql.json b/ingestion/examples/workflows/mssql.json index 56f404c31ffe..b2acc2ab7805 100644 --- a/ingestion/examples/workflows/mssql.json +++ b/ingestion/examples/workflows/mssql.json @@ -4,9 +4,10 @@ "config": { "host_port": "localhost:1433", "service_name": "local_mssql", - "database":"catalog_test", + "database": "catalog_test", + "query": "select top 50 * from {}.{}", "username": "sa", - "password": "test!Password`[", + "password": "test!Password", "filter_pattern": { "excludes": ["catalog_test.*"] } @@ -14,13 +15,11 @@ }, "processor": { "type": "pii", - "config": { - } + "config": {} }, "sink": { - "type": "metadata-rest-tables", - "config": { - } + "type": "metadata-rest", + "config": {} }, "metadata_server": { "type": "metadata-server", diff --git a/ingestion/examples/workflows/postgres.json b/ingestion/examples/workflows/postgres.json index c6d3f7e09cd9..0d86e0017a70 100644 --- a/ingestion/examples/workflows/postgres.json +++ b/ingestion/examples/workflows/postgres.json @@ -14,7 +14,7 @@ "config": {} }, "sink": { - "type": "metadata-rest-tables", + "type": "metadata-rest", "config": {} }, "metadata_server": { diff --git a/ingestion/examples/workflows/presto.json b/ingestion/examples/workflows/presto.json index a088a4efb566..cb2a462ed9ef 100644 --- a/ingestion/examples/workflows/presto.json +++ b/ingestion/examples/workflows/presto.json @@ -14,7 +14,7 @@ } }, "sink": { - "type": "metadata-rest-tables", + "type": "metadata-rest", "config": { } }, diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json index 40566d938023..fd22db30e306 100644 --- a/ingestion/examples/workflows/redshift.json +++ b/ingestion/examples/workflows/redshift.json @@ -8,7 +8,7 @@ "database": "warehouse", "service_name": "aws_redshift", "filter_pattern": { - "excludes": ["information_schema.*","[\\w]*event_vw.*"] + "excludes": ["information_schema.*", "[\\w]*event_vw.*"] } } }, @@ -17,7 +17,7 @@ "config": {} }, "sink": { - "type": "metadata-rest-tables", + "type": "metadata-rest", "config": {} }, "metadata_server": { diff --git a/ingestion/examples/workflows/snowflake.json b/ingestion/examples/workflows/snowflake.json index 30c0d1e70072..daf045a8a0d8 100644 --- a/ingestion/examples/workflows/snowflake.json +++ b/ingestion/examples/workflows/snowflake.json @@ -2,6 +2,7 @@ "source": { "type": "snowflake", "config": { + "host_port": "account.region.service.snowflakecomputing.com", "username": "username", "password": "strong_password", @@ -20,7 +21,7 @@ "config": {} }, "sink": { - "type": "metadata-rest-tables", + "type": "metadata-rest", "config": {} }, "metadata_server": { diff --git a/ingestion/examples/workflows/superset.json b/ingestion/examples/workflows/superset.json index c11a032b0fc6..64ec011323c6 100644 --- a/ingestion/examples/workflows/superset.json +++ b/ingestion/examples/workflows/superset.json @@ -9,7 +9,7 @@ } }, "sink": { - "type": "metadata-rest-dashboards", + "type": "metadata-rest", "config": { } }, diff --git a/ingestion/pipelines/mysql.json b/ingestion/pipelines/mysql.json index f08c8889c093..2b084c4ccebe 100644 --- a/ingestion/pipelines/mysql.json +++ b/ingestion/pipelines/mysql.json @@ -17,15 +17,14 @@ } }, "sink": { - "type": "metadata-rest-tables", - "config": { - } + "type": "metadata-rest", + "config": {} }, "metadata_server": { "type": "metadata-server", "config": { "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "auth_provider_type": "no-auth" } }, "cron": { diff --git a/ingestion/setup.py b/ingestion/setup.py index 341334b2be02..a30f0e3b6670 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -93,9 +93,7 @@ def get_long_description(): "scheduler": scheduler_requirements, "snowflake": {"snowflake-sqlalchemy<=1.2.4"}, "snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"}, - "sample-tables": {"faker~=8.1.1", }, - "sample-topics": {}, - "sample-data": {"faker~=8.1.1",}, + "sample-data": {"faker~=8.1.1"}, "superset": {} } diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_dashboards.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_dashboards.py deleted file mode 100644 index efb1fa665239..000000000000 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest_dashboards.py +++ /dev/null @@ -1,139 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import List - -from pydantic import ValidationError - -from metadata.config.common import ConfigModel -from metadata.generated.schema.api.data.createChart import CreateChartEntityRequest -from metadata.generated.schema.api.data.createDashboard import CreateDashboardEntityRequest -from metadata.generated.schema.entity.data.chart import ChartType -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import WorkflowContext, Record -from metadata.ingestion.api.sink import Sink, SinkStatus -from metadata.ingestion.models.table_metadata import Chart, Dashboard -from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig -from metadata.ingestion.source import metadata - -logger = logging.getLogger(__name__) - - -class MetadataDashboardsSinkConfig(ConfigModel): - api_endpoint: str = None - - -om_chart_type_dict = { - "line": ChartType.Line, - "table": ChartType.Table, - "dist_bar": ChartType.Bar, - "bar": ChartType.Bar, - "big_number": ChartType.Line, - "histogram": ChartType.Histogram, - "big_number_total": ChartType.Line, - "dual_line": ChartType.Line, - "line_multi": ChartType.Line, - "treemap": ChartType.Area, - "box_plot": ChartType.Bar -} - -class MetadataRestDashboardsSink(Sink): - config: MetadataDashboardsSinkConfig - status: SinkStatus - - def __init__(self, ctx: WorkflowContext, config: MetadataDashboardsSinkConfig, - metadata_config: MetadataServerConfig): - super().__init__(ctx) - self.config = config - self.metadata_config = metadata_config - self.status = SinkStatus() - self.wrote_something = False - self.client = OpenMetadataAPIClient(self.metadata_config) - self.charts_dict = {} - - @classmethod - def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): - config = MetadataDashboardsSinkConfig.parse_obj(config_dict) - metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) - return cls(ctx, config, metadata_config) - - def write_record(self, record: Record) -> None: - if isinstance(record, Chart): - self._ingest_charts(record) - elif isinstance(record, Dashboard): - self._ingest_dashboards(record) - else: - logging.info("Ignoring the record due to unknown Record type {}".format(type(record))) - - def _ingest_charts(self, chart: Chart): - try: - om_chart_type = ChartType.Other - if chart.chart_type is not None and chart.chart_type in om_chart_type_dict.keys(): - om_chart_type = om_chart_type_dict[chart.chart_type] - - chart_request = CreateChartEntityRequest( - name=chart.name, - displayName=chart.displayName, - description=chart.description, - chartType=om_chart_type, - chartUrl=chart.url, - service=chart.service - ) - created_chart = self.client.create_or_update_chart(chart_request) - self.charts_dict[chart.name] = EntityReference(id=created_chart.id, type='chart') - logger.info( - 'Successfully ingested {}'.format(created_chart.displayName)) - self.status.records_written( - '{}'.format(created_chart.displayName)) - except (APIError, ValidationError) as err: - logger.error( - "Failed to ingest chart {}".format(chart.displayName)) - logger.error(err) - self.status.failure(chart.displayName) - - def _ingest_dashboards(self, dashboard: Dashboard): - try: - charts = self._get_chart_references(dashboard) - - dashboard_request = CreateDashboardEntityRequest( - name=dashboard.name, - displayName=dashboard.displayName, - description=dashboard.description, - dashboardUrl=dashboard.url, - charts=charts, - service=dashboard.service - ) - created_dashboard = self.client.create_or_update_dashboard(dashboard_request) - logger.info('Successfully ingested {}'.format(created_dashboard.displayName)) - self.status.records_written('{}'.format(created_dashboard.displayName)) - except (APIError, ValidationError) as err: - logger.error("Failed to ingest chart {}".format(dashboard.name)) - logger.error(err) - self.status.failure(dashboard.name) - - def _get_chart_references(self, dashboard: Dashboard) -> []: - chart_references = [] - for chart_id in dashboard.charts: - if chart_id in self.charts_dict.keys(): - chart_references.append(self.charts_dict[chart_id]) - return chart_references - - def get_status(self): - return self.status - - def close(self): - pass diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py deleted file mode 100644 index 0a80d5d34596..000000000000 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py +++ /dev/null @@ -1,89 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from pydantic import ValidationError - -from metadata.config.common import ConfigModel -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest -from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import WorkflowContext -from metadata.ingestion.api.sink import Sink, SinkStatus -from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig - -logger = logging.getLogger(__name__) - - -class MetadataTablesSinkConfig(ConfigModel): - api_endpoint: str = None - - -class MetadataRestTablesSink(Sink): - config: MetadataTablesSinkConfig - status: SinkStatus - - def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig): - super().__init__(ctx) - self.config = config - self.metadata_config = metadata_config - self.status = SinkStatus() - self.wrote_something = False - self.client = OpenMetadataAPIClient(self.metadata_config) - - @classmethod - def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): - config = MetadataTablesSinkConfig.parse_obj(config_dict) - metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) - return cls(ctx, config, metadata_config) - - def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None: - try: - db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name, - description=table_and_db.database.description, - service=EntityReference(id=table_and_db.database.service.id, - type="databaseService")) - db = self.client.create_database(db_request) - table_request = CreateTableEntityRequest(name=table_and_db.table.name, - tableType=table_and_db.table.tableType, - columns=table_and_db.table.columns, - description=table_and_db.table.description, - database=db.id) - - if table_and_db.table.viewDefinition is not None and table_and_db.table.viewDefinition != "": - table_request.viewDefinition = table_and_db.table.viewDefinition.__root__ - - created_table = self.client.create_or_update_table(table_request) - if table_and_db.table.sampleData is not None: - self.client.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData) - - logger.info( - 'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__)) - self.status.records_written( - '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__)) - except (APIError, ValidationError) as err: - logger.error( - "Failed to ingest table {} in database {} ".format(table_and_db.table.name.__root__, table_and_db.database.name.__root__)) - logger.error(err) - self.status.failure(table_and_db.table.name.__root__) - - def get_status(self): - return self.status - - def close(self): - pass diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py deleted file mode 100644 index 0269aa2a19ac..000000000000 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py +++ /dev/null @@ -1,68 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging - -from pydantic import ValidationError - -from metadata.config.common import ConfigModel -from metadata.generated.schema.api.data.createTopic import CreateTopic -from metadata.ingestion.api.common import WorkflowContext -from metadata.ingestion.api.sink import Sink, SinkStatus -from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig - -logger = logging.getLogger(__name__) - - -class MetadataTopicsSinkConfig(ConfigModel): - api_endpoint: str = None - - -class MetadataRestTopicsSink(Sink): - config: MetadataTopicsSinkConfig - status: SinkStatus - - def __init__(self, ctx: WorkflowContext, config: MetadataTopicsSinkConfig, metadata_config: MetadataServerConfig): - super().__init__(ctx) - self.config = config - self.metadata_config = metadata_config - self.status = SinkStatus() - self.wrote_something = False - self.rest = OpenMetadataAPIClient(self.metadata_config) - - @classmethod - def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): - config = MetadataTopicsSinkConfig.parse_obj(config_dict) - metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) - return cls(ctx, config, metadata_config) - - def write_record(self, topic: CreateTopic) -> None: - try: - created_topic = self.rest.create_or_update_topic(topic) - logger.info( - 'Successfully ingested {}'.format(created_topic.name.__root__)) - self.status.records_written(created_topic) - except (APIError, ValidationError) as err: - logger.error( - "Failed to ingest topic {} ".format(topic.name.__root__)) - logger.error(err) - self.status.failure(topic.name) - - def get_status(self): - return self.status - - def close(self): - pass diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py index a783869711ad..017890be88ed 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/bigquery.py @@ -45,16 +45,6 @@ def create(cls, config_dict, metadata_config_dict, ctx): metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(config, metadata_config, ctx) - def fetch_sample_data(self, schema: str, table: str): - query = f"select * from {self.config.project_id}.{schema}.{table} limit 50" - results = self.connection.execute(query) - cols = list(results.keys()) - rows = [] - for r in results: - row = list(r) - rows.append(row) - return TableData(columns=cols, rows=rows) - def standardize_schema_table_names( self, schema: str, table: str ) -> Tuple[str, str]: diff --git a/ingestion/src/metadata/ingestion/source/mssql.py b/ingestion/src/metadata/ingestion/source/mssql.py index b961e4aafeeb..edcdfd38ff26 100644 --- a/ingestion/src/metadata/ingestion/source/mssql.py +++ b/ingestion/src/metadata/ingestion/source/mssql.py @@ -33,16 +33,6 @@ class MssqlSource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) - def fetch_sample_data(self, schema: str, table: str): - query = f"select top 50 * from {schema}.{table}" - results = self.connection.execute(query) - cols = list(results.keys()) - rows = [] - for r in results: - row = list(r) - rows.append(row) - return TableData(columns=cols, rows=rows) - @classmethod def create(cls, config_dict, metadata_config_dict, ctx): config = MssqlConfig.parse_obj(config_dict) diff --git a/ingestion/src/metadata/ingestion/source/redshift.py b/ingestion/src/metadata/ingestion/source/redshift.py index bc01d95143ca..165acef669a7 100644 --- a/ingestion/src/metadata/ingestion/source/redshift.py +++ b/ingestion/src/metadata/ingestion/source/redshift.py @@ -38,11 +38,8 @@ def get_identifier(self, schema: str, table: str) -> str: def get_connection_url(self): return super().get_connection_url() - - class RedshiftSource(SQLSource): - def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index b826f1c30db8..b0f680a1f9b9 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -67,6 +67,7 @@ class SQLConnectionConfig(ConfigModel): scheme: str service_name: str service_type: str + query: Optional[str] = 'select * from {}.{} limit 50' options: dict = {} include_views: Optional[bool] = True include_tables: Optional[bool] = True @@ -174,13 +175,12 @@ def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowCont def standardize_schema_table_names( self, schema: str, table: str ) -> Tuple[str, str]: - print("IN SQL SOURCE") return schema, table def fetch_sample_data(self, schema: str, table: str): try: - query = f"select * from {schema}.{table} limit 50" - logger.info("Fetching sample data, this may take a while {}".format(query)) + query = self.config.query.format(schema,table) + logger.info(query) results = self.connection.execute(query) cols = list(results.keys()) rows = []