Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sendEvents Async #7138

Merged
merged 14 commits into from
Jan 24, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
Expand Down Expand Up @@ -209,11 +208,10 @@ protected void upsertExternalRelationship(String userId, String firstGUID, Strin

TypeDef relationshipTypeDef = repositoryHelper.getTypeDefByName(userId, relationshipTypeName);

genericHandler.linkElementToElement(userId, externalSourceGUID, externalSourceName, firstGUID,
genericHandler.uncheckedLinkElementToElement(userId, externalSourceGUID, externalSourceName, firstGUID,
CommonMapper.GUID_PROPERTY_NAME, firstEntityTypeName, secondGUID, CommonMapper.GUID_PROPERTY_NAME,
secondEntityTypeName, false, false, null,
relationshipTypeDef.getGUID(), relationshipTypeName, relationshipProperties, null,
null, getNow(), methodName);
relationshipTypeDef.getGUID(), relationshipTypeName, relationshipProperties, getNow(), methodName);
} else {
Relationship originalRelationship = relationship.get();
String relationshipGUID = originalRelationship.getGUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ void upsertExternalRelationship() throws InvalidParameterException, PropertyServ
ENTITY_TYPE_NAME, EXTERNAL_SOURCE_DE_QUALIFIED_NAME, null);

verify(invalidParameterHandler, times(1)).validateUserId(USER, methodName);
verify(genericHandler, times(1)).linkElementToElement(USER, EXTERNAL_SOURCE_DE_GUID, EXTERNAL_SOURCE_DE_QUALIFIED_NAME, FIRST_GUID,
verify(genericHandler, times(1)).uncheckedLinkElementToElement(USER, EXTERNAL_SOURCE_DE_GUID, EXTERNAL_SOURCE_DE_QUALIFIED_NAME, FIRST_GUID,
CommonMapper.GUID_PROPERTY_NAME, ENTITY_TYPE_NAME, SECOND_GUID, CommonMapper.GUID_PROPERTY_NAME,
ENTITY_TYPE_NAME, false, false, null,
RELATIONSHIP_TYPE_GUID, RELATIONSHIP_TYPE_NAME, null, null, null, null, methodName);
RELATIONSHIP_TYPE_GUID, RELATIONSHIP_TYPE_NAME, null, null, methodName);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEvent;
import org.odpi.openmetadata.repositoryservices.events.OMRSTypeDefEvent;

import java.util.concurrent.CompletableFuture;

/**
* OMRSTopic defines the interface to the messaging Topic for OMRS Events.
* It implemented by the OMRSTopicConnector.
Expand Down Expand Up @@ -61,16 +63,17 @@ void registerListener(OMRSTopicRepositoryEventListener newListener,
* @param event OMRSRegistryEvent object containing the event properties.
* @throws ConnectorCheckedException the connector is not able to communicate with the event bus
*/
void sendRegistryEvent(OMRSRegistryEvent event) throws ConnectorCheckedException;
CompletableFuture<Boolean> sendRegistryEvent(OMRSRegistryEvent event) throws ConnectorCheckedException;


/**
* Sends the supplied event to the topic.
*
* @param event OMRSTypeDefEvent object containing the event properties.
* @return a future that has the result (boolean) of sendEvent
* @throws ConnectorCheckedException the connector is not able to communicate with the event bus
*/
void sendTypeDefEvent(OMRSTypeDefEvent event) throws ConnectorCheckedException;
CompletableFuture<Boolean> sendTypeDefEvent(OMRSTypeDefEvent event) throws ConnectorCheckedException;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.odpi.openmetadata.frameworks.connectors.ConnectorBase;
import org.odpi.openmetadata.frameworks.connectors.VirtualConnectorExtension;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditingComponent;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;
Expand All @@ -20,13 +19,16 @@
import org.odpi.openmetadata.repositoryservices.events.OMRSTypeDefEvent;
import org.odpi.openmetadata.repositoryservices.events.beans.OMRSEventBean;
import org.odpi.openmetadata.repositoryservices.events.beans.v1.OMRSEventV1;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;


/**
Expand Down Expand Up @@ -335,44 +337,48 @@ private void handleUnsupportedEventVersion(String methodName) throws ConnectorCh
/**
* Send the registry event to the OMRS Topic connector and manage errors
*
* @param registryEvent properties of the event to send
* @param registryEvent properties of the event to send
* @return a future that contains the result of send event
* @throws ConnectorCheckedException the connector is not able to communicate with the event bus
*/
@Override
public void sendRegistryEvent(OMRSRegistryEvent registryEvent) throws ConnectorCheckedException
public CompletableFuture<Boolean> sendRegistryEvent(OMRSRegistryEvent registryEvent) throws ConnectorCheckedException
{
final String methodName = "sendRegistryEvent";

if (eventProtocolVersion == OMRSEventProtocolVersion.V1)
{
this.sendEvent(registryEvent.getOMRSEventV1(), true);
return this.sendEvent(registryEvent.getOMRSEventV1(), true);
}
else
{
this.handleUnsupportedEventVersion(methodName);
}
return CompletableFuture.completedFuture(false);
}


/**
* Send the TypeDef event to the OMRS Topic connector (providing TypeDef Events are enabled).
*
* @param typeDefEvent properties of the event to send
* @param typeDefEvent properties of the event to send
* @return a future that contains the result of sendEvent
* @throws ConnectorCheckedException the connector is not able to communicate with the event bus
*/
@Override
public void sendTypeDefEvent(OMRSTypeDefEvent typeDefEvent) throws ConnectorCheckedException
public CompletableFuture<Boolean> sendTypeDefEvent(OMRSTypeDefEvent typeDefEvent) throws ConnectorCheckedException
{
final String methodName = "sendTypeDefEvent";

if (eventProtocolVersion == OMRSEventProtocolVersion.V1)
{
this.sendEvent(typeDefEvent.getOMRSEventV1(), false);
return this.sendEvent(typeDefEvent.getOMRSEventV1(), false);
}
else
{
this.handleUnsupportedEventVersion(methodName);
}
return CompletableFuture.completedFuture(false);
}


Expand Down Expand Up @@ -402,65 +408,64 @@ public void sendInstanceEvent(OMRSInstanceEvent instanceEvent) throws ConnectorC
/**
* Sends the supplied event outbound to the OMRSTopicListeners using the event bus connectors.
*
* @param event OMRSEvent object containing the event properties
* @param event OMRSEvent object containing the event properties
* @param logEvent should an audit log message be created?
* @throws ConnectorCheckedException the connector is not able to communicate with the event bus
*/
private void sendEvent(OMRSEventV1 event,
boolean logEvent) throws ConnectorCheckedException
private CompletableFuture<Boolean> sendEvent(OMRSEventV1 event,
boolean logEvent)
{
final String methodName = "send";
marius-patrascu marked this conversation as resolved.
Show resolved Hide resolved

if (event != null)
{
try
{
ObjectMapper objectMapper = new ObjectMapper();
return CompletableFuture.supplyAsync(() -> sendEventTask(event, logEvent));
}
else
{
log.debug("Unable to send null events");

String eventString = objectMapper.writeValueAsString(event);
throw new OMRSLogicErrorException(OMRSErrorCode.OMRS_TOPIC_SEND_NULL_EVENT.getMessageDefinition(connectionName),
this.getClass().getName(),
methodName);
}
}

if ((auditLog != null) && (logEvent))
{
auditLog.logMessage(methodName,
OMRSAuditCode.OUTBOUND_TOPIC_EVENT.getMessageDefinition(event.getEventCategory().getName(),
topicName),
eventString);
}
private boolean sendEventTask(OMRSEventV1 event,
boolean logEvent)
{
final String methodName = "send";
marius-patrascu marked this conversation as resolved.
Show resolved Hide resolved
try
{
ObjectMapper objectMapper = new ObjectMapper();
lpalashevski marked this conversation as resolved.
Show resolved Hide resolved

for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors)
{
if (eventBusConnector != null)
{
eventBusConnector.sendEvent(eventString);
}
}
}
catch (ConnectorCheckedException exc)
{
log.debug("Unable to send event: " + exc.getMessage());
String eventString = objectMapper.writeValueAsString(event);

throw exc;
if ((auditLog != null) && logEvent)
{
auditLog.logMessage(methodName,
OMRSAuditCode.OUTBOUND_TOPIC_EVENT.getMessageDefinition(event.getEventCategory().getName(),
topicName),
eventString);
}
catch (Exception exc)

for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors)
{
log.debug("Unexpected error sending event: " + exc.getMessage());

throw new ConnectorCheckedException(OMRSErrorCode.OMRS_TOPIC_SEND_EVENT_FAILED.getMessageDefinition(connectionName,
event.toString(),
exc.getMessage()),
this.getClass().getName(),
methodName,
exc);
if (eventBusConnector != null)
{
eventBusConnector.sendEvent(eventString);
}
}
}
else
catch (ConnectorCheckedException exc)
{
log.debug("Unable to send null events");

throw new OMRSLogicErrorException(OMRSErrorCode.OMRS_TOPIC_SEND_NULL_EVENT.getMessageDefinition(connectionName),
this.getClass().getName(),
methodName);
log.debug("Unable to send event: " + exc.getMessage());
throw new CompletionException(exc);
}
catch (Exception exc)
{
log.debug("Unexpected error sending event: " + exc.getMessage());
throw new CompletionException(exc);
}
return true;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import org.odpi.openmetadata.frameworks.connectors.ffdc.InvalidParameterException;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
* OpenMetadataTopicConnectorBase is a base class to topic connectors that only send
* events on the embedded event bus connector
Expand Down Expand Up @@ -40,9 +43,25 @@ protected void sendEvent(String event) throws InvalidParameterException,
/*
* Each of the event bus connectors need to be passed the new event.
*/
for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors)
{
eventBusConnector.sendEvent(event);
try {
CompletableFuture.runAsync(() -> {
marius-patrascu marked this conversation as resolved.
Show resolved Hide resolved
for (OpenMetadataTopicConnector eventBusConnector : eventBusConnectors) {
try {
eventBusConnector.sendEvent(event);
} catch (ConnectorCheckedException e) {
throw new CompletionException(e);
}
}
});
// exceptions from sendEvent are wrapped in CompletionException
} catch (CompletionException exception) {
if (exception.getCause() instanceof ConnectorCheckedException) {
throw (ConnectorCheckedException) exception.getCause();
} else if (exception.getCause() instanceof InvalidParameterException) {
throw (InvalidParameterException) exception.getCause();
} else {
throw exception;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@

import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.connectors.properties.beans.Connection;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode;
import org.odpi.openmetadata.repositoryservices.connectors.omrstopic.OMRSTopicConnector;
import org.odpi.openmetadata.repositoryservices.events.*;
import org.odpi.openmetadata.repositoryservices.events.OMRSEventOriginator;
import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEvent;
import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEventErrorCode;
import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEventProcessor;
import org.odpi.openmetadata.repositoryservices.events.OMRSRegistryEventType;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSAuditCode;
import org.odpi.openmetadata.repositoryservices.ffdc.OMRSErrorCode;
import org.odpi.openmetadata.repositoryservices.ffdc.exception.OMRSLogicErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletionException;


/**
Expand Down Expand Up @@ -90,10 +95,18 @@ private boolean sendRegistryEvent(OMRSRegistryEvent registryEvent)
for (OMRSTopicConnector omrsTopicConnector : omrsTopicConnectors)
{
log.debug("topicConnector: " + omrsTopicConnector);
omrsTopicConnector.sendRegistryEvent(registryEvent);
successFlag = successFlag && omrsTopicConnector.sendRegistryEvent(registryEvent).get();
}
}
// exceptions from sendEvent are wrapped in CompletionException
catch (CompletionException exception)
{
auditLog.logException(actionDescription,
OMRSAuditCode.SEND_REGISTRY_EVENT_ERROR.getMessageDefinition(publisherName),
"registryEvent : " + registryEvent,
exception.getCause());

successFlag = true;
log.debug("Exception: " + exception.getCause() + "; Registry Event: " + registryEvent);
}
catch (Exception error)
{
Expand Down
Loading