Skip to content

Commit

Permalink
Runtime: Refactor parallel package. Fix #246
Browse files Browse the repository at this point in the history
  • Loading branch information
minborg committed Sep 13, 2016
1 parent a040ab6 commit bdbac18
Show file tree
Hide file tree
Showing 19 changed files with 78 additions and 32 deletions.
Expand Up @@ -17,7 +17,7 @@
package com.speedment.runtime.db;

import com.speedment.runtime.annotation.Api;
import com.speedment.runtime.stream.HasParallelStrategy;
import com.speedment.runtime.stream.parallel.HasParallelStrategy;

import java.sql.ResultSet;
import java.util.List;
Expand Down
Expand Up @@ -26,7 +26,7 @@
import com.speedment.runtime.db.AsynchronousQueryResult;
import com.speedment.runtime.exception.SpeedmentException;
import com.speedment.runtime.internal.stream.StreamUtil;
import com.speedment.runtime.stream.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ParallelStrategy;

import java.sql.Connection;
import java.sql.PreparedStatement;
Expand Down
Expand Up @@ -16,7 +16,7 @@
*/
package com.speedment.runtime.internal.stream;

import com.speedment.runtime.stream.HasParallelStrategy;
import com.speedment.runtime.stream.parallel.HasParallelStrategy;
import com.speedment.runtime.stream.Pipeline;
import com.speedment.runtime.stream.StreamDecorator;

Expand Down
Expand Up @@ -22,7 +22,7 @@
package com.speedment.runtime.internal.stream;

import com.speedment.runtime.exception.SpeedmentException;
import com.speedment.runtime.stream.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ParallelStrategy;

import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down
Expand Up @@ -14,7 +14,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import java.util.Comparator;
import java.util.Spliterator;
Expand Down
Expand Up @@ -14,9 +14,10 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import com.speedment.runtime.stream.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ConfigurableIteratorSpliterator;

import java.util.Iterator;
import java.util.Spliterator;
Expand All @@ -31,7 +32,7 @@ public final class ComputeIntensityExtremeParallelStrategy implements ParallelSt

@Override
public <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {
return new ConfigurableIteratorSpliterator<>(iterator, characteristics, BATCH_SIZES);
return ConfigurableIteratorSpliterator.of(iterator, characteristics, BATCH_SIZES);
}

}
Expand Up @@ -14,9 +14,10 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import com.speedment.runtime.stream.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ConfigurableIteratorSpliterator;

import java.util.Iterator;
import java.util.Spliterator;
Expand All @@ -35,7 +36,7 @@ public final class ComputeIntensityHighParallelStrategy implements ParallelStrat

@Override
public <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {
return new ConfigurableIteratorSpliterator<>(iterator, characteristics, BATCH_SIZES);
return ConfigurableIteratorSpliterator.of(iterator, characteristics, BATCH_SIZES);
}

}
Expand Up @@ -14,9 +14,10 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import com.speedment.runtime.stream.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ConfigurableIteratorSpliterator;

import java.util.Iterator;
import java.util.Spliterator;
Expand All @@ -34,7 +35,7 @@ public final class ComputeIntensityMediumParallelStrategy implements ParallelStr

@Override
public <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {
return new ConfigurableIteratorSpliterator<>(iterator, characteristics, BATCH_SIZES);
return ConfigurableIteratorSpliterator.of(iterator, characteristics, BATCH_SIZES);
}

}
Expand Up @@ -14,7 +14,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import java.util.stream.IntStream;

Expand Down
Expand Up @@ -14,7 +14,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import java.util.Arrays;
import java.util.Comparator;
Expand All @@ -29,7 +29,7 @@
* @author pemi
* @param <T> type of {@link Spliterator} to implement
*/
public final class ConfigurableIteratorSpliterator<T> implements Spliterator<T> {
public final class ConfigurableIteratorSpliteratorImpl<T> implements Spliterator<T> {

static final int MAX_BATCH = 1 << 25;
private final Iterator<? extends T> iterator;
Expand All @@ -48,7 +48,7 @@ public final class ConfigurableIteratorSpliterator<T> implements Spliterator<T>
* elements.
* @param batchSizes the batch sizes to use for pseudo parallelism
*/
public ConfigurableIteratorSpliterator(Iterator<? extends T> iterator, long size, int characteristics, int[] batchSizes) {
public ConfigurableIteratorSpliteratorImpl(Iterator<? extends T> iterator, long size, int characteristics, int[] batchSizes) {
this.iterator = iterator;
this.sizeEstimate = size;
this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
Expand All @@ -66,7 +66,7 @@ public ConfigurableIteratorSpliterator(Iterator<? extends T> iterator, long size
* elements.
* @param batchSizes the batch sizes to use for pseudo parallelism
*/
public ConfigurableIteratorSpliterator(Iterator<? extends T> iterator, int characteristics, int[] batchSizes) {
public ConfigurableIteratorSpliteratorImpl(Iterator<? extends T> iterator, int characteristics, int[] batchSizes) {
this.iterator = iterator;
this.sizeEstimate = Long.MAX_VALUE;
this.characteristics = characteristics & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
Expand Down
Expand Up @@ -14,7 +14,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import java.util.Comparator;
import java.util.Spliterator;
Expand Down
Expand Up @@ -16,6 +16,8 @@
*/
package com.speedment.runtime.stream;

import com.speedment.runtime.stream.parallel.ParallelStrategy;
import com.speedment.runtime.stream.parallel.HasParallelStrategy;
import com.speedment.runtime.annotation.Api;
import com.speedment.runtime.internal.stream.ComposedStreamDecorator;
import com.speedment.runtime.internal.util.Cast;
Expand Down
@@ -0,0 +1,38 @@
/**
*
* Copyright (c) 2006-2016, Speedment, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); You may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.stream.parallel;

import com.speedment.runtime.internal.stream.parallel.ConfigurableIteratorSpliteratorImpl;
import java.util.Iterator;
import java.util.Spliterator;

/**
*
* @author pemi
* @param <T> type of {@link Spliterator} to implement
*/
public interface ConfigurableIteratorSpliterator<T> extends Spliterator<T> {

static <T> Spliterator<T> of(Iterator<? extends T> iterator, long size, int characteristics, int[] batchSizes) {
return new ConfigurableIteratorSpliteratorImpl<>(iterator, size, characteristics, batchSizes);
}

static <T> Spliterator<T> of(Iterator<? extends T> iterator, int characteristics, int[] batchSizes) {
return new ConfigurableIteratorSpliteratorImpl<>(iterator, characteristics, batchSizes);
}

}
Expand Up @@ -14,7 +14,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.stream;
package com.speedment.runtime.stream.parallel;

import com.speedment.runtime.annotation.Api;

Expand Down
Expand Up @@ -14,13 +14,14 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.speedment.runtime.stream;
package com.speedment.runtime.stream.parallel;

import com.speedment.runtime.annotation.Api;
import com.speedment.runtime.internal.stream.parallelstrategy.ComputeIntensityExtremeParallelStrategy;
import com.speedment.runtime.internal.stream.parallelstrategy.ComputeIntensityHighParallelStrategy;
import com.speedment.runtime.internal.stream.parallelstrategy.ComputeIntensityMediumParallelStrategy;
import com.speedment.runtime.internal.stream.parallelstrategy.ConfigurableIteratorSpliterator;
import com.speedment.runtime.internal.stream.parallel.ComputeIntensityExtremeParallelStrategy;
import com.speedment.runtime.internal.stream.parallel.ComputeIntensityHighParallelStrategy;
import com.speedment.runtime.internal.stream.parallel.ComputeIntensityMediumParallelStrategy;
import com.speedment.runtime.internal.stream.parallel.ConfigurableIteratorSpliteratorImpl;
import com.speedment.runtime.stream.parallel.ConfigurableIteratorSpliterator;

import java.util.Iterator;
import java.util.Spliterator;
Expand Down Expand Up @@ -63,7 +64,7 @@ public static <T> ParallelStrategy of(final int[] batchSizes) {
return new ParallelStrategy() {
@Override
public <T> Spliterator<T> spliteratorUnknownSize(Iterator<? extends T> iterator, int characteristics) {
return new ConfigurableIteratorSpliterator<>(iterator, characteristics, batchSizes);
return ConfigurableIteratorSpliterator.of(iterator, characteristics, batchSizes);
}
};
}
Expand Down
Expand Up @@ -19,8 +19,9 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import com.speedment.runtime.internal.stream.parallel.ArraySpliterator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down
Expand Up @@ -19,7 +19,7 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import org.junit.Rule;
import org.junit.Test;
Expand Down
Expand Up @@ -19,9 +19,9 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import com.speedment.runtime.stream.ParallelStrategy;
import com.speedment.runtime.stream.parallel.ParallelStrategy;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down
Expand Up @@ -19,8 +19,9 @@
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.speedment.runtime.internal.stream.parallelstrategy;
package com.speedment.runtime.internal.stream.parallel;

import com.speedment.runtime.internal.stream.parallel.SingletonSpliterator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down

0 comments on commit bdbac18

Please sign in to comment.