Open
Description
Python code from openstack
from neutron_lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import serializer as om_serializer
from oslo_service import service
LOG = logging.getLogger(__name__)
TRANSPORT = None
class RequestContextSerializer(om_serializer.Serializer):
def __init__(self, base=None):
pass
pass
def get_server(target, endpoints, serializer=None):
"""Get a new RPC server reference.
:param target: The target for the new RPC server.
:param endpoints: The endpoints for the RPC server.
:param serializer: The optional serialize to use for the RPC server.
:returns: A new RPC server reference.
"""
if TRANSPORT is None:
raise AssertionError(_("'TRANSPORT' must not be None"))
serializer = RequestContextSerializer(serializer)
return oslo_messaging.get_rpc_server(TRANSPORT, target, endpoints,
'eventlet', serializer)
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host.
"""
def __init__(self, host, topic, manager=None, serializer=None):
super().__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super().start()
self.conn = Connection()
LOG.debug("Creating Consumer connection for Service %s",
self.topic)
endpoints = [self.manager]
self.conn.create_consumer(self.topic, endpoints)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in threads
self.conn.consume_in_threads()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception: # nosec
pass
super().stop()
class Connection(object):
"""A utility class that manages a collection of RPC servers."""
def __init__(self):
super().__init__()
self.servers = []
def create_consumer(self, topic, endpoints, fanout=False):
target = oslo_messaging.Target(
topic=topic, server=cfg.CONF.host, fanout=fanout)
server = get_server(target, endpoints)
self.servers.append(server)
def consume_in_threads(self):
for server in self.servers:
server.start()
return self.servers
def close(self):
for server in self.servers:
server.stop()
for server in self.servers:
server.wait()
CodeQL
import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
module MyConf implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) {
source.asExpr().isConstant() or
source.asExpr() instanceof Name or
source.asExpr() instanceof Attribute
}
predicate isSink(DataFlow::Node sink) {
sink = API::moduleImport("oslo_messaging").getMember("get_rpc_server").getACall().getArg(2)
}
predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo){
nodeTo.asCfgNode().(IterableNode).getAnElement() = nodeFrom.asCfgNode()
}
}
module MyFlow = DataFlow::Global<MyConf>;
from DataFlow::Node source, DataFlow::Node sink
where MyFlow::flow(source, sink)
select source, sink
Output
source | sink |
---|---|
endpoints in line 19 |
endpoints in line 30 |
endpoints in line 30 |
endpoints in line 30 |
endpoints in line 84 |
endpoints in line 30 |
endpoints in line 87 |
endpoints in line 30 |