Skip to content

Commit

Permalink
feat: init
Browse files Browse the repository at this point in the history
  • Loading branch information
didasy committed Sep 20, 2020
0 parents commit 0070389
Show file tree
Hide file tree
Showing 43 changed files with 4,939 additions and 0 deletions.
1 change: 1 addition & 0 deletions .czrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{ "path": "cz-conventional-changelog" }
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/ytfeed
/sample_data
/build.sh
/TODO.md
/full_test.sh
/*.log
/experiments
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 Andida Syahendar Dwi Putra

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
90 changes: 90 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# ytfeed
[![Build Status](https://travis-ci.org/worksinmagic/ytfeed.svg?branch=master)](https://travis-ci.org/worksinmagic/ytfeed)
[![codecov](https://codecov.io/gh/worksinmagic/ytfeed/branch/master/graph/badge.svg)](https://codecov.io/gh/worksinmagic/ytfeed)
[![GoDoc](https://godoc.org/github.com/worksinmagic/ytfeed?status.svg)](https://godoc.org/github.com/worksinmagic/ytfeed)
[![Go Report Card](https://goreportcard.com/badge/github.com/worksinmagic/ytfeed)](https://goreportcard.com/report/github.com/worksinmagic/ytfeed)
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fworksinmagic%2Fytfeed.svg?type=small)](https://app.fossa.io/projects/git%2Bgithub.com%2Fworksinmagic%2Fytfeed?ref=badge_small)
[![Donate](https://img.shields.io/badge/Donate-PayPal-green.svg)](https://paypal.me/didasy)

Automatic Youtube video and stream archiver.

## Note

- You **MUST** install [youtube-dl](https://github.com/ytdl-org/youtube-dl) first and **MUST** be available at `$PATH`.
- To run s3 test or complete suite test, you need to run `test_s3.sh` for it to start a Minio server. You can stop it or delete it after testing by running `test_s3_teardown.sh`
- You also need to set `YTFEED_YOUTUBE_API_KEY`, `YTFEED_YOUTUBE_VIDEO_ID`, and `YTFEED_YOUTUBE_VIDEO_URL` to do a complete suite test.
- You can redo failed download by sending a `POST` request with the XML message in the log as the body.
- If you are planning to download live broadcast, the downloaded video will be in the format `mp4` regardless of your extension config.

## Before Using

- You have to run the program first and make sure it could be reached from the outside, then go [here](https://pubsubhubbub.appspot.com/subscribe) to subscribe to a channel.
- If the program got killed because of OOM, you can turn on swap file if you cannot raise the machine's memory. Or you can PR me a better way to handle the upload.

## Configuration

Set configuration by setting up environment variables.

- `YTFEED_YOUTUBE_API_KEY` Youtube API Key, required.
- `YTFEED_VERIFICATION_TOKEN` Verification token used to subscribe and unsubscribe topics, required.
- `YTFEED_RESUB_CALLBACK_ADDR` Callback address to ytfeed, required.
- `YTFEED_RESUB_TARGET_ADDR` The subscription page of pubsubhubbub. Defaulted to [this.](https://pubsubhubbub.appspot.com/subscribe)
- `YTFEED_RESUB_TOPIC` The topic the subscription should subscribe to, for example `https://www.youtube.com/xml/feeds/videos.xml?channel_id=mychannelid`, required.
- `YTFEED_RESUB_INTERVAL` The interval between resubscription, defaulted to 3 days or `72h`.
- `YTFEED_STORAGE_BACKEND` The storage backend, required. Must be one of `disk`, `gcs`, or `s3`.
- `YTFEED_S3_ENDPOINT` The S3 compliant server endpoint, required if `YTFEED_STORAGE_BACKEND` is `s3`.
- `YTFEED_S3_ACCESS_KEY_ID` The access key id for the S3 compliant server, required if `YTFEED_STORAGE_BACKEND` is `s3`.
- `YTFEED_S3_SECRET_ACCESS_KEY` The secret access key id for the S3 compliant server, required if `YTFEED_STORAGE_BACKEND` is `s3`.
- `YTFEED_S3_BUCKET_NAME` The bucket name for the S3 compliant server, required if `YTFEED_STORAGE_BACKEND` is `s3`.
- `YTFEED_GCS_CREDENTIAL_JSON_FILE_PATH` The JSON credential file for GCS, only used if `YTFEED_STORAGE_BACKEND` is `gcs`.
- `YTFEED_GCS_BUCKET_NAME` The bucket name for GCS, required if `YTFEED_STORAGE_BACKEND` is `gcs`.
- `YTFEED_DISK_DIRECTORY` The disk directory path, required if `YTFEED_STORAGE_BACKEND` is `disk`.
- `YTFEED_FILENAME_TEMPLATE` The filename template, defaulted to `{{.ChannelID}}/{{.PublishedYear}}/{{.PublishedMonth}}/{{.PublishedDay}}/{{.PublishedTimeZone}}/{{.VideoID}}.{{.VideoExtension}}`. The usable variables are `.ChannelID`, `.VideoID`, `.Published`, `.Title`, `.PublishedYear`, `.PublishedMonth`, `.PublishedDay`, `.PublishedHour`, `.PublishedMinute`, `.PublishedSecond`, `.PublishedNanosecond`, `.PublishedTimeZone`, `.PublishedTimeZoneOffsetSeconds`, `.VideoQuality`, `.VideoExtension`, and `.Author`.
- `YTFEED_HOST` The host address, defaulted to `:8123`.
- `YTFEED_VIDEO_FORMAT_QUALITY` The quality of the video to download, must be one of `1080`, `720`, `640`, `480`, `360`, `240`, or `144` and defaulted to `720`.
- `YTFEED_VIDEO_FORMAT_EXTENSION` The extension of the video to download, defaulted to `webm`.
- `YTFEED_VIDEO_DOWNLOAD_RETRY_DELAY` Delay time when retrying, set to activate retries.
- `YTFEED_VIDEO_DOWNLOAD_MAX_RETRIES` Maximum retries before giving up, defaulted to `5`
- `YTFEED_REDIS_ADDR` Redis address, required if you want to publish the data to Redis PubSub.
- `YTFEED_REDIS_USERNAME`
- `YTFEED_REDIS_PASSWORD`
- `YTFEED_REDIS_CHANNEL` Redis publish channel, defaulted to `ytfeed`.
- `YTFEED_REDIS_DB`
- `YTFEED_REDIS_MAX_RETRIES`
- `YTFEED_REDIS_DIAL_TIMEOUT`
- `YTFEED_REDIS_WRITE_TIMEOUT`
- `YTFEED_REDIS_READ_TIMEOUT`
- `YTFEED_REDIS_POOL_SIZE`
- `YTFEED_REDIS_MIN_IDLE_CONNS`
- `YTFEED_REDIS_MAX_CONN_AGE`
- `YTFEED_REDIS_POOL_TIMEOUT`
- `YTFEED_REDIS_IDLE_TIMEOUT`
- `YTFEED_REDIS_IDLE_CHECK_FREQUENCY`
- `YTFEED_BOLTDB_PATH` Set this to a file path if you want to activate stream scheduler.
- `YTFEED_STREAM_SCHEDULER_RETRY_DELAY` Retry delay. Defaulted to 1 minute.
- `YTFEED_STREAM_SCHEDULER_MAX_RETRIES` Defaulted to 5 retries.
- `YTFEED_STREAM_SCHEDULER_WORKER_INTERVAL` Defaulted to 1 minute.
- `YTFEED_AMQP_DSN` AMQP DSN, required if you want to publish the data to AMQP broker.
- `YTFEED_AMQP_EXCHANGE` Defaulted to `ytfeed`
- `YTFEED_AMQP_KEY` Defaulted to `schedule`
- `YTFEED_AMQP_PUBLISH_MANDATORY` Defaulted to true
- `YTFEED_AMQP_PUBLISH_IMMEDIATE` Defaulted to false
- `YTFEED_AMQP_EXCHANGE_KIND` Defaulted to `topic`
- `YTFEED_AMQP_EXCHANGE_DURABLE` Defaulted to true
- `YTFEED_AMQP_EXCHANGE_INTERNAL` Defaulted to false
- `YTFEED_AMQP_EXCHANGE_AUTO_DELETE` Defaulted to false
- `YTFEED_AMQP_EXCHANGE_NO_WAIT` Defaulted to false

## Building

Your ol' plain `go build cmd/ytfeed/main.go`

## Usage

Either create `.env` or use shellscript to set the required environment variables and run the binary.

## How to Contribute

- Keep your code super simple and clean.
- Add comments to your code if you have to explain what your code is doing.
- Make sure you have Commitizen CLI installed and your commit message must be written through `git cz`.
225 changes: 225 additions & 0 deletions app/ytfeed/ytfeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package ytfeed

import (
"context"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/streadway/amqp"
mainytfeed "github.com/worksinmagic/ytfeed"
"github.com/worksinmagic/ytfeed/config"
"github.com/worksinmagic/ytfeed/health"
"github.com/worksinmagic/ytfeed/plugin/autosubscribefeed"
"github.com/worksinmagic/ytfeed/plugin/disk"
"github.com/worksinmagic/ytfeed/plugin/gcs"
"github.com/worksinmagic/ytfeed/plugin/publishamqp"
"github.com/worksinmagic/ytfeed/plugin/publishredis"
"github.com/worksinmagic/ytfeed/plugin/s3"
"github.com/worksinmagic/ytfeed/plugin/savevideo"
"github.com/worksinmagic/ytfeed/plugin/streamschedule"
"github.com/worksinmagic/ytfeed/rss"
"google.golang.org/api/option"
"google.golang.org/api/youtube/v3"
)

func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {
// declare dependencies
cfg := config.New()
err = cfg.Validate()
if err != nil {
err = errors.Wrap(err, "failed to validate configuration")
return
}

yts, err := youtube.NewService(ctx, option.WithAPIKey(cfg.YoutubeAPIKey))
if err != nil {
err = errors.Wrap(err, "failed to create new YouTube service")
return
}

// declare data handlers
dataHandlers := make([]mainytfeed.DataHandlerFunc, 0, 3)

var streamScheduler *streamschedule.StreamSchedule
if cfg.BoltDBPath != "" {
streamScheduler, err = streamschedule.New(logger, cfg.BoltDBPath, cfg.ResubCallbackAddr, cfg.StreamSchedulerRetryDelay, cfg.StreamSchedulerWorkerInterval, cfg.StreamSchedulerMaxRetries)
if err != nil {
err = errors.Wrap(err, "failed to create stream scheduler service")
return
}
defer func(streamScheduler *streamschedule.StreamSchedule) {
err := streamScheduler.CloseDatabase()
if err != nil {
logger.Errorf("Failed to close stream scheduler database: %v", err)
}
}(streamScheduler)
}

var dataSaver savevideo.DataSaver
switch cfg.StorageBackend {
case config.StorageBackendS3:
dataSaver, err = s3.New(cfg.S3Endpoint, cfg.S3AccessKeyID, cfg.S3SecretAccessKey, cfg.S3BucketName, s3.UseSSL)
if err != nil {
err = errors.Wrap(err, "failed to create new S3 data saver service")
return
}
case config.StorageBackendGCS:
dataSaver, err = gcs.New(cfg.GCSBucketName, cfg.GCSCredentialJSONFilePath, nil)
if err != nil {
err = errors.Wrap(err, "failed to create new GCS data saver service")
return
}
case config.StorageBackendDisk:
dataSaver, err = disk.New(cfg.DiskDirectory)
if err != nil {
err = errors.Wrap(err, "failed to create new disk data saver service")
return
}
case config.StorageBackendNone:
// No backend, don't save video
default:
err = errors.Wrap(err, "storage backend type is invalid")
return
}

var saveVideo *savevideo.SaveVideo
if dataSaver != nil {
saveVideo, err = savevideo.New(
logger, yts.Videos, dataSaver,
cfg.TemporaryFileDir, cfg.FileNameTemplate, cfg.VideoFormatQuality, cfg.VideoFormatExtension,
)
if err != nil {
err = errors.Wrap(err, "failed to initialize savevideo")
return
}
}
if saveVideo != nil && streamScheduler != nil {
saveVideo.SetStreamScheduler(streamScheduler)
}
if saveVideo != nil && cfg.VideoDownloadRetryDelay > 0 && cfg.VideoDownloadMaxRetries > 0 {
saveVideo.SetRetries(cfg.VideoDownloadRetryDelay, cfg.VideoDownloadMaxRetries)
}
if saveVideo != nil {
dataHandlers = append(dataHandlers, saveVideo.DataHandler)
}

if cfg.RedisAddr != "" {
opts := &redis.Options{}
opts.Addr = cfg.RedisAddr
opts.DB = cfg.RedisDB
opts.DialTimeout = cfg.RedisDialTimeout
opts.IdleCheckFrequency = cfg.RedisIdleCheckFrequency
opts.IdleTimeout = cfg.RedisIdleTimeout
opts.MaxConnAge = cfg.RedisMaxConnAge
opts.MaxRetries = cfg.RedisMaxRetries
opts.MinIdleConns = cfg.RedisMinIdleConns
opts.Password = cfg.RedisPassword
opts.PoolSize = cfg.RedisPoolSize
opts.PoolTimeout = cfg.RedisPoolTimeout
opts.ReadTimeout = cfg.RedisReadTimeout
opts.Username = cfg.RedisUsername
opts.WriteTimeout = cfg.RedisWriteTimeout
redisClient := publishredis.New(logger, cfg.RedisChannel, opts)

dataHandlers = append(dataHandlers, redisClient.DataHandler)
}

if cfg.AMQPDSN != "" {
var conn *amqp.Connection
conn, err = amqp.Dial(cfg.AMQPDSN)
if err != nil {
err = errors.Wrap(err, "failed to initialize publishamqp")
return
}
defer conn.Close()

var amqpChannel *amqp.Channel
amqpChannel, err = conn.Channel()
if err != nil {
err = errors.Wrap(err, "failed to create amqp channel")
return
}
defer amqpChannel.Close()

err = amqpChannel.Confirm(cfg.AMQPExchangeNoWait)
if err != nil {
err = errors.Wrap(err, "failed to set amqp broker to confirm mode")
return
}

err = amqpChannel.ExchangeDeclare(
cfg.AMQPExchange,
cfg.AMQPExchangeKind,
cfg.AMQPExchangeDurable,
cfg.AMQPExchangeAutoDelete,
cfg.AMQPExchangeInternal,
cfg.AMQPExchangeNoWait,
nil,
)
if err != nil {
err = errors.Wrap(err, "failed to declare amqp exchange")
return
}

pa := publishamqp.New(
logger,
amqpChannel,
cfg.AMQPExchange,
cfg.AMQPKey,
cfg.AMQPPublishMandatory,
cfg.AMQPPublishImmediate,
)

dataHandlers = append(dataHandlers, pa.DataHandler)
}

// run workers
subscriber := autosubscribefeed.New(logger, cfg.VerificationToken, cfg.ResubTargetAddr, cfg.ResubTopic, cfg.ResubCallbackAddr, cfg.ResubInterval)
go func(ctx context.Context, subscriber *autosubscribefeed.Subscriber) {
err := subscriber.Subscribe(ctx)
if err != nil {
err = errors.Wrap(err, "resubscriber worker exited with error")
logger.Errorln(err)
return
}
}(ctx, subscriber)

if streamScheduler != nil {
go func(ctx context.Context, streamScheduler *streamschedule.StreamSchedule) {
err := streamScheduler.RunWorker(ctx)
if err != nil {
err = errors.Wrap(err, "stream scheduler worker exited with error")
logger.Errorln(err)
return
}
}(ctx, streamScheduler)
}

// declare handler functions
http.HandleFunc("/health", health.Handler)
http.HandleFunc("/", rss.Handler(ctx, logger, cfg.VerificationToken, dataHandlers...))

// listen
errCh := make(chan error, 1)
go func(errCh chan<- error) {
logger.Infof("Server is listening at %s", cfg.Host)
errCh <- http.ListenAndServe(cfg.Host, nil)
}(errCh)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

select {
case <-ctx.Done():
case err = <-errCh:
err = errors.Wrap(err, "unexpected server error")
return
case <-quit:
}

return
}
35 changes: 35 additions & 0 deletions cmd/ytfeed/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"context"
"time"

log "github.com/sirupsen/logrus"
"github.com/worksinmagic/ytfeed/app/ytfeed"
)

const (
cleanupDelay = 3 * time.Second
)

func main() {
logger := log.New()

logger.Infoln("Starting server")

ctx, cancel := context.WithCancel(context.Background())
err := ytfeed.Run(ctx, logger)
if err != nil {
cancel()
logger.Infoln("Server shutting down because of unexpected error, waiting 3 seconds to complete cleaning up")
<-time.After(cleanupDelay)

logger.Fatalf("Unexpected error: %v", err)
}

cancel()
logger.Infoln("Server shutting down, waiting 3 seconds to complete cleaning up")
<-time.After(cleanupDelay)

logger.Infoln("Server shut down")
}
Loading

0 comments on commit 0070389

Please sign in to comment.