Skip to content

Commit

Permalink
Merge pull request #1 from worksinmagic/development
Browse files Browse the repository at this point in the history
For v0.1.0
  • Loading branch information
didasy committed Sep 27, 2020
2 parents 284395e + f63c80e commit 6a06499
Show file tree
Hide file tree
Showing 14 changed files with 406 additions and 319 deletions.
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ language: go
sudo: false
dist: focal

branches:
only:
- master
- development

services:
- docker

Expand Down
164 changes: 102 additions & 62 deletions README.md

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions app/ytfeed/ytfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {

var streamScheduler *streamschedule.StreamSchedule
if cfg.BoltDBPath != "" {
streamScheduler, err = streamschedule.New(logger, cfg.BoltDBPath, cfg.ResubCallbackAddr, cfg.StreamSchedulerRetryDelay, cfg.StreamSchedulerWorkerInterval, cfg.StreamSchedulerMaxRetries)
streamScheduler, err = streamschedule.New(logger, cfg.BoltDBPath, cfg.StreamSchedulerWorkerInterval)
if err != nil {
err = errors.Wrap(err, "failed to create stream scheduler service")
return
Expand Down Expand Up @@ -178,7 +178,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {
}

// run workers
subscriber := autosubscribefeed.New(logger, cfg.VerificationToken, cfg.ResubTargetAddr, cfg.ResubTopic, cfg.ResubCallbackAddr, cfg.ResubInterval)
subscriber := autosubscribefeed.New(logger, cfg.VerificationToken, cfg.VerificationSecret, cfg.ResubTargetAddr, cfg.ResubCallbackAddr, cfg.ResubTopics, cfg.ResubInterval)
go func(ctx context.Context, subscriber *autosubscribefeed.Subscriber) {
err := subscriber.Subscribe(ctx)
if err != nil {
Expand All @@ -190,6 +190,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {

if streamScheduler != nil {
go func(ctx context.Context, streamScheduler *streamschedule.StreamSchedule) {
streamScheduler.RegisterDataHandler(dataHandlers...)
err := streamScheduler.RunWorker(ctx)
if err != nil {
err = errors.Wrap(err, "stream scheduler worker exited with error")
Expand All @@ -201,7 +202,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {

// declare handler functions
http.HandleFunc("/health", health.Handler)
http.HandleFunc("/", rss.Handler(ctx, logger, cfg.VerificationToken, dataHandlers...))
http.HandleFunc("/", rss.Handler(ctx, logger, cfg.VerificationToken, cfg.VerificationSecret, dataHandlers...))

// listen
errCh := make(chan error, 1)
Expand Down
23 changes: 8 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ import (
)

const (
DefaultVersion = "v1.0.0"
DefaultVersion = "v0.1.0"
DefaultHost = ":8123"
DefaultResubTargetAddr = "https://pubsubhubbub.appspot.com/subscribe"
DefaultResubInterval = 24 * 3 * time.Hour
DefaultFileNameTemplate = "{{.ChannelID}}/{{.PublishedYear}}/{{.PublishedMonth}}/{{.PublishedDay}}/{{.PublishedTimeZone}}/{{.VideoID}}.{{.VideoExtension}}"
DefaultFormatQuality = "720"
DefaultFormatExtension = "webm"
DefaultRedisChannel = "ytfeed"
DefaultStreamSchedulerRetryDelay = 1 * time.Minute
DefaultStreamSchedulerMaxRetries = 5
DefaultStreamSchedulerWorkerInterval = 1 * time.Minute
DefaultVideoDownloadMaxRetries = 5
DefaultTemporaryFileDir = "./"
Expand Down Expand Up @@ -55,6 +53,7 @@ func init() {
handleError(viper.BindEnv("host"))
handleError(viper.BindEnv("youtube_api_key"))
handleError(viper.BindEnv("verification_token"))
handleError(viper.BindEnv("verification_secret"))

handleError(viper.BindEnv("resub_interval"))
handleError(viper.BindEnv("resub_target_addr"))
Expand Down Expand Up @@ -99,9 +98,7 @@ func init() {
handleError(viper.BindEnv("redis_idle_check_frequency"))

handleError(viper.BindEnv("boltdb_path"))
handleError(viper.BindEnv("stream_scheduler_retry_delay"))
handleError(viper.BindEnv("stream_scheduler_worker_interval"))
handleError(viper.BindEnv("stream_scheduler_max_retries"))

handleError(viper.BindEnv("amqp_dsn"))
handleError(viper.BindEnv("amqp_exchange"))
Expand All @@ -122,9 +119,7 @@ func init() {
viper.SetDefault("video_format_quality", DefaultFormatQuality)
viper.SetDefault("video_format_extension", DefaultFormatExtension)
viper.SetDefault("redis_channel", DefaultRedisChannel)
viper.SetDefault("stream_scheduler_retry_delay", DefaultStreamSchedulerRetryDelay)
viper.SetDefault("stream_scheduler_worker_interval", DefaultStreamSchedulerWorkerInterval)
viper.SetDefault("stream_scheduler_max_retries", DefaultStreamSchedulerMaxRetries)
viper.SetDefault("video_download_max_retries", DefaultVideoDownloadMaxRetries)
viper.SetDefault("temporary_file_dir", DefaultTemporaryFileDir)
viper.SetDefault("amqp_exchange", DefaultAMQPExchange)
Expand All @@ -151,11 +146,12 @@ type Validator interface {
type Configuration struct {
validator Validator `validate:"required"`

YoutubeAPIKey string `validate:"required"`
VerificationToken string `validate:"required"`
YoutubeAPIKey string `validate:"required"`
VerificationToken string `validate:"required"`
VerificationSecret string `validate:"required"`

ResubTargetAddr string `validate:"required"`
ResubTopic string `validate:"required"`
ResubTopics []string `validate:"required"`
ResubCallbackAddr string `validate:"required"`
ResubInterval time.Duration `validate:"required"`

Expand Down Expand Up @@ -197,9 +193,7 @@ type Configuration struct {
RedisIdleCheckFrequency time.Duration `validate:""`

BoltDBPath string `validate:"omitempty,file"`
StreamSchedulerRetryDelay time.Duration `validate:"required,min=0"`
StreamSchedulerWorkerInterval time.Duration `validate:"required,min=1000000000"`
StreamSchedulerMaxRetries int `validate:""`

AMQPDSN string `validate:""`
AMQPExchange string `validate:"required"`
Expand All @@ -219,10 +213,11 @@ func New() (c *Configuration) {

c.YoutubeAPIKey = viper.GetString("youtube_api_key")
c.VerificationToken = viper.GetString("verification_token")
c.VerificationSecret = viper.GetString("verification_secret")

c.ResubCallbackAddr = viper.GetString("resub_callback_addr")
c.ResubTargetAddr = viper.GetString("resub_target_addr")
c.ResubTopic = viper.GetString("resub_topic")
c.ResubTopics = viper.GetStringSlice("resub_topic")
c.ResubInterval = viper.GetDuration("resub_interval")

c.S3Endpoint = viper.GetString("s3_endpoint")
Expand Down Expand Up @@ -263,9 +258,7 @@ func New() (c *Configuration) {
c.RedisIdleCheckFrequency = viper.GetDuration("redis_idle_check_frequency")

c.BoltDBPath = viper.GetString("boltdb_path")
c.StreamSchedulerRetryDelay = viper.GetDuration("stream_scheduler_retry_delay")
c.StreamSchedulerWorkerInterval = viper.GetDuration("stream_scheduler_worker_interval")
c.StreamSchedulerMaxRetries = viper.GetInt("stream_scheduler_max_retries")

c.AMQPDSN = viper.GetString("amqp_dsn")
c.AMQPExchange = viper.GetString("amqp_exchange")
Expand Down
6 changes: 6 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"errors"
"os"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -18,6 +19,9 @@ func TestConfig(t *testing.T) {
mockValidator := mock.NewMockValidator(ctrl)

t.Run("Validate success", func(t *testing.T) {
version := "v100.0.0"
os.Setenv("YTFEED_VERSION", version)

cfg = New()
require.NotNil(t, cfg)
cfg.StorageBackend = StorageBackendDisk
Expand All @@ -28,6 +32,8 @@ func TestConfig(t *testing.T) {

err := cfg.Validate()
require.NoError(t, err)
require.Equal(t, version, cfg.Version)
require.Equal(t, DefaultHost, cfg.Host)
})

t.Run("Validate failed", func(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions mock/savevideo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 53 additions & 19 deletions plugin/autosubscribefeed/autosubscribefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package autosubscribefeed

import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -20,6 +22,9 @@ const (
HubMode = "hub.mode"
HubVerify = "hub.verify"
HubVerificationToken = "hub.verify_token"
HubSecret = "hub.secret"

ErrResubscribeFormat = "failed to resubscribe for topic %s with error '%v'"
)

var (
Expand All @@ -29,21 +34,23 @@ var (
type Subscriber struct {
resubInterval time.Duration
targetAddr string
topic string
topics []string
callbackAddr string
verificationToken string
hmacSecret string

logger ytfeed.Logger
client *http.Client
}

func New(logger ytfeed.Logger, verificationToken, targetAddr, topic, callbackAddr string, resubInterval time.Duration) (s *Subscriber) {
func New(logger ytfeed.Logger, verificationToken, hmacSecret, targetAddr, callbackAddr string, topics []string, resubInterval time.Duration) (s *Subscriber) {
s = &Subscriber{}
s.resubInterval = resubInterval
s.targetAddr = targetAddr
s.callbackAddr = callbackAddr
s.verificationToken = verificationToken
s.topic = topic
s.hmacSecret = hmacSecret
s.topics = topics
s.client = &http.Client{}
s.client.Timeout = DefaultTimeout
s.logger = logger
Expand All @@ -69,25 +76,52 @@ func (s *Subscriber) Subscribe(ctx context.Context) (err error) {
}
}

type ErrorSub struct {
Topic string
Err error
}

func (s *Subscriber) subscribe() (err error) {
data := url.Values{}
data.Set(HubTopic, s.topic)
data.Set(HubCallback, s.callbackAddr)
data.Set(HubMode, DefaultHubMode)
data.Set(HubVerify, DefaultHubVerify)
data.Set(HubVerificationToken, s.verificationToken)

var resp *http.Response
resp, err = s.client.PostForm(s.targetAddr, data)
if err != nil {
return
}
if resp.StatusCode >= http.StatusBadRequest {
err = errors.Wrapf(ErrFailedToSubscribeFeed, "HTTP status %d", resp.StatusCode)
return
failedReqs := make([]ErrorSub, 0, 8)

for _, topic := range s.topics {
data := url.Values{}
data.Set(HubTopic, topic)
data.Set(HubCallback, s.callbackAddr)
data.Set(HubMode, DefaultHubMode)
data.Set(HubVerify, DefaultHubVerify)
data.Set(HubVerificationToken, s.verificationToken)
data.Set(HubSecret, s.hmacSecret)

var resp *http.Response
resp, err = s.client.PostForm(s.targetAddr, data)
if err != nil {
failedReqs = append(failedReqs, ErrorSub{
Topic: topic,
Err: err,
})
continue
}
if resp.StatusCode >= http.StatusBadRequest {
err = errors.Wrapf(ErrFailedToSubscribeFeed, "HTTP status %d", resp.StatusCode)
failedReqs = append(failedReqs, ErrorSub{
Topic: topic,
Err: err,
})
continue
}

s.logger.Infof("Resubscribed to topic %s with callback address %s", topic, s.callbackAddr)
}

s.logger.Infof("Resubscribed to topic %s with callback address %s", s.topic, s.callbackAddr)
if len(failedReqs) > 0 {
errMessages := make([]string, 0, len(failedReqs))
for _, f := range failedReqs {
errMessages = append(errMessages, fmt.Sprintf(ErrResubscribeFormat, f.Topic, f.Err))
}

err = errors.New(strings.Join(errMessages, ","))
}

return
}
16 changes: 12 additions & 4 deletions plugin/autosubscribefeed/autosubscribefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ func TestSubscriber(t *testing.T) {

logger := mock.NewMockLogger(ctrl)
verificationToken := "mytoken"
secret := "mysecret"
targetAddr := ts.URL
topic := "mytopic"
topics := []string{"mytopic", "yourtopic"}
callbackAddr := "http://localhost:9876"
resubInterval := 100 * time.Millisecond

t.Run("success", func(t *testing.T) {
s := New(logger, verificationToken, targetAddr, topic, callbackAddr, resubInterval)
s := New(logger, verificationToken, secret, targetAddr, callbackAddr, topics, resubInterval)
require.NotNil(t, s)

customHTTPClient := &http.Client{}
Expand All @@ -49,6 +50,12 @@ func TestSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 200*time.Millisecond)
defer cancel()

// do this twice because we have two topics
logger.EXPECT().Infof(
gomock.AssignableToTypeOf("resubscribed"),
gomock.AssignableToTypeOf("topic"),
gomock.AssignableToTypeOf("callback address"),
)
logger.EXPECT().Infof(
gomock.AssignableToTypeOf("resubscribed"),
gomock.AssignableToTypeOf("topic"),
Expand All @@ -60,13 +67,14 @@ func TestSubscriber(t *testing.T) {
})

t.Run("failed", func(t *testing.T) {
s := New(logger, verificationToken, targetAddr, wrongTopic, callbackAddr, resubInterval)
topics := []string{wrongTopic}
s := New(logger, verificationToken, secret, targetAddr, callbackAddr, topics, resubInterval)
require.NotNil(t, s)

customHTTPClient := &http.Client{}
s.SetHTTPClient(customHTTPClient)

ctx, cancel := context.WithTimeout(context.TODO(), 200*time.Millisecond)
ctx, cancel := context.WithTimeout(context.TODO(), 150*time.Millisecond)
defer cancel()

logger.EXPECT().Errorf(
Expand Down
6 changes: 1 addition & 5 deletions plugin/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
)

const (
DefaultCacheControl = ""
DefaultVideoContentType = "video/mp4"
DefaultCacheControl = ""
)

type GCS struct {
Expand Down Expand Up @@ -46,9 +45,6 @@ func (g *GCS) SaveAs(ctx context.Context, name string, r io.Reader) (written int
w := g.cli.Bucket(g.bucketName).Object(name).NewWriter(ctx)
defer w.Close()

w.CacheControl = DefaultCacheControl
w.ContentType = DefaultVideoContentType

written, err = io.Copy(w, r)

return
Expand Down
Loading

0 comments on commit 6a06499

Please sign in to comment.