Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jfrazee committed May 11, 2016
0 parents commit d82e831
Show file tree
Hide file tree
Showing 10 changed files with 773 additions and 0 deletions.
28 changes: 28 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
target
.project
.settings
.classpath
nbactions.xml
nb-configuration.xml
.DS_Store
.metadata
.recommenders
*.class

# Package Files #
*.jar
*.war
*.ear
*.nar

# IntelliJ IDEA
.idea/
*.iml
*.iws
*~

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
1 change: 1 addition & 0 deletions .java-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1.7
41 changes: 41 additions & 0 deletions nifi-tika-nar/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Module that bundles the nifi-tika-processors artifact using the "nar" packaging type. -->
<modelVersion>4.0.0</modelVersion>

<!-- Inherits from the nifi-tika-bundle parent POM. -->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-tika-bundle</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>

<artifactId>nifi-tika-nar</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<!-- This module declares no Java sources of its own here, so Javadoc generation is skipped. -->
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- NOTE(review): presumably disables source-jar generation for this module; confirm which plugin reads this property. -->
<source.skip>true</source.skip>
</properties>

<dependencies>
<!-- The processors module whose classes are packaged into this archive. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-tika-processors</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies>

</project>
64 changes: 64 additions & 0 deletions nifi-tika-processors/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Module that builds the Tika-based processor classes as a plain jar. -->
<modelVersion>4.0.0</modelVersion>

<!-- Inherits from the nifi-tika-bundle parent POM. -->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-tika-bundle</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>

<artifactId>nifi-tika-processors</artifactId>
<packaging>jar</packaging>

<dependencies>
<!--
NiFi dependencies below declare no version in this file.
NOTE(review): presumably the versions are managed by the parent POM; confirm.
-->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<!-- Apache Tika: tika-core and tika-parsers are pinned to the same version (1.7); keep them in step. -->
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-parsers</artifactId>
<version>1.7</version>
</dependency>
<!-- Test-scoped dependencies only. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tika;

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.xml.sax.SAXException;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import org.apache.tika.config.TikaConfig;
import org.apache.tika.detect.Detector;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaMetadataKeys;
import org.apache.tika.mime.MediaType;
import org.apache.tika.mime.MimeType;
import org.apache.tika.mime.MimeTypeException;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.AutoDetectParser;
// import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;

/**
 * NiFi processor that uses Apache Tika to extract the plain-text body of an
 * incoming document (PDF, Word, ...) into a new FlowFile.
 *
 * <p>On success the incoming FlowFile is routed to {@link #REL_ORIGINAL} and a
 * clone carrying the extracted text and the Tika metadata (as attributes) is
 * routed to {@link #REL_SUCCESS}. If the FlowFile exceeds the configured
 * maximum size, cannot be parsed, or yields no text, the incoming FlowFile is
 * routed to {@link #REL_FAILURE} and no text FlowFile is produced.
 */
@Tags({"pdf", "doc", "tika", "attributes", "text"})
@CapabilityDescription("Convert PDF files, Word documents, etc. to plain text")
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.IdentifyMimeType"})
@ReadsAttributes({
    @ReadsAttribute(attribute = "filename", description = "Passed to Tika as the resource name to help MIME type detection")
})
@WritesAttributes({
    @WritesAttribute(attribute = "mime.type", description = "The MIME type detected for the original document"),
    @WritesAttribute(attribute = "mime.extension", description = "The file extension associated with the detected MIME type, if any"),
    @WritesAttribute(attribute = "filename", description = "The original filename with the detected extension replaced by .txt")
})
public class ConvertDocumentToText extends AbstractProcessor {

    private TikaConfig config;
    private Detector detector;

    /** Upper bound on the size of a FlowFile this processor will attempt to convert. */
    public static final PropertyDescriptor MAX_FILE_SIZE = new PropertyDescriptor
        .Builder().name("Maximum File Size")
        .displayName("Maximum File Size")
        .description("Maximum size of file to try to convert")
        .required(true)
        .defaultValue("1MB")
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .build();

    /** Receives the FlowFile holding the extracted text. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description(
            "Any FlowFile that is successfully parsed is routed to " +
            "this relationship"
        )
        .build();

    /** Receives the incoming FlowFile when it is too large, unparseable, or yields no text. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description(
            "Any FlowFile that fails to be parsed is routed to " +
            "this relationship"
        )
        .build();

    /** Receives the unmodified incoming FlowFile when conversion succeeds. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description(
            "The original file is routed to this relationship when it is " +
            "successfully parsed"
        )
        .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    /**
     * Loads the default Tika configuration and builds the immutable property
     * and relationship collections exposed by this processor.
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        this.config = TikaConfig.getDefaultConfig();
        this.detector = config.getDetector();

        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(MAX_FILE_SIZE);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_ORIGINAL);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    /** Lifecycle hook; intentionally empty because no per-schedule setup is needed. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    /**
     * Converts one incoming FlowFile to plain text.
     *
     * @param context supplies the configured {@link #MAX_FILE_SIZE}
     * @param session session used to read the input and create the text FlowFile
     * @throws ProcessException if reading the FlowFile content fails
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Keep the limit as a long: narrowing to int saturates at Integer.MAX_VALUE,
        // silently capping any configured limit of 2 GB or more.
        final long maxFileSize = context.getProperty(MAX_FILE_SIZE).asDataSize(DataUnit.B).longValue();
        final long fileSize = flowFile.getSize();

        if (fileSize > maxFileSize) {
            getLogger().error("FlowFile {} file size {} exceeds maximum file size {}", new Object[]{flowFile, fileSize, maxFileSize});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // BodyContentHandler takes an int write limit; -1 disables the limit.
        final int writeLimit = maxFileSize > 0 ? (int) Math.min(maxFileSize, Integer.MAX_VALUE) : -1;
        final BodyContentHandler parserHandler = new BodyContentHandler(writeLimit);
        final ParseContext parserContext = new ParseContext();
        final AutoDetectParser parser = new AutoDetectParser();

        final String filename = flowFile.getAttribute(CoreAttributes.FILENAME.key());

        final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
        final AtomicReference<String> mimeExtRef = new AtomicReference<>(null);
        final Map<String, String> attributes = new HashMap<>();
        final AtomicBoolean failedRef = new AtomicBoolean(false);

        // Only detection and parsing happen inside the read callback; the output
        // FlowFile is built afterwards so nothing has to be undone on failure.
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream stream) throws IOException {
                // Closing the TikaInputStream releases any temporary resources it allocated.
                try (final InputStream in = new BufferedInputStream(stream);
                     final TikaInputStream tikaStream = TikaInputStream.get(in)) {
                    final Metadata metadata = new Metadata();

                    // Add filename if it exists
                    if (filename != null) {
                        metadata.add(TikaMetadataKeys.RESOURCE_NAME_KEY, filename);
                    }

                    // Get MIME type
                    final MediaType mediatype = detector.detect(tikaStream, metadata);
                    final String mimeType = mediatype.toString();
                    mimeTypeRef.set(mimeType);

                    // Get MIME extension
                    try {
                        final String mimeExt = config.getMimeRepository().forName(mimeType).getExtension();
                        if (mimeExt == null || mimeExt.isEmpty()) {
                            getLogger().warn("MIME type extension is {}", new Object[]{mimeExt == null ? "null" : "empty"});
                        }
                        mimeExtRef.set(mimeExt);
                    } catch (MimeTypeException e) {
                        getLogger().warn(e.getMessage(), e);
                    }

                    // Extract body text (into parserHandler) and document metadata
                    try {
                        parser.parse(tikaStream, parserHandler, metadata, parserContext);
                    } catch (TikaException | SAXException e) {
                        getLogger().error("Failed to parse FlowFile {}: {}", new Object[]{flowFile, e.getMessage()}, e);
                        failedRef.set(true);
                        return;
                    }

                    for (final String key : metadata.names()) {
                        final String value = metadata.get(key);
                        if (value != null && !value.isEmpty()) {
                            attributes.put(key, value);
                        }
                    }
                }
            }
        });

        if (failedRef.get()) {
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // Encode explicitly as UTF-8 so the output does not depend on the platform default charset.
        final byte[] textBytes = parserHandler.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        if (textBytes.length == 0) {
            getLogger().warn("FlowFile {} was empty or can't be converted to text", new Object[]{flowFile});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        final String mimeExt = mimeExtRef.get();

        FlowFile text = session.clone(flowFile);
        text = session.putAttribute(text, CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
        if (mimeExt != null) {
            // Never hand a null attribute value to the session.
            text = session.putAttribute(text, "mime.extension", mimeExt);
        }
        if (filename != null && !filename.isEmpty() && mimeExt != null && !mimeExt.isEmpty()) {
            text = session.putAttribute(text, CoreAttributes.FILENAME.key(), replaceExtension(filename, mimeExt));
        }
        // Applied last, as before, so Tika metadata takes precedence on a key collision.
        text = session.putAllAttributes(text, attributes);

        // Write plain text to FlowFile contents
        text = session.write(text, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                out.write(textBytes);
            }
        });

        session.transfer(flowFile, REL_ORIGINAL);
        session.transfer(text, REL_SUCCESS);
    }

    /**
     * Replaces a trailing {@code extension} (compared case-insensitively) on
     * {@code filename} with {@code .txt}. The extension is matched literally
     * and only at the end of the name; if the name does not end with it the
     * name is returned unchanged.
     */
    private static String replaceExtension(final String filename, final String extension) {
        final int stemLength = filename.length() - extension.length();
        if (stemLength >= 0 && filename.regionMatches(true, stemLength, extension, 0, extension.length())) {
            return filename.substring(0, stemLength) + ".txt";
        }
        return filename;
    }
}
Loading

0 comments on commit d82e831

Please sign in to comment.