Skip to content

Commit

Permalink
subkernels: pass destination with messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Spaqin committed Nov 29, 2023
1 parent 6e9f6b3 commit d6566c1
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 57 deletions.
12 changes: 8 additions & 4 deletions artiq/compiler/transforms/llvm_ir_generator.py
Expand Up @@ -399,7 +399,7 @@ def llbuiltin(self, name):
llty = ll.FunctionType(lli32, [llptr])

elif name == "subkernel_send_message":
llty = ll.FunctionType(llvoid, [lli32, lli8, llsliceptr, llptrptr])
llty = ll.FunctionType(llvoid, [lli32, lli1, lli8, lli8, llsliceptr, llptrptr])
elif name == "subkernel_load_run":
llty = ll.FunctionType(llvoid, [lli32, lli8, lli1])
elif name == "subkernel_await_finish":
Expand Down Expand Up @@ -1710,8 +1710,10 @@ def arg_error_handler(typ):

llargcount = ll.Constant(lli8, len(args))

llisreturn = ll.Constant(lli1, False)

self.llbuilder.call(self.llbuiltin("subkernel_send_message"),
[llsid, llargcount, lltagptr, llargs])
[llsid, llisreturn, lldest, llargcount, lltagptr, llargs])
self.llbuilder.call(self.llbuiltin("llvm.stackrestore"), [llstackptr])

return llsid
Expand Down Expand Up @@ -1748,10 +1750,12 @@ def ret_error_handler(typ):
llretslot = self.llbuilder.bitcast(llretslot, llptr)
self.llbuilder.store(llretslot, llrets)

llsid = ll.Constant(lli32, 0) # return goes back to master, sid is ignored
llsid = ll.Constant(lli32, 0) # return goes back to the caller, sid is ignored
lltagcount = ll.Constant(lli8, 1) # only one thing is returned
llisreturn = ll.Constant(lli1, True) # it's a return, so destination is ignored
lldest = ll.Constant(lli8, 0)
self.llbuilder.call(self.llbuiltin("subkernel_send_message"),
[llsid, lltagcount, lltagptr, llrets])
[llsid, llisreturn, lldest, lltagcount, lltagptr, llrets])

def process_Call(self, insn):
functiontyp = insn.target_function().type
Expand Down
4 changes: 3 additions & 1 deletion artiq/firmware/ksupport/lib.rs
Expand Up @@ -489,9 +489,11 @@ extern fn subkernel_await_finish(id: u32, timeout: u64) {
}

#[unwind(aborts)]
extern fn subkernel_send_message(id: u32, count: u8, tag: &CSlice<u8>, data: *const *const ()) {
extern fn subkernel_send_message(id: u32, is_return: bool, destination: u8,
count: u8, tag: &CSlice<u8>, data: *const *const ()) {
send(&SubkernelMsgSend {
id: id,
destination: if is_return { None } else { Some(destination) },
count: count,
tag: tag.as_ref(),
data: data
Expand Down
2 changes: 1 addition & 1 deletion artiq/firmware/libproto_artiq/kernel_proto.rs
Expand Up @@ -107,7 +107,7 @@ pub enum Message<'a> {
SubkernelLoadRunReply { succeeded: bool },
SubkernelAwaitFinishRequest { id: u32, timeout: u64 },
SubkernelAwaitFinishReply { status: SubkernelStatus },
SubkernelMsgSend { id: u32, count: u8, tag: &'a [u8], data: *const *const () },
SubkernelMsgSend { id: u32, destination: Option<u8>, count: u8, tag: &'a [u8], data: *const *const () },
SubkernelMsgRecvRequest { id: u32, timeout: u64, tags: &'a [u8] },
SubkernelMsgRecvReply { status: SubkernelStatus, count: u8 },

Expand Down
2 changes: 1 addition & 1 deletion artiq/firmware/runtime/session.rs
Expand Up @@ -701,7 +701,7 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
kern_send(io, &kern::SubkernelAwaitFinishReply { status: status })
}
#[cfg(has_drtio)]
&kern::SubkernelMsgSend { id, count, tag, data } => {
&kern::SubkernelMsgSend { id, destination: _, count, tag, data } => {
subkernel::message_send(io, aux_mutex, _subkernel_mutex, routing_table, id, count, tag, data)?;
kern_acknowledge()
}
Expand Down
72 changes: 35 additions & 37 deletions artiq/firmware/satman/kernel.rs
Expand Up @@ -107,7 +107,8 @@ macro_rules! unexpected {
#[derive(Debug)]
pub struct Sliceable {
it: usize,
data: Vec<u8>
data: Vec<u8>,
destination: u8
}

/* represents interkernel messages */
Expand All @@ -119,7 +120,6 @@ struct Message {
#[derive(PartialEq)]
enum OutMessageState {
NoMessage,
MessageReady,
MessageBeingSent,
MessageSent,
MessageAcknowledged
Expand Down Expand Up @@ -165,6 +165,7 @@ pub struct SubkernelFinished {
}

pub struct SliceMeta {
pub destination: u8,
pub len: u16,
pub status: PayloadStatus
}
Expand All @@ -180,6 +181,7 @@ macro_rules! get_slice_fn {
self.it += len;

SliceMeta {
destination: self.destination,
len: len as u16,
status: status
}
Expand All @@ -188,10 +190,11 @@ macro_rules! get_slice_fn {
}

impl Sliceable {
pub fn new(data: Vec<u8>) -> Sliceable {
pub fn new(destination: u8, data: Vec<u8>) -> Sliceable {
Sliceable {
it: 0,
data: data
data: data,
destination: destination
}
}

Expand Down Expand Up @@ -230,17 +233,6 @@ impl MessageManager {
}
}

pub fn is_outgoing_ready(&mut self) -> bool {
// called by main loop, to see if there's anything to send, will send it afterwards
match self.out_state {
OutMessageState::MessageReady => {
self.out_state = OutMessageState::MessageBeingSent;
true
},
_ => false
}
}

pub fn was_message_acknowledged(&mut self) -> bool {
match self.out_state {
OutMessageState::MessageAcknowledged => {
Expand Down Expand Up @@ -280,14 +272,27 @@ impl MessageManager {
}
}

pub fn accept_outgoing(&mut self, count: u8, tag: &[u8], data: *const *const ()) -> Result<(), Error> {
pub fn accept_outgoing(&mut self, id: u32, self_destination: u8, destination: u8,
count: u8, tag: &[u8], data: *const *const (),
routing_table: &RoutingTable, rank: u8, router: &mut Router
) -> Result<(), Error> {
let mut writer = Cursor::new(Vec::new());
rpc::send_args(&mut writer, 0, tag, data, false)?;
// skip service tag, but write the count
let mut data = writer.into_inner().split_off(3);
data[0] = count;
self.out_message = Some(Sliceable::new(data));
self.out_state = OutMessageState::MessageReady;
self.out_message = Some(Sliceable::new(destination, data));

let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
self.out_state = OutMessageState::MessageBeingSent;
let meta = self.get_outgoing_slice(&mut data_slice).unwrap();
let res = router.route(drtioaux::Packet::SubkernelMessage {
source: self_destination, destination: destination, id: id,
status: meta.status, length: meta.len as u16, data: data_slice
}, routing_table, rank, self_destination);
if let Err(e) = res {
warn!("error sending SubkernelMessage: {}", e);
}
Ok(())
}

Expand Down Expand Up @@ -390,7 +395,6 @@ impl Manager {
info!("starting subkernel #{}", id);
if self.session.kernel_state != KernelState::Loaded
|| self.current_id != id {
info!("re-loading");
self.load(id)?;
}
self.session.source = source;
Expand Down Expand Up @@ -459,7 +463,7 @@ impl Manager {
pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta {
match self.session.last_exception.as_mut() {
Some(exception) => exception.get_slice_sat(data_slice),
None => SliceMeta { len: 0, status: PayloadStatus::FirstAndLast }
None => SliceMeta { destination: 0, len: 0, status: PayloadStatus::FirstAndLast }
}
}

Expand All @@ -484,7 +488,7 @@ impl Manager {
backtrace: &[],
async_errors: 0
}).write_to(&mut writer) {
Ok(_) => self.session.last_exception = Some(Sliceable::new(writer.into_inner())),
Ok(_) => self.session.last_exception = Some(Sliceable::new(0, writer.into_inner())),
Err(_) => error!("Error writing exception data")
}
}
Expand Down Expand Up @@ -541,18 +545,6 @@ impl Manager {
warn!("error sending SubkernelFinished: {}", e);
}
}

if self.session.messages.is_outgoing_ready() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let meta = self.message_get_slice(&mut data_slice).unwrap();
let res = router.route(drtioaux::Packet::SubkernelMessage {
source: destination, destination: self.session.source, id: self.current_id,
status: meta.status, length: meta.len as u16, data: data_slice
}, &routing_table, rank, destination);
if let Err(e) = res {
warn!("error sending SubkernelMessage: {}", e);
}
}
}

fn process_external_messages(&mut self) -> Result<(), Error> {
Expand Down Expand Up @@ -707,8 +699,14 @@ impl Manager {
return Ok(Some(true))
}

&kern::SubkernelMsgSend { id: _, count, tag, data } => {
self.session.messages.accept_outgoing(count, tag, data)?;
&kern::SubkernelMsgSend { id: _, destination: msg_dest, count, tag, data } => {
let dest = match msg_dest {
Some(dest) => dest,
None => self.session.source
};
self.session.messages.accept_outgoing(self.current_id, destination,
dest, count, tag, data,
routing_table, rank, router)?;
// acknowledge after the message is sent
self.session.kernel_state = KernelState::MsgSending;
Ok(())
Expand All @@ -731,7 +729,7 @@ impl Manager {
&kern::SubkernelAwaitFinishRequest{ id, timeout } => {
let max_time = clock::get_ms() + timeout as u64;
self.session.kernel_state = KernelState::SubkernelAwaitFinish { max_time: max_time, id: id };
Ok(())
kern_acknowledge()
}

request => unexpected!("unexpected request {:?} from kernel CPU", request)
Expand Down Expand Up @@ -807,7 +805,7 @@ fn slice_kernel_exception(exceptions: &[Option<eh_artiq::Exception>],
async_errors: 0
}).write_to(&mut writer) {
// save last exception data to be received by master
Ok(_) => Ok(Sliceable::new(writer.into_inner())),
Ok(_) => Ok(Sliceable::new(0, writer.into_inner())),
Err(_) => Err(Error::SubkernelIoError)
}
}
Expand Down
3 changes: 1 addition & 2 deletions artiq/firmware/satman/main.rs
Expand Up @@ -414,7 +414,6 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
succeeded |= kernelmgr.run(source, id).is_ok();
}
}
info!("succ: {}, routing", succeeded);
router.send(drtioaux::Packet::SubkernelLoadRunReply {
destination: source, succeeded: succeeded
},
Expand Down Expand Up @@ -455,7 +454,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
if let Some(meta) = kernelmgr.message_get_slice(&mut data_slice) {
router.send(drtioaux::Packet::SubkernelMessage {
source: *self_destination, destination: 0, id: kernelmgr.get_current_id().unwrap(),
source: *self_destination, destination: meta.destination, id: kernelmgr.get_current_id().unwrap(),
status: meta.status, length: meta.len as u16, data: data_slice
}, _routing_table, *rank, *self_destination)?;
} else {
Expand Down
2 changes: 1 addition & 1 deletion artiq/test/lit/embedding/subkernel_no_arg.py
Expand Up @@ -12,7 +12,7 @@ def entrypoint():


# CHECK-L: declare void @subkernel_load_run(i32, i8, i1) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, i1, i8, { i8*, i32 }*, i8**) local_unnamed_addr
@subkernel(destination=1)
def no_arg() -> TStr:
pass
2 changes: 1 addition & 1 deletion artiq/test/lit/embedding/subkernel_return.py
Expand Up @@ -14,7 +14,7 @@ def entrypoint():
subkernel_await(returning)

# CHECK-L: declare void @subkernel_load_run(i32, i8, i1) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, i8, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, i1, i8, i8, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-L: declare i8 @subkernel_await_message(i32, i64, { i8*, i32 }*, i8, i8) local_unnamed_addr
# CHECK-L: declare void @subkernel_await_finish(i32, i64) local_unnamed_addr
@subkernel(destination=1)
Expand Down
2 changes: 1 addition & 1 deletion artiq/test/lit/embedding/subkernel_return_none.py
Expand Up @@ -14,7 +14,7 @@ def entrypoint():
subkernel_await(returning_none)

# CHECK-L: declare void @subkernel_load_run(i32, i8, i1) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, i1, i8, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-L: declare void @subkernel_await_finish(i32, i64) local_unnamed_addr
# CHECK-NOT-L: declare i8 @subkernel_await_message(i32, i64, { i8*, i32 }*, i8, i8) local_unnamed_addr
@subkernel(destination=1)
Expand Down
2 changes: 1 addition & 1 deletion artiq/test/lit/embedding/subkernel_self.py
Expand Up @@ -22,4 +22,4 @@ def entrypoint():
a.kernel_entrypoint()

# CHECK-L: declare void @subkernel_load_run(i32, i8, i1) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, i8, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-NOT-L: declare void @subkernel_send_message(i32, i1, i8, i8, { i8*, i32 }*, i8**) local_unnamed_addr
4 changes: 2 additions & 2 deletions artiq/test/lit/embedding/subkernel_self_args.py
Expand Up @@ -12,7 +12,7 @@ def sk(self, a):
@kernel
def kernel_entrypoint(self):
# CHECK: call void @subkernel_load_run\(i32 1, i8 1, i1 true\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 1, i8 1, .*\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 1, i1 false, i8 1, i8 1, .*\), !dbg !.
self.sk(1)

a = A()
Expand All @@ -22,4 +22,4 @@ def entrypoint():
a.kernel_entrypoint()

# CHECK-L: declare void @subkernel_load_run(i32, i8, i1) local_unnamed_addr
# CHECK-L: declare void @subkernel_send_message(i32, i8, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-L: declare void @subkernel_send_message(i32, i1, i8, i8, { i8*, i32 }*, i8**) local_unnamed_addr
4 changes: 2 additions & 2 deletions artiq/test/lit/embedding/subkernel_with_arg.py
Expand Up @@ -7,12 +7,12 @@
@kernel
def entrypoint():
# CHECK: call void @subkernel_load_run\(i32 1, i8 1, i1 true\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 ., i8 1, .*\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 ., i1 false, i8 1, i8 1, .*\), !dbg !.
accept_arg(1)


# CHECK-L: declare void @subkernel_load_run(i32, i8, i1) local_unnamed_addr
# CHECK-L: declare void @subkernel_send_message(i32, i8, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-L: declare void @subkernel_send_message(i32, i1, i8, i8, { i8*, i32 }*, i8**) local_unnamed_addr
@subkernel(destination=1)
def accept_arg(arg: TInt32) -> TNone:
pass
6 changes: 3 additions & 3 deletions artiq/test/lit/embedding/subkernel_with_opt_arg.py
Expand Up @@ -7,15 +7,15 @@
@kernel
def entrypoint():
# CHECK: call void @subkernel_load_run\(i32 1, i8 1, i1 true\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 ., i8 1, .*\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 ., i1 false, i8 1, i8 1, .*\), !dbg !.
accept_arg(1)
# CHECK: call void @subkernel_load_run\(i32 1, i8 1, i1 true\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 ., i8 2, .*\), !dbg !.
# CHECK: call void @subkernel_send_message\(i32 ., i1 false, i8 1, i8 2, .*\), !dbg !.
accept_arg(1, 2)


# CHECK-L: declare void @subkernel_load_run(i32, i8, i1) local_unnamed_addr
# CHECK-L: declare void @subkernel_send_message(i32, i8, { i8*, i32 }*, i8**) local_unnamed_addr
# CHECK-L: declare void @subkernel_send_message(i32, i1, i8, i8, { i8*, i32 }*, i8**) local_unnamed_addr
@subkernel(destination=1)
def accept_arg(arg_a, arg_b=5) -> TNone:
pass

0 comments on commit d6566c1

Please sign in to comment.