Skip to content

Commit

Permalink
revert(bigquery-usage): dataset allow filter impl (datahub-project#4901)
Browse files Browse the repository at this point in the history
* Revert "fix(ingestion): bigquery-usage: Fix biquery usage table deny pattern template (datahub-project#4898)"
  • Loading branch information
anshbansal authored and justinas-marozas committed May 17, 2022
1 parent 9cf151c commit 6bd53a2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 52 deletions.
Expand Up @@ -74,13 +74,11 @@
BQ_DATE_SHARD_FORMAT = "%Y%m%d"
BQ_AUDIT_V1 = {
"BQ_FILTER_REGEX_ALLOW_TEMPLATE": """
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "{table_allow_pattern}"
AND
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.datasetId =~ "{dataset_allow_pattern}"
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId =~ "{allow_pattern}"
""",
"BQ_FILTER_REGEX_DENY_TEMPLATE": """
{logical_operator}
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "{table_deny_pattern}"
protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId !~ "{deny_pattern}"
""",
"BQ_FILTER_RULE_TEMPLATE": """
protoPayload.serviceName="bigquery.googleapis.com"
Expand Down Expand Up @@ -115,11 +113,11 @@

BQ_AUDIT_V2 = {
"BQ_FILTER_REGEX_ALLOW_TEMPLATE": """
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/{dataset_allow_pattern}/tables/{table_allow_pattern}"
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables =~ "projects/.*/datasets/.*/tables/{allow_pattern}"
""",
"BQ_FILTER_REGEX_DENY_TEMPLATE": """
{logical_operator}
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/{table_deny_pattern}"
protoPayload.metadata.jobChange.job.jobStats.queryStats.referencedTables !~ "projects/.*/datasets/.*/tables/{deny_pattern}"
""",
"BQ_FILTER_RULE_TEMPLATE": """
resource.type=("bigquery_project" OR "bigquery_dataset")
Expand Down Expand Up @@ -171,24 +169,22 @@
def bigquery_audit_metadata_query_template(
dataset: str,
use_date_sharded_tables: bool,
table_allow_filter: str,
dataset_allow_filter: str,
table_allow_filter: str = None,
) -> str:
"""
Receives a dataset (with project specified) and returns a query template that is used to query exported
v2 AuditLogs containing protoPayloads of type BigQueryAuditMetadata.
:param dataset: the dataset to query against in the form of $PROJECT.$DATASET
:param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log
tables
:param table_allow_filter: regex used to filter on log events that contain the wanted tables
:param dataset_allow_filter: regex used to filter on log events that contain wanted datasets
:param table_allow_filter: regex used to filter on log events that contain the wanted datasets
:return: a query template, when supplied start_time and end_time, can be used to query audit logs from BigQuery
"""
allow_filter = f"""
AND EXISTS (SELECT *
from UNNEST(JSON_EXTRACT_ARRAY(protopayload_auditlog.metadataJson,
"$.jobChange.job.jobStats.queryStats.referencedTables")) AS x
where REGEXP_CONTAINS(x, r'(projects/.*/datasets/{dataset_allow_filter if dataset_allow_filter else ".*"}/tables/{table_allow_filter if table_allow_filter else ".*"})'))
where REGEXP_CONTAINS(x, r'(projects/.*/datasets/.*/tables/{table_allow_filter if table_allow_filter else ".*"})'))
"""

query: str
Expand Down Expand Up @@ -655,8 +651,8 @@ def add_config_to_report(self):
self.report.use_v2_audit_metadata = self.config.use_v2_audit_metadata
self.report.query_log_delay = self.config.query_log_delay
self.report.log_page_size = self.config.log_page_size
self.report.allow_pattern = self.config.get_table_allow_pattern_string()
self.report.deny_pattern = self.config.get_table_deny_pattern_string()
self.report.allow_pattern = self.config.get_allow_pattern_string()
self.report.deny_pattern = self.config.get_deny_pattern_string()

def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool:
return (
Expand Down Expand Up @@ -753,9 +749,7 @@ def _get_bigquery_log_entries_via_exported_bigquery_audit_metadata(
list_entries: Iterable[
BigQueryAuditMetadata
] = self._get_exported_bigquery_audit_metadata(
client,
self.config.get_table_allow_pattern_string(),
self.config.get_dataset_allow_pattern_string(),
client, self.config.get_allow_pattern_string()
)
list_entry_generators_across_clients.append(list_entries)
except Exception as e:
Expand All @@ -781,10 +775,7 @@ def _get_bigquery_log_entries_via_exported_bigquery_audit_metadata(
logger.info(f"Finished loading {i} log entries from BigQuery")

def _get_exported_bigquery_audit_metadata(
self,
bigquery_client: BigQueryClient,
table_allow_filter: str,
dataset_allow_filter: str,
self, bigquery_client: BigQueryClient, allow_filter: str
) -> Iterable[BigQueryAuditMetadata]:
if self.config.bigquery_audit_metadata_datasets is None:
return
Expand All @@ -810,10 +801,7 @@ def _get_exported_bigquery_audit_metadata(
).strftime(BQ_DATE_SHARD_FORMAT)

query = bigquery_audit_metadata_query_template(
dataset,
self.config.use_date_sharded_audit_log_tables,
table_allow_filter,
dataset_allow_filter,
dataset, self.config.use_date_sharded_audit_log_tables, allow_filter
).format(
start_time=start_time,
end_time=end_time,
Expand All @@ -822,11 +810,9 @@ def _get_exported_bigquery_audit_metadata(
)
else:
query = bigquery_audit_metadata_query_template(
dataset,
self.config.use_date_sharded_audit_log_tables,
table_allow_filter,
dataset_allow_filter,
dataset, self.config.use_date_sharded_audit_log_tables, allow_filter
).format(start_time=start_time, end_time=end_time)

query_job = bigquery_client.query(query)
logger.info(
f"Finished loading log entries from BigQueryAuditMetadata in {dataset}"
Expand All @@ -852,30 +838,21 @@ def _get_bigquery_log_entries_via_gcp_logging(
# completion event is delayed and happens after the configured end time.

# Can safely access the first index of the allow list as it by default contains ".*"
use_allow_filter = (
self.config.table_pattern
and (
len(self.config.table_pattern.allow) > 1
or self.config.table_pattern.allow[0] != ".*"
)
or self.config.dataset_pattern
and (
len(self.config.dataset_pattern.allow) > 1
or self.config.dataset_pattern.allow[0] != ".*"
)
use_allow_filter = self.config.table_pattern and (
len(self.config.table_pattern.allow) > 1
or self.config.table_pattern.allow[0] != ".*"
)
use_deny_filter = self.config.table_pattern and self.config.table_pattern.deny
allow_regex = (
audit_templates["BQ_FILTER_REGEX_ALLOW_TEMPLATE"].format(
table_allow_pattern=self.config.get_table_allow_pattern_string(),
dataset_allow_pattern=self.config.get_dataset_allow_pattern_string(),
allow_pattern=self.config.get_allow_pattern_string()
)
if use_allow_filter
else ""
)
deny_regex = (
audit_templates["BQ_FILTER_REGEX_DENY_TEMPLATE"].format(
table_deny_pattern=self.config.get_table_deny_pattern_string(),
deny_pattern=self.config.get_deny_pattern_string(),
logical_operator="AND" if use_allow_filter else "",
)
if use_deny_filter
Expand Down
Expand Up @@ -156,14 +156,8 @@ def use_exported_bigquery_audit_metadata_uses_v2(cls, v, values):
)
return v

def get_table_allow_pattern_string(self) -> str:
def get_allow_pattern_string(self) -> str:
return "|".join(self.table_pattern.allow) if self.table_pattern else ""

def get_table_deny_pattern_string(self) -> str:
def get_deny_pattern_string(self) -> str:
return "|".join(self.table_pattern.deny) if self.table_pattern else ""

def get_dataset_allow_pattern_string(self) -> str:
return "|".join(self.dataset_pattern.allow) if self.table_pattern else ""

def get_dataset_deny_pattern_string(self) -> str:
return "|".join(self.dataset_pattern.deny) if self.table_pattern else ""
Expand Up @@ -32,8 +32,8 @@ def test_bq_usage_config():
table_pattern={"allow": ["test-regex", "test-regex-1"], "deny": []},
)
)
assert config.get_table_allow_pattern_string() == "test-regex|test-regex-1"
assert config.get_table_deny_pattern_string() == ""
assert config.get_allow_pattern_string() == "test-regex|test-regex-1"
assert config.get_deny_pattern_string() == ""
assert (config.end_time - config.start_time) == timedelta(hours=2)
assert config.projects == ["sample-bigquery-project-name-1234"]

Expand Down

0 comments on commit 6bd53a2

Please sign in to comment.