@@ -36,11 +36,11 @@
# Zato
from zato.broker.zato_client import BrokerClient
from zato.common import(PORTS, ZATO_CONFIG_REQUEST, ZATO_JOIN_REQUEST_ACCEPTED,
ZATO_OK, ZATO_PARALLEL_SERVER, ZATO_SINGLETON_SERVER, ZATO_URL_TYPE_SOAP)
ZATO_OK, ZATO_URL_TYPE_SOAP)
from zato.common.util import TRACE1, zmq_names
from zato.common.odb import create_pool
from zato.server.base import BaseServer
from zato.server.base._overridden import _HTTPServerChannel, _HTTPTask, _TaskDispatcher
from zato.server.base.worker import _HTTPServerChannel, _HTTPTask, _TaskDispatcher
from zato.server.channel.soap import server_soap_error

def wrap_error_message(url_type, msg):
@@ -72,11 +72,6 @@ def __init__(self, server, task_dispatcher, broker_client=None):
self.broker_client = broker_client
super(ZatoHTTPListener, self).__init__(self.server.host, self.server.port,
task_dispatcher)

def _on_broker_msg(self, msg):
""" Passes the message on to a parallel server.
"""
self.server._on_broker_msg(msg)

def _handle_security_tech_account(self, sec_def, request_data, body, headers):
""" Handles the 'tech-account' security config type.
@@ -182,14 +177,16 @@ def executeRequest(self, task, thread_ctx):

class ParallelServer(BaseServer):
def __init__(self, host=None, port=None, zmq_context=None, crypto_manager=None,
odb=None, singleton_server=None, broker_client=None):
odb=None, singleton_server=None, broker_client=None,
basic_auth_store=None):
self.host = host
self.port = port
self.zmq_context = zmq_context or zmq.Context()
self.crypto_manager = crypto_manager
self.odb = odb
self.singleton_server = singleton_server
self.broker_client = broker_client
self.basic_auth_store = basic_auth_store

self.zmq_items = {}

@@ -238,10 +235,10 @@ def _after_init_accepted(self, server):
'cron_definition':cron_definition})
self.singleton_server.scheduler.create_edit('create', job_data)

self.broker_client = BrokerClient('parallel', self.broker_token,
'''self.broker_client = BrokerClient('parallel', self.broker_token,
self.zmq_context, self.broker_push_addr, None,
self.broker_sub_addr, self.on_broker_msg)
self.broker_client.start_subscriber()
self.broker_sub_addr, self.on_broker_pull_msg)
self.broker_client.start_subscriber()'''

def _after_init_non_accepted(self, server):
pass
@@ -275,10 +272,10 @@ def on_inproc_message_handler(self, msg):

def run_forever(self):

task_dispatcher = _TaskDispatcher(self.on_broker_msg, self.broker_token,
task_dispatcher = _TaskDispatcher(self.on_broker_pull_msg, 1, self.broker_token,
self.zmq_context, self.broker_push_addr, self.broker_pull_addr,
None)
task_dispatcher.setThreadCount(60)
self.broker_sub_addr)
task_dispatcher.setThreadCount(4)

self.logger.debug('host=[{0}], port=[{1}]'.format(self.host, self.port))

@@ -305,7 +302,7 @@ def run_forever(self):

# ##############################################################################

def on_broker_msg_SCHEDULER_EXECUTE(self, msg, args):
def on_broker_pull_msg_SCHEDULER_EXECUTE(self, msg, args=None):

service_info = self.service_store.services[msg.service]
class_ = service_info['service_class']
@@ -318,4 +315,6 @@ def on_broker_msg_SCHEDULER_EXECUTE(self, msg, args):
if self.logger.isEnabledFor(logging.DEBUG):
msg = 'Invoked [{0}], response [{1}]'.format(msg.service, repr(response))
self.logger.debug(str(msg))


def on_broker_pull_msg_SECURITY_BASIC_AUTH_CREATE(self, msg):
self.basic_auth_store.create(msg)
@@ -75,10 +75,15 @@ def run(self, *ignored_args, **kwargs):
# Initialize scheduler.
self.scheduler.singleton = self

self.broker_client = BrokerClient('singleton', self.broker_token,
self.zmq_context, self.broker_push_addr, self.broker_pull_addr,
None, self.on_broker_msg)
self.broker_client.start_subscriber()
self.broker_client = BrokerClient()
self.broker_client.name = 'singleton'
self.broker_client.token = self.broker_token
self.broker_client.zmq_context = self.zmq_context
self.broker_client.push_addr = self.broker_push_addr
self.broker_client.pull_addr = self.broker_pull_addr
self.broker_client.on_pull_handler = self.on_broker_pull_msg
self.broker_client.init()
self.broker_client.start()

'''
# Start the pickup monitor.
@@ -89,16 +94,16 @@ def run(self, *ignored_args, **kwargs):

################################################################################

def on_broker_msg_SCHEDULER_CREATE(self, msg, *ignored_args):
def on_broker_pull_msg_SCHEDULER_CREATE(self, msg, *ignored_args):
self.scheduler.create_edit('create', msg)

def on_broker_msg_SCHEDULER_EDIT(self, msg, *ignored_args):
def on_broker_pull_msg_SCHEDULER_EDIT(self, msg, *ignored_args):
self.scheduler.create_edit('edit', msg)

def on_broker_msg_SCHEDULER_DELETE(self, msg, *ignored_args):
def on_broker_pull_msg_SCHEDULER_DELETE(self, msg, *ignored_args):
self.scheduler.delete(msg)

def on_broker_msg_SCHEDULER_EXECUTE(self, msg, *ignored_args):
def on_broker_pull_msg_SCHEDULER_EXECUTE(self, msg, *ignored_args):
self.scheduler.execute(msg)

################################################################################
@@ -21,7 +21,9 @@

# stdlib
import logging
from copy import deepcopy
from thread import start_new_thread
from threading import local
from traceback import format_exc

# zope.server
@@ -39,7 +41,7 @@
logger = logging.getLogger(__name__)

class _HTTPTask(HTTPTask):
""" An HTTP task which knows how to uses ZMQ sockets.
""" An HTTP task which knows how to use ZMQ sockets.
"""
def service(self, thread_data):
try:
@@ -84,12 +86,13 @@ def service(self, thread_data):

class _TaskDispatcher(ThreadedTaskDispatcher):
""" A task dispatcher which knows how to pass custom arguments down to
the newly created threads.
the worker threads.
"""
def __init__(self, message_handler, broker_token, zmq_context,
broker_push_addr, broker_pull_addr, broker_sub_addr):
def __init__(self, pull_handler, sub_handler, broker_token,
zmq_context, broker_push_addr, broker_pull_addr, broker_sub_addr):
super(_TaskDispatcher, self).__init__()
self.message_handler = message_handler
self.pull_handler = pull_handler
self.sub_handler = sub_handler
self.broker_token = broker_token
self.zmq_context = zmq_context
self.broker_push_addr = broker_push_addr
@@ -113,15 +116,18 @@ def setThreadCount(self, count):
threads[thread_no] = 1
running += 1

# It's safe to pass ZMQ contexts between threads.
thread_data = Bunch({
'message_handler': self.message_handler,
'broker_token':self.broker_token,
'zmq_context': self.zmq_context,
'broker_push_addr': self.broker_push_addr,
'broker_pull_addr': self.broker_pull_addr,
'broker_sub_addr': self.broker_sub_addr,
})
thread_data = Bunch()
thread_data.broker_token = self.broker_token
thread_data.broker_push_addr = self.broker_push_addr
thread_data.broker_pull_addr = self.broker_pull_addr
thread_data.broker_sub_addr = self.broker_sub_addr

# Each thread gets its own copy of the initial configuration ..
thread_data = deepcopy(thread_data)

# .. though some things are OK to be shared among multiple threads.
thread_data.zmq_context = self.zmq_context
thread_data.broker_pull_handler = self.pull_handler

start_new_thread(self.handlerThread, (thread_no, thread_data))

@@ -140,18 +146,25 @@ def handlerThread(self, thread_no, thread_data):
""" Mostly copy & paste from the base classes except for the part
that passes the arguments to the thread.
"""

# We're in a new thread now so we can start the broker client though note
# that the message handler will be assigned to it later on.
thread_data.broker_client = BrokerClient('parallel/thread', self.broker_token,
thread_data.zmq_context, thread_data.broker_push_addr,
thread_data.broker_pull_addr, self.broker_sub_addr,
self.message_handler)
_local = local()

args = Bunch({'broker_client': thread_data.broker_client})
thread_data.broker_client.set_message_handler_args(args)
def _on_sub_message_handler(msg, args):
""" Invoked for each message sent through the ZeroMQ SUB socket.
"""
logger.error(str([msg, args, _local]))

thread_data.broker_client.start_subscriber()
# We're in a new thread so we can start the broker client now.
_local.broker_client = BrokerClient()
_local.broker_client.name = 'parallel/thread'
_local.broker_client.token = thread_data.broker_token
_local.broker_client.zmq_context = thread_data.zmq_context
_local.broker_client.push_addr = thread_data.broker_push_addr
_local.broker_client.pull_addr = thread_data.broker_pull_addr
_local.broker_client.sub_addr = thread_data.broker_sub_addr
_local.broker_client.on_pull_handler = thread_data.broker_pull_handler
_local.broker_client.on_sub_handler = _on_sub_message_handler
_local.broker_client.init()
_local.broker_client.start()

threads = self.threads
try:
@@ -161,7 +174,7 @@ def handlerThread(self, thread_no, thread_data):
# Special value: kill this thread.
break
try:
task.service(thread_data)
task.service(_local)
except Exception, e:
logger.error('Exception during task {0}'.format(
format_exc(e)))
@@ -40,6 +40,7 @@
from zato.server.pool.sql import SQLConnectionPool, SQLConnectionPool
from zato.server.repo import RepoManager
from zato.server.scheduler import Scheduler
from zato.server.security.basic_auth import Store as BasicAuthStore
from zato.server.security.wss import WSSUsernameTokenProfileStore
from zato.server.service.store import EggServiceImporter, ServiceStore

@@ -163,7 +164,7 @@ def soap_message_handler(self):
return handler

# #######################################################
# WS-Security
# Security

@Object
def wss_nonce_cache(self):
@@ -187,6 +188,10 @@ def wss_username_password_store(self):

return store

@Object
def basic_auth_store(self):
return BasicAuthStore()

# #######################################################
# ODB (Operational Database)

@@ -204,6 +209,7 @@ def parallel_server(self):
server.odb = self.odb_manager()
server.soap_handler = self.soap_message_handler()
server.service_store = self.service_store()
server.basic_auth_store = self.basic_auth_store()

# Regular objects.
#server.sql_pool = self.sql_pool()
@@ -30,7 +30,7 @@

# Zato
from zato.common import scheduler_date_time_format
from zato.common.broker_message import SCHEDULER
from zato.common.broker_message import MESSAGE_TYPE, SCHEDULER

logger = logging.getLogger(__name__)

@@ -61,7 +61,7 @@ def _on_job_execution(self, name, service, extra):
"""
msg = {'action': SCHEDULER.JOB_EXECUTED, 'name':name,
'service': service, 'extra':extra}
self.singleton.broker_client.send_json(msg)
self.singleton.broker_client.send_json(msg, msg_type=MESSAGE_TYPE.TO_PARALLEL_PULL)

if logger.isEnabledFor(logging.DEBUG):
msg = 'Job executed, name [{0}], service [{1}], extra [{2}]'.format(
@@ -96,9 +96,14 @@ def create_interval_based(self, job_data):
""" Schedules the execution of an interval-based job.
"""
start_date = _start_date(job_data)
weeks = job_data.weeks if job_data.weeks else 0
days = job_data.days if job_data.days else 0
hours = job_data.hours if job_data.hours else 0
minutes = job_data.minutes if job_data.minutes else 0
seconds = job_data.seconds if job_data.seconds else 0
self._sched.add_interval_job(self._on_job_execution,
job_data.weeks, job_data.days, job_data.hours, job_data.minutes,
job_data.seconds, start_date, [job_data.name, job_data.service, job_data.extra],
weeks, days, hours, minutes, seconds, start_date,
[job_data.name, job_data.service, job_data.extra],
name=job_data.name, max_runs=job_data.repeats)

logger.info('Interval-based job [{0}] scheduled'.format(job_data.name))
@@ -33,6 +33,7 @@

# Zato
from zato.common import ZatoException, ZATO_OK
from zato.common.broker_message import MESSAGE_TYPE, SECURITY
from zato.common.odb.model import Cluster, HTTPBasicAuth
from zato.common.util import TRACE1
from zato.server.service.internal import _get_params, AdminService