Merge pull request apache#1342 from guobingkun/unit_test
Fix a potential issue in DeterminePartitionsJob, add two unit tests, and reformat code
fjy committed May 5, 2015
2 parents d7562fd + 1ee550d commit 7608bf5
Showing 3 changed files with 411 additions and 19 deletions.
DeterminePartitionsJob.java
@@ -61,7 +61,6 @@
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.joda.time.DateTime;
@@ -79,13 +78,13 @@
/**
* Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
* choosing the best dimension that satisfies the criteria:
*
* <p/>
* <ul>
* <li>Must have exactly one value per row.</li>
* <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
* put all those rows in the same partition, and that partition may be much larger than the target size.</li>
* </ul>
*
* <p/>
* "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of
* segment size relative to the target.
*/
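The selection rules in the javadoc above can be restated as a small, self-contained sketch. This is not part of the diff and not Druid's actual implementation: the class, method, and parameter names are invented, and the 5x oversize cutoff is an arbitrary stand-in for whatever threshold the real job applies.

import java.util.Map;

// Illustrative sketch only. Picks a partition dimension from per-dimension value
// histograms (dimension -> value -> row count), following the criteria in the javadoc.
class DimensionSelectionSketch
{
  static String pickPartitionDimension(
      Map<String, Map<String, Long>> valueCountsPerDim,
      long totalRows,
      long targetRowsPerPartition
  )
  {
    String best = null;
    int bestCardinality = -1;
    for (Map.Entry<String, Map<String, Long>> entry : valueCountsPerDim.entrySet()) {
      long sum = 0;
      long maxSingleValueCount = 0;
      for (long count : entry.getValue().values()) {
        sum += count;
        maxSingleValueCount = Math.max(maxSingleValueCount, count);
      }
      // Exactly one value per row: the per-value counts must add up to the total row count.
      if (sum != totalRows) {
        continue;
      }
      // No oversized partitions: rows sharing a value always land in the same partition,
      // so a very popular value would blow past the target size. The 5x factor is arbitrary.
      if (maxSingleValueCount > targetRowsPerPartition * 5) {
        continue;
      }
      // "Best" = highest cardinality among the dimensions that satisfy both rules.
      if (entry.getValue().size() > bestCardinality) {
        bestCardinality = entry.getValue().size();
        best = entry.getKey();
      }
    }
    return best; // null when no dimension qualifies
  }
}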
@@ -221,8 +220,8 @@ public boolean run()
if (Utils.exists(dimSelectionJob, fileSystem, partitionInfoPath)) {
List<ShardSpec> specs = config.jsonMapper.readValue(
Utils.openInputStream(dimSelectionJob, partitionInfoPath), new TypeReference<List<ShardSpec>>()
{
}
{
}
);

List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
@@ -461,7 +460,7 @@ public void setConf(Configuration config)
private static abstract class DeterminePartitionsDimSelectionBaseReducer
extends Reducer<BytesWritable, Text, BytesWritable, Text>
{
protected static volatile HadoopDruidIndexerConfig config = null;
protected volatile HadoopDruidIndexerConfig config = null;

@Override
protected void setup(Context context)
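The hunk above replaces the reducer's static config field with a plain instance field. A static mutable field is shared by every instance loaded in the same JVM, so two reducers (or two tests in one run) configured differently would silently overwrite each other's config; that kind of cross-talk is what dropping static avoids. A minimal sketch of the hazard, not part of the diff and using made-up names:

// Both "reducer" instances below read the same static field, so the second setup()
// call changes what the first instance sees as well.
class StaticConfigHazardSketch
{
  static class ReducerWithStaticConfig
  {
    protected static volatile String config = null;

    void setup(String jobConfig)
    {
      config = jobConfig; // last writer wins for every instance
    }
  }

  public static void main(String[] args)
  {
    ReducerWithStaticConfig first = new ReducerWithStaticConfig();
    ReducerWithStaticConfig second = new ReducerWithStaticConfig();
    first.setup("job-A");
    second.setup("job-B");
    System.out.println(ReducerWithStaticConfig.config); // prints job-B for both instances
  }
}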
@@ -756,13 +755,13 @@ protected void innerReduce(

final List<ShardSpec> chosenShardSpecs = Lists.transform(
chosenPartitions.partitions, new Function<DimPartition, ShardSpec>()
{
@Override
public ShardSpec apply(DimPartition dimPartition)
{
return dimPartition.shardSpec;
}
}
{
@Override
public ShardSpec apply(DimPartition dimPartition)
{
return dimPartition.shardSpec;
}
}
);

log.info("Chosen partitions:");
@@ -772,12 +771,12 @@ public ShardSpec apply(DimPartition dimPartition)

try {
HadoopDruidIndexerConfig.jsonMapper
.writerWithType(
new TypeReference<List<ShardSpec>>()
{
}
)
.writeValue(out, chosenShardSpecs);
.writerWithType(
new TypeReference<List<ShardSpec>>()
{
}
)
.writeValue(out, chosenShardSpecs);
}
finally {
Closeables.close(out, false);
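Both the read in run() (@@ -221) and the write above serialize the chosen shard specs through a Jackson TypeReference so that the List<ShardSpec> element type survives (de)serialization instead of collapsing into untyped maps. A minimal, standalone illustration of the same pattern, not part of the diff, assuming Jackson 2.x on the classpath and using a made-up ExampleSpec POJO in place of Druid's ShardSpec:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.List;

class TypeReferenceRoundTripSketch
{
  public static class ExampleSpec
  {
    public String dimension;
    public int partitionNum;

    public ExampleSpec() {}

    public ExampleSpec(String dimension, int partitionNum)
    {
      this.dimension = dimension;
      this.partitionNum = partitionNum;
    }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    List<ExampleSpec> specs = Arrays.asList(new ExampleSpec("host", 0), new ExampleSpec("host", 1));

    // Writing with an explicit TypeReference keeps the element type information intact.
    String json = mapper.writerWithType(new TypeReference<List<ExampleSpec>>() {})
                        .writeValueAsString(specs);

    // Reading back with the same TypeReference yields List<ExampleSpec> rather than
    // a list of untyped LinkedHashMaps.
    List<ExampleSpec> roundTripped = mapper.readValue(json, new TypeReference<List<ExampleSpec>>() {});
    System.out.println(roundTripped.size() + " specs, first dimension = " + roundTripped.get(0).dimension);
  }
}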
DeterminePartitionsJobTest.java
@@ -0,0 +1,309 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexer;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@RunWith(Parameterized.class)
public class DeterminePartitionsJobTest
{

private HadoopDruidIndexerConfig config;
private int expectedNumOfSegments;
private int[] expectedNumOfShardsForEachSegment;
private String[][][] expectedStartEndForEachShard;
private File dataFile;
private File tmpDir;

@Parameterized.Parameters(name = "assumeGrouped={0}, "
+ "targetPartitionSize={1}, "
+ "interval={2}"
+ "expectedNumOfSegments={3}, "
+ "expectedNumOfShardsForEachSegment={4}, "
+ "expectedStartEndForEachShard={5}, "
+ "data={6}")
public static Collection<Object[]> constructFeed()
{
return Arrays.asList(
new Object[][]{
{
true,
3L,
"2014-10-22T00:00:00Z/P1D",
1,
new int[]{5},
new String[][][]{
{
{ null, "c.example.com" },
{ "c.example.com", "e.example.com" },
{ "e.example.com", "g.example.com" },
{ "g.example.com", "i.example.com" },
{ "i.example.com", null }
}
},
ImmutableList.of(
"2014102200,a.example.com,CN,100",
"2014102200,b.exmaple.com,US,50",
"2014102200,c.example.com,US,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
"2014102200,f.example.com,US,567",
"2014102200,g.example.com,US,11",
"2014102200,h.example.com,US,251",
"2014102200,i.example.com,US,963",
"2014102200,j.example.com,US,333"
)
},
{
false,
3L,
"2014-10-20T00:00:00Z/P1D",
1,
new int[]{5},
new String[][][]{
{
{ null, "c.example.com"},
{ "c.example.com", "e.example.com" },
{ "e.example.com", "g.example.com" },
{ "g.example.com", "i.example.com" },
{ "i.example.com", null }
}
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,US,50",
"2014102000,b.exmaple.com,US,50",
"2014102000,c.example.com,US,200",
"2014102000,c.example.com,US,200",
"2014102000,d.example.com,US,250",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
"2014102000,e.example.com,US,123",
"2014102000,f.example.com,US,567",
"2014102000,f.example.com,US,567",
"2014102000,g.example.com,US,11",
"2014102000,g.example.com,US,11",
"2014102000,h.example.com,US,251",
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,US,963",
"2014102000,i.example.com,US,963",
"2014102000,j.example.com,US,333",
"2014102000,j.example.com,US,333"
)
},
{
true,
6L,
"2014-10-20T00:00:00Z/P3D",
3,
new int[]{2, 2, 2},
new String[][][]{
{
{ null, "f.example.com" },
{ "f.example.com", null }
},
{
{ null, "f.example.com" },
{ "f.example.com", null }
},
{
{ null, "f.example.com" },
{ "f.example.com", null }
}
},
ImmutableList.of(
"2014102000,a.example.com,CN,100",
"2014102000,b.exmaple.com,CN,50",
"2014102000,c.example.com,CN,200",
"2014102000,d.example.com,US,250",
"2014102000,e.example.com,US,123",
"2014102000,f.example.com,US,567",
"2014102000,g.example.com,US,11",
"2014102000,h.example.com,US,251",
"2014102000,i.example.com,US,963",
"2014102000,j.example.com,US,333",
"2014102000,k.example.com,US,555",
"2014102100,a.example.com,CN,100",
"2014102100,b.exmaple.com,CN,50",
"2014102100,c.example.com,CN,200",
"2014102100,d.example.com,US,250",
"2014102100,e.example.com,US,123",
"2014102100,f.example.com,US,567",
"2014102100,g.example.com,US,11",
"2014102100,h.example.com,US,251",
"2014102100,i.example.com,US,963",
"2014102100,j.example.com,US,333",
"2014102100,k.example.com,US,555",
"2014102200,a.example.com,CN,100",
"2014102200,b.exmaple.com,CN,50",
"2014102200,c.example.com,CN,200",
"2014102200,d.example.com,US,250",
"2014102200,e.example.com,US,123",
"2014102200,f.example.com,US,567",
"2014102200,g.example.com,US,11",
"2014102200,h.example.com,US,251",
"2014102200,i.example.com,US,963",
"2014102200,j.example.com,US,333",
"2014102200,k.example.com,US,555"
)
}
}
);
}

public DeterminePartitionsJobTest(
boolean assumeGrouped,
Long targetPartitionSize,
String interval,
int expectedNumOfSegments,
int[] expectedNumOfShardsForEachSegment,
String[][][] expectedStartEndForEachShard,
List<String> data
) throws IOException
{
this.expectedNumOfSegments = expectedNumOfSegments;
this.expectedNumOfShardsForEachSegment = expectedNumOfShardsForEachSegment;
this.expectedStartEndForEachShard = expectedStartEndForEachShard;

dataFile = File.createTempFile("test_website_data", "tmp");
dataFile.deleteOnExit();
tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();

FileUtils.writeLines(dataFile, data);

config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("host", "country"), null, null),
null,
ImmutableList.of("timestamp", "host", "country", "visited_num")
)
),
new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(new Interval(interval))
)
),
new HadoopIOConfig(
ImmutableMap.<String, Object>of(
"paths",
dataFile.getCanonicalPath(),
"type",
"static"
),
null,
tmpDir.getCanonicalPath()
),
new HadoopTuningConfig(
tmpDir.getCanonicalPath(),
null,
new SingleDimensionPartitionsSpec(null, targetPartitionSize, null, assumeGrouped),
null,
null,
null,
false,
false,
false,
false,
null,
false,
false,
false,
null,
null
)
)
);
}

@Test
public void testPartitionJob()
{
DeterminePartitionsJob job = new DeterminePartitionsJob(config);
job.run();

int shardNum = 0;
int segmentNum = 0;
Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size());

for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : config.getSchema()
.getTuningConfig()
.getShardSpecs()
.entrySet()) {
int partitionNum = 0;
List<HadoopyShardSpec> specs = entry.getValue();
Assert.assertEquals(expectedNumOfShardsForEachSegment[segmentNum], specs.size());

for (HadoopyShardSpec spec : specs) {
SingleDimensionShardSpec actualSpec = (SingleDimensionShardSpec) spec.getActualSpec();
Assert.assertEquals(shardNum, spec.getShardNum());
Assert.assertEquals(expectedStartEndForEachShard[segmentNum][partitionNum][0], actualSpec.getStart());
Assert.assertEquals(expectedStartEndForEachShard[segmentNum][partitionNum][1], actualSpec.getEnd());
Assert.assertEquals(partitionNum, actualSpec.getPartitionNum());
shardNum++;
partitionNum++;
}

segmentNum++;
}
}

@After
public void tearDown() throws Exception
{
FileUtils.forceDelete(dataFile);
FileUtils.deleteDirectory(tmpDir);
}
}
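The expectedStartEndForEachShard entries asserted in the test read as [start, end) ranges over the partition dimension, with null meaning unbounded on that side. A rough, self-contained sketch of that reading, not part of the diff and not Druid's SingleDimensionShardSpec (the inclusive-start / exclusive-end convention here is an assumption for illustration):

class ShardRangeSketch
{
  static boolean contains(String start, String end, String value)
  {
    boolean aboveStart = (start == null) || value.compareTo(start) >= 0;
    boolean belowEnd = (end == null) || value.compareTo(end) < 0;
    return aboveStart && belowEnd;
  }

  public static void main(String[] args)
  {
    // Boundaries from the first parameter set above: five shards over the "host" dimension.
    String[][] shards = {
        {null, "c.example.com"},
        {"c.example.com", "e.example.com"},
        {"e.example.com", "g.example.com"},
        {"g.example.com", "i.example.com"},
        {"i.example.com", null}
    };
    String host = "d.example.com";
    for (int i = 0; i < shards.length; i++) {
      if (contains(shards[i][0], shards[i][1], host)) {
        System.out.println(host + " -> shard " + i); // d.example.com -> shard 1
      }
    }
  }
}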