Skip to content

Commit

Permalink
Merge pull request #52 from jacobtomlinson/async
Browse files Browse the repository at this point in the history
Switch to an async event loop
  • Loading branch information
jacobtomlinson committed Nov 14, 2016
2 parents ff006da + e3c173f commit 16ad6d3
Show file tree
Hide file tree
Showing 23 changed files with 382 additions and 220 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ sudo: false
matrix:
fast_finish: true
include:
- python: "3.4"
env: TOXENV=py34
- python: "3.5"
env: TOXENV=lint
- python: "3.5"
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3-alpine
FROM python:3.5-alpine
MAINTAINER Jacob Tomlinson <jacob@tom.linson.uk>

RUN mkdir -p /usr/src/app
Expand All @@ -8,6 +8,7 @@ WORKDIR /usr/src/app
COPY . .

RUN apk update && apk add git
RUN pip3 install --upgrade pip
RUN pip3 install --no-cache-dir -r requirements.txt
RUN pip3 install -U tox

Expand Down
26 changes: 14 additions & 12 deletions docs/extending/connectors.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# Creating a connector

Connectors are a class which extends the base opsdroid Connector. The class has two mandatory methods, `connect` and `respond`.
Connectors are a class which extends the base opsdroid Connector. The class has three mandatory methods, `connect`, `listen` and `respond`.

#### connect
*connect* is a method which connects to a specific chat service and retrieves messages from it. Each message is formatted into an opsdroid Message object and then parsed. This method should block the thread with an infinite loop.
*connect* is a method which connects to a specific chat service

#### listen
*listen* uses the open connection to the chat service and retrieves messages from it. Each message is formatted into an opsdroid Message object and then parsed. This method should block the thread with an infinite loop but use `await` commands when getting new messages and parsing with opsdroid. This allows the [event loop](https://docs.python.org/3/library/asyncio-eventloop.html) to hand control of the thread to a different function while we are waiting.

#### respond
*respond* will take a Message object and return the contents to the chat service.
Expand All @@ -23,13 +26,15 @@ from opsdroid.message import Message

class MyConnector(Connector):

def connect(self, opsdroid):
async def connect(self, opsdroid):
# Create connection object with chat library
self.connection = chatlibrary.connect()
self.connection = await chatlibrary.connect()

async def listen(self, opsdroid):
# Listen for new messages from the chat service
while True:
# Get raw message from chat
raw_message = self.connection.get_next_message()
raw_message = await self.connection.get_next_message()

# Convert to opsdroid Message object
#
Expand All @@ -39,14 +44,11 @@ class MyConnector(Connector):
raw_message.room, self)

# Parse the message with opsdroid
opsdroid.parse(message)

# Sleep before processing next message
time.sleep(1)
await opsdroid.parse(message)

def respond(self, message):
async def respond(self, message):
# Send message.text back to the chat service
self.connection.send(raw_message.text, raw_message.user,
raw_message.room)
await self.connection.send(raw_message.text, raw_message.user,
raw_message.room)

```
12 changes: 6 additions & 6 deletions docs/extending/databases.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ from opsdroid.database import Database

class MyDatabase(Database):

def connect(self, opsdroid):
async def connect(self, opsdroid):
# Create connection object for database
self.connection = databaselibrary.connect()
self.connection = await databaselibrary.connect()

def put(self, key, value):
dasync ef put(self, key, value):
# Insert the object into the database
response = self.connection.insert(key, value)
response = await self.connection.insert(key, value)

# Return a bool for whether the insert was successful
return response.success

def get(self, key):
async def get(self, key):
# Get the object from the database and return it
return self.connection.find(key)
return await self.connection.find(key)

```
19 changes: 10 additions & 9 deletions docs/extending/skills.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Within this file should be functions which are decorated with an opsdroid skill
from opsdroid.skills import match_regex

@match_regex('hi')
def hello(opsdroid, message):
message.respond('Hey')
async def hello(opsdroid, message):
await message.respond('Hey')
```

In this example we are importing the `match_regex` decorator from the opsdroid skills library. We are then using it to decorate a simple hello world function.
Expand All @@ -22,6 +22,8 @@ The decorator takes a regular expression to match against the message received f

If the message matches the regular expression then the decorated function is called. As arguments opsdroid will pass a pointer to itself along with a Message object containing information about the message from the user.

To ensure the bot is responsive the concurrency controls introduced in Python 3.5 are used. This means that all functions which will be executed should be defined as an `async` function, and calls to functions which may require IO (like a connector or database) should be awaited with the `await` keyword. For more information see [asyncio](https://docs.python.org/3/library/asyncio.html) and [event loops](https://docs.python.org/3/library/asyncio-eventloop.html).

## Message object

The message object passed to the skill function is an instance of the opsdroid Message class which has the following properties and methods.
Expand Down Expand Up @@ -70,16 +72,15 @@ Stores the object provided for a specific key.
from opsdroid.skills import match_regex

@match_regex(r'remember (.*)')
def remember(opsdroid, message):
async def remember(opsdroid, message):
remember = message.regex.group(1)
opsdroid.memory.put("remember", remember)
message.respond("OK I'll remember that")
await opsdroid.memory.put("remember", remember)
await message.respond("OK I'll remember that")

@match_regex(r'remind me')
def remember(opsdroid, message):
message.respond(
opsdroid.memory.get("remember")
)
async def remember(opsdroid, message):
information = await opsdroid.memory.get("remember")
await message.respond(information)
```

In the above example we have defined two skill functions. The first takes whatever the user says after the work "remember" and stores it in the database.
Expand Down
20 changes: 12 additions & 8 deletions opsdroid/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import argparse

from opsdroid.loader import Loader
from opsdroid.core import OpsDroid
from opsdroid.helper import set_logging_level
from opsdroid.const import LOG_FILENAME
Expand All @@ -19,6 +18,13 @@ def parse_args(args):
return parser.parse_args(args)


def check_dependencies():
"""Check for system dependencies required by opsdroid."""
if sys.version_info[0] < 3 or sys.version_info[1] < 5:
logging.critical("Whoops! opsdroid requires python 3.5 or above.")
sys.exit(1)


def main():
"""The main function."""
logging.basicConfig(filename=LOG_FILENAME, level=logging.INFO)
Expand All @@ -35,17 +41,15 @@ def main():
print(conf.read())
sys.exit(0)

check_dependencies()

with OpsDroid() as opsdroid:
loader = Loader(opsdroid)
opsdroid.config = loader.load_config_file([
"./configuration.yaml",
"~/.opsdroid/configuration.yaml",
"/etc/opsdroid/configuration.yaml"
])
opsdroid.load()
if "logging" in opsdroid.config:
set_logging_level(opsdroid.config['logging'])
loader.load_config(opsdroid.config)
opsdroid.start_loop()
opsdroid.exit()


if __name__ == "__main__":
main()
27 changes: 20 additions & 7 deletions opsdroid/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,37 @@ def __init__(self, config):
self.name = ""
self.config = config

def connect(self, opsdroid):
"""Connect to chat service and parse all messages.
async def connect(self, opsdroid):
"""Connect to chat service.
This method should create a connection to the desired chat service.
It should also be possible to call it multiple times in the event of
being disconnected.
Args:
opsdroid (OpsDroid): An instance of the opsdroid core.
"""
raise NotImplementedError

async def listen(self, opsdroid):
"""Listen to chat service and parse all messages.
This method should block the thread with an infinite loop and create
Message objects for chat messages coming from the service. It should
then call `opsdroid.parse(message)` on those messages.
then call `await opsdroid.parse(message)` on those messages.
Due to this method blocking, if multiple connectors are configured in
opsdroid they will be run in parallel using the multiprocessing
library.
As the method should include some kind of `while True` all messages
from the chat service should be "awaited" asyncronously to avoid
blocking the thread.
Args:
opsdroid (OpsDroid): An instance of the opsdroid core.
"""
raise NotImplementedError

def respond(self, message):
async def respond(self, message):
"""Send a message back to the chat service.
The message object will have a `text` property which should be sent
Expand Down
66 changes: 52 additions & 14 deletions opsdroid/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,34 @@
import logging
import sys
import weakref
from multiprocessing import Process
import asyncio

from opsdroid.helper import match
from opsdroid.memory import Memory
from opsdroid.connector import Connector
from opsdroid.database import Database
from opsdroid.loader import Loader


class OpsDroid():
"""Root object for opsdroid."""

# pylint: disable=too-many-instance-attributes
# All are reasonable in this case.

instances = []

def __init__(self):
"""Start opsdroid."""
self.bot_name = 'opsdroid'
self.sys_status = 0
self.connectors = []
self.connector_jobs = []
self.connector_tasks = []
self.eventloop = asyncio.get_event_loop()
self.skills = []
self.memory = Memory()
self.loader = Loader(self)
self.config = {}
logging.info("Created main opsdroid object")

def __enter__(self):
Expand All @@ -41,6 +49,8 @@ def exit(self):
"""Exit application."""
logging.info("Exiting application with return code " +
str(self.sys_status))
if self.eventloop.is_running():
self.eventloop.stop()
sys.exit(self.sys_status)

def critical(self, error, code):
Expand All @@ -50,7 +60,37 @@ def critical(self, error, code):
print("Error: " + error)
self.exit()

def start_connectors(self, connectors):
def load(self):
"""Load configuration."""
self.config = self.loader.load_config_file([
"./configuration.yaml",
"~/.opsdroid/configuration.yaml",
"/etc/opsdroid/configuration.yaml"
])

def start_loop(self):
"""Start the event loop."""
connectors, databases, skills = self.loader.load_config(self.config)
if databases is not None:
self.start_databases(databases)
self.setup_skills(skills)
self.start_connector_tasks(connectors)
try:
self.eventloop.run_forever()
except (KeyboardInterrupt, EOFError):
print('') # Prints a character return for return to shell
logging.info("Keyboard interrupt, exiting.")
self.exit()

def setup_skills(self, skills):
"""Call the setup function on the passed in skills."""
for skill in skills:
try:
skill["module"].setup(self)
except AttributeError:
pass

def start_connector_tasks(self, connectors):
"""Start the connectors."""
for connector_module in connectors:
for _, cls in connector_module["module"].__dict__.items():
Expand All @@ -61,21 +101,19 @@ def start_connectors(self, connectors):
connector = cls(connector_module["config"])
self.connectors.append(connector)

if len(self.connectors) == 1:
self.connectors[0].connect(self)
elif len(connectors) > 1:
if len(connectors) > 0:
for connector in self.connectors:
self.eventloop.run_until_complete(connector.connect(self))
for connector in self.connectors:
job = Process(target=connector.connect, args=(self,))
job.start()
self.connector_jobs.append(job)
for job in self.connector_jobs:
job.join()
task = self.eventloop.create_task(connector.listen(self))
self.connector_tasks.append(task)
else:
self.critical("All connectors failed to load", 1)

def start_databases(self, databases):
"""Start the databases."""
if len(databases) == 0:
logging.debug(databases)
logging.warning("All databases failed to load")
for database_module in databases:
for name, cls in database_module["module"].__dict__.items():
Expand All @@ -85,13 +123,13 @@ def start_databases(self, databases):
logging.debug("Adding database: " + name)
database = cls(database_module["config"])
self.memory.databases.append(database)
database.connect(self)
self.eventloop.run_until_complete(database.connect(self))

def load_regex_skill(self, regex, skill):
"""Load skills."""
self.skills.append({"regex": regex, "skill": skill})

def parse(self, message):
async def parse(self, message):
"""Parse a string against all skills."""
if message.text.strip() != "":
logging.debug("Parsing input: " + message.text)
Expand All @@ -100,4 +138,4 @@ def parse(self, message):
regex = match(skill["regex"], message.text)
if regex:
message.regex = regex
skill["skill"](self, message)
await skill["skill"](self, message)
6 changes: 3 additions & 3 deletions opsdroid/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, config):
self.client = None
self.database = None

def connect(self, opsdroid):
async def connect(self, opsdroid):
"""Connect to chat service and store the connection object.
This method should connect to the given database using a native
Expand All @@ -40,7 +40,7 @@ def connect(self, opsdroid):
"""
raise NotImplementedError

def put(self, key, data):
async def put(self, key, data):
"""Store the data object in a database against the key.
The data object will need to be serialised in a sensible way which
Expand All @@ -57,7 +57,7 @@ def put(self, key, data):
"""
raise NotImplementedError

def get(self, key):
async def get(self, key):
"""Return a data object for a given key.
Args:
Expand Down

0 comments on commit 16ad6d3

Please sign in to comment.