Skip to content

Commit

Permalink
Merge pull request #496 from sebasrp/issue/495
Browse files Browse the repository at this point in the history
issue #495 - Initial support for Kinesis service
  • Loading branch information
jantman committed Dec 2, 2020
2 parents 49e2bfb + 65bc7ea commit a148adb
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 0 deletions.
1 change: 1 addition & 0 deletions awslimitchecker/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from awslimitchecker.services.eks import _EksService
from awslimitchecker.services.firehose import _FirehoseService
from awslimitchecker.services.iam import _IamService
from awslimitchecker.services.kinesis import _KinesisService
from awslimitchecker.services.lambdafunc import _LambdaService
from awslimitchecker.services.rds import _RDSService
from awslimitchecker.services.redshift import _RedshiftService
Expand Down
128 changes: 128 additions & 0 deletions awslimitchecker/services/kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
awslimitchecker/services/kinesis.py
The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>
################################################################################
Copyright 2015-2018 Jason Antman <jason@jasonantman.com>
This file is part of awslimitchecker, also known as awslimitchecker.
awslimitchecker is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
awslimitchecker is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with awslimitchecker. If not, see <http://www.gnu.org/licenses/>.
The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################
AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import abc # noqa
import logging

from .base import _AwsService
from ..limit import AwsLimit

logger = logging.getLogger(__name__)


class _KinesisService(_AwsService):

service_name = 'Kinesis'
api_name = 'kinesis'
quotas_service_code = 'kinesis'

def find_usage(self):
"""
Determine the current usage for each limit of this service,
and update corresponding Limit via
:py:meth:`~.AwsLimit._add_current_usage`.
"""
logger.debug("Checking usage for service %s", self.service_name)
self.connect()
for lim in self.limits.values():
lim._reset_usage()
self._find_shards()
self._have_usage = True
logger.debug("Done checking usage.")

def _find_shards(self):
describe_limits_response = self.conn.describe_limits()
self.limits['Shards per Region']._add_current_usage(
describe_limits_response['OpenShardCount'],
resource_id=self._boto3_connection_kwargs['region_name'],
aws_type='AWS::Kinesis::Stream'
)

def get_limits(self):
"""
Return all known limits for this service, as a dict of their names
to :py:class:`~.AwsLimit` objects.
:returns: dict of limit names to :py:class:`~.AwsLimit` objects
:rtype: dict
"""
if self.limits != {}:
return self.limits

self.connect()
region_name = self.conn._client_config.region_name
regions_500_shards = ['us-east-1', 'us-west-2', 'eu-west-1']

limits = {}

limits['Shards per Region'] = AwsLimit(
'Shards per Region',
self,
500 if region_name in regions_500_shards else 200,
self.warning_threshold,
self.critical_threshold,
limit_type='AWS::Kinesis::Stream',
)
self.limits = limits
return limits

def _update_limits_from_api(self):
"""
Call the service's API action to retrieve limit/quota information, and
update AwsLimit objects in ``self.limits`` with this information.
"""
logger.debug("Updating limits for Kinesis from the AWS API")
self.connect()
describe_limits_response = self.conn.describe_limits()
self.limits['Shards per Region']._set_api_limit(
describe_limits_response['ShardLimit']
)

def required_iam_permissions(self):
"""
Return a list of IAM Actions required for this Service to function
properly. All Actions will be shown with an Effect of "Allow"
and a Resource of "*".
:returns: list of IAM Action strings
:rtype: list
"""
return [
'kinesis:DescribeLimits',
]
7 changes: 7 additions & 0 deletions awslimitchecker/tests/services/result_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -4586,6 +4586,13 @@ class CloudTrail(object):
}


class Kinesis(object):
mock_describe_limits = {
'ShardLimit': 700,
'OpenShardCount': 555
}


class EKS(object):

test_find_clusters_usage_list = {
Expand Down
196 changes: 196 additions & 0 deletions awslimitchecker/tests/services/test_kinesis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""
awslimitchecker/tests/services/test_kinesis.py
The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>
################################################################################
Copyright 2015-2018 Jason Antman <jason@jasonantman.com>
This file is part of awslimitchecker, also known as awslimitchecker.
awslimitchecker is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
awslimitchecker is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with awslimitchecker. If not, see <http://www.gnu.org/licenses/>.
The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################
AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import sys
from awslimitchecker.tests.services import result_fixtures
from awslimitchecker.limit import AwsLimit
from awslimitchecker.services.kinesis import _KinesisService

# https://code.google.com/p/mock/issues/detail?id=249
# py>=3.4 should use unittest.mock not the mock package on pypi
if (
sys.version_info[0] < 3 or
sys.version_info[0] == 3 and sys.version_info[1] < 4
):
from mock import patch, call, Mock, DEFAULT
else:
from unittest.mock import patch, call, Mock, DEFAULT


pbm = 'awslimitchecker.services.kinesis' # module patch base
pb = '%s._KinesisService' % pbm # class patch pase


class Test_KinesisService(object):

def test_init(self):
"""test __init__()"""
with patch('%s.get_limits' % pb):
cls = _KinesisService(21, 43, {}, None)
assert cls.service_name == 'Kinesis'
assert cls.api_name == 'kinesis'
assert cls.conn is None
assert cls.warning_threshold == 21
assert cls.critical_threshold == 43

def test_get_limits(self):
mock_conn = Mock()
m_client = Mock()
type(m_client).region_name = 'ap-southeast-2'
type(mock_conn)._client_config = m_client

def se_conn(cls):
cls.conn = mock_conn

with patch('%s.connect' % pb, autospec=True) as mock_connect:
mock_connect.side_effect = se_conn
cls = _KinesisService(21, 43, {}, None)

cls.limits = {}
res = cls.get_limits()
assert sorted(res.keys()) == sorted([
'Shards per Region',
])
for name, limit in res.items():
assert limit.service == cls
assert limit.def_warning_threshold == 21
assert limit.def_critical_threshold == 43

limits = cls.limits
assert len(limits) == 1
assert limits['Shards per Region'].default_limit == 200

def test_get_limits_us_east_1(self):
mock_conn = Mock()
m_client = Mock()
type(m_client).region_name = 'us-east-1'
type(mock_conn)._client_config = m_client

def se_conn(cls):
cls.conn = mock_conn

with patch('%s.connect' % pb, autospec=True) as mock_connect:
mock_connect.side_effect = se_conn
cls = _KinesisService(21, 43, {}, None)

limits = cls.limits
for x in limits:
assert isinstance(limits[x], AwsLimit)
assert x == limits[x].name
assert limits[x].service == cls

assert len(limits) == 1
assert limits['Shards per Region'].default_limit == 500

def test_get_limits_again(self):
"""test that existing limits dict is returned on subsequent calls"""
mock_limits = Mock()
cls = _KinesisService(21, 43, {}, None)
cls.limits = mock_limits
res = cls.get_limits()
assert res == mock_limits

def test_find_usage(self):
mock_conn = Mock()

def se_conn(cls):
cls.conn = mock_conn

with patch('%s.connect' % pb, autospec=True) as mock_connect:
mock_connect.side_effect = se_conn
with patch.multiple(
pb,
_find_shards=DEFAULT,
) as mocks:
cls = _KinesisService(21, 43, {}, None)
cls.conn = mock_conn
assert cls._have_usage is False
cls.find_usage()
assert mock_connect.mock_calls == [call(cls), call(cls)]
assert cls._have_usage is True
assert mock_conn.mock_calls == []
for x in [
'_find_shards',
]:
assert mocks[x].mock_calls == [call()]

def test_find_shards(self):
response = result_fixtures.Kinesis.mock_describe_limits
limit_key = 'Shards per Region'

mock_conn = Mock()
mock_conn.describe_limits.return_value = response

cls = _KinesisService(21, 43, {'region_name': 'us-west-2'}, None)
cls.conn = mock_conn
cls._find_shards()

assert mock_conn.mock_calls == [
call.describe_limits()
]
assert len(cls.limits[limit_key].get_current_usage()) == 1
assert cls.limits[limit_key].get_current_usage()[
0].get_value() == 555

def test_update_limits_from_api(self):
response = result_fixtures.Kinesis.mock_describe_limits
mock_conn = Mock()
mock_conn.describe_limits.return_value = response

def se_conn(cls):
cls.conn = mock_conn

with patch('%s.connect' % pb, autospec=True) as mock_connect:
mock_connect.side_effect = se_conn
cls = _KinesisService(21, 43, {'region_name': 'us-west-2'}, None)
assert len(cls.limits) == 1
cls.conn = mock_conn
cls._update_limits_from_api()

assert mock_connect.mock_calls == [call(cls), call(cls)]
assert mock_conn.mock_calls == [call.describe_limits()]
assert len(cls.limits) == 1
lim = cls.limits['Shards per Region'].get_limit()
assert lim == 700

def test_required_iam_permissions(self):
cls = _KinesisService(21, 43, {}, None)
assert cls.required_iam_permissions() == [
'kinesis:DescribeLimits',
]

0 comments on commit a148adb

Please sign in to comment.