Skip to content

Commit

Permalink
feat(term): handle subscribe feed error
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Feb 22, 2024
1 parent 90c47d3 commit d6abb26
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/synd_feed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ feed-rs = { workspace = true }
futures-util = { workspace = true }
moka = { workspace = true, features = ["future"] }
reqwest = { workspace = true, features = ["stream"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
17 changes: 10 additions & 7 deletions crates/synd_feed/src/feed/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use moka::future::Cache;

use crate::{
feed::parser::{FetchFeed, ParseResult},
feed::parser::{FetchFeed, FetchFeedResult},
types,
};

Expand Down Expand Up @@ -44,9 +44,10 @@ impl CacheConfig {

#[async_trait]
pub trait FetchCachedFeed: Send + Sync {
async fn fetch_feed(&self, url: String) -> ParseResult<Arc<types::Feed>>;
async fn fetch_feed(&self, url: String) -> FetchFeedResult<Arc<types::Feed>>;
/// Fetch feeds by spawning tasks
async fn fetch_feeds_parallel(&self, urls: &[String]) -> Vec<ParseResult<Arc<types::Feed>>>;
async fn fetch_feeds_parallel(&self, urls: &[String])
-> Vec<FetchFeedResult<Arc<types::Feed>>>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -87,23 +88,25 @@ where
S: FetchFeed + Clone + 'static,
{
#[tracing::instrument(skip_all, fields(%url))]
async fn fetch_feed(&self, url: String) -> ParseResult<Arc<types::Feed>> {
async fn fetch_feed(&self, url: String) -> FetchFeedResult<Arc<types::Feed>> {
// lookup cache
if let Some(feed) = self.cache.get(&url).await {
tracing::debug!(url, "Feed cache hit");
return Ok(feed);
}

let feed = self.service.fetch_feed(url.clone()).await?;
let feed = Arc::new(feed);
let feed = self.service.fetch_feed(url.clone()).await.map(Arc::new)?;

self.cache.insert(url, Arc::clone(&feed)).await;

Ok(feed)
}

/// Fetch feeds by spawning tasks
async fn fetch_feeds_parallel(&self, urls: &[String]) -> Vec<ParseResult<Arc<types::Feed>>> {
async fn fetch_feeds_parallel(
&self,
urls: &[String],
) -> Vec<FetchFeedResult<Arc<types::Feed>>> {
let mut handles = Vec::with_capacity(urls.len());

for url in urls {
Expand Down
63 changes: 38 additions & 25 deletions crates/synd_feed/src/feed/parser.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,49 @@
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use feed_rs::parser::Parser;
use feed_rs::parser::{ParseErrorKind, ParseFeedError, Parser};

use crate::types::Feed;

pub type ParseResult<T> = std::result::Result<T, ParserError>;
pub type FetchFeedResult<T> = std::result::Result<T, FetchFeedError>;

#[derive(Debug, thiserror::Error)]
pub enum ParserError {
pub enum FetchFeedError {
#[error("fetch failed")]
Fetch(#[from] reqwest::Error),
#[error("response size limit exceeded")]
ResponseLimitExceed,

#[error("parse error url: {url} {source}")]
Parse { url: String, source: anyhow::Error },
#[error("invalid feed: {0}")]
InvalidFeed(ParseErrorKind),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("json format error: {0}")]
JsonFormat(#[from] serde_json::Error),
#[error("unsupported json version: {0}")]
JsonUnsupportedVersion(String),
#[error("xml format error: {0}")]
XmlFormat(String),
#[error(transparent)]
Other(#[from] anyhow::Error),
}

#[async_trait]
pub trait FetchFeed: Send + Sync {
async fn fetch_feed(&self, url: String) -> ParseResult<Feed>;
async fn fetch_feed(&self, url: String) -> FetchFeedResult<Feed>;
/// Fetch feeds by spawning tasks
async fn fetch_feeds_parallel(&self, urls: &[String]) -> ParseResult<Vec<Feed>>;
async fn fetch_feeds_parallel(&self, urls: &[String]) -> FetchFeedResult<Vec<Feed>>;
}

#[async_trait]
impl<T> FetchFeed for Arc<T>
where
T: FetchFeed,
{
async fn fetch_feed(&self, url: String) -> ParseResult<Feed> {
async fn fetch_feed(&self, url: String) -> FetchFeedResult<Feed> {
self.fetch_feed(url).await
}
/// Fetch feeds by spawning tasks
async fn fetch_feeds_parallel(&self, urls: &[String]) -> ParseResult<Vec<Feed>> {
async fn fetch_feeds_parallel(&self, urls: &[String]) -> FetchFeedResult<Vec<Feed>> {
self.fetch_feeds_parallel(urls).await
}
}
Expand All @@ -50,32 +57,32 @@ pub struct FeedService {

#[async_trait]
impl FetchFeed for FeedService {
async fn fetch_feed(&self, url: String) -> ParseResult<Feed> {
async fn fetch_feed(&self, url: String) -> FetchFeedResult<Feed> {
use futures_util::StreamExt;
let mut stream = self
.http
.get(&url)
.send()
.await
.map_err(ParserError::Fetch)?
.map_err(FetchFeedError::Fetch)?
.error_for_status()
.map_err(ParserError::Fetch)?
.map_err(FetchFeedError::Fetch)?
.bytes_stream();

let mut buff = Vec::new();

while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(ParserError::Fetch)?;
let chunk = chunk.map_err(FetchFeedError::Fetch)?;
if buff.len() + chunk.len() > self.buff_limit {
return Err(ParserError::ResponseLimitExceed);
return Err(FetchFeedError::ResponseLimitExceed);
}
buff.extend(chunk);
}

self.parse(url, buff.as_slice())
}

async fn fetch_feeds_parallel(&self, urls: &[String]) -> ParseResult<Vec<Feed>> {
async fn fetch_feeds_parallel(&self, urls: &[String]) -> FetchFeedResult<Vec<Feed>> {
// Order matters, so we cannot use tokio JoinSet or futures FuturesUnordered
// should we use FuturesOrdered ?
let mut handles = Vec::with_capacity(urls.len());
Expand Down Expand Up @@ -108,21 +115,27 @@ impl FeedService {
Self { http, buff_limit }
}

pub fn parse<S>(&self, url: impl Into<String>, source: S) -> ParseResult<Feed>
pub fn parse<S>(&self, url: impl Into<String>, source: S) -> FetchFeedResult<Feed>
where
S: std::io::Read,
{
let url = url.into();
let parser = Self::build_parser(&url);

match parser.parse(source) {
Ok(feed) => Ok(Feed::from((url, feed))),
// TODO: handle error
Err(err) => Err(ParserError::Parse {
url,
source: anyhow::Error::from(err),
}),
}
parser
.parse(source)
.map(|feed| Feed::from((url, feed)))
.map_err(|err| match err {
ParseFeedError::ParseError(kind) => FetchFeedError::InvalidFeed(kind),
ParseFeedError::IoError(io_err) => FetchFeedError::Io(io_err),
ParseFeedError::JsonSerde(json_err) => FetchFeedError::JsonFormat(json_err),
ParseFeedError::JsonUnsupportedVersion(version) => {
FetchFeedError::JsonUnsupportedVersion(version)
}
ParseFeedError::XmlReader(xml_err) => {
FetchFeedError::XmlFormat(format!("{xml_err}"))
}
})
}

fn build_parser(base_uri: impl AsRef<str>) -> Parser {
Expand Down
1 change: 1 addition & 0 deletions crates/synd_term/gql/mutation.gql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mutation SubscribeFeed($input: SubscribeFeedInput!) {
status {
code
}
message
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions crates/synd_term/gql/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@
{
"args": [],
"deprecationReason": null,
"description": null,
"description": "Title of the feed",
"isDeprecated": false,
"name": "title",
"type": {
Expand All @@ -685,7 +685,7 @@
{
"args": [],
"deprecationReason": null,
"description": null,
"description": "Url of the feed",
"isDeprecated": false,
"name": "url",
"type": {
Expand Down Expand Up @@ -1151,6 +1151,12 @@
"isDeprecated": false,
"name": "UNAUTHORIZED"
},
{
"deprecationReason": null,
"description": "Given url is not valid feed url",
"isDeprecated": false,
"name": "INVALID_FEED_URL"
},
{
"deprecationReason": null,
"description": "Something went wrong",
Expand Down
30 changes: 26 additions & 4 deletions crates/synd_term/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use graphql_client::{GraphQLQuery, Response};
use reqwest::header::{self, HeaderValue};
use serde::{de::DeserializeOwned, Serialize};
use synd_o11y::opentelemetry::extension::*;
use thiserror::Error;
use tracing::{error, Span};
use url::Url;

Expand All @@ -18,6 +19,14 @@ pub mod mutation;
pub mod payload;
pub mod query;

#[derive(Error, Debug)]
pub enum SubscribeFeedError {
#[error("invalid feed url: `{feed_url}` ({message})`")]
InvalidFeedUrl { feed_url: String, message: String },
#[error("internal error: {0}")]
Internal(anyhow::Error),
}

#[derive(Clone)]
pub struct Client {
client: reqwest::Client,
Expand Down Expand Up @@ -64,19 +73,32 @@ impl Client {
}

#[tracing::instrument(skip(self))]
pub async fn subscribe_feed(&self, url: String) -> anyhow::Result<types::Feed> {
pub async fn subscribe_feed(&self, url: String) -> Result<types::Feed, SubscribeFeedError> {
use crate::client::mutation::subscribe_feed::ResponseCode;
let var = mutation::subscribe_feed::Variables {
input: mutation::subscribe_feed::SubscribeFeedInput { url },
input: mutation::subscribe_feed::SubscribeFeedInput { url: url.clone() },
};
let request = mutation::SubscribeFeed::build_query(var);
let response: mutation::subscribe_feed::ResponseData = self.request(&request).await?;
let response: mutation::subscribe_feed::ResponseData = self
.request(&request)
.await
.map_err(SubscribeFeedError::Internal)?;

match response.subscribe_feed {
mutation::subscribe_feed::SubscribeFeedSubscribeFeed::SubscribeFeedSuccess(success) => {
Ok(types::Feed::from(success.feed))
}
mutation::subscribe_feed::SubscribeFeedSubscribeFeed::SubscribeFeedError(err) => {
Err(anyhow!("Failed to mutate subscribe_feed {err:?}"))
match err.status.code {
ResponseCode::OK => unreachable!(),
ResponseCode::INVALID_FEED_URL => Err(SubscribeFeedError::InvalidFeedUrl {
feed_url: url,
message: err.message,
}),
err_code => Err(SubscribeFeedError::Internal(anyhow::anyhow!(
"{err_code:?}"
))),
}
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions crates/synd_term/src/client/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod subscribe_feed {
#![allow(dead_code)]
use std::result::Result;
pub const OPERATION_NAME: &str = "SubscribeFeed";
pub const QUERY : & str = "mutation SubscribeFeed($input: SubscribeFeedInput!) {\n subscribeFeed(input: $input) {\n __typename\n ... on SubscribeFeedSuccess {\n feed {\n ...Feed\n }\n status {\n code\n }\n }\n ... on SubscribeFeedError {\n status {\n code\n }\n }\n }\n}\n\nmutation UnsubscribeFeed($input: UnsubscribeFeedInput!) {\n unsubscribeFeed(input: $input) {\n __typename\n ... on UnsubscribeFeedSuccess {\n status {\n code\n }\n }\n ... on UnsubscribeFeedError {\n status {\n code\n }\n }\n }\n}\n\nfragment Feed on Feed {\n id\n title\n url\n updated\n websiteUrl\n description\n entries(first: 10) {\n nodes {\n ...EntryMeta\n }\n }\n links {\n nodes {\n ...Link\n }\n }\n}\n\nfragment EntryMeta on Entry {\n title,\n published,\n summary,\n}\n\nfragment Link on Link {\n href\n rel\n mediaType\n title \n}\n" ;
pub const QUERY : & str = "mutation SubscribeFeed($input: SubscribeFeedInput!) {\n subscribeFeed(input: $input) {\n __typename\n ... on SubscribeFeedSuccess {\n feed {\n ...Feed\n }\n status {\n code\n }\n }\n ... on SubscribeFeedError {\n status {\n code\n }\n message\n }\n }\n}\n\nmutation UnsubscribeFeed($input: UnsubscribeFeedInput!) {\n unsubscribeFeed(input: $input) {\n __typename\n ... on UnsubscribeFeedSuccess {\n status {\n code\n }\n }\n ... on UnsubscribeFeedError {\n status {\n code\n }\n }\n }\n}\n\nfragment Feed on Feed {\n id\n title\n url\n updated\n websiteUrl\n description\n entries(first: 10) {\n nodes {\n ...EntryMeta\n }\n }\n links {\n nodes {\n ...Link\n }\n }\n}\n\nfragment EntryMeta on Entry {\n title,\n published,\n summary,\n}\n\nfragment Link on Link {\n href\n rel\n mediaType\n title \n}\n" ;
use super::*;
use serde::{Deserialize, Serialize};
#[allow(dead_code)]
Expand All @@ -20,6 +20,7 @@ pub mod subscribe_feed {
pub enum ResponseCode {
OK,
UNAUTHORIZED,
INVALID_FEED_URL,
INTERNAL_ERROR,
Other(String),
}
Expand All @@ -28,6 +29,7 @@ pub mod subscribe_feed {
ser.serialize_str(match *self {
ResponseCode::OK => "OK",
ResponseCode::UNAUTHORIZED => "UNAUTHORIZED",
ResponseCode::INVALID_FEED_URL => "INVALID_FEED_URL",
ResponseCode::INTERNAL_ERROR => "INTERNAL_ERROR",
ResponseCode::Other(ref s) => &s,
})
Expand All @@ -39,6 +41,7 @@ pub mod subscribe_feed {
match s.as_str() {
"OK" => Ok(ResponseCode::OK),
"UNAUTHORIZED" => Ok(ResponseCode::UNAUTHORIZED),
"INVALID_FEED_URL" => Ok(ResponseCode::INVALID_FEED_URL),
"INTERNAL_ERROR" => Ok(ResponseCode::INTERNAL_ERROR),
_ => Ok(ResponseCode::Other(s)),
}
Expand Down Expand Up @@ -113,6 +116,7 @@ pub mod subscribe_feed {
#[derive(Deserialize, Debug)]
pub struct SubscribeFeedSubscribeFeedOnSubscribeFeedError {
pub status: SubscribeFeedSubscribeFeedOnSubscribeFeedErrorStatus,
pub message: String,
}
#[derive(Deserialize, Debug)]
pub struct SubscribeFeedSubscribeFeedOnSubscribeFeedErrorStatus {
Expand All @@ -135,7 +139,7 @@ pub mod unsubscribe_feed {
#![allow(dead_code)]
use std::result::Result;
pub const OPERATION_NAME: &str = "UnsubscribeFeed";
pub const QUERY : & str = "mutation SubscribeFeed($input: SubscribeFeedInput!) {\n subscribeFeed(input: $input) {\n __typename\n ... on SubscribeFeedSuccess {\n feed {\n ...Feed\n }\n status {\n code\n }\n }\n ... on SubscribeFeedError {\n status {\n code\n }\n }\n }\n}\n\nmutation UnsubscribeFeed($input: UnsubscribeFeedInput!) {\n unsubscribeFeed(input: $input) {\n __typename\n ... on UnsubscribeFeedSuccess {\n status {\n code\n }\n }\n ... on UnsubscribeFeedError {\n status {\n code\n }\n }\n }\n}\n\nfragment Feed on Feed {\n id\n title\n url\n updated\n websiteUrl\n description\n entries(first: 10) {\n nodes {\n ...EntryMeta\n }\n }\n links {\n nodes {\n ...Link\n }\n }\n}\n\nfragment EntryMeta on Entry {\n title,\n published,\n summary,\n}\n\nfragment Link on Link {\n href\n rel\n mediaType\n title \n}\n" ;
pub const QUERY : & str = "mutation SubscribeFeed($input: SubscribeFeedInput!) {\n subscribeFeed(input: $input) {\n __typename\n ... on SubscribeFeedSuccess {\n feed {\n ...Feed\n }\n status {\n code\n }\n }\n ... on SubscribeFeedError {\n status {\n code\n }\n message\n }\n }\n}\n\nmutation UnsubscribeFeed($input: UnsubscribeFeedInput!) {\n unsubscribeFeed(input: $input) {\n __typename\n ... on UnsubscribeFeedSuccess {\n status {\n code\n }\n }\n ... on UnsubscribeFeedError {\n status {\n code\n }\n }\n }\n}\n\nfragment Feed on Feed {\n id\n title\n url\n updated\n websiteUrl\n description\n entries(first: 10) {\n nodes {\n ...EntryMeta\n }\n }\n links {\n nodes {\n ...Link\n }\n }\n}\n\nfragment EntryMeta on Entry {\n title,\n published,\n summary,\n}\n\nfragment Link on Link {\n href\n rel\n mediaType\n title \n}\n" ;
use super::*;
use serde::{Deserialize, Serialize};
#[allow(dead_code)]
Expand All @@ -150,6 +154,7 @@ pub mod unsubscribe_feed {
pub enum ResponseCode {
OK,
UNAUTHORIZED,
INVALID_FEED_URL,
INTERNAL_ERROR,
Other(String),
}
Expand All @@ -158,6 +163,7 @@ pub mod unsubscribe_feed {
ser.serialize_str(match *self {
ResponseCode::OK => "OK",
ResponseCode::UNAUTHORIZED => "UNAUTHORIZED",
ResponseCode::INVALID_FEED_URL => "INVALID_FEED_URL",
ResponseCode::INTERNAL_ERROR => "INTERNAL_ERROR",
ResponseCode::Other(ref s) => &s,
})
Expand All @@ -169,6 +175,7 @@ pub mod unsubscribe_feed {
match s.as_str() {
"OK" => Ok(ResponseCode::OK),
"UNAUTHORIZED" => Ok(ResponseCode::UNAUTHORIZED),
"INVALID_FEED_URL" => Ok(ResponseCode::INVALID_FEED_URL),
"INTERNAL_ERROR" => Ok(ResponseCode::INTERNAL_ERROR),
_ => Ok(ResponseCode::Other(s)),
}
Expand Down
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ integration-test:

# Update synd_api graphql schema
update-gql-schema:
@graphql-client introspect-schema http://localhost:5959/graphql \
@graphql-client introspect-schema https://localhost:5959/graphql --no-ssl \
--header 'authorization: github {{ github_pat }}' out> crates/synd_term/gql/schema.json

# Generate graphql code
Expand Down

0 comments on commit d6abb26

Please sign in to comment.