Skip to content

Commit

Permalink
Wrapped kafka client init in a try catch
Browse files Browse the repository at this point in the history
  • Loading branch information
mcasado committed Mar 2, 2015
1 parent 2f9855f commit d08f90d
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions beaver/transports/kafka_transport.py
Expand Up @@ -18,27 +18,31 @@ def __init__(self, beaver_config, logger=None):
for key in config_to_store:
self._kafka_config[key] = beaver_config.get('kafka_' + key)

self._client = KafkaClient(self._kafka_config['hosts'], self._kafka_config['client_id'])
self._key = self._kafka_config['key']
try:
self._client = KafkaClient(self._kafka_config['hosts'], self._kafka_config['client_id'])
self._key = self._kafka_config['key']

if self._key is None:
self._prod = SimpleProducer(self._client, async=self._kafka_config['async'],
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=self._kafka_config['ack_timeout'],
codec=self._kafka_config['codec'],
batch_send=True,
batch_send_every_n=self._kafka_config['batch_n'],
batch_send_every_t=self._kafka_config['batch_t'])
else:
self._prod = KeyedProducer(self._client, async=self._kafka_config['async'],
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=self._kafka_config['ack_timeout'],
codec=self._kafka_config['codec'],
batch_send=True,
batch_send_every_n=self._kafka_config['batch_n'],
batch_send_every_t=self._kafka_config['batch_t'])
if self._key is None:
self._prod = SimpleProducer(self._client, async=self._kafka_config['async'],
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=self._kafka_config['ack_timeout'],
codec=self._kafka_config['codec'],
batch_send=True,
batch_send_every_n=self._kafka_config['batch_n'],
batch_send_every_t=self._kafka_config['batch_t'])
else:
self._prod = KeyedProducer(self._client, async=self._kafka_config['async'],
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=self._kafka_config['ack_timeout'],
codec=self._kafka_config['codec'],
batch_send=True,
batch_send_every_n=self._kafka_config['batch_n'],
batch_send_every_t=self._kafka_config['batch_t'])

self._is_valid = True;
self._is_valid = True;

except Exception, e:
raise TransportException(e.message)


def callback(self, filename, lines, **kwargs):
Expand Down

0 comments on commit d08f90d

Please sign in to comment.