Skip to content

Commit

Permalink
Tested extended provided types
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-rogers committed Apr 27, 2020
1 parent 360a7a3 commit 7eefe01
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 56 deletions.
Expand Up @@ -27,15 +27,15 @@
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.store.easy.json.parser.ValueParser;
import org.apache.drill.exec.store.easy.json.values.BinaryValueListener;
import org.apache.drill.exec.store.easy.json.values.DateValueListener;
import org.apache.drill.exec.store.easy.json.values.UtcDateValueListener;
import org.apache.drill.exec.store.easy.json.values.DecimalValueListener;
import org.apache.drill.exec.store.easy.json.values.IntervalValueListener;
import org.apache.drill.exec.store.easy.json.values.StrictBigIntValueListener;
import org.apache.drill.exec.store.easy.json.values.StrictDoubleValueListener;
import org.apache.drill.exec.store.easy.json.values.StrictIntValueListener;
import org.apache.drill.exec.store.easy.json.values.StrictStringValueListener;
import org.apache.drill.exec.store.easy.json.values.TimeValueListener;
import org.apache.drill.exec.store.easy.json.values.TimestampValueListener;
import org.apache.drill.exec.store.easy.json.values.UtcTimestampValueListener;

import com.fasterxml.jackson.core.JsonToken;

Expand Down Expand Up @@ -168,7 +168,7 @@ private BaseExtendedValueParser numberIntParser(FieldDefn fieldDefn, boolean isA

private BaseExtendedValueParser dateParser(FieldDefn fieldDefn, boolean isArray) {
return new MongoDateValueParser(fieldDefn.parser(),
new TimestampValueListener(loader(),
new UtcTimestampValueListener(loader(),
fieldDefn.scalarWriterFor(MinorType.TIMESTAMP, isArray)));
}

Expand All @@ -188,7 +188,7 @@ private BaseExtendedValueParser oidParser(FieldDefn fieldDefn, boolean isArray)
private BaseExtendedValueParser dateDayParser(FieldDefn fieldDefn, boolean isArray) {
return new SimpleExtendedValueParser(
fieldDefn.parser(), ExtendedTypeNames.DATE_DAY,
new DateValueListener(loader(),
new UtcDateValueListener(loader(),
fieldDefn.scalarWriterFor(MinorType.DATE, isArray)));
}

Expand Down
Expand Up @@ -17,22 +17,20 @@
*/
package org.apache.drill.exec.store.easy.json.values;

import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

import com.fasterxml.jackson.core.JsonToken;

/**
* Drill-specific extension to allow dates only.
* <p>
* Drill dates are in the local time zone, so conversion is needed.
* Drill dates are stores in ms, which is odd.
* Parses local-time dates. Stored internally as an offset from the
* local epoch, in ms. Does no time zone conversions; simply assumes
* that the date is in the same time zone as the
* Drillbit.
*/
public class DateValueListener extends ScalarListener {

Expand All @@ -46,6 +44,9 @@ public void onValue(JsonToken token, TokenIterator tokenizer) {
case VALUE_NULL:
setNull();
break;
case VALUE_NUMBER_INT:
writer.setLong(tokenizer.longValue());
break;
case VALUE_STRING:
try {

Expand All @@ -55,9 +56,9 @@ public void onValue(JsonToken token, TokenIterator tokenizer) {
// want to copy the offset since the epoch from UTC to our local
// time, so that we retain the date, even if the span of the date
// is different locally than UTC. A mess.
LocalDate localDate = LocalDate.parse(tokenizer.stringValue(), DateUtility.isoFormatDate);
ZonedDateTime utc = localDate.atStartOfDay(ZoneOffset.UTC);
writer.setLong(utc.toEpochSecond() * 1000);
LocalDate localDate = LocalDate.parse(tokenizer.stringValue());
writer.setLong(Duration.between(TimestampValueListener.LOCAL_EPOCH,
localDate.atStartOfDay()).toMillis());
} catch (Exception e) {
throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
}
Expand Down
Expand Up @@ -47,6 +47,9 @@ public void onValue(JsonToken token, TokenIterator tokenizer) {
case VALUE_NULL:
setNull();
break;
case VALUE_NUMBER_INT:
writer.setInt((int) tokenizer.longValue());
break;
case VALUE_STRING:
try {
LocalTime localTime = LocalTime.parse(tokenizer.stringValue(), TIME_FORMAT);
Expand Down
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.drill.exec.store.easy.json.values;

import java.time.Instant;
import java.time.ZoneId;
import java.time.Duration;
import java.time.LocalDateTime;

import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
Expand All @@ -27,46 +27,37 @@
import com.fasterxml.jackson.core.JsonToken;

/**
* Per the <a href="https://docs.mongodb.com/manual/reference/mongodb-extended-json-v1/#bson.data_date">
* V1 docs</a>:
* <quote>
* In Strict mode, {@code <date>} is an ISO-8601 date format with a mandatory time zone field
* following the template YYYY-MM-DDTHH:mm:ss.mmm<+/-Offset>.
* <p>
* In Shell mode, {@code <date>} is the JSON representation of a 64-bit signed
* integer giving the number of milliseconds since epoch UTC.
* </quote>
* <p>
* Drill dates are in the local time zone, so conversion is needed.
* Drill-flavored version of a timestamp parser. Assumes the date time is in
* a local (unspecified) time zone, interpreted to be the default time zone
* of the Drillbit machine. Does no time zone conversions.
*/
public class TimestampValueListener extends ScalarListener {

private final ZoneId localZoneId = ZoneId.systemDefault();
public static final LocalDateTime LOCAL_EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0, 0);

public TimestampValueListener(JsonLoaderImpl loader, ScalarWriter writer) {
super(loader, writer);
}

@Override
public void onValue(JsonToken token, TokenIterator tokenizer) {
Instant instant;
switch (token) {
case VALUE_NULL:
setNull();
return;
case VALUE_NUMBER_INT:
instant = Instant.ofEpochMilli(tokenizer.longValue());
writer.setLong(tokenizer.longValue());
break;
case VALUE_STRING:
try {
instant = Instant.parse(tokenizer.stringValue());
LocalDateTime localDT = LocalDateTime.parse(tokenizer.stringValue());
writer.setLong(Duration.between(LOCAL_EPOCH, localDT).toMillis());
} catch (Exception e) {
throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
}
break;
default:
throw tokenizer.invalidValue(token);
}
writer.setLong(instant.toEpochMilli() + localZoneId.getRules().getOffset(instant).getTotalSeconds() * 1000);
}
}
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.values;

import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

import com.fasterxml.jackson.core.JsonToken;

/**
 * Value listener for a Drill-specific, date-only extended type whose
 * string form is interpreted at midnight UTC, to be consistent with
 * Mongo timestamps.
 * <p>
 * Accepted tokens:
 * <ul>
 * <li>{@code null}: the column is set to null.</li>
 * <li>integer: the value is written to the column unchanged.</li>
 * <li>string: parsed with {@link DateUtility#isoFormatDate} and written as
 * the number of milliseconds between the epoch and midnight UTC of that
 * date.</li>
 * </ul>
 * Any other token is rejected as an invalid value.
 */
public class UtcDateValueListener extends ScalarListener {

  public UtcDateValueListener(JsonLoaderImpl loader, ScalarWriter writer) {
    super(loader, writer);
  }

  /**
   * Writes one JSON value to the date column.
   *
   * @param token the JSON token type of the current value
   * @param tokenizer source of the current token's long or string value
   */
  @Override
  public void onValue(JsonToken token, TokenIterator tokenizer) {
    switch (token) {
      case VALUE_NULL:
        setNull();
        return;
      case VALUE_NUMBER_INT:
        // Integer input is stored as-is; no scaling or shifting is applied.
        writer.setLong(tokenizer.longValue());
        return;
      case VALUE_STRING:
        writeUtcDateString(tokenizer);
        return;
      default:
        throw tokenizer.invalidValue(token);
    }
  }

  /**
   * Parses the current string token as an ISO date and stores it as
   * milliseconds since the epoch at midnight UTC. Any failure while reading,
   * parsing or writing is reported as a data conversion error for this column.
   */
  private void writeUtcDateString(TokenIterator tokenizer) {
    try {
      // The input date is taken to be in UTC. It is deliberately NOT shifted
      // into the local time zone, since a shift could move it to a different
      // calendar day. Only the offset from the epoch is carried over, so the
      // date itself is retained.
      LocalDate date = LocalDate.parse(tokenizer.stringValue(), DateUtility.isoFormatDate);
      ZonedDateTime midnightUtc = date.atStartOfDay(ZoneOffset.UTC);
      long epochMillis = midnightUtc.toEpochSecond() * 1000;
      writer.setLong(epochMillis);
    } catch (Exception e) {
      throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
    }
  }
}
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.json.values;

import java.time.Instant;
import java.time.ZoneId;

import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.vector.accessor.ScalarWriter;

import com.fasterxml.jackson.core.JsonToken;

/**
 * Value listener for Mongo-style extended JSON dates, which are expressed
 * in UTC. Per the
 * <a href="https://docs.mongodb.com/manual/reference/mongodb-extended-json-v1/#bson.data_date">
 * V1 docs</a>:
 * <blockquote>
 * In Strict mode, {@code <date>} is an ISO-8601 date format with a mandatory
 * time zone field following the template
 * {@code YYYY-MM-DDTHH:mm:ss.mmm<+/-Offset>}.
 * <p>
 * In Shell mode, {@code <date>} is the JSON representation of a 64-bit signed
 * integer giving the number of milliseconds since epoch UTC.
 * </blockquote>
 * <p>
 * The value written to the column is the UTC epoch-millisecond value plus the
 * offset, at that instant, of the JVM's default time zone (captured when this
 * listener is constructed).
 */
public class UtcTimestampValueListener extends ScalarListener {

  /** Default time zone of the JVM, read once at construction. */
  private final ZoneId systemZone = ZoneId.systemDefault();

  public UtcTimestampValueListener(JsonLoaderImpl loader, ScalarWriter writer) {
    super(loader, writer);
  }

  /**
   * Writes one JSON value to the timestamp column.
   * <ul>
   * <li>{@code null}: the column is set to null.</li>
   * <li>integer: taken as milliseconds since the epoch, UTC.</li>
   * <li>string: parsed with {@link Instant#parse(CharSequence)}.</li>
   * </ul>
   * Any other token is rejected as an invalid value.
   *
   * @param token the JSON token type of the current value
   * @param tokenizer source of the current token's long or string value
   */
  @Override
  public void onValue(JsonToken token, TokenIterator tokenizer) {
    final Instant utcInstant;
    switch (token) {
      case VALUE_NULL:
        setNull();
        return;
      case VALUE_NUMBER_INT:
        utcInstant = Instant.ofEpochMilli(tokenizer.longValue());
        break;
      case VALUE_STRING:
        utcInstant = parseUtcInstant(tokenizer);
        break;
      default:
        throw tokenizer.invalidValue(token);
    }

    // Shift the UTC value by the local zone's offset in effect at that instant.
    long utcMillis = utcInstant.toEpochMilli();
    int offsetSeconds = systemZone.getRules().getOffset(utcInstant).getTotalSeconds();
    writer.setLong(utcMillis + offsetSeconds * 1000);
  }

  /**
   * Parses the current string token as an instant. Any failure while reading
   * or parsing is reported as a data conversion error for this column.
   */
  private Instant parseUtcInstant(TokenIterator tokenizer) {
    try {
      return Instant.parse(tokenizer.stringValue());
    } catch (Exception e) {
      throw loader.dataConversionError(schema(), "date", tokenizer.stringValue());
    }
  }
}
Expand Up @@ -32,6 +32,7 @@
import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.easy.json.loader.BaseJsonLoaderTest.JsonLoaderFixture;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;

Expand Down Expand Up @@ -125,14 +126,16 @@ public void testCaseInsensitive() {

@Test
public void testProjection() {
final String json =
"{a: 1, b: [[{x: [[{y: []}]]}]]}\n" +
"{a: 2}\n" +
"{b: \"bar\"}";
String json =
"{a: 10, b: true}\n" +
"{a: 20, b: [\"what?\"]}\n" +
"{a: 30, b: {c: \"oh, my!\"}}" +
"{a: 40}" +
"{a: 50, b: [[{x: [[{y: []}]]}]]}";

JsonLoaderFixture loader = new JsonLoaderFixture();
ProjectionFilter projectionFilter = ProjectionFilter.projectionFilter(
Projections.parse(RowSetTestUtils.projectList("a")), EmptyErrorContext.INSTANCE);
loader.rsLoaderOptions.projectionFilter(projectionFilter);
loader.rsLoaderOptions.projection(
Projections.parse(RowSetTestUtils.projectList("a")));
loader.open(json);
RowSet results = loader.next();
assertNotNull(results);
Expand All @@ -141,9 +144,11 @@ public void testProjection() {
.addNullable("a", MinorType.BIGINT)
.build();
RowSet expected = fixture.rowSetBuilder(expectedSchema)
.addRow(1)
.addRow(2)
.addSingleCol(null)
.addRow(10)
.addRow(20)
.addRow(30)
.addRow(40)
.addRow(50)
.build();
RowSetUtilities.verify(expected, results);
assertNull(loader.next());
Expand Down

0 comments on commit 7eefe01

Please sign in to comment.