diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 627973704..c749af34d 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -182,7 +182,7 @@ class KafkaAdminClient(object): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, - 'sasl_aws_msk_iam_session': None, + 'sasl_aws_msk_iam_role_arn': None, # metrics configs 'metric_reporters': [], diff --git a/kafka/client_async.py b/kafka/client_async.py index 33cfba55b..44ad264c2 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -193,7 +193,7 @@ class KafkaClient(object): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, - 'sasl_aws_msk_iam_session': None, + 'sasl_aws_msk_iam_role_arn': None, } def __init__(self, **configs): diff --git a/kafka/conn.py b/kafka/conn.py index b7220c337..798e9d2c3 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -86,7 +86,7 @@ class SSLWantWriteError(Exception): # needed for AWS_MSK_IAM authentication: try: - from botocore.session import Session as BotoSession + from boto3 import Session as BotoSession except ImportError: # no botocore available, will disable AWS_MSK_IAM mechanism BotoSession = None @@ -232,7 +232,7 @@ class BrokerConnection(object): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, - 'sasl_aws_msk_iam_session': BotoSession() + 'sasl_aws_msk_iam_role_arn': None, } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512", 'AWS_MSK_IAM') @@ -573,7 +573,7 @@ def _handle_sasl_handshake_response(self, future, response): elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): return self._try_authenticate_scram(future) elif self.config['sasl_mechanism'] == 'AWS_MSK_IAM': - return self._try_authenticate_aws_msk_iam(future, self.config["sasl_aws_msk_iam_session"]) + return self._try_authenticate_aws_msk_iam(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -674,7 +674,8 @@ def _try_authenticate_plain(self, future): log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) - def _try_authenticate_aws_msk_iam(self, future, session): + def _try_authenticate_aws_msk_iam(self, future): + session = self._get_aws_session() credentials = session.get_credentials().get_frozen_credentials() client = AwsMskIamClient( host=self.host, @@ -869,6 +870,23 @@ def _try_authenticate_oauth(self, future): log.info('%s: Authenticated via OAuth', self) return future.success(True) + def _get_aws_session(self): + if self.config["sasl_aws_msk_iam_role_arn"]: + client = BotoSession().client('sts') + assume_role = client.assume_role( + RoleArn=self.config["sasl_aws_msk_iam_role_arn"], + RoleSessionName="kafka-python" + ) + credentials = assume_role['Credentials'] + + return BotoSession( + aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'], + ) + + return BotoSession() + def _build_oauth_client_request(self): token_provider = self.config['sasl_oauth_token_provider'] return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 41636deb5..f03a25a08 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -306,7 +306,7 @@ class KafkaConsumer(six.Iterator): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, - 'sasl_aws_msk_iam_session': None, + 'sasl_aws_msk_iam_role_arn': None, 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator 'kafka_client': KafkaClient, } diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 16c3b0df7..a13103dbf 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -329,7 +329,7 @@ class KafkaProducer(object): 'metrics_sample_window_ms': 30000, 'selector': selectors.DefaultSelector, 'sasl_mechanism': None, - 'sasl_aws_msk_iam_session': None, + 'sasl_aws_msk_iam_role_arn': None, 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka',