Skip to content
This repository has been archived by the owner on Apr 5, 2022. It is now read-only.

Commit

Permalink
+ add namespace for pig
Browse files Browse the repository at this point in the history
  • Loading branch information
Costin Leau committed Nov 28, 2011
1 parent 4b87130 commit 0595698
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 37 deletions.
Expand Up @@ -40,5 +40,6 @@ public void init() {
registerBeanDefinitionParser("stream-job", new HadoopStreamJobParser());
registerBeanDefinitionParser("config", new HadoopConfigParser());
registerBeanDefinitionParser("resource-loader", new HadoopResourceLoaderParser());
registerBeanDefinitionParser("pig", new PigParser());
}
}
Expand Up @@ -66,6 +66,6 @@ protected void doParse(Element element, BeanDefinitionBuilder builder) {
builder.addPropertyValue("outputValueType", element.getAttribute(OUTPUT_VALUE_TYPE_ATTR));
builder.addPropertyValue("outputKeyType", element.getAttribute(OUTPUT_KEY_TYPE_ATTR));
builder.addPropertyReference("target", element.getAttribute(REF_ATTR));
NamespaceUtils.setValueIfAttributeDefined(builder, element, METHOD_ATTR);
NamespaceUtils.setPropertyValue(element, builder, METHOD_ATTR);
}
}
Expand Up @@ -21,6 +21,7 @@
/**
* Namespace utilities.
*
* @author Costin Leau
*/
abstract class NamespaceUtils {

Expand All @@ -37,6 +38,10 @@ static void setPropertyValue(Element element, BeanDefinitionBuilder builder, Str
}
}

static void setPropertyValue(Element element, BeanDefinitionBuilder builder, String attrName) {
setPropertyValue(element, builder, attrName, Conventions.attributeNameToPropertyName(attrName));
}

static boolean setPropertyReference(Element element, BeanDefinitionBuilder builder, String attrName, String propertyName) {
String attr = element.getAttribute(attrName);
if (StringUtils.hasText(attr)) {
Expand All @@ -46,22 +51,8 @@ static boolean setPropertyReference(Element element, BeanDefinitionBuilder build
return false;
}


/**
* Populates the bean definition property corresponding to the specified attributeName with the value of that
* attribute if it is defined in the given element.
*
* <p>
* The property name will be the camel-case equivalent of the lower case hyphen separated attribute (e.g. the
* "foo-bar" attribute would match the "fooBar" property).
*
* @see Conventions#attributeNameToPropertyName(String)
*
* @param builder the bean definition builder to be configured
* @param element the XML element where the attribute should be defined
* @param attributeName the name of the attribute whose value will be set on the property
*/
static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element, String attributeName) {
setPropertyValue(element, builder, attributeName, Conventions.attributeNameToPropertyName(attributeName));
static boolean setPropertyReference(Element element, BeanDefinitionBuilder builder, String attrName) {
return setPropertyReference(element, builder, attrName,
Conventions.attributeNameToPropertyName(isReference(attrName)? attrName.substring(0, attrName.length() - 4) : attrName));
}
}
@@ -0,0 +1,85 @@
/*
* Copyright 2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.hadoop.config;

import java.util.Map;

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.data.hadoop.pig.PigContextFactoryBean;
import org.springframework.data.hadoop.pig.PigServerFactoryBean;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;


/**
* Parser for pig element.
*
* @author Costin Leau
*/
class PigParser extends AbstractImprovedSimpleBeanDefinitionParser {

@Override
protected Class<?> getBeanClass(Element element) {
return PigServerFactoryBean.class;
}

@Override
protected boolean isEligibleAttribute(String attributeName) {
return !("scripts".equals(attributeName) || "paths-to-skip".equals(attributeName)
|| "job-tracker".equals(attributeName) || "configuration-ref".equals(attributeName) || "exec-type".equals(attributeName))
&& super.isEligibleAttribute(attributeName);
}


@Override
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
// parse attributes using conventions
super.doParse(element, parserContext, builder);

// parse resources
String attr = element.getAttribute("scripts");

if (StringUtils.hasText(attr)) {
builder.addPropertyValue("scripts", StringUtils.commaDelimitedListToStringArray(attr));
}

attr = element.getAttribute("paths-to-skip");
if (StringUtils.hasText(attr)) {
builder.addPropertyValue("pathsToSkip", StringUtils.commaDelimitedListToStringArray(attr));
}

// parse nested PigContext definition

BeanDefinitionBuilder contextBuilder = BeanDefinitionBuilder.genericBeanDefinition(PigContextFactoryBean.class);

Map parsedProps = parserContext.getDelegate().parsePropsElement(element);
if (!parsedProps.isEmpty()) {
contextBuilder.addPropertyValue("properties", parsedProps);
}

NamespaceUtils.setPropertyValue(element, contextBuilder, "job-tracker");
NamespaceUtils.setPropertyValue(element, contextBuilder, "exec-type");
NamespaceUtils.setPropertyReference(element, contextBuilder, "configuration-ref");

builder.addPropertyValue("pigContext", contextBuilder.getBeanDefinition());
}

@Override
protected String defaultId() {
return "hadoop-pig";
}
}
Expand Up @@ -56,6 +56,6 @@ protected void doParse(Element element, BeanDefinitionBuilder builder) {
builder.addPropertyValue("outputValueType", element.getAttribute(OUTPUT_VALUE_TYPE_ATTR));
builder.addPropertyValue("outputKeyType", element.getAttribute(OUTPUT_KEY_TYPE_ATTR));
builder.addPropertyReference("target", element.getAttribute(REF_ATTR));
NamespaceUtils.setValueIfAttributeDefined(builder, element, METHOD_ATTR);
NamespaceUtils.setPropertyValue(element, builder, METHOD_ATTR);
}
}
Expand Up @@ -15,8 +15,13 @@
*/
package org.springframework.data.hadoop.pig;

import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.impl.PigContext;
import org.springframework.beans.factory.FactoryBean;
Expand All @@ -35,7 +40,8 @@ public class PigContextFactoryBean implements InitializingBean, FactoryBean<PigC
private String lastAlias;
private String jobTracker;
private ExecType execType = ExecType.MAPREDUCE;
private Properties properties = new Properties();
private Properties properties;
private Configuration configuration;

public PigContext getObject() throws Exception {
return context;
Expand All @@ -50,12 +56,32 @@ public boolean isSingleton() {
}

public void afterPropertiesSet() throws Exception {
Properties prop = new Properties();

// first add the hadoop config
if (configuration != null) {
Iterator<Entry<String, String>> iterator = configuration.iterator();
while (iterator.hasNext()) {
Map.Entry<java.lang.String, java.lang.String> entry = iterator.next();
prop.setProperty(entry.getKey(), entry.getValue());
}
}

// add properties
if (properties != null) {
Enumeration<?> names = properties.propertyNames();
while (names.hasMoreElements()) {
String name = (String) names.nextElement();
prop.setProperty(name, properties.getProperty(name));
}
}

if (StringUtils.hasText(jobTracker)) {
properties.setProperty("mapred.job.tracker", jobTracker);
prop.setProperty("mapred.job.tracker", jobTracker);
// invoking setter below causes NPE since PIG expects the engine to be started already ...
// context.setJobtrackerLocation(jobTracker);
}
context = new PigContext(execType, properties);
context = new PigContext(execType, prop);

if (StringUtils.hasText(lastAlias)) {
context.setLastAlias(lastAlias);
Expand Down Expand Up @@ -89,4 +115,11 @@ public void setExecType(ExecType execType) {
public void setProperties(Properties properties) {
this.properties = properties;
}

/**
* @param configuration The configuration to set.
*/
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
}
Expand Up @@ -16,8 +16,7 @@
package org.springframework.data.hadoop.pig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Collection;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -33,7 +32,7 @@
import org.springframework.util.StringUtils;

/**
* Factory for creating a {@link PigServerFactoryBean} instance.
* Factory for creating a {@link PigServer} instance.
*
* @author Costin Leau
*/
Expand All @@ -48,8 +47,8 @@ public class PigServerFactoryBean implements SmartLifecycle, InitializingBean, D
private boolean autoStartup = true;

private PigContext pigContext;
private List<String> pathToSkip;
private List<Resource> scripts;
private Collection<String> pathToSkip;
private Collection<Resource> scripts;
private Integer parallelism;
private String jobName;
private String jobPriority;
Expand Down Expand Up @@ -167,16 +166,14 @@ public void setPigContext(PigContext pigContext) {
/**
* @param pathToSkip The pathToSkip to set.
*/
public void setPathToSkip(String pathToSkip) {
if (StringUtils.hasText(pathToSkip)) {
this.pathToSkip = Arrays.asList(StringUtils.commaDelimitedListToStringArray(pathToSkip));
}
public void setPathsToSkip(Collection<String> pathToSkip) {
this.pathToSkip = pathToSkip;
}

/**
* @param scripts The scripts to set.
*/
public void setScripts(List<Resource> scripts) {
public void setScripts(Collection<Resource> scripts) {
this.scripts = scripts;
}

Expand Down
Expand Up @@ -604,8 +604,77 @@ Reference to the Hadoop Configuration. Defaults to 'hadoop-configuration'.]]></x
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>

</xsd:complexType>
</xsd:element>


<xsd:element name="pig">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines a Pig server.
]]>
</xsd:documentation>
<xsd:appinfo>
<tool:annotation>
<tool:exports type="org.apache.pig.PigServer"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="beans:propsType">
<xsd:attribute name="id" type="xsd:ID" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Bean id (default is "hadoop-pig").
]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scripts">
<xsd:annotation>
<xsd:documentation source="org.springframework.core.io.Resource"><![CDATA[
Pig scripts to be registered. Multiple resources can be specified, using comma (,) as a separator.]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="org.springframework.core.io.Resource[]" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="paths-to-skip">
<xsd:annotation>
<xsd:documentation><![CDATA[
The path to be skipped while automatically shipping binaries for streaming. Multiple resources can be specified, using comma (,) as a separator.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="parallelism" type="xsd:integer"/>
<xsd:attribute name="validate-each-statement" type="xsd:boolean"/>
<xsd:attribute name="job-priority" type="xsd:string"/>
<xsd:attribute name="job-name" type="xsd:string"/>
<xsd:attribute name="job-tracker" type="xsd:string"/>
<xsd:attribute name="auto-startup" type="xsd:boolean"/>
<xsd:attribute name="configuration-ref">
<xsd:annotation>
<xsd:documentation source="org.apache.hadoop.conf.Configuration"><![CDATA[
Reference to the Hadoop Configuration. Can be tweaked through the nested properties or the other properties.]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.apache.hadoop.conf.Configuration" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="exec-type" default="MAPREDUCE">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="MAPREDUCE"/>
<xsd:enumeration value="LOCAL"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
</xsd:schema>
Expand Up @@ -46,17 +46,20 @@
reducer="${path.wc}"
/>

<!--
<bean id="pig" class="org.springframework.data.hadoop.pig.PigServerFactoryBean"
p:scripts="classpath:org/springframework/data/hadoop/pig/script.pig" p:auto-startup="false">
<property name="pigContext">
<bean id="pig-ctx" class="org.springframework.data.hadoop.pig.PigContextFactoryBean" p:exec-type="LOCAL"/>
</property>
</bean>

-->
<hdp:pig scripts="classpath:org/springframework/data/hadoop/pig/script.pig" auto-startup="false" exec-type="LOCAL" />

<bean id="hadoop-pig-tasklet" class="org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter">
<property name="targetObject" ref="&amp;pig"/>
<property name="targetObject" ref="&amp;hadoop-pig"/>
<property name="targetMethod" value="start" />
</bean>
</bean>

<bean id="file-reader" class="org.springframework.batch.item.file.ResourcesItemReader" p:resources="classpath:${input.directory}/*.log"/>

Expand Down

0 comments on commit 0595698

Please sign in to comment.