Skip to content

Commit

Permalink
Introduce retry blocks for initialization tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
udoprog committed Jul 14, 2020
1 parent 77a38bc commit d26b4f1
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 72 deletions.
38 changes: 38 additions & 0 deletions bot/src/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::time::Duration;

/// Global maximum span delay for a retry.
const GLOBAL_MAX: Duration = Duration::from_secs(120);

/// An exponential backoff.
pub struct Exponential {
initial: Duration,
attempt: usize,
}

impl Exponential {
/// Construct a new exponential backoff.
pub fn new(initial: Duration) -> Self {
Self {
initial,
attempt: 0,
}
}

/// Get the next duration and increment the attempt counter.
pub fn next(&mut self) -> Duration {
let mut duration = self.initial;

if self.attempt <= 4 {
duration *= 2u32 << usize::min(self.attempt, 4);

if duration > GLOBAL_MAX {
duration = GLOBAL_MAX;
}
} else {
duration = GLOBAL_MAX;
}

self.attempt += 1;
duration
}
}
1 change: 1 addition & 0 deletions bot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub const VERSION: &str = version_str!();
mod macros;
pub mod api;
pub mod auth;
mod backoff;
pub mod bus;
mod command;
pub mod currency;
Expand Down
24 changes: 24 additions & 0 deletions bot/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,27 @@ macro_rules! respond {
$ctx.respond(&format!($($t)*)).await;
}};
}

/// Backoff and retry until the given action has been successfully executed.
#[macro_export]
macro_rules! retry_until_ok {
($id:expr, { $($f:tt)* }) => {
async {
let mut backoff = $crate::backoff::Exponential::new(std::time::Duration::from_secs(2));

loop {
log::info!("{}", $id);
let res = async { $($f)* }.await;

match res {
Ok(output) => break output,
Err(e) => {
let duration = backoff.next();
log_warn!(e, "\"{}\" failed, trying again in {:?}", $id, duration);
tokio::time::delay_for(duration).await;
}
}
}
}
};
}
4 changes: 2 additions & 2 deletions bot/src/player/mixer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{api, convert_item, utils, Item, Song};
use crate::{api::spotify::PrivateUser, db, track_id::TrackId};
use crate::{db, track_id::TrackId};
use anyhow::Result;
use chrono::Utc;
use std::{collections::VecDeque, sync::Arc};
Expand Down Expand Up @@ -40,7 +40,7 @@ impl Mixer {
youtube: &api::YouTube,
) -> Result<()> {
// TODO: cache this value
let streamer: PrivateUser = spotify.me().await?;
let streamer = spotify.me().await?;
let market = streamer.country.as_deref();

// Add tracks from database.
Expand Down
56 changes: 32 additions & 24 deletions bot/src/player/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ pub async fn run(
.boxed(),
);

futures.push(
SongFile::run(injector.clone(), settings.scoped("song-file"))
.instrument(trace_span!(target: "futures", "song-file"))
.boxed(),
);

let bus = bus::Bus::new();

let (song_update_interval_stream, song_update_interval) = settings
Expand Down Expand Up @@ -225,6 +231,7 @@ pub async fn run(
.await?;

let internal = Arc::new(RwLock::new(PlayerInternal {
initialized: Default::default(),
injector: injector.clone(),
player: PlayerKind::None,
detached,
Expand All @@ -247,39 +254,40 @@ pub async fn run(
closed: None,
}));

let playback = PlaybackFuture {
internal: internal.clone(),
connect_stream,
playback_mode_stream,
detached_stream,
song_update_interval,
song_update_interval_stream,
};

futures.push(
playback
.run(injector.clone(), settings)
.instrument(trace_span!(target: "futures", "playback"))
.boxed(),
);

let parent_player = Player {
inner: internal.clone(),
};

// future to initialize the player future.
// Yeah, I know....
let future = async move {
internal.write().await.initialize_queue().await?;

let playback = PlaybackFuture {
internal: internal.clone(),
connect_stream,
playback_mode_stream,
detached_stream,
song_update_interval,
song_update_interval_stream,
};

futures.push(
SongFile::run(injector.clone(), settings.scoped("song-file"))
.instrument(trace_span!(target: "futures", "song-file"))
.boxed(),
);
// Note: these tasks might fail sporadically, since we need to perform external
// API calls to initialize metadata for playback items.
retry_until_ok!("Initialize Player", {
internal.write().await.initialize().await
})
.await;

futures.push(
playback
.run(injector.clone(), settings)
.instrument(trace_span!(target: "futures", "playback"))
.boxed(),
);
log::info!("Player is up and running!");

futures.select_next_some().await?;
Ok(())
// Drive child futures now that initialization is done.
futures.select_next_some().await
};

Ok((parent_player, future.boxed()))
Expand Down
11 changes: 0 additions & 11 deletions bot/src/player/playback_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ impl PlaybackFuture {
.update_fallback_items(fallback)
.await;

let mut spotify_token_ready = Some({
let internal = self.internal.read().await;
let spotify_token = internal.spotify.token.clone();

Box::pin(async move { spotify_token.wait_until_ready().await })
});

let (mut song_stream, song) = injector.stream::<Song>().await;
let mut song_timeout = song.map(|s| tokio::time::delay_until(s.deadline().into()));

Expand All @@ -60,10 +53,6 @@ impl PlaybackFuture {
fallback = fallback_stream.select_next_some() => {
self.internal.write().await.update_fallback_items(fallback).await;
}
_ = spotify_token_ready.current() => {
log::info!("Synchronizing Spotify Playback");
self.internal.write().await.sync_spotify_playback().await?;
}
/* player */
_ = song_timeout.current() => {
let mut internal = self.internal.write().await;
Expand Down
72 changes: 37 additions & 35 deletions bot/src/player/player_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use std::{sync::Arc, time::Duration};

#[derive(Default)]
pub(super) struct Initialized {
queue: bool,
playback_state: bool,
}

pub(super) struct PlayerInternal {
pub(super) initialized: Initialized,
pub(super) injector: injector::Injector,
/// Current player kind.
pub(super) player: PlayerKind,
Expand Down Expand Up @@ -46,45 +53,40 @@ pub(super) struct PlayerInternal {
}

impl PlayerInternal {
/// Initialize the queue from the database.
pub async fn initialize_queue(&mut self) -> Result<()> {
self.mixer
.initialize_queue(&*self.spotify, &*self.youtube)
.await
}
/// Initialize the internal player if necessary.
pub async fn initialize(&mut self) -> Result<()> {
if !self.initialized.playback_state {
let p = self.spotify.me_player().await?;

if let Some(p) = p {
log::trace!("Detected Spotify playback: {:?}", p);

match Song::from_playback(&p) {
Some(song) => {
log::trace!("Syncing playback");
let volume_percent = p.device.volume_percent;
self.device.sync_device(Some(p.device)).await?;
self.connect_player
.set_scaled_volume(volume_percent)
.await?;
self.sync(song).await?;
}
None => {
log::trace!("Pausing playback since item is missing");
self.pause(Source::Automatic).await?;
}
}
}

/// Try to sync Spotify playback.
pub async fn sync_spotify_playback(&mut self) -> Result<()> {
if !self.spotify.token.is_ready().await {
return Ok(());
self.initialized.playback_state = true;
}

let p = match self.spotify.me_player().await {
Ok(p) => p,
Err(e) => {
log::warn!("Failed to sync playback: {}", e);
return Ok(());
}
};
if !self.initialized.queue {
self.mixer
.initialize_queue(&*self.spotify, &*self.youtube)
.await?;

if let Some(p) = p {
log::trace!("Detected playback: {:?}", p);

match Song::from_playback(&p) {
Some(song) => {
log::trace!("Syncing playback");
let volume_percent = p.device.volume_percent;
self.device.sync_device(Some(p.device)).await?;
self.connect_player
.set_scaled_volume(volume_percent)
.await?;
self.sync(song).await?;
}
None => {
log::trace!("Pausing playback since item is missing");
self.pause(Source::Automatic).await?;
}
}
self.initialized.queue = true;
}

Ok(())
Expand Down

0 comments on commit d26b4f1

Please sign in to comment.