Dbt duckdb plugin #80

Merged 2 commits on Jun 12, 2024
6 changes: 3 additions & 3 deletions README.md
@@ -48,9 +48,9 @@ and workstations for sensitive activities).
*Note: If you create/use a Github Codespace on any of the wagov repos,
SQU_CONFIG should be configured automatically.*

Before using, config needs to be loaded into `squ.core.cache`, which can
be done automatically from json in a keyvault by setting the env var
`SQU_CONFIG` to `"keyvault/tenantid"`.
Before using, config needs to be loaded into `nbdev_squ.core.cache`,
which can be done automatically from json in a keyvault by setting the
env var `SQU_CONFIG` to `"keyvault/tenantid"`.

``` bash
export SQU_CONFIG="{{ keyvault }}/{{ tenantid }}"
```
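
For context, a minimal sketch of the same flow in Python (assuming `SQU_CONFIG` is exported as above and Azure credentials are available): calling `login()`, as the squ dbt plugin below does in `initialize`, loads the keyvault JSON into `nbdev_squ.core.cache`, after which the query helpers in `nbdev_squ.api` can be used directly.

```python
# Minimal sketch, assuming SQU_CONFIG is already exported and Azure auth is available.
import pandas
from nbdev_squ import api

api.login()  # loads config from the keyvault/tenant named in SQU_CONFIG into nbdev_squ.core.cache
df = api.query_all("SecurityAlert | take 10", timespan=pandas.Timedelta("7d"))
print(df.head())
```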
2 changes: 1 addition & 1 deletion build.sh
@@ -4,5 +4,5 @@ npm ci
npm run build
pip install build==1.2.1
python -m build
# To release to pypi run below after a build
# To release to pypi run below after a build. Make sure to bump version with nbdev_bump_version.
# twine upload --skip-existing dist/*
4 changes: 1 addition & 3 deletions dbt_example_project/dbt_project.yml
@@ -32,7 +32,5 @@ clean-targets: # directories to be removed by `dbt clean`
models:
dbt_example_project:
# Config indicated by + and applies to all files under models/example/
example:
+materialized: view
squ:
+materialized: table
+materialized: view
27 changes: 0 additions & 27 deletions dbt_example_project/models/example/my_first_dbt_model.sql

This file was deleted.

6 changes: 0 additions & 6 deletions dbt_example_project/models/example/my_second_dbt_model.sql

This file was deleted.

21 changes: 0 additions & 21 deletions dbt_example_project/models/example/schema.yml

This file was deleted.

7 changes: 7 additions & 0 deletions dbt_example_project/models/squ/T1547_001.kql
@@ -0,0 +1,7 @@
let selection= dynamic(['reg',' ADD', @'Software\Microsoft\Windows\CurrentVersion\Run']);
let filter_known = dynamic(['Discord.exe','Skype.exe','LiveChat.exe','Promethean Desktop.exe']);
DeviceProcessEvents
| where ActionType == "ProcessCreated"
| where ProcessCommandLine has_all (selection)
| where InitiatingProcessFileName !in (filter_known) //Known False-Positive
| where ProcessCommandLine !contains "PaperCut"
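
For context, the same hunt can be run ad hoc outside dbt with `query_all`, which is what the squ plugin's `load()` does when dbt resolves the `kql_source` source (a sketch; the path assumes the repo root and a configured `SQU_CONFIG`):

```python
# Hedged sketch: run the T1547.001 hunt directly, the same way Plugin.load does.
import pandas
from pathlib import Path
from nbdev_squ import api

api.login()  # config must already be available via SQU_CONFIG
query = Path("dbt_example_project/models/squ/T1547_001.kql").read_text()
df = api.query_all(query, timespan=pandas.Timedelta("14d"))  # results come back as a pandas DataFrame
```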
12 changes: 0 additions & 12 deletions dbt_example_project/models/squ/config.yml

This file was deleted.

1 change: 1 addition & 0 deletions dbt_example_project/models/squ/hunt.sql
@@ -0,0 +1 @@
select * from {{source('kql_source', 'T1547_001')}}
15 changes: 15 additions & 0 deletions dbt_example_project/models/squ/schema.yml
@@ -0,0 +1,15 @@
version: 2

sources:
  - name: kql_source
    config:
      plugin: squ
    meta:
      kql_path: "models/squ/{name}.kql"
    tables:
      - name: T1547_001

models:
  - name: hunt
    config:
      materialized: table
4 changes: 0 additions & 4 deletions dbt_example_project/models/squ/squ_python_model.py

This file was deleted.

11 changes: 4 additions & 7 deletions dbt_example_project/profiles.yml
@@ -2,12 +2,9 @@ default:
outputs:
dev:
type: duckdb
path: dev.duckdb
threads: 1

prod:
type: duckdb
path: prod.duckdb
threads: 4
path: target/dev.duckdb
plugins:
- module: nbdev_squ.api
alias: squ

target: dev
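
For orientation, the `plugins` entry roughly means: dbt-duckdb imports the given `module`, looks for a class named `Plugin` in it (added to `nbdev_squ/api.py` in this PR), and registers an instance under the `alias` so that sources declaring `plugin: squ` are routed to its `load()` method. A simplified sketch of that wiring (the real loading logic lives inside dbt-duckdb, so treat this as an approximation):

```python
# Simplified illustration of the module/alias wiring in profiles.yml; the actual
# loading/registration logic lives in dbt.adapters.duckdb.plugins, not this repo.
import importlib

module = importlib.import_module("nbdev_squ.api")  # from `module: nbdev_squ.api`
plugin_cls = module.Plugin                          # dbt-duckdb expects a class named `Plugin`
# an instance is then created and registered under the alias "squ"
```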
1 change: 0 additions & 1 deletion install.py
@@ -2,7 +2,6 @@
# This is meant to be run to setup and prep for committing a release
# use nbdev_bump_version to increment the version itself then rerun this to update README.md _docs etc
from subprocess import run
import configparser, re

run(["pip", "install", "nbdev"])
run(["pip", "install", "-e", "."]) # get current project in dev mode
2 changes: 1 addition & 1 deletion nbdev_squ/__init__.py
@@ -1 +1 @@
__version__ = "1.3.5"
__version__ = "1.3.6"
6 changes: 6 additions & 0 deletions nbdev_squ/_modidx.py
@@ -11,6 +11,12 @@
'nbdev_squ.api.Clients.jira': ('api.html#clients.jira', 'nbdev_squ/api.py'),
'nbdev_squ.api.Clients.runzero': ('api.html#clients.runzero', 'nbdev_squ/api.py'),
'nbdev_squ.api.Clients.tio': ('api.html#clients.tio', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin': ('api.html#plugin', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.configure_cursor': ('api.html#plugin.configure_cursor', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.default_materialization': ( 'api.html#plugin.default_materialization',
'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.initialize': ('api.html#plugin.initialize', 'nbdev_squ/api.py'),
'nbdev_squ.api.Plugin.load': ('api.html#plugin.load', 'nbdev_squ/api.py'),
'nbdev_squ.api.atlaskit_transformer': ('api.html#atlaskit_transformer', 'nbdev_squ/api.py'),
'nbdev_squ.api.chunks': ('api.html#chunks', 'nbdev_squ/api.py'),
'nbdev_squ.api.finalise_query': ('api.html#finalise_query', 'nbdev_squ/api.py'),
34 changes: 33 additions & 1 deletion nbdev_squ/api.py
@@ -3,11 +3,13 @@
# %% auto 0
__all__ = ['logger', 'clients', 'columns_of_interest', 'columns', 'Clients', 'list_workspaces_safe', 'list_workspaces',
'list_subscriptions', 'list_securityinsights_safe', 'list_securityinsights', 'chunks', 'loganalytics_query',
'query_all', 'finalise_query', 'hunt', 'atlaskit_transformer', 'security_incidents', 'security_alerts']
'query_all', 'finalise_query', 'hunt', 'atlaskit_transformer', 'security_incidents', 'security_alerts',
'Plugin']

# %% ../nbs/01_api.ipynb 3
import pandas, json, logging, time, requests, io, pkgutil, httpx_cache
from .core import *
from pathlib import Path
from diskcache import memoize_stampede
from importlib.metadata import version
from subprocess import run, CalledProcessError
@@ -17,6 +19,7 @@
from functools import cached_property
from atlassian import Jira
from tenable.io import TenableIO
from dbt.adapters.duckdb.plugins import BasePlugin, SourceConfig

# %% ../nbs/01_api.ipynb 5
logger = logging.getLogger(__name__)
@@ -268,3 +271,32 @@ def security_alerts(start=pandas.Timestamp("now", tz="UTC") - pandas.Timedelta("
# Sorts by TimeGenerated (TODO)
query = "SecurityAlert | summarize arg_max(TimeGenerated, *) by SystemAlertId"
return query_all(query, timespan=(start.to_pydatetime(), timedelta))

# %% ../nbs/01_api.ipynb 28
class Plugin(BasePlugin):
    def initialize(self, config):
        login()

    def configure_cursor(self, cursor):
        pass

    def load(self, source_config: SourceConfig):
        if "kql_path" in source_config:
            kql_path = source_config["kql_path"]
            kql_path = kql_path.format(**source_config.as_dict())
            query = Path(kql_path).read_text()
            return query_all(query, timespan=pandas.Timedelta(source_config.get("timespan", "14d")))
        elif "list_workspaces" in source_config: # untested
            return list_workspaces()
        elif "client_api" in source_config: # untested
            api_result = getattr(clients, source_config["client_api"])(**json.loads(source_config.get("kwargs", "{}")))
            if isinstance(api_result, pandas.DataFrame):
                return api_result
            else:
                return pandas.DataFrame(api_result)
        else:
            raise Exception("No valid config found for squ plugin (kql_path, list_workspaces or client_api required)")

    def default_materialization(self):
        return "view"
110 changes: 109 additions & 1 deletion nbs/01_api.ipynb
@@ -37,6 +37,7 @@
"#| export\n",
"import pandas, json, logging, time, requests, io, pkgutil, httpx_cache\n",
"from nbdev_squ.core import *\n",
"from pathlib import Path\n",
"from diskcache import memoize_stampede\n",
"from importlib.metadata import version\n",
"from subprocess import run, CalledProcessError\n",
@@ -45,7 +46,8 @@
"from benedict import benedict\n",
"from functools import cached_property\n",
"from atlassian import Jira\n",
"from tenable.io import TenableIO"
"from tenable.io import TenableIO\n",
"from dbt.adapters.duckdb.plugins import BasePlugin, SourceConfig"
]
},
{
@@ -499,6 +501,112 @@
"df = security_incidents(start=pandas.Timestamp(\"now\", tz=\"UTC\") - pandas.Timedelta(\"14d\"), timedelta=pandas.Timedelta(\"14d\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## dbt-duckdb plugin\n",
"\n",
"The below squ plugin makes querying kql in duckdb projects via the [DuckDB user-defined function (UDF)](https://duckdb.org/docs/api/python/function.html) interface much easier. This could be extended to other clients pretty easily, just have to make sure data is returned as a dataframe. To use it there are a few dbt project files that need to be configured:\n",
"\n",
"### DBT ./profiles.yml\n",
"\n",
"See [DBT Connection Profiles](https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles)\n",
"\n",
"```yaml\n",
"default:\n",
" outputs:\n",
" dev:\n",
" type: duckdb\n",
" path: target/dev.duckdb\n",
" plugins: \n",
" - module: nbdev_squ.api\n",
" alias: squ\n",
"\n",
" target: dev\n",
"```\n",
"\n",
"### DBT ./models/squ/schema.yml\n",
"\n",
"See [DBT Add sources to your DAG](https://docs.getdbt.com/docs/build/sources) for how to add 'externally defined' sources, this is using the code below based on the [dbt-duckdb plugin](https://github.com/duckdb/dbt-duckdb?tab=readme-ov-file#writing-your-own-plugins) architecture\n",
"\n",
"```yaml\n",
"version: 2\n",
"\n",
"sources:\n",
" - name: kql_source\n",
" config:\n",
" plugin: squ\n",
" meta:\n",
" kql_path: \"models/squ/{name}.kql\"\n",
" tables:\n",
" - name: T1547_001\n",
"\n",
"models:\n",
" - name: hunt\n",
" config:\n",
" materialized: table\n",
"```\n",
"\n",
"Once the source is defined, dbt cli tools and other sql models can refer to it, the dbt-duckdb framework makes it available as a referencable view usable throughout the project:\n",
"\n",
"#### DBT ./models/squ/hunt.sql\n",
"\n",
"See [DBT SQL models](https://docs.getdbt.com/docs/build/sql-models) for how to write the select statement templates DBT organises into the DAG\n",
"\n",
"```sql\n",
"select * from {{source('kql_source', 'T1547_001')}}\n",
"```\n",
"\n",
"#### DBT cli usage\n",
"\n",
"See [About the dbt run command](https://docs.getdbt.com/reference/commands/run) (can use `--empty` to validate before a full run)\n",
"\n",
"```bash\n",
"cd dbt_example_project\n",
"dbt run # will build the whole dag including any materialisations like the hunt table above\n",
"dbt show --inline \"select * from {{ source('kql_source', 'T1547_001') }}\" # will use live source\n",
"dbt show --inline \"select * from {{ ref('hunt') }}\" # will use materialized table in db built by `dbt run`\n",
"dbt docs generate # will build documentation for the project\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| exports\n",
"class Plugin(BasePlugin):\n",
" def initialize(self, config):\n",
" login()\n",
"\n",
" def configure_cursor(self, cursor):\n",
" pass\n",
"\n",
" def load(self, source_config: SourceConfig):\n",
" if \"kql_path\" in source_config:\n",
" kql_path = source_config[\"kql_path\"]\n",
" kql_path = kql_path.format(**source_config.as_dict())\n",
" query = Path(kql_path).read_text()\n",
" return query_all(query, timespan=pandas.Timedelta(source_config.get(\"timespan\", \"14d\")))\n",
" raise Exception(\"huh\")\n",
" elif \"list_workspaces\" in source_config: # untested\n",
" return list_workspaces()\n",
" elif \"client_api\" in source_config: # untested\n",
" api_result = getattr(clients, source_config[\"client_api\"])(**json.loads(source_config.get(\"kwargs\", \"{}\")))\n",
" if isinstance(api_result, pandas.DataFrame):\n",
" return api_result\n",
" else:\n",
" return pandas.DataFrame(api_result)\n",
" else:\n",
" raise Exception(\"No valid config found for squ plugin (kql_path or api required)\")\n",
"\n",
" def default_materialization(self):\n",
" return \"view\""
]
},
{
"cell_type": "code",
"execution_count": null,
2 changes: 1 addition & 1 deletion nbs/index.ipynb
@@ -60,7 +60,7 @@
"source": [
"*Note: If you create/use a Github Codespace on any of the wagov repos, SQU_CONFIG should be configured automatically.*\n",
"\n",
"Before using, config needs to be loaded into `squ.core.cache`, which can be done automatically from json in a keyvault by setting the env var `SQU_CONFIG` to `\"keyvault/tenantid\"`.\n",
"Before using, config needs to be loaded into `nbdev_squ.core.cache`, which can be done automatically from json in a keyvault by setting the env var `SQU_CONFIG` to `\"keyvault/tenantid\"`.\n",
"\n",
"```bash\n",
"export SQU_CONFIG=\"{{ keyvault }}/{{ tenantid }}\"\n",