feat: Metricflow and dbt Core (#265)
* feat: Metricflow and dbt Core

* Add tests

* Add manifest
betodealmeida committed Mar 8, 2024
1 parent 235ebf6 commit 5171892
Showing 6 changed files with 1,658 additions and 140 deletions.
.pre-commit-config.yaml (2 changes: 1 addition & 1 deletion)
@@ -4,7 +4,7 @@ repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v3.4.0
     hooks:
-      - id: check-added-large-files
+      #- id: check-added-large-files
       - id: check-ast
       - id: check-json
       - id: check-merge-conflict
src/preset_cli/cli/superset/sync/dbt/command.py (80 changes: 72 additions & 8 deletions)
@@ -2,11 +2,13 @@
 A command to sync dbt models/metrics to Superset and charts/dashboards back as exposures.
 """

+import logging
 import os.path
+import subprocess
 import sys
 import warnings
 from pathlib import Path
-from typing import Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple

 import click
 import yaml
@@ -17,6 +19,7 @@
     JobSchema,
     MetricSchema,
     MFMetricWithSQLSchema,
+    MFSQLEngine,
     ModelSchema,
 )
 from preset_cli.api.clients.superset import SupersetClient
@@ -31,6 +34,8 @@
 )
 from preset_cli.exceptions import DatabaseNotFoundError

+_logger = logging.getLogger(__name__)
+

 @click.command()
 @click.argument("file", type=click.Path(exists=True, resolve_path=True))
@@ -180,6 +185,10 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
     with open(manifest, encoding="utf-8") as input_:
         configs = yaml.load(input_, Loader=yaml.SafeLoader)

+    with open(profiles, encoding="utf-8") as input_:
+        config = yaml.safe_load(input_)
+    dialect = MFSQLEngine(config[project]["outputs"][target]["type"].upper())
+
     model_schema = ModelSchema()
     models = []
     for config in configs["nodes"].values():
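For context, the new dialect lookup above assumes the standard profiles.yml layout, where each profile has an outputs mapping keyed by target name and each output declares its warehouse type. A minimal sketch of the same lookup with a hypothetical profile, target, and connection values:

# Hypothetical result of yaml.safe_load() on a profiles.yml created by `dbt init`;
# all names and values below are illustrative.
config = {
    "my_project": {  # profile name
        "target": "dev",
        "outputs": {
            "dev": {
                "type": "bigquery",  # warehouse adapter type
                "project": "my-gcp-project",
                "dataset": "analytics",
            },
        },
    },
}

project, target = "my_project", "dev"
engine_type = config[project]["outputs"][target]["type"].upper()
print(engine_type)  # "BIGQUERY", the value passed to MFSQLEngine(...)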
@@ -200,14 +209,18 @@ def dbt_core( # pylint: disable=too-many-arguments, too-many-branches, too-many
         ]
     else:
         og_metrics = []
+    sl_metrics = []
     metric_schema = MetricSchema()
     for config in configs["metrics"].values():
-        # conform to the same schema that dbt Cloud uses for metrics
-        config["dependsOn"] = config.pop("depends_on")["nodes"]
-        config["uniqueId"] = config.pop("unique_id")
-        og_metrics.append(metric_schema.load(config))
+        if "calculation_method" in config or "sql" in config:
+            # conform to the same schema that dbt Cloud uses for metrics
+            config["dependsOn"] = config.pop("depends_on")["nodes"]
+            config["uniqueId"] = config.pop("unique_id")
+            og_metrics.append(metric_schema.load(config))
+        elif sl_metric := get_sl_metric(config, model_map, dialect):
+            sl_metrics.append(sl_metric)

-    superset_metrics = get_superset_metrics_per_model(og_metrics)
+    superset_metrics = get_superset_metrics_per_model(og_metrics, sl_metrics)

     try:
         database = sync_database(
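To make the new branch concrete: legacy ("og") metric entries in the manifest carry a calculation_method key (dbt >= 1.3) or a sql key (dbt < 1.3) and keep being loaded straight from the manifest, while MetricFlow semantic-layer metrics carry neither and are routed through get_sl_metric. A small sketch with made-up manifest entries (all field values are illustrative):

# Made-up entries in the shape of configs["metrics"].values().
legacy_metric = {
    "name": "revenue",
    "calculation_method": "sum",  # present, so treated as an "og" metric
    "expression": "order_total",
    "depends_on": {"nodes": ["model.jaffle_shop.orders"]},
    "unique_id": "metric.jaffle_shop.revenue",
}
mf_metric = {
    "name": "new_customers",  # no calculation_method/sql, so handled by MetricFlow
    "type": "simple",
    "description": "Customers placing their first order",
}

for config in (legacy_metric, mf_metric):
    if "calculation_method" in config or "sql" in config:
        print(config["name"], "-> loaded from the manifest definition")
    else:
        print(config["name"], "-> SQL generated via `mf query --explain`")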
@@ -338,7 +351,58 @@ def get_job(
     raise ValueError(f"Job {job_id} not available")


-def process_sl_metrics(
+def get_sl_metric(
+    metric: Dict[str, Any],
+    model_map: Dict[ModelKey, ModelSchema],
+    dialect: MFSQLEngine,
+) -> Optional[MFMetricWithSQLSchema]:
+    """
+    Compute the SQL for a semantic layer (SL) metric using the ``mf`` CLI.
"""
mf_metric_schema = MFMetricWithSQLSchema()

command = ["mf", "query", "--explain", "--metrics", metric["name"]]
try:
_logger.info(
"Using `mf` command to retrieve SQL syntax for metric %s",
metric["name"],
)
result = subprocess.run(command, capture_output=True, text=True, check=True)
except FileNotFoundError:
+        _logger.warning(
+            "`mf` command not found; if you're using MetricFlow make sure you have it "
+            "installed in order to sync metrics",
+        )
+        return None
+    except subprocess.CalledProcessError:
+        _logger.warning(
+            "Could not generate SQL for metric %s (this happens for some metrics)",
+            metric["name"],
+        )
+        return None
+
+    output = result.stdout.strip()
+    start = output.find("SELECT")
+    sql = output[start:]
+
+    models = get_models_from_sql(sql, dialect, model_map)
+    if len(models) != 1:
+        return None
+    model = models[0]

+    return mf_metric_schema.load(
+        {
+            "name": metric["name"],
+            "type": metric["type"],
+            "description": metric["description"],
+            "sql": sql,
+            "dialect": dialect.value,
+            "model": model["unique_id"],
+        },
+    )
+
+
+def fetch_sl_metrics(
     dbt_client: DBTClient,
     environment_id: int,
     model_map: Dict[ModelKey, ModelSchema],
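As a quick illustration of the SQL extraction in get_sl_metric above: the function keeps everything from the first SELECT in the mf query --explain output onward, which sidesteps whatever banner text the CLI prints before the statement. A sketch with a made-up output string (the real banner varies by MetricFlow version, hence the simple slice):

import textwrap

# Made-up `mf query --explain --metrics revenue` output; only the slice starting
# at the first "SELECT" is kept as the metric's SQL.
stdout = textwrap.dedent(
    """\
    Rendered SQL for metric 'revenue':
    SELECT
      SUM(order_total) AS revenue
    FROM analytics.orders
    """
)

output = stdout.strip()
sql = output[output.find("SELECT"):]
print(sql)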
@@ -498,7 +562,7 @@ def dbt_cloud( # pylint: disable=too-many-arguments, too-many-locals
     model_map = {ModelKey(model["schema"], model["name"]): model for model in models}

     og_metrics = dbt_client.get_og_metrics(job["id"])
-    sl_metrics = process_sl_metrics(dbt_client, job["environment_id"], model_map)
+    sl_metrics = fetch_sl_metrics(dbt_client, job["environment_id"], model_map)
     superset_metrics = get_superset_metrics_per_model(og_metrics, sl_metrics)

     if exposures_only:
src/preset_cli/cli/superset/sync/dbt/metrics.py (4 changes: 3 additions & 1 deletion)
@@ -51,10 +51,12 @@ def get_metric_expression(unique_id: str, metrics: Dict[str, MetricSchema]) -> s
         # dbt >= 1.3
         type_ = metric["calculation_method"]
         sql = metric["expression"]
-    else:
+    elif "sql" in metric:
         # dbt < 1.3
         type_ = metric["type"]
         sql = metric["sql"]
+    else:
+        raise Exception(f"Unable to generate metric expression from: {metric}")

     if metric.get("filters"):
         sql = apply_filters(sql, metric["filters"])
