Skip to content

Commit

Permalink
added api call for rebalancing and save and load gates
Browse files Browse the repository at this point in the history
  • Loading branch information
witlox committed Aug 28, 2019
1 parent 28e4f92 commit 0e77a2c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
6 changes: 3 additions & 3 deletions dcron/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def main():
elif args.cron_user:
processor = Processor(args.udp_communication_port, storage, cron=CronTab(tabfile=args.cron, user=args.cron_user), user=args.cron_user)
else:
processor = Processor(args.udp_communication_port, storage, cron=CronTab(tabfile=args.cron, user=False), user='root')
processor = Processor(args.udp_communication_port, storage, cron=CronTab(tabfile=args.cron, user='root'), user='root')
else:
processor = Processor(args.udp_communication_port, storage, user='root')

Expand Down Expand Up @@ -158,9 +158,9 @@ async def save_schedule():
logger.info("starting web application server on http://{0}:{1}/".format(get_ip(), args.web_port))

if args.cron_user:
s = Site(storage, args.udp_communication_port, cron=processor.cron, user=args.cron_user, hash_key=hash_key)
s = Site(scheduler, storage, args.udp_communication_port, cron=processor.cron, user=args.cron_user, hash_key=hash_key)
else:
s = Site(storage, args.udp_communication_port, cron=processor.cron, hash_key=hash_key)
s = Site(scheduler, storage, args.udp_communication_port, cron=processor.cron, hash_key=hash_key)
runner = AppRunner(s.app)
loop.run_until_complete(runner.setup())
site_instance = TCPSite(runner, port=args.web_port)
Expand Down
15 changes: 12 additions & 3 deletions dcron/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ class Site(object):

root = pathlib.Path(__file__).parent

def __init__(self, storage, udp_port, cron=None, user=None, hash_key=None):
self.cron = cron
def __init__(self, scheduler, storage, udp_port, cron=None, user=None, hash_key=None):
self.scheduler = scheduler
self.storage = storage
self.udp_port = udp_port
self.cron = cron
self.user = user
self.hash_key = hash_key
self.app = web.Application()
Expand All @@ -72,7 +73,8 @@ def __init__(self, storage, udp_port, cron=None, user=None, hash_key=None):
web.post('/run_job', self.run_job),
web.post('/toggle_job', self.toggle_job),
web.get('/export', self.export_data),
web.post('/import', self.import_data)])
web.post('/import', self.import_data),
web.post('/re-balance', self.re_balance)])

@aiohttp_jinja2.template('index.html')
async def get(self, request):
Expand Down Expand Up @@ -127,6 +129,13 @@ async def get_job_log(self, request):
return dict(job=job)
return dict(job=cron_item)

async def re_balance(self, request):
self.logger.debug("rebalance request received")

self.scheduler.re_balance()

raise web.HTTPAccepted()

async def kill_job(self, request):
data = await request.post()

Expand Down
43 changes: 29 additions & 14 deletions dcron/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import asyncio
import logging
import json
import os

from datetime import datetime
from dateutil import parser
Expand Down Expand Up @@ -59,17 +60,23 @@ def __init__(self, path_prefix=None):
path = join(self.path_prefix, 'cluster_status.json')
if not exists(path):
self.logger.info("no previous cache detected on {0}".format(path))
return
self.logger.debug("loading cache from {0}".format(path))
with open(path, 'r') as handle:
self.cluster_status = json.loads(handle.readline(), cls=CronDecoder)
elif os.stat(path).st_size == 0:
self.logger.error("{0} size is zero, something went wrong while saving it! deleting the emtpy file".format(path))
os.remove(path)
else:
self.logger.debug("loading cache from {0}".format(path))
with open(path, 'r') as handle:
self.cluster_status = json.loads(handle.readline(), cls=CronDecoder)
path = join(self.path_prefix, 'cluster_jobs.json')
if not exists(path):
self.logger.info("no previous cache detected on {0}".format(path))
return
self.logger.debug("loading cache from {0}".format(path))
with open(path, 'r') as handle:
self.cluster_jobs = json.loads(handle.readline(), cls=CronDecoder)
elif os.stat(path).st_size == 0:
self.logger.error("{0} size is zero, something went wrong while saving it! deleting the emtpy file".format(path))
os.remove(path)
else:
self.logger.debug("loading cache from {0}".format(path))
with open(path, 'r') as handle:
self.cluster_jobs = json.loads(handle.readline(), cls=CronDecoder)

async def save(self):
"""
Expand All @@ -78,13 +85,21 @@ async def save(self):
self.logger.debug("auto-save")
if self.path_prefix:
path = join(self.path_prefix, 'cluster_status.json')
self.logger.debug("saving status cache to {0}".format(path))
async with aiofiles.open(path, 'w') as handle:
await handle.write(json.dumps(self.cluster_status, cls=CronEncoder))
cluster_status = self.cluster_status
if cluster_status:
self.logger.debug("saving status cache to {0}".format(path))
async with aiofiles.open(path, 'w') as handle:
await handle.write(json.dumps(cluster_status, cls=CronEncoder))
else:
self.logger.debug("cluster status empty, not saving it.")
path = join(self.path_prefix, 'cluster_jobs.json')
self.logger.debug("saving job cache to {0}".format(path))
async with aiofiles.open(path, 'w') as handle:
await handle.write(json.dumps(self.cluster_jobs, cls=CronEncoder))
cluster_jobs = self.cluster_jobs
if cluster_jobs:
self.logger.debug("saving job cache to {0}".format(path))
async with aiofiles.open(path, 'w') as handle:
await handle.write(json.dumps(cluster_jobs, cls=CronEncoder))
else:
self.logger.debug("cluster jobs empty, not saving it.")
else:
self.logger.warning("no path specified for cache, cannot save")
await asyncio.sleep(0.1)
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.9.11
current_version = 0.9.17
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from setuptools import setup

version = "0.9.16"
version = "0.9.17"

requirements = ['aiohttp',
'aiofiles',
Expand Down

0 comments on commit 0e77a2c

Please sign in to comment.