Skip to content

Commit

Permalink
Merge pull request #34 from areklalo/SYNCT-30
Browse files Browse the repository at this point in the history
SYNCT-30: The logic for retry messages
  • Loading branch information
pkornowski committed Dec 18, 2017
2 parents 127f2d4 + 59d9232 commit e1f1993
Show file tree
Hide file tree
Showing 21 changed files with 264 additions and 188 deletions.
Expand Up @@ -7,6 +7,7 @@
import org.openmrs.module.sync2.SyncModuleConfig;
import org.openmrs.module.sync2.api.model.audit.AuditMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;

public interface SyncAuditService extends OpenmrsService {
Expand All @@ -24,12 +25,10 @@ public interface SyncAuditService extends OpenmrsService {
String getPaginatedMessages(Integer page, Integer pageSize, Boolean success, String action, String resourceName) throws APIException;

@Authorized(SyncModuleConfig.SYNC_AUDIT_PRIVILEGE)
AuditMessage saveSuccessfulAudit(String resourceName, String resourceUrl, String action, String error) throws APIException;
@Transactional
AuditMessage saveAuditMessage(AuditMessage auditMessage) throws APIException;

@Authorized(SyncModuleConfig.SYNC_AUDIT_PRIVILEGE)
AuditMessage saveFailedAudit(String resourceName, String resourceUrl, String action, String error) throws APIException;

@Authorized(SyncModuleConfig.SYNC_AUDIT_PRIVILEGE)
@Transactional
AuditMessage saveAuditMessage(AuditMessage auditMessage);
AuditMessage setNextAudit(AuditMessage current, AuditMessage next) throws APIException;
}
@@ -1,8 +1,10 @@
package org.openmrs.module.sync2.api;

import org.openmrs.module.sync2.api.model.audit.AuditMessage;

import java.util.Map;

public interface SyncPullService {

void pullDataFromParentAndSave(String category, Map<String, String> resourceLinks, String address, String action);
AuditMessage pullDataFromParentAndSave(String category, Map<String, String> resourceLinks, String address, String action);
}
@@ -1,8 +1,10 @@
package org.openmrs.module.sync2.api;

import org.openmrs.module.sync2.api.model.audit.AuditMessage;

import java.util.Map;

public interface SyncPushService {

void readDataAndPushToParent(String category, Map<String, String> resourceLinks, String address, String action);
AuditMessage readDataAndPushToParent(String category, Map<String, String> resourceLinks, String address, String action);
}
@@ -0,0 +1,12 @@
package org.openmrs.module.sync2.api;

import org.openmrs.annotation.Authorized;
import org.openmrs.api.APIException;
import org.openmrs.module.sync2.SyncModuleConfig;
import org.openmrs.module.sync2.api.model.audit.AuditMessage;

public interface SyncRetryService {

@Authorized(SyncModuleConfig.SYNC_AUDIT_PRIVILEGE)
AuditMessage retryMessage(AuditMessage message) throws APIException;
}
Expand Up @@ -4,7 +4,6 @@
import org.hibernate.Criteria;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.criterion.Order;
import org.hibernate.criterion.Projections;
import org.hibernate.criterion.Restrictions;
import org.openmrs.api.db.hibernate.DbSession;
Expand All @@ -15,7 +14,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import java.time.LocalDate;
import java.util.List;


Expand Down Expand Up @@ -54,24 +52,7 @@ public Long getCountOfMessages() {
}

public AuditMessage saveItem(AuditMessage auditMessage) {
Session session = sessionFactory.getHibernateSessionFactory().openSession();
Transaction tx = null;
try {
tx = session.beginTransaction();
session.saveOrUpdate(auditMessage);
tx.commit();
}
catch (Exception ex) {
if (tx != null) {
tx.rollback();
}
throw ex;
}
finally {
if (session != null) {
session.close();
}
}
getSession().saveOrUpdate(auditMessage);
return auditMessage;
}

Expand Down
Expand Up @@ -48,38 +48,6 @@ public String getPaginatedMessages(Integer page, Integer pageSize, Boolean succe
AuditMessageList result = new AuditMessageList(dao.getCountOfMessages(), page, pageSize, auditMessageList);
return serializeResults(result);
}

@Override
public AuditMessage saveSuccessfulAudit(String resourceName, String resourceUrl, String operation, String details) throws APIException {
if (configuration.getSyncConfiguration().getGeneral().isPersistSuccessAudit()) {
AuditMessage newItem = new AuditMessage();
newItem.setSuccess(true);
newItem.setTimestamp(new Timestamp(System.currentTimeMillis()));
newItem.setResourceName(resourceName);
newItem.setUsedResourceUrl(resourceUrl);
newItem.setOperation(operation);
newItem.setDetails(details);

return dao.saveItem(newItem);
}
return null;
}

@Override
public AuditMessage saveFailedAudit(String resourceName, String resourceUrl, String operation, String details) throws APIException {
if (configuration.getSyncConfiguration().getGeneral().isPersistFailureAudit()) {
AuditMessage newItem = new AuditMessage();
newItem.setSuccess(false);
newItem.setTimestamp(new Timestamp(System.currentTimeMillis()));
newItem.setResourceName(resourceName);
newItem.setUsedResourceUrl(resourceUrl);
newItem.setOperation(operation);
newItem.setDetails(details);

return dao.saveItem(newItem);
}
return null;
}

@Override
public AuditMessage saveAuditMessage(AuditMessage auditMessage) {
Expand All @@ -95,7 +63,16 @@ public AuditMessage saveAuditMessage(AuditMessage auditMessage) {
}
return null;
}


@Override
public AuditMessage setNextAudit(AuditMessage current, AuditMessage next) throws APIException {
if (current == null || next == null) {
return null;
}
current.setNextMessage(next.getId());
return dao.saveItem(current);
}

private String serializeResults(AuditMessageList results) {
GsonBuilder gsonBuilder = new GsonBuilder().serializeNulls();
gsonBuilder.registerTypeAdapter(AuditMessage.class, new AuditMessage.AuditMessageSerializer());
Expand Down
Expand Up @@ -11,13 +11,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.sql.Timestamp;
import java.util.Map;

import static org.openmrs.module.sync2.SyncConstants.PULL_OPERATION;
import static org.openmrs.module.sync2.SyncConstants.PULL_SUCCESS_MESSAGE;
import static org.openmrs.module.sync2.api.utils.SyncUtils.getPreferredClient;
import static org.openmrs.module.sync2.api.utils.SyncUtils.getPreferredUrl;

@Component("sync2.syncPullService")
Expand All @@ -29,20 +31,18 @@ public class SyncPullServiceImpl implements SyncPullService {
private SyncConfigurationService configurationService;

@Autowired
private SyncAuditService auditService;
private SyncAuditService syncAuditService;

private SyncClient syncClient = new SyncClient();
private SyncPersistence syncPersistence = new SyncPersistence();

public void pullDataFromParentAndSave(String category, Map<String, String> resourceLinks, String address, String action) {
public AuditMessage pullDataFromParentAndSave(String category, Map<String, String> resourceLinks, String address, String action) {
LOGGER.info(String.format("Pull category: %s, address: %s, action: %s", category, address, action));

AuditMessage auditMessage = prepareBaseAuditMessage();
auditMessage.setResourceName(category);
auditMessage.setUsedResourceUrl(getPreferredUrl(resourceLinks));
auditMessage.setAvailableResourceUrls(SyncUtils.serializeMap(resourceLinks));
auditMessage.setParentUrl(getParentUri());
auditMessage.setLocalUrl(getLocalUri());
auditMessage.setAction(action);

try {
Expand All @@ -56,8 +56,9 @@ public void pullDataFromParentAndSave(String category, Map<String, String> resou
auditMessage.setSuccess(false);
auditMessage.setDetails(ExceptionUtils.getFullStackTrace(e));
} finally {
auditService.saveAuditMessage(auditMessage);
auditMessage = syncAuditService.saveAuditMessage(auditMessage);
}
return auditMessage;
}

private String getParentUri() {
Expand All @@ -72,6 +73,9 @@ private AuditMessage prepareBaseAuditMessage() {
AuditMessage auditMessage = new AuditMessage();
auditMessage.setTimestamp(new Timestamp(System.currentTimeMillis()));
auditMessage.setOperation(PULL_OPERATION);
auditMessage.setParentUrl(getParentUri());
auditMessage.setLocalUrl(getLocalUri());
auditMessage.setLinkType(getPreferredClient());
return auditMessage;
}
}
Expand Up @@ -19,6 +19,7 @@

import static org.openmrs.module.sync2.SyncConstants.PUSH_OPERATION;
import static org.openmrs.module.sync2.SyncConstants.PUSH_SUCCESS_MESSAGE;
import static org.openmrs.module.sync2.api.utils.SyncUtils.getPreferredClient;
import static org.openmrs.module.sync2.api.utils.SyncUtils.getPreferredUrl;

@Component("sync2.syncPushService")
Expand All @@ -31,37 +32,35 @@ public class SyncPushServiceImpl implements SyncPushService {
private SyncConfigurationService configurationService;

@Autowired
private SyncAuditService auditService;
private SyncAuditService syncAuditService;

private SyncClient syncClient = new SyncClient();
private SyncPersistence syncPersistence = new SyncPersistence();

@Override
public void readDataAndPushToParent(String category, Map<String, String> resourceLinks, String address, String action) {
public AuditMessage readDataAndPushToParent(String category, Map<String, String> resourceLinks, String address, String action) {
LOGGER.info(String.format("SyncPushService category: %s, address: %s, action: %s", category, address, action));

AuditMessage auditMessage = prepareBaseAuditMessage();
auditMessage.setResourceName(category);
auditMessage.setUsedResourceUrl(getPreferredUrl(resourceLinks));
auditMessage.setAvailableResourceUrls(SyncUtils.serializeMap(resourceLinks));
auditMessage.setParentUrl(getParentUri());
auditMessage.setLocalUrl(getLocalUri());
auditMessage.setAction(action);

try {
String uuid = SyncUtils.extractUUIDFromResourceLinks(resourceLinks);
Object data = syncPersistence.retrieveData(getPreferredClient(), category, uuid);
syncClient.pushDataToParent(data, resourceLinks, getParentUri());

auditMessage.setSuccess(true);
auditMessage.setDetails(PUSH_SUCCESS_MESSAGE);
} catch (Exception e) {
LOGGER.error("Problem with pushing to parent", e);
auditMessage.setSuccess(false);
auditMessage.setDetails(ExceptionUtils.getFullStackTrace(e));
} finally {
auditService.saveAuditMessage(auditMessage);
auditMessage = syncAuditService.saveAuditMessage(auditMessage);
}
return auditMessage;
}

private String getPreferredClient() {
Expand All @@ -80,6 +79,9 @@ private AuditMessage prepareBaseAuditMessage() {
AuditMessage auditMessage = new AuditMessage();
auditMessage.setTimestamp(new Timestamp(System.currentTimeMillis()));
auditMessage.setOperation(PUSH_OPERATION);
auditMessage.setParentUrl(getParentUri());
auditMessage.setLocalUrl(getLocalUri());
auditMessage.setLinkType(getPreferredClient());
return auditMessage;
}
}
@@ -0,0 +1,70 @@
package org.openmrs.module.sync2.api.impl;

import org.openmrs.api.APIException;
import org.openmrs.module.sync2.api.SyncAuditService;
import org.openmrs.module.sync2.api.SyncConfigurationService;
import org.openmrs.module.sync2.api.SyncPullService;
import org.openmrs.module.sync2.api.SyncPushService;
import org.openmrs.module.sync2.api.SyncRetryService;
import org.openmrs.module.sync2.api.model.audit.AuditMessage;
import org.openmrs.module.sync2.api.utils.SyncUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

import static org.openmrs.module.sync2.SyncConstants.PULL_OPERATION;
import static org.openmrs.module.sync2.SyncConstants.PUSH_OPERATION;


@Component("sync2.SyncRetryService")
public class SyncRetryServiceImpl implements SyncRetryService {

@Autowired
private SyncPullService syncPullService;

@Autowired
private SyncPushService syncPushService;

@Autowired
private SyncAuditService syncAuditService;

@Autowired
private SyncConfigurationService configuration;

@Override
public AuditMessage retryMessage(AuditMessage message) throws APIException {
switch(message.getOperation()) {
case PULL_OPERATION:
return retryPull(message);
case PUSH_OPERATION:
return retryPush(message);
}
return null;
}

private AuditMessage retryPush(AuditMessage message) {
String parentAddress = configuration.getSyncConfiguration().getGeneral().getParentFeedLocation();
parentAddress = SyncUtils.getBaseUrl(parentAddress);

Map<String, String> map = new HashMap<>();
map.put(message.getLinkType(), message.getUsedResourceUrl());

AuditMessage newMesssage = syncPushService.readDataAndPushToParent(message.getResourceName(), map, parentAddress, message.getAction());
syncAuditService.setNextAudit(message, newMesssage);
return newMesssage;
}

private AuditMessage retryPull(AuditMessage message) {
String parentAddress = configuration.getSyncConfiguration().getGeneral().getParentFeedLocation();
parentAddress = SyncUtils.getBaseUrl(parentAddress);

Map<String, String> map = new HashMap<>();
map.put(message.getLinkType(), message.getUsedResourceUrl());

AuditMessage newMesssage = syncPullService.pullDataFromParentAndSave(message.getResourceName(), map, parentAddress, message.getAction());
syncAuditService.setNextAudit(message, newMesssage);
return newMesssage;
}
}

0 comments on commit e1f1993

Please sign in to comment.