Skip to content

Commit

Permalink
Merge pull request #290 from scalecube/feature/issue-283-cluster-code…
Browse files Browse the repository at this point in the history
…c-artifact-CR2arvy

[CR287] Feature/issue 283 cluster codec artifact
  • Loading branch information
artem-v committed Jan 6, 2020
2 parents 69b9fd6 + 13aa047 commit f5c4505
Show file tree
Hide file tree
Showing 19 changed files with 401 additions and 77 deletions.
38 changes: 35 additions & 3 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
@@ -1,21 +1,26 @@
package io.scalecube.cluster;

import io.scalecube.net.Address;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Objects;
import java.util.UUID;

/**
* Cluster member which represents node in the cluster and contains its id and address. This class
* is essentially immutable.
*/
public final class Member {
public final class Member implements Externalizable {

private static final long serialVersionUID = 1L;

private String id;
private String alias;
private Address address;

/** Instantiates empty member for deserialization purpose. */
Member() {}
public Member() {}

/**
* Constructor.
Expand Down Expand Up @@ -63,6 +68,33 @@ public int hashCode() {
return Objects.hash(id, address);
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
// id
out.writeUTF(id);
// alias
boolean aliasNotNull = alias != null;
out.writeBoolean(aliasNotNull);
if (aliasNotNull) {
out.writeUTF(alias);
}
// address
out.writeUTF(address.toString());
}

@Override
public void readExternal(ObjectInput in) throws IOException {
// id
id = in.readUTF();
// alias
boolean aliasNotNull = in.readBoolean();
if (aliasNotNull) {
alias = in.readUTF();
}
// address
address = Address.from(in.readUTF());
}

@Override
public String toString() {
if (alias == null) {
Expand Down
@@ -0,0 +1,33 @@
package io.scalecube.cluster.metadata;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import reactor.core.Exceptions;

public class DefaultMetadataCodec implements MetadataCodec {

@Override
public Object deserialize(ByteBuffer buffer) {
byte[] bytes = buffer.array();
try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
return is.readObject();
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}

@Override
public ByteBuffer serialize(Object metadata) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream os = new ObjectOutputStream(baos)) {
os.writeObject(metadata);
os.flush();
return ByteBuffer.wrap(baos.toByteArray());
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}
}
Expand Up @@ -6,7 +6,8 @@
/** Contains methods for metadata serializing/deserializing logic. */
public interface MetadataCodec {

MetadataCodec INSTANCE = ServiceLoaderUtil.findFirst(MetadataCodec.class).orElse(null);
MetadataCodec INSTANCE =
ServiceLoaderUtil.findFirst(MetadataCodec.class).orElseGet(DefaultMetadataCodec::new);

/**
* Deserializes metadata from buffer.
Expand Down
5 changes: 0 additions & 5 deletions cluster-testlib/pom.xml
Expand Up @@ -22,11 +22,6 @@
<artifactId>scalecube-cluster-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-codec-jackson</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
6 changes: 0 additions & 6 deletions cluster/pom.xml
Expand Up @@ -51,12 +51,6 @@
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-codec-jackson</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
41 changes: 21 additions & 20 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Expand Up @@ -20,6 +20,8 @@
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import io.scalecube.utils.ServiceLoaderUtil;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.Collection;
Expand Down Expand Up @@ -225,11 +227,7 @@ public Cluster startAwait() {
}

private Mono<Cluster> doStart() {
return Mono.defer(
() -> {
validateConfiguration();
return doStart0();
});
return Mono.fromRunnable(this::validateConfiguration).then(Mono.defer(this::doStart0));
}

private Mono<Cluster> doStart0() {
Expand Down Expand Up @@ -295,28 +293,31 @@ private Mono<Cluster> doStart0() {
}

private void validateConfiguration() {
MetadataDecoder metadataDecoder = config.metadataDecoder();
MetadataEncoder metadataEncoder = config.metadataEncoder();
MetadataCodec metadataCodec = config.metadataCodec();

if (metadataDecoder == null && metadataEncoder == null && metadataCodec == null) {
throw new IllegalArgumentException("Invalid cluster config");
}
final MetadataDecoder metadataDecoder = config.metadataDecoder();
final MetadataEncoder metadataEncoder = config.metadataEncoder();
final MetadataCodec metadataCodec =
ServiceLoaderUtil.findFirst(MetadataCodec.class).orElse(null);

if (metadataDecoder != null && metadataEncoder != null && metadataCodec != null) {
throw new IllegalArgumentException("Invalid cluster config");
throw new IllegalArgumentException(
"Invalid cluster config: either pair of [metadataDecoder, metadataEncoder] "
+ "or metadataCodec must be specified, not both");
}

if (metadataCodec == null) {
Objects.requireNonNull(
metadataDecoder, "Invalid cluster config: metadataDecoder must be specified");
Objects.requireNonNull(
metadataEncoder, "Invalid cluster config: metadataEncoder must be specified");
if ((metadataDecoder == null && metadataEncoder != null)
|| (metadataDecoder != null && metadataEncoder == null)) {
throw new IllegalArgumentException(
"Invalid cluster config: both of [metadataDecoder, metadataEncoder] must be specified");
}

if (metadataDecoder == null && metadataEncoder == null) {
Objects.requireNonNull(
metadataCodec, "Invalid cluster config: metadataCodec must be specified");
if (metadataCodec == null) {
Object metadata = config.metadata();
if (metadata != null && !(metadata instanceof Serializable)) {
throw new IllegalArgumentException(
"Invalid cluster config: metadata must be Serializable");
}
}
}

Objects.requireNonNull(
Expand Down
35 changes: 32 additions & 3 deletions cluster/src/main/java/io/scalecube/cluster/fdetector/PingData.java
@@ -1,10 +1,16 @@
package io.scalecube.cluster.fdetector;

import io.scalecube.cluster.Member;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.StringJoiner;

/** DTO class. Supports FailureDetector messages (Ping, Ack, PingReq). */
final class PingData {
final class PingData implements Externalizable {

private static final long serialVersionUID = 1L;

enum AckType {

Expand Down Expand Up @@ -32,8 +38,7 @@ enum AckType {
/** Ping response type. */
private AckType ackType;

/** Instantiates empty ping data for deserialization purpose. */
PingData() {}
public PingData() {}

private PingData(PingData other) {
this.from = other.from;
Expand Down Expand Up @@ -78,6 +83,30 @@ public PingData withAckType(AckType ackType) {
return p;
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
// from
out.writeObject(from);
// to
out.writeObject(to);
// originalIssuer
out.writeObject(originalIssuer);
// ackType
out.writeObject(ackType);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// from
from = (Member) in.readObject();
// to
to = (Member) in.readObject();
// originalIssuer
originalIssuer = (Member) in.readObject();
// ackType
ackType = (AckType) in.readObject();
}

@Override
public String toString() {
return new StringJoiner(", ", PingData.class.getSimpleName() + "[", "]")
Expand Down
31 changes: 28 additions & 3 deletions cluster/src/main/java/io/scalecube/cluster/gossip/Gossip.java
@@ -1,19 +1,24 @@
package io.scalecube.cluster.gossip;

import io.scalecube.cluster.transport.api.Message;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Objects;
import java.util.StringJoiner;

/** Data model for gossip, include gossip id, qualifier and object need to disseminate. */
final class Gossip {
final class Gossip implements Externalizable {

private static final long serialVersionUID = 1L;

private String gossiperId;
private Message message;
// incremented counter
private long sequenceId;

/** Instantiates empty gossip for deserialization purpose. */
Gossip() {}
public Gossip() {}

public Gossip(String gossiperId, Message message, long sequenceId) {
this.gossiperId = Objects.requireNonNull(gossiperId);
Expand Down Expand Up @@ -56,6 +61,26 @@ public int hashCode() {
return Objects.hash(gossiperId, message, sequenceId);
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
// gossiperId
out.writeUTF(gossiperId);
// message
out.writeObject(message);
// sequenceId
out.writeLong(sequenceId);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// gossiperId
gossiperId = in.readUTF();
// message
message = (Message) in.readObject();
// sequenceId
sequenceId = in.readLong();
}

@Override
public String toString() {
return new StringJoiner(", ", Gossip.class.getSimpleName() + "[", "]")
Expand Down
@@ -1,24 +1,32 @@
package io.scalecube.cluster.gossip;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;

/** Gossip request which be transmitted through the network, contains list of gossips. */
final class GossipRequest {
final class GossipRequest implements Externalizable {

private static final long serialVersionUID = 1L;

private List<Gossip> gossips;
private String from;

/** Instantiates empty gossip request for deserialization purpose. */
GossipRequest() {}
public GossipRequest() {}

public GossipRequest(Gossip gossip, String from) {
this(Collections.singletonList(gossip), from);
}

public GossipRequest(List<Gossip> gossips, String from) {
Objects.requireNonNull(gossips);
Objects.requireNonNull(from);
this.gossips = new ArrayList<>(gossips);
this.from = from;
}
Expand All @@ -31,6 +39,29 @@ public String from() {
return from;
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
// gossips
out.writeInt(gossips.size());
for (Gossip gossip : gossips) {
out.writeObject(gossip);
}
// from
out.writeUTF(from);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// gossips
int size = in.readInt();
gossips = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
gossips.add((Gossip) in.readObject());
}
// from
from = in.readUTF();
}

@Override
public String toString() {
return new StringJoiner(", ", GossipRequest.class.getSimpleName() + "[", "]")
Expand Down
Expand Up @@ -493,7 +493,10 @@ private void schedulePeriodicSync() {
private Message prepareSyncDataMsg(String qualifier, String cid) {
List<MembershipRecord> membershipRecords = new ArrayList<>(membershipTable.values());
SyncData syncData = new SyncData(membershipRecords, membershipConfig.syncGroup());
return Message.withData(syncData).qualifier(qualifier).correlationId(cid).build();
return Message.withData(syncData)
.qualifier(qualifier)
.correlationId(Optional.ofNullable(cid).orElse("null"))
.build();
}

private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
Expand Down

0 comments on commit f5c4505

Please sign in to comment.