Skip to content

Commit

Permalink
feat: add blacklist/whitelist, exclusion and publisher indetinfication
Browse files Browse the repository at this point in the history
  • Loading branch information
cydrickn committed Jul 20, 2024
1 parent 9f1a42c commit d8306da
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 22 deletions.
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ That will now run the server
- [ ] Add Feature
- [ ] Add checking of role

## RPC Features
### RPC Features

| Feature | Status |
|-------------------------------------------------------------------|---------|
Expand All @@ -150,20 +150,32 @@ That will now run the server
| [(Interface) Procedure Reflection](#interface-reflection) | ✗ |


## PubSub Features
### PubSub Features

| Feature | Status |
|-----------------------------------------------------------|---------|
| [Subscriber Blackwhite Listing](#pubsub-bw-listing) | ✗ |
| [Publisher Exclusion](#pubsub-pub-exclusion) | ✗ |
| [Publisher Identification](#pubsub-pub-identification) | ✗ |
| [Subscriber Blackwhite Listing](#pubsub-bw-listing) | ✓ |
| [Publisher Exclusion](#pubsub-pub-exclusion) | ✓ |
| [Publisher Identification](#pubsub-pub-identification) | ✓ |
| [Publication Trustlevels](#pubsub-pub-trustlevels) | ✗ |
| [Subscription Meta API](#pubsub-sub-metapi) | ✗ |
| [Pattern-based Subscription](#pattern-based-subscription) | ✗ |
| [Sharded Subscription](#pubsub-sharded-subscription) | ✗ |
| [Event History](#pubsub-event-history) | ✗ |
| [(Interface) Topic Reflection](#interface-reflection) | ✗ |

### Others

| Feature | Status |
|-----------------------------|---------|
| Feature Announcement | ✓ |
| Broker Session Meta API | ✓ |
| Dealer Session Meta API | ✗ |
| RawSocket Transport | ✗ |
| Batched WebSocket transport | ✗ |
| Call Rerouting | ✗ |
| Payload Passthru Mode | ✗ |

## TODOs

- [ ] Implement CBOR Serializer https://wamp-proto.org/wamp_bp_latest_ietf.html#name-serializers
Expand Down
33 changes: 21 additions & 12 deletions src/Auth/AuthManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Octamp\Wamp\Auth\Ticket\TicketStaticAuthenticator;
use Octamp\Wamp\Auth\WampCra\WampCraDynamicAuthenticator;
use Octamp\Wamp\Auth\WampCra\WampCraStaticAuthenticator;
use Octamp\Wamp\Event\JoinRealmEvent;
use Octamp\Wamp\Peers\Router;
use Octamp\Wamp\Realm\RealmManager;
use Octamp\Wamp\Session\Session;
use OpenSwoole\Coroutine;
Expand Down Expand Up @@ -51,6 +53,8 @@ class AuthManager implements WithRealmManagerInterface

protected ?RealmManager $realmManager = null;

protected ?Router $router = null;

public function __construct(array $auths, protected EventDispatcherInterface $eventDispatcher)
{
foreach ($auths as $auth) {
Expand Down Expand Up @@ -155,13 +159,7 @@ public function processHelloMessage(Session $session, HelloMessage $message): vo
$session->sendMessage(new ChallengeMessage($authMethod, $challengeDetails));
} elseif ($status === self::STATUS_NO_CHALLENGE) {
$session->setAuthenticated(true);
$details = $session->getAuthenticationDetails()->jsonSerialize();
// todo update roles for details
$details = array_merge($details, (array) $message->getDetails());
$session->sendMessage(new WelcomeMessage(
$session->getSessionId(),
$details
));
$this->sendWelecome($session);
}
});
}
Expand Down Expand Up @@ -210,14 +208,20 @@ public function processAuthenticateMessage(Session $session, AuthenticateMessage
}

$session->setAuthenticated(true);
$details = $session->getAuthenticationDetails()->jsonSerialize();
// todo update roles for details
$details = array_merge($details, (array) $session->getHelloMessage()->getDetails());
$this->sendWelecome($session);
}

$session->sendMessage(new WelcomeMessage(
protected function sendWelecome(Session $session)
{
$details = $session->getAuthenticationDetails()->jsonSerialize();
$message = new WelcomeMessage(
$session->getSessionId(),
$details
));
);
$this->router?->addFeature($message);
$session->sendMessage($message);

$session->getRealm()->handle($session, new JoinRealmEvent($session));
}

/**
Expand Down Expand Up @@ -248,4 +252,9 @@ public function setRealmManager(RealmManager $realmManager): void
}
}
}

public function setRouter(Router $router): void
{
$this->router = $router;
}
}
12 changes: 12 additions & 0 deletions src/Event/JoinRealmEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Octamp\Wamp\Event;

use Octamp\Wamp\Session\Session;

class JoinRealmEvent implements EventInterface
{
public function __construct(public readonly Session $session)
{
}
}
18 changes: 18 additions & 0 deletions src/Peers/Router.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Thruway\Logging\Logger;
use Thruway\Message\GoodbyeMessage;
use Thruway\Message\Message;
use Thruway\Message\WelcomeMessage;

class Router
{
Expand Down Expand Up @@ -73,4 +74,21 @@ public function onGoodbyeMessage(Session $session, GoodbyeMessage $message): voi
$session->setGoodByeSent(true);
$session->shutdown();
}

public function getFeatures(): object
{
$roles = new \stdClass();
foreach ($this->roles as $role) {
$roles->{$role->getName()} = $role->getFeatures();
}

return $roles;
}

public function addFeature(WelcomeMessage $message): void
{
foreach ($this->roles as $role) {
$message->addFeatures($role->getName(), $role->getFeatures());
}
}
}
5 changes: 4 additions & 1 deletion src/Realm/Realm.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Octamp\Server\Connection\Connection;
use Octamp\Wamp\Auth\AuthManager;
use Octamp\Wamp\Event\EventInterface;
use Octamp\Wamp\Event\JoinRealmEvent;
use Octamp\Wamp\Event\LeaveRealmEvent;
use Octamp\Wamp\Peers\Router;
use Octamp\Wamp\Session\Session;
Expand Down Expand Up @@ -77,8 +78,10 @@ public function onLeaveRealmEvent(Session $session, LeaveRealmEvent $event): voi

public function publishMeta(string $topicName, array $arguments, ?object $argumentsKw = null, ?object $options = null): void
{
$session = $this->getMetaSession();
$id = $session->incrementWampId();
$this->handle($this->getMetaSession(), new PublishMessage(
Utils::getUniqueId(),
$id,
$options,
$topicName,
$arguments,
Expand Down
122 changes: 119 additions & 3 deletions src/Roles/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Octamp\Wamp\Roles;

use Octamp\Wamp\Adapter\AdapterInterface;
use Octamp\Wamp\Event\JoinRealmEvent;
use Octamp\Wamp\Event\LeaveRealmEvent;
use Octamp\Wamp\Helper\IDHelper;
use Octamp\Wamp\Session\Session;
Expand Down Expand Up @@ -77,15 +78,59 @@ public function onPublishMessage(Session $session, PublishMessage $message): voi
$message->setPublicationId(IDHelper::generateGlobalWampID());
}

$options = $message->getOptions();
$excludeSessions = $options->exclude ?? [];
$excludeAuths = $options->exclude_authid ?? [];
$excludeRoles = $options->exclude_authrole ?? [];

$eligibleSession = $options->eligible ?? null;
$eligibleAuths = $options->eligible_authid ?? null;
$eligibleRoles = $options->eligible_authrole ?? null;

$excludeMe = $message->excludeMe();

$subscriptionGroupsUri = $this->adapter->keys('sub:*');
foreach ($subscriptionGroupsUri as $subscriptionGroupUri) {
$messageUri = substr($subscriptionGroupUri, 4);
if ($messageUri === $message->getUri()) {
$subscriptionsRaw = $this->adapter->get($subscriptionGroupUri);
foreach ($subscriptionsRaw as $key => $subscriptionRaw) {
$subscription = $this->getSubscription($messageUri, $key, $subscriptionRaw);
$sessionId = $subscription->getSession()->getSessionId();
$authId = $subscription->getSession()->getAuthenticationDetails()->getAuthId();
$authRole = $subscription->getSession()->getAuthenticationDetails()->getAuthRole();

if (
($eligibleSession !== null && !in_array($sessionId, $eligibleSession))
|| ($eligibleAuths !== null && !in_array($authId, $eligibleAuths))
|| ($eligibleRoles !== null && !in_array($authId, $eligibleRoles))
) {
continue;
}
if (
in_array($sessionId, $excludeSessions)
|| in_array($authId, $excludeAuths)
|| in_array($authRole, $excludeRoles)
) {
continue;
}

if ($excludeMe && $sessionId === $session->getSessionId()) {
continue;
}

$eventMsg = EventMessage::createFromPublishMessage($message, $subscription->getId());
// do some additional conditions
$discloseMe = $message->getOptions()->disclose_me ?? false;
if ($discloseMe || $subscription->isDisclosePublisher()) {
$eventMsg->getDetails()->publisher = $session->getSessionId();
if ($authId !== null) {
$eventMsg->getDetails()->publisher_authid = $authId;
}
if ($authRole !== null) {
$eventMsg->getDetails()->publisher_authrole = $authId;
}
}

$subscription->sendEventMessage($eventMsg);
}
}
Expand All @@ -107,9 +152,8 @@ protected function getSubscription(string $groupKey, string $key, ?array $raw =
return null;
}
$subscribeMessage = SubscribeMessage::createMessageFromArray($raw['message']);
$subscribeMessage = Subscription::createSubscriptionFromSubscribeMessage($session, $subscribeMessage, $raw['subscriptionId']);

return $subscribeMessage;
return Subscription::createSubscriptionFromSubscribeMessage($session, $subscribeMessage, $raw['subscriptionId']);
}

public function onSubscribeMessage(Session $session, SubscribeMessage $message): void
Expand Down Expand Up @@ -173,6 +217,63 @@ public function onLeaveRealmEvent(Session $session, LeaveRealmEvent $event): voi
}
}
}

if (!$event->session->isAuthenticated()) {
return;
}

$subscriptionGroupsUri = $this->adapter->keys('sub:');
foreach ($subscriptionGroupsUri as $subscriptionGroupUri) {
$messageUri = substr($subscriptionGroupUri, 4);
if ($messageUri === 'wamp.session.on_leave') {
$subscriptionsRaw = $this->adapter->get($subscriptionGroupUri);
foreach ($subscriptionsRaw as $key => $subscriptionRaw) {
$subscription = $this->getSubscription($messageUri, $key, $subscriptionRaw);
$sessionId = $subscription->getSession()->getSessionId();
$authId = $subscription->getSession()->getAuthenticationDetails()->getAuthId();
$authRole = $subscription->getSession()->getAuthenticationDetails()->getAuthRole();

$eventMsg = EventMessage::createFromPublishMessage(new PublishMessage(IDHelper::generateGlobalWampID(), [], 'wamp.session.on_leave'), $subscription->getId());
$eventMsg->getDetails()->session = $sessionId;
$eventMsg->getDetails()->authid = $authId;
$eventMsg->getDetails()->authrole = $authRole;

$subscription->sendEventMessage($eventMsg);
}
}
}
}

public function onJoinRealmEvent(Session $session, JoinRealmEvent $event): void
{
if (!$event->session->isAuthenticated()) {
return;
}

$subscriptionGroupsUri = $this->adapter->keys('sub:');
foreach ($subscriptionGroupsUri as $subscriptionGroupUri) {
$messageUri = substr($subscriptionGroupUri, 4);
if ($messageUri === 'wamp.session.on_join') {
$subscriptionsRaw = $this->adapter->get($subscriptionGroupUri);
foreach ($subscriptionsRaw as $key => $subscriptionRaw) {
$subscription = $this->getSubscription($messageUri, $key, $subscriptionRaw);
$sessionId = $subscription->getSession()->getSessionId();
$authId = $subscription->getSession()->getAuthenticationDetails()->getAuthId();
$authRole = $subscription->getSession()->getAuthenticationDetails()->getAuthRole();
$authMethod = $subscription->getSession()->getAuthenticationDetails()->getAuthMethod();
$authProvider = $subscription->getSession()->getAuthenticationDetails()->getAuthProvider();

$eventMsg = EventMessage::createFromPublishMessage(new PublishMessage(IDHelper::generateGlobalWampID(), [], 'wamp.session.on_join'), $subscription->getId());
$eventMsg->getDetails()->session = $sessionId;
$eventMsg->getDetails()->authid = $authId;
$eventMsg->getDetails()->authrole = $authRole;
$eventMsg->getDetails()->authmethod = $authMethod;
$eventMsg->getDetails()->authprovider = $authProvider;

$subscription->sendEventMessage($eventMsg);
}
}
}
}

protected function removeSubscription(Subscription $subscription): void
Expand All @@ -193,4 +294,19 @@ protected function removeSubscriptionGroup(string $uri): void
}
}
}

public function getName(): string
{
return 'broker';
}

public function getFeatures(): object
{
$features = new \stdClass();
$features->subscriber_blackwhite_listing = true;
$features->publisher_exclusion = true;
$features->publisher_identification = true;

return $features;
}
}
10 changes: 10 additions & 0 deletions src/Roles/Dealer.php
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,14 @@ public function tryDeleteProcedure(string $name): void
unset($this->procedures[$name]);
}
}

public function getName(): string
{
return 'dealer';
}

public function getFeatures(): object
{
return new \stdClass();
}
}
4 changes: 4 additions & 0 deletions src/Roles/RoleInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@
interface RoleInterface
{
public function handle(Session $session, Message $message): void;

public function getName(): string;

public function getFeatures(): object;
}
2 changes: 1 addition & 1 deletion src/Session/Adapter/RedisAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public function generateId(): string
$prefix = date('Ymd');
$id = $this->adapter->inc('sesid:current', 1, $prefix);

return $prefix . str_pad($id, 6, 0, STR_PAD_LEFT);
return $prefix . str_pad((string) $id, 6, '0', STR_PAD_LEFT);
}

public function saveSession(Session $session): void
Expand Down
2 changes: 2 additions & 0 deletions src/Wamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public function init(): void
$router->addRole(new Broker($this->adapter, $sessionStorage, $this->serverId));
$router->addRole(new Dealer($this->adapter, $sessionStorage, $this->serverId));

$this->authManager->setRouter($router);

$router->addTransportProviders($this->transportProviders);

$connection = DummyConnection::createFromArray([
Expand Down

0 comments on commit d8306da

Please sign in to comment.