Skip to content

Commit

Permalink
[WIP] indexing fixes for hypertables
Browse files Browse the repository at this point in the history
  • Loading branch information
sdebruyn committed Apr 1, 2024
1 parent af24589 commit fb2ad23
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 1 deletion.
2 changes: 2 additions & 0 deletions dbt/adapters/timescaledb/timescaledb_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
TimescaleDBConnectionManager,
)
from dbt.adapters.timescaledb.timescaledb_index_config import TimescaleDBIndexConfig
from dbt.adapters.timescaledb.timescaledb_relation import TimescaleDBRelation


class TimescaleDBAdapter(PostgresAdapter):
ConnectionManager = TimescaleDBConnectionManager
Relation = TimescaleDBRelation

@available
def parse_index(self, raw_index: Any) -> Optional[TimescaleDBIndexConfig]:
Expand Down
44 changes: 44 additions & 0 deletions dbt/adapters/timescaledb/timescaledb_relation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from dataclasses import dataclass
from typing import Optional, Set

import agate

from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.postgres.relation import PostgresRelation
from dbt.adapters.postgres.relation_configs import (
PostgresIndexConfig,
PostgresIndexConfigChange,
)
from dbt.adapters.relation_configs import (
RelationResults,
)


@dataclass(frozen=True, eq=False, repr=False)
class TimescaleDBRelation(PostgresRelation):
def get_hypertable_index_changes(
self, relation_results: RelationResults, relation_config: RelationConfig
) -> Optional[Set[PostgresIndexConfigChange]]:
if not relation_results:
return None

index_rows: agate.Table = relation_results.get("indexes", agate.Table(rows={}))
index_dicts = [PostgresIndexConfig.parse_relation_results(index) for index in index_rows.rows]
index_list = [PostgresIndexConfig.from_dict(index) for index in index_dicts]
filtered_list = [
index
for index in index_list
if not (
not index.unique
and index.method == "btree"
and len(index.column_names) == 1
and index.name.endswith("_idx")
)
]
index_set = frozenset(filtered_list)

indexes_from_config = relation_config.config.get("indexes", [])
parsed_from_config = [PostgresIndexConfig.parse_model_node(index) for index in indexes_from_config]
set_from_config = frozenset(parsed_from_config)

return self._get_index_config_changes(index_set, set_from_config)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{% materialization virtual_hypertable, adapter="timescaledb" %}

{%- set target_relation = this.incorporate(type=this.Table) -%}
{%- set existing_relation = load_cached_relation(target_relation) -%}
{%- set index_changes = get_virtual_hypertable_index_changes(existing_relation, config) -%}

{%- set grant_config = config.get('grants') -%}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
Expand All @@ -24,7 +26,9 @@

{%- endcall %}

{% do create_indexes(target_relation) %}
{%- if index_changes %}
{{ timescaledb__update_indexes_on_virtual_hypertable(target_relation, index_changes) }}
{%- endif %}

{%- call statement("clear_reorder_policy") %}
{{ clear_reorder_policy(target_relation) }}
Expand Down
29 changes: 29 additions & 0 deletions dbt/include/timescaledb/macros/relations/hypertable.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,32 @@
migrate_data => true {# Required since dbt models will always contain data #}
);
{% endmacro %}

{%- macro timescaledb__update_indexes_on_virtual_hypertable(relation, index_changes) -%}
{%- for _index_change in index_changes -%}
{%- set _index = _index_change.context -%}

{%- if _index_change.action == "drop" -%}

{{ postgres__get_drop_index_sql(relation, _index.name) }};

{%- elif _index_change.action == "create" -%}

{{ postgres__get_create_index_sql(relation, _index.as_node_config) }}

{%- endif -%}

{%- endfor -%}

{%- endmacro -%}

{% macro describe_hypertable(relation) %}
{% set _indexes = run_query(get_show_indexes_sql(relation)) %}
{% do return({'indexes': _indexes}) %}
{% endmacro %}

{% macro get_virtual_hypertable_index_changes(existing_relation, new_config) %}
{% set _existing_hypertable = describe_hypertable(existing_relation) %}
{% set _index_changes = existing_relation.get_hypertable_index_changes(_existing_hypertable, new_config.model) %}
{% do return(_index_changes) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import Any

import pytest

from dbt.tests.fixtures.project import TestProjInfo
from dbt.tests.util import run_dbt


class TestVirtualHypertableIndexUpdates:
@pytest.fixture(scope="class")
def models(self) -> dict[str, Any]:
return {
"vht.sql": """
{% if var("add_index", false) %}
{{ config(indexes=[{'columns': ['col_1'], 'type': 'hash'}]) }}
{% endif %}
--
"""
}

@pytest.fixture(scope="class")
def project_config_update(self) -> dict[str, Any]:
return {
"name": "virtual_hypertable_tests",
"models": {"virtual_hypertable_tests": {"vht": {"+materialized": "virtual_hypertable"}}},
}

def find_indexes(self, project: TestProjInfo, unique_schema: str) -> list[str]:
indexes = project.run_sql(
f"""
select *
from pg_indexes
where tablename = 'vht'
and schemaname = '{unique_schema}'""",
fetch="all",
)
table_names = [job[2] for job in indexes]
return table_names

def test_virtual_hypertable_index_updates(self, project: TestProjInfo, unique_schema: str) -> None:
project.run_sql(f"""
create table {unique_schema}.vht (time_column timestamp, col_1 int);
select create_hypertable('{unique_schema}.vht', by_range('time_column'));""")
results = run_dbt(["run"])
assert len(results) == 1
assert len(self.find_indexes(project, unique_schema)) == 1

run_enable_results = run_dbt(["run", "--vars", "add_index: true"])
assert len(run_enable_results) == 1
assert len(self.find_indexes(project, unique_schema)) == 2

run_disable_results = run_dbt(["run"])
assert len(run_disable_results) == 1
assert len(self.find_indexes(project, unique_schema)) == 1

run_enable_results = run_dbt(["run", "--vars", "add_index: true"])
assert len(run_enable_results) == 1
assert len(self.find_indexes(project, unique_schema)) == 2

run_disable_results = run_dbt(["run"])
assert len(run_disable_results) == 1
assert len(self.find_indexes(project, unique_schema)) == 1

0 comments on commit fb2ad23

Please sign in to comment.