In [82]:
# imports and setup
import pandas as pd
import dotenv
import os
import numpy as np
import altair as alt

# Read the local .env file so the settings below are available via os.environ.
config = dotenv.load_dotenv(".env")

# Both settings are required: a missing key raises KeyError in this cell.
# DATABASE_URL format: postgresql://<database_username>:<database_password>@localhost:5432/<database_name>
database_url = os.environ["DATABASE_URL"]
# Environment label interpolated into the chart titles.
datahub_env = os.environ["DATAHUB_ENV"]

class DuplicateUrns(Exception):
    """Raised when the metadata query returns the same urn more than once."""

In [83]:
# Query returns entity types datasets, charts, containers and dashboards
# and the following metadata:
# description, slack_channel_name, slack_channel_url, access_requirements, refresh_period, owner, domain
#
# It only returns entities tagged `dc_display_in_catalogue`.
#
# DataHub keeps the *current* value of every aspect at version 0 and copies each
# superseded value to versions 1..N. "Latest" therefore means `version = 0`;
# selecting MAX(version) would read the most recently superseded value instead,
# so every CTE below filters on version 0 directly.

sql = """
WITH entities AS (
    SELECT
        urn,
        metadata
    FROM
        metadata_aspect_v2
    WHERE
        aspect IN ('datasetProperties', 'chartInfo', 'containerProperties', 'dashboardInfo')
        AND version = 0
),

tags AS (
    SELECT
        urn,
        metadata
    FROM
        metadata_aspect_v2
    WHERE
        aspect = 'globalTags'
        AND version = 0
),

owners AS (
    SELECT
        urn,
        metadata
    FROM
        metadata_aspect_v2
    WHERE
        aspect = 'ownership'
        AND version = 0
),

domains AS (
    SELECT
        urn,
        metadata
    FROM
        metadata_aspect_v2
    WHERE
        aspect = 'domains'
        AND version = 0
)

SELECT
    entities.urn,
    SUBSTRING(
        entities.urn,
        POSITION('urn:li:' IN entities.urn) + LENGTH('urn:li:'),
        POSITION(':' IN SUBSTRING(entities.urn FROM POSITION('urn:li:' IN entities.urn) + LENGTH('urn:li:'))) - 1
    ) AS entity_type,
    entities.metadata::json -> 'description' AS description,
    entities.metadata::json -> 'customProperties' -> 'dc_slack_channel_name' AS slack_channel_name,
    entities.metadata::json -> 'customProperties' -> 'dc_slack_channel_url' AS slack_channel_url,
    entities.metadata::json -> 'customProperties' -> 'dc_access_requirements' AS access_requirements,
    entities.metadata::json -> 'customProperties' -> 'refresh_period' AS refresh_period,
    owners.metadata::json -> 'owners' -> 0 -> 'owner' AS owner,
    domains.metadata::json -> 'domains' -> 0 AS domain
FROM
    entities
LEFT JOIN owners
    ON owners.urn = entities.urn
LEFT JOIN tags
    ON tags.urn = entities.urn
LEFT JOIN domains
    ON domains.urn = entities.urn
WHERE EXISTS (
    SELECT 1
    FROM jsonb_array_elements(tags.metadata::jsonb->'tags') AS tag_element
    WHERE tag_element->>'tag' = 'urn:li:tag:dc_display_in_catalogue'
)
ORDER BY entities.urn
"""

# Blank / whitespace-only strings are converted to NaN so that the isnull()
# counts in the cells below treat them as "not populated".
df_metrics = pd.read_sql(sql, database_url).replace(r'^\s*$', np.nan, regex=True)


In [84]:
# Guard: each urn must appear exactly once in the query result,
# otherwise the counts and percentages below would be inflated.
urns = df_metrics["urn"]
if urns.duplicated().any():
    raise DuplicateUrns("The query has returned duplicate urns")

In [85]:
# makes chart for counts of missing metadata for all entity types by field

# Null count per metadata column; urn and entity_type identify the entity
# rather than describe it, so they are left out.
metric_dict = {
    column: df_metrics[column].isnull().sum()
    for column in df_metrics.columns
    if column not in ("entity_type", "urn")
}
# Added as an extra row, so the total number of entities is drawn as a bar too.
metric_dict["total_entities"] = len(df_metrics.index)

# Built straight from metric_dict (the cell previously read the undefined names
# `d` and `cd`, which only worked with leftover kernel state).
df_missing_all_entities = pd.DataFrame(
    {
        "field": list(metric_dict),
        "not populated": [int(count) for count in metric_dict.values()],
    }
)

chart_missing_all_entities_absolute = (
    alt.Chart(df_missing_all_entities, title=f"Missing fields - all entities - {datahub_env}")
    .mark_bar()
    .encode(
        y="field",
        x=alt.X("not populated", title="Total not populated"),
        tooltip="not populated",
    )
    .properties(height=250)
)

In [86]:
# makes chart of % not populated by field and entity type
def _percent_not_populated(df_entities):
    """Return ``{column: percent}`` of null values for one entity-type frame.

    Every column except ``entity_type`` is included. The percentage is a whole
    number, truncated towards zero. The count is multiplied by 100 *before*
    dividing so that exact ratios are not pushed below their true value by
    floating-point error (29/100*100 evaluates to 28.999...).

    An empty frame returns 0 for every column instead of dividing by zero,
    which would produce NaN and make ``int()`` raise ``ValueError``.
    """
    total = len(df_entities.index)
    return {
        column: int(df_entities[column].isnull().sum() * 100 / total) if total else 0
        for column in df_entities.columns
        if column != "entity_type"
    }


# one frame per entity type (also used by the totals chart)
df_charts = df_metrics.loc[df_metrics["entity_type"] == "chart"]
df_datasets = df_metrics.loc[df_metrics["entity_type"] == "dataset"]
df_containers = df_metrics.loc[df_metrics["entity_type"] == "container"]
df_dashboards = df_metrics.loc[df_metrics["entity_type"] == "dashboard"]

# calculate the % non populated for each metadata field
metric_dict_entities = {
    "charts": _percent_not_populated(df_charts),
    "datasets": _percent_not_populated(df_datasets),
    "containers": _percent_not_populated(df_containers),
    "dashboards": _percent_not_populated(df_dashboards),
}

# Flatten to long form (one row per entity type and field) for Altair;
# urn is an identifier, not a metadata field, so it is skipped.
chart_dict = {"entity": [], "field": [], "%_not_populated": []}
for entity, percentages in metric_dict_entities.items():
    for field, percentage in percentages.items():
        if field != "urn":
            chart_dict["entity"].append(entity)
            chart_dict["field"].append(field)
            chart_dict["%_not_populated"].append(percentage)

chart_df = pd.DataFrame.from_dict(chart_dict)

chart_percentage_by_entity_type = alt.Chart(chart_df, title=f"Not populated % by entity type - {datahub_env}").mark_bar().encode(
    x=alt.X('entity:O', title=''),
    # It is usually easier to specify custom titles like this
    y=alt.Y('%_not_populated:Q', title='% not populated'),
    tooltip='%_not_populated',
    color=alt.Color('entity:O', title='entity type'),
    # Changing the header of the faceted column controls label location
    column=alt.Column('field:N', title='', header=alt.Header(labelOrient='bottom', labelAngle=-45, labelAlign='right'))
)

In [87]:
# makes chart of total entities by type
# Row counts are taken from the per-entity-type frames, in the same order as the labels.
df_entity_totals = pd.DataFrame(
    {
        "Entity type": ["datasets", "charts", "containers", "dashboards"],
        "Total": [
            len(frame.index)
            for frame in (df_datasets, df_charts, df_containers, df_dashboards)
        ],
    }
)

chart_entity_totals = (
    alt.Chart(df_entity_totals, title=f"Total entities displayed in find-moj-data {datahub_env}")
    .mark_bar()
    .encode(y="Entity type", x="Total", tooltip="Total")
    .properties(height=200, width=400)
)

In [88]:
# show the bar chart of total entities per entity type
chart_entity_totals

In [89]:
# show the bar chart of not-populated counts per field across all entity types
chart_missing_all_entities_absolute

In [90]:
# show the faceted bar chart of % not populated per field and entity type
chart_percentage_by_entity_type