Skip to content

Commit

Permalink
SFTP support
Browse files Browse the repository at this point in the history
  • Loading branch information
mmolimar committed Jul 5, 2020
1 parent 4e951ac commit f137ea1
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<orc.version>1.6.3</orc.version>
<univocity.version>2.8.4</univocity.version>
<cron-utils.version>9.0.2</cron-utils.version>
<jsch.version>0.1.54</jsch.version>
<junit-jupiter.version>5.6.2</junit-jupiter.version>
<easymock.version>4.2</easymock.version>
<powermock.version>2.0.7</powermock.version>
Expand Down Expand Up @@ -90,6 +91,11 @@
<artifactId>cron-utils</artifactId>
<version>${cron-utils.version}</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down Expand Up @@ -260,7 +266,7 @@
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<enabled>false</enabled>
</snapshots>
<url>http://packages.confluent.io/maven/</url>
</repository>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ private Map<String, Object> customConfigs() {
private void configFs(Map<String, Object> customConfigs) throws IOException {
for (String uri : this.conf.getFsUris()) {
Configuration fsConfig = new Configuration();
fsConfig.set("fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem");
customConfigs.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(FsSourceTaskConfig.POLICY_PREFIX_FS))
.forEach(entry -> fsConfig.set(entry.getKey().replace(FsSourceTaskConfig.POLICY_PREFIX_FS, ""),
Expand Down Expand Up @@ -104,7 +105,9 @@ private String convert(String uri) {
@Override
public List<String> getURIs() {
List<String> uris = new ArrayList<>();
fileSystems.forEach(fs -> uris.add(fs.getWorkingDirectory().toString()));
fileSystems.forEach(fs ->
uris.add(Optional.ofNullable(fs.getWorkingDirectory()).orElse(new Path("./")).toString())
);
return uris;
}

Expand Down Expand Up @@ -210,7 +213,7 @@ public FileReader offer(FileMetadata metadata, Map<String, Object> offsetMap) {
FileSystem current = fileSystems.stream()
.filter(fs -> metadata.getPath().startsWith(fs.getWorkingDirectory().toString()))
.findFirst()
.orElse(null);
.orElse(fileSystems.stream().findFirst().orElseThrow(() -> new ConnectException(("Invalid FS."))));

Supplier<FileReader> makeReader = () -> ReflectionUtils.makeReader(
(Class<? extends FileReader>) conf.getClass(FsSourceTaskConfig.FILE_READER_CLASS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public static Policy makePolicy(Class<? extends Policy> clazz, FsSourceTaskConfi
private static <T> T make(Class<T> clazz, Object... args) {
try {
Class[] constClasses = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new);

Constructor<T> constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz, constClasses);

return constructor.newInstance(args);
} catch (IllegalAccessException |
InstantiationException |
Expand Down

0 comments on commit f137ea1

Please sign in to comment.