Skip to content

Commit

Permalink
Refactor configuration (#2997)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Klish <aklish@gmail.com>
  • Loading branch information
justin-tay and aklish committed May 30, 2023
1 parent e570df5 commit 30816cb
Show file tree
Hide file tree
Showing 53 changed files with 555 additions and 350 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ public class CSVExportFormatter implements TableExportFormatter {
private static final String COMMA = ",";
private static final String DOUBLE_QUOTES = "\"";

private boolean skipCSVHeader = false;
private boolean writeHeader = true;
private ObjectMapper mapper;

public CSVExportFormatter(Elide elide, boolean skipCSVHeader) {
this.skipCSVHeader = skipCSVHeader;
public CSVExportFormatter(Elide elide, boolean writeHeader) {
this.writeHeader = writeHeader;
this.mapper = elide.getMapper().getObjectMapper();
}

Expand Down Expand Up @@ -91,7 +91,7 @@ private String generateCSVHeader(EntityProjection projection) {

@Override
public String preFormat(EntityProjection projection, TableExport query) {
if (projection == null || skipCSVHeader) {
if (projection == null || !writeHeader) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.Data;

import java.security.Principal;
import java.time.Duration;
import java.util.concurrent.Callable;

/**
Expand All @@ -32,11 +33,11 @@
@Data
public abstract class AsyncAPIHook<T extends AsyncAPI> implements LifeCycleHook<T> {
private final AsyncExecutorService asyncExecutorService;
private final Integer maxAsyncAfterSeconds;
private final long maxAsyncAfterSeconds;

public AsyncAPIHook(AsyncExecutorService asyncExecutorService, Integer maxAsyncAfterSeconds) {
protected AsyncAPIHook(AsyncExecutorService asyncExecutorService, Duration maxAsyncAfter) {
this.asyncExecutorService = asyncExecutorService;
this.maxAsyncAfterSeconds = maxAsyncAfterSeconds;
this.maxAsyncAfterSeconds = maxAsyncAfter.toSeconds();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.yahoo.elide.core.security.RequestScope;
import com.yahoo.elide.graphql.QueryRunner;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;

Expand All @@ -27,8 +28,8 @@
*/
public class AsyncQueryHook extends AsyncAPIHook<AsyncQuery> {

public AsyncQueryHook (AsyncExecutorService asyncExecutorService, Integer maxAsyncAfterSeconds) {
super(asyncExecutorService, maxAsyncAfterSeconds);
public AsyncQueryHook (AsyncExecutorService asyncExecutorService, Duration maxAsyncAfter) {
super(asyncExecutorService, maxAsyncAfter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@
import com.yahoo.elide.core.security.ChangeSpec;
import com.yahoo.elide.core.security.RequestScope;

import lombok.EqualsAndHashCode;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

/**
* LifeCycle Hook for execution of TableExpoer.
* LifeCycle Hook for execution of TableExport.
*/
@EqualsAndHashCode(callSuper = true)
public class TableExportHook extends AsyncAPIHook<TableExport> {
Map<ResultType, TableExportFormatter> supportedFormatters;
ResultStorageEngine engine;
private final Map<ResultType, TableExportFormatter> supportedFormatters;
private final ResultStorageEngine engine;

public TableExportHook (AsyncExecutorService asyncExecutorService, Integer maxAsyncAfterSeconds,
public TableExportHook (AsyncExecutorService asyncExecutorService, Duration maxAsyncAfter,
Map<ResultType, TableExportFormatter> supportedFormatters, ResultStorageEngine engine) {
super(asyncExecutorService, maxAsyncAfterSeconds);
super(asyncExecutorService, maxAsyncAfter);
this.supportedFormatters = supportedFormatters;
this.engine = engine;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,7 +45,7 @@ public class ExportApiEndpoint {
@AllArgsConstructor
public static class ExportApiProperties {
private ExecutorService executor;
private Integer maxDownloadTimeSeconds;
private Duration maxDownloadTime;
}

@Inject
Expand All @@ -65,7 +66,7 @@ public ExportApiEndpoint(
@Path("/{asyncQueryId}")
public void get(@PathParam("asyncQueryId") String asyncQueryId, @Context HttpServletResponse httpServletResponse,
@Suspended final AsyncResponse asyncResponse) {
asyncResponse.setTimeout(exportApiProperties.getMaxDownloadTimeSeconds(), TimeUnit.SECONDS);
asyncResponse.setTimeout(exportApiProperties.getMaxDownloadTime().toSeconds(), TimeUnit.SECONDS);
asyncResponse.setTimeoutHandler(async -> {
ResponseBuilder resp = Response.status(Response.Status.REQUEST_TIMEOUT).entity("Timed out.");
async.resume(resp.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;

import java.time.Clock;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -26,58 +28,60 @@
@Slf4j
public class AsyncCleanerService {

private final int defaultCleanupDelayMinutes = 120;
private final int maxInitialDelayMinutes = 100;
private static final int DEFAULT_CLEANUP_DELAY_MINUTES = 120;
private static final int MAX_INITIAL_DELAY_MINUTES = 100;
private static AsyncCleanerService asyncCleanerService = null;

@Inject
private AsyncCleanerService(Elide elide, Integer maxRunTimeSeconds, Integer queryCleanupDays,
Integer cancelDelaySeconds, AsyncAPIDAO asyncQueryDao) {
private AsyncCleanerService(Elide elide, Duration queryMaxRunTime, Duration queryRetentionDuration,
Duration queryCancellationCheckInterval, AsyncAPIDAO asyncQueryDao) {

//If query is still running for twice than maxRunTime, then interrupt did not work due to host/app crash.
int queryRunTimeThresholdMinutes = (int) TimeUnit.SECONDS.toMinutes(maxRunTimeSeconds * 2 + 30L);
Duration queryRunTimeThreshold = Duration.ofSeconds(queryMaxRunTime.getSeconds() * 2 + 30L);

// Setting up query cleaner that marks long running query as TIMEDOUT.
ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
AsyncAPICleanerRunnable cleanUpTask = new AsyncAPICleanerRunnable(
queryRunTimeThresholdMinutes, elide, queryCleanupDays, asyncQueryDao, new DateUtil());
queryRunTimeThreshold, elide, queryRetentionDuration, asyncQueryDao,
Clock.systemUTC());

// Since there will be multiple hosts running the elide service,
// setting up random delays to avoid all of them trying to cleanup at the same time.
Random random = new Random();
int initialDelayMinutes = random.ints(0, maxInitialDelayMinutes).limit(1).findFirst().getAsInt();
int initialDelayMinutes = random.ints(0, MAX_INITIAL_DELAY_MINUTES).limit(1).findFirst().getAsInt();
log.debug("Initial Delay for cleaner service is {}", initialDelayMinutes);

//Having a delay of at least DEFAULT_CLEANUP_DELAY between two cleanup attempts.
//Or maxRunTimeMinutes * 2 so that this process does not coincides with query
//interrupt process.

cleaner.scheduleWithFixedDelay(cleanUpTask, initialDelayMinutes, Math.max(defaultCleanupDelayMinutes,
queryRunTimeThresholdMinutes), TimeUnit.MINUTES);
cleaner.scheduleWithFixedDelay(cleanUpTask, initialDelayMinutes, Math.max(DEFAULT_CLEANUP_DELAY_MINUTES,
queryRunTimeThreshold.toMinutes()), TimeUnit.MINUTES);

//Setting up query cancel service that cancels long running queries based on status or runtime
ScheduledExecutorService cancellation = Executors.newSingleThreadScheduledExecutor();

AsyncAPICancelRunnable cancelTask = new AsyncAPICancelRunnable(maxRunTimeSeconds,
AsyncAPICancelRunnable cancelTask = new AsyncAPICancelRunnable(queryMaxRunTime,
elide, asyncQueryDao);

cancellation.scheduleWithFixedDelay(cancelTask, 0, cancelDelaySeconds, TimeUnit.SECONDS);
cancellation.scheduleWithFixedDelay(cancelTask, 0, queryCancellationCheckInterval.toSeconds(),
TimeUnit.SECONDS);
}

/**
* Initialize the singleton AsyncCleanerService object.
* If already initialized earlier, no new object is created.
* @param elide Elide Instance
* @param maxRunTimeSeconds max run times in seconds
* @param queryCleanupDays Async Query Clean up days
* @param cancelDelaySeconds Async Query Transaction cancel delay
* @param queryMaxRunTime max run times in seconds
* @param queryRetentionDuration Async Query Clean up days
* @param queryCancellationCheckInterval Async Query Transaction cancel delay
* @param asyncQueryDao DAO Object
*/
public static void init(Elide elide, Integer maxRunTimeSeconds, Integer queryCleanupDays,
Integer cancelDelaySeconds, AsyncAPIDAO asyncQueryDao) {
public static void init(Elide elide, Duration queryMaxRunTime, Duration queryRetentionDuration,
Duration queryCancellationCheckInterval, AsyncAPIDAO asyncQueryDao) {
if (asyncCleanerService == null) {
asyncCleanerService = new AsyncCleanerService(elide, maxRunTimeSeconds, queryCleanupDays,
cancelDelaySeconds, asyncQueryDao);
asyncCleanerService = new AsyncCleanerService(elide, queryMaxRunTime, queryRetentionDuration,
queryCancellationCheckInterval, asyncQueryDao);
} else {
log.debug("asyncCleanerService is already initialized.");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@

import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.MultivaluedMap;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
Expand All @@ -42,13 +42,18 @@
*/
@Slf4j
@Data
@AllArgsConstructor
public class AsyncAPICancelRunnable implements Runnable {

private int maxRunTimeSeconds;
private long queryMaxRunTimeSeconds;
private Elide elide;
private AsyncAPIDAO asyncAPIDao;

public AsyncAPICancelRunnable(Duration queryMaxRunTime, Elide elide, AsyncAPIDAO asyncAPIDao) {
this.queryMaxRunTimeSeconds = queryMaxRunTime.toSeconds();
this.elide = elide;
this.asyncAPIDao = asyncAPIDao;
}

@Override
public void run() {
cancelAsyncAPI(AsyncQuery.class);
Expand Down Expand Up @@ -81,7 +86,7 @@ protected <T extends AsyncAPI> void cancelAsyncAPI(Class<T> type) {
Set<UUID> asyncTransactionUUIDs = StreamSupport.stream(asyncAPIIterable.spliterator(), false)
.filter(query -> query.getStatus() == QueryStatus.CANCELLED
|| TimeUnit.SECONDS.convert(Math.abs(new Date(System.currentTimeMillis()).getTime()
- query.getCreatedOn().getTime()), TimeUnit.MILLISECONDS) > maxRunTimeSeconds)
- query.getCreatedOn().getTime()), TimeUnit.MILLISECONDS) > queryMaxRunTimeSeconds)
.map(query -> UUID.fromString(query.getRequestId()))
.collect(Collectors.toSet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.yahoo.elide.async.models.AsyncAPI;
import com.yahoo.elide.async.models.AsyncQuery;
import com.yahoo.elide.async.models.QueryStatus;
import com.yahoo.elide.async.service.DateUtil;
import com.yahoo.elide.async.service.dao.AsyncAPIDAO;
import com.yahoo.elide.core.Path.PathElement;
import com.yahoo.elide.core.filter.expression.AndFilterExpression;
Expand All @@ -22,7 +21,9 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Calendar;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;

/**
Expand All @@ -35,11 +36,11 @@
@AllArgsConstructor
public class AsyncAPICleanerRunnable implements Runnable {

private int maxRunTimeMinutes;
private Duration queryMaxRunTime;
private Elide elide;
private int queryCleanupDays;
private Duration queryRetentionDuration;
private AsyncAPIDAO asyncAPIDao;
private DateUtil dateUtil = new DateUtil();
private Clock clock;

@Override
public void run() {
Expand All @@ -54,7 +55,7 @@ public void run() {
protected <T extends AsyncAPI> void deleteAsyncAPI(Class<T> type) {

try {
Date cleanupDate = dateUtil.calculateFilterDate(Calendar.DATE, queryCleanupDays);
Date cleanupDate = Date.from(Instant.now(clock).plus(queryRetentionDuration));
PathElement createdOnPathElement = new PathElement(type, Long.class, "createdOn");
FilterExpression fltDeleteExp = new LEPredicate(createdOnPathElement, cleanupDate);
asyncAPIDao.deleteAsyncAPIAndResultByFilter(fltDeleteExp, type);
Expand All @@ -71,7 +72,7 @@ protected <T extends AsyncAPI> void deleteAsyncAPI(Class<T> type) {
protected <T extends AsyncAPI> void timeoutAsyncAPI(Class<T> type) {

try {
Date filterDate = dateUtil.calculateFilterDate(Calendar.MINUTE, maxRunTimeMinutes);
Date filterDate = Date.from(Instant.now(clock).plus(queryMaxRunTime));
PathElement createdOnPathElement = new PathElement(type, Long.class, "createdOn");
PathElement statusPathElement = new PathElement(type, String.class, "status");
FilterPredicate inPredicate = new InPredicate(statusPathElement, QueryStatus.PROCESSING,
Expand Down
Loading

0 comments on commit 30816cb

Please sign in to comment.