Permalink
Browse files

DRILL-334: Subdivide Drillbit control and data messages. Add memory T…

…opLevel and Child memory allocator with debug mode to capture memory leaks. Various memory leak fixes to get build to complete.
  • Loading branch information...
1 parent 4a83dae commit 72c5553f2bea22b5b0fe58380c446737256a0d90 @jacques-n committed Jan 3, 2014
Showing with 13,475 additions and 9,048 deletions.
  1. +26 −0 common/src/main/java/org/apache/drill/common/config/DrillConfig.java
  2. +1 −1 exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
  3. +18 −18 exec/java-exec/pom.xml
  4. +5 −3 exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
  5. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
  6. +5 −5 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
  7. +3 −3 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
  8. +33 −29 exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
  9. +31 −33 exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
  10. +2 −3 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
  11. +2 −6 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
  12. +3 −1 exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
  13. +5 −5 exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
  14. +3 −5 exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
  15. +858 −0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
  16. +192 −0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
  17. +112 −0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
  18. +14 −28 exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
  19. +0 −65 exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
  20. +51 −0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryException.java
  21. +161 −0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
  22. +4 −5 exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
  23. +101 −54 exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
  24. +62 −0 exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
  25. +26 −0 exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
  26. +10 −3 exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
  27. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
  28. +4 −4 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
  29. +4 −0 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
  30. +22 −7 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
  31. +25 −28 ...ava/org/apache/drill/exec/{work/foreman/TunnelManager.java → physical/impl/SendingAccountor.java}
  32. +15 −6 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
  33. +5 −1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
  34. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
  35. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
  36. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
  37. +9 −0 ...java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
  38. +250 −180 ...main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
  39. +35 −16 ...a-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
  40. +23 −8 ...ec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
  41. +22 −21 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
  42. +22 −14 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
  43. +7 −0 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
  44. +9 −1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
  45. +5 −1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
  46. +11 −23 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
  47. +5 −0 ...ec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
  48. +3 −3 exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
  49. 0 {protocol → exec/java-exec}/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
  50. +14 −4 exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
  51. +8 −2 exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
  52. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
  53. +2 −2 exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
  54. +10 −2 exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
  55. +2 −0 exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
  56. +3 −1 exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
  57. +10 −7 exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
  58. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
  59. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
  60. +12 −8 exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
  61. +11 −12 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
  62. +19 −6 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
  63. +51 −22 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
  64. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
  65. +5 −10 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → }/FutureBitCommand.java
  66. +7 −11 ...ec/src/main/java/org/apache/drill/exec/rpc/{bit/ListeningBitCommand.java → ListeningCommand.java}
  67. +22 −4 .../java/org/apache/drill/exec/rpc/{ZeroCopyProtobufLengthDecoder.java → ProtobufLengthDecoder.java}
  68. +242 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
  69. +64 −24 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
  70. +85 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
  71. +0 −2 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
  72. +20 −10 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
  73. +3 −3 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitCommand.java → RpcCommand.java}
  74. +8 −3 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
  75. +0 −99 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
  76. +0 −189 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
  77. +0 −59 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
  78. +2 −2 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → control}/AvailabilityListener.java
  79. +11 −19 .../java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → control}/ConnectionManagerRegistry.java
  80. +105 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
  81. +28 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlCommand.java
  82. +37 −46 ...src/main/java/org/apache/drill/exec/rpc/{bit/BitConnection.java → control/ControlConnection.java}
  83. +60 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
  84. +40 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java
  85. +9 −11 ...c/src/main/java/org/apache/drill/exec/rpc/{bit/BitRpcConfig.java → control/ControlRpcConfig.java}
  86. +31 −26 ...va-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitServer.java → control/ControlServer.java}
  87. +16 −43 ...va-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitTunnel.java → control/ControlTunnel.java}
  88. +6 −16 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitCom.java → control/Controller.java}
  89. +14 −27 ...-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitComImpl.java → control/ControllerImpl.java}
  90. +8 −8 ...ache/drill/exec/rpc/{bit/BitComDefaultInstanceHandler.java → control/DefaultInstanceHandler.java}
  91. +22 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/SendProgress.java
  92. +113 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
  93. +55 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
  94. +100 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
  95. +80 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
  96. +63 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
  97. +53 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
  98. +58 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java
  99. +38 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
  100. +31 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
  101. +64 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
  102. +42 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
  103. +128 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
  104. +63 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
  105. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → data}/SendProgress.java
  106. +10 −8 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
  107. +38 −0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java
  108. +20 −5 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
  109. +8 −1 exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
  110. +3 −3 exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
  111. +23 −8 exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
  112. +22 −11 exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
  113. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
  114. +35 −0 exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
  115. +0 −2 exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
  116. +0 −1 exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
  117. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/work/{foreman → }/ErrorHelper.java
  118. +1 −2 exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
  119. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
  120. +33 −16 exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
  121. +6 −8 ...va/org/apache/drill/exec/work/batch/{AbstractFragmentCollector.java → AbstractDataCollector.java}
  122. +0 −211 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
  123. +150 −0 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
  124. +7 −7 .../src/main/java/org/apache/drill/exec/work/batch/{BitComHandler.java → ControlMessageHandler.java}
  125. +6 −6 ...va-exec/src/main/java/org/apache/drill/exec/work/batch/{BatchCollector.java → DataCollector.java}
  126. +13 −15 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
  127. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
  128. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
  129. +19 −8 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
  130. +17 −16 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
  131. +33 −13 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
  132. +8 −7 exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
  133. +1 −1 exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
  134. +29 −22 ...rc/main/java/org/apache/drill/exec/work/foreman/{RunningFragmentManager.java → QueryManager.java}
  135. +39 −0 exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java
  136. +8 −8 ...ache/drill/exec/work/{AbstractFragmentRunnerListener.java → fragment/AbstractStatusReporter.java}
  137. +27 −15 ...c/src/main/java/org/apache/drill/exec/work/{FragmentRunner.java → fragment/FragmentExecutor.java}
  138. +14 −9 ...in/java/org/apache/drill/exec/work/fragment/{IncomingFragmentHandler.java → FragmentManager.java}
  139. +17 −17 ...va/org/apache/drill/exec/work/fragment/{RemoteFragmentHandler.java → NonRootFragmentManager.java}
  140. +8 −9 .../apache/drill/exec/work/{RemoteFragmentRunnerListener.java → fragment/NonRootStatusReporter.java}
  141. +13 −10 ...n/java/org/apache/drill/exec/work/fragment/{LocalFragmentHandler.java → RootFragmentManager.java}
  142. +6 −3 ...main/java/org/apache/drill/exec/work/{FragmentRunnerListener.java → fragment/StatusReporter.java}
  143. +2 −2 exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
  144. +6 −3 exec/java-exec/src/main/resources/drill-module.conf
  145. +6 −5 exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
  146. +8 −7 exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
  147. +2 −2 exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
  148. +8 −1 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
  149. +4 −2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
  150. +105 −72 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
  151. +12 −10 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
  152. +4 −2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
  153. +6 −4 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
  154. +10 −8 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
  155. +7 −5 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
  156. +1 −1 ...ava-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
  157. +7 −8 ...est/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
  158. +4 −2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
  159. +6 −4 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
  160. +4 −2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
  161. +4 −2 .../java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
  162. +4 −2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
  163. +4 −2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
  164. +2 −4 exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
  165. +5 −7 exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
  166. +2 −2 exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
  167. +110 −38 exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
  168. +7 −7 exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
  169. +9 −4 exec/java-exec/src/test/resources/logback.xml
  170. +5,438 −0 protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
  171. +2,228 −0 protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
  172. +160 −79 protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java
  173. +212 −6,566 protocol/src/main/java/org/apache/drill/exec/proto/ExecProtos.java
  174. +102 −330 protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
  175. +136 −45 protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
  176. +83 −0 protocol/src/main/protobuf/BitControl.proto
  177. +34 −0 protocol/src/main/protobuf/BitData.proto
  178. +3 −2 protocol/src/main/protobuf/Coordination.proto
  179. +0 −83 protocol/src/main/protobuf/ExecutionProtos.proto
  180. +3 −2 protocol/src/main/protobuf/User.proto
  181. +18 −17 protocol/src/main/protobuf/UserBitShared.proto
  182. +1 −1 sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java
View
26 common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.common.config;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
import java.net.URL;
import java.util.Collection;
import java.util.List;
@@ -35,20 +37,26 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public final class DrillConfig extends NestedConfig{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
private final ObjectMapper mapper;
+ private final ImmutableList<String> startupArguments;
+ @SuppressWarnings("restriction") private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
@SuppressWarnings("unchecked")
private volatile List<Queue<Object>> sinkQueues = new CopyOnWriteArrayList<Queue<Object>>(new Queue[1]);
+
+ @SuppressWarnings("restriction")
@VisibleForTesting
public DrillConfig(Config config) {
super(config);
+
mapper = new ObjectMapper();
SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule")
.addDeserializer(LogicalExpression.class, new LogicalExpression.De(this));
@@ -61,8 +69,16 @@ public DrillConfig(Config config) {
mapper.registerSubtypes(LogicalOperatorBase.getSubTypes(this));
mapper.registerSubtypes(StorageEngineConfigBase.getSubTypes(this));
+ RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+ this.startupArguments = ImmutableList.copyOf(bean.getInputArguments());
};
+
+
+
+ public List<String> getStartupArguments(){
+ return startupArguments;
+ }
/**
* Create a DrillConfig object using the default config file name
@@ -155,4 +171,14 @@ public ObjectMapper getMapper(){
public String toString(){
return this.root().render();
}
+
+ public static void main(String[] args) throws Exception{
+ //"-XX:MaxDirectMemorySize"
+ DrillConfig config = DrillConfig.create();
+
+ }
+
+ public static long getMaxDirectMemory(){
+ return MAX_DIRECT_MEMORY;
+ }
}
View
2 exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
@@ -30,7 +30,7 @@
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
-final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
+public final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
View
36 exec/java-exec/pom.xml
@@ -1,21 +1,16 @@
<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>exec-parent</artifactId>
@@ -37,6 +32,11 @@
<version>4.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.5.6</version>
@@ -273,7 +273,7 @@
</resource>
<resource>
<directory>target/generated-sources</directory>
-<!-- <include>*/org</include> -->
+ <!-- <include>*/org</include> -->
<filtering>true</filtering>
</resource>
</resources>
View
8 exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -47,7 +47,9 @@
public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
- public static final String SPOOLING_BUFFER_IMPL = "drill.exec.spooling.impl";
- public static final String SPOOLING_BUFFER_DELETE = "drill.exec.spooling.delete";
- public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.spooling.size";
+ public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl";
+ public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches)
+ public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
+ public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
+
}
View
2 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -20,8 +20,8 @@
import java.io.Closeable;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
public interface DistributedCache extends Closeable{
View
10 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -19,10 +19,10 @@
import java.util.concurrent.TimeUnit;
-public interface DistributedMap<V> {
+public interface DistributedMap<V extends DrillSerializable> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
- public DrillSerializable get(String key);
- public void put(String key, DrillSerializable value);
- public void putIfAbsent(String key, DrillSerializable value);
- public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit);
+ public V get(String key);
+ public void put(String key, V value);
+ public void putIfAbsent(String key, V value);
+ public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit);
}
View
6 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
@@ -19,8 +19,8 @@
import java.util.Collection;
-public interface DistributedMultiMap<V> {
+public interface DistributedMultiMap<V extends DrillSerializable> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class);
- public Collection<DrillSerializable> get(String key);
- public void put(String key, DrillSerializable value);
+ public Collection<V> get(String key);
+ public void put(String key, V value);
}
View
62 exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -22,23 +22,28 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.hazelcast.config.SerializerConfig;
-import com.hazelcast.core.*;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
import com.hazelcast.config.Config;
-import org.apache.drill.exec.server.DrillbitContext;
+import com.hazelcast.config.SerializerConfig;
+import com.hazelcast.core.DuplicateInstanceNameException;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
public class HazelCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -67,8 +72,9 @@ public void onMessage(Message<HWorkQueueStatus> wrapped) {
public void run() {
Config c = new Config();
- SerializerConfig sc = new SerializerConfig().setImplementation(new HCVectorAccessibleSerializer(allocator))
- .setTypeClass(VectorAccessibleSerializable.class);
+ SerializerConfig sc = new SerializerConfig() //
+ .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
+ .setTypeClass(VectorAccessibleSerializable.class);
c.setInstanceName(instanceName);
c.getSerializationConfig().addSerializerConfig(sc);
instance = getInstanceOrCreateNew(c);
@@ -119,64 +125,62 @@ public void storeFragment(PlanFragment fragment) {
@Override
public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
- return new HCDistributedMultiMapImpl(this.instance.getMultiMap(clazz.toString()), clazz);
+ com.hazelcast.core.MultiMap<String, V> mmap = this.instance.getMultiMap(clazz.toString());
+ return new HCDistributedMultiMapImpl<V>(mmap, clazz);
}
@Override
public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
- return new HCDistributedMapImpl(this.instance.getMap(clazz.toString()), clazz);
+ IMap<String, V> imap = this.instance.getMap(clazz.toString());
+ return new HCDistributedMapImpl<V>(imap, clazz);
}
@Override
public Counter getCounter(String name) {
return new HCCounterImpl(this.instance.getAtomicLong(name));
}
- public static class HCDistributedMapImpl<V> implements DistributedMap<V> {
- private IMap<String, DrillSerializable> m;
- private Class<V> clazz;
+ public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
+ private IMap<String, V> m;
- public HCDistributedMapImpl(IMap m, Class<V> clazz) {
+ public HCDistributedMapImpl(IMap<String, V> m, Class<V> clazz) {
this.m = m;
- this.clazz = clazz;
}
- public DrillSerializable get(String key) {
+ public V get(String key) {
return m.get(key);
}
- public void put(String key, DrillSerializable value) {
+ public void put(String key, V value) {
m.put(key, value);
}
- public void putIfAbsent(String key, DrillSerializable value) {
+ public void putIfAbsent(String key, V value) {
m.putIfAbsent(key, value);
}
- public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) {
+ public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
m.putIfAbsent(key, value, ttl, timeunit);
}
}
- public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
- private com.hazelcast.core.MultiMap<String, DrillSerializable> mmap;
- private Class<V> clazz;
+ public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
+ private com.hazelcast.core.MultiMap<String, V> mmap;
- public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) {
+ public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
this.mmap = mmap;
- this.clazz = clazz;
}
- public Collection<DrillSerializable> get(String key) {
- List<DrillSerializable> list = Lists.newArrayList();
- for (DrillSerializable v : mmap.get(key)) {
+ public Collection<V> get(String key) {
+ List<V> list = Lists.newArrayList();
+ for (V v : mmap.get(key)) {
list.add(v);
}
return list;
}
@Override
- public void put(String key, DrillSerializable value) {
+ public void put(String key, V value) {
mmap.put(key, value);
}
}
View
64 exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.cache;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
@@ -27,29 +26,28 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.server.DrillbitContext;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
public class LocalCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
private volatile Map<FragmentHandle, PlanFragment> handles;
- private volatile ConcurrentMap<Class, DistributedMap> maps;
- private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps;
+ private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
+ private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
private volatile ConcurrentMap<String, Counter> counters;
- private static final BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+ private static final BufferAllocator allocator = new TopLevelAllocator();
@Override
public void close() throws IOException {
@@ -66,22 +64,22 @@ public void run() throws DrillbitStartupException {
@Override
public PlanFragment getFragment(FragmentHandle handle) {
- logger.debug("looking for fragment with handle: {}", handle);
+// logger.debug("looking for fragment with handle: {}", handle);
return handles.get(handle);
}
@Override
public void storeFragment(PlanFragment fragment) {
- logger.debug("Storing fragment: {}", fragment);
+// logger.debug("Storing fragment: {}", fragment);
handles.put(fragment.getHandle(), fragment);
}
@Override
public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
- DistributedMultiMap mmap = multiMaps.get(clazz);
+ DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz);
if (mmap == null) {
multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz));
- return multiMaps.get(clazz);
+ return (DistributedMultiMap<V>) multiMaps.get(clazz);
} else {
return mmap;
}
@@ -91,8 +89,8 @@ public void storeFragment(PlanFragment fragment) {
public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
DistributedMap m = maps.get(clazz);
if (m == null) {
- maps.putIfAbsent(clazz, new LocalDistributedMapImpl(clazz));
- return maps.get(clazz);
+ maps.putIfAbsent(clazz, new LocalDistributedMapImpl<V>(clazz));
+ return (DistributedMap<V>) maps.get(clazz);
} else {
return m;
}
@@ -119,29 +117,29 @@ public static ByteArrayDataOutput serialize(DrillSerializable obj) {
return out;
}
- public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
+ public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
try {
- DrillSerializable obj = (DrillSerializable)clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
+ V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
obj.read(in);
return obj;
} catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
- public static class LocalDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
+ public static class LocalDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
private ArrayListMultimap<String, ByteArrayDataOutput> mmap;
- private Class<DrillSerializable> clazz;
+ private Class<V> clazz;
- public LocalDistributedMultiMapImpl(Class clazz) {
+ public LocalDistributedMultiMapImpl(Class<V> clazz) {
mmap = ArrayListMultimap.create();
this.clazz = clazz;
}
@Override
- public Collection<DrillSerializable> get(String key) {
- List<DrillSerializable> list = Lists.newArrayList();
+ public Collection<V> get(String key) {
+ List<V> list = Lists.newArrayList();
for (ByteArrayDataOutput o : mmap.get(key)) {
list.add(deserialize(o.toByteArray(), this.clazz));
}
@@ -154,33 +152,33 @@ public void put(String key, DrillSerializable value) {
}
}
- public static class LocalDistributedMapImpl<V> implements DistributedMap<V> {
+ public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
private ConcurrentMap<String, ByteArrayDataOutput> m;
- private Class<DrillSerializable> clazz;
+ private Class<V> clazz;
- public LocalDistributedMapImpl(Class clazz) {
+ public LocalDistributedMapImpl(Class<V> clazz) {
m = Maps.newConcurrentMap();
this.clazz = clazz;
}
@Override
- public DrillSerializable get(String key) {
+ public V get(String key) {
if (m.get(key) == null) return null;
- return deserialize(m.get(key).toByteArray(), this.clazz);
+ return (V) deserialize(m.get(key).toByteArray(), this.clazz);
}
@Override
- public void put(String key, DrillSerializable value) {
+ public void put(String key, V value) {
m.put(key, serialize(value));
}
@Override
- public void putIfAbsent(String key, DrillSerializable value) {
+ public void putIfAbsent(String key, V value) {
m.putIfAbsent(key, serialize(value));
}
@Override
- public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit) {
+ public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
m.putIfAbsent(key, serialize(value));
logger.warn("Expiration not implemented in local map cache");
}
View
5 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
@@ -17,11 +17,10 @@
*/
package org.apache.drill.exec.cache;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
-import com.google.protobuf.Parser;
import com.hazelcast.core.HazelcastInstance;
public class ProtoBufImpl {
View
8 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -17,16 +17,12 @@
*/
package org.apache.drill.exec.cache;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
-
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
View
4 exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -133,6 +133,7 @@ public void writeToStreamAndRetain(OutputStream output) throws IOException {
retain = true;
writeToStream(output);
}
+
/**
* Serializes the VectorAccessible va and writes it to an output stream
@@ -189,9 +190,10 @@ public void writeToStream(OutputStream output) throws IOException {
}
}
- private void clear() {
+ public void clear() {
if (!retain) {
batch.clear();
+ if(sv2 != null) sv2.clear();
}
}
View
10 exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -35,7 +35,7 @@
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
-import org.apache.drill.exec.memory.DirectBufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos;
@@ -59,7 +59,7 @@
private UserClient client;
private volatile ClusterCoordinator clusterCoordinator;
private volatile boolean connected = false;
- private final DirectBufferAllocator allocator = new DirectBufferAllocator();
+ private final TopLevelAllocator allocator = new TopLevelAllocator(Long.MAX_VALUE);
private int reconnectTimes;
private int reconnectDelay;
@@ -107,7 +107,7 @@ public synchronized void connect() throws RpcException {
checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
// just use the first endpoint for now
DrillbitEndpoint endpoint = endpoints.iterator().next();
- this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), new NamedThreadFactory("Client-")));
+ this.client = new UserClient(allocator, new NioEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), new NamedThreadFactory("Client-")));
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
connect(endpoint);
connected = true;
@@ -146,7 +146,7 @@ private void connect(DrillbitEndpoint endpoint) throws RpcException {
}
}
- public DirectBufferAllocator getAllocator() {
+ public TopLevelAllocator getAllocator() {
return allocator;
}
@@ -218,7 +218,7 @@ private void fail(Exception ex) {
@Override
public void resultArrived(QueryResultBatch result) {
- logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
+// logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
results.add(result);
if(result.getHeader().getIsLastChunk()){
future.set(results);
View
8 exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
@@ -19,19 +19,17 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.IOException;
import java.util.List;
-import org.apache.commons.lang.StringUtils;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.util.VectorUtil;
import com.beust.jcommander.IParameterValidator;
@@ -42,7 +40,7 @@
public class DumpCat {
- private final static BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+ private final static BufferAllocator allocator = new TopLevelAllocator();
public static void main(String args[]) throws Exception {
DumpCat dumpCat = new DumpCat();
View
858 exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
@@ -0,0 +1,858 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufProcessor;
+import io.netty.buffer.DuplicatedByteBuf;
+import io.netty.buffer.PooledUnsafeDirectByteBufL;
+import io.netty.buffer.SlicedByteBuf;
+import io.netty.buffer.SwappedByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+
+public class AccountingByteBuf extends ByteBuf{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AccountingByteBuf.class);
+
+ private final PooledUnsafeDirectByteBufL b;
+ private final Accountor acct;
+ private int size;
+
+ public AccountingByteBuf(Accountor a, PooledUnsafeDirectByteBufL b) {
+ super();
+ this.b = b;
+ this.acct = a;
+ this.size = b.capacity();
+ }
+
+ @Override
+ public int refCnt() {
+ return b.refCnt();
+ }
+
+ @Override
+ public boolean release() {
+ if(b.release()){
+ acct.release(this, size);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ if(b.release(decrement)){
+ acct.release(this, size);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int capacity() {
+ return b.capacity();
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ if(newCapacity < size){
+ // do trim, update size and return
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int maxCapacity() {
+ return size;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return b.alloc();
+ }
+
+ @Override
+ public ByteOrder order() {
+ return ByteOrder.LITTLE_ENDIAN;
+ }
+
+ @Override
+ public ByteBuf order(ByteOrder endianness) {
+ return new SwappedByteBuf(this);
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return this;
+ }
+
+ @Override
+ public boolean isDirect() {
+ return true;
+ }
+
+ @Override
+ public int readerIndex() {
+ return b.readerIndex();
+ }
+
+ @Override
+ public ByteBuf readerIndex(int readerIndex) {
+ b.readerIndex(readerIndex);
+ return this;
+ }
+
+ @Override
+ public int writerIndex() {
+ return b.writerIndex();
+ }
+
+ @Override
+ public ByteBuf writerIndex(int writerIndex) {
+ b.writerIndex(writerIndex);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setIndex(int readerIndex, int writerIndex) {
+ b.setIndex(readerIndex, writerIndex);
+ return this;
+ }
+
+ @Override
+ public int readableBytes() {
+ return b.readableBytes();
+ }
+
+ @Override
+ public int writableBytes() {
+ return b.writableBytes();
+ }
+
+ @Override
+ public int maxWritableBytes() {
+ return b.maxWritableBytes();
+ }
+
+ @Override
+ public boolean isReadable() {
+ return b.isReadable();
+ }
+
+ @Override
+ public boolean isReadable(int size) {
+ return b.isReadable(size);
+ }
+
+ @Override
+ public boolean isWritable() {
+ return b.isWritable();
+ }
+
+ @Override
+ public boolean isWritable(int size) {
+ return b.isWritable(size);
+ }
+
+ @Override
+ public ByteBuf clear() {
+ b.clear();
+ return this;
+ }
+
+ @Override
+ public ByteBuf markReaderIndex() {
+ b.markReaderIndex();
+ return this;
+ }
+
+ @Override
+ public ByteBuf resetReaderIndex() {
+ b.resetReaderIndex();
+ return this;
+ }
+
+ @Override
+ public ByteBuf markWriterIndex() {
+ b.markWriterIndex();
+ return this;
+ }
+
+ @Override
+ public ByteBuf resetWriterIndex() {
+ b.resetWriterIndex();
+ return this;
+ }
+
+ @Override
+ public ByteBuf discardReadBytes() {
+ b.discardReadBytes();
+ return this;
+ }
+
+ @Override
+ public ByteBuf discardSomeReadBytes() {
+ b.discardSomeReadBytes();
+ return this;
+ }
+
+ @Override
+ public ByteBuf ensureWritable(int minWritableBytes) {
+ b.ensureWritable(minWritableBytes);
+ return this;
+ }
+
+ @Override
+ public int ensureWritable(int minWritableBytes, boolean force) {
+ return b.ensureWritable(minWritableBytes, false);
+ }
+
+ @Override
+ public boolean getBoolean(int index) {
+ return b.getBoolean(index);
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return b.getByte(index);
+ }
+
+ @Override
+ public short getUnsignedByte(int index) {
+ return b.getUnsignedByte(index);
+ }
+
+ @Override
+ public short getShort(int index) {
+ return b.getShort(index);
+ }
+
+ @Override
+ public int getUnsignedShort(int index) {
+ return b.getUnsignedShort(index);
+ }
+
+ @Override
+ public int getMedium(int index) {
+ return b.getMedium(index);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ return b.getUnsignedMedium(index);
+ }
+
+ @Override
+ public int getInt(int index) {
+ return b.getInt(index);
+ }
+
+ @Override
+ public long getUnsignedInt(int index) {
+ return b.getUnsignedInt(index);
+ }
+
+ @Override
+ public long getLong(int index) {
+ return b.getLong(index);
+ }
+
+ @Override
+ public char getChar(int index) {
+ return b.getChar(index);
+ }
+
+ @Override
+ public float getFloat(int index) {
+ return b.getFloat(index);
+ }
+
+ @Override
+ public double getDouble(int index) {
+ return b.getDouble(index);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst) {
+ b.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int length) {
+ b.getBytes(index, dst, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ b.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst) {
+ b.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ b.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ b.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ b.getBytes(index, out, length);
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+ return b.getBytes(index, out, length);
+ }
+
+ @Override
+ public ByteBuf setBoolean(int index, boolean value) {
+ b.setBoolean(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ b.setByte(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ b.setShort(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ b.setMedium(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ b.setInt(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ b.setLong(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setChar(int index, int value) {
+ b.setChar(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setFloat(int index, float value) {
+ b.setFloat(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setDouble(int index, double value) {
+ b.setDouble(index, value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src) {
+ b.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int length) {
+ b.setBytes(index, src, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ b.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src) {
+ b.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ b.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ b.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ return b.setBytes(index, in, length);
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+ return b.setBytes(index, in, length);
+ }
+
+ @Override
+ public ByteBuf setZero(int index, int length) {
+ b.setZero(index, length);
+ return this;
+ }
+
+ @Override
+ public boolean readBoolean() {
+ return b.readBoolean();
+ }
+
+ @Override
+ public byte readByte() {
+ return b.readByte();
+ }
+
+ @Override
+ public short readUnsignedByte() {
+ return b.readUnsignedByte();
+ }
+
+ @Override
+ public short readShort() {
+ return b.readShort();
+ }
+
+ @Override
+ public int readUnsignedShort() {
+ return b.readUnsignedShort();
+ }
+
+ @Override
+ public int readMedium() {
+ return b.readMedium();
+ }
+
+ @Override
+ public int readUnsignedMedium() {
+ return b.readUnsignedMedium();
+ }
+
+ @Override
+ public int readInt() {
+ return b.readInt();
+ }
+
+ @Override
+ public long readUnsignedInt() {
+ return b.readUnsignedInt();
+ }
+
+ @Override
+ public long readLong() {
+ return b.readLong();
+ }
+
+ @Override
+ public char readChar() {
+ return b.readChar();
+ }
+
+ @Override
+ public float readFloat() {
+ return b.readFloat();
+ }
+
+ @Override
+ public double readDouble() {
+ return b.readDouble();
+ }
+
+ @Override
+ public ByteBuf readBytes(int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuf readSlice(int length) {
+ ByteBuf slice = slice(readerIndex(), length);
+ readerIndex(readerIndex() + length);
+ return slice;
+ }
+
+
+ @Override
+ public ByteBuf readBytes(ByteBuf dst) {
+ b.readBytes(dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf readBytes(ByteBuf dst, int length) {
+ b.readBytes(dst, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
+ b.readBytes(dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf readBytes(byte[] dst) {
+ b.readBytes(dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
+ b.readBytes(dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf readBytes(ByteBuffer dst) {
+ b.readBytes(dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf readBytes(OutputStream out, int length) throws IOException {
+ b.readBytes(out, length);
+ return null;
+ }
+
+ @Override
+ public int readBytes(GatheringByteChannel out, int length) throws IOException {
+ return b.readBytes(out, length);
+ }
+
+ @Override
+ public ByteBuf skipBytes(int length) {
+ b.skipBytes(length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeBoolean(boolean value) {
+ b.writeBoolean(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeByte(int value) {
+ b.writeByte(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeShort(int value) {
+ b.writeShort(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeMedium(int value) {
+ b.writeMedium(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeInt(int value) {
+ b.writeInt(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeLong(long value) {
+ b.writeLong(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeChar(int value) {
+ b.writeChar(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeFloat(float value) {
+ b.writeFloat(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeDouble(double value) {
+ b.writeDouble(value);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeBytes(ByteBuf src) {
+ b.writeBytes(src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeBytes(ByteBuf src, int length) {
+ b.writeBytes(src, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
+ b.writeBytes(src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeBytes(byte[] src) {
+ b.writeBytes(src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
+ b.writeBytes(src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf writeBytes(ByteBuffer src) {
+ b.writeBytes(src);
+ return this;
+ }
+
+ @Override
+ public int writeBytes(InputStream in, int length) throws IOException {
+ return b.writeBytes(in, length);
+ }
+
+ @Override
+ public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
+ return b.writeBytes(in, length);
+ }
+
+ @Override
+ public ByteBuf writeZero(int length) {
+ b.writeZero(length);
+ return this;
+ }
+
+ @Override
+ public int indexOf(int fromIndex, int toIndex, byte value) {
+ return b.indexOf(fromIndex, toIndex, value);
+ }
+
+ @Override
+ public int bytesBefore(byte value) {
+ return b.bytesBefore(value);
+ }
+
+ @Override
+ public int bytesBefore(int length, byte value) {
+ return b.bytesBefore(length, value);
+ }
+
+ @Override
+ public int bytesBefore(int index, int length, byte value) {
+ return b.bytesBefore(index, length, value);
+ }
+
+ @Override
+ public int forEachByte(ByteBufProcessor processor) {
+ return b.forEachByte(processor);
+ }
+
+ @Override
+ public int forEachByte(int index, int length, ByteBufProcessor processor) {
+ return b.forEachByte(index, length, processor);
+ }
+
+ @Override
+ public int forEachByteDesc(ByteBufProcessor processor) {
+ return b.forEachByteDesc(processor);
+ }
+
+ @Override
+ public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
+ return b.forEachByteDesc(index, length, processor);
+ }
+
+ @Override
+ public ByteBuf copy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuf slice() {
+ return slice(b.readerIndex(), readableBytes());
+ }
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ if (length == 0) {
+ return Unpooled.EMPTY_BUFFER;
+ }
+
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ @Override
+ public ByteBuf duplicate() {
+ return new DuplicatedByteBuf(this);
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return b.nioBufferCount();
+ }
+
+ @Override
+ public ByteBuffer nioBuffer() {
+ return b.nioBuffer();
+ }
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ return b.nioBuffer(index, length);
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ return b.internalNioBuffer(index, length);
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers() {
+ return b.nioBuffers();
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return b.nioBuffers(index, length);
+ }
+
+ @Override
+ public boolean hasArray() {
+ return b.hasArray();
+ }
+
+ @Override
+ public byte[] array() {
+ return b.array();
+ }
+
+ @Override
+ public int arrayOffset() {
+ return b.arrayOffset();
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return b.hasMemoryAddress();
+ }
+
+ @Override
+ public long memoryAddress() {
+ return b.memoryAddress();
+ }
+
+ @Override
+ public String toString(Charset charset) {
+ return b.toString(charset);
+ }
+
+ @Override
+ public String toString(int index, int length, Charset charset) {
+ return b.toString(index, length, charset);
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // identity equals only.
+ return this == obj;
+ }
+
+ @Override
+ public String toString() {
+ return "AccountingByteBuf [Inner buffer=" + b + ", size=" + size + "]";
+ }
+
+ @Override
+ public int compareTo(ByteBuf buffer) {
+ return b.compareTo(buffer);
+ }
+
+
+ @Override
+ public ByteBuf retain(int increment) {
+ b.retain(increment);
+ return this;
+ }
+
+ @Override
+ public ByteBuf retain() {
+ b.retain();
+ return this;
+ }
+
+
+}
View
192 exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.util.AssertionUtil;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+public class Accountor {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountor.class);
+
+ private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled();
+ private final AtomicRemainder remainder;
+ private final long total;
+ private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
+ private final FragmentHandle handle;
+
+ public Accountor(FragmentHandle handle, Accountor parent, long max, long preAllocated) {
+ // TODO: fix preallocation stuff
+ AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
+ this.remainder = new AtomicRemainder(parentRemainder, max, preAllocated);
+ this.total = max;
+ this.handle = handle;
+ if (ENABLE_ACCOUNTING) {
+ buffers = Maps.newConcurrentMap();
+ } else {
+ buffers = null;
+ }
+ }
+
+ public long getCapacity() {
+ return total;
+ }
+
+ public long getAllocation() {
+ return remainder.getUsed();
+ }
+
+ public boolean reserve(long size) {
+ return remainder.get(size);
+ }
+
+ public void forceAdditionalReservation(long size) {
+ remainder.forceGet(size);
+ }
+
+ public void reserved(long expected, AccountingByteBuf buf, String desc){
+ // make sure to take away the additional memory that happened due to rounding.
+
+ long additional = buf.capacity() - expected;
+ remainder.forceGet(additional);
+
+ if (ENABLE_ACCOUNTING) {
+ buffers.put(buf, new DebugStackTrace(desc, buf.capacity(), Thread.currentThread().getStackTrace()));
+ }
+ }
+ public void reserved(long expected, AccountingByteBuf buf) {
+ reserved(expected, buf, null);
+ }
+
+ public void release(AccountingByteBuf buf, long size) {
+ remainder.returnAllocation(size);
+ if (ENABLE_ACCOUNTING) {
+ if(buf != null && buffers.remove(buf) == null) throw new IllegalStateException("Releasing a buffer that has already been released. Buffer: " + buf);
+ }
+ }
+
+ public void close() {
+ if (ENABLE_ACCOUNTING && !buffers.isEmpty()) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Attempted to close accountor with ");
+ sb.append(buffers.size());
+ sb.append(" buffer(s) still allocated");
+ if(handle != null){
+ sb.append("for QueryId: ");
+ sb.append(QueryIdHelper.getQueryId(handle.getQueryId()));
+ sb.append(", MajorFragmentId: ");
+ sb.append(handle.getMajorFragmentId());
+ sb.append(", MinorFragmentId: ");
+ sb.append(handle.getMinorFragmentId());
+ }
+ sb.append(".\n");
+
+
+ Multimap<DebugStackTrace, DebugStackTrace> multi = LinkedListMultimap.create();
+ for (DebugStackTrace t : buffers.values()) {
+ multi.put(t, t);
+ }
+
+ for (DebugStackTrace entry : multi.keySet()) {
+ Collection<DebugStackTrace> allocs = multi.get(entry);
+
+ sb.append("\n\n\tTotal ");
+ sb.append(allocs.size());
+ sb.append(" allocation(s) of byte size(s): ");
+ for(DebugStackTrace alloc : allocs){
+ sb.append(alloc.size);
+ if(alloc.desc != null){
+ sb.append(" (");
+ sb.append(alloc.desc);
+ sb.append(")");
+ }
+ sb.append(", ");
+ }
+
+ sb.append("at stack location:\n");
+ entry.addToString(sb);
+ }
+
+ throw new IllegalStateException(sb.toString());
+
+ }
+
+
+ }
+
+ private class DebugStackTrace {
+ private static final String CAUSE_CAPTION = "Caused by: ";
+ private static final String SUPPRESSED_CAPTION = "Suppressed: ";
+
+ private StackTraceElement[] elements;
+ private long size;
+ private String desc;
+
+ public DebugStackTrace(String desc, long size, StackTraceElement[] elements) {
+ super();
+ this.elements = elements;
+ this.size = size;
+ this.desc = desc;
+ }
+
+ public void addToString(StringBuffer sb) {
+ for (int i = 3; i < elements.length; i++) {
+ sb.append("\t\t");
+ sb.append(elements[i]);
+ sb.append("\n");
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(elements);
+// result = prime * result + (int) (size ^ (size >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ DebugStackTrace other = (DebugStackTrace) obj;
+ if (!Arrays.equals(elements, other.elements))
+ return false;
+ // weird equal where size doesn't matter for multimap purposes.
+// if (size != other.size)
+// return false;
+ return true;
+ }
+
+ }
+}
View
112 exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ *
+ * TODO: Fix this so that preallocation can never be released back to general pool until allocator is closed.
+ */
+public class AtomicRemainder {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AtomicRemainder.class);
+
+ private final AtomicRemainder parent;
+ private final AtomicLong total;
+ private final AtomicLong unaccountable;
+ private final long max;
+ private final long pre;
+
+ public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
+ this.parent = parent;
+ this.total = new AtomicLong(max - pre);
+ this.unaccountable = new AtomicLong(pre);
+ this.max = max;
+ this.pre = pre;
+ }
+
+ public long getRemainder() {
+ return total.get() + unaccountable.get();
+ }
+
+ public long getUsed() {
+ return max - getRemainder();
+ }
+
+ public void forceGet(long size){
+ total.addAndGet(size);
+ if(parent != null) parent.forceGet(size);
+ }
+
+
+ public boolean get(long size) {
+ if (unaccountable.get() < 1) {
+ // if there is no preallocated memory, we can operate normally.
+ long outcome = total.addAndGet(-size);
+ if (outcome < 0) {
+ total.addAndGet(size);
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ // if there is preallocated memory, use that first.
+ long unaccount = unaccountable.getAndAdd(-size);
+ if (unaccount > -1) {
+ return true;
+ } else {
+
+ // if there is a parent allocator, check it before allocating.
+ if (parent != null && !parent.get(-unaccount)) {
+ unaccountable.getAndAdd(size);
+ return false;
+ }
+ ;
+
+ long account = total.addAndGet(unaccount);
+ if (account >= 0) {
+ unaccountable.getAndAdd(unaccount);
+ return true;
+ } else {
+ unaccountable.getAndAdd(size);
+ total.addAndGet(-unaccount);
+ return false;
+ }
+ }
+
+ }
+
+ }
+
+ /**
+ * Return the memory accounting to the allocation pool. Make sure to first maintain hold of the preallocated memory.
+ *
+ * @param size
+ */
+ public void returnAllocation(long size) {
+ long preSize = unaccountable.get();
+ long preChange = Math.min(size, pre - preSize);
+ long totalChange = size - preChange;
+ unaccountable.addAndGet(preChange);
+ total.addAndGet(totalChange);
+ if (parent != null)
+ parent.returnAllocation(totalChange);
+ }
+
+}
View
42 exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -17,49 +17,40 @@
*/
package org.apache.drill.exec.memory;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.Closeable;
-import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
/**
* Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. Also allows inser
*/
-public abstract class BufferAllocator implements Closeable{
+public interface BufferAllocator extends Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferAllocator.class);
/**
* Allocate a new or reused buffer of the provided size. Note that the buffer may technically be larger than the requested size for rounding purposes. However, the buffers capacity will be set to the configured size.
* @param size The size in bytes.
* @return A new ByteBuf.
*/
- public abstract ByteBuf buffer(int size);
+ public abstract AccountingByteBuf buffer(int size);
- public abstract ByteBufAllocator getUnderlyingAllocator();
- public abstract BufferAllocator getChildAllocator(long initialReservation, long maximumReservation);
+ public abstract AccountingByteBuf buffer(int size, String desc);
+
+ public abstract ByteBufAllocator getUnderlyingAllocator();
- protected abstract boolean pre(int bytes);
+ public abstract BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation) throws OutOfMemoryException;
- public PreAllocator getPreAllocator(){