Commit
+ add hbase namespace
+ refactor hbase FB to inject a Configuration object (rather than extend it) - composition over inheritance
Costin Leau committed Nov 29, 2011
1 parent 0595698 commit 6e697ae
Showing 16 changed files with 179 additions and 34 deletions.
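
In short, the refactor trades inheritance for composition: instead of extending the generic ConfigurationFactoryBean and overriding its creation hook, the HBase factory bean now accepts a plain Hadoop Configuration and derives an HBase-aware one from it. A condensed sketch of the two shapes, assuming the pre-refactor ConfigurationFactoryBean (with its protected postProcessConfiguration hook) and the HBase 0.90-era API; the class names OldStyle and NewStyle are illustrative, and the real classes appear in full in the diff below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.data.hadoop.configuration.ConfigurationFactoryBean;

    // Before: inheritance - customize the parent's creation hook.
    class OldStyle extends ConfigurationFactoryBean {
        @Override
        protected void postProcessConfiguration(Configuration configuration) {
            // merge hbase-default.xml/hbase-site.xml into the inherited config
            HBaseConfiguration.addHbaseResources(configuration);
        }
    }

    // After: composition - wrap an optionally injected Configuration.
    class NewStyle implements InitializingBean, FactoryBean<Configuration> {
        private Configuration configuration; // injected base configuration (may be null)
        private Configuration config;        // HBase-aware configuration handed out

        public void setConfiguration(Configuration configuration) {
            this.configuration = configuration;
        }

        public void afterPropertiesSet() {
            config = (configuration != null ? HBaseConfiguration.create(configuration)
                    : HBaseConfiguration.create());
        }

        public Configuration getObject() {
            return config;
        }

        public Class<?> getObjectType() {
            return Configuration.class;
        }

        public boolean isSingleton() {
            return true;
        }
    }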
47 changes: 43 additions & 4 deletions build.gradle
@@ -56,10 +56,49 @@ apply plugin: 'bundlor' // all core projects should be OSGi-compliant
bundlor.useProjectProps = true
[compileJava, compileTestJava]*.options*.compilerArgs = ["-Xlint:-serial"]

skipPig = true
skipHive = true
skipHBase = true

task enablePigTests {
description = "Enable Pig tests"
group = "Verification"
skipPig = false
}

task enableHiveTests {
description = "Enable Hive tests"
group = "Verification"
skipHive = false
}

task enableHBaseTests {
description = "Enable HBase tests"
group = "Verification"
skipHBase = false
}

task enableAllTest() {
description = "Enable all (incl. Pig, Hive, HBase) tests"
group = "Verification"
skipPig = false
skipHBase = false
skipHive = false
}

test {
//forkEvery = 1
systemProperties['input.path'] = 'build/classes/test/input'
systemProperties['output.path'] = 'build/classes/test/output'
includes = ["**/*.class"]

if (skipPig)
{
println "Excluding Pig"
excludes.add("**/pig/**")
}
if (skipHBase) excludes.add("**/hbase/**")
if (skipHive) excludes.add("**/hive/**")
}

// Common dependencies
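
(Presumably the enable*Tests toggle tasks above are meant to be chained ahead of the build on the command line, e.g. gradle enableHBaseTests build, so that the matching exclude in the test block is never added; a plain gradle build leaves the Pig, Hive, and HBase suites skipped.)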
@@ -120,8 +159,8 @@ dependencies {

javaprojects = rootProject

-sourceCompatibility = 1.5
-targetCompatibility = 1.5
+sourceCompatibility = 1.6
+targetCompatibility = 1.6

javadoc {
srcDir = file("${projectDir}/docs/src/api")
@@ -225,8 +264,8 @@ task uploadDist(type: org.springframework.gradle.tasks.S3DistroUpload, dependsOn
description = "Upload the ZIP Distribution"
group = "Distribution"
archiveFile = dist.archivePath
-projectKey = 'SGF'
-projectName = 'Spring Hadoop'
+projectKey = 'SHDP'
+projectName = 'Spring Data Hadoop'
}

defaultTasks 'clean', 'build'
@@ -38,8 +38,10 @@ public void init() {
registerBeanDefinitionParser("tasklet", new HadoopTaskletParser());
registerBeanDefinitionParser("job", new HadoopJobParser());
registerBeanDefinitionParser("stream-job", new HadoopStreamJobParser());
-registerBeanDefinitionParser("config", new HadoopConfigParser());
+registerBeanDefinitionParser("configuration", new HadoopConfigParser());
registerBeanDefinitionParser("resource-loader", new HadoopResourceLoaderParser());
registerBeanDefinitionParser("pig", new PigParser());
+registerBeanDefinitionParser("hbase-configuration", new HbaseConfigurationParser());
}
}

@@ -0,0 +1,37 @@
/*
* Copyright 2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.hadoop.config;

import org.springframework.data.hadoop.hbase.HbaseConfigurationFactoryBean;
import org.w3c.dom.Element;

/**
* Parser for "hbase-configuration" element.
*
* @author Costin Leau
*/
class HbaseConfigurationParser extends AbstractImprovedSimpleBeanDefinitionParser {

@Override
protected Class<?> getBeanClass(Element element) {
return HbaseConfigurationFactoryBean.class;
}

@Override
protected String defaultId() {
return "hbase-configuration";
}
}
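
The parser can stay this small because the attribute-to-property plumbing presumably lives in AbstractImprovedSimpleBeanDefinitionParser. Assuming the usual Spring naming conventions (stop-proxy -> stopProxy, delete-connection -> deleteConnection, configuration-ref -> a reference wired into the configuration property, with defaultId() as the fallback bean name), the net effect of <hdp:hbase-configuration configuration-ref="hadoop-configuration"/> is roughly this hand-written registration:

    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.BeanDefinitionRegistry;
    import org.springframework.data.hadoop.hbase.HbaseConfigurationFactoryBean;

    // Hypothetical equivalent of what the namespace parser registers.
    class HbaseElementSketch {
        static void register(BeanDefinitionRegistry registry) {
            BeanDefinitionBuilder builder =
                    BeanDefinitionBuilder.genericBeanDefinition(HbaseConfigurationFactoryBean.class);
            builder.addPropertyReference("configuration", "hadoop-configuration"); // configuration-ref
            // stop-proxy and delete-connection default to true in the schema;
            // set explicitly here only for illustration.
            builder.addPropertyValue("stopProxy", true);
            builder.addPropertyValue("deleteConnection", true);
            registry.registerBeanDefinition("hbase-configuration", builder.getBeanDefinition());
        }
    }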
@@ -82,4 +82,4 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
protected String defaultId() {
return "hadoop-pig";
}
-}
+}
@@ -19,7 +19,8 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.springframework.beans.factory.DisposableBean;
-import org.springframework.data.hadoop.configuration.ConfigurationFactoryBean;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.beans.factory.InitializingBean;

/**
* Factory for creating an HBase-specific configuration. By default it also cleans up any connection associated with the current configuration.
@@ -28,15 +29,12 @@
* @see HConnectionManager
* @author Costin Leau
*/
-public class HbaseConfigurationFactoryBean extends ConfigurationFactoryBean implements DisposableBean {
+public class HbaseConfigurationFactoryBean implements InitializingBean, DisposableBean, FactoryBean<Configuration> {

private boolean deleteConnection = true;
private boolean stopProxy = true;

-@Override
-protected void postProcessConfiguration(Configuration configuration) {
-HBaseConfiguration.addHbaseResources(configuration);
-}
+private Configuration config;
+private Configuration configuration;

/**
* Indicates whether the potential connection created by this config is destroyed at shutdown (default).
@@ -47,7 +45,6 @@ public void setDeleteConnection(boolean deleteConnection) {
this.deleteConnection = deleteConnection;
}


/**
* Indicates whether the proxy is stopped when/if the associated connection is destroyed.
*
@@ -57,9 +54,34 @@ public void setStopProxy(boolean stopProxy) {
this.stopProxy = stopProxy;
}

+/**
+* Sets the Hadoop configuration to use.
+*
+* @param configuration The configuration to set.
+*/
+public void setConfiguration(Configuration configuration) {
+this.configuration = configuration;
+}

public void destroy() {
if (deleteConnection) {
HConnectionManager.deleteConnection(getObject(), stopProxy);
}
}
-}

+public void afterPropertiesSet() {
+config = (configuration != null ? HBaseConfiguration.create(configuration) : HBaseConfiguration.create());
+}

+public Configuration getObject() {
+return config;
+}

+public Class<? extends Configuration> getObjectType() {
+return (config != null ? config.getClass() : Configuration.class);
+}

+public boolean isSingleton() {
+return true;
+}
+}
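
Outside a container, the lifecycle of the refactored factory bean would run roughly as follows; a sketch only, since under Spring the afterPropertiesSet() and destroy() callbacks are invoked by the context, and the hbase.zookeeper.quorum setting is purely illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.springframework.data.hadoop.hbase.HbaseConfigurationFactoryBean;

    class HbaseConfigLifecycleSketch {
        public static void main(String[] args) {
            Configuration base = new Configuration();
            base.set("hbase.zookeeper.quorum", "localhost"); // illustrative property

            HbaseConfigurationFactoryBean fb = new HbaseConfigurationFactoryBean();
            fb.setConfiguration(base); // composition: the base config is injected, not inherited
            fb.afterPropertiesSet();   // derives the HBase config via HBaseConfiguration.create(base)

            Configuration hbaseConfig = fb.getObject();
            System.out.println(hbaseConfig.get("hbase.zookeeper.quorum"));

            fb.destroy(); // deletes any cached HConnection for this config (the default behavior)
        }
    }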
@@ -200,7 +200,7 @@ Whether to synchronously wait for the job to finish (the default) or not.
</xsd:complexType>
</xsd:element>

-<xsd:element name="config">
+<xsd:element name="configuration">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines a Hadoop Configuration.
@@ -218,7 +218,7 @@ Defines a Hadoop Configuration.
<xsd:attribute name="id" type="xsd:ID" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
-Bean id (default is "hadoop-config").
+Bean id (default is "hadoop-configuration").
]]>
</xsd:documentation>
</xsd:annotation>
@@ -677,4 +677,42 @@ Reference to the Hadoop Configuration. Can be tweaked through the nested propert
</xsd:complexContent>
</xsd:complexType>
</xsd:element>

<!-- HBase -->
<xsd:element name="hbase-configuration">
<xsd:complexType>
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines an HBase configuration.
]]>
</xsd:documentation>
<xsd:appinfo>
<tool:annotation>
<tool:exports type="org.apache.hadoop.conf.Configuration"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
<xsd:attribute name="id" type="xsd:ID" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Bean id (default is "hbase-configuration").
]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="stop-proxy" type="xsd:boolean" default="true"/>
<xsd:attribute name="delete-connection" type="xsd:boolean" default="true"/>
<xsd:attribute name="configuration-ref">
<xsd:annotation>
<xsd:documentation source="org.apache.hadoop.conf.Configuration"><![CDATA[
Reference to the Hadoop configuration.]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.apache.hadoop.conf.Configuration" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:schema>
@@ -46,7 +46,7 @@ public void testWorkflow() throws Exception {

ctx.registerShutdownHook();

-FileSystem fs = ctx.getBean(FileSystem.class);
+FileSystem fs = FileSystem.get(ctx.getBean(Configuration.class));
System.out.println("FS is " + fs.getClass().getName());
HdfsResourceLoader hrl = ctx.getBean(HdfsResourceLoader.class);
Resource resource = hrl.getResource("/ide-test/output/word/");
@@ -72,7 +72,7 @@ public void testWorkflowNS() throws Exception {

ctx.registerShutdownHook();

-FileSystem fs = ctx.getBean(FileSystem.class);
+FileSystem fs = FileSystem.get(ctx.getBean(Configuration.class));
fs.delete(new Path("/ide-test/output/word/"), true);

TriggerJobs tj = new TriggerJobs();
@@ -15,6 +15,7 @@
*/
package org.springframework.data.hadoop.batch;

+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -72,7 +73,7 @@ public void testStreamingNS() throws Exception {
}

private void cleanOutput(ApplicationContext ctx) throws Exception {
-FileSystem fs = ctx.getBean(FileSystem.class);
+FileSystem fs = FileSystem.get(ctx.getBean(Configuration.class));
fs.copyFromLocalFile(new Path("./build.gradle"), new Path("test/"));
fs.delete(new Path("output"), true);
}
@@ -32,7 +32,7 @@
@RunWith(SpringJUnit4ClassRunner.class)
public class HadoopCfgNsTest {

-@Resource(name = "hadoop-config")
+@Resource(name = "hadoop-configuration")
private Configuration simple;

@Resource
@@ -45,7 +45,7 @@ public void testHiveConnection() throws Exception {

ctx.registerShutdownHook();

-Configuration config = ctx.getBean("hbase-config", Configuration.class);
+Configuration config = ctx.getBean("hbase-configuration", Configuration.class);
HBaseAdmin admin = new HBaseAdmin(config);
String tableName = "myTable";
String columnName = "myColumnFamily";
@@ -38,7 +38,7 @@
input-path="/ide-test/input/word/" output-path="/ide-test/output/word/"
mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"
-configuration-ref="fs-config"
+configuration-ref="hadoop-configuration"
validate-paths="false"
/>
</beans>
@@ -26,7 +26,7 @@
<bean id="hadoop-tasklet" class="org.springframework.data.hadoop.batch.HadoopTasklet" p:job-ref="mr-job" p:wait-for-job="true"/>

<bean id="mr-job" class="org.springframework.data.hadoop.mapreduce.JobFactoryBean"
-p:configuration-ref="fs-config"
+p:configuration-ref="hadoop-configuration"
p:input-paths="/ide-test/input/word/"
p:output-path="/ide-test/output/word/"
p:mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
@@ -9,13 +9,13 @@


<!-- default -->
-<hdp:config value-type="java.lang.String">
+<hdp:configuration value-type="java.lang.String">
<entry key="test.name"><value>default</value></entry>
-</hdp:config>
+</hdp:configuration>

-<hdp:config id="complex" resources="classpath:/test-site.xml, classpath:/test-site-2.xml" configuration-ref="hadoop-config">
+<hdp:configuration id="complex" resources="classpath:/test-site.xml, classpath:/test-site-2.xml" configuration-ref="hadoop-configuration">
<entry key="test.name.2" value="complex"/>
-</hdp:config>
+</hdp:configuration>

<!-- default hdfs resource loader
<hdp:resource-loader />
@@ -28,9 +28,9 @@


<!-- default id is 'hadoop-config' -->
-<hdp:config register-url-handler="false">
+<hdp:configuration register-url-handler="false">
<entry key="fs.default.name" value="${hd.fs}"/>
-</hdp:config>
+</hdp:configuration>

<!--
<bean id="fs-config" class="org.springframework.data.hadoop.configuration.ConfigurationFactoryBean">
@@ -1,16 +1,22 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:c="http://www.springframework.org/schema/c"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
-http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
+http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop-1.0.xsd">

<import resource="../hadoop-ctx.xml"/>

-<bean id="hbase-config" class="org.springframework.data.hadoop.hbase.HbaseConfigurationFactoryBean" p:configuration-ref="fs-config" />
+<!--
+<bean id="hbase-config" class="org.springframework.data.hadoop.hbase.HbaseConfigurationFactoryBean" p:configuration-ref="hadoop-configuration" />
+-->

+<hdp:hbase-configuration configuration-ref="hadoop-configuration" />

<context:property-placeholder location="test.properties"/>

