Skip to content

Commit

Permalink
[STORE] Move to on data.path per shard
Browse files Browse the repository at this point in the history
This commit moves away from using stripe RAID-0 simumlation across multiple
data paths towards using a single path per shard. Multiple data paths are still
supported but shards and it's data is not striped across multiple paths / disks.
This will for instance prevent to loose all shards if a single disk is corrupted.

Indices that are using this features already will automatically upgraded to a single
datapath based on a simple diskspace based heuristic. In general there must be enough
diskspace to move a single shard at any time otherwise the upgrade will fail.

Closes elastic#9498
  • Loading branch information
s1monw committed Apr 7, 2015
1 parent c2f9987 commit 342cbe4
Show file tree
Hide file tree
Showing 57 changed files with 1,184 additions and 1,787 deletions.
306 changes: 306 additions & 0 deletions src/main/java/org/elasticsearch/common/util/MultiDataPathUpgrader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util;

import com.google.common.base.Charsets;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.*;

import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

/**
*/
public class MultiDataPathUpgrader {

private final NodeEnvironment nodeEnvironment;
private final ESLogger logger = Loggers.getLogger(getClass());

public MultiDataPathUpgrader(NodeEnvironment nodeEnvironment) {
this.nodeEnvironment = nodeEnvironment;
}

/**
* Upgrades the given shard Id from multiple shard paths into the given target path.
*
* @see #pickShardPath(org.elasticsearch.index.shard.ShardId, org.elasticsearch.common.settings.Settings)
*/
public void upgrade(ShardId shard, ShardPath targetPath) throws IOException {
final Path[] paths = nodeEnvironment.shardPaths(shard); // custom data path doesn't need upgrading
if (isTargetPathConfigured(paths, targetPath) == false) {
throw new IllegalArgumentException("shard path must be one of the shards data paths");
}
if (paths.length == 1) {
logger.info("{} only one data path configured - skipping upgrade", shard);
return;
}
logger.info("{} upgrading multi data dir to {}", shard, targetPath.getDataPath());
final ShardStateMetaData loaded = ShardStateMetaData.FORMAT.loadLatestState(logger, paths);
if (loaded == null) {
throw new IllegalStateException("Can't upgrade shard without shard state");
}
logger.info("{} load shard state {}", shard, loaded);

ShardStateMetaData.FORMAT.write(loaded, loaded.version, targetPath.getShardStatePath());
Files.createDirectories(targetPath.resolveIndex());
try (SimpleFSDirectory directory = new SimpleFSDirectory(targetPath.resolveIndex())) {
try (final Lock lock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
if (lock.obtain(5000)) {
upgradeFiles(shard, targetPath, targetPath.resolveIndex(), ShardPath.INDEX_FOLDER_NAME, paths);
} else {
throw new IllegalStateException("Can't obtain lock on " + targetPath.resolveIndex());
}
}
}


upgradeFiles(shard, targetPath, targetPath.resolveTranslog(), ShardPath.TRANSLOG_FOLDER_NAME, paths);

logger.info("{} wipe old state files", shard);
for (Path path : paths) {
if (path.equals(targetPath.getShardStatePath()) == false) {
logger.info("{} wipe state file in {}", shard, path);
MetaDataStateFormat.deleteMetaState(path);
}
}

if (FileSystemUtils.files(targetPath.resolveIndex()).length == 0) {
throw new IllegalStateException("index folder [" + targetPath.resolveIndex() + "] is empty");
}

if (FileSystemUtils.files(targetPath.resolveTranslog()).length == 0) {
throw new IllegalStateException("translog folder [" + targetPath.resolveTranslog() + "] is empty");
}
}

/**
* Runs check-index on the target shard and throws an exception if it failed
*/
public void checkIndex(ShardPath targetPath) throws IOException {
BytesStreamOutput os = new BytesStreamOutput();
PrintStream out = new PrintStream(os, false, Charsets.UTF_8.name());
try (Directory directory = new SimpleFSDirectory(targetPath.resolveIndex());
CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.setInfoStream(out);
CheckIndex.Status status = checkIndex.checkIndex();
out.flush();
if (!status.clean) {
logger.warn("check index [failure]\n{}", new String(os.bytes().toBytes(), Charsets.UTF_8));
throw new ElasticsearchIllegalStateException("index check failure");
}
}
}

/**
* Returns true iff the given shard needs upgrading.
*/
public boolean needsUpgrading(ShardId shard, @IndexSettings Settings settings) {
final Path[] paths = nodeEnvironment.shardPaths(shard);
// custom data path doesn't need upgrading neither single path envs
if (NodeEnvironment.hasCustomDataPath(settings) == false && paths.length > 1) {
int numPathsExist = 0;
for (Path path : paths) {
if (Files.exists(path.resolve(MetaDataStateFormat.STATE_DIR_NAME))) {
numPathsExist++;
}
if (numPathsExist > 1) {
return true;
}
}
}
return false;
}

/**
* Picks a target ShardPath to allocate and upgrade the given shard to. It picks the target based on a simple
* heuristic:
* <ul>
* <li>if the smallest datapath has 2x more space available that the shards total size the datapath with the most bytes for that shard is picked to minimize the amount of bytes to copy</li>
* <li>otherwise the largest available datapath is used as the target no matter how big of a slice of the shard it already holds.</li>
* </ul>
*/
public ShardPath pickShardPath(ShardId shard, @IndexSettings Settings settings) throws IOException {
if (needsUpgrading(shard, settings) == false) {
throw new IllegalStateException("Shard doesn't need upgrading");
}
final Path[] paths = nodeEnvironment.shardPaths(shard);
// if we need upgradeing make sure we have all paths.
for (Path path : paths) {
Files.createDirectories(path);
}
final ShardFileInfo[] shardFileInfo = getShardFileInfo(shard);
long totalBytesUsedByShard = 0;
long leastUsableSpace = Long.MAX_VALUE;
long mostUsableSpace = Long.MIN_VALUE;
assert shardFileInfo.length == nodeEnvironment.shardPaths(shard).length;
for (ShardFileInfo info : shardFileInfo) {
totalBytesUsedByShard += info.spaceUsedByShard;
leastUsableSpace = Math.min(leastUsableSpace, info.usableSpace + info.spaceUsedByShard);
mostUsableSpace = Math.max(mostUsableSpace, info.usableSpace + info.spaceUsedByShard);
}

if (mostUsableSpace < totalBytesUsedByShard) {
throw new IllegalStateException("Can't upgrade path available space: " + mostUsableSpace + " bytes required space: " + totalBytesUsedByShard + " bytes");
}
ShardFileInfo target = shardFileInfo[0];
if (leastUsableSpace >= (2 * totalBytesUsedByShard)) {
for (ShardFileInfo info : shardFileInfo) {
if (info.spaceUsedByShard > target.spaceUsedByShard) {
target = info;
}
}
} else {
for (ShardFileInfo info : shardFileInfo) {
if (info.usableSpace > target.usableSpace) {
target = info;
}
}
}
return new ShardPath(target.path, target.path, settings.get(IndexMetaData.SETTING_UUID), shard);
}

private ShardFileInfo[] getShardFileInfo(ShardId shard) throws IOException {
final Path[] paths = nodeEnvironment.shardPaths(shard); // custom data path doesn't need upgrading
final ShardFileInfo[] info = new ShardFileInfo[paths.length];
for (int i = 0; i < info.length; i++) {
Path path = paths[i];
final long usableSpace = getUsabelSpace(path);
info[i] = new ShardFileInfo(path, usableSpace, getSpaceUsedByShard(path));
}
return info;
}

protected long getSpaceUsedByShard(Path path) throws IOException {
final long[] spaceUsedByShard = new long[] {0};
if (Files.exists(path)) {
Files.walkFileTree(path, new FileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (attrs.isRegularFile()) {
spaceUsedByShard[0] += attrs.size();
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return FileVisitResult.CONTINUE;
}
});
}
return spaceUsedByShard[0];
}

protected long getUsabelSpace(Path path) throws IOException {
FileStore fileStore = Files.getFileStore(path);
return fileStore.getUsableSpace();
}


static class ShardFileInfo {
final Path path;
final long usableSpace;
final long spaceUsedByShard;

ShardFileInfo(Path path, long usableSpace, long spaceUsedByShard) {
this.path = path;
this.usableSpace = usableSpace;
this.spaceUsedByShard = spaceUsedByShard;
}
}



private void upgradeFiles(ShardId shard, ShardPath targetPath, final Path targetDir, String folderName, Path[] paths) throws IOException {
List<Path> movedFiles = new ArrayList<>();
for (Path path : paths) {
if (path.equals(targetPath.getDataPath()) == false) {
final Path sourceDir = path.resolve(folderName);
if (Files.exists(sourceDir)) {
logger.info("{} upgrading {} path {} ", shard, folderName, sourceDir);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(sourceDir)) {
Files.createDirectories(targetDir);
for (Path file : stream) {
if (IndexWriter.WRITE_LOCK_NAME.equals(file.getFileName().toString())) {
continue; // skip write.lock
}
logger.info("{} move file [{}] size: [{}]", shard, file.getFileName(), Files.size(file));
final Path targetFile = targetDir.resolve(file.getFileName());
Files.move(file, targetFile);
movedFiles.add(targetFile);
}
}
}
}
}
if (movedFiles.isEmpty() == false) {
// fsync later it might be on disk already
logger.info("{} fsync files", shard);
for (Path moved : movedFiles) {
logger.info("{} syncing {}", shard, moved.getFileName());
IOUtils.fsync(moved, false);
}
logger.info("{} syncing directory {}", shard, targetDir);
IOUtils.fsync(targetDir, true);
}
}


/**
* Returns <code>true</code> iff the target path is one of the given paths.
*/
private boolean isTargetPathConfigured(final Path[] paths, ShardPath targetPath) {
for (Path path : paths) {
if (path.equals(targetPath.getDataPath())) {
return true;
}
}
return false;
}
}
19 changes: 4 additions & 15 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -447,22 +447,12 @@ public Path[] indexPaths(Index index) {
return indexPaths;
}

/**
* Returns all paths where lucene data will be stored, if a index.data_path
* setting is present, will return the custom data path to be used
*/
public Path[] shardDataPaths(ShardId shardId, @IndexSettings Settings indexSettings) {
assert indexSettings != ImmutableSettings.EMPTY;
assert assertEnvIsLocked();
if (hasCustomDataPath(indexSettings)) {
return new Path[] {resolveCustomLocation(indexSettings, shardId)};
} else {
return shardPaths(shardId);
}
}

/**
* Returns all shard paths excluding custom shard path
*
* @see #hasCustomDataPath(org.elasticsearch.common.settings.Settings)
* @see #resolveCustomLocation(org.elasticsearch.common.settings.Settings, org.elasticsearch.index.shard.ShardId)
*
*/
public Path[] shardPaths(ShardId shardId) {
assert assertEnvIsLocked();
Expand Down Expand Up @@ -640,7 +630,6 @@ public static boolean hasCustomDataPath(@IndexSettings Settings indexSettings) {
* @param indexSettings settings for the index
*/
private Path resolveCustomLocation(@IndexSettings Settings indexSettings) {
assert indexSettings != ImmutableSettings.EMPTY;
String customDataDir = indexSettings.get(IndexMetaData.SETTING_DATA_PATH);
if (customDataDir != null) {
// This assert is because this should be caught by MetaDataCreateIndexService
Expand Down
Loading

0 comments on commit 342cbe4

Please sign in to comment.