Skip to content

Commit

Permalink
Fix #2905: Added MSSQL Usage Connector (#2948)
Browse files Browse the repository at this point in the history
* Fix #2509: added mssql usage connector

* removed comment

* removed unused variables

* fixed code smell

* renamed sql variable
  • Loading branch information
ulixius9 committed Feb 23, 2022
1 parent fab36fe commit 4739aaa
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 1 deletion.
42 changes: 42 additions & 0 deletions ingestion/examples/workflows/mssql_usage.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"source": {
"type": "mssql-usage",
"config": {
"host_port": "localhost:1433",
"service_name": "local_mssql",
"database": "catalog_test",
"query": "select top 50 * from {}.{}",
"username": "sa",
"password": "test!Password",
"duration":2,
"table_filter_pattern": {
"excludes": ["catalog_test.*"]
}
}
},
"processor": {
"type": "query-parser",
"config": {
"filter": ""
}
},
"stage": {
"type": "table-usage",
"config": {
"filename": "/tmp/mssql_usage"
}
},
"bulk_sink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/mssql_usage"
}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
}
2 changes: 2 additions & 0 deletions ingestion/src/metadata/ingestion/source/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""MSSQL source module"""
from typing import Optional

from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
Expand All @@ -27,6 +28,7 @@ class MssqlConfig(SQLConnectionConfig):
use_pymssql: bool = False
use_pyodbc: bool = False
uri_string: str = ""
duration: Optional[int]

def get_connection_url(self):
if self.use_pyodbc:
Expand Down
116 changes: 116 additions & 0 deletions ingestion/src/metadata/ingestion/source/mssql_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MSSQL usage module
"""

from typing import Any, Dict, Iterable, Iterator, Union

from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.ingestion.api.source import Source, SourceStatus

# This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.mssql import MssqlConfig
from metadata.ingestion.source.sql_alchemy_helper import (
SQLAlchemyHelper,
SQLSourceStatus,
)
from metadata.utils.helpers import get_raw_extract_iter, get_start_and_end
from metadata.utils.sql_queries import MSSQL_SQL_USAGE_STATEMENT


class MssqlUsageSource(Source[TableQuery]):
"""
MSSQL Usage source
Args:
config:
metadata_config:
ctx:
Attributes:
config:
analysis_date:
sql_stmt:
alchemy_helper:
report:
"""

def __init__(self, config, metadata_config, ctx):
super().__init__(ctx)
self.config = config
start, end = get_start_and_end(config.duration)
self.analysis_date = start
self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end)
self.alchemy_helper = SQLAlchemyHelper(
config, metadata_config, ctx, DatabaseServiceType.MSSQL.value, self.sql_stmt
)
self.report = SQLSourceStatus()

@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = MssqlConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)

def prepare(self):
return super().prepare()

def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
"""
Provides iterator of result row from SQLAlchemy helper
:return:
"""
rows = self.alchemy_helper.execute_query()
for row in rows:
yield row

def next_record(self) -> Iterable[TableQuery]:
"""
Using itertools.groupby and raw level iterator,
it groups to table and yields TableMetadata
:return:
"""
for row in get_raw_extract_iter(self.alchemy_helper):
table_query = TableQuery(
query=row["query_type"],
user_name=row["user_name"],
starttime=str(row["start_time"]),
endtime=str(row["end_time"]),
analysis_date=self.analysis_date,
aborted=row["aborted"],
database=row["database_name"],
sql=row["query_text"],
service_name=self.config.service_name,
)
if row["schema_name"] is not None:
self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
else:
self.report.scanned(f"{row['database_name']}")
yield table_query

def get_report(self):
"""
get report
Returns:
"""
return self.report

def close(self):
self.alchemy_helper.close()

def get_status(self) -> SourceStatus:
return self.report
12 changes: 11 additions & 1 deletion ingestion/src/metadata/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# limitations under the License.
import logging
from datetime import datetime, timedelta
from typing import List
from typing import Any, Dict, Iterable, List

from metadata.generated.schema.api.services.createDashboardService import (
CreateDashboardServiceRequest,
Expand Down Expand Up @@ -181,3 +181,13 @@ def datetime_to_ts(date: datetime) -> int:
Convert a given date to a timestamp as an Int
"""
return int(date.timestamp())


def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
"""
Provides iterator of result row from SQLAlchemy helper
:return:
"""
rows = alchemy_helper.execute_query()
for row in rows:
yield row
19 changes: 19 additions & 0 deletions ingestion/src/metadata/utils/sql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,22 @@
WHERE table_name='{view_name}'
AND {schema_condition}
"""

MSSQL_SQL_USAGE_STATEMENT = """
SELECT
db.NAME database_name,
t.text query_text,
s.last_execution_time start_time,
DATEADD(ms, s.total_elapsed_time, s.last_execution_time) end_time,
NULL schema_name,
NULL query_type,
NULL user_name,
NULL aborted
FROM sys.dm_exec_cached_plans AS p
INNER JOIN sys.dm_exec_query_stats AS s
ON p.plan_handle = s.plan_handle
CROSS APPLY sys.Dm_exec_sql_text(p.plan_handle) AS t
INNER JOIN sys.databases db
ON db.database_id = t.dbid
ORDER BY s.last_execution_time DESC;
"""

0 comments on commit 4739aaa

Please sign in to comment.