Skip to content

Commit

Permalink
DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 (apache#2485)
Browse files Browse the repository at this point in the history

* DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2

Also includes:

* DRILL-8159: Convert the HTTPD reader to use EVF V2

* Build fix

* Changes from review comments

* Fix test issue
  • Loading branch information
paul-rogers authored and jnturton committed Jul 11, 2022
1 parent 5ffcfb0 commit e34b035
Show file tree
Hide file tree
Showing 23 changed files with 555 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,36 +41,29 @@
import java.io.InputStream;
import java.io.InputStreamReader;

public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator> {
public class HttpdLogBatchReader implements ManagedReader {

private static final Logger logger = LoggerFactory.getLogger(HttpdLogBatchReader.class);
public static final String RAW_LINE_COL_NAME = "_raw";
public static final String MATCHED_COL_NAME = "_matched";
private final HttpdLogFormatConfig formatConfig;
private final int maxRecords;
private final EasySubScan scan;
private HttpdParser parser;
private FileSplit split;
private final HttpdParser parser;
private final FileDescrip file;
private InputStream fsStream;
private RowSetLoader rowWriter;
private final RowSetLoader rowWriter;
private BufferedReader reader;
private int lineNumber;
private CustomErrorContext errorContext;
private ScalarWriter rawLineWriter;
private ScalarWriter matchedWriter;
private final CustomErrorContext errorContext;
private final ScalarWriter rawLineWriter;
private final ScalarWriter matchedWriter;
private int errorCount;


public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, int maxRecords, EasySubScan scan) {
public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
this.formatConfig = formatConfig;
this.maxRecords = maxRecords;
this.scan = scan;
}

@Override
public boolean open(FileSchemaNegotiator negotiator) {
// Open the input stream to the log file
openFile(negotiator);
file = negotiator.file();
openFile();
errorContext = negotiator.parentErrorContext();
try {
parser = new HttpdParser(
Expand All @@ -92,7 +86,6 @@ public boolean open(FileSchemaNegotiator negotiator) {
parser.addFieldsToParser(rowWriter);
rawLineWriter = addImplicitColumn(RAW_LINE_COL_NAME, MinorType.VARCHAR);
matchedWriter = addImplicitColumn(MATCHED_COL_NAME, MinorType.BIT);
return true;
}

@Override
Expand All @@ -108,11 +101,6 @@ public boolean next() {
private boolean nextLine(RowSetLoader rowWriter) {
String line;

// Check if the limit has been reached
if (rowWriter.limitReached(maxRecords)) {
return false;
}

try {
line = reader.readLine();
if (line == null) {
Expand Down Expand Up @@ -164,19 +152,19 @@ public void close() {
try {
fsStream.close();
} catch (IOException e) {
logger.warn("Error when closing HTTPD file: {} {}", split.getPath().toString(), e.getMessage());
logger.warn("Error when closing HTTPD file: {} {}", file.split().getPath().toString(), e.getMessage());
}
fsStream = null;
}

private void openFile(FileSchemaNegotiator negotiator) {
split = negotiator.split();
private void openFile() {
Path path = file.split().getPath();
try {
fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
fsStream = file.fileSystem().openPossiblyCompressedStream(path);
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Failed to open open input file: %s", split.getPath().toString())
.message("Failed to open open input file: %s", path.toString())
.addContext(e.getMessage())
.build(logger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
Expand All @@ -41,18 +41,16 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
private static class HttpLogReaderFactory extends FileReaderFactory {

private final HttpdLogFormatConfig config;
private final int maxRecords;
private final EasySubScan scan;

private HttpLogReaderFactory(HttpdLogFormatConfig config, int maxRecords, EasySubScan scan) {
private HttpLogReaderFactory(HttpdLogFormatConfig config, EasySubScan scan) {
this.config = config;
this.maxRecords = maxRecords;
this.scan = scan;
}

@Override
public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
return new HttpdLogBatchReader(config, maxRecords, scan);
public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
return new HttpdLogBatchReader(config, scan, negotiator);
}
}

Expand All @@ -76,24 +74,15 @@ private static EasyFormatConfig easyConfig(Configuration fsConf, HttpdLogFormatC
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
.readerOperatorType(OPERATOR_TYPE)
.scanVersion(ScanFrameworkVersion.EVF_V1)
.scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}

@Override
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
EasySubScan scan, OptionManager options) {
return new HttpdLogBatchReader(formatConfig, scan.getMaxRecords(), scan);
}

@Override
protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
builder.setReaderFactory(new HttpLogReaderFactory(formatConfig, scan.getMaxRecords(), scan));

initScanBuilder(builder, scan);
protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
return builder;
builder.readerFactory(new HttpLogReaderFactory(formatConfig, scan));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ public TupleMetadata setupParser()

dummy.addParseTarget(String.class.getMethod("indexOf", String.class), allParserPaths);

/*
If the column is not requested explicitly, remove it from the requested path list.
*/
// If the column is not requested explicitly, remove it from the requested path list.
if (!isStarQuery() &&
!isMetadataQuery() &&
!isOnlyImplicitColumns()) {
Expand Down

0 comments on commit e34b035

Please sign in to comment.