Skip to content

Commit

Permalink
Make flake8 happy
Browse files Browse the repository at this point in the history
  • Loading branch information
emphoeller committed Apr 23, 2024
1 parent e8a38cd commit 2811d74
Showing 1 changed file with 190 additions and 171 deletions.
361 changes: 190 additions & 171 deletions coturn_exporter
Original file line number Diff line number Diff line change
Expand Up @@ -27,194 +27,213 @@ MAX_FAILURES = 5

logger = logging.getLogger(__name__)


def make_write_reentrant(stream):
'''
Make it possible to call stream.write() from multiple control flows (threads,
signal handlers) at at the same time.
Usually, doing so will occaisionally lead to an error like this:
RuntimeError: reentrant call inside <_io.BufferedWriter name='<stderr>'>
It seems like the logging module tries to prevent this with its locks, but it
still happens in my testing.
'''
fd = stream.fileno()
def new_write(s, /):
b = s.encode('utf-8', 'ignore')
written = 0
with memoryview(b) as mv:
while written < len(mv):
with mv[written:] as remaining:
written += os.write(fd, remaining)
return len(s)
stream.write = new_write
'''
Make it possible to call stream.write() from multiple control flows
(threads, signal handlers) at at the same time.
Usually, doing so will occaisionally lead to an error like this:
RuntimeError: reentrant call inside <_io.BufferedWriter name='<stderr>'>
It seems like the logging module tries to prevent this with its locks, but
it still happens in my testing.
'''
fd = stream.fileno()

def new_write(s, /):
b = s.encode('utf-8', 'ignore')
written = 0
with memoryview(b) as mv:
while written < len(mv):
with mv[written:] as remaining:
written += os.write(fd, remaining)
return len(s)

stream.write = new_write


def set_loglevel():
'''
Configures the logger using the LOGLEVEL environment variable
'''
loglevel = os.environ.get('LOGLEVEL', None)
if loglevel is None:
logging.getLogger().setLevel(logging.WARNING)
elif loglevel in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
logging.getLogger().setLevel(getattr(logging, loglevel))
else:
logger.critical(
'invalid value for optional environment variable LOGLEVEL: %r (allowed '
"values are 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')", loglevel)
sys.exit(1)
'''
Configures the logger using the LOGLEVEL environment variable
'''
loglevel = os.environ.get('LOGLEVEL', None)
if loglevel is None:
logging.getLogger().setLevel(logging.WARNING)
elif loglevel in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
logging.getLogger().setLevel(getattr(logging, loglevel))
else:
logger.critical(
'invalid value for optional environment variable LOGLEVEL: %r '
"(allowed values are 'DEBUG', 'INFO', 'WARNING', 'ERROR', "
"'CRITICAL')", loglevel)
sys.exit(1)


def get_int_envvar(name, default):
'''
Obtains the value of the environemnt variable given by name, converts it to
int, and returns the result. Returns default if it does not exist.
'''
try:
value = os.environ[name]
except KeyError:
return default
try:
int_value = int(value)
except ValueError:
logger.critical(
'invalid format for optional environment vairable %s: %r (must be int)',
name, value)
sys.exit(1)
return int_value
'''
Obtains the value of the environemnt variable given by name, converts it to
int, and returns the result. Returns default if it does not exist.
'''
try:
value = os.environ[name]
except KeyError:
return default
try:
int_value = int(value)
except ValueError:
logger.critical(
'invalid format for optional environment vairable %s: %r (must be '
'int)', name, value)
sys.exit(1)
return int_value


def get_executable_path(name):
'''
Returns the full path for the executable given by name.
'''
path = shutil.which(name)
if path is None:
logger.critical('could not locate execuable %r', name)
sys.exit(1)
return path
'''
Returns the full path for the executable given by name.
'''
path = shutil.which(name)
if path is None:
logger.critical('could not locate execuable %r', name)
sys.exit(1)
return path


def get_executable_call_args():
'''
Build and return the argument list for the call to EXECUTABLE_NAME.
'''
try:
turn_server_ip = os.environ['IP']
except KeyError:
logger.critical('the environment variable IP must be set')
sys.exit(1)
args = [
get_executable_path(EXECUTABLE_NAME), '-t', '-e', turn_server_ip, '-c',
'-n', '0', turn_server_ip]
if (turn_server_port := os.environ.get('PORT', None)) is not None:
args.extend(['-p', turn_server_port])
if (turn_server_secret := os.environ.get('SECRET', None)) is not None:
args.extend(['-W', turn_server_secret])
return args
'''
Build and return the argument list for the call to EXECUTABLE_NAME.
'''
try:
turn_server_ip = os.environ['IP']
except KeyError:
logger.critical('the environment variable IP must be set')
sys.exit(1)
args = [
get_executable_path(EXECUTABLE_NAME), '-t', '-e', turn_server_ip, '-c',
'-n', '0', turn_server_ip]
if (turn_server_port := os.environ.get('PORT', None)) is not None:
args.extend(['-p', turn_server_port])
if (turn_server_secret := os.environ.get('SECRET', None)) is not None:
args.extend(['-W', turn_server_secret])
return args


def install_shutdown_signal_handlers(shutdown_event):
'''
Installs signal handlers for some common signals so that the shutdown_event is
set when they are received.
'''
def shutdown_signal_handler(signum, frame):
if not shutdown_event.is_set():
logger.info('received signal %d, shutting down', signum)
shutdown_event.set()
signal.signal(signal.SIGHUP, shutdown_signal_handler)
signal.signal(signal.SIGINT, shutdown_signal_handler)
signal.signal(signal.SIGPIPE, shutdown_signal_handler)
signal.signal(signal.SIGTERM, shutdown_signal_handler)
signal.signal(signal.SIGQUIT, shutdown_signal_handler)
'''
Installs signal handlers for some common signals so that the shutdown_event
is set when they are received.
'''
def shutdown_signal_handler(signum, frame):
if not shutdown_event.is_set():
logger.info('received signal %d, shutting down', signum)
shutdown_event.set()
signal.signal(signal.SIGHUP, shutdown_signal_handler)
signal.signal(signal.SIGINT, shutdown_signal_handler)
signal.signal(signal.SIGPIPE, shutdown_signal_handler)
signal.signal(signal.SIGTERM, shutdown_signal_handler)
signal.signal(signal.SIGQUIT, shutdown_signal_handler)


def check_until_shutdown_event(*, args, interval, turn_server_state_enum,
shutdown_event):
'''
Runs the executable using args every interval seconds to update the
turn_server_state_enum until shutdown_event is set.
'''
failures = 0 # Number of consecutive failures to determine the server status
def failed():
nonlocal failures
if failures < MAX_FAILURES:
failures += 1
return
logger.warning('too many consecutive failures determining the server '
'status, setting "turn_server_state" to "unknown"')
turn_server_state_enum.state('unknown')
while True:
process = subprocess.Popen(args)
wait_until = time.monotonic_ns() + TIMEOUT_SIGTERM * 1e9
while time.monotonic_ns() < wait_until and not shutdown_event.is_set():
try:
process.wait(0.1)
except subprocess.TimeoutExpired:
pass
else:
break
if shutdown_event.is_set():
break
time_until_next_check = 1
if process.returncode is None: # Still running
logger.warning(
'%s could not determine the TURN server status within the timeout (%d '
'seconds), killing and retrying', EXECUTABLE_NAME, TIMEOUT_SIGTERM)
failed()
process.terminate()
try:
process.wait(TIMEOUT_SIGKILL)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
else:
if process.returncode < 0:
logger.warning('%s was terminated unexpectedly by signal %d, retrying',
EXECUTABLE_NAME, -process.returncode)
failed()
else:
failures = 0
if process.returncode == 0:
turn_server_state_enum.state('ok')
time_until_next_check = interval
logger.info('%s said TURN server is ok', EXECUTABLE_NAME)
'''
Runs the executable using args every interval seconds to update the
turn_server_state_enum until shutdown_event is set.
'''
failures = 0 # Number of consecutive failures

def failed():
nonlocal failures
if failures < MAX_FAILURES:
failures += 1
return
logger.warning('too many consecutive failures determining the server '
'status, setting "turn_server_state" to "unknown"')
turn_server_state_enum.state('unknown')

while True:
process = subprocess.Popen(args)
wait_until = time.monotonic_ns() + TIMEOUT_SIGTERM * 1e9
while time.monotonic_ns() < wait_until and not shutdown_event.is_set():
try:
process.wait(0.1)
except subprocess.TimeoutExpired:
pass
else:
break
if shutdown_event.is_set():
break
time_until_next_check = 1
if process.returncode is None: # Still running
logger.warning(
'%s could not determine the TURN server status within the '
'timeout (%d seconds), killing and retrying', EXECUTABLE_NAME,
TIMEOUT_SIGTERM)
failed()
process.terminate()
try:
process.wait(TIMEOUT_SIGKILL)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
else:
turn_server_state_enum.state('not_ok')
time_until_next_check = min(interval, 10)
logger.info('%s said TURN server is not ok', EXECUTABLE_NAME)
if shutdown_event.wait(time_until_next_check):
break
if process.returncode < 0:
logger.warning('%s was terminated unexpectedly by signal %d, '
'retrying', EXECUTABLE_NAME,
-process.returncode)
failed()
else:
failures = 0
if process.returncode == 0:
turn_server_state_enum.state('ok')
time_until_next_check = interval
logger.info('%s said TURN server is ok', EXECUTABLE_NAME)
else:
turn_server_state_enum.state('not_ok')
time_until_next_check = min(interval, 10)
logger.info('%s said TURN server is not ok',
EXECUTABLE_NAME)
if shutdown_event.wait(time_until_next_check):
break


def main():
make_write_reentrant(sys.stdout)
make_write_reentrant(sys.stderr)
logging.basicConfig(
format='[%(asctime)s] %(name)s %(levelname)s: %(message)s')
if len(sys.argv) > 1:
logger.critical('expected 0 arguments, but got %d', len(sys.argv) - 1)
sys.exit(1)
set_loglevel()
interval = get_int_envvar('INTERVAL', 900)
if interval <= 0:
logger.critical(
'environment variable INTERVAL must be positive, but got %d', interval)
sys.exit(1)
args = get_executable_call_args()
shutdown_event = threading.Event()
install_shutdown_signal_handlers(shutdown_event)
prometheus_client.disable_created_metrics()
turn_server_state_enum = prometheus_client.Enum('turn_server_state',
'the state of the TURN server', states=['ok', 'not_ok', 'unknown'])
turn_server_state_enum.state('unknown')
server, server_thread = prometheus_client.start_http_server(
METRICS_SERVER_PORT)
try:
check_until_shutdown_event(
args=args, interval=interval,
turn_server_state_enum=turn_server_state_enum,
shutdown_event=shutdown_event)
finally:
server.shutdown()
server_thread.join()
make_write_reentrant(sys.stdout)
make_write_reentrant(sys.stderr)
logging.basicConfig(
format='[%(asctime)s] %(name)s %(levelname)s: %(message)s')
if len(sys.argv) > 1:
logger.critical('expected 0 arguments, but got %d', len(sys.argv) - 1)
sys.exit(1)
set_loglevel()
interval = get_int_envvar('INTERVAL', 900)
if interval <= 0:
logger.critical(
'environment variable INTERVAL must be positive, but got %d',
interval)
sys.exit(1)
args = get_executable_call_args()
shutdown_event = threading.Event()
install_shutdown_signal_handlers(shutdown_event)
prometheus_client.disable_created_metrics()
turn_server_state_enum = prometheus_client.Enum(
'turn_server_state', 'the state of the TURN server',
states=['ok', 'not_ok', 'unknown'])
turn_server_state_enum.state('unknown')
server, server_thread = prometheus_client.start_http_server(
METRICS_SERVER_PORT)
try:
check_until_shutdown_event(
args=args, interval=interval,
turn_server_state_enum=turn_server_state_enum,
shutdown_event=shutdown_event)
finally:
server.shutdown()
server_thread.join()


if __name__ == '__main__':
main()
sys.exit(0)
main()
sys.exit(0)
else:
raise ImportError('this is not a module')
raise ImportError('this is not a module')

0 comments on commit 2811d74

Please sign in to comment.