# Queries
This notebook contains a set of DMS query examples that can be used as templates.

In [None]:
import json
import time
from collections import defaultdict

from cognite.client import CogniteClient
from cognite.client.data_classes.data_modeling import (
    EdgeListWithCursor,
    InstanceSort,
    NodeId,
    NodeListWithCursor,
    ViewId,
)
from cognite.client.data_classes.data_modeling.query import (
    NodeResultSetExpression,
    Query,
    QueryResult,
    Select,
    SourceSelector,
)
from cognite.client.data_classes.filters import (
    And,
    ContainsAll,
    Equals,
    HasData,
    In,
    Nested,
    Prefix,
    SpaceFilter,
)
from cognite.client.exceptions import CogniteAPIError
from tenacity import retry, stop_after_attempt, retry_if_exception, wait_exponential

from get_client import get_client


# Build the client through the project helper; the path below is a placeholder for your own .env file.
client: CogniteClient = get_client("absolute/path/to/.env/file")
# Opt in to alpha API features (enables debug in api call to /query).
# Only the one header is set, through the public config object, so that any headers
# already present on the client configuration are kept instead of being replaced.
client.config.headers["cdf-version"] = "alpha"
print("Client configured with alpha features enabled")

# Setup spaces

In [None]:

# Instance space used by the examples below (space filters and NodeId construction)
DMU_SP = "dmu_rmdm_instances"

# Setup Views

In [None]:
# Views of the project data model: one shared space, one shared version.
_MODEL_SPACE = "dmu_rmdm_model"
_MODEL_VERSION = "1.0.0"
asset_vid = ViewId(space=_MODEL_SPACE, external_id="Asset", version=_MODEL_VERSION)
ts_vid = ViewId(space=_MODEL_SPACE, external_id="TimeSeriesExt", version=_MODEL_VERSION)
file_vid = ViewId(space=_MODEL_SPACE, external_id="FileExt", version=_MODEL_VERSION)
act_vid = ViewId(space=_MODEL_SPACE, external_id="Activity", version=_MODEL_VERSION)
eq_vid = ViewId(space=_MODEL_SPACE, external_id="Equipment", version=_MODEL_VERSION)
mo_vid = ViewId(space=_MODEL_SPACE, external_id="MaintenanceOrder", version=_MODEL_VERSION)

# Views living in the `cdf_cdm` space.
_CDM_SPACE = "cdf_cdm"
_CDM_VERSION = "v1"
eqtype_vid = ViewId(space=_CDM_SPACE, external_id="CogniteEquipmentType", version=_CDM_VERSION)
cognite_asset_vid = ViewId(space=_CDM_SPACE, external_id="CogniteAsset", version=_CDM_VERSION)
describable_vid = ViewId(space=_CDM_SPACE, external_id="CogniteDescribable", version=_CDM_VERSION)
cognite_act_vid = ViewId(space=_CDM_SPACE, external_id="CogniteActivity", version=_CDM_VERSION)

In [None]:
# List the views in the model space; as the cell's last expression, the result is rendered as its output
client.data_modeling.views.list(space="dmu_rmdm_model")

# Assets
## General asset queries

### List assets and sort based on their externalId

Listing assets works almost the same as in the case of legacy assets. The main difference is the **sources** argument, which lets you choose the properties that will be fetched by selecting a view (or a list of views).

You can sort/filter either by using a property specified within a View or node/edge registry.
Sorting by created/updated time is not allowed as of now, due to performance considerations (too much reindexing on every instance update).

In [None]:
# Filter on a property of the Asset view; sort on the node's own externalId (descending).
bsee_well_only = Equals(property=asset_vid.as_property_ref("sourceContext"), value="BSEE-Well")
external_id_descending = InstanceSort(property=["node", "externalId"], direction="descending")

assets = client.data_modeling.instances.list(
    space=DMU_SP,  # space to search in for instances
    sources=asset_vid,  # all properties from the Asset view will be returned
    filter=bsee_well_only,
    sort=external_id_descending,
    limit=1000,
)
print(len(assets))
print(assets)

# Search endpoint

Search allows for lookup based on regex queries. The nuances and the tokenization method are described in the docs.

What's important is that this endpoint is targeting elastic-search backend, where *everything is indexed*, so it should be more performant than query endpoint.

On the other hand, search can only reach properties of type 'text'.

In [None]:
# The searched view is TimeSeriesExt, so the hits are timeseries. They are kept under their own
# name rather than overwriting the `assets` list produced by the asset listing example.
timeseries_hits = client.data_modeling.instances.search(
    view=ts_vid,
    space=DMU_SP,
    limit=1000,
    sort=InstanceSort(property=ts_vid.as_property_ref("name"), direction="descending"),
    # Search will not find the timeseries with a direct relation to CNY-AC;
    # it looks for the "CNY-AC" string in any text property
    query="CNY-AC",
)
print(len(timeseries_hits))
print(timeseries_hits)

## Iterative listing

Using the instances API, you can fetch instances in batches to avoid timeouts and reduce memory load.

In [None]:
# Calling the instances API object directly yields the result in chunks of `chunk_size`.
asset_batches = client.data_modeling.instances(
    chunk_size=1000,
    instance_type="node",
    sources=asset_vid,  # all properties from the Asset view will be returned
    space=DMU_SP,  # space to search in for instances
    limit=20000,  # higher limit to showcase the iterative listing
    sort=InstanceSort(property=["node", "externalId"], direction="descending"),  # externalId, descending
)
for batch_number, assets in enumerate(asset_batches, start=1):
    print(f"Fetching batch {batch_number}")
    print(len(assets))

### Fetching assets with the query endpoint

You can use querying for simple extraction if you need only a specific subset of the properties of the retrieved instances.

This query is equivalent to the list() example above

In [None]:
sourceContext = "BSEE-Well"

# FROM all nodes WHERE space = DMU_SP AND has data in the Asset view AND sourceContext = BSEE-Well.
# Both values are supplied through query parameters instead of being written into the filter.
assets_expression = NodeResultSetExpression(
    filter=And(
        Equals(["node", "space"], value={"parameter": "space"}),
        HasData(views=[asset_vid]),
        Equals(property=asset_vid.as_property_ref("sourceContext"), value={"parameter": "sourceContext"}),
    ),
    limit=1000,
    sort=[InstanceSort(property=("node", "externalId"), direction="descending")],
)
# SELECT name, parent, tags FROM assets
assets_select = Select([SourceSelector(asset_vid, ["name", "parent", "tags"])])

query = Query(
    with_={"assets": assets_expression},
    select={"assets": assets_select},
    parameters={"space": DMU_SP, "sourceContext": sourceContext},
)
try:
    res = client.data_modeling.instances.query(query=query)
except CogniteAPIError as e:
    print(e)
else:
    assets = res["assets"]
    print(len(assets))
    print(assets)


# Get subtree of an asset

Getting a subset of assets based on a root is a common use case. Use the 'path' property to extract all assets with a given node in their paths.

### With ContainsAll

In [None]:
sub_tree_root = NodeId(DMU_SP, "CNY-AC")

# perf_counter() is a monotonic, high-resolution clock, so the measured interval
# cannot be distorted by system clock adjustments the way time.time() can.
start_time = time.perf_counter()
sub_tree_nodes = client.data_modeling.instances.list(
    sources=asset_vid,
    # Keep every asset whose 'path' contains the root node
    filter=ContainsAll(property=asset_vid.as_property_ref("path"), values=[sub_tree_root]),
    limit=1000,
)
contains_time = time.perf_counter() - start_time
print(f"ContainsAll filter call took: {contains_time:.3f} seconds")
print(len(sub_tree_nodes))
print(sub_tree_nodes)

### With Prefix

You may also use the Prefix filter to extract the instances whose 'path' begins with a certain node.

In [None]:
sub_tree_root = NodeId(DMU_SP, "CNY-AC")

# perf_counter() is a monotonic, high-resolution clock suited to measuring elapsed time.
# The measurement covers both calls below (root retrieval + filtered listing).
start_time = time.perf_counter()
# Retrieve the root asset first to get its path
sub_tree_root_retrieved = client.data_modeling.instances.retrieve_nodes(
    sub_tree_root,
    sources=asset_vid,
)
# A single-id retrieval yields None when the node does not exist; fail with a clear message
# instead of an AttributeError on the property access below.
if sub_tree_root_retrieved is None:
    raise ValueError(f"Root asset {sub_tree_root!r} was not found, cannot build the path prefix")
root_path = sub_tree_root_retrieved.properties.data[asset_vid]["path"]

# Use the path of the root asset to filter the assets
sub_tree_nodes_prefix = client.data_modeling.instances.list(
    sources=asset_vid,
    filter=Prefix(
        property=asset_vid.as_property_ref("path"),
        value=root_path,
    ),
    limit=1000,
)
prefix_time = time.perf_counter() - start_time
print(f"Prefix filter call took: {prefix_time:.3f} seconds")
print(len(sub_tree_nodes_prefix))
print(sub_tree_nodes_prefix)

## Get multiple representations of an asset

As you know, a single instance may have its properties in multiple views. When querying, listing, or retrieving, it's possible to get multiple sources (views) along with their properties.

In [None]:
asset_external_id = "CNY-AC"
space = DMU_SP

# One node, read through three views at once; the result is shown as the cell output.
asset_node_id = NodeId(space, asset_external_id)
views_to_read = [asset_vid, cognite_asset_vid, describable_vid]
client.data_modeling.instances.retrieve_nodes(asset_node_id, sources=views_to_read)

### The same call using query SDK

In [None]:
asset_external_id = "CNY-AC"

# FROM all nodes in the project WHERE space = DMU_SP AND externalId = CNY-AC
single_asset = NodeResultSetExpression(
    filter=And(
        Equals(["node", "externalId"], value=asset_external_id),
        SpaceFilter(space=DMU_SP),  # setting this as { "parameter": "space" } doesn't work
    ),
)
# Seeing the same instance through several views; "*" may be replaced by explicit property names
selected_views = [asset_vid, cognite_asset_vid, describable_vid]
all_view_properties = Select([SourceSelector(view, ["*"]) for view in selected_views])

query = Query(
    with_={"assets": single_asset},
    select={"assets": all_view_properties},
)
try:
    res = client.data_modeling.instances.query(query=query)
    print(res["assets"])
except CogniteAPIError as e:
    print(e)


## Get parent and/or children of an asset

### Note that you can use this logic for any kind of **single** direct relations (and their reverse). For example, you can retrieve the type of the asset (see below)

This way, you can traverse a graph using direct relations

In [None]:
# Alternative asset to try: "WLL-6080740225"
asset_eid = "CNY-AC"

# Every parent/child hop follows the same direct relation: 'parent' in the Asset view.
parent_relation = asset_vid.as_property_ref("parent")

# FROM all nodes in the project WHERE space = DMU_SP AND externalId = CNY-AC
start_asset = NodeResultSetExpression(
    filter=And(
        Equals(property=("node", "externalId"), value=asset_eid),
        Equals(property=("node", "space"), value={"parameter": "space"}),  # instead of SpaceFilter
    ),
)

query = Query(
    with_={
        "asset": start_asset,
        # outwards: the node the retrieved asset points to, i.e. its parent
        "parent": NodeResultSetExpression(from_="asset", through=parent_relation, direction="outwards"),
        # inwards: the assets that have the retrieved asset as their parent
        "children": NodeResultSetExpression(from_="asset", through=parent_relation, direction="inwards"),
        # inwards again, starting from 'children': the assets that have one of them as their parent
        "further_children": NodeResultSetExpression(
            from_="children", through=parent_relation, direction="inwards"
        ),
        # outwards through 'type': the type of the retrieved asset
        "type": NodeResultSetExpression(
            from_="asset", through=asset_vid.as_property_ref("type"), direction="outwards"
        ),
    },
    select={
        # A chosen subset of Asset view properties for the parent ...
        "parent": Select([SourceSelector(source=asset_vid, properties=["name", "description", "source"])]),
        # ... and every Asset view property for the remaining result sets
        **{
            result_set: Select([SourceSelector(source=asset_vid, properties=["*"])])
            for result_set in ("children", "further_children", "type")
        },
    },
    parameters={"space": DMU_SP},  # value for the "space" parameter used in the filter
)
try:
    res = client.data_modeling.instances.query(query=query)
    # Print only result sets that the query selects. Looking up a name the query does not
    # define (such as "same_asset") raises KeyError, which the handler below does not catch.
    for result_set in ("parent", "children", "further_children", "type"):
        print(f"{result_set}: {res[result_set]}")
except CogniteAPIError as e:
    print(e)



## Using the Nested filter

The Nested filter lets you filter instances by a property of a directly related View. The filter can be applied only to single direct relations. 

In [None]:

# FROM all nodes in the project WHERE the node referenced by 'parent' (Asset view) has the alias 'AC'
parent_has_alias_ac = Nested(
    scope=asset_vid.as_property_ref("parent"),  # the direct relation to look through
    filter=ContainsAll(property=asset_vid.as_property_ref("aliases"), values=["AC"]),
)
# FROM all nodes in the project WHERE the node referenced by 'asset' (Equipment view)
# has a tag 'Permanently Abandoned'
# NOTE(review): 'tags' is referenced through the Equipment view although the nested target
# is reached via 'asset' - confirm this is the intended view for the inner filter.
asset_is_abandoned = Nested(
    scope=eq_vid.as_property_ref("asset"),
    filter=ContainsAll(property=eq_vid.as_property_ref("tags"), values=["Permanently Abandoned"]),
)

query = Query(
    with_={
        "asset": NodeResultSetExpression(filter=parent_has_alias_ac, limit=1000),
        "equipment": NodeResultSetExpression(filter=asset_is_abandoned, limit=1000),
    },
    select={
        # all properties of the Asset view
        "asset": Select([SourceSelector(source=asset_vid, properties=["*"])]),
        # all properties of the Equipment view
        "equipment": Select([SourceSelector(source=eq_vid, properties=["*"])]),
    },
)
try:
    res = client.data_modeling.instances.query(query=query)
    print(res["asset"])
    print(res["equipment"])
    print(len(res["asset"]))
    print(len(res["equipment"]))
except CogniteAPIError as e:
    print(e)


## The same query as a json object

In some cases you may need to use a json object instead of SDK for querying

In [None]:
# JSON representation of the Nested filter query.
# Property references are written as [space, "<view externalId>/<view version>", property name].
json_query = {
    "with": {
        "asset": { # identifier of the item to retrieve
            "limit": 1000, # default limit is 100
            "nodes": {  # equivalent to FROM all Nodes in the project WHERE 'parent' of instances with properties
                        # in the Asset view has 'aliases' property containing 'AC'
                "filter": {
                    "nested": {
                        # Direct relation to instances with properties in Asset view through 'parent' property
                        "scope": [
                            asset_vid.space,
                            f"{asset_vid.external_id}/{asset_vid.version}",
                            "parent",
                        ],
                        # Filter by 'aliases' property in Asset view
                        "filter": {
                            "containsAll": {
                                "property": [
                                    asset_vid.space,
                                    f"{asset_vid.external_id}/{asset_vid.version}",
                                    "aliases",
                                ],
                                # Value to filter by
                                "values": ["AC"],
                            },
                        },
                    },
                },
            },
        },
        "equipment": {
            # equivalent to FROM all Nodes in the project WHERE 'asset' of instances with properties
            # in the Equipment view has a tag 'Permanently Abandoned'
            "limit": 1000, # default limit is 100
            "nodes": {
                "filter": {
                    "nested": {
                        # Direct relation to instances with properties in Equipment view through 'asset' property
                        "scope": [
                            eq_vid.space,
                            f"{eq_vid.external_id}/{eq_vid.version}",
                            "asset",
                        ],
                        "filter": {
                            # Filter by 'tags' property in Equipment view
                            "containsAll": {
                                "property": [
                                    eq_vid.space,
                                    f"{eq_vid.external_id}/{eq_vid.version}",
                                    "tags",
                                ],
                                "values": ["Permanently Abandoned"],
                            },
                        },
                    },
                },
            },
        },
    },
    # What to return for each result set declared under "with"
    "select": {
        "asset": {
            "sources": [
                {
                    "source": {
                        "type": "view",
                        "space": asset_vid.space,
                        "externalId": asset_vid.external_id,
                        "version": asset_vid.version,
                    },
                    "properties": ["*"],  # All properties
                },
            ],
        },
        "equipment": {
            "sources": [
                {
                    "source": {
                        "type": "view",
                        "space": eq_vid.space,
                        "externalId": eq_vid.external_id,
                        "version": eq_vid.version,
                    },
                    "properties": ["*"],  # All properties
                },
            ],
        },
    },
    # Ask for debug information in the response; it is read back from the "debug" key after the call
    "debug": {},
}
try:
    res = client.post(url=f"/api/v1/projects/{client.config.project}/models/instances/query", json=json_query)
    # Decode the response body once and reuse it, instead of re-parsing it for every field
    payload = json.loads(res.content)
    assets = payload["items"]["asset"]
    equipments = payload["items"]["equipment"]
    print(assets)
    print(len(assets))
    print(equipments)
    print(len(equipments))
    # Debug information requested through the "debug" entry of the query
    debug = payload["debug"]
    print(debug)
except CogniteAPIError as e:
    print(e)


# Timeseries, activities, files
## Retrieve timeseries related to an asset
Activities and files can be returned the same way.

The main problem here is that there is no way to extract assets and then use them to find the related timeseries. It is not possible because
- the properties holding node references pointing to assets are lists of direct relations
- reverse lists of direct relations cannot be queried

If your use case requires traversing multiple nodes both ways and lists of direct relations do not fulfill the requirements - that's when you need edges. Another way is to chain the queries outside of 'query' structure (query -> get result -> use in next query)

In [None]:
asset_id = NodeId(space=DMU_SP, external_id="PLTF-EW1003A (Prince)-811")
# Node reference without the instance type; this is the value handed to the query as a parameter
asset_reference = asset_id.dump(include_instance_type=False)
print(asset_reference)

# Timeseries whose 'assets' list contains the given asset reference
timeseries_of_asset = NodeResultSetExpression(
    filter=ContainsAll(property=ts_vid.as_property_ref("assets"), values={"parameter": "asset"}),
    limit=1000,
)
returned_properties = ["name", "description", "source", "unit", "assets", "equipment", "activities"]

query = Query(
    with_={"timeseries": timeseries_of_asset},
    select={"timeseries": Select([SourceSelector(source=ts_vid, properties=returned_properties)])},
    parameters={"asset": [asset_reference]},
)
try:
    res = client.data_modeling.instances.query(query=query)
    print(res["timeseries"])
    print(len(res["timeseries"]))
except CogniteAPIError as e:
    print(e)


## Retrieve activities of a timeseries and equipment related to these activities

In [None]:
timeseries_id = NodeId(space=DMU_SP, external_id="CUMULATIVE_BOE_PER_DAY_TS_6081740998")

# Activities whose 'timeSeries' list contains the timeseries passed in as a query parameter
activities_of_timeseries = NodeResultSetExpression(
    filter=ContainsAll(
        property=cognite_act_vid.as_property_ref("timeSeries"),
        values={"parameter": "timeseries"},
    ),
    limit=100,
)
# From those activities, follow their 'equipment' relation; `through` must be a property reference
equipment_of_activities = NodeResultSetExpression(
    from_="activities",
    through=cognite_act_vid.as_property_ref("equipment"),
    limit=10,
)

query = Query(
    with_={
        "activities": activities_of_timeseries,
        "equipment_activities": equipment_of_activities,
    },
    select={
        "activities": Select(
            [
                SourceSelector(
                    source=cognite_act_vid,
                    properties=["name", "description", "source", "assets", "equipment"],
                )
            ]
        ),
        "equipment_activities": Select(
            [SourceSelector(source=eq_vid, properties=["name", "description", "source"])]
        ),
    },
    parameters={"timeseries": [timeseries_id.dump(include_instance_type=False)]},
)
try:
    res = client.data_modeling.instances.query(query=query)
    print(res["activities"])
    print(res["equipment_activities"])
    print("returned activities:", len(res["activities"]))
    print("returned equipment activities:", len(res["equipment_activities"]))
except CogniteAPIError as e:
    print(e)

## Retrieve equipment associated with an asset

You can retrieve equipment related to an asset through the 'asset' property in the Equipment.
This is useful when trying to get the equipment instances associated with assets of a certain type or class
or extensions of CogniteAsset with some properties.

Note that it only works with Equipment - all other Asset entity relationships (to files, timeseries, activities)
are Reverse **Lists** of direct relations, meaning they cannot be traversed inwards. 

In [None]:
asset_tags = ("SHELL OFFSHORE INC.", "Borehole Completed", "AC901")
equipmentType = "Casing"

# FROM all nodes in the project WHERE 'tags' (Asset view) contains all of asset_tags
tagged_assets = NodeResultSetExpression(
    filter=ContainsAll(property=asset_vid.as_property_ref("tags"), values={"parameter": "asset_tags"}),
)
# Starting FROM the retrieved assets, walk the Equipment view's 'asset' relation inwards:
# the equipment whose 'asset' property points at any of those assets ...
equipment_of_assets = NodeResultSetExpression(
    from_="assets",
    through=eq_vid.as_property_ref("asset"),
    direction="inwards",
    # ... WHERE the related equipment type has the name given by the equipmentType parameter (Casing)
    filter=Nested(
        scope=eq_vid.as_property_ref("equipmentType"),
        filter=Equals(property=eqtype_vid.as_property_ref("name"), value={"parameter": "equipmentType"}),
    ),
)

query = Query(
    with_={
        "assets": tagged_assets,
        "equipment": equipment_of_assets,
    },
    select={
        # all properties of the Asset view
        "assets": Select([SourceSelector(source=asset_vid, properties=["*"])]),
        # all properties of the Equipment view
        "equipment": Select([SourceSelector(source=eq_vid, properties=["*"])]),
    },
    # values for the parameters referenced in the filters
    parameters={"equipmentType": equipmentType, "asset_tags": asset_tags},
)
try:
    res = client.data_modeling.instances.query(query=query)
    print(res["assets"])
    print(res["equipment"])
except CogniteAPIError as e:
    print(e)


# If there is time for hands-on... 

### 1. List all Equipments with alias "LOBSTER".
Use ContainsAll filter (there should be 7). Sort by "name", descending.

---

### 2. Search for all maintenance orders with "injection" in the longText property 
(verify first 3 that "injection" is there).

---

### 3. Iterate through all activities in batches of 1000.

---

### 4. Query all maintenance orders related to asset with external Id "WLL-6081140466" 
(there should be 4).

---

### 5. Get all equipment of type "EQTY-HEL" 
(there should be 74). Use Nested filter.

# Using the cursor

For completeness, the methods below can be used to paginate with the instantiated query.

Examples of usage and considerations are TBD

In [None]:
# I have a function (inspired by Yggdrasil team) that updates a provided query with a cursor that's suited for the sync endpoint.

def get_data(
    client: CogniteClient, query: Query, max_iterations: int | None = 100
) -> tuple[dict[str, list[NodeListWithCursor | EdgeListWithCursor]], dict[str, str]]:
    """Cursor based pagination for data model queries, using the sync endpoint.

    The query is sent repeatedly through `client.data_modeling.instances.sync`. After every
    response that contains data, the returned cursors are written back to `query.cursors`,
    so the passed-in query object is modified.

    Args:
        client (CogniteClient): The Cognite client to use for making the query.
        query (Query): The query to fetch data from CDF data model.
        max_iterations (int): The maximum number of iterations to run. Defaults to 100.
            Set to None or -1 for no limit. With 0 no request is made.

    Returns:
        tuple: A tuple containing:
            - dict: The collected data, keyed by selection name; each value is a list
              accumulating the items returned for that selection across all iterations.
            - dict: The cursors of the last response received. If no request was made,
              the cursors currently set on the query. Empty if the call returned None.
    """
    # all() of an empty mapping is True, so require at least one cursor before announcing a resume
    if query.cursors and all(query.cursors.values()):
        print("Cursors already set in query, continuing retrieval.")

    # Items per selection, accumulated over all iterations
    collected_data = defaultdict(list)

    # None and -1 both mean "no limit"
    if max_iterations is None or max_iterations == -1:
        max_iterations = float("inf")

    # Start from the query's own cursors so there is a value to return even if the loop never runs
    latest_cursors = dict(query.cursors or {})
    current_iteration = 0

    # Run the query until max iterations is reached or no more data is available
    while current_iteration < max_iterations:
        res = client.data_modeling.instances.sync(query=query)

        # If nothing at all is returned, stop and hand back what has been collected so far
        if res is None:
            if not collected_data:
                print("No data returned, exiting loop.")
                return {}, {}
            print("Query failed, but returning collected data so far.")
            return collected_data, {}

        # Cursors are returned even when the response carries no data
        latest_cursors = res.cursors

        if not any(res.data.values()):
            print("No more data available, exiting loop.")
            break

        # Collect data from the query
        print("Collecting data.")
        for selection, instances in res.data.items():
            collected_data[selection].extend(instances)

        # Update cursors in the query so the next request continues where this one ended
        query.cursors = res.cursors
        current_iteration += 1

    print(f"Collected data for {current_iteration} iterations.")
    return collected_data, latest_cursors

In [None]:


# Settings for the tenacity retry policy built below
NUM_ATTEMPTS = 3  # passed to stop_after_attempt()
EXPONENTIAL_MULTIPLIER = 1  # `multiplier` argument of wait_exponential()
EXPONENTIAL_MIN = 1  # `min` argument of wait_exponential()
EXPONENTIAL_MAX = 10  # `max` argument of wait_exponential()

# 408s occur infrequently, which is why they are retried as well
def is_retryable_exception(e):
    """Tell whether `e` is a CogniteAPIError with status code 408 or 429."""
    if not isinstance(e, CogniteAPIError):
        return False
    return e.code in {408, 429}


# Retry decorator: retry only exceptions accepted by is_retryable_exception, wait with
# exponential backoff between attempts, give up after NUM_ATTEMPTS and re-raise the last error.
_exponential_backoff = wait_exponential(
    multiplier=EXPONENTIAL_MULTIPLIER,
    min=EXPONENTIAL_MIN,
    max=EXPONENTIAL_MAX,
)
retry_cognite = retry(
    retry=retry_if_exception(is_retryable_exception),
    wait=_exponential_backoff,
    stop=stop_after_attempt(NUM_ATTEMPTS),
    reraise=True,
)


@retry_cognite
def make_query_with_retry(client: CogniteClient, query: Query) -> QueryResult:
    """Send `query` to the instances sync endpoint, retried according to `retry_cognite`.

    Args:
        client (CogniteClient): Client used to make the call.
        query (Query): Query passed to `client.data_modeling.instances.sync`.

    Returns:
        QueryResult: The result of the sync call.

    Raises:
        CogniteAPIError: Re-raised once the retry policy gives up, or immediately when the
            error is not accepted by `is_retryable_exception`.
    """
    return client.data_modeling.instances.sync(query=query)