# Modeling the Roads Example Database

![dj-roads-erd](./images/dj-roads-erd.jpg)
*note: open in a new tab to see at full-resolution*

In [None]:
import requests

DJ_PROTOCOL = "http"
DJ_HOST = "localhost"
DJ_PORT = 8000
DJ_URL = f"{DJ_PROTOCOL}://{DJ_HOST}:{DJ_PORT}"

# Add a Catalog and an Engine

In DJ, all nodes used in a query must share a common catalog. Before creating source nodes, add a `default` catalog and a spark engine to the system.

In [None]:
response = requests.post(  # Add a catalog named public
    f"{DJ_URL}/catalogs/",
    json={"name": "default"},
)
print(response.json())
response = requests.post(  # Add spark as an engine
    f"{DJ_URL}/engines/",
    json={"name": "spark", "version": "3.1.1"},
)
print(response.json())
response = requests.post(  # Attach the spark engine to the public catalog
    f"{DJ_URL}/catalogs/default/engines/",
    json=[{"name": "spark", "version": "3.1.1"}],
)
print(response.json())

# Create Source Nodes
Create twelve source nodes for each of the tables in the DJ roads example database.

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "repair_order_id": {"type": "INT"},
            "municipality_id": {"type": "STR"},
            "hard_hat_id": {"type": "INT"},
            "order_date": {"type": "DATETIME"},
            "required_date": {"type": "DATETIME"},
            "dispatched_date": {"type": "DATETIME"},
            "dispatcher_id": {"type": "INT"},
        },
        "description": "Repair orders",
        "mode": "published",
        "name": "repair_orders",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "repair_order_details",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "repair_order_id": {"type": "INT"},
            "repair_type_id": {"type": "INT"},
            "price": {"type": "FLOAT"},
            "quantity": {"type": "INT"},
            "discount": {"type": "FLOAT"},
        },
        "description": "Details on repair orders",
        "mode": "published",
        "name": "repair_order_details",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "repair_order_details",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "repair_type_id": {"type": "INT"},
            "repair_type_name": {"type": "STR"},
            "contractor_id": {"type": "INT"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "repair_type",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "repair_type",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "contractor_id": {"type": "INT"},
            "company_name": {"type": "STR"},
            "contact_name": {"type": "STR"},
            "contact_title": {"type": "STR"},
            "address": {"type": "STR"},
            "city": {"type": "STR"},
            "state": {"type": "STR"},
            "postal_code": {"type": "STR"},
            "country": {"type": "STR"},
            "phone": {"type": "STR"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "contractors",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "contractors",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "municipality_id": {"type": "STR"},
            "municipality_type_id": {"type": "STR"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "municipality_municipality_type",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "municipality_municipality_type",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "municipality_type_id": {"type": "STR"},
            "municipality_type_desc": {"type": "STR"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "municipality_type",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "municipality_type",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "municipality_id": {"type": "STR"},
            "contact_name": {"type": "STR"},
            "contact_title": {"type": "STR"},
            "local_region": {"type": "STR"},
            "phone": {"type": "STR"},
            "state_id": {"type": "INT"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "municipality",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "municipality",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "dispatcher_id": {"type": "INT"},
            "company_name": {"type": "STR"},
            "phone": {"type": "STR"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "dispatchers",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "dispatchers",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "hard_hat_id": {"type": "INT"},
            "last_name": {"type": "STR"},
            "first_name": {"type": "STR"},
            "title": {"type": "STR"},
            "birth_date": {"type": "DATETIME"},
            "hire_date": {"type": "DATETIME"},
            "address": {"type": "STR"},
            "city": {"type": "STR"},
            "state": {"type": "STR"},
            "postal_code": {"type": "STR"},
            "country": {"type": "STR"},
            "manager": {"type": "INT"},
            "contractor_id": {"type": "INT"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "hard_hats",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "hard_hats",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "hard_hat_id": {"type": "INT"},
            "state_id": {"type": "STR"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "hard_hat_state",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "hard_hat_state",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "state_id": {"type": "INT"},
            "state_name": {"type": "STR"},
            "state_abbr": {"type": "STR"},
            "state_region": {"type": "INT"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "us_states",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "us_states",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "columns": {
            "us_region_id": {"type": "INT"},
            "us_region_description": {"type": "STR"},
        },
        "description": "Information on different types of repairs",
        "mode": "published",
        "name": "us_region",
        "type": "source",
        "catalog": "default",
        "schema_": "roads",
        "table": "us_region",
    },
)
response.json()

# Create Dimension Nodes

Dimension nodes are how you represent dimensions in the data model and can be defined using any SQL, including filters as well as joins to source nodes, transform nodes, and other dimension nodes.

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Repair order dimension",
        "query": """
            SELECT
            repair_order_id,
            municipality_id,
            hard_hat_id,
            dispatcher_id
            FROM repair_orders
        """,
        "mode": "published",
        "name": "repair_order",
        "type": "dimension",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Contractor dimension",
        "query": """
            SELECT
            contractor_id,
            company_name,
            contact_name,
            contact_title,
            address,
            city,
            state,
            postal_code,
            country,
            phone
            FROM contractors
        """,
        "mode": "published",
        "name": "contractor",
        "type": "dimension",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Hard hat dimension",
        "query": """
            SELECT
            hard_hat_id,
            last_name,
            first_name,
            title,
            birth_date,
            hire_date,
            address,
            city,
            state,
            postal_code,
            country,
            manager,
            contractor_id
            FROM hard_hats
        """,
        "mode": "published",
        "name": "hard_hat",
        "type": "dimension",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Hard hat dimension",
        "query": """
            SELECT
            hh.hard_hat_id,
            last_name,
            first_name,
            title,
            birth_date,
            hire_date,
            address,
            city,
            state,
            postal_code,
            country,
            manager,
            contractor_id,
            hhs.state_id AS state_id
            FROM hard_hats hh
            LEFT JOIN hard_hat_state hhs
            ON hh.hard_hat_id = hhs.hard_hat_id
            WHERE hh.state_id = 'NY'
        """,
        "mode": "published",
        "name": "local_hard_hats",
        "type": "dimension",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "US state dimension",
        "query": """
            SELECT
            state_id,
            state_name,
            state_abbr,
            state_region,
            r.us_region_description AS state_region_description
            FROM us_states s
            LEFT JOIN us_region r
            ON s.state_region = r.us_region_id
        """,
        "mode": "published",
        "name": "us_state",
        "type": "dimension",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Dispatcher dimension",
        "query": """
            SELECT
            dispatcher_id,
            company_name,
            phone
            FROM dispatchers
        """,
        "mode": "published",
        "name": "dispatcher",
        "type": "dimension",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Municipality dimension",
        "query": """
            SELECT
            m.municipality_id,
            contact_name,
            contact_title,
            local_region,
            phone,
            state_id,
            mmt.municipality_type_id,
            mt.municipality_type_desc
            FROM municipality AS m
            LEFT JOIN municipality_municipality_type AS mmt
            ON m.municipality_id = mmt.municipality_id
            LEFT JOIN municipality_type AS mt
            ON mmt.municipality_type_id = mt.municipality_type_desc
        """,
        "mode": "published",
        "name": "municipality_dim",
        "type": "dimension",
    },
)
response.json()

# Add Metrics

Metrics are defined by writing a SQL query that performs an aggregation function on an expression using columns from any **single** source node, dimension node, or transform node. Here are some metrics that can be added to the DJ server.

- `num_repair_orders` - Number of repair orders
- `avg_repair_price` - Avg price of a repair order
- `total_repair_cost` - Total price of a repair order
- `avg_length_of_employment` - Avg length of employment
- `total_repair_order_discounts` - Total discounts on repair orders
- `avg_repair_order_discounts` - Avg discount on repair orders
- `avg_time_to_dispatch` - Avg time to dispatch
- `avg_time_to_dispatch_local` - Avg time to dispatch in NYC

Each of these metrics can be grouped by any dimensions that are discoverable through dimension labels, such as `municipality`, `state`, `region`, `contractor`, `dispatcher`, or `hard_hat`.

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Number of repair orders",
        "query": "SELECT count(repair_order_id) as num_repair_orders FROM repair_orders",
        "mode": "published",
        "name": "num_repair_orders",
        "type": "metric",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Average repair price",
        "query": "SELECT avg(price) as avg_repair_price FROM repair_order_details",
        "mode": "published",
        "name": "avg_repair_price",
        "type": "metric",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Total repair cost",
        "query": "SELECT sum(price) as total_repair_cost FROM repair_order_details",
        "mode": "published",
        "name": "total_repair_cost",
        "type": "metric",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Average length of employment",
        "query": "SELECT avg(NOW() - hire_date) as avg_length_of_employment FROM hard_hats",
        "mode": "published",
        "name": "avg_length_of_employment",
        "type": "metric",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Total repair order discounts",
        "query": "SELECT sum(price * discount) as total_discount FROM repair_order_details",
        "mode": "published",
        "name": "total_repair_order_discounts",
        "type": "metric",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Total repair order discounts",
        "query": "SELECT avg(price * discount) as avg_repair_order_discount FROM repair_order_details",
        "mode": "published",
        "name": "avg_repair_order_discounts",
        "type": "metric",
    },
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/",
    json={
        "description": "Average time to dispatch a repair order",
        "query": "SELECT avg(dispatched_date - order_date) as avg_time_to_dispatch FROM repair_orders",
        "mode": "published",
        "name": "avg_time_to_dispatch",
        "type": "metric",
    },
)
response.json()

# Link Columns to Dimension Nodes

Dimensions are discovered through labels on node columns throughout the DJ DAG. These labels link these columns to the primary key(s) of the dimension node. Create links to all columns in the DJ DAG to their corresponding dimension nodes.

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/repair_order_details/columns/repair_order_id/?dimension=repair_order&dimension_column=repair_order_id"
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/repair_orders/columns/municipality_id/?dimension=municipality_dim&dimension_column=municipality_id"
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/repair_type/columns/contractor_id/?dimension=contractor&dimension_column=contractor_id"
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/repair_orders/columns/hard_hat_id/?dimension=hard_hat&dimension_column=hard_hat_id"
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/repair_orders/columns/dispatcher_id/?dimension=dispatcher&dimension_column=dispatcher_id"
)
response.json()

In [None]:
response = requests.post(
    f"{DJ_URL}/nodes/local_hard_hats/columns/state_id/?dimension=us_state&dimension_column=state_id"
)
response.json()

# View All Existing Nodes

Let's look at the full list of nodes that are now in the DJ system and can be used to generate queries.

In [None]:
nodes = requests.get(f"{DJ_URL}/nodes/").json()
for i in sorted([f"{node['type']} -> {node['name']}" for node in nodes]):
    print(i)

# Generate SQL Using Metrics & Dimensions

You can now generate SQL queries for any metric and include any of the discoverable dimensions that you'd like to group it by. Let's list out the dimensions that are available for each metric in DJ.

In [None]:
for metric in [
    "avg_length_of_employment",
    "avg_repair_order_discounts",
    "avg_repair_price",
    "avg_time_to_dispatch",
    "num_repair_orders",
    "total_repair_cost",
    "total_repair_order_discounts",
]:
    response = requests.get(
        f"{DJ_URL}/metrics/{metric}/",
    )
    metric_metadata = response.json()
    print(metric)
    print("---")
    for dimension in metric_metadata["dimensions"]:
        print(dimension)
    print()

### Using the metric SQL endpoint, let's generate some SQL for a few metrics, grouped by dimensions.

In [None]:
response = requests.get(
    f"{DJ_URL}/metrics/num_repair_orders/sql/?dimensions=hard_hat.first_name,hard_hat.last_name&check_database_online=false",
)
print(response.json()["sql"])

In [None]:
response = requests.get(
    f"{DJ_URL}/metrics/total_repair_order_discounts/sql/?dimensions=repair_order.hard_hat_id&check_database_online=false",
)
print(response.json()["sql"])

In [None]:
response = requests.get(
    f"{DJ_URL}/metrics/avg_time_to_dispatch/sql/?dimensions=hard_hat.state&check_database_online=false",
)
print(response.json()["sql"])

response = requests.get(
    f"{DJ_URL}/metrics/avg_repair_price/sql/?dimensions=repair_order.municipality_id&check_database_online=false",
)
print(response.json()["sql"])

# Report Materializations by Setting Availability on a Node

### Materializations can be reported by adding an `availability` to a node. When DJ builds the query, it will use any availability states it finds. Let's add an availability to the `repair_order` dimension node.

In [None]:
# Add an availability for the contractor dimension node
response = requests.post(
    f"{DJ_URL}/data/availability/repair_order/",
    json = {
        "catalog": "default",
        "schema_": "roads",
        "table": "repair_order_materialization_123",
        "valid_through_ts": 20230306,
        "max_partition": ["20230305"],
        "min_partition": ["20230304"],
    }
)
response.json()

### Now the same request that incldues the `repair_order` dimension will use the `availability` that it finds.

In [None]:
response = requests.get(
    f"{DJ_URL}/metrics/avg_repair_price/sql/?dimensions=repair_order.municipality_id&check_database_online=false",
)
print(response.json()["sql"])