Skip to content

Commit

Permalink
a pubrelay microservice
Browse files Browse the repository at this point in the history
  • Loading branch information
lyoshenka committed Jul 29, 2020
1 parent d99e200 commit 537fe58
Show file tree
Hide file tree
Showing 8 changed files with 830 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/hashicorp/serf v0.8.5 // indirect
github.com/lbryio/lbry.go/v2 v2.6.1-0.20200710180140-fcade7475323
github.com/lbryio/reflector.go v1.0.6-0.20190828131602-ce3d4403dbc6
github.com/mattn/go-sqlite3 v1.14.0
github.com/miekg/dns v1.1.22 // indirect
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ github.com/ChannelMeter/iso8601duration v0.0.0-20150204201828-8da3af7a2a61/go.mo
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down Expand Up @@ -209,6 +211,8 @@ github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCW
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
Expand Down Expand Up @@ -343,6 +347,7 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -360,6 +365,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191009170851-d66e71096ffb h1:TR699M2v0qoKTOHxeLgp6zPqaQNs74f01a/ob9W0qko=
golang.org/x/net v0.0.0-20191009170851-d66e71096ffb/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0=
Expand Down Expand Up @@ -391,6 +398,8 @@ golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f h1:hjzMYz/7Ea1mNKfOnFOfktR0mlA5jqhvywClCMHM/qw=
golang.org/x/sys v0.0.0-20191009170203-06d7bd2c5f4f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
Expand Down
1 change: 1 addition & 0 deletions pubrelay/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.sqlite*
121 changes: 121 additions & 0 deletions pubrelay/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"database/sql"
"time"

"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/ytsync/v5/pubrelay/gohubbub"
)

const sqlTimeFormat = "2006-01-02 15:04:05"

func initDB(dbFile string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", "file:"+dbFile+"?mode=rwc")
if err != nil {
return nil, errors.Err(err)
}

_, err = db.Exec(`CREATE TABLE IF NOT EXISTS channel (
id TEXT NOT NULL PRIMARY KEY
)`)
if err != nil {
return nil, errors.Err(err)
}

_, err = db.Exec(`CREATE TABLE IF NOT EXISTS notification (
id INTEGER NOT NULL PRIMARY KEY,
channel_id TEXT,
video_id TEXT,
published_at DATETIME,
last_attempted_at DATETIME
)`)
if err != nil {
return nil, errors.Err(err)
}

_, err = db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS channel_and_video_idx ON notification (channel_id, video_id)`)
if err != nil {
return nil, errors.Err(err)
}

_, err = db.Exec(`CREATE INDEX IF NOT EXISTS last_attempted_at_idx ON notification (last_attempted_at)`)
if err != nil {
return nil, errors.Err(err)
}

return db, nil
}

func subscribeFromDB(db *sql.DB, s *gohubbub.Client) (err error) {
var rows *sql.Rows
rows, err = db.Query("SELECT id FROM channel")
if err != nil {
return errors.Err(err)
}
defer closeRows(rows, &err)

var channelID string
for rows.Next() {
err = rows.Scan(&channelID)
if err != nil {
return errors.Err(err)
}
subscribe(db, s, channelID)
}

return nil
}

func insertNewChannel(db *sql.DB, channelID string) error {
_, err := db.Exec(`INSERT OR IGNORE INTO channel (id) VALUES (?)`, channelID)
return errors.Err(err)
}

func touchFailedNotification(db *sql.DB, e entry) error {
_, err := db.Exec(
`INSERT INTO notification
(channel_id, video_id, published_at, last_attempted_at) VALUES (?,?,?,?)
ON CONFLICT (channel_id, video_id) DO UPDATE SET last_attempted_at = excluded.last_attempted_at`,
e.ChannelId, e.VideoId, e.PublishedAt, time.Now().UTC().Format(sqlTimeFormat),
)
return errors.Err(err)
}

func getNotificationsToRetry(db *sql.DB, since time.Time) (entries []entry, err error) {
var rows *sql.Rows
rows, err = db.Query(
`SELECT channel_id, video_id, published_at FROM notification WHERE last_attempted_at <= ?`,
since.UTC().Format(sqlTimeFormat),
)
if err != nil {
return nil, errors.Err(err)
}
defer closeRows(rows, &err)

for rows.Next() {
var e entry
err = rows.Scan(&e.ChannelId, &e.VideoId, &e.PublishedAt)
if err != nil {
return nil, errors.Err(err)
}
entries = append(entries, e)
}

return entries, nil
}

func deleteNotification(db *sql.DB, e entry) error {

}

func closeRows(rows *sql.Rows, err *error) {
closeErr := rows.Close()
if *err != nil {
if closeErr != nil {
*err = errors.Err(closeErr)
} else {
*err = errors.Err(rows.Err())
}
}
}
Loading

0 comments on commit 537fe58

Please sign in to comment.