Skip to content
This repository has been archived by the owner on May 14, 2021. It is now read-only.

Commit

Permalink
Merge pull request #31 from kinecosystem/fix-sse
Browse files Browse the repository at this point in the history
Fix SSE PubSub issue where stream as closed with no information after first event
  • Loading branch information
oryband committed Feb 13, 2019
2 parents 7fd9694 + 94d00f0 commit b47e762
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
20 changes: 12 additions & 8 deletions services/horizon/internal/actions/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (base *Base) Execute(action interface{}) {
}

case render.MimeEventStream:
var pumped chan interface{}
var notification chan interface{}

action, ok := action.(SSE)
if !ok {
Expand All @@ -69,8 +69,8 @@ func (base *Base) Execute(action interface{}) {
// This causes action.SSE to only be triggered by this topic. Unsubscribe when done.
topic := action.GetTopic()
if topic != "" {
pumped = sse.Subscribe(topic)
defer sse.Unsubscribe(pumped, topic)
notification = sse.Subscribe(topic)
defer sse.Unsubscribe(notification, topic)
}

stream := sse.NewStream(ctx, base.W)
Expand Down Expand Up @@ -125,14 +125,18 @@ func (base *Base) Execute(action interface{}) {
}

select {
case <-pumped:
//no-op, continue onto the next iteration
case <-notification:
// No-op, continue onto the next iteration.
continue
case <-ctx.Done():
// Close stream and exit.
stream.Done()
return
case <-base.appCtx.Done():
// Close stream and exit.
stream.Done()
return
}

stream.Done()
return
}
case render.MimeRaw:
action, ok := action.(Raw)
Expand Down
30 changes: 16 additions & 14 deletions services/horizon/internal/ingest/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,20 +182,22 @@ func (ingest *Ingestion) UpdateAccountIDs(tables []TableName) error {
return nil
}

// Return a channel which emits a message when ingections were committed to the database
func (ingest *Ingestion) getCommitChannel() chan interface{} {
// subscribeToDBCommit subscribes to updates for ingestions that were committed to the database,
// and returns a notification channel that notifies when a commit happened.
func (ingest *Ingestion) subscribeToDBCommit() chan interface{} {
return commitPubsub.SubOnce(pubsubCommitTopic)
}

// Listen until the insert action committed to the DB and then publish to sse subscriptors and
// then publish the topic that was ingested.
func (ingest *Ingestion) waitAndPublish(committed chan interface{}, topic string) {
// publishOnDBCommit listens until the insert action was committed to the database,
// then publishes a notification to SSE subscriptors, and also to the topic that was ingested.
func (ingest *Ingestion) publishOnDBCommit(committed chan interface{}, topic string) {
l := log.WithFields(ilog.F{"topic": topic, "channel": committed})
l.Debug("Waiting for topic")
select {
case <-committed:
sse.Publish(topic, false)
case <-time.After(10 * time.Second):
// Timeout after a while
l.Errorf("Failed to get publish approval, releasing channel")
sse.Publish(topic, false)
}
Expand All @@ -209,12 +211,12 @@ func (ingest *Ingestion) Ledger(
ops int,
) {

// Wait for data to be committed to database, then notify subscribes.
go ingest.waitAndPublish(ingest.getCommitChannel(), "ledger")
go ingest.waitAndPublish(ingest.getCommitChannel(), strconv.FormatInt(id, 10))
// Wait for data to be committed to database, then notify subscribers.
go ingest.publishOnDBCommit(ingest.subscribeToDBCommit(), "ledger")
go ingest.publishOnDBCommit(ingest.subscribeToDBCommit(), strconv.FormatInt(id, 10))

if txs > 0 {
go ingest.waitAndPublish(ingest.getCommitChannel(), "transactions")
go ingest.publishOnDBCommit(ingest.subscribeToDBCommit(), "transactions")
}

ingest.builders[LedgersTableName].Values(
Expand Down Expand Up @@ -264,7 +266,7 @@ func (ingest *Ingestion) Operation(
func (ingest *Ingestion) OperationParticipants(op int64, aids []xdr.AccountId) {
for _, aid := range aids {
// Wait for data to be committed to database, then notify subscribes.
go ingest.waitAndPublish(ingest.getCommitChannel(), aid.Address())
go ingest.publishOnDBCommit(ingest.subscribeToDBCommit(), aid.Address())

ingest.builders[OperationParticipantsTableName].Values(op, Address(aid.Address()))
}
Expand Down Expand Up @@ -297,7 +299,7 @@ func (ingest *Ingestion) Trade(
ledgerClosedAt int64,
) error {
// Wait for data to be committed to database, then notify subscribes.
go ingest.waitAndPublish(ingest.getCommitChannel(), "order_book")
go ingest.publishOnDBCommit(ingest.subscribeToDBCommit(), "order_book")

q := history.Q{Session: ingest.DB}

Expand Down Expand Up @@ -359,7 +361,7 @@ func (ingest *Ingestion) Transaction(
signatures := tx.Base64Signatures()

// Wait for data to be committed to database, then notify subscribes.
go ingest.waitAndPublish(ingest.getCommitChannel(), tx.TransactionHash)
go ingest.publishOnDBCommit(ingest.subscribeToDBCommit(), tx.TransactionHash)

ingest.builders[TransactionsTableName].Values(
id,
Expand Down Expand Up @@ -389,7 +391,7 @@ func (ingest *Ingestion) Transaction(
func (ingest *Ingestion) TransactionParticipants(tx int64, aids []xdr.AccountId) {
for _, aid := range aids {
// Wait for data to be committed to database, then notify subscribes.
go ingest.waitAndPublish(ingest.getCommitChannel(), aid.Address())
go ingest.publishOnDBCommit(ingest.subscribeToDBCommit(), aid.Address())
ingest.builders[TransactionParticipantsTableName].Values(tx, Address(aid.Address()))
}
}
Expand Down Expand Up @@ -506,7 +508,7 @@ func (ingest *Ingestion) commit() error {
if err != nil {
return err
}
// Update subscribers that commit is done to the DB.
// Update subscribers that a database commit occured.
commitPubsub.TryPub(pubsubStubValue, pubsubCommitTopic)
return nil
}
Expand Down
11 changes: 6 additions & 5 deletions services/horizon/internal/render/sse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,22 @@ func getJSON(val interface{}) string {
// Pubsub for SSE requests, so they will run SSE.action only upon relevant data changes.
var ssePubsub = pubsub.New(pubsubCapacity)

// Subscribe to topic by SSE connection usually with an id (account, ledger, tx)
// Once a change in database happens, Publish is used by ingestor so channel is notified.
// Subscribe to topic by SSE connection, usually with an ID (account, ledger, tx).
// Once a change occurs in Horizon database happens, Publish() is called by ingestor so the
// subscription channel is notified.
func Subscribe(topic string) chan interface{} {
topicChan := ssePubsub.Sub(topic)
log.WithFields(log.F{"topic": topic, "channel": topicChan}).Debug("Subscribed to topic")
return topicChan
}

// Unsubscribe to a topic, for example when SSE connection is closed.
func Unsubscribe(channel chan interface{}, topic string) {
func Unsubscribe(notification chan interface{}, topic string) {
log.WithField("topic", topic).Debug("Unsubscribed from topic")
ssePubsub.Unsub(channel, topic)
ssePubsub.Unsub(notification, topic)
}

// Publish publishes to channel.
// Publish publishes to a PubSub subscription notification channel.
//
// NOTE there is good reason to usually publish in a non-blocking manner i.e. skipping publishing
// and dropping sending the notification to the channel. The reason is in case channel queue is full,
Expand Down

0 comments on commit b47e762

Please sign in to comment.