Skip to content

Commit

Permalink
Add pluggable file I/O submodule in TableOperations (#14)
Browse files Browse the repository at this point in the history
This adds FileIO that is returned by TableOperations and used to delete paths and to create InputFile and OutputFile instances. FileIO is Serializable so that it can be sent to tasks running in different JVMs and used for all file-related tasks for a table.
  • Loading branch information
mccheah authored and rdblue committed Dec 12, 2018
1 parent 521989e commit 733de8c
Show file tree
Hide file tree
Showing 20 changed files with 272 additions and 155 deletions.
10 changes: 9 additions & 1 deletion api/src/main/java/com/netflix/iceberg/Files.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Paths;

public class Files {

Expand All @@ -34,7 +35,7 @@ public static OutputFile localOutput(File file) {
}

public static OutputFile localOutput(String file) {
return localOutput(new File(file));
return localOutput(Paths.get(file).toAbsolutePath().toFile());
}

private static class LocalOutputFile implements OutputFile {
Expand All @@ -50,6 +51,13 @@ public PositionOutputStream create() {
throw new AlreadyExistsException("File already exists: %s", file);
}

if (!file.getParentFile().isDirectory() && !file.getParentFile().mkdirs()) {
throw new RuntimeIOException(
String.format(
"Failed to create the file's directory at %s.",
file.getParentFile().getAbsolutePath()));
}

try {
return new PositionFileOutputStream(new RandomAccessFile(file, "rw"));
} catch (FileNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package com.netflix.iceberg;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.hadoop.HadoopOutputFile;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.hadoop.HadoopFileIO;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.Tasks;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -50,6 +48,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
private static final String HIVE_LOCATION_FOLDER_NAME = "empty";

private final Configuration conf;
private final FileIO fileIo;

private TableMetadata currentMetadata = null;
private String currentMetadataLocation = null;
Expand All @@ -59,6 +58,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {

protected BaseMetastoreTableOperations(Configuration conf) {
this.conf = conf;
this.fileIo = new HadoopFileIO(conf);
}

@Override
Expand All @@ -85,22 +85,18 @@ public String hiveTableLocation() {
return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME);
}

public String dataLocation() {
return String.format("%s/%s", baseLocation, DATA_FOLDER_NAME);
}

protected String writeNewMetadata(TableMetadata metadata, int version) {
if (baseLocation == null) {
baseLocation = metadata.location();
}

String newFilename = newTableMetadataFilename(baseLocation, version);
OutputFile newMetadataLocation = HadoopOutputFile.fromPath(new Path(newFilename), conf);
String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version);
OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);

// write the new metadata
TableMetadataParser.write(metadata, newMetadataLocation);

return newFilename;
return newTableMetadataFilePath;
}

protected void refreshFromMetadataLocation(String newLocation) {
Expand All @@ -126,32 +122,21 @@ protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
}

@Override
public InputFile newInputFile(String path) {
return fromLocation(path, conf);
public String metadataFileLocation(String fileName) {
return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName);
}

@Override
public OutputFile newMetadataFile(String filename) {
return HadoopOutputFile.fromPath(
new Path(newMetadataLocation(baseLocation, filename)), conf);
}

@Override
public void deleteFile(String file) {
Path path = new Path(file);
try {
getFS(path, conf).delete(path, false /* should be a file, not recursive */ );
} catch (IOException e) {
throw new RuntimeIOException(e);
}
public FileIO io() {
return fileIo;
}

@Override
public long newSnapshotId() {
return System.currentTimeMillis();
}

private String newTableMetadataFilename(String baseLocation, int newVersion) {
private String newTableMetadataFilePath(String baseLocation, int newVersion) {
return String.format("%s/%s/%05d-%s%s",
baseLocation,
METADATA_FOLDER_NAME,
Expand All @@ -160,22 +145,6 @@ private String newTableMetadataFilename(String baseLocation, int newVersion) {
getFileExtension(this.conf));
}

private static String newMetadataLocation(String baseLocation, String filename) {
return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, filename);
}

private static String parseBaseLocation(String metadataLocation) {
int lastSlash = metadataLocation.lastIndexOf('/');
int secondToLastSlash = metadataLocation.lastIndexOf('/', lastSlash);

// verify that the metadata file was contained in a "metadata" folder
String parentFolderName = metadataLocation.substring(secondToLastSlash + 1, lastSlash);
Preconditions.checkArgument(METADATA_FOLDER_NAME.equals(parentFolderName),
"Invalid metadata location, not in metadata/ folder: %s", metadataLocation);

return metadataLocation.substring(0, secondToLastSlash);
}

private static int parseVersion(String metadataLocation) {
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
int versionEnd = metadataLocation.indexOf('-', versionStart);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BaseSnapshot implements Snapshot {
String... manifestFiles) {
this(ops, snapshotId, null, System.currentTimeMillis(),
Lists.transform(Arrays.asList(manifestFiles),
path -> new GenericManifestFile(ops.newInputFile(path), 0)));
path -> new GenericManifestFile(ops.io().newInputFile(path), 0)));
}

BaseSnapshot(TableOperations ops,
Expand Down Expand Up @@ -136,7 +136,7 @@ private void cacheChanges() {
// accumulate adds and deletes from all manifests.
// because manifests can be reused in newer snapshots, filter the changes by snapshot id.
for (String manifest : Iterables.transform(manifests, ManifestFile::path)) {
try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest))) {
for (ManifestEntry add : reader.addedFiles()) {
if (add.snapshotId() == snapshotId) {
adds.add(add.file().copy());
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/java/com/netflix/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.expressions.Expressions;
import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
import com.netflix.iceberg.expressions.Projections;
import com.netflix.iceberg.expressions.ResidualEvaluator;
import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.types.TypeUtil;
Expand Down Expand Up @@ -174,7 +173,7 @@ public CloseableIterable<FileScanTask> planFiles() {
Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
matchingManifests,
manifest -> {
ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
toClose.add(reader);
String schemaString = SchemaParser.toJson(reader.spec().schema());
String specString = PartitionSpecParser.toJson(reader.spec());
Expand Down
15 changes: 4 additions & 11 deletions core/src/main/java/com/netflix/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.CommitFailedException;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.Tasks;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -260,18 +258,13 @@ public void commit(TableMetadata base, TableMetadata metadata) {
}

@Override
public InputFile newInputFile(String path) {
return ops.newInputFile(path);
public FileIO io() {
return ops.io();
}

@Override
public OutputFile newMetadataFile(String filename) {
return ops.newMetadataFile(filename);
}

@Override
public void deleteFile(String path) {
ops.deleteFile(path);
public String metadataFileLocation(String fileName) {
return ops.metadataFileLocation(fileName);
}

@Override
Expand Down
50 changes: 50 additions & 0 deletions core/src/main/java/com/netflix/iceberg/FileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.netflix.iceberg;

import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;

import java.io.Serializable;

/**
* Pluggable module for reading, writing, and deleting files.
* <p>
* Both table metadata files and data files can be written and read by this module. Implementations
* must be serializable because various clients of Spark tables may initialize this once and pass
* it off to a separate module that would then interact with the streams.
*/
public interface FileIO extends Serializable {

/**
* Get a {@link InputFile} instance to read bytes from the file at the given path.
*/
InputFile newInputFile(String path);

/**
* Get a {@link OutputFile} instance to write bytes to the file at the given path.
*/
OutputFile newOutputFile(String path);

/**
* Delete the file at the given path.
*/
void deleteFile(String path);
}
2 changes: 1 addition & 1 deletion core/src/main/java/com/netflix/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public CloseableIterable<ManifestEntry> entries() {
Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
matchingManifests,
manifest -> {
ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
toClose.add(reader);
return Iterables.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ private ManifestFile filterManifest(Expression deleteExpression,
return manifest;
}

try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
Expression inclusiveExpr = Projections
.inclusive(reader.spec())
.project(deleteExpression);
Expand Down Expand Up @@ -460,7 +460,7 @@ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws I
try {

for (ManifestFile manifest : bin) {
try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
for (ManifestEntry entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RemoveSnapshots implements ExpireSnapshots {
private final Consumer<String> defaultDelete = new Consumer<String>() {
@Override
public void accept(String file) {
ops.deleteFile(file);
ops.io().deleteFile(file);
}
};

Expand Down Expand Up @@ -161,7 +161,7 @@ public void commit() {
).run(manifest -> {
// even if the manifest is still used, it may contain files that can be deleted
// TODO: eliminate manifests with no deletes without scanning
try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
for (ManifestEntry entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
if (entry.status() == ManifestEntry.Status.DELETED &&
Expand Down
7 changes: 2 additions & 5 deletions core/src/main/java/com/netflix/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.util.JsonUtil;
import com.netflix.iceberg.util.Tasks;
import com.netflix.iceberg.util.ThreadPools;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
Expand Down Expand Up @@ -89,13 +86,13 @@ static Snapshot fromJson(TableOperations ops, JsonNode node) {
if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList));
return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.io().newInputFile(manifestList));

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
// loaded lazily, if it is needed
List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
location -> new GenericManifestFile(ops.newInputFile(location), 0));
location -> new GenericManifestFile(ops.io().newInputFile(location), 0));
return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
}
}
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public Snapshot apply() {

return new BaseSnapshot(ops,
snapshotId(), parentSnapshotId, System.currentTimeMillis(),
ops.newInputFile(manifestList.location()));
ops.io().newInputFile(manifestList.location()));

} else {
return new BaseSnapshot(ops,
Expand Down Expand Up @@ -185,16 +185,17 @@ protected void cleanAll() {
}

protected void deleteFile(String path) {
ops.deleteFile(path);
ops.io().deleteFile(path);
}

protected OutputFile manifestListPath() {
return ops.newMetadataFile(FileFormat.AVRO.addExtension(
String.format("snap-%d-%s", snapshotId(), commitUUID)));
return ops.io().newOutputFile(ops.metadataFileLocation(FileFormat.AVRO.addExtension(
String.format("snap-%d-%s", snapshotId(), commitUUID))));
}

protected OutputFile manifestPath(int i) {
return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i));
return ops.io().newOutputFile(
ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + i)));
}

protected long snapshotId() {
Expand All @@ -205,7 +206,7 @@ protected long snapshotId() {
}

private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
int addedFiles = 0;
int existingFiles = 0;
Expand Down

0 comments on commit 733de8c

Please sign in to comment.