Automatic indexes (#1107)
achantavy committed Feb 7, 2023
1 parent 4ea9e62 commit 81902b2
Showing 9 changed files with 225 additions and 44 deletions.
42 changes: 42 additions & 0 deletions cartography/client/core/tx.py
@@ -7,6 +7,9 @@

import neo4j

from cartography.graph.querybuilder import build_create_index_queries
from cartography.graph.querybuilder import build_ingestion_query
from cartography.models.core.nodes import CartographyNodeSchema
from cartography.util import batch


@@ -210,3 +213,42 @@ def load_graph_data(
DictList=data_batch,
**kwargs,
)


def ensure_indexes(neo4j_session: neo4j.Session, node_schema: CartographyNodeSchema) -> None:
"""
Creates indexes if they don't exist for the given CartographyNodeSchema object, as well as for the target nodes
of all relationships defined on its `other_relationships` and `sub_resource_relationship` fields. This operation
is idempotent.
This ensures that every time we need to MATCH on a node to draw a relationship to it, the field used for the MATCH
will be indexed, making the operation fast.
:param neo4j_session: The neo4j session
:param node_schema: The node_schema object to create indexes for.
"""
queries = build_create_index_queries(node_schema)

for query in queries:
if not query.startswith('CREATE INDEX IF NOT EXISTS'):
raise ValueError('Query provided to `ensure_indexes()` does not start with "CREATE INDEX IF NOT EXISTS".')
neo4j_session.run(query)


def load(
neo4j_session: neo4j.Session,
node_schema: CartographyNodeSchema,
dict_list: List[Dict[str, Any]],
**kwargs,
) -> None:
"""
Main entrypoint for intel modules to write data to the graph. Ensures that indexes exist for the datatypes loaded
to the graph and then performs the load operation.
:param neo4j_session: The Neo4j session
:param node_schema: The CartographyNodeSchema object used to create indexes and to generate the ingestion query.
:param dict_list: The data to load to the graph represented as a list of dicts.
:param kwargs: Allows additional keyword args to be supplied to the Neo4j query.
:return: None
"""
ensure_indexes(neo4j_session, node_schema)
ingestion_query = build_ingestion_query(node_schema)
load_graph_data(neo4j_session, ingestion_query, dict_list, **kwargs)
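For illustration, a hedged sketch of how an intel module calls the new entrypoint and what it expands to; `EMRClusterSchema` and the kwarg names mirror the EMR changes later in this commit, while the wrapper function itself is invented:

```python
from typing import Any
from typing import Dict
from typing import List

import neo4j

from cartography.client.core.tx import ensure_indexes
from cartography.client.core.tx import load
from cartography.client.core.tx import load_graph_data
from cartography.graph.querybuilder import build_ingestion_query
from cartography.models.aws.emr import EMRClusterSchema


def load_clusters_example(
    neo4j_session: neo4j.Session,
    cluster_data: List[Dict[str, Any]],
    update_tag: int,
    region: str,
    account_id: str,
) -> None:
    # The one-call form intel modules use:
    load(
        neo4j_session,
        EMRClusterSchema(),
        cluster_data,
        lastupdated=update_tag,
        Region=region,
        AWS_ID=account_id,
    )

    # ...which is roughly equivalent to running the explicit steps yourself:
    ensure_indexes(neo4j_session, EMRClusterSchema())            # idempotent CREATE INDEX IF NOT EXISTS queries
    ingestion_query = build_ingestion_query(EMRClusterSchema())  # generated MERGE query for this schema
    load_graph_data(
        neo4j_session,
        ingestion_query,
        cluster_data,
        lastupdated=update_tag,
        Region=region,
        AWS_ID=account_id,
    )
```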
3 changes: 0 additions & 3 deletions cartography/data/indexes.cypher
@@ -158,9 +158,6 @@ CREATE INDEX IF NOT EXISTS FOR (n:ELBListener) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:ELBListener) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:ELBV2Listener) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:ELBV2Listener) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:EMRCluster) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:EMRCluster) ON (n.arn);
CREATE INDEX IF NOT EXISTS FOR (n:EMRCluster) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:Endpoint) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:Endpoint) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:ESDomain) ON (n.arn);
53 changes: 53 additions & 0 deletions cartography/graph/querybuilder.py
@@ -2,6 +2,7 @@
from dataclasses import asdict
from string import Template
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
@@ -374,3 +375,55 @@ def build_ingestion_query(
attach_relationships_statement=_build_attach_relationships_statement(sub_resource_rel, other_rels),
)
return ingest_query


def build_create_index_queries(node_schema: CartographyNodeSchema) -> List[str]:
"""
Generate queries to create indexes for the given CartographyNodeSchema and all node types attached to it via its
relationships.
:param node_schema: The Cartography node_schema object
:return: A list of queries of the form `CREATE INDEX IF NOT EXISTS FOR (n:$TargetNodeLabel) ON (n.$TargetAttribute)`
"""
index_template = Template('CREATE INDEX IF NOT EXISTS FOR (n:$TargetNodeLabel) ON (n.$TargetAttribute);')

# First ensure an index exists for the node_schema and all extra labels on the `id` and `lastupdated` fields
result = [
index_template.safe_substitute(
TargetNodeLabel=node_schema.label,
TargetAttribute='id',
),
index_template.safe_substitute(
TargetNodeLabel=node_schema.label,
TargetAttribute='lastupdated',
),
]
if node_schema.extra_node_labels:
result.extend([
index_template.safe_substitute(
TargetNodeLabel=label,
TargetAttribute='id', # Precondition: 'id' is defined on all cartography node_schema objects.
) for label in node_schema.extra_node_labels.labels
])

# Next, for all relationships possible out of this node, ensure that indexes exist for all target nodes' properties
# as specified in their TargetNodeMatchers.
rel_schemas = []
if node_schema.sub_resource_relationship:
rel_schemas.extend([node_schema.sub_resource_relationship])
if node_schema.other_relationships:
rel_schemas.extend(node_schema.other_relationships.rels)
for rs in rel_schemas:
for target_key in asdict(rs.target_node_matcher).keys():
result.append(
index_template.safe_substitute(TargetNodeLabel=rs.target_node_label, TargetAttribute=target_key),
)

# Now, include extra indexes defined by the module author on the node schema's property refs.
node_props_as_dict: Dict[str, PropertyRef] = asdict(node_schema.properties)
result.extend([
index_template.safe_substitute(
TargetNodeLabel=node_schema.label,
TargetAttribute=prop_name,
) for prop_name, prop_ref in node_props_as_dict.items() if prop_ref.extra_index
])
return result
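To make the output concrete, this is roughly what the builder yields for the `EMRClusterSchema` updated later in this commit. The `AWSAccount` line assumes that schema's sub-resource relationship matches `AWSAccount` nodes on `id` (as the `AWS_ID` kwarg in the docs suggests); the `arn` line comes from its `extra_index=True` property:

```python
from cartography.graph.querybuilder import build_create_index_queries
from cartography.models.aws.emr import EMRClusterSchema

for query in build_create_index_queries(EMRClusterSchema()):
    print(query)

# Expected output, in builder order (node id/lastupdated, then relationship targets, then extra-index props):
#   CREATE INDEX IF NOT EXISTS FOR (n:EMRCluster) ON (n.id);
#   CREATE INDEX IF NOT EXISTS FOR (n:EMRCluster) ON (n.lastupdated);
#   CREATE INDEX IF NOT EXISTS FOR (n:AWSAccount) ON (n.id);
#   CREATE INDEX IF NOT EXISTS FOR (n:EMRCluster) ON (n.arn);
```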
39 changes: 17 additions & 22 deletions cartography/intel/aws/emr.py
@@ -1,15 +1,15 @@
import logging
import time
from typing import Any
from typing import Dict
from typing import List

import boto3
import botocore.exceptions
import neo4j

from cartography.client.core.tx import load_graph_data
from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.graph.querybuilder import build_ingestion_query
from cartography.intel.aws.ec2.util import get_botocore_config
from cartography.models.aws.emr import EMRClusterSchema
from cartography.util import aws_handle_regions
@@ -24,9 +24,9 @@

@timeit
@aws_handle_regions
def get_emr_clusters(boto3_session: boto3.session.Session, region: str) -> List[Dict]:
def get_emr_clusters(boto3_session: boto3.session.Session, region: str) -> List[Dict[str, Any]]:
client = boto3_session.client('emr', region_name=region, config=get_botocore_config())
clusters: List[Dict] = []
clusters: List[Dict[str, Any]] = []
paginator = client.get_paginator('list_clusters')
for page in paginator.paginate():
cluster = page['Clusters']
@@ -36,36 +36,31 @@ def get_emr_clusters(boto3_session: boto3.session.Session, region: str) -> List[


@timeit
def get_emr_describe_cluster(boto3_session: boto3.session.Session, region: str, cluster_id: str) -> Dict:
def get_emr_describe_cluster(boto3_session: boto3.session.Session, region: str, cluster_id: str) -> Dict[str, Any]:
client = boto3_session.client('emr', region_name=region, config=get_botocore_config())
cluster_details: Dict = {}
cluster_details: Dict[str, Any] = {}
try:
response = client.describe_cluster(ClusterId=cluster_id)
cluster_details = response['Cluster']
except botocore.exceptions.ClientError as e:
logger.warning(
"Could not run EMR describe_cluster due to boto3 error %s: %s. Skipping.",
e.response['Error']['Code'],
e.response['Error']['Message'],
)
code = e.response['Error']['Code']
msg = e.response['Error']['Message']
logger.warning(f"Could not run EMR describe_cluster due to boto3 error {code}: {msg}. Skipping.")
return cluster_details


@timeit
def load_emr_clusters(
neo4j_session: neo4j.Session,
cluster_data: List[Dict],
cluster_data: List[Dict[str, Any]],
region: str,
current_aws_account_id: str,
aws_update_tag: int,
) -> None:
logger.info("Loading EMR %d clusters for region '%s' into graph.", len(cluster_data), region)

ingestion_query = build_ingestion_query(EMRClusterSchema())

load_graph_data(
logger.info(f"Loading EMR {len(cluster_data)} clusters for region '{region}' into graph.")
load(
neo4j_session,
ingestion_query,
EMRClusterSchema(),
cluster_data,
lastupdated=aws_update_tag,
Region=region,
@@ -74,7 +69,7 @@ def load_emr_clusters(


@timeit
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict[str, Any]) -> None:
logger.debug("Running EMR cleanup job.")
cleanup_job = GraphJob.from_node_schema(EMRClusterSchema(), common_job_parameters)
cleanup_job.run(neo4j_session)
@@ -83,14 +78,14 @@ def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
@timeit
def sync(
neo4j_session: neo4j.Session, boto3_session: boto3.session.Session, regions: List[str], current_aws_account_id: str,
update_tag: int, common_job_parameters: Dict,
update_tag: int, common_job_parameters: Dict[str, Any],
) -> None:
for region in regions:
logger.info("Syncing EMR for region '%s' in account '%s'.", region, current_aws_account_id)
logger.info(f"Syncing EMR for region '{region}' in account '{current_aws_account_id}'.")

clusters = get_emr_clusters(boto3_session, region)

cluster_data: List[Dict] = []
cluster_data: List[Dict[str, Any]] = []
for cluster in clusters:
cluster_id = cluster['Id']
cluster_details = get_emr_describe_cluster(boto3_session, region, cluster_id)
2 changes: 1 addition & 1 deletion cartography/models/aws/emr.py
@@ -12,7 +12,7 @@

@dataclass(frozen=True)
class EMRClusterNodeProperties(CartographyNodeProperties):
arn: PropertyRef = PropertyRef('ClusterArn')
arn: PropertyRef = PropertyRef('ClusterArn', extra_index=True)
auto_terminate: PropertyRef = PropertyRef('AutoTerminate')
autoscaling_role: PropertyRef = PropertyRef('AutoScalingRole')
custom_ami_id: PropertyRef = PropertyRef('CustomAmiId')
10 changes: 9 additions & 1 deletion cartography/models/core/common.py
@@ -8,16 +8,24 @@ class PropertyRef:
(PropertyRef.set_in_kwargs=True).
"""

def __init__(self, name: str, set_in_kwargs=False):
def __init__(self, name: str, set_in_kwargs=False, extra_index=False):
"""
:param name: The name of the property
:param set_in_kwargs: Optional. If True, the property is not defined on the data dict, and we expect to find the
property in the kwargs.
If False, looks for the property in the data dict.
Defaults to False.
:param extra_index: If True, make sure that we create an index for this property name.
Notes:
- extra_index is available for the case where you anticipate a property will be queried frequently.
- The `id` and `lastupdated` properties will always have indexes created for them automatically by
`ensure_indexes()`.
- All properties included in target node matchers will always have indexes created for them.
Defaults to False.
"""
self.name = name
self.set_in_kwargs = set_in_kwargs
self.extra_index = extra_index

def _parameterize_name(self) -> str:
return f"${self.name}"
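As a quick reference, a hedged sketch of the three `PropertyRef` modes side by side; the field names mirror the EMR schema elsewhere in this commit, and the import paths are assumed from this commit's layout:

```python
from dataclasses import dataclass

from cartography.models.core.common import PropertyRef
from cartography.models.core.nodes import CartographyNodeProperties


@dataclass(frozen=True)
class ExampleNodeProperties(CartographyNodeProperties):
    id: PropertyRef = PropertyRef('Id')                                        # read from each record's 'Id' key
    lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)  # read from a kwarg passed to load()
    arn: PropertyRef = PropertyRef('ClusterArn', extra_index=True)             # also gets an index created for it
```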
57 changes: 40 additions & 17 deletions docs/root/dev/writing-intel-modules.md
@@ -58,7 +58,7 @@ For the sake of consistency, if a field does not exist, set it to `None` and not

### Load

[As seen in our AWS EMR example](https://github.com/lyft/cartography/blob/e6ada9a1a741b83a34c1c3207515a1863debeeb9/cartography/intel/aws/emr.py#L113-L132), the `load` function ingests a list of dicts to Neo4j by calling [cartography.client.core.tx.load_graph_data()](https://github.com/lyft/cartography/blob/e6ada9a1a741b83a34c1c3207515a1863debeeb9/cartography/client/core/tx.py#L191-L212):
[As seen in our AWS EMR example](https://github.com/lyft/cartography/blob/e6ada9a1a741b83a34c1c3207515a1863debeeb9/cartography/intel/aws/emr.py#L113-L132), the `load` function ingests a list of dicts to Neo4j by calling [cartography.client.core.tx.load()](https://github.com/lyft/cartography/blob/e6ada9a1a741b83a34c1c3207515a1863debeeb9/cartography/client/core/tx.py#L191-L212):
```python
def load_emr_clusters(
neo4j_session: neo4j.Session,
@@ -68,24 +68,18 @@ def load_emr_clusters(
aws_update_tag: int,
) -> None:
logger.info(f"Loading EMR {len(cluster_data)} clusters for region '{region}' into graph.")

ingestion_query = build_ingestion_query(EMRClusterSchema())

load_graph_data(
load(
neo4j_session,
ingestion_query,
EMRClusterSchema(),
cluster_data,
lastupdated=aws_update_tag,
Region=region,
AccountId=current_aws_account_id,
AWS_ID=current_aws_account_id,
)

```


`load_graph_data()` requires an `ingestion_query` to be generated from `CartographyNodeSchema` and `CartographyRelSchema` objects. [cartography.graph.querybuilder.build_ingestion_query()](https://github.com/lyft/cartography/blob/e6ada9a1a741b83a34c1c3207515a1863debeeb9/cartography/graph/querybuilder.py#L312) does just that: it accepts those schema objects as input and returns a well-formed and optimized cypher query.


#### Defining a node

As an example of a `CartographyNodeSchema`, you can view our [EMRClusterSchema code](https://github.com/lyft/cartography/blob/e6ada9a1a741b83a34c1c3207515a1863debeeb9/cartography/intel/aws/emr.py#L106-L110):
@@ -110,7 +104,7 @@ Here's our [EMRClusterNodeProperties code](https://github.com/lyft/cartography/b
```python
@dataclass(frozen=True)
class EMRClusterNodeProperties(CartographyNodeProperties):
arn: PropertyRef = PropertyRef('ClusterArn')
arn: PropertyRef = PropertyRef('ClusterArn', extra_index=True)
firstseen: PropertyRef = PropertyRef('firstseen')
id: PropertyRef = PropertyRef('Id')
# ...
@@ -123,7 +117,25 @@ A `CartographyNodeProperties` object consists of [`PropertyRef`](https://github.

For example, `id: PropertyRef = PropertyRef('Id')` above tells the querybuilder to set a field called `id` on the `EMRCluster` node using the value located at key `'Id'` on each dict in the list.

As another example, `region: PropertyRef = PropertyRef('Region', set_in_kwargs=True)` tells the querybuilder to set a field called `region` on the `EMRCluster` node using a keyword argument called `Region` supplied to `cartography.client.core.tx.load_graph_data()`. `set_in_kwargs=True` is useful in cases where we want every object loaded by a single call to `load_graph_data()` to have the same value for a given attribute.
As another example, `region: PropertyRef = PropertyRef('Region', set_in_kwargs=True)` tells the querybuilder to set a field called `region` on the `EMRCluster` node using a keyword argument called `Region` supplied to `cartography.client.core.tx.load()`. `set_in_kwargs=True` is useful in cases where we want every object loaded by a single call to `load()` to have the same value for a given attribute.
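
To make the mapping concrete, here is a small sketch with an invented sample record (the values are placeholders, and `neo4j_session` is assumed to be an open session):

```python
from cartography.client.core.tx import load
from cartography.models.aws.emr import EMRClusterSchema

# Invented sample record; the keys match the PropertyRef names ('Id', 'ClusterArn').
cluster_data = [
    {'Id': 'j-ABC123', 'ClusterArn': 'arn:aws:elasticmapreduce:us-east-1:000000000000:cluster/j-ABC123'},
]

load(
    neo4j_session,            # an existing neo4j.Session
    EMRClusterSchema(),
    cluster_data,
    lastupdated=1234567890,   # consumed by PropertyRefs with set_in_kwargs=True
    Region='us-east-1',
    AWS_ID='000000000000',
)

# Conceptually, the resulting EMRCluster node gets:
#   id = 'j-ABC123'           <- PropertyRef('Id'), read from each record
#   arn = 'arn:aws:...'       <- PropertyRef('ClusterArn')
#   region = 'us-east-1'      <- PropertyRef('Region', set_in_kwargs=True), same for every record in this call
#   lastupdated = 1234567890
```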

##### Node property indexes
Cartography uses its data model to automatically create indexes for
- node properties that uniquely identify the node (e.g. `id`)
- node properties that are used to connect a node to other nodes (i.e. they are used as part of a `TargetNodeMatcher` on a `CartographyRelSchema`)
- a node's `lastupdated` field -- this is used to enable faster cleanup jobs

As seen in the above definition for `EMRClusterNodeProperties.arn`, you can also explicitly specify additional indexes for fields that you expect to be queried on by providing `extra_index=True` to the `PropertyRef` constructor:

```python
class EMRClusterNodeProperties(CartographyNodeProperties):
# ...
arn: PropertyRef = PropertyRef('ClusterArn', extra_index=True)
```

Index creation is idempotent (we only create them if they don't exist).

See [below](#indexescypher) for more information on indexes.


#### Defining relationships
Expand Down Expand Up @@ -229,11 +241,11 @@ In this older example of ingesting GCP VPCs, we connect VPCs with GCPProjects
and [here](https://github.com/lyft/cartography/blob/8d60311a10156cd8aa16de7e1fe3e109cc3eca0f/cartography/data/indexes.cypher#L42).
All of these queries use indexes for faster lookup.

#### Create an index for new nodes
#### indexes.cypher

Older intel modules define indexes in [indexes.cypher](https://github.com/lyft/cartography/blob/8d60311a10156cd8aa16de7e1fe3e109cc3eca0f/cartography/data/indexes.cypher).
By using CartographyNodeSchema and CartographyRelSchema objects, indexes are automatically created so you don't need to update this file!

Be sure to [update the indexes.cypher file](https://github.com/lyft/cartography/blob/8d60311a10156cd8aa16de7e1fe3e109cc3eca0f/cartography/data/indexes.cypher)
with your new node type. Indexing on "`id`" is required, and indexing on anything else that will be frequently queried is
encouraged.

#### lastupdated and firstseen

@@ -242,9 +254,20 @@ On every cartography node and relationship, we set the `lastupdated` field to th
### Cleanup

We have just added new nodes and relationships to the graph, and we have also updated previously-added ones
by using `MERGE`. We now need to delete nodes and relationships that no longer exist, and we do this by simply removing
by using `MERGE`. We now need to delete nodes and relationships that no longer exist, and we do this by removing
all nodes and relationships that have `lastupdated` NOT set to the `update_tag` of this current run.

By using Cartography schema objects, a cleanup function is [trivial to write](https://github.com/lyft/cartography/blob/82e1dd0e851475381ac8f2a9a08027d67ec1d772/cartography/intel/aws/emr.py#L77-L80):

```python
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
logger.debug("Running EMR cleanup job.")
cleanup_job = GraphJob.from_node_schema(EMRClusterSchema(), common_job_parameters)
cleanup_job.run(neo4j_session)
```

Older intel modules still do this process with hand-written cleanup jobs that work like this:

- Delete all old nodes

You can see this in our [GCP VPCs example](https://github.com/lyft/cartography/blob/8d60311a10156cd8aa16de7e1fe3e109cc3eca0f/cartography/data/jobs/cleanup/gcp_compute_vpc_cleanup.json#L4).