Skip to content

Python: Why is global dataflow not tracking endpoints in function Service.start()? #17019

Open
@hksdpc255

Description

@hksdpc255

Python code from openstack

from neutron_lib._i18n import _
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import serializer as om_serializer
from oslo_service import service


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Transport object that get_server() passes to oslo_messaging.get_rpc_server().
# get_server() raises AssertionError while this is still None; nothing in this
# snippet assigns it a different value.
TRANSPORT = None


class RequestContextSerializer(om_serializer.Serializer):
    """Placeholder serializer with no behaviour of its own in this snippet."""

    def __init__(self, base=None):
        """Accept ``base`` and ignore it; no attributes are set."""

def get_server(target, endpoints, serializer=None):
    """Create an RPC server on the module-level transport.

    :param target: The target for the new RPC server.
    :param endpoints: The endpoints handed to the RPC server.
    :param serializer: Optional serializer; it is passed to
        RequestContextSerializer before being given to the RPC server.
    :returns: Whatever oslo_messaging.get_rpc_server returns.
    :raises AssertionError: If the module-level TRANSPORT is None.
    """
    if TRANSPORT is not None:
        wrapped_serializer = RequestContextSerializer(serializer)
        return oslo_messaging.get_rpc_server(
            TRANSPORT, target, endpoints, 'eventlet', wrapped_serializer)
    raise AssertionError(_("'TRANSPORT' must not be None"))

class Service(service.Service):
    """RPC-enabled service object for binaries running on hosts.

    Starting the service creates a Connection and registers a consumer for
    ``topic`` whose only endpoint is the manager.
    """

    def __init__(self, host, topic, manager=None, serializer=None):
        """Record the service settings.

        :param host: Stored as ``self.host``.
        :param topic: Topic used for the consumer created in start().
        :param manager: Endpoint object; the service itself is used when
            this is None.
        :param serializer: Stored as ``self.serializer``.
        """
        super().__init__()
        self.host, self.topic = host, topic
        self.serializer = serializer
        self.manager = self if manager is None else manager

    def start(self):
        """Start the base service, then create and start the RPC consumer."""
        super().start()

        self.conn = Connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        # The manager is the single endpoint exposed on this topic.
        endpoints = [self.manager]
        self.conn.create_consumer(self.topic, endpoints)

        # Give the manager a chance to do further initialization once the
        # rpc connection exists; the hook is optional.
        hook = getattr(self.manager, 'initialize_service_hook', None)
        if callable(hook):
            self.manager.initialize_service_hook(self)

        # Start every registered consumer.
        self.conn.consume_in_threads()

    def stop(self):
        """Close the connection on a best-effort basis, then stop the base."""
        # Errors while closing are deliberately ignored: the service is
        # shutting down regardless of whether the connection closes cleanly.
        try:
            self.conn.close()
        except Exception:  # nosec
            pass
        super().stop()


class Connection:
    """Keeps the RPC servers created through it and drives them as a group."""

    def __init__(self):
        super().__init__()
        # Servers are kept in creation order.
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        """Create an RPC server for ``topic`` and remember it.

        :param topic: Topic of the messaging target.
        :param endpoints: Endpoints passed on to get_server().
        :param fanout: Fanout flag of the messaging target.
        """
        rpc_target = oslo_messaging.Target(
            topic=topic, server=cfg.CONF.host, fanout=fanout)
        self.servers.append(get_server(rpc_target, endpoints))

    def consume_in_threads(self):
        """Start every remembered server and return the server list."""
        for rpc_server in self.servers:
            rpc_server.start()
        return self.servers

    def close(self):
        """Stop every server, then wait on every server."""
        # Two separate passes: all servers are asked to stop before any of
        # them is waited on.
        for rpc_server in self.servers:
            rpc_server.stop()
        for rpc_server in self.servers:
            rpc_server.wait()

CodeQL

import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

/**
 * Data-flow configuration used by this query: very broad sources, a single
 * sink position, and one extra flow step for iterable elements.
 */
module MyConf implements DataFlow::ConfigSig {
    // A source is any node whose expression is a constant, a Name, or an
    // Attribute.
    predicate isSource(DataFlow::Node source) {
        source.asExpr().isConstant() or
        source.asExpr() instanceof Name or
        source.asExpr() instanceof Attribute
    }
    // The sink is the argument at index 2 of a call to
    // `oslo_messaging.get_rpc_server`. In the Python snippet that position
    // holds `endpoints`, assuming getArg is zero-based (confirm).
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("oslo_messaging").getMember("get_rpc_server").getACall().getArg(2)
    }
    // Extra step: flow from a node that is an element of an IterableNode to
    // that IterableNode (presumably intended to cover `[self.manager]`).
    predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo){
        nodeTo.asCfgNode().(IterableNode).getAnElement() = nodeFrom.asCfgNode()
    }
}

// Instantiate global data flow with the MyConf configuration.
module MyFlow = DataFlow::Global<MyConf>;

// Report every (source, sink) pair for which MyFlow finds flow.
from DataFlow::Node source, DataFlow::Node sink
where MyFlow::flow(source, sink)
select source, sink

Output

source sink
endpoints in line 19 endpoints in line 30
endpoints in line 30 endpoints in line 30
endpoints in line 84 endpoints in line 30
endpoints in line 87 endpoints in line 30

Metadata

Metadata

Assignees

No one assigned

    Labels

    question: Further information is requested

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions