Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bazarr] Add bounded concurrency to episodes endpoint #238

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/actions/lint/action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
name: Tests
description: Runs Go tests

runs:
using: composite
steps:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: ">=1.19"

# Run golangcilint before `go get` is ran
# https://github.com/golangci/golangci-lint-action/issues/23
- uses: golangci/golangci-lint-action@v2
with:
version: v1.55.1
args: --timeout 5m --config .github/lint/golangci.yaml
5 changes: 0 additions & 5 deletions .github/actions/tests/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ runs:
go mod tidy
git diff --exit-code

- uses: golangci/golangci-lint-action@v2
with:
version: v1.51.2
args: --timeout 5m --config .github/lint/golangci.yaml

- name: Run Unit tests
shell: bash
run: |
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/on-merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ jobs:
- name: Tests
uses: ./.github/actions/tests

lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4

- name: Lint
uses: ./.github/actions/lint

release-image:
runs-on: ubuntu-latest
steps:
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/on-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ on:
- synchronize

jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4

- name: Lint
uses: ./.github/actions/lint
tests:
runs-on: ubuntu-latest
steps:
Expand Down
17 changes: 15 additions & 2 deletions .github/workflows/on-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@ jobs:
- name: Tests
uses: ./.github/actions/tests

lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4

- name: Lint
uses: ./.github/actions/lint

release-image:
runs-on: ubuntu-latest
needs: tests
needs:
- tests
- lint
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4
Expand All @@ -30,7 +41,9 @@ jobs:

release-binaries:
runs-on: ubuntu-latest
needs: tests
needs:
- tests
- lint
steps:
- name: Checkout
uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4
Expand Down
161 changes: 100 additions & 61 deletions internal/arr/collector/bazarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,39 @@ package collector

import (
"fmt"
"sync"
"time"

"github.com/onedr0p/exportarr/internal/arr/client"
"github.com/onedr0p/exportarr/internal/arr/config"
"github.com/onedr0p/exportarr/internal/arr/model"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type stats struct {
downloaded int
monitored int
unmonitored int
missing int
wanted int
fileSize int64
history int
languages map[string]int
scores map[string]int
providers map[string]int
mut sync.Mutex
}

func newStats() *stats {
return &stats{
languages: make(map[string]int),
scores: make(map[string]int),
providers: make(map[string]int),
}
}

type bazarrCollector struct {
config *config.ArrConfig // App configuration
subtitlesHistoryMetric *prometheus.Desc // Total number of subtitles history
Expand Down Expand Up @@ -46,6 +70,19 @@ type bazarrCollector struct {
errorMetric *prometheus.Desc // Error Description for use with InvalidMetric
}

func createIDBatches(ids []string, batchSize int) [][]string {
if len(ids) == 0 {
return [][]string{}
}
items := ids
ret := [][]string{}
for batchSize < len(items) {
ret = append(ret, items[0:batchSize:batchSize])
items = items[batchSize:]
}
return append(ret, items)
}

func NewBazarrCollector(c *config.ArrConfig) *bazarrCollector {
subtitleText := "subtitles"
episodeText := "episode"
Expand Down Expand Up @@ -268,38 +305,30 @@ func (collector *bazarrCollector) Collect(ch chan<- prometheus.Metric) {
log.Debugw("All Completed", "duration", mt)
}

type stats struct {
downloaded int
monitored int
unmonitored int
missing int
wanted int
fileSize int64
history int
languages map[string]int
scores map[string]int
providers map[string]int
}

func (collector *bazarrCollector) EpisodeMovieMetrics(ch chan<- prometheus.Metric, c *client.Client) {

episodeStats := collector.CollectEpisodeStats(ch, c)
if episodeStats == nil {
return
episodeStats := newStats()
if collector.config.EnableAdditionalMetrics {
episodeStats = collector.CollectEpisodeStats(ch, c)
if episodeStats == nil {
return
}
}

movieStats := collector.CollectMovieStats(ch, c)
if movieStats == nil {
return
}

ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesHistoryMetric, prometheus.GaugeValue, float64(episodeStats.history))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesDownloadedMetric, prometheus.GaugeValue, float64(episodeStats.downloaded))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMonitoredMetric, prometheus.GaugeValue, float64(episodeStats.monitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesUnmonitoredMetric, prometheus.GaugeValue, float64(episodeStats.unmonitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesWantedMetric, prometheus.GaugeValue, float64(episodeStats.wanted))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMissingMetric, prometheus.GaugeValue, float64(episodeStats.missing))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesFileSizeMetric, prometheus.GaugeValue, float64(episodeStats.fileSize))
if collector.config.EnableAdditionalMetrics {
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesHistoryMetric, prometheus.GaugeValue, float64(episodeStats.history))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesDownloadedMetric, prometheus.GaugeValue, float64(episodeStats.downloaded))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMonitoredMetric, prometheus.GaugeValue, float64(episodeStats.monitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesUnmonitoredMetric, prometheus.GaugeValue, float64(episodeStats.unmonitored))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesWantedMetric, prometheus.GaugeValue, float64(episodeStats.wanted))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesMissingMetric, prometheus.GaugeValue, float64(episodeStats.missing))
ch <- prometheus.MustNewConstMetric(collector.episodeSubtitlesFileSizeMetric, prometheus.GaugeValue, float64(episodeStats.fileSize))
}

ch <- prometheus.MustNewConstMetric(collector.movieSubtitlesHistoryMetric, prometheus.GaugeValue, float64(movieStats.history))
ch <- prometheus.MustNewConstMetric(collector.movieSubtitlesDownloadedMetric, prometheus.GaugeValue, float64(movieStats.downloaded))
Expand Down Expand Up @@ -365,15 +394,11 @@ func (collector *bazarrCollector) EpisodeMovieMetrics(ch chan<- prometheus.Metri

func (collector *bazarrCollector) CollectEpisodeStats(ch chan<- prometheus.Metric, c *client.Client) *stats {
log := zap.S().With("collector", "bazarr")
episodeStats := new(stats)
episodeStats.languages = make(map[string]int)
episodeStats.scores = make(map[string]int)
episodeStats.providers = make(map[string]int)
episodeStats := newStats()

mseries := time.Now()

series := model.BazarrSeries{}

if err := c.DoRequest("series", &series); err != nil {
log.Errorw("Error getting series",
"error", err)
Expand All @@ -385,45 +410,59 @@ func (collector *bazarrCollector) CollectEpisodeStats(ch chan<- prometheus.Metri
for _, s := range series.Data {
ids = append(ids, fmt.Sprintf("%d", s.Id))
}

params := client.QueryParams{"seriesid[]": ids}

episodes := model.BazarrEpisodes{}
if err := c.DoRequest("episodes", &episodes, params); err != nil {
log.Errorw("Error getting episodes subtitles", "error", err)
ch <- prometheus.NewInvalidMetric(collector.errorMetric, err)
return nil
}
for _, e := range episodes.Data {
if !e.Monitored {
episodeStats.unmonitored++
} else {
episodeStats.monitored++
}

if len(e.MissingSubtitles) > 0 {
episodeStats.missing += len(e.MissingSubtitles)
if len(e.Subtitles) == 0 {
episodeStats.wanted++
batches := createIDBatches(ids, collector.config.Bazarr.SeriesBatchSize)

eg := errgroup.Group{}
sem := make(chan int, collector.config.Bazarr.SeriesBatchConcurrency) // limit concurrency via semaphore
for _, batch := range batches {
params := client.QueryParams{"seriesid[]": batch}
sem <- 1 //
eg.Go(func() error {
defer func() { <-sem }()
episodes := model.BazarrEpisodes{}
if err := c.DoRequest("episodes", &episodes, params); err != nil {
return err
}
}
episodeStats.mut.Lock()
defer episodeStats.mut.Unlock()
for _, e := range episodes.Data {
if !e.Monitored {
episodeStats.unmonitored++
} else {
episodeStats.monitored++
}

if len(e.Subtitles) > 0 {
episodeStats.downloaded += len(e.Subtitles)
for _, subtitle := range e.Subtitles {
if subtitle.Language != "" {
episodeStats.languages[subtitle.Language]++
} else if subtitle.Code2 != "" {
episodeStats.languages[subtitle.Code2]++
} else if subtitle.Code3 != "" {
episodeStats.languages[subtitle.Code3]++
if len(e.MissingSubtitles) > 0 {
episodeStats.missing += len(e.MissingSubtitles)
if len(e.Subtitles) == 0 {
episodeStats.wanted++
}
}

if subtitle.Size != 0 {
episodeStats.fileSize += subtitle.Size
if len(e.Subtitles) > 0 {
episodeStats.downloaded += len(e.Subtitles)
for _, subtitle := range e.Subtitles {
if subtitle.Language != "" {
episodeStats.languages[subtitle.Language]++
} else if subtitle.Code2 != "" {
episodeStats.languages[subtitle.Code2]++
} else if subtitle.Code3 != "" {
episodeStats.languages[subtitle.Code3]++
}

if subtitle.Size != 0 {
episodeStats.fileSize += subtitle.Size
}
}
}
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
log.Errorw("Error getting episodes subtitles", "error", err)
ch <- prometheus.NewInvalidMetric(collector.errorMetric, err)
return nil
}

history := model.BazarrHistory{}
Expand Down
Loading
Loading