Skip to content

Commit

Permalink
added verification method for udp
Browse files Browse the repository at this point in the history
  • Loading branch information
witlox committed Jun 27, 2019
1 parent f190e08 commit 1408514
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 18 deletions.
15 changes: 10 additions & 5 deletions dcron/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def main():
parser.add_argument('-w', '--web-port', type=int, default=8080, help='web hosting port (default: 8080)')
parser.add_argument('-n', '--ntp-server', default='pool.ntp.org', help='NTP server to detect clock skew (default: pool.ntp.org)')
parser.add_argument('-s', '--node-staleness', type=int, default=180, help='Time in seconds of non-communication for a node to be marked as stale (defailt: 180s)')
parser.add_argument('-x', '--hash-key', default='abracadabra', help="String to use for verifying UDP traffic (to disable use '')")
parser.add_argument('-v', '--verbose', action='store_true', default=False, help='verbose logging')

args = parser.parse_args()
Expand Down Expand Up @@ -94,6 +95,10 @@ def main():
else:
processor = Processor(args.udp_communication_port, storage, user='root')

hash_key = None
if args.hash_key != '':
hash_key = args.hash_key

with StatusProtocolServer(processor, args.udp_communication_port) as loop:

running = True
Expand All @@ -106,11 +111,11 @@ def timed_broadcast():
"""
while running:
time.sleep(5)
broadcast(args.udp_communication_port, UdpSerializer.dump(Status(get_ip(), get_load())))
broadcast(args.udp_communication_port, UdpSerializer.dump(Status(get_ip(), get_load()), hash_key))
for job in storage.cluster_jobs:
if job.assigned_to == get_ip():
job.pid = check_process(job.command)
for packet in UdpSerializer.dump(job):
for packet in UdpSerializer.dump(job, hash_key):
client(args.udp_communication_port, packet)

def timed_schedule():
Expand All @@ -122,11 +127,11 @@ def timed_schedule():
if not scheduler.check_cluster_state():
logger.info("re-balancing cluster")
jobs = storage.cluster_jobs.copy()
for packet in UdpSerializer.dump(ReBalance(timestamp=datetime.now())):
for packet in UdpSerializer.dump(ReBalance(timestamp=datetime.now()), hash_key):
client(args.udp_communication_port, packet)
time.sleep(5)
for job in jobs:
for packet in UdpSerializer.dump(job):
for packet in UdpSerializer.dump(job, hash_key):
client(args.udp_communication_port, packet)

async def scheduled_broadcast():
Expand All @@ -150,7 +155,7 @@ async def save_schedule():

logger.info("starting web application server on http://{0}:{1}/".format(get_ip(), args.web_port))

s = Site(storage, args.udp_communication_port, cron=processor.cron)
s = Site(storage, args.udp_communication_port, cron=processor.cron, hash_key=hash_key)
runner = AppRunner(s.app)
loop.run_until_complete(runner.setup())
site_instance = TCPSite(runner, port=args.web_port)
Expand Down
7 changes: 4 additions & 3 deletions dcron/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Processor(object):

logger = logging.getLogger(__name__)

def __init__(self, udp_port, storage, cron=None, user=None):
def __init__(self, udp_port, storage, cron=None, user=None, hash_key=None):
self.queue = asyncio.Queue()
self._buffer = []
self.udp_port = udp_port
Expand All @@ -52,6 +52,7 @@ def __init__(self, udp_port, storage, cron=None, user=None):
else:
self.cron = cron
self.user = user
self.hash_key = hash_key

def update_status(self, status_message):
self.logger.debug("got full status message in buffer ({0}".format(status_message))
Expand Down Expand Up @@ -127,7 +128,7 @@ async def run(self, run, uuid):
self.logger.warning("error during execution of {0}: {1}".format(run.job.command, std_err))
self.logger.info("output of {0} with code {1}: {2}".format(job.command, exit_code, std_out))
job.append_log("{0:%b %d %H:%M:%S} localhost CRON[{1}] exit code: {2}, out: {3}, err: {4}".format(datetime.now(), process.pid, exit_code, std_out, std_err))
broadcast(self.udp_port, UdpSerializer.dump(job))
broadcast(self.udp_port, UdpSerializer.dump(job, self.hash_key))
self.clean_buffer(uuid)

def kill(self, kill):
Expand Down Expand Up @@ -165,7 +166,7 @@ async def process(self):
packet_groups = group(self._buffer)
for uuid in packet_groups.keys():
self.logger.debug("identifying packet group for {0}".format(uuid))
obj = UdpSerializer.load(packet_groups[uuid])
obj = UdpSerializer.load(packet_groups[uuid], self.hash_key)
if obj:
self.logger.debug("got object {0} from {1}".format(obj, uuid))
if isinstance(obj, Status):
Expand Down
18 changes: 16 additions & 2 deletions dcron/protocols/udpserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
# SOFTWARE.

import pickle
import hashlib
import hmac

from math import ceil
from uuid import uuid4


from dcron.protocols.packet import Packet


Expand All @@ -37,22 +40,27 @@ class UdpSerializer(object):
"""

@staticmethod
def dump(obj):
def dump(obj, hash_key=None):
"""
serialize your object to list of (raw) udp_packet
:param obj: object to convert
:param hash_key: key for use in signature
:return: udp_packets
"""
buffer = pickle.dumps(obj)
if hash_key:
buffer += b' ' + hmac.new(hash_key.encode('utf-8'), buffer, hashlib.sha1).digest()
total = ceil(len(buffer) / Packet.data_size)
uuid = str(uuid4())
for i in range(total):
yield Packet(uuid, total, i, buffer[i * Packet.data_size:(i + 1) * Packet.data_size]).encode()

@staticmethod
def load(data):
def load(data, hash_key=None):
"""
construct object from udp packets
:param data: list of raw udp_packet
:param hash_key: key for use in signature
:return: the object or None
"""
packets = []
Expand All @@ -73,4 +81,10 @@ def load(data):
buffer = b''
for p in sorted(packets, key=lambda x: x.index):
buffer += p.data
if hash_key:
data, digest = buffer.split(b' ')
if digest == hmac.new(hash_key.encode('utf-8'), data, hashlib.sha1).digest():
return pickle.loads(data)
else:
return None
return pickle.loads(buffer)
15 changes: 8 additions & 7 deletions dcron/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ class Site(object):

root = pathlib.Path(__file__).parent

def __init__(self, storage, udp_port, cron=None):
def __init__(self, storage, udp_port, cron=None, hash_key=None):
self.cron = cron
self.storage = storage
self.udp_port = udp_port
self.hash_key = hash_key
self.app = web.Application()
aiohttp_jinja2.setup(self.app, loader=jinja2.PackageLoader('dcron', 'templates'))
self.app.router.add_static('/static/', path=self.root/'static', name='static')
Expand Down Expand Up @@ -145,7 +146,7 @@ async def kill_job(self, request):

self.logger.debug("broadcasting kill result")

broadcast(self.udp_port, UdpSerializer.dump(Kill(cron_item)))
broadcast(self.udp_port, UdpSerializer.dump(Kill(cron_item), self.hash_key))

raise web.HTTPAccepted()

Expand All @@ -169,7 +170,7 @@ async def run_job(self, request):

self.logger.debug("broadcasting run result")

broadcast(self.udp_port, UdpSerializer.dump(Run(cron_item)))
broadcast(self.udp_port, UdpSerializer.dump(Run(cron_item), self.hash_key))

raise web.HTTPAccepted()

Expand All @@ -193,7 +194,7 @@ async def toggle_job(self, request):

self.logger.debug("broadcasting run result")

broadcast(self.udp_port, UdpSerializer.dump(Toggle(cron_item)))
broadcast(self.udp_port, UdpSerializer.dump(Toggle(cron_item), self.hash_key))

raise web.HTTPAccepted()

Expand All @@ -220,7 +221,7 @@ async def add_job(self, request):

self.logger.debug("broadcasting add result")

broadcast(self.udp_port, UdpSerializer.dump(cron_item))
broadcast(self.udp_port, UdpSerializer.dump(cron_item, self.hash_key))

raise web.HTTPCreated()

Expand All @@ -244,7 +245,7 @@ async def remove_job(self, request):

self.logger.debug("broadcasting remove result")

broadcast(self.udp_port, UdpSerializer.dump(cron_item))
broadcast(self.udp_port, UdpSerializer.dump(cron_item, self.hash_key))

raise web.HTTPAccepted()

Expand Down Expand Up @@ -281,7 +282,7 @@ async def import_data(self, request):
cron_item.set_all(line['pattern'])
cron_item.enable(line['enabled'])
self.logger.debug("received new job from import {0}, broadcasting it.".format(cron_item))
broadcast(self.udp_port, UdpSerializer.dump(cron_item))
broadcast(self.udp_port, UdpSerializer.dump(cron_item, self.hash_key))
else:
self.logger.error("import element invalid: {0}".format(line))
return web.HTTPOk()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from setuptools import setup

version = "0.9.11"
version = "0.9.12"

requirements = ['aiohttp',
'aiofiles',
Expand Down

0 comments on commit 1408514

Please sign in to comment.