Skip to content

Commit

Permalink
Rework cleanup process on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
pdambrauskas committed Jul 11, 2020
1 parent a633910 commit 6a07d98
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 15 deletions.
5 changes: 0 additions & 5 deletions src/main/java/com/pinterest/secor/common/FileRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ public FileWriter getOrCreateWriter(LogFilePath path, CompressionCodec codec)
mWriters.put(path, writer);
mCreationTimes.put(path, System.currentTimeMillis() / 1000L);
LOG.debug("created writer for path {}", path.getLogFilePath());
LOG.debug("Register deleteOnExit for path {}", path.getLogFilePath());
FileUtil.deleteOnExit(path.getLogFileParentDir());
FileUtil.deleteOnExit(path.getLogFileDir());
FileUtil.deleteOnExit(path.getLogFilePath());
FileUtil.deleteOnExit(path.getLogFileCrcPath());
}
return writer;
}
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/pinterest/secor/common/ShutdownHookRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.pinterest.secor.common;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Registry for shutdown hooks.
* Allows running shutdown hooks by specific order by executing multiple Runnables on single shutdown hook.
*
* @author Paulius Dambrauskas (p.dambrauskas@gmail.com)
*
*/
public final class ShutdownHookRegistry {
private static final Logger LOG = LoggerFactory.getLogger(ShutdownHookRegistry.class);
private static final Map<Integer, List<Runnable>> HOOKS = new ConcurrentHashMap<>();

private ShutdownHookRegistry() {
// static class cannot be initiated
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(ShutdownHookRegistry::runHooks));
}

public static void registerHook(int priority, Runnable hook) {
HOOKS.computeIfAbsent(priority, key -> new ArrayList<>()).add(hook);
LOG.info("Shut down hook with priority {} added to shut down hook registry", priority);
}

public static void runHooks() {
HOOKS.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
LOG.info("Running hooks for priority {}", entry.getKey());
entry.getValue().parallelStream().forEach(Runnable::run);
});
}

public static void clear() {
HOOKS.clear();
}
}
7 changes: 2 additions & 5 deletions src/main/java/com/pinterest/secor/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
*/
package com.pinterest.secor.consumer;

import com.pinterest.secor.common.DeterministicUploadPolicyTracker;
import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.*;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
import com.pinterest.secor.monitoring.MetricCollector;
Expand Down Expand Up @@ -116,7 +113,7 @@ private void init() throws Exception {
if (mDeterministicUploadPolicyTracker != null) {
throw new RuntimeException("Can't set secor.upload.on.shutdown with secor.upload.deterministic!");
}
Runtime.getRuntime().addShutdownHook(this.new FinalUploadShutdownHook());
ShutdownHookRegistry.registerHook(1, new FinalUploadShutdownHook());
}
}

Expand Down
51 changes: 51 additions & 0 deletions src/main/java/com/pinterest/secor/io/StagingDirectoryCleaner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.pinterest.secor.io;

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Runnable used to delete staging folder content.
* Deletes folders content, while keeping folder itself.
*
* @author Paulius Dambrauskas (p.dambrauskas@gmail.com)
*
*/
public class StagingDirectoryCleaner implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(StagingDirectoryCleaner.class);

private final File mStagingDir;

public StagingDirectoryCleaner(String stagingPath) {
this.mStagingDir = new File(stagingPath);
}

@Override
public void run() {
try {
FileUtils.deleteDirectory(this.mStagingDir);
} catch (IOException e) {
LOG.error("Failed deleting file", e);
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/pinterest/secor/main/ConsumerMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import com.pinterest.secor.common.OstrichAdminService;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.ShutdownHookRegistry;
import com.pinterest.secor.consumer.Consumer;
import com.pinterest.secor.io.StagingDirectoryCleaner;
import com.pinterest.secor.tools.LogFileDeleter;
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.RateLimitUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -54,6 +57,8 @@ public static void main(String[] args) {
}
try {
SecorConfig config = SecorConfig.load();
String stagingDirectoryPath = config.getLocalPath() + '/' + IdUtil.getLocalMessageDir();
ShutdownHookRegistry.registerHook(10, new StagingDirectoryCleaner(stagingDirectoryPath));
OstrichAdminService ostrichService = new OstrichAdminService(config.getOstrichPort());
ostrichService.start();
FileUtil.configure(config);
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/com/pinterest/secor/util/FileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,6 @@ public static void delete(String path) throws IOException {
}
}

public static void deleteOnExit(String path) {
File file = new File(path);
file.deleteOnExit();
}

public static void moveToCloud(String srcLocalPath, String dstCloudPath) throws IOException {
Path srcPath = new Path(srcLocalPath);
Path dstPath = new Path(dstCloudPath);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.pinterest.secor.common;

import org.junit.After;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class ShutdownHookRegistryTest {

@After
public void cleanup() {
ShutdownHookRegistry.clear();
}

@Test
public void testHookExecutionOrder() {
List<String> results = new ArrayList<>();
ShutdownHookRegistry.registerHook(9, () -> results.add("priority9"));
ShutdownHookRegistry.registerHook(1, () -> results.add("priority1"));

ShutdownHookRegistry.runHooks();

assertEquals("priority1", results.get(0));
assertEquals("priority9", results.get(1));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.pinterest.secor.io;

import java.io.IOException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.junit.Assert.*;

public class StagingDirectoryCleanerTest {
@Rule
public TemporaryFolder folder = new TemporaryFolder();

@Test
public void testCleanFolderContent() throws IOException {
// Given
folder.newFolder("foo");
folder.newFolder("bar");

// When
new StagingDirectoryCleaner(folder.getRoot().getPath()).run();

// Then
assertFalse(folder.getRoot().exists());
}
}

0 comments on commit 6a07d98

Please sign in to comment.