Skip to content

Commit

Permalink
Refactor asynchronous events to central location.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Jun 24, 2022
1 parent ddb7ece commit 0115c97
Show file tree
Hide file tree
Showing 16 changed files with 1,085 additions and 388 deletions.
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -95,10 +95,10 @@ coverhtml: vet common
PATH="$(GODIR)":$(PATH) "$(GOPATHBIN)/easyjson" -all $*.go

common: \
api_async_easyjson.go \
api_backend_easyjson.go \
api_proxy_easyjson.go \
api_signaling_easyjson.go \
natsclient_easyjson.go
api_signaling_easyjson.go

$(BINDIR):
mkdir -p $(BINDIR)
Expand Down
38 changes: 38 additions & 0 deletions api_async.go
@@ -0,0 +1,38 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling

import "time"

type AsyncMessage struct {
SendTime time.Time `json:"sendtime"`

Type string `json:"type"`

Message *ServerMessage `json:"message,omitempty"`

Room *BackendServerRoomRequest `json:"room,omitempty"`

Permissions []Permission `json:"permissions,omitempty"`

Id string `json:"id"`
}
210 changes: 210 additions & 0 deletions async_events.go
@@ -0,0 +1,210 @@
/**
* Standalone signaling server for the Nextcloud Spreed app.
* Copyright (C) 2022 struktur AG
*
* @author Joachim Bauch <bauch@struktur.de>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package signaling

import "sync"

type AsyncBackendRoomEventListener interface {
ProcessBackendRoomRequest(request *BackendServerRoomRequest)
}

type AsyncRoomEventListener interface {
ProcessAsyncRoomMessage(message *AsyncMessage)
}

type AsyncUserEventListener interface {
ProcessAsyncUserMessage(message *AsyncMessage)
}

type AsyncSessionEventListener interface {
ProcessAsyncSessionMessage(message *AsyncMessage)
}

type AsyncEvents interface {
Close()

RegisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener) error
UnregisterBackendRoomListener(roomId string, backend *Backend, listener AsyncBackendRoomEventListener)

RegisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener) error
UnregisterRoomListener(roomId string, backend *Backend, listener AsyncRoomEventListener)

RegisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener) error
UnregisterUserListener(userId string, backend *Backend, listener AsyncUserEventListener)

RegisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener) error
UnregisterSessionListener(sessionId string, backend *Backend, listener AsyncSessionEventListener)

PublishBackendRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
PublishRoomMessage(roomId string, backend *Backend, message *AsyncMessage) error
PublishUserMessage(userId string, backend *Backend, message *AsyncMessage) error
PublishSessionMessage(sessionId string, backend *Backend, message *AsyncMessage) error
}

func NewAsyncEvents(url string) (AsyncEvents, error) {
client, err := NewNatsClient(url)
if err != nil {
return nil, err
}

return NewAsyncEventsNats(client)
}

type asyncBackendRoomSubscriber struct {
mu sync.Mutex

listeners map[AsyncBackendRoomEventListener]bool
}

func (s *asyncBackendRoomSubscriber) processBackendRoomRequest(message *BackendServerRoomRequest) {
s.mu.Lock()
defer s.mu.Unlock()

for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessBackendRoomRequest(message)
s.mu.Lock()
}
}

func (s *asyncBackendRoomSubscriber) addListener(listener AsyncBackendRoomEventListener) {
s.mu.Lock()
defer s.mu.Unlock()

if s.listeners == nil {
s.listeners = make(map[AsyncBackendRoomEventListener]bool)
}
s.listeners[listener] = true
}

func (s *asyncBackendRoomSubscriber) removeListener(listener AsyncBackendRoomEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()

delete(s.listeners, listener)
return len(s.listeners) > 0
}

type asyncRoomSubscriber struct {
mu sync.Mutex

listeners map[AsyncRoomEventListener]bool
}

func (s *asyncRoomSubscriber) processAsyncRoomMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()

for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncRoomMessage(message)
s.mu.Lock()
}
}

func (s *asyncRoomSubscriber) addListener(listener AsyncRoomEventListener) {
s.mu.Lock()
defer s.mu.Unlock()

if s.listeners == nil {
s.listeners = make(map[AsyncRoomEventListener]bool)
}
s.listeners[listener] = true
}

func (s *asyncRoomSubscriber) removeListener(listener AsyncRoomEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()

delete(s.listeners, listener)
return len(s.listeners) > 0
}

type asyncUserSubscriber struct {
mu sync.Mutex

listeners map[AsyncUserEventListener]bool
}

func (s *asyncUserSubscriber) processAsyncUserMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()

for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncUserMessage(message)
s.mu.Lock()
}
}

func (s *asyncUserSubscriber) addListener(listener AsyncUserEventListener) {
s.mu.Lock()
defer s.mu.Unlock()

if s.listeners == nil {
s.listeners = make(map[AsyncUserEventListener]bool)
}
s.listeners[listener] = true
}

func (s *asyncUserSubscriber) removeListener(listener AsyncUserEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()

delete(s.listeners, listener)
return len(s.listeners) > 0
}

type asyncSessionSubscriber struct {
mu sync.Mutex

listeners map[AsyncSessionEventListener]bool
}

func (s *asyncSessionSubscriber) processAsyncSessionMessage(message *AsyncMessage) {
s.mu.Lock()
defer s.mu.Unlock()

for listener := range s.listeners {
s.mu.Unlock()
listener.ProcessAsyncSessionMessage(message)
s.mu.Lock()
}
}

func (s *asyncSessionSubscriber) addListener(listener AsyncSessionEventListener) {
s.mu.Lock()
defer s.mu.Unlock()

if s.listeners == nil {
s.listeners = make(map[AsyncSessionEventListener]bool)
}
s.listeners[listener] = true
}

func (s *asyncSessionSubscriber) removeListener(listener AsyncSessionEventListener) bool {
s.mu.Lock()
defer s.mu.Unlock()

delete(s.listeners, listener)
return len(s.listeners) > 0
}

0 comments on commit 0115c97

Please sign in to comment.