Skip to content

Commit

Permalink
[bazarr] Add bounded concurrency to episodes endpoint (#238)
Browse files Browse the repository at this point in the history
* [bazarr] Add bounded concurrency to episodes endpoint

Signed-off-by: Russell <russell@troxel.io>

* Plumb Bazarr Config & Add defaults

Signed-off-by: Russell <russell@troxel.io>

* Move GolangCILint to run first

Signed-off-by: Russell <russell@troxel.io>

* break linter out into it's own job

Signed-off-by: Russell <russell@troxel.io>

* Bump default batch size to 300, put episode metrics behind EnableAdditionalMetrics

Signed-off-by: Russell Troxel <russell@troxel.io>

---------

Signed-off-by: Russell <russell@troxel.io>
Signed-off-by: Russell Troxel <russell@troxel.io>
  • Loading branch information
rtrox authored Jan 25, 2024
1 parent 4fd39ca commit 3276edb
Show file tree
Hide file tree
Showing 16 changed files with 693 additions and 104 deletions.
18 changes: 18 additions & 0 deletions .github/actions/lint/action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
name: Tests
description: Runs Go tests

runs:
using: composite
steps:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: ">=1.19"

# Run golangcilint before `go get` is ran
# https://github.com/golangci/golangci-lint-action/issues/23
- uses: golangci/golangci-lint-action@v2
with:
version: v1.55.1
args: --timeout 5m --config .github/lint/golangci.yaml
5 changes: 0 additions & 5 deletions .github/actions/tests/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ runs:
go mod tidy
git diff --exit-code
- uses: golangci/golangci-lint-action@v2
with:
version: v1.51.2
args: --timeout 5m --config .github/lint/golangci.yaml

- name: Run Unit tests
shell: bash
run: |
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/on-merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ jobs:
- name: Tests
uses: ./.github/actions/tests

lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4

- name: Lint
uses: ./.github/actions/lint

release-image:
runs-on: ubuntu-latest
steps:
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/on-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ on:
- synchronize

jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4

- name: Lint
uses: ./.github/actions/lint
tests:
runs-on: ubuntu-latest
steps:
Expand Down
17 changes: 15 additions & 2 deletions .github/workflows/on-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@ jobs:
- name: Tests
uses: ./.github/actions/tests

lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4

- name: Lint
uses: ./.github/actions/lint

release-image:
runs-on: ubuntu-latest
needs: tests
needs:
- tests
- lint
steps:
- name: Checkout
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4
Expand All @@ -30,7 +41,9 @@ jobs:

release-binaries:
runs-on: ubuntu-latest
needs: tests
needs:
- tests
- lint
steps:
- name: Checkout
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4
Expand Down
161 changes: 100 additions & 61 deletions internal/arr/collector/bazarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,39 @@ package collector

import (
"fmt"
"sync"
"time"

"github.com/onedr0p/exportarr/internal/arr/client"
"github.com/onedr0p/exportarr/internal/arr/config"
"github.com/onedr0p/exportarr/internal/arr/model"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type stats struct {
downloaded int
monitored int
unmonitored int
missing int
wanted int
fileSize int64
history int
languages map[string]int
scores map[string]int
providers map[string]int
mut sync.Mutex
}

func newStats() *stats {
return &stats{
languages: make(map[string]int),
scores: make(map[string]int),
providers: make(map[string]int),
}
}

type bazarrCollector struct {
config *config.ArrConfig // App configuration
subtitlesHistoryMetric *prometheus.Desc // Total number of subtitles history
Expand Down Expand Up @@ -46,6 +70,19 @@ type bazarrCollector struct {
errorMetric *prometheus.Desc // Error Description for use with InvalidMetric
}

func createIDBatches(ids []string, batchSize int) [][]string {
if len(ids) == 0 {
return [][]string{}
}
items := ids
ret := [][]string{}
for batchSize < len(items) {
ret = append(ret, items[0:batchSize:batchSize])
items = items[batchSize:]
}
return append(ret, items)
}

func NewBazarrCollector(c *config.ArrConfig) *bazarrCollector {
subtitleText := "subtitles"
episodeText := "episode"
Expand Down Expand Up @@ -268,38 +305,30 @@ func (collector *bazarrCollector) Collect(ch chan<- prometheus.Metric) {
log.Debugw("All Completed", "duration", mt)
}

type stats struct {
downloaded int
monitored int
unmonitored int
missing int
wanted int
fileSize int64
history int
languages map[string]int
scores map[string]int
providers map[string]int
}

func (collector *bazarrCollector) EpisodeMovieMetrics(ch chan<- prometheus.Metric, c *client.Client) {

episodeStats := collector.CollectEpisodeStats(ch, c)
if episodeStats == nil {
return
episodeStats := newStats()
if collector.config.EnableAdditionalMetrics {
episodeStats = collector.CollectEpisodeStats(ch, c)
if episodeStats == nil {
return
}
}

movieStats := collector.CollectMovieStats(ch, c)
if movieStats == nil {
return
}

ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesHistoryMetric, prometheus.GaugeValue, float64(episodeStats.history))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesDownloadedMetric, prometheus.GaugeValue, float64(episodeStats.downloaded))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMonitoredMetric, prometheus.GaugeValue, float64(episodeStats.monitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesUnmonitoredMetric, prometheus.GaugeValue, float64(episodeStats.unmonitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesWantedMetric, prometheus.GaugeValue, float64(episodeStats.wanted))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMissingMetric, prometheus.GaugeValue, float64(episodeStats.missing))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesFileSizeMetric, prometheus.GaugeValue, float64(episodeStats.fileSize))
if collector.config.EnableAdditionalMetrics {
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesHistoryMetric, prometheus.GaugeValue, float64(episodeStats.history))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesDownloadedMetric, prometheus.GaugeValue, float64(episodeStats.downloaded))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMonitoredMetric, prometheus.GaugeValue, float64(episodeStats.monitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesUnmonitoredMetric, prometheus.GaugeValue, float64(episodeStats.unmonitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesWantedMetric, prometheus.GaugeValue, float64(episodeStats.wanted))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMissingMetric, prometheus.GaugeValue, float64(episodeStats.missing))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesFileSizeMetric, prometheus.GaugeValue, float64(episodeStats.fileSize))
}

ch <- prometheus.MustNewConstMetric(collector.movieSubtitlesHistoryMetric, prometheus.GaugeValue, float64(movieStats.history))
ch <- prometheus.MustNewConstMetric(collector.movieSubtitlesDownloadedMetric, prometheus.GaugeValue, float64(movieStats.downloaded))
Expand Down Expand Up @@ -365,15 +394,11 @@ func (collector *bazarrCollector) EpisodeMovieMetrics(ch chan<- prometheus.Metri

func (collector *bazarrCollector) CollectEpisodeStats(ch chan<- prometheus.Metric, c *client.Client) *stats {
log := zap.S().With("collector", "bazarr")
episodeStats := new(stats)
episodeStats.languages = make(map[string]int)
episodeStats.scores = make(map[string]int)
episodeStats.providers = make(map[string]int)
episodeStats := newStats()

mseries := time.Now()

series := model.BazarrSeries{}

if err := c.DoRequest("series", &series); err != nil {
log.Errorw("Error getting series",
"error", err)
Expand All @@ -385,45 +410,59 @@ func (collector *bazarrCollector) CollectEpisodeStats(ch chan<- prometheus.Metri
for _, s := range series.Data {
ids = append(ids, fmt.Sprintf("%d", s.Id))
}

params := client.QueryParams{"seriesid[]": ids}

episodes := model.BazarrEpisodes{}
if err := c.DoRequest("episodes", &episodes, params); err != nil {
log.Errorw("Error getting episodes subtitles", "error", err)
ch <- prometheus.NewInvalidMetric(collector.errorMetric, err)
return nil
}
for _, e := range episodes.Data {
if !e.Monitored {
episodeStats.unmonitored++
} else {
episodeStats.monitored++
}

if len(e.MissingSubtitles) > 0 {
episodeStats.missing += len(e.MissingSubtitles)
if len(e.Subtitles) == 0 {
episodeStats.wanted++
batches := createIDBatches(ids, collector.config.Bazarr.SeriesBatchSize)

eg := errgroup.Group{}
sem := make(chan int, collector.config.Bazarr.SeriesBatchConcurrency) // limit concurrency via semaphore
for _, batch := range batches {
params := client.QueryParams{"seriesid[]": batch}
sem <- 1 //
eg.Go(func() error {
defer func() { <-sem }()
episodes := model.BazarrEpisodes{}
if err := c.DoRequest("episodes", &episodes, params); err != nil {
return err
}
}
episodeStats.mut.Lock()
defer episodeStats.mut.Unlock()
for _, e := range episodes.Data {
if !e.Monitored {
episodeStats.unmonitored++
} else {
episodeStats.monitored++
}

if len(e.Subtitles) > 0 {
episodeStats.downloaded += len(e.Subtitles)
for _, subtitle := range e.Subtitles {
if subtitle.Language != "" {
episodeStats.languages[subtitle.Language]++
} else if subtitle.Code2 != "" {
episodeStats.languages[subtitle.Code2]++
} else if subtitle.Code3 != "" {
episodeStats.languages[subtitle.Code3]++
if len(e.MissingSubtitles) > 0 {
episodeStats.missing += len(e.MissingSubtitles)
if len(e.Subtitles) == 0 {
episodeStats.wanted++
}
}

if subtitle.Size != 0 {
episodeStats.fileSize += subtitle.Size
if len(e.Subtitles) > 0 {
episodeStats.downloaded += len(e.Subtitles)
for _, subtitle := range e.Subtitles {
if subtitle.Language != "" {
episodeStats.languages[subtitle.Language]++
} else if subtitle.Code2 != "" {
episodeStats.languages[subtitle.Code2]++
} else if subtitle.Code3 != "" {
episodeStats.languages[subtitle.Code3]++
}

if subtitle.Size != 0 {
episodeStats.fileSize += subtitle.Size
}
}
}
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
log.Errorw("Error getting episodes subtitles", "error", err)
ch <- prometheus.NewInvalidMetric(collector.errorMetric, err)
return nil
}

history := model.BazarrHistory{}
Expand Down
Loading

0 comments on commit 3276edb

Please sign in to comment.