Skip to content

Commit

Permalink
Merge pull request #25407 from rallytime/bp-23236
Browse files Browse the repository at this point in the history
Back-port #23236 to 2015.8
  • Loading branch information
Mike Place committed Jul 14, 2015
2 parents 334eafa + 8faa49a commit bd7c71e
Show file tree
Hide file tree
Showing 6 changed files with 598 additions and 101 deletions.
56 changes: 56 additions & 0 deletions salt/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,32 @@ def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout
raise NotImplementedError()


class PushChannel(object):
'''
Factory class to create Sync channel for push side of push/pull IPC
'''
@staticmethod
def factory(opts, **kwargs):
sync = SyncWrapper(AsyncPushChannel.factory, (opts,), kwargs)
return sync

def send(self, load, tries=3, timeout=60):
'''
Send load across IPC push
'''
raise NotImplementedError()


class PullChannel(object):
'''
Factory class to create Sync channel for pull side of push/pull IPC
'''
@staticmethod
def factory(opts, **kwargs):
sync = SyncWrapper(AsyncPullChannel.factory, (opts,), kwargs)
return sync


# TODO: better doc strings
class AsyncChannel(object):
'''
Expand Down Expand Up @@ -150,4 +176,34 @@ def on_recv(self, callback):
'''
raise NotImplementedError()


class AsyncPushChannel(object):
'''
Factory class to create IPC Push channels
'''
@staticmethod
def factory(opts, **kwargs):
'''
If we have additional IPC transports other than UxD and TCP, add them here
'''
# FIXME for now, just UXD
# Obviously, this makes the factory approach pointless, but we'll extend later
import salt.transport.ipc
return salt.transport.ipc.IPCMessageClient(opts, **kwargs)


class AsyncPullChannel(object):
'''
Factory class to create IPC pull channels
'''
@staticmethod
def factory(opts, **kwargs):
'''
If we have additional IPC transports other than UXD and TCP, add them here
'''
import salt.transport.ipc
return salt.transport.ipc.IPCMessageServer(opts, **kwargs)

## Additional IPC messaging patterns should provide interfaces here, ala router/dealer, pub/sub, etc

# EOF
25 changes: 25 additions & 0 deletions salt/transport/frame.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
'''
Helper functions for transport components to handle message framing
'''
# Import python libs
from __future__ import absolute_import
import msgpack


def frame_msg(body, header=None, raw_body=False):
'''
Frame the given message with our wire protocol
'''
framed_msg = {}
if header is None:
header = {}

# if the body wasn't already msgpacked-- lets do that.
if not raw_body:
body = msgpack.dumps(body)

framed_msg['head'] = header
framed_msg['body'] = body
framed_msg_packed = msgpack.dumps(framed_msg)
return '{0} {1}'.format(len(framed_msg_packed), framed_msg_packed)

0 comments on commit bd7c71e

Please sign in to comment.