Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add database creation/deletion for Influx 1.8 #544

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
89 changes: 85 additions & 4 deletions influxdb_client/client/bucket_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
A bucket belongs to an organization.
"""
import warnings

from influxdb_client import BucketsService, Bucket, PostBucketRequest, PatchBucketRequest
from influxdb_client.client.util.helpers import get_org_query_param

Expand All @@ -20,13 +19,12 @@ def __init__(self, influxdb_client):

def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_rules=None,
description=None, org=None) -> Bucket:
"""Create a bucket.
"""Create a bucket. Database creation via v1 API as fallback.

:param Bucket|PostBucketRequest bucket: bucket to create
:param bucket_name: bucket name
:param description: bucket description
:param org_id: org_id
:param bucket_name: bucket name
:param retention_rules: retention rules array or single BucketRetentionRules
:param str, Organization org: specifies the organization for create the bucket;
Take the ``ID``, ``Name`` or ``Organization``.
Expand All @@ -35,6 +33,13 @@ def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_ru
If the method is called asynchronously,
returns the request thread.
"""
if self._buckets_service._is_below_v2():
# Fall back to v1 API if buckets are not supported
warnings.warn("InfluxDB versions below v2.0 are deprecated. " +
"Falling back to CREATE DATABASE statement", DeprecationWarning)
database_name = bucket_name if bucket_name is not None else bucket
return self._create_database(database=database_name)

if retention_rules is None:
retention_rules = []

Expand All @@ -58,6 +63,41 @@ def create_bucket(self, bucket=None, bucket_name=None, org_id=None, retention_ru

return self._buckets_service.post_buckets(post_bucket_request=bucket)

def _create_database(self, database=None):
"""Create a database at the v1 api (legacy).

:param database_name: name of the new database
:return: tuple(response body, status code, header dict)
"""
if database is None:
raise ValueError("Invalid value for `database`, must be defined.")

# Hedaer and local_var_params for standard procedures only
header_params = {}
header_params['Accept'] = self._influxdb_client.api_client.select_header_accept(
['application/json'])
header_params['Content-Type'] = self._influxdb_client.api_client.select_header_content_type(
['application/json'])
local_var_params = locals()
local_var_params['kwargs'] = {}
all_params = []
self._buckets_service._check_operation_params(
"create_database", all_params, local_var_params
)

return self._influxdb_client.api_client.call_api(
'/query', 'POST',
header_params=header_params,
path_params={}, post_params=[],
files={}, auth_settings=[], collection_formats={},
query_params={'q': f'CREATE DATABASE {database}'},
async_req=local_var_params.get('async_req'),
_return_http_data_only=local_var_params.get('_return_http_data_only'), # noqa: E501
_preload_content=local_var_params.get('_preload_content', True),
_request_timeout=local_var_params.get('_request_timeout'),
urlopen_kw=None
)

def update_bucket(self, bucket: Bucket) -> Bucket:
"""Update a bucket.

Expand All @@ -71,7 +111,7 @@ def update_bucket(self, bucket: Bucket) -> Bucket:
return self._buckets_service.patch_buckets_id(bucket_id=bucket.id, patch_bucket_request=request)

def delete_bucket(self, bucket):
"""Delete a bucket.
"""Delete a bucket. Delete a database via v1 API as fallback.

:param bucket: bucket id or Bucket
:return: Bucket
Expand All @@ -81,8 +121,49 @@ def delete_bucket(self, bucket):
else:
bucket_id = bucket

if self._buckets_service._is_below_v2():
# Fall back to v1 API if buckets are not supported
warnings.warn("InfluxDB versions below v2.0 are deprecated. " +
"Falling back to DROP DATABASE statement", DeprecationWarning)
return self._delete_database(database=bucket_id)

return self._buckets_service.delete_buckets_id(bucket_id=bucket_id)

def _delete_database(self, database=None):
"""Delete a database at the v1 api (legacy).

:param database_name: name of the database to delete
:return: tuple(response body, status code, header dict)
"""
if database is None:
raise ValueError("Invalid value for `database`, must be defined.")

# Hedaer and local_var_params for standard procedures only
header_params = {}
header_params['Accept'] = self._influxdb_client.api_client.select_header_accept(
['application/json'])
header_params['Content-Type'] = self._influxdb_client.api_client.select_header_content_type(
['application/json'])
local_var_params = locals()
local_var_params['kwargs'] = {}
all_params = []
self._buckets_service._check_operation_params(
"drop_database", all_params, local_var_params
)

return self._influxdb_client.api_client.call_api(
'/query', 'POST',
header_params=header_params,
path_params={}, post_params=[],
files={}, auth_settings=[], collection_formats={},
query_params={'q': f'DROP DATABASE {database}'},
async_req=local_var_params.get('async_req'),
_return_http_data_only=local_var_params.get('_return_http_data_only'),
_preload_content=local_var_params.get('_preload_content', True),
_request_timeout=local_var_params.get('_request_timeout'),
urlopen_kw=None
)

def find_bucket_by_id(self, id):
"""Find bucket by ID.

Expand Down
35 changes: 35 additions & 0 deletions influxdb_client/service/_base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def __init__(self, api_client=None):
raise ValueError("Invalid value for `api_client`, must be defined.")
self.api_client = api_client
self._build_type = None
self._build_version = None

def _check_operation_params(self, operation_id, supported_params, local_params):
supported_params.append('async_req')
Expand All @@ -35,6 +36,16 @@ async def _is_cloud_instance_async(self) -> bool:
self._build_type = await self.build_type_async()
return 'cloud' in self._build_type.lower()

def _is_below_v2(self) -> bool:
if self._build_version is None:
self._build_version = self.build_version()
return self._build_version < '2'

async def _is_below_v2_async(self) -> bool:
if self._build_version is None:
self._build_version = await self.build_version()
return self._build_version < '2'

def build_type(self) -> str:
"""
Return the build type of the connected InfluxDB Server.
Expand All @@ -59,6 +70,30 @@ async def build_type_async(self) -> str:
response = await ping_service.get_ping_async(_return_http_data_only=False)
return self.response_header(response, header_name='X-Influxdb-Build')

def build_version(self) -> str:
"""
Return the version number of the connected InfluxDB Server.

:return: Version number of InfluxDB build.
"""
from influxdb_client import PingService
ping_service = PingService(self.api_client)

response = ping_service.get_ping_with_http_info(_return_http_data_only=False)
return self.response_header(response, header_name='X-Influxdb-Version')

async def build_version_async(self) -> str:
"""
Return the version number of the connected InfluxDB Server.

:return: Version number of InfluxDB build.
"""
from influxdb_client import PingService
ping_service = PingService(self.api_client)

response = await ping_service.get_ping_async(_return_http_data_only=False)
return self.response_header(response, header_name='X-Influxdb-Version')

def response_header(self, response, header_name='X-Influxdb-Version') -> str:
if response is not None and len(response) >= 3:
if header_name in response[2]:
Expand Down