Skip to content

Commit

Permalink
Admin interface to Scheduler added
Browse files Browse the repository at this point in the history
  • Loading branch information
keiffster committed Apr 11, 2018
1 parent 2c87720 commit 7c914b0
Show file tree
Hide file tree
Showing 9 changed files with 559 additions and 89 deletions.
13 changes: 13 additions & 0 deletions bots/y-bot/aiml/admin/scheduler.aiml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<aiml>

<category>
<pattern>YADMIN SCHEDULER LIST JOBS</pattern>
<template>
<extension path="programy.extensions.admin.scheduler.SchedulerAdminExtension">
LIST JOBS
</extension>
</template>
</category>

</aiml>
11 changes: 11 additions & 0 deletions bots/y-bot/aiml/testing/scheduler/scheduler.aiml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@
</template>
</category>

<category>
<pattern>ACTION IN * * TO *</pattern>
<template>
<srai>
SCHEDULE
<extension path="programy.extensions.scheduler.scheduler.SchedulerExtension">
REMIND IN <star index="1" /> <star index="2" /> GRAMMAR <star index="3" />
</extension>
</srai>
</template>
</category>

</aiml>

Expand Down
1 change: 0 additions & 1 deletion src/programy/clients/render/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ def parse_tag(self, client_context, tag):
return self.parse_location(client_context, tag)

else:
print("Unknown tag %s", tag.name)
return None

def parse_text(self, client_context, text):
Expand Down
222 changes: 221 additions & 1 deletion src/programy/config/client/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,144 @@
from programy.utils.logging.ylogger import YLogger

from programy.config.base import BaseConfigurationData
from programy.config.bot.bot import BotConfiguration


class SchedulerJobStoreConfiguration(BaseConfigurationData):

def __init__(self):
BaseConfigurationData.__init__(self, name="jobstore")
self._jobstore = None

@property
def name(self):
return self._name

@property
def jobstore(self):
return self._jobstore

def load_config_section(self, configuration_file, configuration, bot_root):
jobstore = configuration_file.get_section(self._section_name, configuration)
if jobstore is not None:
self._name = configuration_file.get_option(jobstore, "name", missing_value=None)
if self._name is not None:
if self._name == 'mongo':
self._jobstore = SchedulerMongoJobStoreConfiguration()
elif self._name == 'redis':
self._jobstore = SchedulerRedisJobStoreConfiguration()
elif self._name == 'sqlalchemy':
self._jobstore = SchedulerSqlAlchemyJobStoreConfiguration()

self._jobstore.load_config_section(configuration_file, jobstore, bot_root)


class SchedulerMongoJobStoreConfiguration(BaseConfigurationData):

def __init__(self):
BaseConfigurationData.__init__(self, name="mongo")
self._collection = None

@property
def collection(self):
return self._collection

def load_config_section(self, configuration_file, configuration, bot_root):
mongodb = configuration_file.get_section(self._section_name, configuration)
if mongodb is not None:
self._collection = configuration_file.get_option(mongodb, "collection", missing_value=None)


class SchedulerRedisJobStoreConfiguration(BaseConfigurationData):

def __init__(self):
BaseConfigurationData.__init__(self, name="redis")
self._jobs_key = None
self._run_times_key = None

@property
def jobs_key(self):
return self._jobs_key

@property
def run_times_key(self):
return self._run_times_key

def load_config_section(self, configuration_file, configuration, bot_root):
redis = configuration_file.get_section(self._section_name, configuration)
if redis is not None:
self._jobs_key = configuration_file.get_option(redis, "jobs_key", missing_value=None)
self._run_times_key = configuration_file.get_option(redis, "run_times_key", missing_value=None)


class SchedulerSqlAlchemyJobStoreConfiguration(BaseConfigurationData):

def __init__(self):
BaseConfigurationData.__init__(self, name="sqlalchemy")
self._url = None

@property
def url(self):
return self._url

def load_config_section(self, configuration_file, configuration, bot_root):
sqlalchemy = configuration_file.get_section(self._section_name, configuration)
if sqlalchemy is not None:
self._url = configuration_file.get_option(sqlalchemy, "url", missing_value=None)


class SchedulerThreadPoolConfiguration(BaseConfigurationData):

def __init__(self):
BaseConfigurationData.__init__(self, name="threadpool")
self._max_workers = None

@property
def max_workers(self):
return self._max_workers

def load_config_section(self, configuration_file, configuration, bot_root):
threadpool = configuration_file.get_section(self._section_name, configuration)
if threadpool is not None:
self._max_workers = configuration_file.get_option(threadpool, "max_workers", missing_value=None)


class SchedulerProcessPoolConfiguration(BaseConfigurationData):

def __init__(self):
BaseConfigurationData.__init__(self, name="processpool")
self._max_workers = None

@property
def max_workers(self):
return self._max_workers

def load_config_section(self, configuration_file, configuration, bot_root):
processpool = configuration_file.get_section(self._section_name, configuration)
if processpool is not None:
self._max_workers = configuration_file.get_option(processpool, "max_workers", missing_value=None)


class SchedulerJobDefaultsConfiguration(BaseConfigurationData):

def __init__(self):
BaseConfigurationData.__init__(self, name="job_defaults")
self._coalesce = None
self._max_instances = None

@property
def coalesce(self):
return self._coalesce

@property
def max_instances(self):
return self._max_instances


def load_config_section(self, configuration_file, configuration, bot_root):
job_defaults = configuration_file.get_section(self._section_name, configuration)
if job_defaults is not None:
self._coalesce = configuration_file.get_option(job_defaults, "coalesce", missing_value=None)
self._max_instances = configuration_file.get_option(job_defaults, "max_instances", missing_value=None)


class SchedulerConfiguration(BaseConfigurationData):
Expand All @@ -30,6 +167,11 @@ def __init__(self):
self._remove_all_jobs = False
self._blocking = False

self._jobstore = None
self._threadpool = None
self._processpool = None
self._job_defaults = None

@property
def name(self):
return self._name
Expand All @@ -50,6 +192,22 @@ def remove_all_jobs(self):
def blocking(self):
return self._blocking

@property
def jobstore(self):
return self._jobstore

@property
def threadpool(self):
return self._threadpool

@property
def processpool(self):
return self._processpool

@property
def job_defaults(self):
return self._job_defaults

def load_config_section(self, configuration_file, configuration, bot_root):
scheduler = configuration_file.get_section(self._section_name, configuration)
if scheduler is not None:
Expand All @@ -58,8 +216,70 @@ def load_config_section(self, configuration_file, configuration, bot_root):
self._add_listeners = configuration_file.get_bool_option(scheduler, "add_listeners", missing_value=False)
self._remove_all_jobs = configuration_file.get_bool_option(scheduler, "remove_all_jobs", missing_value=False)

if 'jobstore' in scheduler:
self._jobstore = SchedulerJobStoreConfiguration()
self._jobstore.load_config_section(configuration_file, scheduler, bot_root)

if 'threadpool' in scheduler:
self._threadpool = SchedulerThreadPoolConfiguration()
self._threadpool.load_config_section(configuration_file, scheduler, bot_root)

if 'processpool' in scheduler:
self._processpool = SchedulerProcessPoolConfiguration()
self._processpool.load_config_section(configuration_file, scheduler, bot_root)

if 'job_defaults' in scheduler:
self._job_defaults = SchedulerJobDefaultsConfiguration()
self._job_defaults.load_config_section(configuration_file, scheduler, bot_root)
else:
YLogger.warning(self, "'scheduler' section missing from client config, using to defaults")

def create_scheduler_config(self):

config = {}
if self.jobstore is not None:

if self.jobstore.name == 'mongo':
config['apscheduler.jobstores.mongo'] = {'type': 'mongodb'}
if self.jobstore.jobstore is not None:
if self.jobstore.jobstore.collection is not None:
config['apscheduler.jobstores.mongo']['collection'] = self.jobstore.jobstore.collection

elif self.jobstore.name == 'redis':
config['apscheduler.jobstores.redis'] = {'type': 'redis'}
if self.jobstore.jobstore is not None:
if self.jobstore.jobstore.jobs_key is not None:
config['apscheduler.jobstores.redis']['jobs_key'] = self.jobstore.jobstore.jobs_key
if self.jobstore.jobstore.run_times_key is not None:
config['apscheduler.jobstores.redis']['run_times_key'] = self.jobstore.jobstore.run_times_key

elif self.jobstore.name == 'sqlalchemy':
config['apscheduler.jobstores.sqlalchemy'] = {'type': 'sqlalchemy'}
if self.jobstore.jobstore is not None:
if self.jobstore.jobstore.url is not None:
config['apscheduler.jobstores.sqlalchemy']['url'] = self.jobstore.jobstore.url

if self.threadpool is not None:
config['apscheduler.executors.default'] = {'class': 'apscheduler.executors.pool:ThreadPoolExecutor'}
if self.threadpool.max_workers is not None:
config['apscheduler.executors.default']['max_workers'] = str(self.threadpool.max_workers)

if self.processpool is not None:
config['apscheduler.executors.processpool'] = {'type': 'processpool'}
if self.threadpool.max_workers is not None:
config['apscheduler.executors.processpool']['max_workers'] = str(self.threadpool.max_workers)

if self.job_defaults is not None:
config['apscheduler.job_defaults'] = {}
if self.job_defaults.coalesce is not None:
config['apscheduler.job_defaults.coalesce'] = str(self.job_defaults.coalesce).lower()
if self.job_defaults.max_instances is not None:
config['apscheduler.job_defaults.max_instances'] = str(self.job_defaults.max_instances)

if len(config.keys()) > 0:
return config

return None



41 changes: 41 additions & 0 deletions src/programy/extensions/admin/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Copyright (c) 2016-2018 Keith Sterling http://www.keithsterling.com
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from programy.utils.logging.ylogger import YLogger

from programy.extensions.base import Extension


class SchedulerAdminExtension(Extension):

# execute() is the interface that is called from the <extension> tag in the AIML
def execute(self, client_context, data):
YLogger.debug(client_context, "Scheduler Admin - [%s]", data)


commands = [x.upper() for x in data.split()]

if commands[0] == 'LIST':
if commands[1] == 'JOBS':
jobs = client_context.client.scheduler.list_jobs()
if jobs:
response = ""
for id, job in jobs.items():
response += "> Job ID:%s, Next Run: %s, Args: %s\n"%(id, job.next_run_time, str(job.args))
return response

return "No job information available"
13 changes: 7 additions & 6 deletions src/programy/extensions/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SchedulerExtension(Extension):
def execute(self, client_context, data):
YLogger.debug(client_context, "Scheduler - [%s]", data)

# REDMIND IN|EVERY X SECONDS|MINUTES|HOURS|DAYS|WEEKS MESSAGE ...........
# REDMIND IN|EVERY X SECONDS|MINUTES|HOURS|DAYS|WEEKS MESSAGE|GRAMMAR ...........

words = data.split()
if len(words)> 5:
Expand All @@ -36,9 +36,10 @@ def execute(self, client_context, data):
quantity = int(words[2])
period = words[3].upper()
if period in ['SECONDS', 'MINUTES', 'HOURS', 'DAYS', 'WEEKS']:
if words[4] == 'MESSAGE':
action = words[4]
if action in ['MESSAGE', 'GRAMMAR']:
text = " ".join(words[5:])
self.schedule(client_context, when, quantity, period, text)
self.schedule(client_context, when, quantity, period, action, text)
return 'OK'
else:
print ('MESSAGE missing')
Expand All @@ -51,14 +52,14 @@ def execute(self, client_context, data):

return 'ERR'

def schedule(self, client_context, when, quantity, period, text):
def schedule(self, client_context, when, quantity, period, action, text):

if when == 'IN':
if period == 'SECONDS':
client_context.client.scheduler.schedule_in_n_seconds(client_context.userid, client_context.id, text, quantity)
client_context.client.scheduler.schedule_in_n_seconds(client_context.userid, client_context.id, action, text, quantity)

elif when == 'EVERY':
if period == 'SECONDS':
client_context.client.scheduler.schedule_every_n_seconds(client_context.userid, client_context.id, text, quantity)
client_context.client.scheduler.schedule_every_n_seconds(client_context.userid, client_context.id, action, text, quantity)


0 comments on commit 7c914b0

Please sign in to comment.