Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topology building cleanup #1067

Merged
merged 2 commits into from Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 9 additions & 38 deletions shotover-proxy/src/config/topology.rs
Expand Up @@ -15,39 +15,14 @@ pub struct Topology {
pub source_to_chain_mapping: HashMap<String, String>,
}

#[derive(Deserialize, Debug)]
pub struct TopologyConfig {
pub sources: HashMap<String, SourcesConfig>,
pub chain_config: HashMap<String, Vec<TransformsConfig>>,
pub source_to_chain_mapping: HashMap<String, String>,
}

impl Topology {
pub fn from_yaml(yaml_contents: String) -> Topology {
let deserializer = serde_yaml::Deserializer::from_str(&yaml_contents);
let config: TopologyConfig =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
Topology::topology_from_config(config)
}

pub fn from_file(filepath: String) -> Result<Topology> {
let file = std::fs::File::open(&filepath).map_err(|err| {
anyhow!(err).context(format!("Couldn't open the topology file {}", &filepath))
pub fn from_file(filepath: &str) -> Result<Topology> {
let file = std::fs::File::open(filepath).map_err(|err| {
anyhow!(err).context(format!("Couldn't open the topology file {}", filepath))
})?;
let deserializer = serde_yaml::Deserializer::from_reader(file);
let config: TopologyConfig =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer)
.context(format!("Failed to parse topology file {}", &filepath))?;

Ok(Topology::topology_from_config(config))
}

fn topology_from_config(config: TopologyConfig) -> Topology {
Topology {
sources: config.sources,
chain_config: config.chain_config,
source_to_chain_mapping: config.source_to_chain_mapping,
}
serde_yaml::with::singleton_map_recursive::deserialize(deserializer)
.context(format!("Failed to parse topology file {}", filepath))
}

async fn build_chains(&self) -> Result<HashMap<String, TransformChainBuilder>> {
Expand Down Expand Up @@ -120,7 +95,7 @@ impl Topology {

#[cfg(test)]
mod topology_tests {
use crate::config::topology::{Topology, TopologyConfig};
use crate::config::topology::Topology;
use crate::transforms::coalesce::CoalesceConfig;
use crate::{
sources::{redis_source::RedisConfig, Sources, SourcesConfig},
Expand All @@ -130,7 +105,7 @@ mod topology_tests {
TransformsConfig,
},
};
use std::{collections::HashMap, fs};
use std::collections::HashMap;
use tokio::sync::watch;

async fn run_test_topology(chain: Vec<TransformsConfig>) -> anyhow::Result<Vec<Sources>> {
Expand All @@ -148,15 +123,14 @@ mod topology_tests {
let mut sources = HashMap::new();
sources.insert("redis_prod".to_string(), redis_source);

let config = TopologyConfig {
let topology = Topology {
sources,
chain_config,
source_to_chain_mapping: HashMap::new(), // Leave source to chain mapping empty so it doesn't build and run the transform chains
};

let (_sender, trigger_shutdown_rx) = watch::channel::<bool>(false);

let topology = Topology::topology_from_config(config);
topology.run_chains(trigger_shutdown_rx).await
}

Expand Down Expand Up @@ -541,10 +515,7 @@ redis_chain:
async fn test_validate_chain_multiple_subchains() {
let (_sender, trigger_shutdown_rx) = watch::channel::<bool>(false);

let yaml_contents =
fs::read_to_string("tests/test-configs/invalid_subchains.yaml").unwrap();

let topology = Topology::from_yaml(yaml_contents);
let topology = Topology::from_file("tests/test-configs/invalid_subchains.yaml").unwrap();
let error = topology
.run_chains(trigger_shutdown_rx)
.await
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/runner.rs
Expand Up @@ -74,7 +74,7 @@ pub struct Runner {
impl Runner {
pub fn new(params: ConfigOpts) -> Result<Self> {
let config = Config::from_file(params.config_file)?;
let topology = Topology::from_file(params.topology_file)?;
let topology = Topology::from_file(&params.topology_file)?;

let tracing = TracingState::new(config.main_log_level.as_str(), params.log_format)?;

Expand Down