-
-
Notifications
You must be signed in to change notification settings - Fork 300
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add new status and notification websocket streaming capabilities
- Loading branch information
1 parent
ad2e982
commit aa8a0d0
Showing
21 changed files
with
621 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package streaming | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"time" | ||
|
||
"github.com/gin-gonic/gin" | ||
"github.com/gorilla/websocket" | ||
) | ||
|
||
// StreamGETHandler handles the creation of a new websocket streaming request. | ||
func (m *Module) StreamGETHandler(c *gin.Context) { | ||
l := m.log.WithField("func", "StreamGETHandler") | ||
|
||
streamType := c.Query(StreamQueryKey) | ||
if streamType == "" { | ||
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("no stream type provided under query key %s", StreamQueryKey)}) | ||
return | ||
} | ||
|
||
accessToken := c.Query(AccessTokenQueryKey) | ||
if accessToken == "" { | ||
c.JSON(http.StatusUnauthorized, gin.H{"error": fmt.Sprintf("no access token provided under query key %s", AccessTokenQueryKey)}) | ||
return | ||
} | ||
|
||
// make sure a valid token has been provided and obtain the associated account | ||
account, err := m.processor.AuthorizeStreamingRequest(accessToken) | ||
if err != nil { | ||
c.JSON(http.StatusUnauthorized, gin.H{"error": "could not authorize with given token"}) | ||
return | ||
} | ||
|
||
// prepare to upgrade the connection to a websocket connection | ||
upgrader := websocket.Upgrader{ | ||
ReadBufferSize: 1024, | ||
WriteBufferSize: 1024, | ||
CheckOrigin: func(r *http.Request) bool { | ||
// we fully expect cors requests (via something like pinafore.social) so we should be lenient here | ||
return true | ||
}, | ||
} | ||
|
||
// do the actual upgrade here | ||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) | ||
if err != nil { | ||
l.Infof("error upgrading websocket connection: %s", err) | ||
return | ||
} | ||
defer conn.Close() // whatever happens, when we leave this function we want to close the websocket connection | ||
|
||
// inform the processor that we have a new connection and want a stream for it | ||
stream, errWithCode := m.processor.OpenStreamForAccount(account, streamType) | ||
if errWithCode != nil { | ||
c.JSON(errWithCode.Code(), errWithCode.Safe()) | ||
return | ||
} | ||
defer close(stream.Hangup) // closing stream.Hangup indicates that we've finished with the connection (the client has gone), so we want to do this on exiting this handler | ||
|
||
// spawn a new ticker for pinging the connection periodically | ||
t := time.NewTicker(30 * time.Second) | ||
|
||
// we want to stay in the sendloop as long as possible while the client is connected -- the only thing that should break the loop is if the client leaves or something else goes wrong | ||
sendLoop: | ||
for { | ||
select { | ||
case m := <-stream.Messages: | ||
// we've got a streaming message!! | ||
l.Debug("received message from stream") | ||
if err := conn.WriteJSON(m); err != nil { | ||
l.Infof("error writing json to websocket connection: %s", err) | ||
// if something is wrong we want to bail and drop the connection -- the client will create a new one | ||
break sendLoop | ||
} | ||
l.Debug("wrote message into websocket connection") | ||
case <-t.C: | ||
l.Debug("received TICK from ticker") | ||
if err := conn.WriteMessage(websocket.PingMessage, []byte(": ping")); err != nil { | ||
l.Infof("error writing ping to websocket connection: %s", err) | ||
// if something is wrong we want to bail and drop the connection -- the client will create a new one | ||
break sendLoop | ||
} | ||
l.Debug("wrote ping message into websocket connection") | ||
} | ||
} | ||
|
||
l.Debug("leaving StreamGETHandler") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
GoToSocial | ||
Copyright (C) 2021 GoToSocial Authors admin@gotosocial.org | ||
This program is free software: you can redistribute it and/or modify | ||
it under the terms of the GNU Affero General Public License as published by | ||
the Free Software Foundation, either version 3 of the License, or | ||
(at your option) any later version. | ||
This program is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU Affero General Public License for more details. | ||
You should have received a copy of the GNU Affero General Public License | ||
along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
package streaming | ||
|
||
import ( | ||
"net/http" | ||
|
||
"github.com/sirupsen/logrus" | ||
"github.com/superseriousbusiness/gotosocial/internal/api" | ||
"github.com/superseriousbusiness/gotosocial/internal/config" | ||
"github.com/superseriousbusiness/gotosocial/internal/processing" | ||
"github.com/superseriousbusiness/gotosocial/internal/router" | ||
) | ||
|
||
const ( | ||
// BasePath is the path for the streaming api | ||
BasePath = "/api/v1/streaming" | ||
|
||
// StreamQueryKey is the query key for the type of stream being requested | ||
StreamQueryKey = "stream" | ||
|
||
// AccessTokenQueryKey is the query key for an oauth access token that should be passed in streaming requests. | ||
AccessTokenQueryKey = "access_token" | ||
) | ||
|
||
// Module implements the api.ClientModule interface for everything related to streaming | ||
type Module struct { | ||
config *config.Config | ||
processor processing.Processor | ||
log *logrus.Logger | ||
} | ||
|
||
// New returns a new streaming module | ||
func New(config *config.Config, processor processing.Processor, log *logrus.Logger) api.ClientModule { | ||
return &Module{ | ||
config: config, | ||
processor: processor, | ||
log: log, | ||
} | ||
} | ||
|
||
// Route attaches all routes from this module to the given router | ||
func (m *Module) Route(r router.Router) error { | ||
r.AttachHandler(http.MethodGet, BasePath, m.StreamGETHandler) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
package gtsmodel | ||
|
||
import "sync" | ||
|
||
// StreamsForAccount is a wrapper for the multiple streams that one account can have running at the same time. | ||
// TODO: put a limit on this | ||
type StreamsForAccount struct { | ||
// The currently held streams for this account | ||
Streams []*Stream | ||
// Mutex to lock/unlock when modifying the slice of streams. | ||
sync.Mutex | ||
} | ||
|
||
// Stream represents one open stream for a client. | ||
type Stream struct { | ||
// ID of this stream, generated during creation. | ||
ID string | ||
// Type of this stream: user/public/etc | ||
Type string | ||
// Channel of messages for the client to read from | ||
Messages chan *Message | ||
// Channel to close when the client drops away | ||
Hangup chan interface{} | ||
// Only put messages in the stream when Connected | ||
Connected bool | ||
// Mutex to lock/unlock when inserting messages, hanging up, changing the connected state etc. | ||
sync.Mutex | ||
} | ||
|
||
// Message represents one streamed message. | ||
type Message struct { | ||
// All the stream types this message should be delivered to. | ||
Stream []string `json:"stream"` | ||
// The event type of the message (update/delete/notification etc) | ||
Event string `json:"event"` | ||
// The actual payload of the message. In case of an update or notification, this will be a JSON string. | ||
Payload string `json:"payload"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.