
Commit: PEP8 and Documentation

coreyjrobins committed Apr 16, 2018
1 parent d3d712e commit ba5b739
Showing 30 changed files with 438 additions and 264 deletions.
1 change: 1 addition & 0 deletions run_all_tests.py
@@ -46,5 +46,6 @@ def run_tests():
successful = False
finally:
from flask import current_app

current_app.running_context.executor.shutdown_pool()
sys.exit(not successful)
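
This change moves the executor shutdown into the finally block so the worker pool is torn down even when a suite fails, and sys.exit(not successful) maps the boolean onto a process exit code. A minimal sketch of the pattern (suite body elided; not runnable outside a Walkoff Flask app context):

    import sys

    def run_tests():
        successful = True
        try:
            pass  # run each test suite here; set successful = False on any failure
        finally:
            # Deferred import: the Flask app exists only once the runner has started
            from flask import current_app
            current_app.running_context.executor.shutdown_pool()
        sys.exit(not successful)  # not True -> 0 (success), not False -> 1 (failure)
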
2 changes: 1 addition & 1 deletion tests/util/mock_objects.py
@@ -130,7 +130,7 @@ def __init__(self, current_app):

def send(self, packet):
with self.current_app.app_context():
self.send_callback(packet)
self._send_callback(packet)

def _increment_execution_count(self):
global workflows_executed
5 changes: 2 additions & 3 deletions walkoff.py
@@ -9,19 +9,18 @@

import walkoff
import walkoff.config

from scripts.compose_api import compose_api
from walkoff.multiprocessedexecutor.multiprocessedexecutor import spawn_worker_processes
from walkoff.server.app import create_app

logger = logging.getLogger('walkoff')


def run(app, host, port):
from walkoff.multiprocessedexecutor.multiprocessedexecutor import spawn_worker_processes
print_banner()
pids = spawn_worker_processes()
monkey.patch_all()

from scripts.compose_api import compose_api
compose_api()

app.running_context.executor.initialize_threading(app, pids)
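
This hunk hoists compose_api and spawn_worker_processes from run()'s body to module level, per PEP8, while preserving the call order: workers are spawned before monkey.patch_all(), presumably so the child processes do not inherit gevent-patched stdlib modules. A hypothetical entrypoint sketch (create_app's real signature, host, and port are assumptions):

    # Hypothetical usage; argument values are illustrative only.
    if __name__ == '__main__':
        app = create_app()
        run(app, host='127.0.0.1', port=8080)
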
21 changes: 12 additions & 9 deletions walkoff/cache.py
@@ -1,20 +1,21 @@
import logging
import os
import os.path
import pickle
import sqlite3
import threading
from copy import deepcopy
from datetime import timedelta
from functools import partial
from weakref import WeakSet

import os.path
from diskcache import FanoutCache, DEFAULT_SETTINGS, Cache
from diskcache.core import DBNAME
from gevent import sleep
from gevent.event import AsyncResult, Event
import threading
import walkoff.config
import pickle
from copy import deepcopy
from six import string_types, binary_type
import json

import walkoff.config

logger = logging.getLogger(__name__)

@@ -23,7 +24,6 @@
except ImportError:
from cStringIO import StringIO as BytesIO


unsubscribe_message = b'__UNSUBSCRIBE__'
"""(str): The message used to unsubscribe from and close a PubSub channel
"""
@@ -41,6 +41,7 @@ class DiskSubscription(object):
Args:
channel (str): The channel name associated with this subscription
"""

def __init__(self, channel):
self.channel = channel
self._listener = None
@@ -169,7 +170,8 @@ def __push_to_subscribers(self, channel, value):

@staticmethod
def __get_value(value):
if value == unsubscribe_message or isinstance(value, string_types) or isinstance(value, int) or isinstance(value, float):
if value == unsubscribe_message or isinstance(value, string_types) or isinstance(value, int) or isinstance(
value, float):
return value
if isinstance(value, binary_type):
return value.decode('utf-8')
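
The rewrapped condition is behavior-neutral; __get_value normalizes whatever the pub/sub channel yields. A standalone sketch of the dispatch, with the final branch hedged because this hunk does not show it (the json import at the top of the file suggests a JSON fallback):

    import json
    from six import binary_type, string_types

    UNSUBSCRIBE = b'__UNSUBSCRIBE__'

    def get_value(value):
        # The unsubscribe sentinel and primitive types pass through unchanged
        if value == UNSUBSCRIBE or isinstance(value, string_types) or isinstance(value, (int, float)):
            return value
        # Raw bytes off the disk cache are decoded to text
        if isinstance(value, binary_type):
            return value.decode('utf-8')
        # Assumed fallback, not shown in this hunk: deserialize JSON payloads
        return json.loads(value)
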
@@ -231,6 +233,7 @@ class DiskCacheAdapter(object):
retry (bool, optional): Should this database retry timed out transactions? Defaults to True
**settings: Other settings which will be passed to the `cache` attribute on initialization
"""

def __init__(self, directory, shards=8, timeout=0.01, retry=True, **settings):
self.directory = directory
self.retry = retry
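
With the corrected docstring, constructing the adapter is straightforward. A sketch; the directory path and the extra-settings idea are illustrative:

    # Illustrative construction; the path is an assumption.
    adapter = DiskCacheAdapter(
        directory='./data/cache',  # where the underlying FanoutCache lives
        shards=8,                  # shard count, the default shown above
        timeout=0.01,              # transaction timeout in seconds
        retry=True,                # retry timed-out transactions
    )
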
@@ -430,7 +433,7 @@ def _convert_expire_to_seconds(time):
Returns:
(float): The expiration time in seconds
"""
return time.total_seconds() if isinstance(time, timedelta) else float(time)/1000.
return time.total_seconds() if isinstance(time, timedelta) else float(time) / 1000.

def shutdown(self):
"""Shuts down the connection to the cache
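
The reformatted one-liner in _convert_expire_to_seconds accepts either a timedelta or a number of milliseconds and always returns seconds. Shown here as bare calls for brevity (it is a static method in the source):

    from datetime import timedelta

    _convert_expire_to_seconds(timedelta(seconds=1.5))  # -> 1.5
    _convert_expire_to_seconds(1500)                    # -> float(1500) / 1000. == 1.5
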
14 changes: 8 additions & 6 deletions walkoff/config.py
@@ -2,10 +2,12 @@
import logging
import logging.config
import sys
from os.path import isfile, join, abspath
import warnings

import yaml
from os.path import isfile, join, abspath

from walkoff.appgateway import cache_apps

logger = logging.getLogger(__name__)

@@ -136,6 +138,9 @@ class Config(object):
@classmethod
def load_config(cls, config_path=None):
""" Loads Walkoff configuration from JSON file
Args:
config_path (str): Optional path to the config. Defaults to the CONFIG_PATH class variable.
"""
if config_path:
cls.CONFIG_PATH = config_path
@@ -158,8 +163,7 @@ def load_config(cls, config_path=None):

@classmethod
def write_values_to_file(cls, keys=None):
""" Writes the current walkoff configuration to a file
"""
""" Writes the current walkoff configuration to a file"""
if keys is None:
keys = [key for key in dir(cls) if not key.startswith('__')]

@@ -173,10 +177,8 @@ def write_values_to_file(cls, keys=None):


def initialize(config_path=None):
"""Loads the config file, loads the app cache, and loads the app APIs into memory
"""
"""Loads the config file, loads the app cache, and loads the app APIs into memory"""
Config.load_config(config_path)
setup_logger()
from walkoff.appgateway import cache_apps
cache_apps(Config.APPS_PATH)
load_app_apis()
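
The new docstring makes load_config's contract explicit: pass a path to override, or omit it to fall back to the CONFIG_PATH class variable. A hedged sketch of the initialize() entry point (the path is illustrative):

    import walkoff.config

    # Omit config_path to use Config.CONFIG_PATH instead.
    walkoff.config.initialize(config_path='./data/walkoff.config')
    print(walkoff.config.Config.APPS_PATH)  # populated by load_config
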
3 changes: 2 additions & 1 deletion walkoff/messaging/utils.py
@@ -1,12 +1,13 @@
import json
import logging

from flask import current_app

import walkoff.messaging
from walkoff.events import WalkoffEvent
from walkoff.extensions import db
from walkoff.serverdb import Role, User
from walkoff.serverdb.message import Message
from flask import current_app

logger = logging.getLogger(__name__)

73 changes: 53 additions & 20 deletions walkoff/multiprocessedexecutor/multiprocessedexecutor.py
@@ -9,16 +9,16 @@
import gevent
import zmq.green as zmq

from walkoff.executiondb import ExecutionDatabase
import walkoff.config
from walkoff.events import WalkoffEvent
from walkoff.executiondb import ExecutionDatabase
from walkoff.executiondb import WorkflowStatusEnum
from walkoff.executiondb.saved_workflow import SavedWorkflow
from walkoff.executiondb.workflow import Workflow
from walkoff.executiondb.workflowresults import WorkflowStatus
from walkoff.multiprocessedexecutor.workflowexecutioncontroller import WorkflowExecutionController, Receiver
from walkoff.multiprocessedexecutor.threadauthenticator import ThreadAuthenticator
from walkoff.multiprocessedexecutor.worker import Worker
import walkoff.config
from walkoff.multiprocessedexecutor.workflowexecutioncontroller import WorkflowExecutionController, Receiver

logger = logging.getLogger(__name__)

@@ -58,7 +58,8 @@ def initialize_threading(self, app, pids=None):
"""Initialize the multiprocessing communication threads, allowing for parallel execution of workflows.
Args:
pids (list, optional): Optional list of spawned processes. Defaults to None
app (FlaskApp): The current_app object
pids (list[Process], optional): Optional list of spawned processes. Defaults to None
"""
if not (os.path.exists(walkoff.config.Config.ZMQ_PUBLIC_KEYS_PATH) and
@@ -82,6 +83,11 @@
logger.debug('Controller threading initialized')

def wait_and_reset(self, num_workflows):
"""Waits for all of the workflows to be completed
Args:
num_workflows (int): The number of workflows to wait for
"""
timeout = 0
shutdown = 10

@@ -94,8 +100,7 @@ def wait_and_reset(self, num_workflows):
self.receiver.workflows_executed = 0

def shutdown_pool(self):
"""Shuts down the threadpool.
"""
"""Shuts down the threadpool"""
self.manager.send_exit_to_worker_comms()
if len(self.pids) > 0:
for p in self.pids:
@@ -120,8 +125,7 @@ def shutdown_pool(self):
return

def cleanup_threading(self):
"""Once the threadpool has been shutdown, clear out all of the data structures used in the pool.
"""
"""Once the threadpool has been shutdown, clear out all of the data structures used in the pool"""
self.pids = []
self.receiver_thread = None
self.workflows_executed = 0
@@ -130,18 +134,18 @@ def cleanup_threading(self):
self.receiver = None

def execute_workflow(self, workflow_id, execution_id_in=None, start=None, start_arguments=None, resume=False):
"""Executes a workflow.
"""Executes a workflow
Args:
workflow_id (Workflow): The Workflow to be executed.
execution_id_in (str, optional): The optional execution ID to provide for the workflow. Should only be
execution_id_in (UUID, optional): The optional execution ID to provide for the workflow. Should only be
used (and is required) when resuming a workflow. Must be valid UUID4. Defaults to None.
start (str, optional): The ID of the first, or starting action. Defaults to None.
start (UUID, optional): The ID of the first, or starting action. Defaults to None.
start_arguments (list[Argument]): The arguments to the starting action of the workflow. Defaults to None.
resume (bool, optional): Optional boolean to resume a previously paused workflow. Defaults to False.
Returns:
The execution ID of the Workflow.
(UUID): The execution ID of the Workflow.
"""
workflow = self.execution_db.session.query(Workflow).filter_by(id=workflow_id).first()
if not workflow:
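
A hedged usage sketch matching the updated execute_workflow docstring (executor and wf_id are assumed names):

    # wf_id is the primary key of an existing Workflow row.
    execution_id = executor.execute_workflow(
        wf_id,
        start=None,            # begin at the workflow's default start action
        start_arguments=None,  # no argument overrides for the starting action
    )
    # execution_id is a UUID usable with the status and control methods below.
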
@@ -166,7 +170,10 @@ def pause_workflow(self, execution_id):
"""Pauses a workflow that is currently executing.
Args:
execution_id (str): The execution id of the workflow.
execution_id (UUID): The execution id of the workflow.
Returns:
(bool): True if Workflow successfully paused, False otherwise
"""
workflow_status = self.execution_db.session.query(WorkflowStatus).filter_by(
execution_id=execution_id).first()
@@ -181,7 +188,10 @@ def resume_workflow(self, execution_id):
"""Resumes a workflow that is currently paused.
Args:
execution_id (str): The execution id of the workflow.
execution_id (UUID): The execution id of the workflow.
Returns:
(bool): True if workflow successfully resumed, False otherwise
"""
workflow_status = self.execution_db.session.query(WorkflowStatus).filter_by(
execution_id=execution_id).first()
@@ -202,10 +212,13 @@ def resume_workflow(self, execution_id):
return False

def abort_workflow(self, execution_id):
"""Abort a workflow.
"""Abort a workflow
Args:
execution_id (str): The execution id of the workflow.
execution_id (UUID): The execution id of the workflow.
Returns:
(bool): True if successfully aborted workflow, False otherwise
"""
workflow_status = self.execution_db.session.query(WorkflowStatus).filter_by(
execution_id=execution_id).first()
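
Since pause_workflow, resume_workflow, and abort_workflow now all document boolean returns, callers can chain them into simple control flow. A sketch, assuming execution_id came from execute_workflow above:

    if executor.pause_workflow(execution_id):
        # ... later, try to pick the workflow back up
        if not executor.resume_workflow(execution_id):
            # Resume failed (e.g. the status was no longer paused); abort instead
            executor.abort_workflow(execution_id)
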
@@ -231,10 +244,13 @@ def resume_trigger_step(self, execution_id, data_in, arguments=None):
"""Resumes a workflow awaiting trigger data, if the conditions are met.
Args:
execution_id (str): The execution ID of the workflow
execution_id (UUID): The execution ID of the workflow
data_in (dict): The data to send to the trigger
arguments (list[Argument], optional): Optional list of new Arguments for the trigger action.
Defaults to None.
Returns:
(bool): True if successfully resumed trigger step, False otherwise
"""
saved_state = self.execution_db.session.query(SavedWorkflow).filter_by(
workflow_execution_id=execution_id).first()
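
A hedged sketch of resume_trigger_step; only the parameter names come from the docstring, and the payload shape is invented:

    resumed = executor.resume_trigger_step(
        execution_id,
        data_in={'status': 'ok'},  # whatever shape the trigger condition expects
        arguments=None,            # keep the trigger action's original Arguments
    )
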
@@ -273,7 +289,7 @@ def get_waiting_workflows(self):
"""Gets a list of the execution IDs of workflows currently awaiting data to be sent to a trigger.
Returns:
A list of execution IDs of workflows currently awaiting data to be sent to a trigger.
(list[UUID]): A list of execution IDs of workflows currently awaiting data to be sent to a trigger.
"""
self.execution_db.session.expire_all()
wf_statuses = self.execution_db.session.query(WorkflowStatus).filter_by(
Expand All @@ -284,10 +300,10 @@ def get_workflow_status(self, execution_id):
"""Gets the current status of a workflow by its execution ID
Args:
execution_id (str): The execution ID of the workflow
execution_id (UUID): The execution ID of the workflow
Returns:
The status of the workflow
(int): The status of the workflow
"""
workflow_status = self.execution_db.session.query(WorkflowStatus).filter_by(
execution_id=execution_id).first()
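
get_waiting_workflows pairs naturally with get_workflow_status for polling. A sketch; WorkflowStatusEnum is imported at the top of this file, but the member name here is an assumption:

    for execution_id in executor.get_waiting_workflows():
        status = executor.get_workflow_status(execution_id)
        # Member name assumed; the docstring says the raw status value is returned.
        if status == WorkflowStatusEnum.awaiting_data:
            print('{} is still waiting on trigger data'.format(execution_id))
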
@@ -304,10 +320,27 @@ def _log_and_send_event(self, event, sender=None, data=None):
event.send(sender, data=data)

def create_case(self, case_id, subscriptions):
"""Creates a Case
Args:
case_id (int): The ID of the Case
subscriptions (list[Subscription]): List of Subscriptions to subscribe to
"""
self.manager.create_case(case_id, subscriptions)

def update_case(self, case_id, subscriptions):
"""Updates a Case
Args:
case_id (int): The ID of the Case
subscriptions (list[Subscription]): List of Subscriptions to subscribe to
"""
self.manager.create_case(case_id, subscriptions)

def delete_case(self, case_id):
"""Deletes a Case
Args:
case_id (int): The ID of the Case to delete
"""
self.manager.delete_case(case_id)
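
All three Case methods are thin pass-throughs to the manager, so a full lifecycle is three calls. A sketch (Subscription construction is elided because its signature is not shown in this diff; the case ID is illustrative):

    # subscriptions: a list of Subscription objects, built elsewhere.
    executor.create_case(1, subscriptions)
    executor.update_case(1, subscriptions)  # note: delegates to manager.create_case above
    executor.delete_case(1)
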
