Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One RPCClient per Core #345

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions kernel/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,9 @@ def configure_network(args):
for _, ncfg in zip(range(0, args.workers), NETWORK_CONFIG):
sudo[tunctl[['-t', ncfg, '-u', user, '-g', group]]]()
sudo[ip[['link', 'set', ncfg, 'up']]](retcode=(0, 1))
sudo[ip[['link', 'set', ncfg, 'txqueuelen', 65536]]]()
sudo[brctl[['addif', 'br0', ncfg]]]()
sudo[ip[['link', 'set', 'br0', 'txqueuelen', 65536]]]()
sudo[ip[['link', 'set', 'br0', 'up']]](retcode=(0, 1))


Expand Down
31 changes: 9 additions & 22 deletions kernel/src/arch/x86_64/rackscale/client_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use crate::error::{KError, KResult};
use crate::memory::backends::MemManager;
use crate::memory::shmem_affinity::{local_shmem_affinity, mid_to_shmem_affinity};
use crate::process::MAX_PROCESSES;
use crate::transport::shmem::NUM_SHMEM_TRANSPORTS;

/// This is the state the client records about itself
pub(crate) struct ClientState {
/// The RPC client used to communicate with the controller
pub(crate) rpc_client: Arc<Mutex<Client>>,
pub(crate) rpc_clients: Arc<ArrayVec<Mutex<Client>, { NUM_SHMEM_TRANSPORTS as usize }>>,

/// Used to store shmem affinity base pages
pub(crate) affinity_base_pages: Arc<ArrayVec<Mutex<Box<dyn MemManager + Send>>, MAX_MACHINES>>,
Expand All @@ -40,26 +41,12 @@ pub(crate) struct ClientState {

impl ClientState {
pub(crate) fn new() -> ClientState {
// Create network stack and instantiate RPC Client
let rpc_client = if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
Arc::new(Mutex::new(
crate::transport::ethernet::init_ethernet_rpc(
smoltcp::wire::IpAddress::v4(172, 31, 0, 11),
CONTROLLER_PORT_BASE + (*crate::environment::MACHINE_ID as u16 - 1),
true,
)
.expect("Failed to initialize ethernet RPC"),
))
} else {
// Default is Shmem, even if transport unspecified
Arc::new(Mutex::new(
crate::transport::shmem::init_shmem_rpc(true)
.expect("Failed to initialize shmem RPC"),
))
};
let clients =
crate::transport::shmem::init_shmem_rpc(true).expect("Failed to initialize shmem RPC");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to create ethernet instead of shmem when ethernet is selected

let mut rpc_clients = ArrayVec::new();
for client in clients.into_iter() {
rpc_clients.push(Mutex::new(client));
}

let mut per_process_base_pages = ArrayVec::new();
for _i in 0..MAX_PROCESSES {
Expand All @@ -76,7 +63,7 @@ impl ClientState {

log::debug!("Finished initializing client state");
ClientState {
rpc_client,
rpc_clients: Arc::new(rpc_clients),
affinity_base_pages: Arc::new(affinity_base_pages),
per_process_base_pages: Arc::new(per_process_base_pages),
}
Expand Down
54 changes: 21 additions & 33 deletions kernel/src/arch/x86_64/rackscale/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::arch::rackscale::dcm::{
use crate::arch::MAX_MACHINES;
use crate::cmdline::Transport;
use crate::transport::ethernet::ETHERNET_IFACE;
use crate::transport::shmem::create_shmem_transport;
use crate::transport::shmem::{create_shmem_transport, NUM_SHMEM_TRANSPORTS};

use super::*;

Expand All @@ -31,46 +31,32 @@ static DCMServerReady: AtomicBool = AtomicBool::new(false);
pub(crate) fn run() {
let mid = *crate::environment::CORE_ID;

// Initialize one server per controller thread
let mut server = if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Ethernet)
{
let transport = Box::new(
TCPTransport::new(
None,
CONTROLLER_PORT_BASE + mid as u16 - 1,
Arc::clone(&ETHERNET_IFACE),
)
.expect("Failed to create TCP transport"),
);
let mut server = Server::new(transport);
register_rpcs(&mut server);
server
} else if crate::CMDLINE
.get()
.map_or(false, |c| c.transport == Transport::Shmem)
{
let transport = Box::new(
create_shmem_transport(mid.try_into().unwrap())
.expect("Failed to create shmem transport"),
);
let transports =
create_shmem_transport(mid.try_into().unwrap()).expect("Failed to create shmem transport");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use ethernet instead of shmem when ethernet is selected


let mut server = Server::new(transport);
let mut servers: ArrayVec<Server<'_>, { NUM_SHMEM_TRANSPORTS as usize }> = ArrayVec::new();
for transport in transports.into_iter() {
let mut server = Server::new(Box::new(transport));
register_rpcs(&mut server);
server
servers.push(server);
}

/*
} else {
unreachable!("No supported transport layer specified in kernel argument");
};
*/

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

// Wait for all clients to connect before fulfilling any RPCs.
while !DCMServerReady.load(Ordering::SeqCst) {}

server
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
for s_index in 0..servers.len() {
servers[s_index]
.add_client(&CLIENT_REGISTRAR)
.expect("Failed to accept client");
}

ClientReadyCount.fetch_add(1, Ordering::SeqCst);

Expand Down Expand Up @@ -114,9 +100,11 @@ pub(crate) fn run() {
// Start running the RPC server
log::info!("Starting RPC server for client {:?}!", mid);
loop {
let _handled = server
.try_handle()
.expect("Controller failed to handle RPC");
for s_index in 0..servers.len() {
let _handled = servers[s_index]
.try_handle()
.expect("Controller failed to handle RPC");
}
}
}

Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub(crate) fn rpc_close(pid: usize, fd: FileDescriptor) -> KResult<(u64, u64)> {
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call Close() RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Close as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Close as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode and return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ pub(crate) fn rpc_delete(pid: usize, pathname: String) -> KResult<(u64, u64)> {
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Delete as RPCType,
&[&req_data, &pathname.as_bytes()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Delete as RPCType,
&[&req_data, &pathname.as_bytes()],
&mut [&mut res_data],
)?;

// Decode result - return result if decoding successful
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/getinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub(crate) fn rpc_getinfo<P: AsRef<[u8]> + Debug>(pid: usize, name: P) -> KResul

// Construct result buffer and call RPC
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::GetInfo as RPCType,
&[&req_data, name.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::GetInfo as RPCType,
&[&req_data, name.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/mkdir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ pub(crate) fn rpc_mkdir<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::MkDir as RPCType,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::MkDir as RPCType,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;

// Parse and return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ fn rpc_open_create<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call the RPC
CLIENT_STATE.rpc_client.lock().call(
rpc_type,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
rpc_type,
&[&req_data, pathname.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/fileops/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ pub(crate) fn rpc_rename<P: AsRef<[u8]> + Debug>(
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];

// Call the RPC
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::FileRename as RPCType,
&[&req_data, oldname.as_ref(), newname.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::FileRename as RPCType,
&[&req_data, oldname.as_ref(), newname.as_ref()],
&mut [&mut res_data],
)?;

// Parse and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
15 changes: 8 additions & 7 deletions kernel/src/arch/x86_64/rackscale/fileops/rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ pub(crate) fn rpc_writeat(
} else {
KernelRpc::WriteAt as RPCType
};
CLIENT_STATE
.rpc_client
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(rpc_type, &[&req_data, &data], &mut [&mut res_data])?;

Expand Down Expand Up @@ -129,11 +128,13 @@ pub(crate) fn rpc_readat(
KernelRpc::ReadAt as RPCType
};

CLIENT_STATE.rpc_client.lock().call(
KernelRpc::ReadAt as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::ReadAt as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode result, if successful, return result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
9 changes: 6 additions & 3 deletions kernel/src/arch/x86_64/rackscale/get_shmem_frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ unsafe_abomonate!(ShmemRegion: base, affinity);
// This isn't truly a syscall
pub(crate) fn rpc_get_shmem_frames(pid: Option<Pid>, num_frames: usize) -> KResult<Vec<Frame>> {
assert!(num_frames > 0);
log::debug!("GetShmemFrames({:?})", num_frames);
log::debug!(
"GetShmemFrames({:?}) core={:?}",
num_frames,
kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)
);

let mid = if pid.is_none() {
Some(*crate::environment::MACHINE_ID)
Expand All @@ -66,8 +70,7 @@ pub(crate) fn rpc_get_shmem_frames(pid: Option<Pid>, num_frames: usize) -> KResu
for i in 0..max_res_size {
res_data.push(0u8);
}
CLIENT_STATE
.rpc_client
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::GetShmemFrames as RPCType,
Expand Down
3 changes: 1 addition & 2 deletions kernel/src/arch/x86_64/rackscale/get_shmem_structure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ pub(crate) fn rpc_get_shmem_structure(

// Make buffer max size of MAX_PROCESS (for NrProcLogs), 1 (for NrLog)
let mut res_data = [0u8; core::mem::size_of::<[u64; MAX_PROCESSES]>()];
CLIENT_STATE
.rpc_client
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::GetShmemStructure as RPCType,
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/allocate_physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ pub(crate) fn rpc_allocate_physical(pid: Pid, size: u64, affinity: u64) -> KResu

// Create result buffer
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::AllocatePhysical as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::AllocatePhysical as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode result, return result if decoded successfully
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/print.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ pub(crate) fn rpc_log(msg: String) -> KResult<(u64, u64)> {

// Construct result buffer and call RPC
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::Log as RPCType,
&[&req_data, print_str.as_ref()],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::Log as RPCType,
&[&req_data, print_str.as_ref()],
&mut [&mut res_data],
)?;

// Decode and return the result
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
12 changes: 7 additions & 5 deletions kernel/src/arch/x86_64/rackscale/processops/release_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ pub(crate) fn rpc_release_core(pid: Pid, gtid: ThreadId) -> KResult<(u64, u64)>

// Create result buffer
let mut res_data = [0u8; core::mem::size_of::<KResult<(u64, u64)>>()];
CLIENT_STATE.rpc_client.lock().call(
KernelRpc::ReleaseCore as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;
CLIENT_STATE.rpc_clients[kpi::system::mtid_from_gtid(*crate::environment::CORE_ID)]
.lock()
.call(
KernelRpc::ReleaseCore as RPCType,
&[&req_data],
&mut [&mut res_data],
)?;

// Decode result, return result if decoded successfully
if let Some((res, remaining)) = unsafe { decode::<KResult<(u64, u64)>>(&mut res_data) } {
Expand Down
Loading
Loading