
Commit f190e08
lots of API changes here
witlox committed Jun 26, 2019
1 parent feff26f commit f190e08
Showing 9 changed files with 217 additions and 34 deletions.
2 changes: 1 addition & 1 deletion dcron/application.py
@@ -150,7 +150,7 @@ async def save_schedule():

logger.info("starting web application server on http://{0}:{1}/".format(get_ip(), args.web_port))

s = Site(storage, args.udp_communication_port)
s = Site(storage, args.udp_communication_port, cron=processor.cron)
runner = AppRunner(s.app)
loop.run_until_complete(runner.setup())
site_instance = TCPSite(runner, port=args.web_port)
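The constructor change threads the live cron table into the web layer; it is consumed by the new /cron_in_sync endpoint further down. For reference, a minimal self-contained sketch of the same aiohttp 3.x startup pattern the snippet uses. make_app() stands in for Site(...).app, and the port is a placeholder assumption:

import asyncio
from aiohttp import web
from aiohttp.web import AppRunner, TCPSite

async def index(request):
    return web.Response(text='ok')

def make_app():
    app = web.Application()          # stands in for Site(...).app
    app.add_routes([web.get('/', index)])
    return app

async def serve():
    runner = AppRunner(make_app())
    await runner.setup()                      # prepare the runner
    await TCPSite(runner, port=8080).start()  # bind; 8080 is a placeholder

loop = asyncio.get_event_loop()
loop.run_until_complete(serve())
loop.run_forever()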
2 changes: 1 addition & 1 deletion dcron/protocols/messages.py
@@ -54,7 +54,7 @@ def __init__(self, ip=None, system_load=None):
:param system_load: system load (0-100%)
"""
self.ip = ip
self.time = datetime.utcnow()
self.time = datetime.now().isoformat()
self.system_load = system_load
self.state = 'running'

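Storing the timestamp as datetime.now().isoformat() turns Status.time into a plain string, so a Status serializes to JSON without special-casing datetimes; readers parse it back with dateutil, as scheduler.py and storage.py do below. A small round-trip sketch (variable names are illustrative):

import json
from datetime import datetime
from dateutil import parser

stamp = datetime.now().isoformat()     # e.g. '2019-06-26T14:03:07.123456'
wire = json.dumps({'time': stamp})     # plain string: no custom encoder needed
restored = parser.parse(json.loads(wire)['time'])
assert restored.isoformat() == stamp   # lossless round trip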
4 changes: 3 additions & 1 deletion dcron/scheduler.py
@@ -28,6 +28,8 @@
from datetime import timedelta, datetime
from random import shuffle

from dateutil import parser


class Scheduler(object):
"""
@@ -47,7 +49,7 @@ def __init__(self, storage, staleness):

def active_nodes(self):
for node in self.storage.cluster_state():
if datetime.utcnow() - node.time < timedelta(seconds=self.staleness):
if datetime.utcnow() - parser.parse(node.time) < timedelta(seconds=self.staleness):
yield node
else:
node.state = 'disconnected'
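One caveat worth flagging: Status.time is now produced by naive local datetime.now() (previous hunk), while this comparison uses datetime.utcnow(), so on a host whose clock is not UTC the staleness window is skewed by the UTC offset. A standalone sketch of the freshness check, pinned to a single clock for illustration (an adjustment, not what the commit does):

from datetime import datetime, timedelta
from dateutil import parser

def active(nodes, staleness=60):
    """Yield nodes whose ISO-8601 'time' stamp is newer than the cutoff."""
    now = datetime.now()   # same clock messages.py stamps with; avoids the utcnow skew
    for node in nodes:
        if now - parser.parse(node['time']) < timedelta(seconds=staleness):
            yield node

nodes = [{'time': datetime.now().isoformat()},
         {'time': '2019-01-01T00:00:00'}]
print(list(active(nodes)))   # only the fresh node survives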
132 changes: 112 additions & 20 deletions dcron/site.py
@@ -23,18 +23,22 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import json
import logging
import pathlib

import jinja2

from aiohttp import web
from dateutil import parser, tz
import aiohttp_jinja2 as aiohttp_jinja2

from dcron.cron.cronitem import CronItem
from dcron.datagram.client import broadcast
from dcron.protocols.messages import Kill, Run, Toggle
from dcron.protocols.udpserializer import UdpSerializer
from dcron.storage import CronEncoder
from dcron.utils import get_ip


class Site(object):
@@ -46,34 +50,58 @@ class Site(object):

root = pathlib.Path(__file__).parent

def __init__(self, storage, udp_port):
def __init__(self, storage, udp_port, cron=None):
self.cron = cron
self.storage = storage
self.udp_port = udp_port
self.app = web.Application()
aiohttp_jinja2.setup(self.app, loader=jinja2.PackageLoader('dcron', 'templates'))
self.app.router.add_static('/static/', path=self.root/'static', name='static')
self.app.add_routes([web.get('/', self.get)])
self.app.add_routes([web.get('/list_nodes', self.get_nodes)])
self.app.add_routes([web.get('/list_jobs', self.get_jobs)])
self.app.add_routes([web.post('/add_job', self.add_job)])
self.app.add_routes([web.post('/remove_job', self.remove_job)])
self.app.add_routes([web.post('/get_job_log', self.get_job_log)])
self.app.add_routes([web.post('/kill_job', self.kill_job)])
self.app.add_routes([web.post('/run_job', self.run_job)])
self.app.add_routes([web.post('/toggle_job', self.toggle_job)])
self.app.add_routes([web.get('/', self.get),
web.get('/list_nodes', self.get_nodes),
web.get('/cron_in_sync', self.cron_in_sync),
web.get('/status', self.status),
web.get('/list_jobs', self.get_jobs),
web.get('/jobs', self.jobs),
web.post('/add_job', self.add_job),
web.post('/remove_job', self.remove_job),
web.post('/get_job_log', self.get_job_log),
web.post('/kill_job', self.kill_job),
web.post('/run_job', self.run_job),
web.post('/toggle_job', self.toggle_job),
web.get('/export', self.export_data),
web.post('/import', self.import_data)])

@aiohttp_jinja2.template('index.html')
async def get(self, request):
return

@aiohttp_jinja2.template('nodestable.html')
async def get_nodes(self, request):
return dict(nodes=sorted(self.storage.cluster_state(), key=lambda n: n.ip))
nodes = []
for node in self.storage.cluster_state():
node.time = parser.parse(node.time).astimezone(tz.tzlocal()).strftime('%d.%m.%Y %H:%M:%S')
nodes.append(node)
return dict(nodes=sorted(nodes, key=lambda n: n.ip))

async def cron_in_sync(self, request):
for job in self.storage.cluster_jobs:
if job.assigned_to == get_ip():
found = next(iter([j for j in self.cron.find_command(job.command) if j == job]), None)
if not found:
return web.HTTPConflict(text="stored job {0} not matched to actual cron".format(job))
return web.HTTPOk()

async def status(self, request):
return web.json_response(sorted(self.storage.cluster_state(), key=lambda n: n.ip), dumps=CronEncoder().default)

@aiohttp_jinja2.template('jobstable.html')
async def get_jobs(self, request):
return dict(jobs=sorted(self.storage.cluster_jobs, key=lambda j: (j.command, j.assigned_to if j.assigned_to else '*')))

async def jobs(self, request):
return web.json_response(sorted(self.storage.cluster_jobs, key=lambda j: (j.command, j.assigned_to if j.assigned_to else '*')), dumps=CronEncoder().default)

@aiohttp_jinja2.template('joblogs.html')
async def get_job_log(self, request):
data = await request.post()
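With these routes the node now has a machine-readable surface next to the HTML fragments: /status and /jobs return JSON via CronEncoder, /cron_in_sync answers 200 or 409 depending on whether the jobs assigned to this node match the actual crontab, and /export and /import move job definitions in bulk. A hedged client sketch using requests (address and port are assumptions):

import requests

base = 'http://localhost:8080'   # assumed address of a running dcron node

print(requests.get(base + '/status').json())    # node states as JSON
print(requests.get(base + '/jobs').json())      # cluster jobs as JSON
drift = requests.get(base + '/cron_in_sync')    # 200 OK, or 409 Conflict on drift
print(drift.status_code)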
@@ -110,11 +138,16 @@ async def kill_job(self, request):
'dow' not in data:
return web.Response(status=500, text='not all mandatory fields submitted')

cron_item = self.generate_cron_item(data)

if cron_item not in self.storage.cluster_jobs:
raise web.HTTPConflict(text='job not found on cluster')

self.logger.debug("broadcasting kill result")

broadcast(self.udp_port, UdpSerializer.dump(Kill(self.generate_cron_item(data))))
broadcast(self.udp_port, UdpSerializer.dump(Kill(cron_item)))

raise web.HTTPFound('/')
raise web.HTTPAccepted()

async def run_job(self, request):
data = await request.post()
@@ -129,11 +162,16 @@
'dow' not in data:
return web.Response(status=500, text='not all mandatory fields submitted')

cron_item = self.generate_cron_item(data)

if cron_item not in self.storage.cluster_jobs:
raise web.HTTPConflict(text='job not found on cluster')

self.logger.debug("broadcasting run result")

broadcast(self.udp_port, UdpSerializer.dump(Run(self.generate_cron_item(data))))
broadcast(self.udp_port, UdpSerializer.dump(Run(cron_item)))

raise web.HTTPFound('/')
raise web.HTTPAccepted()
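kill_job, run_job, and toggle_job now share one shape: validate the form, build the CronItem once, refuse with 409 Conflict if it is unknown to the cluster, broadcast, and answer 202 Accepted instead of the old redirect to '/'. A hypothetical consolidation of that shape (not in the commit; broadcast, UdpSerializer, and generate_cron_item are the module's own names from the imports above):

from aiohttp import web

MANDATORY = ('command', 'minute', 'hour', 'dom', 'month', 'dow')

async def _broadcast_action(self, request, message_cls):
    data = await request.post()
    if any(field not in data for field in MANDATORY):
        return web.Response(status=500, text='not all mandatory fields submitted')
    cron_item = self.generate_cron_item(data)        # build once, reuse
    if cron_item not in self.storage.cluster_jobs:
        raise web.HTTPConflict(text='job not found on cluster')
    broadcast(self.udp_port, UdpSerializer.dump(message_cls(cron_item)))
    raise web.HTTPAccepted()

# e.g. await self._broadcast_action(request, Kill)   # likewise Run, Toggle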

async def toggle_job(self, request):
data = await request.post()
@@ -148,11 +186,16 @@ async def toggle_job(self, request):
'dow' not in data:
return web.Response(status=500, text='not all mandatory fields submitted')

cron_item = self.generate_cron_item(data)

if cron_item not in self.storage.cluster_jobs:
raise web.HTTPConflict(text='job not found on cluster')

self.logger.debug("broadcasting run result")

broadcast(self.udp_port, UdpSerializer.dump(Toggle(self.generate_cron_item(data))))
broadcast(self.udp_port, UdpSerializer.dump(Toggle(cron_item)))

raise web.HTTPFound('/')
raise web.HTTPAccepted()

async def add_job(self, request):
data = await request.post()
@@ -172,11 +215,14 @@ async def add_job(self, request):
if 'disabled' in data:
cron_item.enable(False)

if cron_item in self.storage.cluster_jobs:
raise web.HTTPConflict(text='job already exists')

self.logger.debug("broadcasting add result")

broadcast(self.udp_port, UdpSerializer.dump(cron_item))

raise web.HTTPFound('/')
raise web.HTTPCreated()

async def remove_job(self, request):
data = await request.post()
@@ -191,11 +237,57 @@ async def remove_job(self, request):
'dow' not in data:
return web.Response(status=500, text='not all mandatory fields submitted')

cron_item = self.generate_cron_item(data, removable=True)

if cron_item not in self.storage.cluster_jobs:
raise web.HTTPConflict(text='job not found')

self.logger.debug("broadcasting remove result")

broadcast(self.udp_port, UdpSerializer.dump(self.generate_cron_item(data, removable=True)))
broadcast(self.udp_port, UdpSerializer.dump(cron_item))

raise web.HTTPAccepted()

async def export_data(self, request):
self.logger.debug("building export data")

result = []
for job in self.storage.cluster_jobs:
result.append(
{
'pattern': "{0} {1} {2} {3} {4}".format(job.minute, job.hour, job.dom, job.month, job.dow),
'command': job.command,
'enabled': job.enabled
}
)

self.logger.debug("returning export {0}".format(result))

return web.json_response(result)

async def import_data(self, request):
data = await request.post()

raise web.HTTPFound('/')
if 'payload' not in data:
return web.Response(status=500, text='no payload found')

self.logger.debug("received import request {0}".format(data['payload']))

try:
imports = json.loads(data['payload'])
for line in imports:
if 'pattern' in line and 'command' in line and 'enabled' in line:
cron_item = CronItem(command=line['command'])
cron_item.set_all(line['pattern'])
cron_item.enable(line['enabled'])
self.logger.debug("received new job from import {0}, broadcasting it.".format(cron_item))
broadcast(self.udp_port, UdpSerializer.dump(cron_item))
else:
self.logger.error("import element invalid: {0}".format(line))
return web.HTTPOk()
except ValueError as e:
self.logger.error(e)
return web.HTTPClientError(text='invalid json received')

def generate_cron_item(self, data, removable=False):

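Export and import agree on a small interchange format: a JSON array of objects with pattern (the five-field cron spec), command, and enabled. A round-trip sketch against a running node (the address is an assumption; 'payload' is the form field import_data reads):

import json
import requests

base = 'http://localhost:8080'   # assumed node address

# /export returns e.g.
# [{"pattern": "*/5 * * * *", "command": "echo hi", "enabled": true}]
jobs = requests.get(base + '/export').json()

# /import expects the same array in a form field named 'payload'
resp = requests.post(base + '/import', data={'payload': json.dumps(jobs)})
print(resp.status_code)   # 200 when every element carries all three keys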
13 changes: 7 additions & 6 deletions dcron/storage.py
@@ -117,7 +117,7 @@ def node_state(self, ip):
node_status = [status for status in self.cluster_status if status.ip == ip]
if len(node_status) == 0:
return None
sorted_status = sorted(node_status, key=lambda s: s.time, reverse=True)
sorted_status = sorted(node_status, key=lambda s: parser.parse(s.time), reverse=True)
if not sorted_status:
return None
return sorted_status[0]
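Since time is now a string, the sort key parses it back into a datetime before ordering. Uniform ISO-8601 strings from a single writer would also order correctly as plain text, but parsing stays correct even if fractional-second precision varies:

from dateutil import parser

stamps = ['2019-06-26T10:00:00', '2019-06-26T09:59:59.5']
newest = max(stamps, key=parser.parse)
print(newest)   # 2019-06-26T10:00:00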
@@ -150,6 +150,7 @@ def default(self, o):
'comment': o.comment,
'command': o.command,
'last_run': last_run,
'pid': o.pid,
'assigned_to': o.assigned_to,
'log': o._log,
'parts': str(o.parts)
@@ -164,15 +165,15 @@ def default(self, o):
}
elif isinstance(o, Status):
time = ''
if o.time:
time = o.time.strftime("{} {}".format(DATE_FORMAT, TIME_FORMAT))
return {
'_type': 'status',
'ip': o.ip,
'state': o.state,
'load': o.system_load,
'time': time
'time': o.time
}
elif isinstance(o, list):
return json.dumps(o, cls=CronEncoder)
return JSONEncoder.default(self, o)


@@ -192,6 +193,7 @@ def object_hook(obj):
cron_item.enable(obj['enabled'])
cron_item.comment = obj['comment']
cron_item.assigned_to = obj['assigned_to']
cron_item.pid = obj['pid']
cron_item._log = obj['log']
if obj['last_run'] != '':
cron_item.last_run = parser.parse(obj['last_run'])
@@ -204,8 +206,7 @@ def object_hook(obj):
status.system_load = obj['load']
status.state = obj['state']
status.ip = obj['ip']
if obj['time'] != '':
status.time = parser.parse(obj['time'])
status.time = obj['time']
return status
return obj
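With pid and the raw time string passing straight through the encoder and the hook, a Status now survives a JSON round trip unchanged. A sketch of the intended pairing (the imports are assumptions based on the diff; object_hook may live on a decoder class in this module):

import json
# assumed imports, matching the diff:
# from dcron.protocols.messages import Status
# from dcron.storage import CronEncoder, object_hook

status = Status(ip='10.0.0.1', system_load=12)      # stamps time = now().isoformat()
wire = json.dumps(status, cls=CronEncoder)          # default() above handles Status
back = json.loads(wire, object_hook=object_hook)    # rebuilds the Status
assert back.time == status.time                     # ISO string intact, no parsing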
