Skip to content
This repository has been archived by the owner on Aug 19, 2018. It is now read-only.

add service registry events allowing to listen on service references #237

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
6b3330d
add service registry events allowing to listen on service references
ronenhamias Aug 6, 2018
fc05b3b
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 6, 2018
df08470
update remove events sink
ronenhamias Aug 6, 2018
1c6d0ad
update remove
ronenhamias Aug 6, 2018
9017494
Merge branch 'support_service_registry_events' of git@github.com:scal…
ronenhamias Aug 6, 2018
017084e
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 8, 2018
cca793a
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 9, 2018
ee54a5a
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 10, 2018
8b8572a
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 13, 2018
92518e0
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 14, 2018
f10e520
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 14, 2018
c9fac14
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 15, 2018
e295549
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 15, 2018
f19ba4f
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 15, 2018
3274f21
fix code review comments
ronenhamias Aug 16, 2018
701f58d
Merge branch 'develop' into support_service_registry_events
artem-v Aug 16, 2018
1cf25b7
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 17, 2018
456423a
fix checkstyle
ronenhamias Aug 17, 2018
13d1285
Merge branch 'develop' into support_service_registry_events
ronenhamias Aug 18, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.scalecube.services.registry.api;

import io.scalecube.services.ServiceReference;
import io.scalecube.transport.Address;

public class RegistryEvent {

public enum Type {
ADDED, REMOVED;
}

private ServiceReference serviceReference;
private Type type;

public RegistryEvent(Type type, ServiceReference serviceReference) {
this.serviceReference = serviceReference;
this.type = type;
}

public RegistryEvent(RegistryEvent e) {
this.serviceReference = e.serviceReference;
this.type = e.type;
}

public ServiceReference serviceReference() {
return this.serviceReference;
}

public boolean isAdded() {
return Type.ADDED.equals(type);
}

public boolean isRemoved() {
return Type.REMOVED.equals(type);
}

public Type type() {
return this.type;
}

public Address address() {
return Address.create(this.serviceReference.host(), this.serviceReference.port());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.scalecube.services.ServiceReference;
import io.scalecube.services.api.ServiceMessage;
import java.util.List;
import reactor.core.publisher.Flux;

/**
* Service registry interface provides API to register/unregister services in the system and make
Expand All @@ -20,4 +21,6 @@ public interface ServiceRegistry {
boolean registerService(ServiceEndpoint serviceEndpoint);

ServiceEndpoint unregisterService(String endpointId);

Flux<RegistryEvent> listen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.scalecube.services.ServiceEndpoint;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.registry.api.RegistryEvent;
import io.scalecube.services.registry.api.RegistryEvent.Type;
import io.scalecube.services.registry.api.ServiceRegistry;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -13,6 +15,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jctools.maps.NonBlockingHashMap;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;

public class ServiceRegistryImpl implements ServiceRegistry {

Expand All @@ -21,6 +27,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
private final Map<String, List<ServiceReference>> referencesByQualifier =
new NonBlockingHashMap<>();

private final FluxProcessor<RegistryEvent, RegistryEvent> events =
DirectProcessor.<RegistryEvent>create();

private final FluxSink<RegistryEvent> sink = events.serialize().sink();

@Override
public List<ServiceEndpoint> listServiceEndpoints() {
// todo how to collect tags correctly?
Expand Down Expand Up @@ -52,18 +63,14 @@ public boolean registerService(ServiceEndpoint serviceEndpoint) {
serviceEndpoint
.serviceRegistrations()
.stream()
.flatMap(
serviceRegistration ->
serviceRegistration
.methods()
.stream()
.map(sm -> new ServiceReference(sm, serviceRegistration, serviceEndpoint)))
.forEach(
serviceReference ->
referencesByQualifier
.computeIfAbsent(
serviceReference.qualifier(), key -> new CopyOnWriteArrayList<>())
.add(serviceReference));
.flatMap(serviceRegistration -> serviceRegistration.methods().stream()
.map(sm -> new ServiceReference(sm, serviceRegistration, serviceEndpoint)))
.forEach(serviceReference -> {
referencesByQualifier
.computeIfAbsent(serviceReference.qualifier(), key -> new CopyOnWriteArrayList<>())
.add(serviceReference);
sink.next(new RegistryEvent(Type.ADDED, serviceReference));
});
}
return success;
}
Expand All @@ -72,14 +79,29 @@ public boolean registerService(ServiceEndpoint serviceEndpoint) {
public ServiceEndpoint unregisterService(String endpointId) {
ServiceEndpoint serviceEndpoint = serviceEndpoints.remove(endpointId);
if (serviceEndpoint != null) {
referencesByQualifier
.values()
.forEach(list -> list.removeIf(sr -> sr.endpointId().equals(endpointId)));
referencesByQualifier.values()
.forEach(list -> {
list.stream().filter(sr -> sr.endpointId().equals(endpointId))
.forEach(sr -> {
list.remove(sr);
sink.next(new RegistryEvent(Type.REMOVED, sr));
});
});
}

return serviceEndpoint;
}

Stream<ServiceReference> serviceReferenceStream() {
return referencesByQualifier.values().stream().flatMap(Collection::stream);
}

/**
* listen on service registry events.
*/
public Flux<RegistryEvent> listen() {
return Flux.fromIterable(referencesByQualifier.values()).flatMap(Flux::fromIterable)
.map(sr -> new RegistryEvent(Type.ADDED, sr))
.concatWith(events);
}
}