Add Snowflake event store
np5 committed Aug 20, 2022
1 parent 41857e2 commit fae369a
Showing 10 changed files with 802 additions and 3 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
## 2022.3 (TBD)

### Features (some, not all…)

New `zentral.core.stores.backends.snowflake` store backend.

## 2022.2 (August 13, 2022)

**IMPORTANT:** The license has changed! Most of the code stays under the Apache license, but some modules, like the SAML authentication or the Splunk event store, are licensed under a new source-available license and require a subscription when used in production.
7 changes: 5 additions & 2 deletions constraints.txt
@@ -28,7 +28,7 @@ click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
click==8.1.3
-cryptography==37.0.4
+cryptography==36.0.2
decorator==5.1.1
defusedxml==0.7.1
django-bootstrap-form==3.4
@@ -88,6 +88,7 @@ pyOpenSSL==22.0.0
pyasn1-modules==0.2.8
pyasn1==0.4.8
pycparser==2.21
pycryptodomex==3.15.0
pycurl==7.44.1
pydantic==1.9.1
pylibmc==1.6.1
@@ -98,6 +99,7 @@ python-dateutil==2.8.2
python-ldap==3.4.2
pytz==2022.1
opensearch-py==2.0.0
oscrypto==1.3.0
redis==4.3.4
requests-oauthlib==1.3.1
requests==2.28.1
@@ -107,11 +109,12 @@ s3transfer==0.6.0
six==1.16.0
sniffio==1.2.0
sqlparse==0.4.2
snowflake-connector-python==2.7.11
stack-data==0.3.0
tqdm==4.64.0
traitlets==5.3.0
typing_extensions==4.3.0
-urllib3==1.26.10
+urllib3==1.26.11
vine==5.0.0
wcwidth==0.2.5
webauthn==1.6.0
66 changes: 66 additions & 0 deletions docs/configuration/stores.md
@@ -31,6 +31,7 @@ The python module implementing the store, as a string. Currently available:
* `zentral.core.stores.backends.humio`
* `zentral.core.stores.backends.kinesis`
* `zentral.core.stores.backends.opensearch`
* `zentral.core.stores.backends.snowflake`
* `zentral.core.stores.backends.splunk`
* `zentral.core.stores.backends.sumo_logic`
* `zentral.core.stores.backends.syslog`
@@ -241,6 +242,71 @@ An integer between 1 and 20, 1 by default. The number of threads to use when pos
}
```

## Snowflake backend options

The Snowflake backend is read-only. It can only be used as a `frontend` store. To get the events into Snowflake, you will have to set up a separate pipeline, for example with the `kinesis` store backend and Kinesis Firehose.
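
For illustration, a minimal sketch of such a setup in the `stores` section of the configuration could look as follows. The store names and the `kinesis` parameters (`stream`, `region_name`) are placeholders, and the delivery from the stream into the Snowflake table (with Kinesis Firehose, for example) is configured outside of Zentral:

```json
{
  "stores": {
    "kinesis": {
      "backend": "zentral.core.stores.backends.kinesis",
      "stream": "zentral-events",
      "region_name": "us-east-1"
    },
    "snowflake": {
      "backend": "zentral.core.stores.backends.snowflake",
      "frontend": true,
      "account": "zentralaccount",
      "user": "Zentral",
      "password": "{{ env:SNOWFLAKE_PASSWORD }}",
      "database": "ZENTRAL",
      "role": "ZENTRAL",
      "warehouse": "DEFAULTWH"
    }
  }
}
```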

### `account`

**MANDATORY**

The name of the Snowflake account.

### `user`

**MANDATORY**

The name of the Snowflake user.

### `password`

**MANDATORY**

The password of the Snowflake user.

### `database`

**MANDATORY**

The name of the Snowflake database.

### `schema`

The name of the Snowflake schema. Defaults to `PUBLIC`.

### `role`

**MANDATORY**

The name of the Snowflake role.

### `warehouse`

**MANDATORY**

The name of the Snowflake warehouse.

### `session_timeout`

The session timeout, in seconds. After the current session has timed out, a new connection is established when necessary. Defaults to 13800 seconds (4 hours, the Snowflake default session duration, minus 10 minutes).

### Full example

```json
{
  "backend": "zentral.core.stores.backends.snowflake",
  "frontend": true,
  "account": "zentralaccount",
  "user": "Zentral",
  "password": "{{ env:SNOWFLAKE_PASSWORD }}",
  "database": "ZENTRAL",
  "schema": "ZENTRAL",
  "role": "ZENTRAL",
  "warehouse": "DEFAULTWH",
  "session_timeout": 14400
}
```


## Splunk backend options

### `hec_url`
258 changes: 258 additions & 0 deletions ee/zentral/core/stores/backends/snowflake.py
@@ -0,0 +1,258 @@
from datetime import timedelta
import json
import logging
import time
from django.utils import timezone
import snowflake.connector
from snowflake.connector import DictCursor
from zentral.core.events import event_from_event_d, event_types
from zentral.core.exceptions import ImproperlyConfigured
from zentral.core.stores.backends.base import BaseEventStore


logger = logging.getLogger("zentral.core.stores.backends.snowflake")
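
# Expected ZENTRALEVENTS layout (the table is fed by an external pipeline, see
# docs/configuration/stores.md): CREATED_AT (timestamp), TYPE and SERIAL_NUMBER
# (strings), METADATA and PAYLOAD (variants), TAGS, OBJECTS and PROBES (arrays).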


class EventStore(BaseEventStore):
read_only = True
last_machine_heartbeats = True
machine_events = True
object_events = True
probe_events = True

def __init__(self, config_d):
super().__init__(config_d)
self._connect_kwargs = {}
# connection parameters
missing_params = []
for k in ("account", "user", "password", "database", "schema", "role", "warehouse"):
v = config_d.get(k)
if not v:
if k == "schema":
v = "PUBLIC"
else:
missing_params.append(k)
continue
self._connect_kwargs[k] = v
if missing_params:
raise ImproperlyConfigured("Missing configuration parameters: {}".format(", ".join(missing_params)))
# connection
self._connection = None
self._last_active_at = time.monotonic()
self._session_timeout = config_d.get(
"session_timeout",
4*3600-10*60 # 4 hours (Snowflake default) - 10 min
)

def _get_connection(self):
if self._connection is None or (time.monotonic() - self._last_active_at) > self._session_timeout:
account = self._connect_kwargs["account"]
if self._connection is None:
action = "Connect"
else:
logger.info("Close current connection to account %s", account)
self._connection.close()
action = "Re-connect"
logger.info("%s to account %s", action, account)
self._connection = snowflake.connector.connect(**self._connect_kwargs)
self._last_active_at = time.monotonic()
return self._connection

def _deserialize_event(self, result):
metadata = json.loads(result['METADATA'])
metadata['type'] = result['TYPE']
metadata['created_at'] = result['CREATED_AT']
metadata['tags'] = json.loads(result['TAGS'])
metadata['objects'] = {}
for objref in json.loads(result['OBJECTS']):
k, v = objref.split(":", 1)
metadata['objects'].setdefault(k, []).append(v)
metadata['serial_number'] = result['SERIAL_NUMBER']
event_d = json.loads(result.pop("PAYLOAD"))
event_d['_zentral'] = metadata
return event_from_event_d(event_d)

def _prepare_query(self, query, args=None, **kwargs):
if args is None:
args = []
first_filter = True
for attr, filter_tmpl in (("from_dt", "AND created_at >= %s"),
("to_dt", "AND created_at <= %s"),
("event_type", "AND type = %s"),
("objref", "AND ARRAY_CONTAINS(%s::variant, objects)"),
("probe", "AND ARRAY_CONTAINS(%s::variant, probes)"),
("serial_number", "AND serial_number = %s"),
("order_by", None),
("limit", "LIMIT %s"),
("offset", "OFFSET %s")):
val = kwargs.get(attr)
if val is not None:
if attr == "order_by":
query += f" ORDER BY {val}"
else:
if first_filter and filter_tmpl.startswith("AND "):
filter_tmpl = f"WHERE {filter_tmpl[4:]}"
query += f" {filter_tmpl}"
args.append(val)
first_filter = False
return query, args

def _fetch_aggregated_event_counts(self, **kwargs):
query, args = self._prepare_query("SELECT TYPE, COUNT(*) AS COUNT FROM ZENTRALEVENTS", **kwargs)
query += " GROUP BY type"
cursor = self._get_connection().cursor(DictCursor)
cursor.execute(query, args)
event_counts = {
r['TYPE']: r['COUNT']
for r in cursor.fetchall()
}
cursor.close()
return event_counts

def _fetch_events(self, **kwargs):
kwargs["order_by"] = "CREATED_AT DESC"
offset = int(kwargs.pop("cursor", None) or 0)
if offset > 0:
kwargs["offset"] = offset
query, args = self._prepare_query("SELECT * FROM ZENTRALEVENTS", **kwargs)
cursor = self._get_connection().cursor(DictCursor)
cursor.execute(query, args)
events = [self._deserialize_event(t) for t in cursor.fetchall()]
cursor.close()
next_cursor = None
limit = kwargs.get("limit")
if limit and len(events) >= limit:
next_cursor = str(limit + kwargs.get("offset", 0))
return events, next_cursor

# machine events

def fetch_machine_events(self, serial_number, from_dt, to_dt=None, event_type=None, limit=10, cursor=None):
return self._fetch_events(
serial_number=serial_number,
from_dt=from_dt,
to_dt=to_dt,
event_type=event_type,
limit=limit,
cursor=cursor
)

def get_aggregated_machine_event_counts(self, serial_number, from_dt, to_dt=None):
return self._fetch_aggregated_event_counts(
serial_number=serial_number,
from_dt=from_dt,
to_dt=to_dt
)

def get_last_machine_heartbeats(self, serial_number, from_dt):
heartbeats = {}
query = (
"SELECT TYPE, MAX(CREATED_AT) LAST_SEEN,"
"PAYLOAD:source.name::varchar SOURCE_NAME, NULL USER_AGENT "
"FROM ZENTRALEVENTS "
"WHERE CREATED_AT >= %s "
"AND TYPE = 'inventory_heartbeat' "
"AND SERIAL_NUMBER = %s "
"GROUP BY TYPE, SOURCE_NAME, USER_AGENT "

"UNION "

"SELECT TYPE, MAX(CREATED_AT) LAST_SEEN,"
"NULL SOURCE_NAME, METADATA:request.user_agent::varchar USER_AGENT "
"FROM ZENTRALEVENTS "
"WHERE CREATED_AT >= %s "
"AND TYPE <> 'inventory_heartbeat' "
"AND ARRAY_CONTAINS('heartbeat'::variant, TAGS) "
"AND SERIAL_NUMBER = %s "
"GROUP BY TYPE, SOURCE_NAME, USER_AGENT"
)
args = [from_dt, serial_number, from_dt, serial_number]
cursor = self._get_connection().cursor(DictCursor)
cursor.execute(query, args)
for t in cursor.fetchall():
event_class = event_types.get(t['TYPE'])
if not event_class:
logger.error("Unknown event type %s", t['TYPE'])
continue
key = (event_class, t['SOURCE_NAME'])
heartbeats.setdefault(key, []).append((t['USER_AGENT'], t['LAST_SEEN']))
cursor.close()
return [
(event_class, source_name, ua_max_dates)
for (event_class, source_name), ua_max_dates in heartbeats.items()
]

# object events

def fetch_object_events(self, key, val, from_dt, to_dt=None, event_type=None, limit=10, cursor=None):
return self._fetch_events(
objref=f"{key}:{val}",
from_dt=from_dt,
to_dt=to_dt,
event_type=event_type,
limit=limit,
cursor=cursor
)

def get_aggregated_object_event_counts(self, key, val, from_dt, to_dt=None):
return self._fetch_aggregated_event_counts(
objref=f"{key}:{val}",
from_dt=from_dt,
to_dt=to_dt
)

# probe events

def fetch_probe_events(self, probe, from_dt, to_dt=None, event_type=None, limit=10, cursor=None):
return self._fetch_events(
probe=probe.pk,
from_dt=from_dt,
to_dt=to_dt,
event_type=event_type,
limit=limit,
cursor=cursor
)

def get_aggregated_probe_event_counts(self, probe, from_dt, to_dt=None):
return self._fetch_aggregated_event_counts(
probe=probe.pk,
from_dt=from_dt,
to_dt=to_dt
)

# zentral apps data

def get_app_hist_data(self, interval, bucket_number, tag):
data = []
query = (
"SELECT COUNT(*) EVENT_COUNT, COUNT(DISTINCT SERIAL_NUMBER) MACHINE_COUNT,"
"DATE_TRUNC(%s, CREATED_AT) BUCKET "
"FROM ZENTRALEVENTS "
"WHERE ARRAY_CONTAINS(%s::variant, TAGS) "
"GROUP BY BUCKET ORDER BY BUCKET DESC"
)
if interval == "day":
args = ["DAY", tag]
last_value = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
delta = timedelta(days=1)
elif interval == "hour":
args = ["HOUR", tag]
last_value = timezone.now().replace(minute=0, second=0, microsecond=0)
delta = timedelta(hours=1)
else:
logger.error("Unsupported interval %s", interval)
return data
cursor = self._get_connection().cursor(DictCursor)
cursor.execute(query, args)
results = {
t['BUCKET']: (t['EVENT_COUNT'], t['MACHINE_COUNT'])
for t in cursor.fetchall()
}
cursor.close()
for bucket in (last_value - i * delta for i in range(bucket_number - 1, -1, -1)):
try:
event_count, machine_count = results[bucket]
except KeyError:
event_count = machine_count = 0
data.append((bucket, event_count, machine_count))
return data
1 change: 1 addition & 0 deletions requirements.txt
@@ -31,6 +31,7 @@ pyyaml
requests
requests_oauthlib # MDM DEP
setuptools
snowflake-connector-python
sqlparse # SQL syntax highlighting
tqdm
XlsxWriter
