Skip to content

Commit

Permalink
Table Constraints Added - Ingestion (#2854)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayush-shah committed Feb 19, 2022
1 parent 0e34028 commit d2c6400
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
10 changes: 9 additions & 1 deletion ingestion/examples/sample_data/datasets/tables.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"description": "Unique identifier for the address.",
"fullyQualifiedName": "bigquery_gcp.shopify.dim_address.address_id",
"tags": [],
"constraint": "PRIMARY_KEY",
"ordinalPosition": 1
},
{
Expand Down Expand Up @@ -131,6 +130,15 @@
"ordinalPosition": 12
}
],
"tableConstraints": [
{
"constraintType": "PRIMARY_KEY",
"columns": [
"address_id",
"shop_id"
]
}
],
"database": {
"id": "50da1ff8-4e1d-4967-8931-45edbf4dd908",
"type": "database",
Expand Down
2 changes: 2 additions & 0 deletions ingestion/src/metadata/ingestion/sink/metadata_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def write_tables(self, db_and_table: OMetaDatabaseAndTable):
columns=db_and_table.table.columns,
description=db_and_table.table.description,
database=db_ref,
tableConstraints=db_and_table.table.tableConstraints,
)
if db_and_table.table.viewDefinition:
table_request.viewDefinition = (
Expand Down Expand Up @@ -219,6 +220,7 @@ def write_tables(self, db_and_table: OMetaDatabaseAndTable):
db_and_table.database.name.__root__,
)
)
logger.debug(traceback.print_exc())
logger.error(err)
self.status.failure(f"Table: {db_and_table.table.name.__root__}")

Expand Down
20 changes: 17 additions & 3 deletions ingestion/src/metadata/ingestion/source/sql_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
ConstraintType,
DataModel,
ModelType,
Table,
TableConstraint,
TableData,
TableProfile,
)
Expand Down Expand Up @@ -93,6 +95,7 @@ def __init__(
self.connection = self.engine.connect()
self.data_profiler = None
self.data_models = {}
self.table_constraints = None
self.database_source_state = set()
if self.config.dbt_catalog_file is not None:
with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog:
Expand Down Expand Up @@ -212,6 +215,7 @@ def fetch_tables(
description = _get_table_description(schema, table_name, inspector)
fqn = f"{self.config.service_name}.{schema}.{table_name}"
self.database_source_state.add(fqn)
self.table_constraints = None
table_columns = self._get_columns(schema, table_name, inspector)
table_entity = Table(
id=uuid.uuid4(),
Expand All @@ -221,6 +225,8 @@ def fetch_tables(
fullyQualifiedName=fqn,
columns=table_columns,
)
if self.table_constraints:
table_entity.tableConstraints = self.table_constraints
try:
if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, table_name)
Expand Down Expand Up @@ -371,7 +377,7 @@ def _parse_data_model(self):
)
model_fqdn = f"{schema}.{model_name}"
except Exception as err:
print(err)
logger.error(err)
self.data_models[model_fqdn] = model

def _parse_data_model_upstream(self, mnode):
Expand Down Expand Up @@ -441,17 +447,17 @@ def _get_column_constraints(
Prepare column constraints for the Table Entity
"""
constraint = None

if column["nullable"]:
constraint = Constraint.NULL
elif not column["nullable"]:
constraint = Constraint.NOT_NULL

if column["name"] in pk_columns:
if len(pk_columns) > 1:
return None
constraint = Constraint.PRIMARY_KEY
elif column["name"] in unique_columns:
constraint = Constraint.UNIQUE

return constraint

def _get_columns(
Expand Down Expand Up @@ -521,9 +527,17 @@ def _get_columns(
data_type_display = column["type"]
if parsed_string is None:
col_type = ColumnTypeParser.get_column_type(column["type"])

col_constraint = self._get_column_constraints(
column, pk_columns, unique_columns
)
if not col_constraint and len(pk_columns) > 1:
self.table_constraints = [
TableConstraint(
constraintType=ConstraintType.PRIMARY_KEY,
columns=pk_columns,
)
]
col_data_length = self._check_col_length(
col_type, column["type"]
)
Expand Down

0 comments on commit d2c6400

Please sign in to comment.