Skip to content

Commit

Permalink
async Mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
iovxw committed Aug 25, 2021
1 parent 91c10db commit f2d7bad
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 29 deletions.
2 changes: 1 addition & 1 deletion README.en.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**Other Languages:** [Chinese](README.md)

Chinese Telegram RSS bot [@RustRssBot](http://t.me/RustRssBot)
Telegram RSS bot [@RustRssBot](http://t.me/RustRssBot)

**Supports:**
- [x] RSS 0.9
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**Other Languages:** [English](README.en.md)

中文 Telegram RSS 机器人 [@RustRssBot](http://t.me/RustRssBot)
Telegram RSS 机器人 [@RustRssBot](http://t.me/RustRssBot)

**支持:**
- [x] RSS 0.9
Expand Down
2 changes: 1 addition & 1 deletion src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::sync::Mutex;

use tbot::{contexts::Command, types::parameters, Bot};
use tokio::sync::Mutex;

use crate::data::Database;

Expand Down
4 changes: 2 additions & 2 deletions src/commands/export.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;
use std::sync::Mutex;

use tbot::{
contexts::Command,
types::{input_file, parameters},
};
use tokio::sync::Mutex;

use crate::data::Database;
use crate::opml::into_opml;
Expand All @@ -28,7 +28,7 @@ pub async fn export(
target_id = channel_id.unwrap();
}

let feeds = db.lock().unwrap().subscribed_feeds(target_id.0);
let feeds = db.lock().await.subscribed_feeds(target_id.0);
if feeds.is_none() {
update_response(
&cmd.bot,
Expand Down
4 changes: 2 additions & 2 deletions src/commands/rss.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::sync::Mutex;

use either::Either;
use pinyin::{Pinyin, ToPinyin};
use tbot::{contexts::Command, types::parameters};
use tokio::sync::Mutex;

use crate::data::Database;
use crate::messages::{format_large_msg, Escape};
Expand All @@ -27,7 +27,7 @@ pub async fn rss(
target_id = channel_id.unwrap();
}

let feeds = db.lock().unwrap().subscribed_feeds(target_id.0);
let feeds = db.lock().await.subscribed_feeds(target_id.0);
let mut msgs = if let Some(mut feeds) = feeds {
feeds.sort_by_cached_key(|feed| {
feed.title
Expand Down
2 changes: 1 addition & 1 deletion src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::sync::Mutex;

use tbot::{contexts::Command, types::parameters};
use tokio::sync::Mutex;

use super::{update_response, Database, MsgTarget};

Expand Down
8 changes: 4 additions & 4 deletions src/commands/sub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::sync::Mutex;

use tbot::{contexts::Command, types::parameters};
use tokio::sync::Mutex;

use crate::client::pull_feed;
use crate::data::Database;
Expand Down Expand Up @@ -36,7 +36,7 @@ pub async fn sub(
return Ok(());
}
};
if db.lock().unwrap().is_subscribed(target_id.0, feed_url) {
if db.lock().await.is_subscribed(target_id.0, feed_url) {
update_response(
&cmd.bot,
target,
Expand All @@ -46,7 +46,7 @@ pub async fn sub(
return Ok(());
}

if cfg!(feature = "hosted-by-iovxw") && db.lock().unwrap().all_feeds().len() >= 1500 {
if cfg!(feature = "hosted-by-iovxw") && db.lock().await.all_feeds().len() >= 1500 {
let msg = tr!("subscription_rate_limit");
update_response(&cmd.bot, target, parameters::Text::with_markdown(msg)).await?;
return Ok(());
Expand All @@ -59,7 +59,7 @@ pub async fn sub(
.await?;
let msg = match pull_feed(feed_url).await {
Ok(feed) => {
if db.lock().unwrap().subscribe(target_id.0, feed_url, &feed) {
if db.lock().await.subscribe(target_id.0, feed_url, &feed) {
tr!(
"subscription_succeeded",
link = Escape(&feed.link),
Expand Down
4 changes: 2 additions & 2 deletions src/commands/unsub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;
use std::sync::Mutex;

use tbot::{contexts::Command, types::parameters};
use tokio::sync::Mutex;

use crate::data::Database;
use crate::messages::Escape;
Expand Down Expand Up @@ -35,7 +35,7 @@ pub async fn unsub(
return Ok(());
}
};
let msg = if let Some(feed) = db.lock().unwrap().unsubscribe(target_id.0, feed_url) {
let msg = if let Some(feed) = db.lock().await.unsubscribe(target_id.0, feed_url) {
tr!(
"unsubscription_succeeded",
link = Escape(&feed.link),
Expand Down
18 changes: 8 additions & 10 deletions src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::cmp;
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
Arc,
};

use futures::{future::FutureExt, select_biased};
use tbot::{types::parameters, Bot};
use tokio::{
self,
sync::Notify,
sync::{Mutex, Notify},
time::{self, Duration, Instant},
};
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -41,7 +41,7 @@ pub fn start(bot: Bot, db: Arc<Mutex<Database>>, min_interval: u32, max_interval
});
}
_ = interval.tick().fuse() => {
let feeds = db.lock().unwrap().all_feeds();
let feeds = db.lock().await.all_feeds();
for feed in feeds {
let feed_interval = cmp::min(
cmp::max(
Expand All @@ -66,14 +66,14 @@ async fn fetch_and_push_updates(
let new_feed = match pull_feed(&feed.link).await {
Ok(feed) => feed,
Err(e) => {
let down_time = db.lock().unwrap().get_or_update_down_time(&feed.link);
let down_time = db.lock().await.get_or_update_down_time(&feed.link);
if down_time.is_none() {
// user unsubscribed while fetching the feed
return Ok(());
}
// 5 days
if down_time.unwrap().as_secs() > 5 * 24 * 60 * 60 {
db.lock().unwrap().reset_down_time(&feed.link);
db.lock().await.reset_down_time(&feed.link);
let msg = tr!(
"continuous_fetch_error",
link = Escape(&feed.link),
Expand All @@ -92,7 +92,7 @@ async fn fetch_and_push_updates(
}
};

let updates = db.lock().unwrap().update(&feed.link, new_feed);
let updates = db.lock().await.update(&feed.link, new_feed);
for update in updates {
match update {
FeedUpdate::Items(items) => {
Expand Down Expand Up @@ -158,15 +158,13 @@ async fn push_updates<I: IntoIterator<Item = i64>>(
Err(MethodCall::RequestError { description, .. })
if chat_is_unavailable(&description) =>
{
db.lock().unwrap().delete_subscriber(subscriber);
db.lock().await.delete_subscriber(subscriber);
}
Err(MethodCall::RequestError {
migrate_to_chat_id: Some(new_chat_id),
..
}) => {
db.lock()
.unwrap()
.update_subscriber(subscriber, new_chat_id.0);
db.lock().await.update_subscriber(subscriber, new_chat_id.0);
subscriber = new_chat_id.0;
continue 'retry;
}
Expand Down
7 changes: 4 additions & 3 deletions src/gardener.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use tbot::Bot;
use tokio::{
self,
sync::Mutex,
time::{self, Duration},
};

Expand All @@ -22,7 +23,7 @@ pub fn start_pruning(bot: Bot, db: Arc<Mutex<Database>>) {
}

async fn prune(bot: &Bot, db: &Mutex<Database>) -> Result<(), tbot::errors::MethodCall> {
let subscribers = db.lock().unwrap().all_subscribers();
let subscribers = db.lock().await.all_subscribers();
for subscriber in subscribers {
let chat_id = tbot::types::chat::Id(subscriber);
let chat = bot.get_chat(chat_id).call().await?;
Expand All @@ -35,7 +36,7 @@ async fn prune(bot: &Bot, db: &Mutex<Database>) -> Result<(), tbot::errors::Meth
// so we don't need to check that.
// And just ignore `can_post_messages` or `can_send_messages`
if me.status.is_left() || me.status.is_kicked() {
db.lock().unwrap().delete_subscriber(subscriber);
db.lock().await.delete_subscriber(subscriber);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::env;
use std::panic;
use std::path::PathBuf;
use std::process;
use std::sync::{Arc, Mutex}; // TODO: async Mutex
use std::sync::Arc;

use anyhow::Context;
use hyper_proxy::{Intercept, Proxy};
use once_cell::sync::OnceCell;
use structopt::StructOpt;
use tbot;
use tokio;
use tokio::{self, sync::Mutex};

// Include the tr! macro and localizations
include!(concat!(env!("OUT_DIR"), "/ctl10n_macros.rs"));
Expand Down

0 comments on commit f2d7bad

Please sign in to comment.