Skip to content

Commit

Permalink
feat: add core commands + metadata store (#3931)
Browse files Browse the repository at this point in the history
* feat: add remote metadata store

* feat: add core mirroring commands

* chore: remove remote private api

* chore: remove upstream

* test: edge basic commands tests

* fix: use edge instead core to register

* fix: bump minor instead of patch

* fix: export should check if edge exist

* fix: apply feedbacks

* fix: bump only patch for fluvio-protocol

* fix: avoid clonings in list remote

* chore: hide core command

* fix: remove k8s feature for core commands
  • Loading branch information
fraidev committed Apr 10, 2024
1 parent 58f7e4d commit 03b766a
Show file tree
Hide file tree
Showing 40 changed files with 1,035 additions and 33 deletions.
55 changes: 55 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,61 @@ jobs:
timeout-minutes: 5
run: cat /tmp/flv_sc.log

mirroring_smoke_test:
runs-on: ubuntu-latest
needs:
- build_primary_binaries
- config
strategy:
matrix:
run-mode: [local, local-k8]
steps:
- uses: actions/checkout@v4
- name: Setup BATS
uses: mig4/setup-bats@v1
with:
bats-version: ${{ env.BATS_VERSION }}

# Download artifacts from development build
- name: Download artifact - fluvio
uses: actions/download-artifact@v3
with:
name: fluvio-x86_64-unknown-linux-musl
path: ~/bin
- name: Download artifact - fluvio-run
uses: actions/download-artifact@v3
with:
name: fluvio-run-x86_64-unknown-linux-musl
path: ~/extensions

- name: Setup K8s Cluster
if: matrix.run-mode == 'local-k8'
run: |
curl -s https://raw.githubusercontent.com/rancher/k3d/main/install.sh | TAG=${{ env.K3D_VERSION }} bash
./k8-util/cluster/reset-k3d.sh
- name: Wait 15s for K3D Reset
if: matrix.run-mode == 'local-k8'
run: sleep 15

- run: |
chmod +x ~/bin/fluvio
chmod +x ~/extensions/fluvio-run
mkdir -p ~/.fluvio/bin
mkdir -p ~/.fluvio/extensions
mv ~/bin/fluvio ~/.fluvio/bin/fluvio
mv ~/extensions/fluvio-run ~/.fluvio/extensions/fluvio-run
echo "~/.fluvio/bin" >> $GITHUB_PATH
- name: Start cluster
run: fluvio cluster start --${{ matrix.run-mode }}
- name: Run Fluvio mirroring tests
run: make FLUVIO_BIN=~/.fluvio/bin/fluvio cli-fluvio-mirroring-smoke

- name: Print SC logs
if: ${{ !success() }}
timeout-minutes: 5
run: cat /tmp/flv_sc.log

# Runs tests on `tests/cli/partition_test`
partition_test:
name: Partitions Test (${{ matrix.test }} with ${{ matrix.spu }} SPUs, ${{ matrix.partitions }} Partitions and ${{ matrix.replication }} Replicas), Run Mode ${{ matrix.run-mode }}
Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ fluvio-cli-common = { path = "crates/fluvio-cli-common"}
fluvio-compression = { version = "0.3", path = "crates/fluvio-compression" }
fluvio-connector-package = { path = "crates/fluvio-connector-package/" }
fluvio-controlplane = { path = "crates/fluvio-controlplane" }
fluvio-controlplane-metadata = { version = "0.26.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-controlplane-metadata = { version = "0.27.0", default-features = false, path = "crates/fluvio-controlplane-metadata" }
fluvio-extension-common = { path = "crates/fluvio-extension-common", default-features = false }
fluvio-hub-util = { path = "crates/fluvio-hub-util" }
fluvio-package-index = { version = "0.7.0", path = "crates/fluvio-package-index", default-features = false }
fluvio-protocol = { version = "0.10.6", path = "crates/fluvio-protocol" }
fluvio-sc-schema = { version = "0.22.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-protocol = { version = "0.10.12", path = "crates/fluvio-protocol" }
fluvio-sc-schema = { version = "0.23.0", path = "crates/fluvio-sc-schema", default-features = false }
fluvio-service = { path = "crates/fluvio-service" }
fluvio-smartengine = { version = "0.7.0", path = "crates/fluvio-smartengine", default-features = false }
fluvio-smartmodule = { version = "0.7.0", path = "crates/fluvio-smartmodule", default-features = false }
Expand Down
77 changes: 77 additions & 0 deletions crates/fluvio-cli/src/client/core/export.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use clap::Parser;
use fluvio_extension_common::{target::ClusterTarget, Terminal};
use fluvio_sc_schema::{
edge::EdgeMetadataExport,
remote::{Core, RemoteSpec, RemoteType},
};
use k8_types::K8Obj;
use anyhow::anyhow;

use super::get_admin;

#[derive(Debug, Parser)]
pub struct ExportOpt {
/// id of the edge cluster to export
edge_id: String,
/// name of the file where we should put the file
#[arg(long, short = 'f')]
file: Option<String>,
/// override endpoint of the core cluster
#[arg(long, short = 'e')]
public_endpoint: Option<String>,
// id of the core cluster to share
#[arg(name = "c")]
core_id: Option<String>,
}

impl ExportOpt {
pub async fn execute<T: Terminal>(
self,
out: Arc<T>,
cluster_target: ClusterTarget,
) -> Result<()> {
let public_endpoint = if let Some(public_endpoint) = self.public_endpoint {
public_endpoint
} else {
let fluvio_config = cluster_target.clone().load()?;
fluvio_config.endpoint
};

let admin = get_admin(cluster_target).await?;
let all_remotes = admin.all::<RemoteSpec>().await?;
let _edge = all_remotes
.iter()
.find(|remote| match &remote.spec.remote_type {
RemoteType::Edge(edge) => edge.id == self.edge_id,
_ => false,
})
.ok_or_else(|| anyhow!("edge cluster not found"))?;

let core_id = self.core_id.clone().unwrap_or_else(|| "core".to_owned());

let metadata_name = format!("edge-{}", self.edge_id);
let edge_metadata = vec![K8Obj::new(
metadata_name,
RemoteSpec {
remote_type: RemoteType::Core(Core {
id: core_id,
edge_id: self.edge_id,
public_endpoint,
}),
},
)];

let metadata = EdgeMetadataExport::new(edge_metadata);

if let Some(filename) = self.file {
std::fs::write(filename, serde_json::to_string_pretty(&metadata)?)
.context("failed to write output file")?;
} else {
out.println(&serde_json::to_string_pretty(&metadata)?);
}

Ok(())
}
}
115 changes: 115 additions & 0 deletions crates/fluvio-cli/src/client/core/list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
pub use std::sync::Arc;

use anyhow::Result;
use clap::Parser;
use fluvio_extension_common::target::ClusterTarget;
use fluvio_extension_common::{OutputFormat, Terminal};
use fluvio_sc_schema::remote::{RemoteSpec, RemoteStatus};

use super::get_admin;

#[derive(Debug, Parser)]
pub struct ListOpt {
#[clap(flatten)]
output: OutputFormat,
}
impl ListOpt {
pub async fn execute<T: Terminal>(
self,
out: Arc<T>,
cluster_target: ClusterTarget,
) -> Result<()> {
let admin = get_admin(cluster_target).await?;
let list = admin.all::<RemoteSpec>().await?;

let outlist: Vec<(String, String, String, String)> = list
.into_iter()
.map(|item| {
let status: RemoteStatus = item.status;
(
item.name,
item.spec.type_name().to_string(),
status.to_string(),
status.connection_stat.last_seen.to_string(),
)
})
.collect();
output::format(out, outlist, self.output.format)
}
}

#[allow(dead_code)]
mod output {

//!
//! # Fluvio list - output processing
//!
//! Format SmartModules response based on output type
use comfy_table::{Cell, Row};
use comfy_table::CellAlignment;
use tracing::debug;
use serde::Serialize;
use anyhow::Result;

use fluvio_extension_common::output::OutputType;
use fluvio_extension_common::Terminal;
use fluvio_extension_common::output::TableOutputHandler;
use fluvio_extension_common::t_println;

type ListVec = Vec<(String, String, String, String)>;

#[derive(Serialize)]
struct TableList(ListVec);

// -----------------------------------
// Format Output
// -----------------------------------

/// Format SmartModules based on output type
pub fn format<O: Terminal>(
out: std::sync::Arc<O>,
listvec: ListVec,
output_type: OutputType,
) -> Result<()> {
debug!("listvec: {:#?}", listvec);

if !listvec.is_empty() {
let rlist = TableList(listvec);
out.render_list(&rlist, output_type)?;
Ok(())
} else {
t_println!(out, "no items");
Ok(())
}
}

// -----------------------------------
// Output Handlers
// -----------------------------------
impl TableOutputHandler for TableList {
/// table header implementation
fn header(&self) -> Row {
Row::from(["REMOTE ", "TYPE", "STATUS", "LAST-SEEN"])
}

/// return errors in string format
fn errors(&self) -> Vec<String> {
vec![]
}

/// table content implementation
fn content(&self) -> Vec<Row> {
self.0
.iter()
.map(|e| {
Row::from([
Cell::new(&e.0).set_alignment(CellAlignment::Left),
Cell::new(&e.1).set_alignment(CellAlignment::Left),
Cell::new(&e.2).set_alignment(CellAlignment::Left),
Cell::new(&e.3).set_alignment(CellAlignment::Left),
])
})
.collect()
}
}
}
53 changes: 53 additions & 0 deletions crates/fluvio-cli/src/client/core/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
pub mod unregister;
pub mod list;
pub mod register;
pub mod export;

use std::sync::Arc;
use anyhow::Result;
use clap::Parser;
use fluvio_extension_common::target::ClusterTarget;
use unregister::UnregisterOpt;
use list::ListOpt;
use register::RegisterOpt;
use fluvio::FluvioAdmin;
use fluvio_extension_common::output::Terminal;
use self::export::ExportOpt;

#[derive(Debug, Parser)]
pub enum CoreCmd {
/// Register a new remote cluster
#[command(name = "register")]
Register(RegisterOpt),
/// List all remote clusters
#[command(name = "list")]
List(ListOpt),
/// List all remote clusters
#[command(name = "unregister")]
Unregister(UnregisterOpt),
// /// Generate metadata file for remote cluster
#[command(name = "export")]
Export(ExportOpt),
}

impl CoreCmd {
pub async fn process<O: Terminal>(
self,
out: Arc<O>,
cluster_target: ClusterTarget,
) -> Result<()> {
match self {
Self::Register(reg) => reg.execute(out, cluster_target).await,
Self::Unregister(del) => del.execute(out, cluster_target).await,
Self::List(list) => list.execute(out, cluster_target).await,
Self::Export(meta) => meta.execute(out, cluster_target).await,
}
}
}

pub async fn get_admin(cluster_target: ClusterTarget) -> Result<FluvioAdmin> {
let fluvio_config = cluster_target.load()?;
let flv = fluvio::Fluvio::connect_with_config(&fluvio_config).await?;
let admin = flv.admin().await;
Ok(admin)
}
36 changes: 36 additions & 0 deletions crates/fluvio-cli/src/client/core/register.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::sync::Arc;
use anyhow::Result;
use clap::Parser;
use fluvio_controlplane_metadata::remote::{RemoteSpec, RemoteType};
use fluvio_extension_common::target::ClusterTarget;
use fluvio_extension_common::Terminal;
use fluvio_sc_schema::remote::Edge;

#[derive(Debug, Parser)]
pub struct RegisterOpt {
name: String,
}

impl RegisterOpt {
pub async fn execute<T: Terminal>(
self,
_out: Arc<T>,
cluster_target: ClusterTarget,
) -> Result<()> {
let fluvio_config = cluster_target.load()?;
let admin = fluvio::Fluvio::connect_with_config(&fluvio_config)
.await?
.admin()
.await;

let spec = RemoteSpec {
remote_type: RemoteType::Edge(Edge {
id: self.name.clone(),
}),
};

admin.create(self.name.clone(), false, spec).await?;
println!("edge cluster \"{}\" was registered", self.name);
Ok(())
}
}
Loading

0 comments on commit 03b766a

Please sign in to comment.