Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class KafkaAdminClient(object):
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'sasl_aws_msk_iam_session': None,
'sasl_aws_msk_iam_role_arn': None,

# metrics configs
'metric_reporters': [],
Expand Down
2 changes: 1 addition & 1 deletion kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class KafkaClient(object):
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'sasl_aws_msk_iam_session': None,
'sasl_aws_msk_iam_role_arn': None,
}

def __init__(self, **configs):
Expand Down
26 changes: 22 additions & 4 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class SSLWantWriteError(Exception):

# needed for AWS_MSK_IAM authentication:
try:
from botocore.session import Session as BotoSession
from boto3 import Session as BotoSession
except ImportError:
# no botocore available, will disable AWS_MSK_IAM mechanism
BotoSession = None
Expand Down Expand Up @@ -232,7 +232,7 @@ class BrokerConnection(object):
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'sasl_aws_msk_iam_session': BotoSession()
'sasl_aws_msk_iam_role_arn': None,
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512", 'AWS_MSK_IAM')
Expand Down Expand Up @@ -573,7 +573,7 @@ def _handle_sasl_handshake_response(self, future, response):
elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"):
return self._try_authenticate_scram(future)
elif self.config['sasl_mechanism'] == 'AWS_MSK_IAM':
return self._try_authenticate_aws_msk_iam(future, self.config["sasl_aws_msk_iam_session"])
return self._try_authenticate_aws_msk_iam(future)
else:
return future.failure(
Errors.UnsupportedSaslMechanismError(
Expand Down Expand Up @@ -674,7 +674,8 @@ def _try_authenticate_plain(self, future):
log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)

def _try_authenticate_aws_msk_iam(self, future, session):
def _try_authenticate_aws_msk_iam(self, future):
session = self._get_aws_session()
credentials = session.get_credentials().get_frozen_credentials()
client = AwsMskIamClient(
host=self.host,
Expand Down Expand Up @@ -869,6 +870,23 @@ def _try_authenticate_oauth(self, future):
log.info('%s: Authenticated via OAuth', self)
return future.success(True)

def _get_aws_session(self):
if self.config["sasl_aws_msk_iam_role_arn"]:
client = BotoSession().client('sts')
assume_role = client.assume_role(
RoleArn=self.config["sasl_aws_msk_iam_role_arn"],
RoleSessionName="kafka-python"
)
credentials = assume_role['Credentials']

return BotoSession(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'],
)

return BotoSession()

def _build_oauth_client_request(self):
token_provider = self.config['sasl_oauth_token_provider']
return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions())
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ class KafkaConsumer(six.Iterator):
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'sasl_aws_msk_iam_session': None,
'sasl_aws_msk_iam_role_arn': None,
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
'kafka_client': KafkaClient,
}
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ class KafkaProducer(object):
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'sasl_mechanism': None,
'sasl_aws_msk_iam_session': None,
'sasl_aws_msk_iam_role_arn': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
Expand Down