-
Notifications
You must be signed in to change notification settings - Fork 703
/
LzoGenericScheme.scala
128 lines (102 loc) · 4.93 KB
/
LzoGenericScheme.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
Copyright 2015 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.commons.source
import scala.reflect.ClassTag
import com.twitter.bijection._
import com.twitter.chill.Externalizer
import com.twitter.elephantbird.cascading2.scheme.LzoBinaryScheme
import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat
import com.twitter.elephantbird.mapreduce.io.{ BinaryConverter, GenericWritable }
import com.twitter.elephantbird.mapreduce.input.{ BinaryConverterProvider, MultiInputFormat }
import com.twitter.elephantbird.mapreduce.output.LzoGenericBlockOutputFormat
import com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper
import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader }
import org.apache.hadoop.conf.Configuration
import cascading.tap.Tap
import cascading.flow.FlowProcess
/**
 * Builds the injection used to store a chill `Externalizer` as a configuration string.
 * The value is Java-serialized to bytes, and the bytes are represented as a
 * `GZippedBase64String`, which is then unwrapped to a plain `String`.
 */
private[source] object ExternalizerSerializer {

  /**
   * Injection between `Externalizer[T]` and its gzipped, Base64-encoded string form.
   *
   * @tparam T type wrapped by the externalizer
   * @return an injection whose `invert` yields a `Try`, failed for text that cannot be decoded
   */
  def inj[T]: Injection[Externalizer[T], String] = {
    import com.twitter.bijection.Inversion.attemptWhen
    import com.twitter.bijection.codec.Base64

    // Step 1: Externalizer[T] <-> Array[Byte] using plain Java serialization.
    implicit val baseInj: Injection[Externalizer[T], Array[Byte]] =
      JavaSerializationInjection[Externalizer[T]]

    // Step 3: GZippedBase64String <-> String. Inversion only verifies the text is Base64;
    // text that is Base64 but not gzip-compressed is rejected by the decompression step,
    // so the weaker check here is sufficient.
    implicit val unwrap: Injection[GZippedBase64String, String] =
      new AbstractInjection[GZippedBase64String, String] {
        override def apply(wrapped: GZippedBase64String) = wrapped.str
        override def invert(encoded: String) =
          attemptWhen(encoded)(Base64.isBase64)(GZippedBase64String(_))
      }

    // Step 2 (Array[Byte] <-> GZippedBase64String) is resolved implicitly by bijection.
    Injection.connect[Externalizer[T], Array[Byte], GZippedBase64String, String]
  }
}
/** Constants shared by the code that writes the serialized converter and the provider that reads it. */
private[source] object ConfigBinaryConverterProvider {
  /** Configuration key under which the encoded `BinaryConverter` is stored. */
  val ProviderConfKey: String = "com.twitter.scalding.lzo.converter.provider"
}
/**
 * Elephant-bird `BinaryConverterProvider` that recovers the `BinaryConverter` stored in the
 * job configuration under `ConfigBinaryConverterProvider.ProviderConfKey`, encoded with
 * `ExternalizerSerializer.inj`.
 *
 * The decoded converter is cached together with the encoded string it came from. The cache
 * is reused only while the configuration keeps holding that same string.
 */
private[source] class ConfigBinaryConverterProvider[M] extends BinaryConverterProvider[M] {
  import ConfigBinaryConverterProvider._

  // Most recent (encoded form, decoded converter) pair seen by this provider.
  private[this] var cached: Option[(String, BinaryConverter[M])] = None

  /**
   * Returns the converter encoded in `conf`.
   *
   * @param conf configuration that must contain `ProviderConfKey`
   * @return the decoded converter; reused from the cache when `conf` holds the same
   *         encoded string as the previously decoded one
   * @throws IllegalArgumentException if `ProviderConfKey` is not set in `conf`
   */
  override def getConverter(conf: Configuration): BinaryConverter[M] = {
    val data = conf.get(ProviderConfKey)
    require(data != null, s"$ProviderConfKey is not set in configuration")
    cached match {
      // Backticks turn `data` into an equality check against the value just read. A bare
      // `data` would bind a fresh variable and match ANY cached entry, returning a stale
      // converter when the configuration changes.
      case Some((`data`, conv)) => conv
      case _ =>
        // Explicit type argument: without it T is inferred as Nothing.
        val extern = ExternalizerSerializer.inj[BinaryConverter[M]].invert(data).get
        val conv = extern.get
        cached = Some((data, conv))
        conv
    }
  }
}
/**
 * Generic scheme for LZO-compressed data whose records are (de)serialized with the
 * supplied elephant-bird `BinaryConverter`, rather than being tied to one record format.
 *
 * The converter is handed to the input/output formats through the `JobConf`. It is wrapped
 * in a chill `Externalizer`, encoded with `ExternalizerSerializer`, and stored under
 * `ConfigBinaryConverterProvider.ProviderConfKey`. The formats are then configured to use
 * `ConfigBinaryConverterProvider` as their converter provider.
 *
 * @tparam M record type; its runtime class is also recorded in the `JobConf`
 * @param conv converter between `M` and its binary form. It is marked `@transient`, so it is
 *             excluded from Java serialization of the scheme itself.
 */
class LzoGenericScheme[M: ClassTag](@transient conv: BinaryConverter[M]) extends LzoBinaryScheme[M, GenericWritable[M]] {

  override protected def prepareBinaryWritable(): GenericWritable[M] =
    new GenericWritable(conv)

  /**
   * Configures the job to read `M` records.
   *
   * Reading goes through `MultiInputFormat`, delegated to by `DelegateCombineFileInputFormat`.
   * The converter is decoded from the `JobConf` by `ConfigBinaryConverterProvider`.
   *
   * @throws RuntimeException if the converter cannot be serialized and deserialized again
   */
  override def sourceConfInit(fp: FlowProcess[JobConf],
    tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
    conf: JobConf): Unit = {
    val serialized = serializeConverter()
    conf.set(ConfigBinaryConverterProvider.ProviderConfKey, serialized)
    MultiInputFormat.setClassConf(implicitly[ClassTag[M]].runtimeClass, conf)
    MultiInputFormat.setGenericConverterClassConf(classOf[ConfigBinaryConverterProvider[_]], conf)
    DelegateCombineFileInputFormat.setDelegateInputFormat(conf, classOf[MultiInputFormat[_]])
  }

  /**
   * Configures the job to write `M` records.
   *
   * Writing goes through `LzoGenericBlockOutputFormat`, wrapped by
   * `DeprecatedOutputFormatWrapper`. The converter is decoded from the `JobConf` by
   * `ConfigBinaryConverterProvider`.
   *
   * @throws RuntimeException if the converter cannot be serialized and deserialized again
   */
  override def sinkConfInit(fp: FlowProcess[JobConf],
    tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
    conf: JobConf): Unit = {
    val serialized = serializeConverter()
    LzoGenericBlockOutputFormat.setClassConf(implicitly[ClassTag[M]].runtimeClass, conf)
    conf.set(ConfigBinaryConverterProvider.ProviderConfKey, serialized)
    LzoGenericBlockOutputFormat.setGenericConverterClassConf(classOf[ConfigBinaryConverterProvider[_]], conf)
    DeprecatedOutputFormatWrapper.setOutputFormat(classOf[LzoGenericBlockOutputFormat[_]], conf)
  }

  /**
   * Encodes `conv` for storage in a `JobConf` and verifies that the encoding decodes again.
   *
   * The check fails fast with a descriptive error for a converter that cannot be
   * round-tripped, instead of storing an unusable value.
   *
   * @return the encoded converter, to be stored under `ConfigBinaryConverterProvider.ProviderConfKey`
   * @throws RuntimeException wrapping the underlying failure if the round trip fails
   */
  private[this] def serializeConverter(): String = {
    val injection = ExternalizerSerializer.inj[BinaryConverter[M]]
    val extern = Externalizer(conv)
    try {
      val encoded = injection(extern)
      // Decode once purely as a check; the result is discarded.
      injection.invert(encoded).get
      encoded
    } catch {
      case e: Exception => throw new RuntimeException("Unable to roundtrip the BinaryConverter in the Externalizer.", e)
    }
  }
}