Skip to content

Commit

Permalink
adding additional worker tests (apache#127)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent 6c380c4 commit f50a68e
Show file tree
Hide file tree
Showing 3 changed files with 806 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.pulsar.functions.worker;

import lombok.*;
import lombok.experimental.Accessors;

@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
@Accessors(chain = true)
public class FunctionAction {

public enum Action {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;

import com.google.protobuf.ByteString;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -57,46 +58,49 @@
public class FunctionRuntimeManager implements AutoCloseable {

// tenant -> namespace -> (function name, FunctionRuntimeInfo)
private final Map<String, Map<String, Map<String, FunctionRuntimeInfo>>> functionMap = new ConcurrentHashMap<>();
final Map<String, Map<String, Map<String, FunctionRuntimeInfo>>> functionMap = new ConcurrentHashMap<>();

// A map in which the key is the service request id and value is the service request
private final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();
final Map<String, ServiceRequestInfo> pendingServiceRequests = new ConcurrentHashMap<>();

private final PulsarClient pulsarClient;
final PulsarClient pulsarClient;

private final ServiceRequestManager serviceRequestManager;
final ServiceRequestManager serviceRequestManager;

private final WorkerConfig workerConfig;
final WorkerConfig workerConfig;

private LinkedBlockingQueue<FunctionAction> actionQueue;
LinkedBlockingQueue<FunctionAction> actionQueue;

private boolean initializePhase = true;
private final String initializeMarkerRequestId = UUID.randomUUID().toString();
boolean initializePhase = true;
final String initializeMarkerRequestId = UUID.randomUUID().toString();

// The message id of the last messaged processed by function runtime manager
private MessageId lastProcessedMessageId;
MessageId lastProcessedMessageId = MessageId.earliest;

private PulsarAdmin pulsarAdminClient;
PulsarAdmin pulsarAdminClient;

public FunctionRuntimeManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
LinkedBlockingQueue<FunctionAction> actionQueue) throws PulsarClientException {
this.workerConfig = workerConfig;
this.pulsarClient = pulsarClient;
this.serviceRequestManager = new ServiceRequestManager(
this.pulsarClient.createProducer(this.workerConfig.getFunctionMetadataTopic()));
this.serviceRequestManager = getServiceRequestManager(this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
this.actionQueue = actionQueue;
}

public boolean isInitializePhase() {
ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
return new ServiceRequestManager(pulsarClient.createProducer(functionMetadataTopic));
}

boolean isInitializePhase() {
return initializePhase;
}

public void setInitializePhase(boolean initializePhase) {
void setInitializePhase(boolean initializePhase) {
this.initializePhase = initializePhase;
}

public void sendIntializationMarker() {
void sendIntializationMarker() {
log.info("Sending Initialize message...");
this.serviceRequestManager.submitRequest(
ServiceRequestUtils.getIntializationRequest(
Expand Down Expand Up @@ -129,7 +133,7 @@ public FunctionRuntimeInfo getFunction(FunctionMetaData functionMetaData) {
return getFunction(functionMetaData.getFunctionConfig());
}

public List<FunctionRuntimeInfo> getAllFunctions() {
List<FunctionRuntimeInfo> getAllFunctions() {
List<FunctionRuntimeInfo> ret = new LinkedList<>();
for (Map<String, Map<String, FunctionRuntimeInfo>> i : this.functionMap.values()) {
for (Map<String, FunctionRuntimeInfo> j : i.values()) {
Expand Down Expand Up @@ -204,7 +208,7 @@ public boolean containsFunction(FunctionMetaData functionMetaData) {
return containsFunction(functionMetaData.getFunctionConfig());
}

private boolean containsFunction(FunctionConfig functionConfig) {
boolean containsFunction(FunctionConfig functionConfig) {
return containsFunction(
functionConfig.getTenant(), functionConfig.getNamespace(), functionConfig.getName());
}
Expand All @@ -220,7 +224,7 @@ public boolean containsFunction(String tenant, String namespace, String function
return false;
}

private CompletableFuture<RequestResult> submit(ServiceRequest serviceRequest) {
CompletableFuture<RequestResult> submit(ServiceRequest serviceRequest) {
ServiceRequestInfo serviceRequestInfo = ServiceRequestInfo.of(serviceRequest);
CompletableFuture<MessageId> messageIdCompletableFuture = this.serviceRequestManager.submitRequest(serviceRequest);

Expand All @@ -234,7 +238,7 @@ private CompletableFuture<RequestResult> submit(ServiceRequest serviceRequest) {
return requestResultCompletableFuture;
}

public void processRequest(MessageId messageId, ServiceRequest serviceRequest) {
void processRequest(MessageId messageId, ServiceRequest serviceRequest) {
// make sure that snapshotting and processing requests don't happen simultaneously
synchronized (this) {
switch (serviceRequest.getServiceRequestType()) {
Expand Down Expand Up @@ -272,11 +276,11 @@ private void completeRequest(ServiceRequest serviceRequest, boolean isSuccess, S
}
}

private void completeRequest(ServiceRequest serviceRequest, boolean isSuccess) {
void completeRequest(ServiceRequest serviceRequest, boolean isSuccess) {
completeRequest(serviceRequest, isSuccess, null);
}

public void proccessDeregister(ServiceRequest deregisterRequest) {
void proccessDeregister(ServiceRequest deregisterRequest) {

FunctionMetaData deregisterRequestFs = deregisterRequest.getFunctionMetaData();
String functionName = deregisterRequestFs.getFunctionConfig().getName();
Expand Down Expand Up @@ -307,7 +311,7 @@ public void proccessDeregister(ServiceRequest deregisterRequest) {
}
}

public void processUpdate(ServiceRequest updateRequest) {
void processUpdate(ServiceRequest updateRequest) {

log.debug("Process update request: {}", updateRequest);

Expand Down Expand Up @@ -352,7 +356,7 @@ public void processUpdate(ServiceRequest updateRequest) {
* Restores the latest snapshot into in memory state
* @return the message Id associated with the latest snapshot
*/
public MessageId restore() {
MessageId restore() {
List<Integer> snapshots = getSnapshotTopics();
if (snapshots.isEmpty()) {
// if no snapshot that go to earliest message in fmt
Expand Down Expand Up @@ -386,7 +390,7 @@ latestsSnapshot, MessageId.earliest, new ReaderConfiguration())){
/**
* Snap shots the current state and puts it in a topic. Only one worker should execute this at a time
*/
public void snapshot() {
void snapshot() {
Snapshot.Builder snapshotBuilder = Snapshot.newBuilder();

List<Integer> snapshots = getSnapshotTopics();
Expand All @@ -412,12 +416,8 @@ public void snapshot() {
log.info("Writing snapshot to {} with last message id {}", nextSnapshotTopic, this.lastProcessedMessageId);
snapshotBuilder.setLastAppliedMessageId(ByteString.copyFrom(this.lastProcessedMessageId.toByteArray()));
}
try (Producer producer = this.pulsarClient.createProducer(nextSnapshotTopic)){
producer.send(snapshotBuilder.build().toByteArray());
} catch (PulsarClientException e) {
log.error("Failed to write snapshot", e);
throw new RuntimeException(e);
}

this.writeSnapshot(nextSnapshotTopic, snapshotBuilder.build());

// deleting older snapshots
for (Integer snapshotIndex : snapshots) {
Expand All @@ -430,7 +430,16 @@ public void snapshot() {
}
}

private void deleteSnapshot(String snapshotTopic) {
void writeSnapshot(String topic, Snapshot snapshot) {
try (Producer producer = this.pulsarClient.createProducer(topic)){
producer.send(snapshot.toByteArray());
} catch (PulsarClientException e) {
log.error("Failed to write snapshot", e);
throw new RuntimeException(e);
}
}

void deleteSnapshot(String snapshotTopic) {
PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
try {
pulsarAdmin.persistentTopics().delete(snapshotTopic);
Expand All @@ -440,7 +449,7 @@ private void deleteSnapshot(String snapshotTopic) {
}
}

private List<Integer> getSnapshotTopics() {
List<Integer> getSnapshotTopics() {
PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
String namespace = workerConfig.getPulsarFunctionsNamespace();
String snapshotsTopicPath = workerConfig.getFunctionMetadataSnapshotsTopicPath();
Expand All @@ -463,7 +472,7 @@ private List<Integer> getSnapshotTopics() {
return ret;
}

private void addFunctionToFunctionMap(FunctionMetaData functionMetaData) {
void addFunctionToFunctionMap(FunctionMetaData functionMetaData) {
FunctionConfig functionConfig = functionMetaData.getFunctionConfig();
if (!this.functionMap.containsKey(functionConfig.getTenant())) {
this.functionMap.put(functionConfig.getTenant(), new ConcurrentHashMap<>());
Expand Down Expand Up @@ -493,7 +502,7 @@ private boolean isSendByMe(ServiceRequest serviceRequest) {
return this.workerConfig.getWorkerId().equals(serviceRequest.getWorkerId());
}

private void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase()) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.STOP);
Expand All @@ -506,7 +515,7 @@ private void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
}
}

private void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase()) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.START);
Expand All @@ -523,7 +532,7 @@ private boolean isMyInitializeMarkerRequest(ServiceRequest serviceRequest) {
return isSendByMe(serviceRequest) && this.initializeMarkerRequestId.equals(serviceRequest.getRequestId());
}

public void processInitializeMarker(ServiceRequest serviceRequest) {
void processInitializeMarker(ServiceRequest serviceRequest) {
if (isMyInitializeMarkerRequest(serviceRequest)) {
this.setInitializePhase(false);
log.info("Initializing Metadata state done!");
Expand All @@ -542,7 +551,7 @@ public void processInitializeMarker(ServiceRequest serviceRequest) {
}
}

public PulsarAdmin getPulsarAdminClient() {
private PulsarAdmin getPulsarAdminClient() {
if (this.pulsarAdminClient == null) {
this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl());
}
Expand Down
Loading

0 comments on commit f50a68e

Please sign in to comment.