Skip to content

Commit

Permalink
Added first revision of fixed width loader
Browse files Browse the repository at this point in the history
  • Loading branch information
Russ Lankenau committed Dec 4, 2012
1 parent c7c80c7 commit 721a845
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 1 deletion.
18 changes: 17 additions & 1 deletion README.md
@@ -1,4 +1,20 @@
fixed-width-pig-loader
======================

Pig LoadFunc for fixed-width data
Pig LoadFunc for fixed-width data


Building
--------

$mvn package

Running
-------

Sample pig script for use with this loader:

register 'maprfs:///user/rlankenau/FixedWidthLoader-1.0-SNAPSHOT.jar';

A = load 'maprfs:///user/rlankenau/testfile' using com.mapr.util.FixedWidthLoader('-10','11-15','16-','-');
store A into 'maprfs:///user/rlankenau/outputfile' using PigStorage(',');
47 changes: 47 additions & 0 deletions pom.xml
@@ -0,0 +1,47 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.mapr.util</groupId>
<artifactId>FixedWidthLoader</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>FixedWidthLoader</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.5</source>
<target>1.5</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
114 changes: 114 additions & 0 deletions src/main/java/com/mapr/util/FixedWidthLoader.java
@@ -0,0 +1,114 @@
package com.mapr.util;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
import org.apache.pig.*;
import org.apache.pig.data.*;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.*;
import java.util.*;
import java.io.*;

public class FixedWidthLoader extends LoadFunc
{

/** Describes a single field (character range)
*/
private class FWField {
int start;
int end;

FWField(int character) {
this.start = character;
this.end = character+1;
}

FWField(int start, int end) {
this.start = start;
this.end = end;
}

}

/* Holds descriptors for fields to be parsed */
List<FWField> fields = new ArrayList<FWField>();
private RecordReader<LongWritable, Text> reader;

/** Construct a FixedWidthLoader using the specified mappings.
* The fields parameter is a string similar to that used in the unix 'cut' command.
* e.g. -2,3-10,20-30,100,102-
* Offsets specified in this string are 0-based. Note that the string "-" should result in the
* same output as a standard TextLoader.
*/
public FixedWidthLoader(String... fields) {
for(String range : fields) {
if(range.indexOf("-") != -1) {
/* Actual range */
int start, end;
String[] range_offsets = range.split("-", 2);
if(range_offsets[0].equals("")) {
start=0;
} else {
start=Integer.parseInt(range_offsets[0]);
}

if(range_offsets[1].equals("")) {
end=Integer.MAX_VALUE;
} else {
end=Integer.parseInt(range_offsets[1]);
}
this.fields.add(new FWField(start,end));
} else {
/* Single char */
int f_offset = Integer.parseInt(range);
this.fields.add(new FWField(f_offset));
}
}

}

@Override
public InputFormat getInputFormat() throws IOException {
return new TextInputFormat();
}

@Override
public void setLocation(String location, Job job) throws IOException
{
FileInputFormat.setInputPaths(job, location);
}

@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException
{
this.reader = reader;
}

@Override
public Tuple getNext() throws IOException {
Tuple output_rec = TupleFactory.getInstance().newTuple(this.fields.size());
try {
if(this.reader.nextKeyValue()) {
Text rawText = (Text)reader.getCurrentValue();
String input_rec = rawText.toString();

for(int i=0;i<fields.size();i++) {
FWField field = fields.get(i);
/* Ignore any field that starts after the end of the record */
if(field.start > input_rec.length()) {
continue;
}

String extractedField = input_rec.substring(field.start, Math.min(field.end, input_rec.length()));
output_rec.set(i, extractedField);

}
return output_rec;
}
} catch ( Exception e) {
throw new IOException("An error was encountered while parsing a record", e);
}
return null;
}

}
38 changes: 38 additions & 0 deletions src/test/java/com/mapr/util/AppTest.java
@@ -0,0 +1,38 @@
package com.mapr.util;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
}

/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
}

/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
}
}

0 comments on commit 721a845

Please sign in to comment.