Skip to content

Commit

Permalink
Fix open-metadata#2937: added clickhouse usage connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 committed Feb 25, 2022
1 parent 6dadbc1 commit bdd2545
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 1 deletion.
41 changes: 41 additions & 0 deletions ingestion/examples/workflows/clickhouse_usage.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"source": {
"type": "clickhouse-usage",
"config": {
"username":"default",
"password":"",
"database": "default",
"service_name": "local_clickhouse",
"schema_filter_pattern": {
"excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"]
},
"duration":2
}
},
"processor": {
"type": "query-parser",
"config": {
"filter": ""
}
},
"stage": {
"type": "table-usage",
"config": {
"filename": "/tmp/clickhouse_usage"
}
},
"bulk_sink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/clickhouse_usage"
}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
}

2 changes: 1 addition & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def get_long_description():
"mlflow": {"mlflow-skinny~=1.22.0"},
"sklearn": {"scikit-learn==1.0.2"},
"db2": {"ibm-db-sa==0.3.7"},
"clickhouse": {"clickhouse-driver==0.2.3", "clickhouse-sqlalchemy==0.1.8"},
"clickhouse": {"clickhouse-driver==0.2.3", "clickhouse-sqlalchemy==0.2.0"},
"databricks": {"sqlalchemy-databricks==0.1.0"},
"singlestore": {"pymysql>=1.0.2"},
}
Expand Down
2 changes: 2 additions & 0 deletions ingestion/src/metadata/ingestion/source/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
from typing import Optional

from clickhouse_sqlalchemy.drivers.base import ClickHouseDialect
from clickhouse_sqlalchemy.drivers.http.transport import RequestsTransport, _get_type
Expand Down Expand Up @@ -139,6 +140,7 @@ class ClickhouseConfig(SQLConnectionConfig):
host_port = "localhost:8123"
scheme = "clickhouse+http"
service_type = DatabaseServiceType.ClickHouse.value
duration: Optional[int]

def get_connection_url(self):
    """Delegate connection-URL construction to the base config class."""
    url = super().get_connection_url()
    return url
Expand Down
120 changes: 120 additions & 0 deletions ingestion/src/metadata/ingestion/source/clickhouse_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Clickhouse usage module
"""

from typing import Any, Dict, Iterable, Iterator, Union

from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.ingestion.api.source import Source, SourceStatus

# This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.clickhouse import ClickhouseConfig
from metadata.ingestion.source.sql_alchemy_helper import (
SQLAlchemyHelper,
SQLSourceStatus,
)
from metadata.utils.helpers import get_raw_extract_iter, get_start_and_end
from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT


class ClickhouseUsageSource(Source[TableQuery]):
    """
    Clickhouse usage source.

    Extracts query history for a time window (built from
    ``config.duration`` in ``__init__``) and yields one ``TableQuery``
    record per result row.

    Args:
        config: ClickhouseConfig with connection details and the
            ``duration`` of the usage window to extract.
        metadata_config: OpenMetadata server connection configuration.
        ctx: ingestion pipeline context.

    Attributes:
        config: the source configuration.
        analysis_date: start of the extraction window; attached to every
            yielded record.
        sql_stmt: the usage query with start/end times substituted in.
        alchemy_helper: SQLAlchemy helper that executes ``sql_stmt``.
        report: status accumulator for this source.
    """

    def __init__(self, config, metadata_config, ctx):
        super().__init__(ctx)
        self.config = config
        # Time window of query history to extract, derived from
        # config.duration.
        start, end = get_start_and_end(config.duration)
        self.analysis_date = start
        self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format(
            start_time=start, end_time=end
        )
        self.alchemy_helper = SQLAlchemyHelper(
            config,
            metadata_config,
            ctx,
            DatabaseServiceType.ClickHouse.value,
            self.sql_stmt,
        )
        self.report = SQLSourceStatus()

    @classmethod
    def create(cls, config_dict, metadata_config_dict, ctx):
        """Build the source from raw config dicts (workflow entry point)."""
        config = ClickhouseConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(config, metadata_config, ctx)

    def prepare(self):
        """No extra preparation needed; defer to the base class."""
        return super().prepare()

    # NOTE: the previous private `_get_raw_extract_iter` method duplicated
    # the imported `get_raw_extract_iter` helper and was never called, so
    # it has been removed.

    def next_record(self) -> Iterable[TableQuery]:
        """
        Iterate the raw result rows and yield one TableQuery per row.
        :return: iterable of TableQuery records
        """
        for row in get_raw_extract_iter(self.alchemy_helper):
            table_query = TableQuery(
                # NOTE(review): `query` is populated with the query *id*
                # while the SQL text goes into `sql` — confirm this matches
                # TableQuery's intended contract.
                query=row["query_id"],
                user_name=row["user_name"],
                starttime=str(row["start_time"]),
                endtime=str(row["end_time"]),
                analysis_date=self.analysis_date,
                aborted=row["aborted"],
                # `database_name` arrives as a sequence (presumably
                # ClickHouse's `databases` array); use its first entry and
                # fall back to "default" when empty.
                database=row["database_name"][0]
                if len(row["database_name"]) >= 1
                else "default",
                sql=row["query_text"],
                service_name=self.config.service_name,
            )
            yield table_query

    def get_report(self):
        """Return the source status report."""
        return self.report

    def close(self):
        """Release the underlying SQLAlchemy resources."""
        self.alchemy_helper.close()

    def get_status(self) -> SourceStatus:
        """Return the current source status."""
        return self.report
17 changes: 17 additions & 0 deletions ingestion/src/metadata/utils/sql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,20 @@
ON db.database_id = t.dbid
ORDER BY s.last_execution_time DESC;
"""

# Usage query for ClickHouse: one row per logged query, with the window
# bounded by {start_time}/{end_time} (substituted via str.format by the
# usage source).
#
# NOTE(review): `DATEADD(query_duration_ms, query_start_time)` does not look
# like valid ClickHouse syntax — ClickHouse's dateAdd takes
# (unit, value, date); something like
# addMilliseconds(query_start_time, query_duration_ms) may be intended.
# Confirm against a live server.
#
# NOTE(review): the WHERE clause filters on the alias `start_time`;
# ClickHouse permits expression aliases in WHERE, but verify on the target
# version. The CAST(type,'Int8') <> 3 / <> 4 filters presumably exclude
# failed queries (exception entries in system.query_log) — confirm.
CLICKHOUSE_SQL_USAGE_STATEMENT = """
Select
query_start_time start_time,
DATEADD(query_duration_ms, query_start_time) end_time,
databases database_name,
user user_name,
FALSE aborted,
query_id query_id,
query query_text,
NULL schema_name,
tables tables
From system.query_log
Where start_time between '{start_time}' and '{end_time}'
and CAST(type,'Int8') <> 3
and CAST(type,'Int8') <> 4
"""

0 comments on commit bdd2545

Please sign in to comment.