Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
2125: Reorganised the dependencies to combat a race condition in amethyst network r=azriel91 a=Machine-Hum

## Description

Fixed race condition by reorg'ing dependency order. In addition, networking examples can now sent / receive from the client end, before the client could just send data. This makes them more comprehensive and easier for beginners to get going.

## PR Checklist

By placing an x in the boxes I certify that I have:

- [NA] Updated the content of the book if this PR would make the book outdated.
- [NA] Added a changelog entry if this will impact users, or modified more than 5 lines of Rust that wasn't a doc comment.
- [NA] Added unit tests for new code added in this PR.
- [x] Acknowledged that by making this pull request I release this code under an MIT/Apache 2.0 dual licensing scheme.

If this modified or created any rs files:

- [x] Ran `cargo +stable fmt --all`
- [x] Ran `cargo clippy --all --features "empty"`
- [x] Ran `cargo test --all --features "empty"`


Co-authored-by: machine-hum <machinehum@wlkr.wlker>
  • Loading branch information
bors[bot] and machine-hum committed Feb 5, 2020
2 parents 6d1e537 + 66cf064 commit debca21
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 24 deletions.
36 changes: 28 additions & 8 deletions amethyst_network/src/simulation/transport/tcp.rs
Expand Up @@ -49,26 +49,46 @@ impl<'a, 'b> SystemBundle<'a, 'b> for TcpNetworkBundle {
world: &mut World,
builder: &mut DispatcherBuilder<'_, '_>,
) -> Result<(), Error> {
builder.add(TcpNetworkSendSystem, NETWORK_SEND_SYSTEM_NAME, &[]);
builder.add(TcpNetworkRecvSystem, NETWORK_RECV_SYSTEM_NAME, &[]);
// NetworkSimulationTime should run first
// followed by TcpConnectionListenerSystem and TcpStreamManagementSystem
// then TcpNetworkSendSystem and TcpNetworkRecvSystem

builder.add(
TcpStreamManagementSystem,
STREAM_MANAGEMENT_SYSTEM_NAME,
&[NETWORK_SEND_SYSTEM_NAME, NETWORK_RECV_SYSTEM_NAME],
NetworkSimulationTimeSystem,
NETWORK_SIM_TIME_SYSTEM_NAME,
&[],
);

builder.add(
TcpConnectionListenerSystem,
CONNECTION_LISTENER_SYSTEM_NAME,
&[NETWORK_SEND_SYSTEM_NAME, NETWORK_RECV_SYSTEM_NAME],
&[NETWORK_SIM_TIME_SYSTEM_NAME],
);

builder.add(
NetworkSimulationTimeSystem,
NETWORK_SIM_TIME_SYSTEM_NAME,
TcpStreamManagementSystem,
STREAM_MANAGEMENT_SYSTEM_NAME,
&[NETWORK_SIM_TIME_SYSTEM_NAME],
);

builder.add(
TcpNetworkSendSystem,
NETWORK_SEND_SYSTEM_NAME,
&[
STREAM_MANAGEMENT_SYSTEM_NAME,
CONNECTION_LISTENER_SYSTEM_NAME,
],
);

builder.add(
TcpNetworkRecvSystem,
NETWORK_RECV_SYSTEM_NAME,
&[
STREAM_MANAGEMENT_SYSTEM_NAME,
CONNECTION_LISTENER_SYSTEM_NAME,
],
);

world.insert(TcpNetworkResource::new(
self.listener,
self.recv_buffer_size_bytes,
Expand Down
80 changes: 64 additions & 16 deletions examples/net_client/main.rs
@@ -1,13 +1,18 @@
// CLIENT
use std::time::Duration;

use amethyst::{
core::{frame_limiter::FrameRateLimitStrategy, Time},
ecs::{Read, System, Write},
network::simulation::{tcp::TcpNetworkBundle, NetworkSimulationTime, TransportResource},
core::{bundle::SystemBundle, frame_limiter::FrameRateLimitStrategy, SystemDesc, Time},
ecs::{DispatcherBuilder, Read, System, SystemData, World, Write},
network::simulation::{
tcp::TcpNetworkBundle, NetworkSimulationEvent, NetworkSimulationTime, TransportResource,
},
prelude::*,
shrev::{EventChannel, ReaderId},
utils::application_root_dir,
Result,
};
use log::info;
use std::time::Duration;
use log::{error, info};

fn main() -> Result<()> {
amethyst::start_logger(Default::default());
Expand All @@ -30,7 +35,8 @@ fn main() -> Result<()> {
.with_bundle(TcpNetworkBundle::new(None, 2048))?
// // Laminar
// .with_bundle(LaminarNetworkBundle::new(Some(socket)))?
.with(SpamSystem::new(), "spam", &[]);
.with_bundle(SpamBundle)?;

let mut game = Application::build(assets_dir, GameState)?
.with_frame_limit(
FrameRateLimitStrategy::SleepAndYield(Duration::from_millis(2)),
Expand All @@ -40,17 +46,47 @@ fn main() -> Result<()> {
game.run();
Ok(())
}

/// Default empty state
pub struct GameState;
impl SimpleState for GameState {}

/// A simple system that sends a ton of messages to all connections.
/// In this case, only the server is connected.
struct SpamSystem;
#[derive(Debug)]
struct SpamBundle;

impl<'a, 'b> SystemBundle<'a, 'b> for SpamBundle {
fn build(self, world: &mut World, builder: &mut DispatcherBuilder<'a, 'b>) -> Result<()> {
builder.add(SpamSystemDesc::default().build(world), "spam_system", &[]);
Ok(())
}
}

#[derive(Default, Debug)]
pub struct SpamSystemDesc;

impl<'a, 'b> SystemDesc<'a, 'b, SpamSystem> for SpamSystemDesc {
fn build(self, world: &mut World) -> SpamSystem {
// Creates the EventChannel<NetworkEvent> managed by the ECS.
<SpamSystem as System<'_>>::SystemData::setup(world);
// Fetch the change we just created and call `register_reader` to get a
// ReaderId<NetworkEvent>. This reader id is used to fetch new events from the network event
// channel.
let reader = world
.fetch_mut::<EventChannel<NetworkSimulationEvent>>()
.register_reader();

SpamSystem::new(reader)
}
}

/// A simple system that receives a ton of network events.
struct SpamSystem {
reader: ReaderId<NetworkSimulationEvent>,
}

impl SpamSystem {
pub fn new() -> Self {
SpamSystem {}
pub fn new(reader: ReaderId<NetworkSimulationEvent>) -> Self {
Self { reader }
}
}

Expand All @@ -59,12 +95,9 @@ impl<'a> System<'a> for SpamSystem {
Read<'a, NetworkSimulationTime>,
Read<'a, Time>,
Write<'a, TransportResource>,
Read<'a, EventChannel<NetworkSimulationEvent>>,
);
fn run(&mut self, (sim_time, time, mut net): Self::SystemData) {
// Use method `sim_time.sim_frames_to_run()` to determine if the system should send a
// message this frame. If, for example, the ECS frame rate is slower than the simulation
// frame rate, this code block will run until it catches up with the expected simulation
// frame number.
fn run(&mut self, (sim_time, time, mut net, event /*, tx*/): Self::SystemData) {
let server_addr = "127.0.0.1:3457".parse().unwrap();
for frame in sim_time.sim_frames_to_run() {
info!("Sending message for sim frame {}.", frame);
Expand All @@ -75,5 +108,20 @@ impl<'a> System<'a> for SpamSystem {
);
net.send(server_addr, payload.as_bytes());
}

for event in event.read(&mut self.reader) {
match event {
NetworkSimulationEvent::Message(_addr, payload) => info!("Payload: {:?}", payload),
NetworkSimulationEvent::Connect(addr) => info!("New client connection: {}", addr),
NetworkSimulationEvent::Disconnect(addr) => info!("Server Disconnected: {}", addr),
NetworkSimulationEvent::RecvError(e) => {
error!("Recv Error: {:?}", e);
}
NetworkSimulationEvent::SendError(e, msg) => {
error!("Send Error: {:?}, {:?}", e, msg);
}
_ => {}
}
}
}
}
2 changes: 2 additions & 0 deletions examples/net_server/main.rs
@@ -1,3 +1,4 @@
// SERVER
use std::time::Duration;

use amethyst::{
Expand Down Expand Up @@ -36,6 +37,7 @@ fn main() -> Result<()> {
// // Laminar
// .with_bundle(LaminarNetworkBundle::new(Some(socket)))?
.with_bundle(SpamReceiveBundle)?;

let mut game = Application::build(assets_dir, GameState)?
.with_frame_limit(
FrameRateLimitStrategy::SleepAndYield(Duration::from_millis(2)),
Expand Down

0 comments on commit debca21

Please sign in to comment.