Skip to content

Commit

Permalink
GEOMESA-3366 FSDS - fix Hadoop 3.4 AWS compatibility (#3124)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Jun 7, 2024
1 parent f1aacf8 commit e4eceaf
Show file tree
Hide file tree
Showing 20 changed files with 526 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ zookeeper_install_version="%%zookeeper.version.recommended%%"
# required for hadoop - make sure it corresponds to the hadoop installed version
guava_install_version="%%accumulo.guava.version%%"

function version_ge() { test "$(echo "$@" | tr " " "\n" | sort -rV | head -n 1)" == "$1"; }

# gets the dependencies for this module
# args:
# $1 - current classpath
Expand Down
9 changes: 9 additions & 0 deletions geomesa-fs/geomesa-fs-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-fs-datastore_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
</dependencies>

<build>
Expand Down
21 changes: 21 additions & 0 deletions geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,32 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
</dependency>
<!-- aws v1 for hadoop 3.3 and earlier -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<scope>provided</scope>
</dependency>
<!-- aws v2 for hadoop 3.4 and later -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk.crt</groupId>
<artifactId>aws-crt</artifactId>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,49 @@

package org.locationtech.geomesa.fs.storage.common.s3

import com.amazonaws.services.s3.AmazonS3
import org.apache.accumulo.access.AccessExpression
import org.apache.hadoop.fs.Path
import org.geotools.api.feature.simple.SimpleFeature
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.security.SecurityUtils

import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.Base64

/**
* Creates a tag containing the base64 encoded summary visibility for the observed file
*
* @param s3 s3 client
* @param path file path
* @param tag tag name to use
*/
class S3VisibilityObserver(s3: AmazonS3, path: Path, tag: String) extends S3ObjectTagObserver(s3, path) {
abstract class AbstractS3VisibilityObserver(path: Path) extends FileSystemObserver {

private val visibilities = scala.collection.mutable.Set.empty[String]

private val (bucket, key) = {
val uri = path.toUri
val uriPath = uri.getPath
val key = if (uriPath.startsWith("/")) { uriPath.substring(1) } else { uriPath }
(uri.getHost, key)
}

override def flush(): Unit = {}

override def close(): Unit = {
try { makeTagRequest(bucket, key) } catch {
case e: Exception => throw new IOException("Error tagging object", e)
}
}
override def write(feature: SimpleFeature): Unit = {
val vis = SecurityUtils.getVisibility(feature)
if (vis != null) {
visibilities.add(vis)
}
}

override protected def tags(): Iterable[(String, String)] = {
if (visibilities.isEmpty) { Seq.empty } else {
private def makeTagRequest(bucket: String, key: String): Unit = {
if (visibilities.nonEmpty) {
val vis = visibilities.mkString("(", ")&(", ")")
// this call simplifies and de-duplicates the expression
val expression = AccessExpression.of(vis, /*normalize = */true).getExpression
Seq(tag -> Base64.getEncoder.encodeToString(expression.getBytes(StandardCharsets.UTF_8)))
makeTagRequest(bucket: String, key: String, Base64.getEncoder.encodeToString(expression.getBytes(StandardCharsets.UTF_8)))
}
}

protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,49 @@

package org.locationtech.geomesa.fs.storage.common.s3

import com.amazonaws.services.s3.AmazonS3
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}

import java.io.IOException
import scala.util.control.NonFatal

/**
* Factory for S3VisibilityObserver
*/
class S3VisibilityObserverFactory extends FileSystemObserverFactory {
class S3VisibilityObserverFactory extends FileSystemObserverFactory with LazyLogging {

private var fs: FileSystem = _
private var s3: AmazonS3 = _
private var tag: String = _
private var delegate: FileSystemObserverFactory = _

override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = {
try {
// use reflection to access to private client factory used by the s3a hadoop impl
fs = root.getFileSystem(conf)
val field = fs.getClass.getDeclaredField("s3")
field.setAccessible(true)
s3 = field.get(fs).asInstanceOf[AmazonS3]
tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag)
if (S3VisibilityObserverFactory.UseV2) {
delegate = new v2.S3VisibilityObserverFactory()
} else {
delegate = new v1.S3VisibilityObserverFactory()
}
delegate.init(conf, root, sft)
} catch {
case e: Exception => throw new RuntimeException("Unable to get s3 client", e)
}
}

override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(s3, path, tag)

override def close(): Unit = {
s3 = null
if (fs != null) {
try {
fs.close()
} catch {
case e: Exception => throw new IOException("Error closing S3 filesystem", e)
} finally {
fs = null
}
}
}
override def apply(path: Path): FileSystemObserver = delegate.apply(path)

override def close(): Unit = if (delegate != null) { delegate.close() }
}

object S3VisibilityObserverFactory {
object S3VisibilityObserverFactory extends LazyLogging {

val TagNameConfig = "geomesa.fs.vis.tag"
val DefaultTag = "geomesa.file.visibility"

lazy private val UseV2: Boolean = try {
val versionRegex = """(\d+)\.(\d+)\..*""".r
val versionRegex(maj, min) = org.apache.hadoop.util.VersionInfo.getVersion
maj.toInt >= 3 && min.toInt >= 4
} catch {
case NonFatal(e) => logger.warn("Unable to evaluate hadoop version, defaulting to aws sdk v2: ", e); true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.common.s3
package v1

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ObjectTagging, SetObjectTaggingRequest, Tag}
import org.apache.hadoop.fs.Path

import java.util.Collections

/**
* Creates a tag containing the base64 encoded summary visibility for the observed file
*
* @param path file path
* @param s3 s3 client
* @param tag tag name to use
*/
class S3VisibilityObserver(val path: Path, s3: AmazonS3, tag: String) extends AbstractS3VisibilityObserver(path) {
override protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit = {
val tagging = new ObjectTagging(Collections.singletonList(new Tag(tag, visibility)))
val request = new SetObjectTaggingRequest(bucket, key, tagging)
s3.setObjectTagging(request)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.common.s3
package v1

import com.amazonaws.services.s3.AmazonS3
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}
import org.locationtech.geomesa.utils.io.CloseQuietly

import java.io.IOException

/**
* Visibility observer for aws sdk v1
*/
class S3VisibilityObserverFactory extends FileSystemObserverFactory {

private var fs: FileSystem = _
private var s3: AmazonS3 = _
private var tag: String = _

override def init(conf: Configuration, root: Path, sft: SimpleFeatureType): Unit = {
try {
// use reflection to access to private client factory used by the s3a hadoop impl
fs = root.getFileSystem(conf)
val field = fs.getClass.getDeclaredField("s3")
field.setAccessible(true)
s3 = field.get(fs).asInstanceOf[AmazonS3]
tag = conf.get(S3VisibilityObserverFactory.TagNameConfig, S3VisibilityObserverFactory.DefaultTag)
} catch {
case e: Exception => throw new RuntimeException("Unable to get s3 client", e)
}
}

override def apply(path: Path): FileSystemObserver = new S3VisibilityObserver(path, s3, tag)

override def close(): Unit = {
if (fs != null) {
val err = CloseQuietly(fs)
s3 = null
fs = null
err.foreach(e => throw new IOException("Error closing S3 filesystem", e))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.common.s3
package v2

import org.apache.hadoop.fs.Path
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{PutObjectTaggingRequest, Tag, Tagging}

/**
* Creates a tag containing the base64 encoded summary visibility for the observed file
*
* @param path file path
* @param s3 s3 client
* @param tag tag name to use
*/
class S3VisibilityObserver(path: Path, s3: S3Client, tag: String) extends AbstractS3VisibilityObserver(path) {
override protected def makeTagRequest(bucket: String, key: String, visibility: String): Unit = {
val tagging = Tagging.builder().tagSet(Tag.builder.key(tag).value(visibility).build()).build()
val request = PutObjectTaggingRequest.builder.bucket(bucket).key(key).tagging(tagging).build()
s3.putObjectTagging(request)
}
}
Loading

0 comments on commit e4eceaf

Please sign in to comment.