Add LayerReaderProvider and LayerWriterProvider
echeipesh committed Jun 12, 2017
1 parent 767ff34 commit 6f0b7fd
Showing 28 changed files with 267 additions and 72 deletions.
@@ -1 +1 @@
geotrellis.spark.io.accumulo.AccumuloAttributeStoreProvider
geotrellis.spark.io.accumulo.AccumuloLayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.accumulo.AccumuloLayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.accumulo.AccumuloLayerProvider
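
These META-INF/services entries register AccumuloLayerProvider for lookup via Java's ServiceLoader. The GeoTrellis code that performs the lookup is not part of this excerpt, so the following is only a sketch of scheme-based discovery, assuming nothing beyond the AttributeStoreProvider trait shown later in this diff:

import java.net.URI
import java.util.ServiceLoader
import scala.collection.JavaConverters._
import geotrellis.spark.io.AttributeStoreProvider

// Illustrative only: scan the registered providers and pick the first
// whose canProcess accepts the URI's scheme.
def findProvider(uri: URI): Option[AttributeStoreProvider] =
  ServiceLoader.load(classOf[AttributeStoreProvider])
    .iterator().asScala
    .find(_.canProcess(uri))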
@@ -19,11 +19,12 @@ package geotrellis.spark.io.accumulo
import org.apache.accumulo.core.client._
import org.apache.accumulo.core.client.mapreduce.{AbstractInputFormat => AIF, AccumuloOutputFormat => AOF}
import org.apache.accumulo.core.client.mock.MockInstance
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken
import org.apache.accumulo.core.client.security.tokens.{AuthenticationToken, PasswordToken}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job

import scala.collection.JavaConversions._
import java.net.URI

trait AccumuloInstance extends Serializable {
def connector: Connector
@@ -50,6 +51,18 @@ object AccumuloInstance {
val tokenClass = token.getClass.getCanonicalName
BaseAccumuloInstance(instanceName, zookeeper, user, (tokenClass, tokenBytes))
}

def apply(uri: URI): AccumuloInstance = {
import geotrellis.util.UriUtils._

val zookeeper = uri.getHost
val instance = uri.getPath.drop(1)
val (user, pass) = getUserInfo(uri)
AccumuloInstance(
instance, zookeeper,
user.getOrElse("root"),
new PasswordToken(pass.getOrElse("")))
}
}

case class BaseAccumuloInstance(
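
The new AccumuloInstance.apply(uri) factory reads the zookeeper host, instance name, and credentials directly from the URI. A minimal usage sketch; the host, instance, and credentials below are illustrative:

import java.net.URI
import geotrellis.spark.io.accumulo.AccumuloInstance

// Host -> zookeeper, first path segment -> instance name,
// user info -> credentials (defaults: user "root", empty password).
val instance: AccumuloInstance =
  AccumuloInstance(new URI("accumulo://root:secret@zookeeper.example.com/gis"))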
@@ -18,6 +18,8 @@ package geotrellis.spark.io.accumulo

import geotrellis.spark._
import geotrellis.spark.io._
import geotrellis.util.UriUtils
import org.apache.spark.SparkContext
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import java.net.URI

@@ -28,22 +30,31 @@ import java.net.URI
*
 * Metadata table name is optional; if it is not provided, a default value will be used.
*/
class AccumuloAttributeStoreProvider extends AttributeStoreProvider {
class AccumuloLayerProvider extends AttributeStoreProvider with LayerReaderProvider {
def canProcess(uri: URI): Boolean = uri.getScheme.toLowerCase == "accumulo"

def attributeStore(uri: URI): AttributeStore = {
val zookeeper = uri.getHost
val instance = uri.getPath.drop(1)
val (user, pass) = getUserInfo(uri)
val accumuloInstance = AccumuloInstance(
instance, zookeeper,
user.getOrElse("root"),
new PasswordToken(pass.getOrElse("")))
val instance = AccumuloInstance(uri)
val attributeTable = uri.getFragment

if (null == attributeTable)
AccumuloAttributeStore(accumuloInstance)
AccumuloAttributeStore(instance)
else
AccumuloAttributeStore(accumuloInstance, attributeTable)
AccumuloAttributeStore(instance, attributeTable)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
val instance = AccumuloInstance(uri)

new AccumuloLayerReader(store)(sc, instance)
}

def layerWriter(uri: URI, store: AttributeStore): LayerWriter[LayerId] = {
val instance = AccumuloInstance(uri)
val params = UriUtils.getParams(uri)
val table = params.getOrElse("table",
throw new IllegalArgumentException("Missing required URI parameter: table"))

AccumuloLayerWriter(instance, store, table)
}
}
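
Taken together, the provider now resolves the attribute store, layer reader, and layer writer from a single URI. A sketch with illustrative values, assuming a local SparkContext; layerWriter requires the table query parameter, while the fragment names the attribute (metadata) table:

import java.net.URI
import org.apache.spark.{SparkConf, SparkContext}
import geotrellis.spark.io.accumulo.AccumuloLayerProvider

// Illustrative URI: user info -> credentials, path -> instance,
// ?table= -> layer data table, #fragment -> attribute table.
val uri = new URI("accumulo://root:secret@zookeeper.example.com/gis?table=tiles#metadata")

val sc = new SparkContext(new SparkConf().setAppName("provider-example").setMaster("local[*]"))

val provider = new AccumuloLayerProvider
val store  = provider.attributeStore(uri)
val reader = provider.layerReader(uri, store, sc)
val writer = provider.layerWriter(uri, store)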
@@ -1 +1 @@
geotrellis.spark.io.cassandra.CassandraAttributeStoreProvider
geotrellis.spark.io.cassandra.CassandraLayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.cassandra.CassandraLayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.cassandra.CassandraLayerProvider
@@ -19,6 +19,26 @@ package geotrellis.spark.io.cassandra
import com.datastax.driver.core.policies.{DCAwareRoundRobinPolicy, TokenAwarePolicy}
import com.datastax.driver.core.{Cluster, Session}
import com.typesafe.config.ConfigFactory
import java.net.URI

object CassandraInstance {
def apply(uri: URI): CassandraInstance = {
import geotrellis.util.UriUtils._

val zookeeper = uri.getHost
val port = Option(uri.getPort).getOrElse(2181)
val (user, pass) = getUserInfo(uri)
val keyspace = Option(uri.getPath.drop(1))
.getOrElse(Cassandra.cfg.getString("keyspace"))
val attributeTable = Option(uri.getFragment)
.getOrElse(Cassandra.cfg.getString("catalog"))

BaseCassandraInstance(
List(zookeeper),
user.getOrElse(""),
pass.getOrElse(""))
}
}

trait CassandraInstance extends Serializable {
val hosts: Seq[String]
@@ -110,4 +130,4 @@ object Cassandra {
val instance = BaseCassandraInstance(hosts, username, password, replicationStrategy, replicationFactor, localDc, usedHostsPerRemoteDc, allowRemoteDCsForLocalConsistencyLevel)
try block(instance) finally instance.closeAsync
}
}
}
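
Given the parsing in CassandraInstance.apply above, a catalog URI takes the form cassandra://[user:password@]host[:port]/keyspace#catalog-table. A minimal sketch with illustrative values:

import java.net.URI
import geotrellis.spark.io.cassandra.CassandraInstance

// Host -> contact point, user info -> credentials (both default to empty strings).
val instance: CassandraInstance =
  CassandraInstance(new URI("cassandra://user:pass@cassandra.example.com/geotrellis#metadata"))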
@@ -18,7 +18,8 @@ package geotrellis.spark.io.cassandra

import geotrellis.spark._
import geotrellis.spark.io._

import geotrellis.util.UriUtils
import org.apache.spark.SparkContext
import java.net.URI


@@ -28,26 +29,35 @@ import java.net.URI
*
 * Metadata table name is optional; if it is not provided, a default value will be used.
*/
class CassandraAttributeStoreProvider extends AttributeStoreProvider {
class CassandraLayerProvider extends AttributeStoreProvider with LayerReaderProvider {
def canProcess(uri: URI): Boolean = uri.getScheme.toLowerCase == "cassandra"

def attributeStore(uri: URI): AttributeStore = {
val zookeeper = uri.getHost
val port = Option(uri.getPort).getOrElse(2181)
val (user, pass) = getUserInfo(uri)
val instance = CassandraInstance(uri)
val keyspace = Option(uri.getPath.drop(1))
.getOrElse(Cassandra.cfg.getString("keyspace"))
val attributeTable = Option(uri.getFragment)
.getOrElse(Cassandra.cfg.getString("catalog"))

val instance = BaseCassandraInstance(
List(zookeeper),
user.getOrElse(""),
pass.getOrElse(""))

if (null == attributeTable)
CassandraAttributeStore(instance)
else
CassandraAttributeStore(instance, keyspace, attributeTable)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
val instance = CassandraInstance(uri)
new CassandraLayerReader(store, instance)(sc)
}

def layerWriter(uri: URI, store: AttributeStore): LayerWriter[LayerId] = {
val instance = CassandraInstance(uri)
val keyspace = Option(uri.getPath.drop(1))
.getOrElse(Cassandra.cfg.getString("keyspace"))
val params = UriUtils.getParams(uri)
val table = params.getOrElse("table",
throw new IllegalArgumentException("Missing required URI parameter: table"))

new CassandraLayerWriter(store, instance, keyspace, table)
}
}
@@ -1 +1 @@
geotrellis.spark.io.hbase.HBaseAttributeStoreProvider
geotrellis.spark.io.hbase.HBaseLayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.hbase.HBaseLayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.hbase.HBaseLayerProvider
16 changes: 16 additions & 0 deletions hbase/src/main/scala/geotrellis/spark/io/hbase/HBaseInstance.scala
@@ -18,6 +18,22 @@ package geotrellis.spark.io.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client._
import java.net.URI

object HBaseInstance {
def apply(uri: URI): HBaseInstance = {
import geotrellis.util.UriUtils._

val zookeeper = uri.getHost
val port = Option(uri.getPort).getOrElse(2181)
val attributeTable = uri.getFragment
val params = getParams(uri)
HBaseInstance(
List(zookeeper),
params.getOrElse("master", ""),
port.toString)
}
}

case class HBaseInstance(zookeepers: Seq[String], master: String, clientPort: String = "2181") extends Serializable {
@transient lazy val conf = {
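
Matching the hbase://zookeeper[:port][?master=host]#metadata-table-name format documented below, a minimal sketch of the URI-based factory with illustrative values:

import java.net.URI
import geotrellis.spark.io.hbase.HBaseInstance

// Host -> zookeeper quorum member, ?master= -> HBase master, port -> client port.
val instance: HBaseInstance =
  HBaseInstance(new URI("hbase://zookeeper.example.com:2181?master=hbase-master.example.com#metadata"))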
@@ -18,32 +18,40 @@ package geotrellis.spark.io.hbase

import geotrellis.spark._
import geotrellis.spark.io._

import geotrellis.util.UriUtils
import org.apache.spark.SparkContext
import java.net.URI


/**
* Provides [[HBaseAttributeStore]] instance for URI with `cassandra` scheme.
* Provides [[HBaseAttributeStore]] instance for URI with `hbase` scheme.
 * ex: `hbase://zookeeper[:port][?master=host]#metadata-table-name`
*
 * Metadata table name is optional; if it is not provided, a default value will be used.
*/
class HBaseAttributeStoreProvider extends AttributeStoreProvider {
class HBaseLayerProvider extends AttributeStoreProvider with LayerReaderProvider {
def canProcess(uri: URI): Boolean = uri.getScheme.toLowerCase == "hbase"

def attributeStore(uri: URI): AttributeStore = {
val zookeeper = uri.getHost
val port = Option(uri.getPort).getOrElse(2181)
val instance = HBaseInstance(uri)
val attributeTable = uri.getFragment
val params = getParams(uri)
val instance = HBaseInstance(
List(zookeeper),
params.getOrElse("master", ""),
port.toString)

if (null == attributeTable)
HBaseAttributeStore(instance)
else
HBaseAttributeStore(instance, attributeTable)
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
val instance = HBaseInstance(uri)
new HBaseLayerReader(store, instance)(sc)
}

def layerWriter(uri: URI, store: AttributeStore): LayerWriter[LayerId] = {
val instance = HBaseInstance(uri)
val params = UriUtils.getParams(uri)
val table = params.getOrElse("table",
throw new IllegalArgumentException("Missing required URI parameter: table"))
new HBaseLayerWriter(store, instance, table)
}
}
@@ -1 +1 @@
geotrellis.spark.io.s3.S3AttributeStoreProvider
geotrellis.spark.io.s3.S3LayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.s3.S3LayerProvider
@@ -0,0 +1 @@
geotrellis.spark.io.s3.S3LayerProvider
@@ -18,19 +18,28 @@ package geotrellis.spark.io.s3

import geotrellis.spark._
import geotrellis.spark.io._
import org.apache.spark._
import java.net.URI

/**
* Provides [[S3AttributeStore]] instance for URI with `s3` scheme.
* Provides [[S3LayerReader]] instance for URI with `s3` scheme.
 * The URI represents the S3 bucket and prefix of the catalog root.
* ex: `s3://<bucket>/<prefix-to-catalog>`
*/
class S3AttributeStoreProvider extends AttributeStoreProvider {
class S3LayerProvider extends AttributeStoreProvider
with LayerReaderProvider with LayerWriterProvider {
def canProcess(uri: URI): Boolean = uri.getScheme.toLowerCase == "s3"

def attributeStore(uri: URI): AttributeStore = {
val bucket = uri.getAuthority
val prefix = uri.getPath.drop(1)
new S3AttributeStore(bucket, prefix)
new S3AttributeStore(bucket = uri.getAuthority, prefix = uri.getPath.drop(1))
}

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
new S3LayerReader(store)(sc)
}

def layerWriter(uri: URI, store: AttributeStore): LayerWriter[LayerId] = {
// TODO: encode ACL changes in putObjectModifier
new S3LayerWriter(store, bucket = uri.getAuthority, keyPrefix = uri.getPath.drop(1))
}
}
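
For S3 the URI carries only the bucket and key prefix. A usage sketch with an illustrative bucket and prefix, assuming a local SparkContext:

import java.net.URI
import org.apache.spark.{SparkConf, SparkContext}
import geotrellis.spark.io.s3.S3LayerProvider

val uri = new URI("s3://my-bucket/catalog")

val sc = new SparkContext(new SparkConf().setAppName("s3-provider-example").setMaster("local[*]"))

val provider = new S3LayerProvider
val store  = provider.attributeStore(uri)   // bucket = "my-bucket", prefix = "catalog"
val reader = provider.layerReader(uri, store, sc)
val writer = provider.layerWriter(uri, store)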
@@ -1,2 +1,2 @@
geotrellis.spark.io.file.FileAttributeStoreProvider
geotrellis.spark.io.hadoop.HadoopAttributeStoreProvider
geotrellis.spark.io.file.FileLayerProvider
geotrellis.spark.io.hadoop.HadoopLayerProvider
@@ -0,0 +1,2 @@
geotrellis.spark.io.file.FileLayerProvider
geotrellis.spark.io.hadoop.HadoopLayerProvider
@@ -0,0 +1,2 @@
geotrellis.spark.io.file.FileLayerProvider
geotrellis.spark.io.hadoop.HadoopLayerProvider
@@ -24,30 +24,4 @@ trait AttributeStoreProvider {
def canProcess(uri: URI): Boolean

def attributeStore(uri: URI): AttributeStore

/** Parse URI user and password */
protected def getUserInfo(uri: URI): (Option[String], Option[String]) = {
val info = uri.getUserInfo
if (null == info)
None -> None
else {
val chunk = info.split(":")
if (chunk.length == 1)
Some(chunk(0)) -> None
else
Some(chunk(0)) -> Some(chunk(1))
}
}

protected def getParams(uri: URI): Map[String, String] = {
val query = uri.getQuery
if (null == query)
Map.empty[String, String]
else {
query.split("&").map{ param =>
val arr = param.split("=")
arr(0) -> arr(1)
}.toMap
}
}
}
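
The getUserInfo and getParams helpers removed here are now imported from geotrellis.util.UriUtils by the providers above. That object is not included in this excerpt; assuming it simply relocates the removed code, it would look roughly like:

package geotrellis.util

import java.net.URI

object UriUtils {
  /** Parse URI user and password. */
  def getUserInfo(uri: URI): (Option[String], Option[String]) = {
    val info = uri.getUserInfo
    if (null == info) None -> None
    else {
      val chunk = info.split(":")
      if (chunk.length == 1) Some(chunk(0)) -> None
      else Some(chunk(0)) -> Some(chunk(1))
    }
  }

  /** Parse the URI query string into a key/value map. */
  def getParams(uri: URI): Map[String, String] = {
    val query = uri.getQuery
    if (null == query) Map.empty[String, String]
    else query.split("&").map { param =>
      val arr = param.split("=")
      arr(0) -> arr(1)
    }.toMap
  }
}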
27 changes: 27 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/LayerReaderProvider.scala
@@ -0,0 +1,27 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io

import geotrellis.spark._
import org.apache.spark._
import java.net.URI

trait LayerReaderProvider {
def canProcess(uri: URI): Boolean

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId]
}
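
The commit also introduces a LayerWriterProvider (see the traits mixed into S3LayerProvider above); its source is not part of this excerpt, but judging from the layerWriter(uri, store) signatures implemented by the providers, the writer-side counterpart would look roughly like:

package geotrellis.spark.io

import geotrellis.spark._
import java.net.URI

// Sketch inferred from the layerWriter implementations above;
// the actual trait in the commit may differ.
trait LayerWriterProvider {
  def canProcess(uri: URI): Boolean

  def layerWriter(uri: URI, store: AttributeStore): LayerWriter[LayerId]
}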
