Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply fixes for #114 liveness collisions #115

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions application/lib/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ type RegistrationManager struct {

func NewRegistrationManager() *RegistrationManager {
logger := log.New(os.Stdout, "[REG] ", log.Ldate|log.Lmicroseconds)
var ult *lt.UncachedLivenessTester
ult = new(lt.UncachedLivenessTester)
var ult *lt.UncachedLivenessTester = new(lt.UncachedLivenessTester)
p, err := NewPhantomIPSelector()
if err != nil {
// fmt.Errorf("failed to create the PhantomIPSelector object: %v", err)
Expand Down
17 changes: 15 additions & 2 deletions application/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,20 @@ func get_zmq_updates(connectAddr string, regManager *cj.RegistrationManager, con
}

if !reg.PreScanned() {

// Do not run a liveness scan for a registration for ip
// versions that are not enabled in config.
if reg.DarkDecoy.To4() != nil && !conf.EnableIPv4 {
continue
} else if reg.DarkDecoy.To4() == nil && !conf.EnableIPv6 {
continue
}

// New registration received over channel that requires liveness scan for the phantom
liveness, response := regManager.PhantomIsLive(reg.DarkDecoy.String(), 443)

if liveness == true {
logger.Printf("Dropping registration %v -- live phantom: %v\n", reg.IDString(), response)
if liveness {
logger.Printf("Dropping registration %v -- live phantom (%s): %v\n", reg.IDString(), reg.DarkDecoy, response)
if response.Error() == lt.CACHED_PHANTOM_MSG {
cj.Stat().AddLivenessCached()
}
Expand Down Expand Up @@ -393,6 +402,10 @@ func main() {
flag.Parse()

regManager := cj.NewRegistrationManager()
if regManager == nil {
fmt.Println("error occurred while setting up registration manager")
os.Exit(1)
}
logger = regManager.Logger

// Should we log client IP addresses
Expand Down
53 changes: 26 additions & 27 deletions src/process_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,22 +207,17 @@ impl PerCoreGlobal {
let dd_flow = FlowNoSrcPort::from_flow(&flow);
if self.flow_tracker.is_phantom_session(&dd_flow) {
// Handle packet destined for registered IP
match self.filter_station_traffic(flow.src_ip.to_string()) {
// traffic was sent by another station, likely liveness testing.
None => {}

if !self.is_station_traffic(flow.src_ip.to_string()) {
// Non station traffic, forward to application to handle
Some(_) => {
if (tcp_flags & TcpFlags::SYN) != 0 && (tcp_flags & TcpFlags::ACK) == 0 {
debug!("Connection for registered Phantom {}", flow);
}
// Update expire time if necessary
self.flow_tracker.update_phantom_flow(&dd_flow);
// Forward packet...
self.forward_pkt(&ip_pkt);
// TODO: if it was RST or FIN, close things
return;
if (tcp_flags & TcpFlags::SYN) != 0 && (tcp_flags & TcpFlags::ACK) == 0 {
debug!("Connection for registered Phantom {}", flow);
}
// Update expire time if necessary
self.flow_tracker.update_phantom_flow(&dd_flow);
// Forward packet...
self.forward_pkt(&ip_pkt);
// TODO: if it was RST or FIN, close things
return;
}
}

Expand Down Expand Up @@ -354,23 +349,23 @@ impl PerCoreGlobal {
/// # Examples
///
/// ```compile_fail
/// let flow_src_station = String::from("192.122.200.231");
/// let flow_src_client = String::from("128.138.89.172");
/// # extern crate rust_dark_decoy;
/// # use rust_dark_decoy::PerCoreGlobal;
/// # fn main() {
/// # ::std::env::set_var("CJ_STATION_CONFIG", "./application/config.toml");
/// let s = crate::PerCoreGlobal{};
/// let flow_src_station = String::from("10.0.0.1");
/// let flow_src_client = String::from("172.16.0.1");
///
/// let station = filter_station_traffic(flow_src_station);
/// let client = filter_station_traffic(flow_src_client);
/// let station = s.is_station_traffic(flow_src_station);
/// let client = s.is_station_traffic(flow_src_client);
///
/// assert_eq!(None, station);
/// assert_eq!(Some(()), client);
/// # }
/// ```
fn filter_station_traffic(&mut self, src: String) -> Option<()> {
for addr in self.filter_list.iter() {
if src == *addr {
return None;
}
}

Some(())
fn is_station_traffic(&mut self, src: String) -> bool {
self.filter_list.contains(&src)
}
} // impl PerCoreGlobal

Expand All @@ -382,7 +377,7 @@ mod tests {
use StationConfig;

#[test]
fn test_filter_station_traffic() {
fn test_is_station_traffic() {
env::set_var("CJ_STATION_CONFIG", "./application/config.toml");

// --
Expand All @@ -399,5 +394,9 @@ mod tests {
for net in nets.iter() {
println!("{}", net);
}

assert!(nets.contains(&String::from("127.0.0.1")));
assert!(nets.contains(&String::from("::1")));
assert!(!nets.contains(&String::from("127.0.0.2")))
}
}
11 changes: 2 additions & 9 deletions src/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,9 @@ impl fmt::Display for SessionDetails {
true => write!(
f,
"{} -> {} ({}ns)",
self.client_ip.to_string(),
self.phantom_ip.to_string(),
self.timeout
),
false => write!(
f,
"_ -> {} ({}ns)",
self.phantom_ip.to_string(),
self.timeout
self.client_ip, self.phantom_ip, self.timeout
),
false => write!(f, "_ -> {} ({}ns)", self.phantom_ip, self.timeout),
}
}
}
Expand Down