Skip to content
This repository has been archived by the owner on Mar 9, 2019. It is now read-only.

Commit

Permalink
Added support for the 'constant' operator, necessary to implement the…
Browse files Browse the repository at this point in the history
… SQL values operator.

Signed-off-by: Jacques Nadeau <jacques@apache.org>
  • Loading branch information
Jason Altekruse authored and jacques-n committed Jun 19, 2013
1 parent 65ffe9b commit 97eb07a
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 4 deletions.
Expand Up @@ -69,6 +69,10 @@ public <T> T getWith(Class<T> c){
public JsonNode path(String name){
return root.path(name);
}

public JsonNode getRoot(){
return root;
}

private static synchronized ObjectMapper getMapper(){
if(MAPPER == null){
Expand Down
@@ -0,0 +1,44 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.drill.common.logical.data;

import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.JSONOptions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("constant")
public class Constant extends SourceOperator{

private final JSONOptions content;

@JsonCreator
public Constant(@JsonProperty("content") JSONOptions content){
super();
this.content = content;
Preconditions.checkNotNull(content, "content attribute is required for source operator 'constant'.");
}

public JSONOptions getContent() {
return content;
}

}
6 changes: 3 additions & 3 deletions sandbox/prototype/common/src/test/resources/logback.xml
Expand Up @@ -22,13 +22,13 @@
<pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover -->

<fileNamePattern>/logs/test-common.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- keep 30 days' worth of history -->

<maxHistory>30</maxHistory>
</rollingPolicy>
</appender>

<logger name="org.apache.drill" additivity="false">
<level value="debug" />
<appender-ref ref="SOCKET" />
Expand Down
@@ -0,0 +1,112 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.drill.exec.ref.rops;

import java.io.IOException;
import java.util.Iterator;

import org.apache.drill.common.logical.data.Constant;
import org.apache.drill.exec.ref.IteratorRegistry;
import org.apache.drill.exec.ref.RecordIterator;
import org.apache.drill.exec.ref.RecordPointer;
import org.apache.drill.exec.ref.RunOutcome.OutcomeType;
import org.apache.drill.exec.ref.UnbackedRecord;
import org.apache.drill.exec.ref.exceptions.SetupException;
import org.apache.drill.exec.ref.rse.JSONRecordReader;
import org.apache.drill.exec.ref.rse.RecordReader;

import com.fasterxml.jackson.databind.JsonNode;

public class ConstantROP extends ROPBase<Constant>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanROP.class);

private ConstantIterator iter;
private UnbackedRecord record;

public ConstantROP(Constant config) {
super(config);
record = new UnbackedRecord();
}


@Override
protected void setupIterators(IteratorRegistry registry) throws SetupException {
// try{
super.setupIterators(registry);
// need to assign reader
// throw new IOException();
//}catch(IOException e){
//throw new SetupException("Failure while setting up reader.");
//}
}


@Override
protected RecordIterator getIteratorInternal() {
return new ConstantIterator(ConstantROP.this.config.getContent().getRoot());
}


@Override
public void cleanup(OutcomeType outcome) {
super.cleanup(outcome);
}


class ConstantIterator implements RecordIterator {

Iterator<JsonNode> jsonIter;

ConstantIterator(JsonNode json) {
jsonIter = json.elements();
}

public RecordPointer getRecordPointer(){
return record;
}

public NextOutcome next(){
if ( ! jsonIter.hasNext()){
return NextOutcome.NONE_LEFT;
}
JsonNode contentJSON = ConstantROP.this.config.getContent().getRoot();
if (contentJSON.isArray())
{ // list of constant records was specified
JsonNode node;
node = jsonIter.next();
convertJsonToRP(node, record);
return NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
}
else{
convertJsonToRP(contentJSON, record);
return NextOutcome.NONE_LEFT;
}
}

private void convertJsonToRP(JsonNode node, RecordPointer rp){
record.clear();
record.merge(JSONRecordReader.convert(node));
}

public ROP getParent(){
return ConstantROP.this;
}

}

}
Expand Up @@ -102,7 +102,7 @@ public ROP getParent() {

}

private DataValue convert(JsonNode node) {
public static DataValue convert(JsonNode node) {
if (node == null || node.isNull() || node.isMissingNode()) {
return DataValue.NULL_VALUE;
} else if (node.isArray()) {
Expand Down
@@ -0,0 +1,97 @@
package org.apache.drill.exec.ref.rops;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.data.Constant;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ref.*;
import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory;
import org.apache.drill.exec.ref.rse.RSERegistry;
import org.apache.drill.exec.ref.values.ScalarValues;
import org.junit.Test;


import java.io.File;
import java.util.Collection;
import java.util.Iterator;

/**
* Created with IntelliJ IDEA.
* User: jaltekruse
* Date: 6/4/13
* Time: 4:15 PM
* To change this template use File | Settings | File Templates.
*/
public class ConstantROPTest {

@Test
public void testConstant(){

ObjectMapper map = DrillConfig.create().getMapper();
Constant con;
try{
con = map.readValue( FileUtils.getResourceAsString("/constant.json"), Constant.class);
if (con == null){
System.out.println("constant is null");
}
System.out.println(con);

ConstantROP rop = new ConstantROP(con);

rop.setupIterators(new IteratorRegistry());
RecordIterator iter = rop.getIteratorInternal();
RecordPointer ptr = iter.getRecordPointer();

int i = 1;
while (iter.next() != RecordIterator.NextOutcome.NONE_LEFT){
System.out.println(ptr);
org.junit.Assert.assertEquals("Integer value in record " + i + " is incorrect.",
ptr.getField(new SchemaPath("c1")), new ScalarValues.IntegerScalar(i));
org.junit.Assert.assertEquals("String value in record " + i + " is incorrect.",
ptr.getField(new SchemaPath("c2")), new ScalarValues.StringScalar("string " + i));
i++;
}
org.junit.Assert.assertEquals("Incorrect number of records returned by 'constant' record iterator.", 3, i - 1);
} catch (Exception ex){ ex.printStackTrace(); }
System.out.println("end test");
}

// not sure if we want to keep this as a test and check the results. Now that the internals of the ConstantROP work
// it might now be worth running the reference intepreter with every build
@Test
public void testRefInterp(){

try{
DrillConfig config = DrillConfig.create();
final String jsonFile = "/constant2.json";
LogicalPlan plan = LogicalPlan.parse(config, FileUtils.getResourceAsString(jsonFile));
org.junit.Assert.assertEquals("Constant operator not read in properly or not recognized as a source operator.",
plan.getGraph().getSources().toString(), "[Constant [memo=null]]");

org.junit.Assert.assertEquals("Edge between constant operator and sink not recognized.",
plan.getGraph().getSinks().toString(), "[Store [memo=output sink]]");

org.junit.Assert.assertEquals("Constant operator not read in properly or not recognized as a sink operator.",
plan.getGraph().getAdjList().getAllEdges().toString(), "[Edge [from=Node [val=Constant [memo=null]], to=Node [val=Store [memo=output sink]]]]");

IteratorRegistry ir = new IteratorRegistry();
ReferenceInterpreter i = new ReferenceInterpreter(plan, ir, new BasicEvaluatorFactory(ir), new RSERegistry(config));
i.setup();
Collection<RunOutcome> outcomes = i.run();

for(RunOutcome outcome : outcomes){
System.out.println("============");
System.out.println(outcome);
if(outcome.outcome == RunOutcome.OutcomeType.FAILED && outcome.exception != null){
outcome.exception.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
4 changes: 4 additions & 0 deletions sandbox/prototype/exec/ref/src/test/resources/constant.json
@@ -0,0 +1,4 @@
{
"op" : "constant",
"content" : [{ "c1" : 1, "c2" : "string 1" }, {"c1" : 2, "c2" : "string 2" }, { "c1" : 3, "c2" : "string 3" }]
}
42 changes: 42 additions & 0 deletions sandbox/prototype/exec/ref/src/test/resources/constant2.json
@@ -0,0 +1,42 @@
{
head:{
type:"apache_drill_logical_plan",
version:"1",
generator:{
type:"manual",
info:"na"
}
},
storage:[
{
type:"console",
name:"console"
},
{
type:"fs",
name:"fs1",
root:"file:///"
},
{
type:"classpath",
name:"cp"
}
],
query:[
{ op:"sequence",
do:[
{
op:"constant",
content : [{ "c1" : 1, "c2" : "string 1" }, {"c1" : 2, "c2" : "string 2" }, { "c1" : 3, "c2" : "string 3" }]
},
{
op: "store",
memo: "output sink",
storageengine: "console",
target: {pipe: "STD_OUT"}
}

]}

]
}

0 comments on commit 97eb07a

Please sign in to comment.