Skip to content

Commit

Permalink
fix S3 filesystem in all filescan operators
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobin Baker committed Aug 25, 2017
1 parent 0141614 commit 5732f48
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 29 deletions.
13 changes: 2 additions & 11 deletions src/edu/washington/escience/myria/io/UriSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import edu.washington.escience.myria.coordinator.CatalogException;
import edu.washington.escience.myria.util.MyriaUtils;

public class UriSink implements DataSink {
/** Required for Java serialization. */
Expand All @@ -24,17 +25,7 @@ public UriSink(@JsonProperty(value = "uri", required = true) final String uri)
throws CatalogException, URISyntaxException {
this.uri = URI.create(Objects.requireNonNull(uri, "Parameter uri cannot be null"));
/* Force using the Hadoop S3A FileSystem */
if (this.uri.getScheme().equals("s3")) {
this.uri =
new URI(
"s3a",
this.uri.getUserInfo(),
this.uri.getHost(),
this.uri.getPort(),
this.uri.getPath(),
this.uri.getQuery(),
this.uri.getFragment());
}
this.uri = MyriaUtils.normalizeS3Uri(this.uri);
if (!this.uri.getScheme().equals("hdfs") && !this.uri.getScheme().equals("s3a")) {
throw new CatalogException("URI must be an HDFS or S3 URI");
}
Expand Down
14 changes: 3 additions & 11 deletions src/edu/washington/escience/myria/io/UriSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import edu.washington.escience.myria.util.MyriaUtils;

/**
* A data source that pulls data from a specified URI. The URI may be: a path on the local file system; an HDFS link; a
* web link; an AWS link; and perhaps more.
Expand Down Expand Up @@ -51,17 +53,7 @@ public UriSource(@JsonProperty(value = "uri", required = true) final String uri)
parsedUri =
URI.create(Objects.requireNonNull(uri, "Parameter uri to UriSource may not be null"));
/* Force using the Hadoop S3A FileSystem */
if (parsedUri.getScheme().equals("s3")) {
parsedUri =
new URI(
"s3a",
parsedUri.getUserInfo(),
parsedUri.getHost(),
parsedUri.getPort(),
parsedUri.getPath(),
parsedUri.getQuery(),
parsedUri.getFragment());
}
parsedUri = MyriaUtils.normalizeS3Uri(parsedUri);
}

@Override
Expand Down
10 changes: 6 additions & 4 deletions src/edu/washington/escience/myria/operator/NChiladaFileScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -30,6 +31,7 @@
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.util.MyriaUtils;

/**
* Parse NChilada file formats. See <a
Expand Down Expand Up @@ -285,10 +287,10 @@ private InputStream getGroupFileStream(final String groupFilePath) throws DbExce
InputStream groupInputStreamLocal;
try {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(groupFilePath), conf);
FileSystem fs = FileSystem.get(MyriaUtils.normalizeS3Uri(URI.create(groupFilePath)), conf);
Path rootPath = new Path(groupFilePath);
groupInputStreamLocal = fs.open(rootPath);
} catch (IOException e) {
} catch (IOException | URISyntaxException e) {
throw new DbException(e);
}
return groupInputStreamLocal;
Expand All @@ -306,7 +308,7 @@ private Map<String, DataInput> getFilesToDataInput(final String path) throws DbE
FileSystem fs;
Map<String, DataInput> map = new HashMap<>();
try {
fs = FileSystem.get(URI.create(path), conf);
fs = FileSystem.get(MyriaUtils.normalizeS3Uri(URI.create(path)), conf);
Path rootPath = new Path(path + File.separator);
FileStatus[] statii = fs.listStatus(rootPath);
if (statii == null || statii.length == 0) {
Expand All @@ -319,7 +321,7 @@ private Map<String, DataInput> getFilesToDataInput(final String path) throws DbE
DataInput dataInputStream = fs.open(p);
map.put(fileName, dataInputStream);
}
} catch (IOException e) {
} catch (IOException | URISyntaxException e) {
throw new DbException(e);
}
return map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.storage.TupleUtils;
import edu.washington.escience.myria.util.MyriaUtils;

/**
* Read and merge Tipsy bin file, iOrder ascii file and group number ascii file.
Expand Down Expand Up @@ -346,7 +347,7 @@ protected Schema generateSchema() {

private static InputStream openFileOrUrlInputStream(String filenameOrUrl) throws DbException {
try {
URI uri = new URI(filenameOrUrl);
URI uri = MyriaUtils.normalizeS3Uri(new URI(filenameOrUrl));
if (uri.getScheme() == null) {
return openFileInputStream(filenameOrUrl);
} else if (uri.getScheme().equals("hdfs")) {
Expand Down
27 changes: 25 additions & 2 deletions src/edu/washington/escience/myria/util/MyriaUtils.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package edu.washington.escience.myria.util;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.apache.commons.io.IOUtils;
import org.joda.time.DateTime;

import com.google.common.base.Preconditions;
import edu.washington.escience.myria.io.AmazonS3Source;

import edu.washington.escience.myria.io.UriSource;

/**
Expand Down Expand Up @@ -165,4 +166,26 @@ public static ByteBuffer getBlob(final String filename) {
return null;
}
}

/**
 * Replaces the legacy {@code s3} URI scheme with the {@code s3a} scheme,
 * to force use of the new S3A Hadoop filesystem. URIs with any other
 * scheme (or with no scheme at all) are returned unchanged.
 *
 * @param uri input URI (must not be null)
 * @return the URI with its scheme rewritten to {@code s3a} if it was {@code s3}
 * @throws URISyntaxException if the rewritten URI cannot be constructed
 */
public static URI normalizeS3Uri(URI uri) throws URISyntaxException {
  /* URI schemes are case-insensitive (RFC 3986), so also match "S3".
   * equalsIgnoreCase on the literal is null-safe for scheme-less URIs. */
  if ("s3".equalsIgnoreCase(uri.getScheme())) {
    uri =
        new URI(
            "s3a",
            uri.getUserInfo(),
            uri.getHost(),
            uri.getPort(),
            uri.getPath(),
            uri.getQuery(),
            uri.getFragment());
  }
  return uri;
}
}

0 comments on commit 5732f48

Please sign in to comment.