Remove caching from AttributeStoreProviders
Signed-off-by: Eugene Cheipesh <echeipesh@gmail.com>
echeipesh committed Jul 14, 2017
1 parent 63e1511 commit 3e72f2f
Showing 13 changed files with 6 additions and 51 deletions.
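
The net effect of this commit is that each provider's attributeStore(uri) now constructs a fresh AttributeStore on every call instead of returning a soft-referenced cached instance. Callers that relied on the old behaviour can memoize on their own side. The sketch below is illustrative only: the CachedAttributeStore name and the choice of a TrieMap keyed by the URI string are assumptions, not part of this change; the AttributeStore(uri) entry point it calls is the one shown later in this diff.

import java.net.URI
import scala.collection.concurrent.TrieMap
import geotrellis.spark.io.AttributeStore

// Hypothetical caller-side replacement for the removed provider caches:
// memoize stores by URI so repeated lookups reuse one instance.
object CachedAttributeStore {
  private val stores = TrieMap.empty[String, AttributeStore]

  def apply(uri: URI): AttributeStore =
    // getOrElseUpdate may build the store twice under contention; harmless here.
    stores.getOrElseUpdate(uri.toString, AttributeStore(uri))
}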
1 change: 0 additions & 1 deletion accumulo/build.sbt
@@ -7,7 +7,6 @@ libraryDependencies ++= Seq(
exclude("org.apache.hadoop", "hadoop-client"),
sparkCore % "provided",
spire,
scaffeine,
scalatest % "test")

fork in Test := false
@@ -19,16 +19,11 @@ package geotrellis.spark.io.accumulo
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.util.UriUtils
import com.github.blemale.scaffeine.{Scaffeine, Cache}
import org.apache.spark.SparkContext
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import com.typesafe.config.ConfigFactory
import java.net.URI

object AccumuloLayerProvider {
private val cache: Cache[(String, String), AttributeStore] = Scaffeine().softValues().build()
}

/**
* Provides [[AccumuloAttributeStore]] instance for URI with `accumulo` scheme.
* ex: `accumulo://[user[:password]@]zookeeper/instance-name[?attributes=table1[&layers=table2]]`
@@ -44,9 +39,7 @@ class AccumuloLayerProvider extends AttributeStoreProvider with LayerReaderProvider
val params = UriUtils.getParams(uri)
val attributeTable = params.getOrElse("attributes",
ConfigFactory.load().getString("geotrellis.accumulo.catalog"))

AccumuloLayerProvider.cache.get(uri.getSchemeSpecificPart -> attributeTable,
_ => AccumuloAttributeStore(instance, attributeTable))
AccumuloAttributeStore(instance, attributeTable)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
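
As a concrete illustration of the scaladoc above, resolving an accumulo URI through the SPI-backed AttributeStore(uri) now builds a new AccumuloAttributeStore on each call. The host, credentials, instance, and table names below are placeholders, not values from this commit.

import java.net.URI
import geotrellis.spark.io.AttributeStore

// "zookeeper-host", "gis" and "attrs" are placeholder values for illustration.
val accumuloStore = AttributeStore(
  new URI("accumulo://root:secret@zookeeper-host/gis?attributes=attrs"))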
1 change: 0 additions & 1 deletion cassandra/build.sbt
@@ -8,7 +8,6 @@ libraryDependencies ++= Seq(
ExclusionRule("org.slf4j"), ExclusionRule("io.spray"), ExclusionRule("com.typesafe.akka")
) exclude("org.apache.hadoop", "hadoop-client"),
sparkCore % "provided",
scaffeine,
spire,
scalatest % "test")

@@ -19,14 +19,9 @@ package geotrellis.spark.io.cassandra
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.util.UriUtils
import com.github.blemale.scaffeine.{Scaffeine, Cache}
import org.apache.spark.SparkContext
import java.net.URI

object CassandraLayerProvider {
private val cache: Cache[(String, String), AttributeStore] = Scaffeine().softValues().build()
}

/**
* Provides [[CassandraAttributeStore]] instance for URI with `cassandra` scheme.
* ex: `cassandra://[user:password@]zookeeper[:port][/keyspace][?attributes=table1[&layers=table2]]`
@@ -44,9 +39,7 @@ class CassandraLayerProvider extends AttributeStoreProvider with LayerReaderProvider
Cassandra.cfg.getString("catalog"))
val keyspace = Option(uri.getPath.drop(1)).getOrElse(
Cassandra.cfg.getString("keyspace"))

CassandraLayerProvider.cache.get(uri.getSchemeSpecificPart -> attributeTable,
_ => CassandraAttributeStore(instance, keyspace, attributeTable))
CassandraAttributeStore(instance, keyspace, attributeTable)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
1 change: 0 additions & 1 deletion hbase/build.sbt
@@ -9,7 +9,6 @@ libraryDependencies ++= Seq(
"org.codehaus.jackson" % "jackson-core-asl" % "1.9.13",
sparkCore % "provided",
spire,
scaffeine,
scalatest % "test")

fork in Test := false
@@ -19,15 +19,10 @@ package geotrellis.spark.io.hbase
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.util.UriUtils
import com.github.blemale.scaffeine.{Scaffeine, Cache}
import com.typesafe.config.ConfigFactory
import org.apache.spark.SparkContext
import java.net.URI

object HBaseLayerProvider {
private val cache: Cache[(String, String), AttributeStore] = Scaffeine().softValues().build()
}

/**
* Provides [[HBaseAttributeStore]] instance for URI with `hbase` scheme.
* ex: `hbase://zookeeper[:port][?master=host][?attributes=table1[&layers=table2]]`
@@ -43,9 +38,7 @@ class HBaseLayerProvider extends AttributeStoreProvider with LayerReaderProvider
val params = UriUtils.getParams(uri)
val attributeTable = params.getOrElse("attributes",
ConfigFactory.load().getString("geotrellis.hbase.catalog"))

HBaseLayerProvider.cache.get(uri.getSchemeSpecificPart -> attributeTable,
_ => HBaseAttributeStore(instance, attributeTable))
HBaseAttributeStore(instance, attributeTable)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
1 change: 0 additions & 1 deletion project/Dependencies.scala
@@ -17,7 +17,6 @@
import sbt._

object Dependencies {
val scaffeine = "com.github.blemale" %% "scaffeine" % "2.2.0"
val typesafeConfig = "com.typesafe" % "config" % "1.3.1"
val logging = "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0"
val scalatest = "org.scalatest" %% "scalatest" % "3.0.1"
1 change: 0 additions & 1 deletion s3/build.sbt
@@ -5,7 +5,6 @@ libraryDependencies ++= Seq(
sparkCore % "provided",
awsSdkS3,
spire,
scaffeine,
scalatest % "test")

fork in Test := false
@@ -20,13 +20,8 @@ import geotrellis.spark._
import geotrellis.spark.io._
import org.apache.spark._
import com.amazonaws.services.s3.AmazonS3URI
import com.github.blemale.scaffeine.{Scaffeine, Cache}
import java.net.URI

object S3LayerProvider {
private val cache: Cache[String, AttributeStore] = Scaffeine().softValues().build()
}

/**
* Provides [[S3LayerReader]] instance for URI with `s3` scheme.
* The uri represents S3 bucket an prefix of catalog root.
@@ -38,8 +33,7 @@ class S3LayerProvider extends AttributeStoreProvider

def attributeStore(uri: URI): AttributeStore = {
val s3Uri = new AmazonS3URI(uri)
S3LayerProvider.cache.get(uri.getSchemeSpecificPart,
_ => new S3AttributeStore(bucket = s3Uri.getBucket, prefix = s3Uri.getKey))
new S3AttributeStore(bucket = s3Uri.getBucket, prefix = s3Uri.getKey)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
1 change: 0 additions & 1 deletion spark/build.sbt
@@ -11,7 +11,6 @@ libraryDependencies ++= Seq(
monocleCore, monocleMacro,
chronoscala,
scalazStream,
scaffeine,
scalatest % "test"
)

@@ -60,7 +60,6 @@ object AttributeStore {
/**
* Produce AttributeStore instance based on URI description.
* This method uses instances of [[AttributeServiceProvider]] loaded through Java SPI.
* Repeated calls to this function will return previosly instantiated instances.
*/
def apply(uri: URI): AttributeStore = {
import scala.collection.JavaConversions._
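
The sentence removed from this scaladoc is the user-visible contract change: repeated calls to AttributeStore.apply no longer return previously instantiated stores. A small sketch of the observable difference, assuming a local file-scheme catalog (the path is a placeholder):

import java.net.URI
import geotrellis.spark.io.AttributeStore

val uri = new URI("file:///tmp/catalog")  // placeholder catalog path

val a = AttributeStore(uri)
val b = AttributeStore(uri)

// Before this commit the soft-value cache could return the same object for both
// calls; after it, each call constructs a distinct FileAttributeStore.
assert(!(a eq b))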
@@ -18,15 +18,10 @@ package geotrellis.spark.io.file

import geotrellis.spark._
import geotrellis.spark.io._
import com.github.blemale.scaffeine.{Scaffeine, Cache}
import org.apache.spark.SparkContext
import java.net.URI
import java.io.File

object FileLayerProvider {
private val cache: Cache[String, AttributeStore] = Scaffeine().softValues().build()
}

/**
* Provides [[FileLayerReader]] instance for URI with `file` scheme.
* The uri represents local path to catalog root.
@@ -39,8 +34,7 @@ class FileLayerProvider extends AttributeStoreProvider

def attributeStore(uri: URI): AttributeStore = {
val file = new File(uri)
FileLayerProvider.cache.get(file.getCanonicalPath,
canonicalPath => new FileAttributeStore(canonicalPath))
new FileAttributeStore(file.getCanonicalPath)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
@@ -19,16 +19,11 @@ package geotrellis.spark.io.hadoop
import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.util.UriUtils
import com.github.blemale.scaffeine.{Scaffeine, Cache}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import java.net.URI

object HadoopLayerProvider {
private val cache: Cache[Path, AttributeStore] = Scaffeine().softValues().build()
}

/**
* Provides [[HadoopAttributeStore]] instance for URI with `hdfs`, `hdfs+file`, `s3n`, and `s3a` schemes.
* The uri represents Hadoop [[Path]] of catalog root.
@@ -49,7 +44,7 @@ class HadoopLayerProvider extends AttributeStoreProvider
def attributeStore(uri: URI): AttributeStore = {
val path = new Path(trim(uri))
val conf = new Configuration()
HadoopLayerProvider.cache.get(path, new HadoopAttributeStore(_, conf))
new HadoopAttributeStore(path, conf)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
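
For the Hadoop provider the same pattern applies: every call now builds a new HadoopAttributeStore with its own Configuration, so callers issuing many attribute lookups may prefer to resolve the URI once and reuse the store. The namenode address and path below are placeholders.

import java.net.URI
import geotrellis.spark.io.AttributeStore

// Placeholder HDFS location; resolve once and reuse the store for later lookups.
val hdfsStore = AttributeStore(new URI("hdfs://namenode:8020/geotrellis/catalog"))
val layers = hdfsStore.layerIds  // e.g. list layers against the shared instance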
