/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package geotrellis.spark.store.accumulo

import geotrellis.store.accumulo._
import geotrellis.store.accumulo.conf.AccumuloConfig
import geotrellis.store.hadoop._
import geotrellis.spark.util._
import geotrellis.spark.store._

import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.fs.Path
import org.apache.spark.rdd.RDD
import org.apache.accumulo.core.data.{Key, Mutation, Value}
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat
import org.apache.accumulo.core.client.BatchWriterConfig
import cats.effect.IO
import cats.syntax.apply._

import scala.concurrent.ExecutionContext

import java.util.UUID
import java.util.concurrent.Executors
object AccumuloWriteStrategy {
  val defaultThreadCount: Int = AccumuloConfig.threads.rdd.writeThreads

  /** By default, bulk ingest through files staged under `/geotrellis-ingest` on HDFS. */
  def DEFAULT = HdfsWriteStrategy("/geotrellis-ingest")
}
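/** A strategy for writing an RDD of Accumulo key/value pairs into a table. */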
sealed trait AccumuloWriteStrategy {
  def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit
}
/**
 * This strategy will perform an Accumulo bulk ingest. Bulk ingest requires that sorted records be written to the
 * filesystem, preferably HDFS, before Accumulo is able to ingest them. After the ingest is finished
 * the nodes will likely go through a period of high load as they perform major compactions.
 *
 * Note: Giving relative URLs will cause HDFS to use the `fs.defaultFS` property in `core-site.xml`.
 * If not specified, this will default to the local (`file:/`) filesystem, which is undesirable.
 *
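 * A minimal usage sketch (hypothetical names: assumes an existing [[AccumuloInstance]] called `instance`
 * and a pre-sorted `RDD[(Key, Value)]` called `kvPairs`):
 * {{{
 * val strategy = HdfsWriteStrategy("hdfs://namenode:8020/geotrellis-ingest")
 * strategy.write(kvPairs, instance, "tiles")
 * }}}
 *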
 * @param ingestPath Path where Spark will write RDD records for ingest
*/
case class HdfsWriteStrategy(ingestPath: Path) extends AccumuloWriteStrategy {
  /** Requires that the RDD be pre-sorted */
  def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = {
    val sc = kvPairs.sparkContext
    val job = Job.getInstance(sc.hadoopConfiguration)
    instance.setAccumuloConfig(job)
    val conf = job.getConfiguration
    val outPath = HdfsUtils.tmpPath(ingestPath, UUID.randomUUID.toString, conf)
    val failuresPath = outPath.suffix("-failures")
    HdfsUtils.ensurePathExists(failuresPath, conf)

    // stage sorted records to the filesystem as Accumulo files
    kvPairs
      .sortByKey()
      .saveAsNewAPIHadoopFile(
        outPath.toString,
        classOf[Key],
        classOf[Value],
        classOf[AccumuloFileOutputFormat],
        conf)

    // bulk import the staged files into the target table
    val ops = instance.connector.tableOperations()
    ops.importDirectory(table, outPath.toString, failuresPath.toString, true)

    // cleanup ingest directories on success
    val fs = ingestPath.getFileSystem(conf)
    if (fs.exists(new Path(outPath, "_SUCCESS"))) {
      fs.delete(outPath, true)
      fs.delete(failuresPath, true)
    } else {
      throw new java.io.IOException(s"Accumulo bulk ingest failed at $ingestPath")
    }
  }
}
object HdfsWriteStrategy {
  def apply(ingestPath: String): HdfsWriteStrategy = HdfsWriteStrategy(new Path(ingestPath))
}
/**
 * This strategy will create one BatchWriter per partition and attempt to stream the records to the target tablets.
 * It is suitable for smaller ingests, or for cases where HdfsWriteStrategy is otherwise not possible.
 *
 * This strategy will not create splits before starting to write; to gain write parallelism across
 * the tservers in the cluster, create splits in the target table with [[AccumuloUtils.getSplits]] first.
 *
 * There is a problem in Accumulo 1.6 (fixed in 1.7) where split creation does not wait for the resulting
 * empty tablets to distribute through the cluster before returning. This creates a warm-up period during which
 * the pressure of the ingest writers on a node will delay tablet re-balancing.
 *
 * The speed of the ingest can be improved by setting `tserver.wal.sync.method=hflush` in the accumulo shell.
 * Note: this introduces a higher chance of data loss due to sudden node failure.
 *
 * The BatchWriter is notified of tablet migrations and will follow them around the cluster.
 *
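 * A minimal usage sketch (hypothetical names: assumes an existing [[AccumuloInstance]] called `instance`
 * and an `RDD[(Key, Value)]` called `kvPairs`):
 * {{{
 * val strategy = SocketWriteStrategy(
 *   config = new BatchWriterConfig().setMaxMemory(64 * 1024 * 1024).setMaxWriteThreads(8),
 *   threads = 8
 * )
 * strategy.write(kvPairs, instance, "tiles")
 * }}}
 *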
 * @param config  Configuration for the BatchWriters
 * @param threads Number of threads used per partition to write mutations
*/
case class SocketWriteStrategy(
  @transient config: BatchWriterConfig =
    new BatchWriterConfig().setMaxMemory(128 * 1024 * 1024).setMaxWriteThreads(AccumuloWriteStrategy.defaultThreadCount),
  threads: Int = AccumuloWriteStrategy.defaultThreadCount
) extends AccumuloWriteStrategy {
  val kwConfig = KryoWrapper(config) // BatchWriterConfig is not java serializable

  def write(kvPairs: RDD[(Key, Value)], instance: AccumuloInstance, table: String): Unit = {
    kvPairs.foreachPartition { partition =>
      if (partition.nonEmpty) {
        val pool = Executors.newFixedThreadPool(threads)
        implicit val ec = ExecutionContext.fromExecutor(pool)
        implicit val cs = IO.contextShift(ec)

        val writer = instance.connector.createBatchWriter(table, kwConfig.value)
        try {
          // wrap the partition's records in a stream of Mutations
          val mutations: fs2.Stream[IO, Mutation] = fs2.Stream.fromIterator[IO, Mutation](
            partition.map { case (key, value) =>
              val mutation = new Mutation(key.getRow)
              mutation.put(key.getColumnFamily, key.getColumnQualifier, System.currentTimeMillis(), value)
              mutation
            }
          )

          // shift each write onto the dedicated pool before handing the mutation to the BatchWriter
          val write = { (mutation: Mutation) =>
            fs2.Stream.eval(IO.shift(ec) *> IO { writer.addMutation(mutation) })
          }

          (mutations map write)
            .parJoin(threads)
            .compile
            .drain
            .unsafeRunSync()
        } finally {
          writer.close()
          pool.shutdown()
        }
      }
    }
  }
}