/
StartMojo.java
467 lines (425 loc) · 17 KB
/
StartMojo.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
/**
* Licensed to WibiData, Inc. under one or more contributor license
* agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. WibiData, Inc.
* licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.wibidata.maven.plugins.hbase;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.maven.artifact.Artifact;
import org.apache.maven.artifact.DependencyResolutionRequiredException;
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.project.MavenProject;
/**
* A maven goal that starts a mini HBase cluster in a new daemon thread.
*
 * <p>A new daemon thread is created that starts a mini HBase cluster. The main thread
 * blocks until the HBase cluster has fully started. The daemon thread with the
 * in-process HBase cluster will continue to run in the background until stopped by the
 * 'stop' goal of the plugin.</p>
*
 * <p>The configuration of the started mini HBase cluster will be written to a
 * hbase-site.xml file in the test classpath ('${basedir}/target/test-classes' by
 * default). The path to the generated configuration file may be customized with the
 * 'hbaseSiteFile' property.</p>
*
* <p>A configuration index can be written by this goal. The configuration index is a file that
* contains one line for each configuration file written by this goal, where the line contains the
* path to the configuration file. By default, the goal does not write a configuration index.
* Setting the property 'writeConfIndex' to true will cause a configuration index to be written.
* By default, the configuration index will be written to
* '${basedir}/target/test-classes/conf-index.conf'. The path to the generated configuration index
* can be customized by setting the 'hbaseConfIndex' property.</p>
*
* @goal start
* @phase pre-integration-test
* @requiresDependencyResolution test
*/
public class StartMojo extends AbstractMojo {
  /**
   * If true, this goal should be a no-op.
   *
   * @parameter property="skip" default-value="false"
   */
  private boolean mSkip;

  /**
   * The file that will store the configuration required to connect to the started mini HBase
   * cluster. This file will be generated by the goal.
   *
   * @parameter property="hbaseSiteFile" expression="${hbase.site.file}" default-value="${project.build.testOutputDirectory}/hbase-site.xml"
   * @required
   */
  private File mHBaseSiteFile;

  /**
   * The file that will store the configuration required to connect to the started mini HDFS and
   * MapReduce clusters. This file will be generated by the goal.
   *
   * @parameter property="coreSiteFile" expression="${core.site.file}" default-value="${project.build.testOutputDirectory}/core-site.xml"
   * @required
   */
  private File mCoreSiteFile;

  /**
   * If true, this goal should write an index file that provides the paths to the HBase
   * configuration files written by this goal.
   *
   * @parameter property="writeConfIndex" expression="${hbase.conf.index}" default-value="false"
   */
  private boolean mWriteConfIndex;

  /**
   * The file that will store paths to the configuration files generated by the goal. This file
   * will be generated by the goal and will contain one line for each configuration file giving the
   * path to that configuration file.
   *
   * @parameter property="hbaseConfIndex" expression="${hbase.conf.index.file}" default-value="${project.build.testOutputDirectory}/conf-index.conf"
   */
  private File mHBaseConfIndex;

  /**
   * If true, also start a mini MapReduce cluster.
   *
   * @parameter property="mapReduceEnabled" expression="${mapreduce.enabled}" default-value="false"
   */
  private boolean mIsMapReduceEnabled;

  /**
   * Extra Hadoop configuration properties to use.
   *
   * @parameter property="hadoopConfiguration"
   */
  private Properties mHadoopConfiguration;

  /**
   * A list of this plugin's dependency artifacts.
   *
   * @parameter default-value="${plugin.artifacts}"
   * @required
   * @readonly
   */
  private List<Artifact> mPluginDependencyArtifacts;

  /**
   * The maven project this plugin is running within.
   *
   * @parameter default-value="${project}"
   * @required
   * @readonly
   */
  private MavenProject mMavenProject;

  /**
   * Sets whether this goal should be a no-op.
   *
   * @param skip If true, this goal should do nothing.
   */
  public void setSkip(boolean skip) {
    mSkip = skip;
  }

  /**
   * Sets the file that we should write the HBase cluster configuration to.
   *
   * <p>Note: The property "hbaseSiteFile" defined in this mojo means this method must be
   * named setHbaseSiteFile instead of setHBaseSiteFile.</p>
   *
   * @param hbaseSiteFile The file we should write to.
   */
  public void setHbaseSiteFile(File hbaseSiteFile) {
    mHBaseSiteFile = hbaseSiteFile;
  }

  /**
   * Sets the file that we should write the MapReduce/HDFS cluster configuration to.
   *
   * @param coreSiteFile The file we should write to.
   */
  public void setCoreSiteFile(File coreSiteFile) {
    mCoreSiteFile = coreSiteFile;
  }

  /**
   * Sets whether this goal should write a configuration index file.
   *
   * @param writeConfIndex True if an index file should be written, false otherwise.
   */
  public void setWriteConfIndex(boolean writeConfIndex) {
    mWriteConfIndex = writeConfIndex;
  }

  /**
   * Sets the file that the HBase configuration index should be written to.
   *
   * <p>Note: The property "hbaseConfIndex" defined in this mojo means this method should be named
   * setHbaseConfIndex.</p>
   *
   * @param hbaseConfIndex The file we should write to.
   */
  public void setHbaseConfIndex(File hbaseConfIndex) {
    mHBaseConfIndex = hbaseConfIndex;
  }

  /**
   * Sets whether we should start a mini MapReduce cluster in addition to the HBase cluster.
   *
   * @param enabled Whether to start a mini MapReduce cluster.
   */
  public void setMapReduceEnabled(boolean enabled) {
    mIsMapReduceEnabled = enabled;
  }

  /**
   * Sets Hadoop configuration properties.
   *
   * @param properties Hadoop configuration properties to use in the mini cluster.
   */
  public void setHadoopConfiguration(Properties properties) {
    mHadoopConfiguration = properties;
  }

  /**
   * Starts a mini HBase cluster in a new thread.
   *
   * <p>This method is called by the maven plugin framework to run the goal.</p>
   *
   * @throws MojoExecutionException If there is a fatal error during this goal's execution.
   */
  @Override
  public void execute() throws MojoExecutionException {
    if (mSkip) {
      getLog().info("Not starting an HBase cluster because skip=true.");
      return;
    }

    // Expose the full plugin/test classpath to child JVMs spawned by the mini
    // MapReduce cluster (see getClassPath() for why this is necessary).
    System.setProperty("java.class.path", getClassPath());
    getLog().info("Set java.class.path to: " + System.getProperty("java.class.path"));

    // Set any extra hadoop options.
    Configuration conf = new Configuration();
    if (null != mHadoopConfiguration) {
      for (Map.Entry<Object, Object> property : mHadoopConfiguration.entrySet()) {
        String confKey = property.getKey().toString();
        String confValue = property.getValue().toString();
        getLog().info("Setting hadoop conf property '" + confKey + "' to '" + confValue + "'");
        conf.set(confKey, confValue);
      }
    }

    // Start the cluster.
    try {
      MiniHBaseClusterSingleton.INSTANCE.startAndWaitUntilReady(
          getLog(), mIsMapReduceEnabled, conf);
    } catch (IOException e) {
      throw new MojoExecutionException("Unable to start HBase cluster.", e);
    }

    // Write the HBase configuration file.
    writeHBaseSiteFile(conf);

    // Write the MapReduce/HDFS configuration file.
    writeCoreSiteFile(conf);

    // Write the configuration index.
    if (mWriteConfIndex) {
      writeConfigurationIndex();
    }
  }

  /**
   * Gets the runtime classpath required to run the mini clusters.
   *
   * <p>The maven classloading scheme is nonstandard. They only put the "classworlds" jar
   * on the classpath, and it takes care of ClassLoading the rest of the jars. This a
   * problem if we are going to start a mini MapReduce cluster. The TaskTracker will
   * start a child JVM with the same classpath as this process, and it won't have
   * configured the classworlds class loader. To work around this, we will put all of
   * our dependencies into the java.class.path system property, which will be read by
   * the TaskRunner's child JVM launcher to build the child JVM classpath.</p>
   *
   * <p>Note that when we say "all of our dependencies" we mean both the dependencies of
   * this plugin as well as the test classes and dependencies of the project that is
   * running the plugin. We need to include the latter on the classpath because tests are
   * still just .class files at integration-test-time. There will be no jars available
   * yet to put on the distributed cache via job.setJarByClass(). Hence, all of the
   * test-classes in the project running this plugin need to already be on the classpath
   * of the MapReduce cluster.<p>
   *
   * @return The de-duplicated classpath string, using the platform path separator.
   * @throws MojoExecutionException If the project's test classpath cannot be resolved.
   */
  private String getClassPath() throws MojoExecutionException {
    // Use the platform-specific separator (':' on Unix, ';' on Windows) rather than a
    // hard-coded ':' so the classpath is valid on all operating systems.
    final String pathSeparator = File.pathSeparator;

    // Maintain a set of classpath components added so we can de-dupe.
    Set<String> alreadyAddedComponents = new HashSet<String>();

    // Use this to build up the classpath string.
    StringBuilder classpath = new StringBuilder();

    // Add the existing classpath.
    String existingClasspath = System.getProperty("java.class.path");
    classpath.append(existingClasspath);
    alreadyAddedComponents.addAll(Arrays.asList(existingClasspath.split(pathSeparator)));

    // Add the test classes and dependencies of the maven project running this plugin.
    //
    // Note: It is important that we add these classes and dependencies before we add this
    // plugin's dependencies in case the maven project needs to override a jar version.
    List<?> testClasspathComponents;
    try {
      testClasspathComponents = mMavenProject.getTestClasspathElements();
    } catch (DependencyResolutionRequiredException e) {
      throw new MojoExecutionException("Unable to retrieve project test classpath", e);
    }
    for (Object testClasspathComponent : testClasspathComponents) {
      String dependency = testClasspathComponent.toString();
      if (alreadyAddedComponents.contains(dependency)) {
        continue;
      }
      classpath.append(pathSeparator);
      classpath.append(dependency);
      alreadyAddedComponents.add(dependency);
    }

    // Add this plugin's dependencies.
    for (Artifact artifact : mPluginDependencyArtifacts) {
      String dependency = artifact.getFile().getPath();
      if (alreadyAddedComponents.contains(dependency)) {
        continue;
      }
      classpath.append(pathSeparator);
      classpath.append(dependency);
      alreadyAddedComponents.add(dependency);
    }
    return classpath.toString();
  }

  /**
   * Writes the HBase-specific contents of the specified configuration to the HBase site file.
   *
   * @param conf The configuration to write.
   * @throws MojoExecutionException If there is an error writing the file.
   */
  private void writeHBaseSiteFile(Configuration conf) throws MojoExecutionException {
    writeSiteFile(getHBaseOnlyConfiguration(conf), mHBaseSiteFile);
  }

  /**
   * Writes the MapReduce/HDFS-specific contents of the specified configuration to the core
   * site file.
   *
   * @param conf The configuration to write.
   * @throws MojoExecutionException If there is an error writing the file.
   */
  private void writeCoreSiteFile(Configuration conf) throws MojoExecutionException {
    writeSiteFile(getMapReduceOnlyConfiguration(conf), mCoreSiteFile);
  }

  /**
   * Writes the specified configuration to the specified file.
   *
   * @param conf The configuration to write.
   * @param siteFile The file to write the configuration to.
   * @throws MojoExecutionException If there is an error writing the file.
   */
  private void writeSiteFile(Configuration conf, File siteFile) throws MojoExecutionException {
    // Create the parent directory for the site file if it does not already exist.
    createFileParentDir(siteFile);

    // Write the file.
    FileOutputStream fileOutputStream = null;
    try {
      fileOutputStream = new FileOutputStream(siteFile);
      conf.writeXml(fileOutputStream);
    } catch (IOException e) {
      throw new MojoExecutionException(
          "Unable to write to site file: " + siteFile.getPath(), e);
    } finally {
      closeFileOutputStream(fileOutputStream);
    }
    getLog().info("Wrote " + siteFile.getPath() + ".");
  }

  /**
   * Gets a new configuration created from the specified configuration, including only HBase
   * configuration variables.
   *
   * @param conf The configuration to filter.
   * @return A new configuration containing copies of the appropriate configuration variables.
   */
  private Configuration getHBaseOnlyConfiguration(Configuration conf) {
    return getFilteredConfiguration(conf, true);
  }

  /**
   * Gets a new configuration created from the specified configuration, including only
   * MapReduce/HDFS configuration variables.
   *
   * @param conf The configuration to filter.
   * @return A new configuration containing copies of the appropriate configuration variables.
   */
  private Configuration getMapReduceOnlyConfiguration(Configuration conf) {
    return getFilteredConfiguration(conf, false);
  }

  /**
   * Gets a new configuration created from the specified configuration,
   * including only MapReduce/HDFS configuration variables or HBase only configuration variables.
   *
   * @param conf The configuration to filter.
   * @param hBaseOnly <code>true</code> if only HBase configuration variables should be included,
   *     <code>false</code> if only MapReduce/HDFS configuration variables should be included.
   * @return A new configuration with copies of the appropriate configuration variables.
   */
  private Configuration getFilteredConfiguration(Configuration conf, boolean hBaseOnly) {
    // Pass false so the new configuration does not load the default resources; it should
    // contain only the entries explicitly copied below.
    Configuration filteredConf = new Configuration(false);
    for (Map.Entry<String, String> entry : conf) {
      // Keep an entry when its membership in the "hbase" namespace matches the filter:
      // hbase.* keys when hBaseOnly is true, everything else when it is false.
      boolean startsWithHBase = entry.getKey().startsWith("hbase");
      if (startsWithHBase == hBaseOnly) {
        filteredConf.set(entry.getKey(), entry.getValue());
      }
    }
    return filteredConf;
  }

  /**
   * Writes a configuration index.
   *
   * @throws MojoExecutionException If there is an error writing the configuration file.
   */
  private void writeConfigurationIndex() throws MojoExecutionException {
    // Create the parent directory of the file we are writing.
    createFileParentDir(mHBaseConfIndex);

    // Write the file.
    FileOutputStream fileOutputStream = null;
    PrintWriter fileWriter = null;
    try {
      fileOutputStream = new FileOutputStream(mHBaseConfIndex);
      fileWriter = new PrintWriter(fileOutputStream);
      fileWriter.println(mHBaseSiteFile.getPath());

      // PrintWriter swallows IOExceptions, so flush and check its error flag explicitly;
      // otherwise a failed write would be silently reported as success.
      fileWriter.flush();
      if (fileWriter.checkError()) {
        throw new MojoExecutionException(
            "Unable to write to configuration index file: " + mHBaseConfIndex.getPath());
      }
    } catch (IOException e) {
      throw new MojoExecutionException(
          "Unable to write to configuration index file: " + mHBaseConfIndex.getPath(), e);
    } finally {
      if (null != fileWriter) {
        fileWriter.close();
      }
      closeFileOutputStream(fileOutputStream);
    }
    getLog().info("Wrote " + mHBaseConfIndex.getPath() + ".");
  }

  /**
   * Gets the parent directory of the specified file. Creates the directory if it does not already
   * exist.
   *
   * @param file The file whose parent directory should be created if necessary.
   * @return The parent directory, or <code>null</code> if the file has no parent.
   * @throws MojoExecutionException If there is an error getting or creating the parent directory.
   */
  private static File createFileParentDir(File file) throws MojoExecutionException {
    File parentDir = file.getParentFile();
    if (null != parentDir && !parentDir.exists() && !parentDir.mkdirs()) {
      throw new MojoExecutionException(
          "Unable to create or access parent directory of: "
          + file.getParent());
    }
    return parentDir;
  }

  /**
   * Closes the specified FileOutputStream. The specified stream may be null, in which case this
   * operation is a no-op.
   *
   * @param stream The stream to close; may be <code>null</code>.
   * @throws MojoExecutionException If there is an error closing the stream.
   */
  private static void closeFileOutputStream(FileOutputStream stream) throws MojoExecutionException {
    if (null != stream) {
      try {
        stream.close();
      } catch (IOException e) {
        throw new MojoExecutionException("Unable to close file stream.", e);
      }
    }
  }
}