Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Powerpoint, Word doc, PDF, Outlook email, ZIP, GZ, BZ2 #25

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@
<version>4.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-scratchpad</artifactId>
<version>4.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.pdfbox</groupId>
<artifactId>pdfbox</artifactId>
<version>2.0.19</version>
</dependency>

<dependency>
<groupId>com.auxilii.msgparser</groupId>
<artifactId>msgparser</artifactId>
<version>1.1.15</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.20</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
254 changes: 242 additions & 12 deletions src/main/java/org/ebyhr/presto/flex/FlexRecordCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,34 @@
import io.prestosql.spi.connector.RecordCursor;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.type.Type;
import org.apache.poi.extractor.POIOLE2TextExtractor;
import org.apache.poi.extractor.POITextExtractor;
import org.apache.poi.hsmf.exceptions.ChunkNotFoundException;
import org.apache.poi.hwpf.extractor.WordExtractor;
import org.apache.poi.ooxml.extractor.ExtractorFactory;
import org.apache.poi.openxml4j.exceptions.OpenXML4JException;
import org.apache.poi.poifs.filesystem.POIFSFileSystem;
import org.apache.poi.hslf.extractor.PowerPointExtractor;
import org.apache.xmlbeans.XmlException;
import org.ebyhr.presto.flex.operator.FilePlugin;
import org.ebyhr.presto.flex.operator.PluginFactory;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;
import org.apache.pdfbox.text.TextPosition;
import com.auxilii.msgparser.Message;
import com.auxilii.msgparser.MsgParser;

import java.io.*;
import java.util.Enumeration;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

Expand Down Expand Up @@ -62,25 +83,234 @@ public FlexRecordCursor(List<FlexColumnHandle> columnHandles, SchemaTableName sc
fieldToColumnIndex[i] = columnHandle.getOrdinalPosition();
}


URI uri = URI.create(schemaTableName.getTableName());
String tblName = schemaTableName.getTableName();
URI uri = null;
ByteSource byteSource;

try {
byteSource = Resources.asByteSource(uri.toURL());
byteSource = Resources.asByteSource(URI.create(tblName).toURL());
} catch (MalformedURLException e) {
throw new RuntimeException(e.getMessage());
}

//powerpoint
if (tblName.endsWith(".ppt") || tblName.endsWith(".pptx") || tblName.contains(".ppt?") || tblName.contains(".pptx?")) {
PowerPointExtractor powerPointExtractor = null;
try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
try {
powerPointExtractor = new PowerPointExtractor(input);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
ArrayList<String> lst = new ArrayList<>();
lst.add(powerPointExtractor.getText());
String ntes = powerPointExtractor.getNotes();
if (ntes != null && ntes.length() > 0)
lst.add(ntes);
lines = lst.iterator();
totalBytes = input.getCount();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
try { if (powerPointExtractor != null)
powerPointExtractor.close();
} catch (Exception e) { /* ignored */ }
}
}
//word doc
else if (tblName.endsWith(".doc") || tblName.endsWith(".docx") || tblName.contains(".doc?") || tblName.contains(".docx?")) {
WordExtractor wordExtractor = null;
try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
try {
wordExtractor = new WordExtractor(input);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
ArrayList<String> lst = new ArrayList<>();
String[] paragraphText = wordExtractor.getParagraphText();
for (String paragraph : paragraphText) {
lst.add(paragraph.replaceAll("\u0007", "").replaceAll("\f", "").replaceAll("\r", "").replaceAll("\n", "").replaceAll("\u0015", ""));
}
String hdr = wordExtractor.getHeaderText();
if (hdr != null && hdr.length() > 0)
lst.add(hdr);
String ftr = wordExtractor.getFooterText();
if (ftr != null && ftr.length() > 0)
lst.add(ftr);
lines = lst.iterator();
totalBytes = input.getCount();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
try { if (wordExtractor != null)
wordExtractor.close();
} catch (Exception e) { /* ignored */ }
}
}
//pdf
else if (tblName.endsWith(".pdf") || tblName.contains(".pdf?")) {
PDDocument pddDocument = null;
try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
try {
pddDocument = PDDocument.load(input);
} catch (MalformedURLException e) {
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
PDFTextStripper textStripper = null;
try {
textStripper = new PDFTextStripper();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
String rawText = null;
try {
rawText = textStripper.getText(pddDocument);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

// split into lines on LF or CRLF line terminators
String rawLines[] = rawText.split("\\r?\\n");
ArrayList<String> lst = new ArrayList<>();
for (String line : rawLines) {
lst.add(line);
}
lines = lst.iterator();
totalBytes = input.getCount();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
try { if (pddDocument != null)
pddDocument.close();
} catch (Exception e) { /* ignored */ }
}
}
//outlook email (unlikely to get from http though!)
else if (tblName.endsWith(".msg")) {
MsgParser msgp = null;
Message msg = null;
String from_email = null;
String from_name = null;
String subject = null;
String body = null;
String to_list = null;
String cc_list = null;
String bcc_list = null;
try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
try {
msgp = new MsgParser();
msg = msgp.parseMsg(input);
from_email = msg.getFromEmail();
from_name = msg.getFromName();
subject = msg.getSubject();
body = msg.getBodyText();
to_list = msg.getDisplayTo();
cc_list = msg.getDisplayCc();
bcc_list = msg.getDisplayBcc();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
ArrayList<String> lst = new ArrayList<>();
lst.add("Attachments -" + msg.getAttachments().size());
lst.add("from_email " + from_email);
lst.add("from_name " + from_name);
lst.add("to_list " + to_list);
lst.add("cc_list " + cc_list);
lst.add("bcc_list " + bcc_list);
lst.add("subject " + subject);
lst.add("body " + body);
lines = lst.iterator();
totalBytes = input.getCount();

try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
lines = plugin.getIterator(byteSource);
if (plugin.skipFirstLine()) {
lines.next();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
totalBytes = input.getCount();
}
catch (IOException e) {
throw new UncheckedIOException(e);
//zip
else if (tblName.endsWith(".zip") || tblName.contains(".zip?")) {
ArrayList<String> lst = new ArrayList<>();
BufferedReader bufferedeReader = null;
try (CountingInputStream input = new CountingInputStream(byteSource.openStream());
ZipInputStream zin = new ZipInputStream(input);
InputStreamReader isr = new InputStreamReader(zin)) {

ZipEntry entry;
bufferedeReader = new BufferedReader(isr);
while ((entry = zin.getNextEntry()) != null) {
String line = bufferedeReader.readLine();
while (line != null) {
lst.add(line);
line = bufferedeReader.readLine();
}
}
lines = lst.iterator();
totalBytes = input.getCount();
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
} finally {
try { if (bufferedeReader != null)
bufferedeReader.close();
} catch (Exception e) { /* ignored */ }
}
}
//gz
else if (tblName.endsWith(".gz") || tblName.endsWith(".gzip") || tblName.contains(".gz?") || tblName.contains(".gzip?")) {
//getting unreadable compressed data back right now!
//todo need to fix with https://stackoverflow.com/a/11093226/8874837 https://www.rgagnon.com/javadetails/java-HttpUrlConnection-with-GZIP-encoding.html
ArrayList<String> lst = new ArrayList<>();
BufferedReader in = null;
try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
in = new BufferedReader(new InputStreamReader(new GZIPInputStream(input)));
String inputLine;
while ((inputLine = in.readLine()) != null){
lst.add(inputLine);
}
lines = lst.iterator();
totalBytes = input.getCount();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
try { if (in != null)
in.close();
} catch (Exception e) { /* ignored */ }
}
}
//bz2
else if (tblName.endsWith(".bz2") || tblName.endsWith(".bzip2") || tblName.contains(".bz2?") || tblName.contains(".bzip2?")) {
ArrayList<String> lst = new ArrayList<>();
BufferedReader in = null;
try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
in = new BufferedReader(new InputStreamReader(new MultiStreamBZip2InputStream(input)));
String inputLine;
while ((inputLine = in.readLine()) != null){
lst.add(inputLine);
}
lines = lst.iterator();
totalBytes = input.getCount();
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
try { if (in != null)
in.close();
} catch (Exception e) { /* ignored */ }
}
}
else {
//text/csv..etc
try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
lines = plugin.getIterator(byteSource);
if (plugin.skipFirstLine()) {
lines.next();
}
totalBytes = input.getCount();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
Comment on lines +96 to +313
Copy link
Member

@ebyhr ebyhr Jun 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refer to the existing implementation. I wouldn't add file-specific logic here.

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.ebyhr.presto.flex;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

/**
 * Decompresses BZip2 input that may consist of several BZip2 streams concatenated
 * back to back ("multistream" files) and exposes the result as one continuous
 * stream of decompressed bytes.
 *
 * <p>Idea originally posted in
 * [https://chaosinmotion.blog/2011/07/29/and-another-curiosity-multi-stream-bzip2-files/].
 *
 * <p>The follow-on streams are handled by the {@code decompressConcatenated} mode of
 * {@link BZip2CompressorInputStream} rather than by probing
 * {@link InputStream#available()}: {@code available()} is only an estimate of what
 * can be read without blocking and may legitimately be {@code 0} while more data is
 * still on its way (e.g. on a network stream), which would silently truncate the
 * decompressed output.
 */
public class MultiStreamBZip2InputStream extends CompressorInputStream {
    /** The raw (compressed) stream handed to the constructor; closed by {@link #close()}. */
    private final InputStream fInputStream;

    /** Decoder that keeps decompressing across concatenated BZip2 streams until real EOF. */
    private final BZip2CompressorInputStream fBZip2;

    /**
     * Creates a decompressing stream on top of {@code in}.
     *
     * @param in the compressed BZip2 data, possibly made of several concatenated streams
     * @throws IOException if the decoder cannot be initialised from {@code in}
     */
    public MultiStreamBZip2InputStream(InputStream in) throws IOException {
        fInputStream = in;
        // 'true' = decompressConcatenated: continue with the next BZip2 stream
        // instead of reporting EOF after the first one.
        fBZip2 = new BZip2CompressorInputStream(in, true);
    }

    /**
     * Reads the next decompressed byte.
     *
     * @return the byte as an unsigned value, or {@code -1} once all concatenated
     *         streams have been consumed
     * @throws IOException if the underlying decoder fails
     */
    @Override
    public int read() throws IOException {
        return fBZip2.read();
    }

    /**
     * Reads up to {@code len} decompressed bytes into {@code dest} starting at {@code off}.
     *
     * @param dest destination buffer
     * @param off  index in {@code dest} of the first byte to write
     * @param len  maximum number of bytes to read
     * @return the number of bytes stored, {@code 0} when {@code len} is {@code 0},
     *         or {@code -1} at end of data
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not describe a
     *         valid range inside {@code dest}
     * @throws IOException if the underlying decoder fails
     */
    @Override
    public int read(byte[] dest, int off, int len) throws IOException {
        // 'len > dest.length - off' instead of 'off + len > dest.length' so the
        // check cannot be defeated by int overflow.
        if (off < 0 || len < 0 || len > dest.length - off) {
            throw new IndexOutOfBoundsException();
        }
        // A zero-length request must neither consume input nor touch the buffer.
        if (len == 0) {
            return 0;
        }
        return fBZip2.read(dest, off, len);
    }

    /**
     * Closes the decoder and the underlying stream. The underlying stream is closed
     * even if closing the decoder throws.
     *
     * @throws IOException if closing either stream fails
     */
    @Override
    public void close() throws IOException {
        try {
            fBZip2.close();
        } finally {
            fInputStream.close();
        }
    }
}