Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Delay string formatting when logging messages #227

Merged
merged 2 commits into from Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 6 additions & 8 deletions agogosml/agogosml/common/eventhub_processor_events.py
Expand Up @@ -24,7 +24,7 @@ async def open_async(self, context):
"""
Called by processor host to initialize the event processor.
"""
logger.info("Connection established {}".format(context.partition_id))
logger.info("Connection established %s", context.partition_id)

async def close_async(self, context, reason):
"""
Expand All @@ -36,10 +36,8 @@ async def close_async(self, context, reason):
:param reason: Reason for closing the async loop.
:type reason: string
"""
logger.info(
"Connection closed (reason {}, id {}, offset {}, sq_number {})".
format(reason, context.partition_id, context.offset,
context.sequence_number))
logger.info("Connection closed (reason %s, id %s, offset %s, sq_number %s)",
reason, context.partition_id, context.offset, context.sequence_number)

async def process_events_async(self, context, messages):
"""
Expand All @@ -55,8 +53,8 @@ async def process_events_async(self, context, messages):
message_json = message.body_as_str(encoding='UTF-8')
if self.on_message_received_callback is not None:
self.on_message_received_callback(message_json)
logger.debug("Received message: {}".format(message_json))
logger.info("Events processed {}".format(context.sequence_number))
logger.debug("Received message: %s", message_json)
logger.info("Events processed %s", context.sequence_number)
await context.checkpoint_async()

async def process_error_async(self, context, error):
Expand All @@ -69,4 +67,4 @@ async def process_error_async(self, context, error):
:type context: ~azure.eventprocessorhost.PartitionContext
:param error: The error that occured.
"""
logger.error("Event Processor Error {!r}".format(error))
logger.error("Event Processor Error %s", error)
8 changes: 4 additions & 4 deletions agogosml/agogosml/common/eventhub_streaming_client.py
Expand Up @@ -85,7 +85,7 @@ def __init__(self, config):
self.sender = self.send_client.add_sender()
self.send_client.run()
except Exception as e:
logger.error('Failed to init EH send client: ' + str(e))
logger.error('Failed to init EH send client: %s', e)
raise

def start_receiving(self, on_message_received_callback):
Expand Down Expand Up @@ -119,17 +119,17 @@ def send(self, message):
"""
try:
self.sender.send(EventData(body=message))
logger.info('Sent message: {}'.format(message))
logger.info('Sent message: %s', message)
return True
except Exception as e:
logger.error('Failed to send message to EH: ' + str(e))
logger.error('Failed to send message to EH: %s', e)
return False

def stop(self):
try:
self.send_client.stop()
except Exception as e:
logger.error('Failed to close send client: ' + str(e))
logger.error('Failed to close send client: %s', e)

@staticmethod
async def wait_and_close(host, timeout):
Expand Down
2 changes: 1 addition & 1 deletion agogosml/agogosml/common/flask_http_listener_client.py
Expand Up @@ -52,4 +52,4 @@ def shutdown_server(self):
raise RuntimeError('Not running with the Werkzeug Server')
func()
except Exception as e:
print('error while shutting down flask server: ' + str(e))
print('error while shutting down flask server: %s' % e)
15 changes: 7 additions & 8 deletions agogosml/agogosml/common/http_message_sender.py
Expand Up @@ -21,8 +21,8 @@ def __init__(self, config: dict):
host_endpoint = config.get('HOST')
port_endpoint = config.get('PORT')

logger.info("host_endpoint: {}".format(host_endpoint))
logger.info("port_endpoint: {}".format(port_endpoint))
logger.info("host_endpoint: %s", host_endpoint)
logger.info("port_endpoint: %s", port_endpoint)

if host_endpoint is None:
raise ValueError('Host endpoint cannot be None.')
Expand All @@ -48,15 +48,14 @@ def send(self, message):
# TODO: Add retries as some of the messages are failing to send
request = requests.post(server_address, data=message)
if request.status_code != 200:
logger.error(
"Error with a request {} and message not sent was {}".
format(request.status_code, message))
print("Error with a request {} and message not sent was {}".
format(request.status_code, message))
logger.error("Error with a request %s and message not sent was %s",
request.status_code, message)
print("Error with a request %s and message not sent was %s" %
(request.status_code, message))
else:
return_value = True

except Exception as e_e:
logger.error('Failed to send request: ' + str(e_e))
logger.error('Failed to send request: %s', e_e)

return return_value
14 changes: 7 additions & 7 deletions agogosml/agogosml/common/kafka_streaming_client.py
Expand Up @@ -73,10 +73,10 @@ def delivery_report(self, err, msg):
"""

if err is not None:
logger.error('Message delivery failed: {}'.format(err))
logger.error('Message delivery failed: %s', err)
else:
logger.info('Message delivered to {} [{}]'.format(
msg.topic(), msg.partition()))
logger.info('Message delivered to %s [%s]',
msg.topic(), msg.partition())

def send(self, message: str, *args, **kwargs):
"""
Expand All @@ -94,7 +94,7 @@ def send(self, message: str, *args, **kwargs):
self.producer.flush()
return True
except Exception as e:
logger.error('Error sending message to kafka:' + str(e))
logger.error('Error sending message to kafka: %s', e)
return False

def stop(self, *args, **kwargs):
Expand All @@ -119,8 +119,8 @@ def handle_kafka_error(self, msg):
"""
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
logger.error('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
logger.error('%% %s [%d] reached end at offset %d\n',
msg.topic(), msg.partition(), msg.offset())
else:
# Error
raise KafkaException(msg.error())
Expand Down Expand Up @@ -170,6 +170,6 @@ def read_single_message(self):
return None
else:
# Proper message
# logger.info('kafka read message: {}, from topic: {}'.format(msg.value(), msg.topic()))
# logger.info('kafka read message: %s, from topic: %s', msg.value(), msg.topic())
self.consumer.commit(msg)
return msg.value()
2 changes: 1 addition & 1 deletion agogosml/agogosml/tools/sender.py
Expand Up @@ -25,7 +25,7 @@ def json_arg(value: str):
try:
return loads(value)
except ValueError:
raise ArgumentTypeError('{} is not in JSON format'.format(value))
raise ArgumentTypeError('%s is not in JSON format' % value)


def main(messages: IO[str], sender_class: StreamingClientType, config: Dict[str, str]):
Expand Down
2 changes: 1 addition & 1 deletion agogosml/agogosml/utils/imports.py
Expand Up @@ -18,7 +18,7 @@ def get_base_module(interface) -> Tuple[str, Path]:

def import_subpackages(module_prefix: str, module_path: Path):
for _, module, _ in walk_packages([str(module_path)]):
sub_module_name = '{}.{}'.format(module_prefix, module)
sub_module_name = '%s.%s' % (module_prefix, module)
import_module(sub_module_name)


Expand Down
16 changes: 8 additions & 8 deletions agogosml/agogosml/utils/logger.py
Expand Up @@ -69,39 +69,39 @@ def _telemetry(self) -> Union[TelemetryClient, NullTelemetryClient]:
channel = TelemetryChannel(context, queue)
return TelemetryClient(ikey, telemetry_channel=channel)

def debug(self, message: str):
def debug(self, message: str, *args):
"""
Log debug message.

:param message: Debug message string.
"""
self._log(logging.DEBUG, message)
self._log(logging.DEBUG, message, *args)

def info(self, message: str):
def info(self, message: str, *args):
"""
Log info message

:param message: Info message string.
"""
self._log(logging.INFO, message)
self._log(logging.INFO, message, *args)

def error(self, message: str):
def error(self, message: str, *args):
"""
Log error message

:param message: Error message string.
"""
self._log(logging.ERROR, message)
self._log(logging.ERROR, message, *args)

def event(self, name: str, props: Optional[Dict[str, str]] = None):
props = props or {}
self._logger.info('Event %s: %r', name, props)
self._telemetry.track_event(name, props)

def _log(self, level: int, message: str):
def _log(self, level: int, message: str, *args):
if not self._logger.isEnabledFor(level):
return

self._logger.log(level, message)
self._logger.log(level, message, *args)
self._telemetry.track_trace(
message, severity=logging.getLevelName(level))
2 changes: 1 addition & 1 deletion agogosml/tests/client_mocks.py
Expand Up @@ -15,7 +15,7 @@ def __init__(self, config: dict = None):
pass

def send(self, msg):
print('Streaming Client Mock send message:', msg)
print('Streaming Client Mock send message: %s' % msg)
if self.should_fail_to_send:
self.sent = False
return False
Expand Down
4 changes: 2 additions & 2 deletions agogosml/tests/integration_tests/test_app.py
Expand Up @@ -15,7 +15,7 @@ def start(self):
self.listener.start(self.on_message_received)

def on_message_received(self, message):
logger.info('Test App message received: ' + message)
logger.info('Test App message received: %s', message)
sender_result = self.sender.send(message)
logger.info('Sender result: ' + str(sender_result))
logger.info('Sender result: %s', sender_result)
return sender_result
Expand Up @@ -195,12 +195,12 @@ def test_when_messages_sent_to_kafka_then_all_messages_are_sent_via_output():
print('sending test message to reader')

test_msg = str(time.clock())
print("sending {} to input topic".format(test_msg))
print("sending %s to input topic" % test_msg)
# send a message from INPUT reader, and expect it to flow in the pipeline,
# and eventually be picked up by the output writer
send_message_to_kafka(test_msg)
last_msg = read_message_from_kafka()
print("received {} from output topic".format(last_msg))
print("received %s from output topic" % last_msg)
assert last_msg == test_msg

ir.stop_incoming_messages()
Expand Down