Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,15 +589,20 @@ def get_list_users(self):
"""
return list(self.query("SHOW USERS").get_points())

def create_user(self, username, password):
def create_user(self, username, password, admin=False):
"""Create a new user in InfluxDB

:param username: the new username to create
:type username: str
:param password: the password for the new user
:type password: str
:param admin: whether the user should have cluster administration
privileges or not
:type admin: boolean
"""
text = "CREATE USER {} WITH PASSWORD '{}'".format(username, password)
if admin:
text += ' WITH ALL PRIVILEGES'
self.query(text)

def drop_user(self, username):
Expand All @@ -620,29 +625,27 @@ def set_user_password(self, username, password):
text = "SET PASSWORD FOR {} = '{}'".format(username, password)
self.query(text)

def delete_series(self, id, database=None):
"""Delete series from a database.
def delete_series(self, database=None, measurement=None, tags=None):
"""Delete series from a database. Series can be filtered by
measurement and tags.

:param id: the id of the series to be deleted
:type id: int
:param measurement: Delete all series from a measurement
:type id: string
:param tags: Delete all series that match given tags
:type id: dict
:param database: the database from which the series should be
deleted, defaults to client's current database
:type database: str
"""
database = database or self._database
self.query('DROP SERIES %s' % id, database=database)

def grant_admin_privileges(self, username):
"""Grant cluster administration privileges to an user.

:param username: the username to grant privileges to
:type username: str

.. note:: Only a cluster administrator can create/ drop databases
and manage users.
"""
text = "GRANT ALL PRIVILEGES TO {}".format(username)
self.query(text)
query_str = 'DROP SERIES'
if measurement:
query_str += ' FROM "{}"'.format(measurement)

if tags:
query_str += ' WHERE ' + ' and '.join(["{}='{}'".format(k, v)
for k, v in tags.items()])
self.query(query_str, database=database)

def revoke_admin_privileges(self, username):
"""Revoke cluster administration privileges from an user.
Expand Down
25 changes: 2 additions & 23 deletions tests/influxdb/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ def test_write_points_batch(self):
"region": "us-west"},
batch_size=2)
self.assertEqual(m.call_count, 2)
self.assertEqual(expected_last_body, m.last_request.body)
self.assertEqual(expected_last_body,
m.last_request.body.decode('utf-8'))

def test_write_points_udp(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
Expand Down Expand Up @@ -614,28 +615,6 @@ def test_get_list_users_empty(self):

self.assertListEqual(self.cli.get_list_users(), [])

def test_grant_admin_privileges(self):
example_response = '{"results":[{}]}'

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
text=example_response
)
self.cli.grant_admin_privileges('test')

self.assertEqual(
m.last_request.qs['q'][0],
'grant all privileges to test'
)

@raises(Exception)
def test_grant_admin_privileges_invalid(self):
cli = InfluxDBClient('host', 8086, 'username', 'password')
with _mocked_session(cli, 'get', 400):
self.cli.grant_admin_privileges('')

def test_revoke_admin_privileges(self):
example_response = '{"results":[{}]}'

Expand Down
68 changes: 23 additions & 45 deletions tests/influxdb/client_test_with_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def point(serie_name, timestamp=None, tags=None, **fields):
},
"time": "2009-11-10T23:01:35Z",
"fields": {
"value": 33
"value": 33.0
}
}
]
Expand Down Expand Up @@ -380,6 +380,12 @@ def test_create_user(self):
self.assertIn({'user': 'test_user', 'admin': False},
rsp)

def test_create_user_admin(self):
self.cli.create_user('test_user', 'secret_password', True)
rsp = list(self.cli.query("SHOW USERS")['results'])
self.assertIn({'user': 'test_user', 'admin': True},
rsp)

def test_create_user_blank_password(self):
self.cli.create_user('test_user', '')
rsp = list(self.cli.query("SHOW USERS")['results'])
Expand Down Expand Up @@ -439,26 +445,9 @@ def test_drop_user_invalid(self):
'found invalid, expected',
ctx.exception.content)

@unittest.skip("Broken as of 0.9.0")
def test_grant_admin_privileges(self):
self.cli.create_user('test', 'test')
self.assertEqual([{'user': 'test', 'admin': False}],
self.cli.get_list_users())
self.cli.grant_admin_privileges('test')
self.assertEqual([{'user': 'test', 'admin': True}],
self.cli.get_list_users())

def test_grant_admin_privileges_invalid(self):
with self.assertRaises(InfluxDBClientError) as ctx:
self.cli.grant_admin_privileges('')
self.assertEqual(400, ctx.exception.code)
self.assertIn('{"error":"error parsing query: ',
ctx.exception.content)

@unittest.skip("Broken as of 0.9.0")
def test_revoke_admin_privileges(self):
self.cli.create_user('test', 'test')
self.cli.grant_admin_privileges('test')
self.cli.create_user('test', 'test', admin=True)
self.assertEqual([{'user': 'test', 'admin': True}],
self.cli.get_list_users())
self.cli.revoke_admin_privileges('test')
Expand Down Expand Up @@ -518,23 +507,13 @@ def test_write(self):
params={'db': 'db'},
))

@unittest.skip("fail against real server instance, "
"don't know if it should succeed actually..")
def test_write_check_read(self):
self.test_write()
# hmmmm damn,
# after write has returned, if we directly query for the data it's not
# directly available.. (don't know if this is expected behavior (
# but it maybe))
# So we have to :
time.sleep(5)
# so that then the data is available through select :
time.sleep(1)
rsp = self.cli.query('SELECT * FROM cpu_load_short', database='db')
self.assertEqual(
{'cpu_load_short': [
{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]},
rsp
)
self.assertListEqual([{'value': 0.64,
'time': '2009-11-10T23:00:00Z'}],
list(rsp.get_points()))

def test_write_points(self):
self.assertIs(True, self.cli.write_points(dummy_point))
Expand Down Expand Up @@ -692,19 +671,18 @@ def test_get_list_series_and_delete(self):
rsp
)

@unittest.skip("broken on 0.9.0")
def test_delete_series_invalid(self):
with self.assertRaises(InfluxDBClientError):
self.cli.delete_series()

def test_delete_series(self):
self.assertEqual(
len(self.cli.get_list_series()), 0
)
self.cli.write_points(dummy_point)
self.assertEqual(
len(self.cli.get_list_series()), 1
)
self.cli.delete_series(1)
self.assertEqual(
len(self.cli.get_list_series()), 0
)
self.assertEqual(len(self.cli.get_list_series()), 0)
self.cli.write_points(dummy_points)
self.assertEqual(len(self.cli.get_list_series()), 2)
self.cli.delete_series(measurement='cpu_load_short')
self.assertEqual(len(self.cli.get_list_series()), 1)
self.cli.delete_series(tags={'region': 'us-west'})
self.assertEqual(len(self.cli.get_list_series()), 0)

@unittest.skip("Broken as of 0.9.0")
def test_get_list_series_DF(self):
Expand Down