Skip to content

Commit

Permalink
- Removing PubSub namespace.
Browse files Browse the repository at this point in the history
- Removing Dispatcher::listen() method.
- Adding DispatchException class to be thrown if Message cannot be dispatched.
- Queue::listen() now calls \pcntl_signal_dispatch() after passing message (if any) to callback.
- Adding Queue::shutdown() message that will exit infinite loop in listen().
- Setting some default values in Beanstalk instead of falling back to null.
- Removing release script and everything associated with. Releases now handled through Github.
  • Loading branch information
brentscheffler committed Oct 18, 2019
1 parent 6b9c050 commit b5677b4
Show file tree
Hide file tree
Showing 22 changed files with 891 additions and 993 deletions.
8 changes: 8 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
root=true

[*]
end_of_line=lf
indent_style=tab
indent_size=4
charset=utf-8
trim_trailing_whitespace=true
7 changes: 1 addition & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
.PHONY: release

test:
vendor/bin/phpunit

coverage:
vendor/bin/phpunit --coverage-clover=build/logs/clover.xml

analyze:
vendor/bin/psalm

release:
php release
vendor/bin/psalm
170 changes: 90 additions & 80 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,25 +1,51 @@
# Syndicate

See the syndicate-framework repo for framework documentation.
## Install

## Using without a Router and Dispatcher
```bash
composer require nimbly/syndicate
```

## Basic usage

### Create Queue instance

```php
$queue = new Syndicate\Queue\Sqs(
getenv("SQS_QUEUE_URL"),
"https://queue.url",
new SqsClient([
'version' => 'latest',
'region' => 'us-west-2'
])
);
```

### Set a serializer and deserializer callback (optional)
### Listen on queue

The serializer callback is applied to each outgoing queue message payload. It defaults to ```\json_encode```.
Listening is a **blocking** call and runs in an infinite loop. Your callback will be triggered when a new Message has arrived.

You can pass in any ```\callable``` instance as the serializer.
```php
$queue->listen(function(Message $message): void {

/**
*
* Process the message...
*
*/

// Delete the message from Queue.
$message->delete();

});
```

### Setting a custom serializer and deserializer

By default, Syndicate assumes your messages are JSON and will attempt to auto serialize/deserialize accordingly.

However, if your messages are in some other format, you may supply your own serializer and deserializer callbacks.

The serializer is applied to all outgoing message payloads.

```php
$queue->setSerializer(function($message){
Expand All @@ -29,20 +55,16 @@ $queue->setSerializer(function($message){
});
```

The deserializer callback is applied to each incoming queue message payload. It defaults to ```\json_decode```.
The deserializer callback is applied to all incoming message payloads.

You can pass in any ```\callable``` instance as the deserializer. This is useful for performing more advanced
message deserialization.

For example, to handle deserializing a message payload from SQS that was forwarded by SNS, you could pass in
the following.
For example, to handle deserializing a message payload from SQS that was forwarded by SNS, you could pass in the following deserializer callback.

```php
$queue->setDeserializer(function($payload){
$queue->setDeserializer(function($payload) {

$payload = \json_decode($payload);

if( property_exists($payload, "Type") &&
if( \property_exists($payload, "Type") &&
$payload->Type === "Notification" ){
return \json_decode($payload->Message);
}
Expand All @@ -52,113 +74,101 @@ $queue->setDeserializer(function($payload){
});
```

### Listen on queue
### Shutting down the Queue

Listening is a blocking call and your callback will be triggered when a new Message has arrived.
You may shutdown the queue by using the ```shutdown()``` method.

The Queue instance will respond to PCNTL signals in a safe manner that will not interrupt in the middle of Message processing.
You can install signal handlers in your code to cleanly and safely shutdown the service.

```php
$queue->listen(function(Message $message){
\pcntl_signal(
SIGINT,
function() use ($queue): void {

echo "Message received!\n";
$message->delete();
echo "Got SIGINT. Shutting queue down.";
$queue->shutdown();

});
}
);
```

## Using a Router and Dispatcher
## Routing and Dispatching

Using a Router and Dispatcher you can have your messages passed off to specific Handlers. How you route is up to you and the message format.
Using the `Dispatcher` and `Router` you can have your messages passed off to specific Handlers. How you route is up to you and the message format.

Commonly, a message will contain a message type or event name - these are prime candidates for keys to routing.

### Router
Create a new ```Router``` by passing in the ```\callable``` route resolver and an ```array``` of key and value pairs as the route definitions.

Create a new `Router` instance by passing in a `\callable` route resolver and an `array` of key and value pairs as route definitions.

### Route resolver

The route resolver is responsible for taking the incoming Message instance and finding a matching route to dispatch the Message to.

The dispatcher will loop through all configured routes and call the resolver with the Message and a route.

The resolver must simple return a `bool` value indicating whether the message matches the given route.


### Route definitions

The route definitions are an array of key/value pairs mapping any key you want to either a `callable`, `string` in the format of `Full\Namespace\ClassName@methodName`, or an array of the above.


```php
$router = new Router(function(Message $message, string $route){
$router = new Router(function(Message $message, string $routeKey): bool {

return $message->getPayload()->eventName == $route;
return $message->getPayload()->eventName == $routeKey;

}, [

'UserRegistered' => ["\App\Handlers\UserHandler", "userRegistered"],
'UserClosedAccount' => ["\App\Handlers\UserHandler", "userAccountClosed"]
'UserLoggedOff' => function(Message $message): void {
// Do some session cleanup stuff...
},

'UserRegistered' => '\App\Handlers\UserHandler@userRegistered',

'UserClosedAccount' => [
'\App\Handlers\UserHandler@userAccountClosed',
'\App\Handlers\NotificationHandler@userAccountClosed'
]

]);
```

### Dispatcher
Create a new ```Dispatcher``` by passing the ```Router``` instance.

Create a new ```Dispatcher``` instance by passing the ```Router``` instance.

```php
$dispatcher = new Dispatcher($router);
```

### Add a default handler
If the ```Router``` cannot resolve a route for the ```Message```, the ```Dispatcher``` will attempt to pass the message off to the default handler.

If the ```Router``` cannot resolve a route for the ```Message```, the ```Dispatcher``` will attempt to pass the message off to its default handler.

The default handler can be set as a ```callable``` and accepts the ```Message``` instance.

```php
$dispatcher->setDefaultHandler(function(Message $message){
$dispatcher->setDefaultHandler(function(Message $message): void {

Log::critical("No route defined for {$message->getPayload()->eventName}!");
$message->release();

});
```

### Starting the listener
```php
$dispatcher->listen($queue);
```

## Putting it all together
```php
// Create Queue instance
$queue = new Syndicate\Queue\Sqs(
getenv("SQS_QUEUE_URL"),
new SqsClient([
'version' => 'latest',
'region' => 'us-west-2'
])
);

// Set a custom deserializer.
$queue->setDeserializer(function($payload){

$payload = \json_decode($payload);

if( property_exists($payload, "Type") &&
$payload->Type === "Notification" ){
return \json_decode($payload->Message);
}

return $payload;

});

// Create Router instance with a resolver and our list of routes.
$router = new Router(function(Message $message, string $route){

return $message->getPayload()->eventName == $route;

}, [
If the Message cannot be dispatched and no default handler was given, a `DispatchException` will be thrown.

'UserRegistered' => ["\App\Handlers\UserHandler", "userRegistered"],
'UserClosedAccount' => ["\App\Handlers\UserHandler", "userAccountClosed"]
### Using the Dispatcher with the Queue

]);

// Create Dispatcher instance.
$dispatcher = new Dispatcher($router);

// Set a default handler.
$dispatcher->setDefaultHandler(function(Message $message){
```php
$queue->listen(function(Message $message) use ($dispatcher): void {

Log::critical("No route defined for {$message->getPayload()->eventName}!");
$message->release();
$dispatcher->dispatch($message);

});

// Listen for new messages.
$dispatcher->listen();
```
1 change: 0 additions & 1 deletion VERSION

This file was deleted.

38 changes: 19 additions & 19 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"name": "nimbly\/syndicate",
"description": "Framework agnostic queue and pubsub library.",
"version": "0.5.1",
"name": "nimbly/syndicate",
"description": "Simple queue consumer framework that supports message dispatching.",
"type": "library",
"license": "MIT",
"authors": [
Expand All @@ -11,29 +10,30 @@
}
],
"suggest": {
"pda\/pheanstalk": "To add support for Beanstalkd queue.",
"predis\/predis": "To add support for Redis based queue and pub\/sub functionality.",
"aws\/aws-sdk-php": "To add support for AWS Simple Queue Service (SQS).",
"google\/cloud-pubsub": "To add support for Google Cloud Pubsub."
"pda/pheanstalk": "To add support for Beanstalkd queue.",
"predis/predis": "To add support for Redis based queue and pub/sub functionality.",
"aws/aws-sdk-php": "To add support for AWS Simple Queue Service (SQS).",
"google/cloud-pubsub": "To add support for Google Cloud Pubsub."
},
"require": {
"php": ">=7.0"
"php": ">=7.2",
"ext-json": "*"
},
"require-dev": {
"vimeo\/psalm": "^3.1",
"phpunit\/phpunit": "^8.0",
"pda\/pheanstalk": "~3.0",
"aws\/aws-sdk-php": "~3.0",
"predis\/predis": "~1.0",
"google\/cloud-pubsub": "^1.7",
"symfony\/var-dumper": "^4.2",
"php-coveralls\/php-coveralls": "^2.1",
"squizlabs\/php_codesniffer": "^3.4",
"phploc\/phploc": "^5.0"
"vimeo/psalm": "^3.1",
"phpunit/phpunit": "^8.0",
"pda/pheanstalk": "~3.0",
"aws/aws-sdk-php": "~3.0",
"predis/predis": "~1.0",
"google/cloud-pubsub": "^1.7",
"symfony/var-dumper": "^4.2",
"php-coveralls/php-coveralls": "^2.1",
"squizlabs/php_codesniffer": "^3.4",
"phploc/phploc": "^5.0"
},
"autoload": {
"psr-4": {
"Syndicate\\": "src\/"
"Syndicate\\": "src/"
}
}
}
Loading

0 comments on commit b5677b4

Please sign in to comment.