Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

DRILL-334: Subdivide Drillbit control and data messages. Add memory T…

…opLevel and Child memory allocator with debug mode to capture memory leaks. Various memory leak fixes to get build to complete.
  • Loading branch information...
commit 72c5553f2bea22b5b0fe58380c446737256a0d90 1 parent 4a83dae
Jacques Nadeau authored

Showing 182 changed files with 13,475 additions and 9,048 deletions. Show diff stats Hide diff stats

  1. +26 0 common/src/main/java/org/apache/drill/common/config/DrillConfig.java
  2. +1 1  exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
  3. +18 18 exec/java-exec/pom.xml
  4. +5 3 exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
  5. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
  6. +5 5 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
  7. +3 3 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
  8. +33 29 exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
  9. +31 33 exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
  10. +2 3 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
  11. +2 6 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
  12. +3 1 exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
  13. +5 5 exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
  14. +3 5 exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
  15. +858 0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
  16. +192 0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
  17. +112 0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
  18. +14 28 exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
  19. +0 65 exec/java-exec/src/main/java/org/apache/drill/exec/memory/DirectBufferAllocator.java
  20. +51 0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryException.java
  21. +161 0 exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
  22. +4 5 exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
  23. +101 54 exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
  24. +62 0 exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
  25. +26 0 exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
  26. +10 3 exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
  27. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
  28. +4 4 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
  29. +4 0 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
  30. +22 7 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
  31. +25 28 ...ain/java/org/apache/drill/exec/{work/foreman/TunnelManager.java → physical/impl/SendingAccountor.java}
  32. +15 6 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
  33. +5 1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
  34. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
  35. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
  36. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
  37. +9 0 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
  38. +250 180 ...-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
  39. +35 16 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
  40. +23 8 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
  41. +22 21 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
  42. +22 14 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
  43. +7 0 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
  44. +9 1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
  45. +5 1 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
  46. +11 23 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
  47. +5 0 exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
  48. +3 3 exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
  49. 0  {protocol → exec/java-exec}/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
  50. +14 4 exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
  51. +8 2 exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
  52. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
  53. +2 2 exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
  54. +10 2 exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
  55. +2 0  exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
  56. +3 1 exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
  57. +10 7 exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
  58. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
  59. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
  60. +12 8 exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
  61. +11 12 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
  62. +19 6 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
  63. +51 22 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
  64. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
  65. +5 10 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → }/FutureBitCommand.java
  66. +7 11 ...va-exec/src/main/java/org/apache/drill/exec/rpc/{bit/ListeningBitCommand.java → ListeningCommand.java}
  67. +22 4 .../main/java/org/apache/drill/exec/rpc/{ZeroCopyProtobufLengthDecoder.java → ProtobufLengthDecoder.java}
  68. +242 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
  69. +64 24 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
  70. +85 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ResettableBarrier.java
  71. +0 2  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Response.java
  72. +20 10 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
  73. +3 3 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitCommand.java → RpcCommand.java}
  74. +8 3 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java
  75. +0 99 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitClient.java
  76. +0 189 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnectionManager.java
  77. +0 59 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListenerPool.java
  78. +2 2 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → control}/AvailabilityListener.java
  79. +11 19 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → control}/ConnectionManagerRegistry.java
  80. +105 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
  81. +28 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlCommand.java
  82. +37 46 ...exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitConnection.java → control/ControlConnection.java}
  83. +60 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
  84. +40 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java
  85. +9 11 ...a-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitRpcConfig.java → control/ControlRpcConfig.java}
  86. +31 26 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitServer.java → control/ControlServer.java}
  87. +16 43 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitTunnel.java → control/ControlTunnel.java}
  88. +6 16 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitCom.java → control/Controller.java}
  89. +14 27 .../java-exec/src/main/java/org/apache/drill/exec/rpc/{bit/BitComImpl.java → control/ControllerImpl.java}
  90. +8 8 ...rg/apache/drill/exec/rpc/{bit/BitComDefaultInstanceHandler.java → control/DefaultInstanceHandler.java}
  91. +22 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/SendProgress.java
  92. +113 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
  93. +55 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
  94. +100 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
  95. +80 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
  96. +63 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
  97. +53 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
  98. +58 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java
  99. +38 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
  100. +31 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
  101. +64 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
  102. +42 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
  103. +128 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
  104. +63 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
  105. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/{bit → data}/SendProgress.java
  106. +10 8 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
  107. +38 0 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java
  108. +20 5 exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
  109. +8 1 exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
  110. +3 3 exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
  111. +23 8 exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
  112. +22 11 exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
  113. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
  114. +35 0 exec/java-exec/src/main/java/org/apache/drill/exec/util/AssertionUtil.java
  115. +0 2  exec/java-exec/src/main/java/org/apache/drill/exec/util/AtomicState.java
  116. +0 1  exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
  117. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/work/{foreman → }/ErrorHelper.java
  118. +1 2  exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
  119. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
  120. +33 16 exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
  121. +6 8 ...in/java/org/apache/drill/exec/work/batch/{AbstractFragmentCollector.java → AbstractDataCollector.java}
  122. +0 211 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
  123. +150 0 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
  124. +7 7 ...-exec/src/main/java/org/apache/drill/exec/work/batch/{BitComHandler.java → ControlMessageHandler.java}
  125. +6 6 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/{BatchCollector.java → DataCollector.java}
  126. +13 15 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
  127. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java
  128. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java
  129. +19 8 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
  130. +17 16 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
  131. +33 13 exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
  132. +8 7 exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
  133. +1 1  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
  134. +29 22 ...xec/src/main/java/org/apache/drill/exec/work/foreman/{RunningFragmentManager.java → QueryManager.java}
  135. +39 0 exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java
  136. +8 8 ...rg/apache/drill/exec/work/{AbstractFragmentRunnerListener.java → fragment/AbstractStatusReporter.java}
  137. +27 15 ...a-exec/src/main/java/org/apache/drill/exec/work/{FragmentRunner.java → fragment/FragmentExecutor.java}
  138. +14 9 ...rc/main/java/org/apache/drill/exec/work/fragment/{IncomingFragmentHandler.java → FragmentManager.java}
  139. +17 17 ...in/java/org/apache/drill/exec/work/fragment/{RemoteFragmentHandler.java → NonRootFragmentManager.java}
  140. +8 9 ...a/org/apache/drill/exec/work/{RemoteFragmentRunnerListener.java → fragment/NonRootStatusReporter.java}
  141. +13 10 ...c/main/java/org/apache/drill/exec/work/fragment/{LocalFragmentHandler.java → RootFragmentManager.java}
  142. +6 3 .../src/main/java/org/apache/drill/exec/work/{FragmentRunnerListener.java → fragment/StatusReporter.java}
  143. +2 2 exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
  144. +6 3 exec/java-exec/src/main/resources/drill-module.conf
  145. +6 5 exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
  146. +8 7 exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestRepeatedFunction.java
  147. +2 2 exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestEndianess.java
  148. +8 1 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
  149. +4 2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
  150. +105 72 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java
  151. +12 10 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFunctions.java
  152. +4 2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestAgg.java
  153. +6 4 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
  154. +10 8 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoin.java
  155. +7 5 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
  156. +1 1  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/mergereceiver/TestMergingReceiver.java
  157. +7 8 ...exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
  158. +4 2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
  159. +6 4 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/sort/TestSimpleSort.java
  160. +4 2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
  161. +4 2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
  162. +4 2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
  163. +4 2 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/union/TestSimpleUnion.java
  164. +2 4 exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
  165. +5 7 exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
  166. +2 2 exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
  167. +110 38 exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
  168. +7 7 exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
  169. +9 4 exec/java-exec/src/test/resources/logback.xml
  170. +5,438 0 protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java
  171. +2,228 0 protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
  172. +160 79 protocol/src/main/java/org/apache/drill/exec/proto/CoordinationProtos.java
  173. +212 6,566 protocol/src/main/java/org/apache/drill/exec/proto/ExecProtos.java
  174. +102 330 protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
  175. +136 45 protocol/src/main/java/org/apache/drill/exec/proto/UserProtos.java
  176. +83 0 protocol/src/main/protobuf/BitControl.proto
  177. +34 0 protocol/src/main/protobuf/BitData.proto
  178. +3 2 protocol/src/main/protobuf/Coordination.proto
  179. +0 83 protocol/src/main/protobuf/ExecutionProtos.proto
  180. +3 2 protocol/src/main/protobuf/User.proto
  181. +18 17 protocol/src/main/protobuf/UserBitShared.proto
  182. +1 1  sqlparser/src/main/java/org/apache/drill/optiq/DrillOptiq.java
26 common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -17,6 +17,8 @@
17 17 */
18 18 package org.apache.drill.common.config;
19 19
  20 +import java.lang.management.ManagementFactory;
  21 +import java.lang.management.RuntimeMXBean;
20 22 import java.net.URL;
21 23 import java.util.Collection;
22 24 import java.util.List;
@@ -35,6 +37,7 @@
35 37 import com.fasterxml.jackson.databind.SerializationFeature;
36 38 import com.fasterxml.jackson.databind.module.SimpleModule;
37 39 import com.google.common.annotations.VisibleForTesting;
  40 +import com.google.common.collect.ImmutableList;
38 41 import com.typesafe.config.Config;
39 42 import com.typesafe.config.ConfigFactory;
40 43
@@ -42,13 +45,18 @@
42 45
43 46 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
44 47 private final ObjectMapper mapper;
  48 + private final ImmutableList<String> startupArguments;
  49 + @SuppressWarnings("restriction") private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
45 50
46 51 @SuppressWarnings("unchecked")
47 52 private volatile List<Queue<Object>> sinkQueues = new CopyOnWriteArrayList<Queue<Object>>(new Queue[1]);
48 53
  54 +
  55 + @SuppressWarnings("restriction")
49 56 @VisibleForTesting
50 57 public DrillConfig(Config config) {
51 58 super(config);
  59 +
52 60 mapper = new ObjectMapper();
53 61 SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule")
54 62 .addDeserializer(LogicalExpression.class, new LogicalExpression.De(this));
@@ -61,8 +69,16 @@ public DrillConfig(Config config) {
61 69 mapper.registerSubtypes(LogicalOperatorBase.getSubTypes(this));
62 70 mapper.registerSubtypes(StorageEngineConfigBase.getSubTypes(this));
63 71
  72 + RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
  73 + this.startupArguments = ImmutableList.copyOf(bean.getInputArguments());
64 74
65 75 };
  76 +
  77 +
  78 +
  79 + public List<String> getStartupArguments(){
  80 + return startupArguments;
  81 + }
66 82
67 83 /**
68 84 * Create a DrillConfig object using the default config file name
@@ -155,4 +171,14 @@ public ObjectMapper getMapper(){
155 171 public String toString(){
156 172 return this.root().render();
157 173 }
  174 +
  175 + public static void main(String[] args) throws Exception{
  176 + //"-XX:MaxDirectMemorySize"
  177 + DrillConfig config = DrillConfig.create();
  178 +
  179 + }
  180 +
  181 + public static long getMaxDirectMemory(){
  182 + return MAX_DIRECT_MEMORY;
  183 + }
158 184 }
2  exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
@@ -30,7 +30,7 @@
30 30 import java.nio.channels.GatheringByteChannel;
31 31 import java.nio.channels.ScatteringByteChannel;
32 32
33   -final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
  33 +public final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
34 34
35 35 private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
36 36
36 exec/java-exec/pom.xml
... ... @@ -1,21 +1,16 @@
1 1 <?xml version="1.0"?>
2   -<!--
3   - Licensed to the Apache Software Foundation (ASF) under one or more
4   - contributor license agreements. See the NOTICE file distributed with
5   - this work for additional information regarding copyright ownership.
6   - The ASF licenses this file to You under the Apache License, Version 2.0
7   - (the "License"); you may not use this file except in compliance with
8   - the License. You may obtain a copy of the License at
9   -
10   - http://www.apache.org/licenses/LICENSE-2.0
11   -
12   - Unless required by applicable law or agreed to in writing, software
13   - distributed under the License is distributed on an "AS IS" BASIS,
14   - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   - See the License for the specific language governing permissions and
16   - limitations under the License.
17   --->
18   -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  2 +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
  3 + license agreements. See the NOTICE file distributed with this work for additional
  4 + information regarding copyright ownership. The ASF licenses this file to
  5 + You under the Apache License, Version 2.0 (the "License"); you may not use
  6 + this file except in compliance with the License. You may obtain a copy of
  7 + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
  8 + by applicable law or agreed to in writing, software distributed under the
  9 + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
  10 + OF ANY KIND, either express or implied. See the License for the specific
  11 + language governing permissions and limitations under the License. -->
  12 +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  13 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
19 14 <modelVersion>4.0.0</modelVersion>
20 15 <parent>
21 16 <artifactId>exec-parent</artifactId>
@@ -37,6 +32,11 @@
37 32 <version>4.1</version>
38 33 </dependency>
39 34 <dependency>
  35 + <groupId>org.apache.commons</groupId>
  36 + <artifactId>commons-pool2</artifactId>
  37 + <version>2.1</version>
  38 + </dependency>
  39 + <dependency>
40 40 <groupId>com.thoughtworks.paranamer</groupId>
41 41 <artifactId>paranamer</artifactId>
42 42 <version>2.5.6</version>
@@ -273,7 +273,7 @@
273 273 </resource>
274 274 <resource>
275 275 <directory>target/generated-sources</directory>
276   -<!-- <include>*/org</include> -->
  276 + <!-- <include>*/org</include> -->
277 277 <filtering>true</filtering>
278 278 </resource>
279 279 </resources>
8 exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -47,7 +47,9 @@
47 47 public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
48 48 public static final String TEMP_DIRECTORIES = "drill.exec.tmp.directories";
49 49 public static final String TEMP_FILESYSTEM = "drill.exec.tmp.filesystem";
50   - public static final String SPOOLING_BUFFER_IMPL = "drill.exec.spooling.impl";
51   - public static final String SPOOLING_BUFFER_DELETE = "drill.exec.spooling.delete";
52   - public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.spooling.size";
  50 + public static final String INCOMING_BUFFER_IMPL = "drill.exec.buffer.impl";
  51 + public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; // incoming buffer size (number of batches)
  52 + public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete";
  53 + public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
  54 +
53 55 }
2  exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -20,8 +20,8 @@
20 20 import java.io.Closeable;
21 21
22 22 import org.apache.drill.exec.exception.DrillbitStartupException;
  23 +import org.apache.drill.exec.proto.BitControl.PlanFragment;
23 24 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
24   -import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
25 25
26 26
27 27 public interface DistributedCache extends Closeable{
10 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -19,10 +19,10 @@
19 19
20 20 import java.util.concurrent.TimeUnit;
21 21
22   -public interface DistributedMap<V> {
  22 +public interface DistributedMap<V extends DrillSerializable> {
23 23 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
24   - public DrillSerializable get(String key);
25   - public void put(String key, DrillSerializable value);
26   - public void putIfAbsent(String key, DrillSerializable value);
27   - public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit);
  24 + public V get(String key);
  25 + public void put(String key, V value);
  26 + public void putIfAbsent(String key, V value);
  27 + public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit);
28 28 }
6 exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
@@ -19,8 +19,8 @@
19 19
20 20 import java.util.Collection;
21 21
22   -public interface DistributedMultiMap<V> {
  22 +public interface DistributedMultiMap<V extends DrillSerializable> {
23 23 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class);
24   - public Collection<DrillSerializable> get(String key);
25   - public void put(String key, DrillSerializable value);
  24 + public Collection<V> get(String key);
  25 + public void put(String key, V value);
26 26 }
62 exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -22,23 +22,28 @@
22 22 import java.util.List;
23 23 import java.util.concurrent.TimeUnit;
24 24
25   -import com.google.common.base.Preconditions;
26   -import com.google.common.collect.Lists;
27   -import com.hazelcast.config.SerializerConfig;
28   -import com.hazelcast.core.*;
29 25 import org.apache.drill.common.config.DrillConfig;
30 26 import org.apache.drill.exec.ExecConstants;
31 27 import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
32 28 import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
33 29 import org.apache.drill.exec.memory.BufferAllocator;
  30 +import org.apache.drill.exec.proto.BitControl.PlanFragment;
  31 +import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
34 32 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
35   -import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
36   -import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
37 33
38 34 import com.google.common.cache.Cache;
39 35 import com.google.common.cache.CacheBuilder;
  36 +import com.google.common.collect.Lists;
40 37 import com.hazelcast.config.Config;
41   -import org.apache.drill.exec.server.DrillbitContext;
  38 +import com.hazelcast.config.SerializerConfig;
  39 +import com.hazelcast.core.DuplicateInstanceNameException;
  40 +import com.hazelcast.core.Hazelcast;
  41 +import com.hazelcast.core.HazelcastInstance;
  42 +import com.hazelcast.core.IAtomicLong;
  43 +import com.hazelcast.core.IMap;
  44 +import com.hazelcast.core.ITopic;
  45 +import com.hazelcast.core.Message;
  46 +import com.hazelcast.core.MessageListener;
42 47
43 48 public class HazelCache implements DistributedCache {
44 49 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -67,8 +72,9 @@ public void onMessage(Message<HWorkQueueStatus> wrapped) {
67 72
68 73 public void run() {
69 74 Config c = new Config();
70   - SerializerConfig sc = new SerializerConfig().setImplementation(new HCVectorAccessibleSerializer(allocator))
71   - .setTypeClass(VectorAccessibleSerializable.class);
  75 + SerializerConfig sc = new SerializerConfig() //
  76 + .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
  77 + .setTypeClass(VectorAccessibleSerializable.class);
72 78 c.setInstanceName(instanceName);
73 79 c.getSerializationConfig().addSerializerConfig(sc);
74 80 instance = getInstanceOrCreateNew(c);
@@ -119,12 +125,14 @@ public void storeFragment(PlanFragment fragment) {
119 125
120 126 @Override
121 127 public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
122   - return new HCDistributedMultiMapImpl(this.instance.getMultiMap(clazz.toString()), clazz);
  128 + com.hazelcast.core.MultiMap<String, V> mmap = this.instance.getMultiMap(clazz.toString());
  129 + return new HCDistributedMultiMapImpl<V>(mmap, clazz);
123 130 }
124 131
125 132 @Override
126 133 public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
127   - return new HCDistributedMapImpl(this.instance.getMap(clazz.toString()), clazz);
  134 + IMap<String, V> imap = this.instance.getMap(clazz.toString());
  135 + return new HCDistributedMapImpl<V>(imap, clazz);
128 136 }
129 137
130 138 @Override
@@ -132,51 +140,47 @@ public Counter getCounter(String name) {
132 140 return new HCCounterImpl(this.instance.getAtomicLong(name));
133 141 }
134 142
135   - public static class HCDistributedMapImpl<V> implements DistributedMap<V> {
136   - private IMap<String, DrillSerializable> m;
137   - private Class<V> clazz;
  143 + public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
  144 + private IMap<String, V> m;
138 145
139   - public HCDistributedMapImpl(IMap m, Class<V> clazz) {
  146 + public HCDistributedMapImpl(IMap<String, V> m, Class<V> clazz) {
140 147 this.m = m;
141   - this.clazz = clazz;
142 148 }
143 149
144   - public DrillSerializable get(String key) {
  150 + public V get(String key) {
145 151 return m.get(key);
146 152 }
147 153
148   - public void put(String key, DrillSerializable value) {
  154 + public void put(String key, V value) {
149 155 m.put(key, value);
150 156 }
151 157
152   - public void putIfAbsent(String key, DrillSerializable value) {
  158 + public void putIfAbsent(String key, V value) {
153 159 m.putIfAbsent(key, value);
154 160 }
155 161
156   - public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) {
  162 + public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
157 163 m.putIfAbsent(key, value, ttl, timeunit);
158 164 }
159 165 }
160 166
161   - public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
162   - private com.hazelcast.core.MultiMap<String, DrillSerializable> mmap;
163   - private Class<V> clazz;
  167 + public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
  168 + private com.hazelcast.core.MultiMap<String, V> mmap;
164 169
165   - public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) {
  170 + public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
166 171 this.mmap = mmap;
167   - this.clazz = clazz;
168 172 }
169 173
170   - public Collection<DrillSerializable> get(String key) {
171   - List<DrillSerializable> list = Lists.newArrayList();
172   - for (DrillSerializable v : mmap.get(key)) {
  174 + public Collection<V> get(String key) {
  175 + List<V> list = Lists.newArrayList();
  176 + for (V v : mmap.get(key)) {
173 177 list.add(v);
174 178 }
175 179 return list;
176 180 }
177 181
178 182 @Override
179   - public void put(String key, DrillSerializable value) {
  183 + public void put(String key, V value) {
180 184 mmap.put(key, value);
181 185 }
182 186 }
64 exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -17,7 +17,6 @@
17 17 */
18 18 package org.apache.drill.exec.cache;
19 19
20   -import java.io.ByteArrayOutputStream;
21 20 import java.io.IOException;
22 21 import java.lang.reflect.InvocationTargetException;
23 22 import java.util.Collection;
@@ -27,29 +26,28 @@
27 26 import java.util.concurrent.TimeUnit;
28 27 import java.util.concurrent.atomic.AtomicLong;
29 28
30   -import com.google.common.collect.ArrayListMultimap;
31   -import com.google.common.collect.Lists;
32   -import com.google.common.io.ByteArrayDataInput;
33   -import com.google.common.io.ByteArrayDataOutput;
34   -import com.google.common.io.ByteStreams;
35 29 import org.apache.drill.common.config.DrillConfig;
36 30 import org.apache.drill.exec.exception.DrillbitStartupException;
37 31 import org.apache.drill.exec.memory.BufferAllocator;
  32 +import org.apache.drill.exec.memory.TopLevelAllocator;
  33 +import org.apache.drill.exec.proto.BitControl.PlanFragment;
38 34 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
39   -import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
40 35
  36 +import com.google.common.collect.ArrayListMultimap;
  37 +import com.google.common.collect.Lists;
41 38 import com.google.common.collect.Maps;
42   -import org.apache.drill.exec.server.BootStrapContext;
43   -import org.apache.drill.exec.server.DrillbitContext;
  39 +import com.google.common.io.ByteArrayDataInput;
  40 +import com.google.common.io.ByteArrayDataOutput;
  41 +import com.google.common.io.ByteStreams;
44 42
45 43 public class LocalCache implements DistributedCache {
46 44 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
47 45
48 46 private volatile Map<FragmentHandle, PlanFragment> handles;
49   - private volatile ConcurrentMap<Class, DistributedMap> maps;
50   - private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps;
  47 + private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
  48 + private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
51 49 private volatile ConcurrentMap<String, Counter> counters;
52   - private static final BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
  50 + private static final BufferAllocator allocator = new TopLevelAllocator();
53 51
54 52 @Override
55 53 public void close() throws IOException {
@@ -66,22 +64,22 @@ public void run() throws DrillbitStartupException {
66 64
67 65 @Override
68 66 public PlanFragment getFragment(FragmentHandle handle) {
69   - logger.debug("looking for fragment with handle: {}", handle);
  67 +// logger.debug("looking for fragment with handle: {}", handle);
70 68 return handles.get(handle);
71 69 }
72 70
73 71 @Override
74 72 public void storeFragment(PlanFragment fragment) {
75   - logger.debug("Storing fragment: {}", fragment);
  73 +// logger.debug("Storing fragment: {}", fragment);
76 74 handles.put(fragment.getHandle(), fragment);
77 75 }
78 76
79 77 @Override
80 78 public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
81   - DistributedMultiMap mmap = multiMaps.get(clazz);
  79 + DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz);
82 80 if (mmap == null) {
83 81 multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz));
84   - return multiMaps.get(clazz);
  82 + return (DistributedMultiMap<V>) multiMaps.get(clazz);
85 83 } else {
86 84 return mmap;
87 85 }
@@ -91,8 +89,8 @@ public void storeFragment(PlanFragment fragment) {
91 89 public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
92 90 DistributedMap m = maps.get(clazz);
93 91 if (m == null) {
94   - maps.putIfAbsent(clazz, new LocalDistributedMapImpl(clazz));
95   - return maps.get(clazz);
  92 + maps.putIfAbsent(clazz, new LocalDistributedMapImpl<V>(clazz));
  93 + return (DistributedMap<V>) maps.get(clazz);
96 94 } else {
97 95 return m;
98 96 }
@@ -119,10 +117,10 @@ public static ByteArrayDataOutput serialize(DrillSerializable obj) {
119 117 return out;
120 118 }
121 119
122   - public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
  120 + public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
123 121 ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
124 122 try {
125   - DrillSerializable obj = (DrillSerializable)clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
  123 + V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
126 124 obj.read(in);
127 125 return obj;
128 126 } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
@@ -130,18 +128,18 @@ public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
130 128 }
131 129 }
132 130
133   - public static class LocalDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
  131 + public static class LocalDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
134 132 private ArrayListMultimap<String, ByteArrayDataOutput> mmap;
135   - private Class<DrillSerializable> clazz;
  133 + private Class<V> clazz;
136 134
137   - public LocalDistributedMultiMapImpl(Class clazz) {
  135 + public LocalDistributedMultiMapImpl(Class<V> clazz) {
138 136 mmap = ArrayListMultimap.create();
139 137 this.clazz = clazz;
140 138 }
141 139
142 140 @Override
143   - public Collection<DrillSerializable> get(String key) {
144   - List<DrillSerializable> list = Lists.newArrayList();
  141 + public Collection<V> get(String key) {
  142 + List<V> list = Lists.newArrayList();
145 143 for (ByteArrayDataOutput o : mmap.get(key)) {
146 144 list.add(deserialize(o.toByteArray(), this.clazz));
147 145 }
@@ -154,33 +152,33 @@ public void put(String key, DrillSerializable value) {
154 152 }
155 153 }
156 154
157   - public static class LocalDistributedMapImpl<V> implements DistributedMap<V> {
  155 + public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
158 156 private ConcurrentMap<String, ByteArrayDataOutput> m;
159   - private Class<DrillSerializable> clazz;
  157 + private Class<V> clazz;
160 158
161   - public LocalDistributedMapImpl(Class clazz) {
  159 + public LocalDistributedMapImpl(Class<V> clazz) {
162 160 m = Maps.newConcurrentMap();
163 161 this.clazz = clazz;
164 162 }
165 163
166 164 @Override
167   - public DrillSerializable get(String key) {
  165 + public V get(String key) {
168 166 if (m.get(key) == null) return null;
169   - return deserialize(m.get(key).toByteArray(), this.clazz);
  167 + return (V) deserialize(m.get(key).toByteArray(), this.clazz);
170 168 }
171 169
172 170 @Override
173   - public void put(String key, DrillSerializable value) {
  171 + public void put(String key, V value) {
174 172 m.put(key, serialize(value));
175 173 }
176 174
177 175 @Override
178   - public void putIfAbsent(String key, DrillSerializable value) {
  176 + public void putIfAbsent(String key, V value) {
179 177 m.putIfAbsent(key, serialize(value));
180 178 }
181 179
182 180 @Override
183   - public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeUnit) {
  181 + public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
184 182 m.putIfAbsent(key, serialize(value));
185 183 logger.warn("Expiration not implemented in local map cache");
186 184 }
5 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
@@ -17,11 +17,10 @@
17 17 */
18 18 package org.apache.drill.exec.cache;
19 19
  20 +import org.apache.drill.exec.proto.BitControl.PlanFragment;
  21 +import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
20 22 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
21   -import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
22   -import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
23 23
24   -import com.google.protobuf.Parser;
25 24 import com.hazelcast.core.HazelcastInstance;
26 25
27 26 public class ProtoBufImpl {
8 exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -17,16 +17,12 @@
17 17 */
18 18 package org.apache.drill.exec.cache;
19 19
20   -import java.io.DataInput;
21   -import java.io.DataOutput;
22 20 import java.io.IOException;
23 21
24   -import com.hazelcast.nio.ObjectDataInput;
25   -import com.hazelcast.nio.ObjectDataOutput;
26   -import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
27   -
28 22 import com.google.protobuf.MessageLite;
29 23 import com.google.protobuf.Parser;
  24 +import com.hazelcast.nio.ObjectDataInput;
  25 +import com.hazelcast.nio.ObjectDataOutput;
30 26 import com.hazelcast.nio.serialization.DataSerializable;
31 27
32 28 public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
4 exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -133,6 +133,7 @@ public void writeToStreamAndRetain(OutputStream output) throws IOException {
133 133 retain = true;
134 134 writeToStream(output);
135 135 }
  136 +
136 137
137 138 /**
138 139 * Serializes the VectorAccessible va and writes it to an output stream
@@ -189,9 +190,10 @@ public void writeToStream(OutputStream output) throws IOException {
189 190 }
190 191 }
191 192
192   - private void clear() {
  193 + public void clear() {
193 194 if (!retain) {
194 195 batch.clear();
  196 + if(sv2 != null) sv2.clear();
195 197 }
196 198 }
197 199
10 exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -35,7 +35,7 @@
35 35 import org.apache.drill.exec.ExecConstants;
36 36 import org.apache.drill.exec.coord.ClusterCoordinator;
37 37 import org.apache.drill.exec.coord.ZKClusterCoordinator;
38   -import org.apache.drill.exec.memory.DirectBufferAllocator;
  38 +import org.apache.drill.exec.memory.TopLevelAllocator;
39 39 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
40 40 import org.apache.drill.exec.proto.UserBitShared.QueryId;
41 41 import org.apache.drill.exec.proto.UserProtos;
@@ -59,7 +59,7 @@
59 59 private UserClient client;
60 60 private volatile ClusterCoordinator clusterCoordinator;
61 61 private volatile boolean connected = false;
62   - private final DirectBufferAllocator allocator = new DirectBufferAllocator();
  62 + private final TopLevelAllocator allocator = new TopLevelAllocator(Long.MAX_VALUE);
63 63 private int reconnectTimes;
64 64 private int reconnectDelay;
65 65
@@ -107,7 +107,7 @@ public synchronized void connect() throws RpcException {
107 107 checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
108 108 // just use the first endpoint for now
109 109 DrillbitEndpoint endpoint = endpoints.iterator().next();
110   - this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), new NamedThreadFactory("Client-")));
  110 + this.client = new UserClient(allocator, new NioEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), new NamedThreadFactory("Client-")));
111 111 logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
112 112 connect(endpoint);
113 113 connected = true;
@@ -146,7 +146,7 @@ private void connect(DrillbitEndpoint endpoint) throws RpcException {
146 146 }
147 147 }
148 148
149   - public DirectBufferAllocator getAllocator() {
  149 + public TopLevelAllocator getAllocator() {
150 150 return allocator;
151 151 }
152 152
@@ -218,7 +218,7 @@ private void fail(Exception ex) {
218 218
219 219 @Override
220 220 public void resultArrived(QueryResultBatch result) {
221   - logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
  221 +// logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
222 222 results.add(result);
223 223 if(result.getHeader().getIsLastChunk()){
224 224 future.set(results);
8 exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
@@ -19,19 +19,17 @@
19 19
20 20 import java.io.File;
21 21 import java.io.FileInputStream;
22   -import java.io.IOException;
23 22 import java.util.List;
24 23
25   -import org.apache.commons.lang.StringUtils;
26   -import org.apache.drill.common.config.DrillConfig;
27 24 import org.apache.drill.exec.cache.VectorAccessibleSerializable;
28 25 import org.apache.drill.exec.memory.BufferAllocator;
  26 +import org.apache.drill.exec.memory.TopLevelAllocator;
29 27 import org.apache.drill.exec.record.BatchSchema;
  28 +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
30 29 import org.apache.drill.exec.record.MaterializedField;
31 30 import org.apache.drill.exec.record.VectorAccessible;
32 31 import org.apache.drill.exec.record.VectorContainer;
33 32 import org.apache.drill.exec.record.VectorWrapper;
34   -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
35 33 import org.apache.drill.exec.util.VectorUtil;
36 34
37 35 import com.beust.jcommander.IParameterValidator;
@@ -42,7 +40,7 @@
42 40
43 41 public class DumpCat {
44 42
45   - private final static BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
  43 + private final static BufferAllocator allocator = new TopLevelAllocator();
46 44
47 45 public static void main(String args[]) throws Exception {
48 46 DumpCat dumpCat = new DumpCat();
858 exec/java-exec/src/main/java/org/apache/drill/exec/memory/AccountingByteBuf.java
... ... @@ -0,0 +1,858 @@
  1 +/**
  2 + * Licensed to the Apache Software Foundation (ASF) under one
  3 + * or more contributor license agreements. See the NOTICE file
  4 + * distributed with this work for additional information
  5 + * regarding copyright ownership. The ASF licenses this file
  6 + * to you under the Apache License, Version 2.0 (the
  7 + * "License"); you may not use this file except in compliance
  8 + * with the License. You may obtain a copy of the License at
  9 + *
  10 + * http://www.apache.org/licenses/LICENSE-2.0
  11 + *
  12 + * Unless required by applicable law or agreed to in writing, software
  13 + * distributed under the License is distributed on an "AS IS" BASIS,
  14 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15 + * See the License for the specific language governing permissions and
  16 + * limitations under the License.
  17 + */
  18 +package org.apache.drill.exec.memory;
  19 +
  20 +import io.netty.buffer.ByteBuf;
  21 +import io.netty.buffer.ByteBufAllocator;
  22 +import io.netty.buffer.ByteBufProcessor;
  23 +import io.netty.buffer.DuplicatedByteBuf;
  24 +import io.netty.buffer.PooledUnsafeDirectByteBufL;
  25 +import io.netty.buffer.SlicedByteBuf;
  26 +import io.netty.buffer.SwappedByteBuf;
  27 +import io.netty.buffer.Unpooled;
  28 +
  29 +import java.io.IOException;
  30 +import java.io.InputStream;
  31 +import java.io.OutputStream;
  32 +import java.nio.ByteBuffer;
  33 +import java.nio.ByteOrder;
  34 +import java.nio.channels.GatheringByteChannel;
  35 +import java.nio.channels.ScatteringByteChannel;
  36 +import java.nio.charset.Charset;
  37 +
  38 +public class AccountingByteBuf extends ByteBuf{
  39 + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AccountingByteBuf.class);
  40 +
  41 + private final PooledUnsafeDirectByteBufL b;
  42 + private final Accountor acct;
  43 + private int size;
  44 +
  45 + public AccountingByteBuf(Accountor a, PooledUnsafeDirectByteBufL b) {
  46 + super();
  47 + this.b = b;
  48 + this.acct = a;
  49 + this.size = b.capacity();
  50 + }
  51 +
  52 + @Override
  53 + public int refCnt() {
  54 + return b.refCnt();
  55 + }
  56 +
  57 + @Override
  58 + public boolean release() {
  59 + if(b.release()){
  60 + acct.release(this, size);
  61 + return true;
  62 + }
  63 + return false;
  64 + }
  65 +
  66 + @Override
  67 + public boolean release(int decrement) {
  68 + if(b.release(decrement)){
  69 + acct.release(this, size);
  70 + return true;
  71 + }
  72 + return false;
  73 + }
  74 +
  75 + @Override
  76 + public int capacity() {
  77 + return b.capacity();
  78 + }
  79 +
  80 + @Override
  81 + public ByteBuf capacity(int newCapacity) {
  82 + if(newCapacity < size){
  83 + // do trim, update size and return
  84 + }
  85 + throw new UnsupportedOperationException();
  86 + }
  87 +
  88 + @Override
  89 + public int maxCapacity() {
  90 + return size;
  91 + }
  92 +
  93 + @Override
  94 + public ByteBufAllocator alloc() {
  95 + return b.alloc();
  96 + }
  97 +
  98 + @Override
  99 + public ByteOrder order() {
  100 + return ByteOrder.LITTLE_ENDIAN;
  101 + }
  102 +
  103 + @Override
  104 + public ByteBuf order(ByteOrder endianness) {
  105 + return new SwappedByteBuf(this);
  106 + }
  107 +
  108 + @Override
  109 + public ByteBuf unwrap() {
  110 + return this;
  111 + }
  112 +
  113 + @Override
  114 + public boolean isDirect() {
  115 + return true;
  116 + }
  117 +
  118 + @Override
  119 + public int readerIndex() {
  120 + return b.readerIndex();
  121 + }
  122 +
  123 + @Override
  124 + public ByteBuf readerIndex(int readerIndex) {
  125 + b.readerIndex(readerIndex);
  126 + return this;
  127 + }
  128 +
  129 + @Override
  130 + public int writerIndex() {
  131 + return b.writerIndex();
  132 + }
  133 +
  134 + @Override
  135 + public ByteBuf writerIndex(int writerIndex) {
  136 + b.writerIndex(writerIndex);
  137 + return this;
  138 + }
  139 +
  140 + @Override
  141 + public ByteBuf setIndex(int readerIndex, int writerIndex) {
  142 + b.setIndex(readerIndex, writerIndex);
  143 + return this;
  144 + }
  145 +
  146 + @Override
  147 + public int readableBytes() {
  148 + return b.readableBytes();
  149 + }
  150 +
  151 + @Override
  152 + public int writableBytes() {
  153 + return b.writableBytes();
  154 + }
  155 +
  156 + @Override
  157 + public int maxWritableBytes() {
  158 + return b.maxWritableBytes();
  159 + }
  160 +
  161 + @Override
  162 + public boolean isReadable() {
  163 + return b.isReadable();
  164 + }
  165 +
  166 + @Override
  167 + public boolean isReadable(int size) {
  168 + return b.isReadable(size);
  169 + }
  170 +
  171 + @Override
  172 + public boolean isWritable() {
  173 + return b.isWritable();
  174 + }
  175 +
  176 + @Override
  177 + public boolean isWritable(int size) {
  178 + return b.isWritable(size);
  179 + }
  180 +
  181 + @Override
  182 + public ByteBuf clear() {
  183 + b.clear();
  184 + return this;
  185 + }
  186 +
  187 + @Override
  188 + public ByteBuf markReaderIndex() {
  189 + b.markReaderIndex();
  190 + return this;
  191 + }
  192 +
  193 + @Override
  194 + public ByteBuf resetReaderIndex() {
  195 + b.resetReaderIndex();
  196 + return this;
  197 + }
  198 +
  199 + @Override
  200 + public ByteBuf markWriterIndex() {
  201 + b.markWriterIndex();
  202 + return this;
  203 + }
  204 +
  205 + @Override
  206 + public ByteBuf resetWriterIndex() {
  207 + b.resetWriterIndex();
  208 + return this;
  209 + }
  210 +
  211 + @Override
  212 + public ByteBuf discardReadBytes() {
  213 + b.discardReadBytes();
  214 + return this;
  215 + }
  216 +
  217 + @Override
  218 + public ByteBuf discardSomeReadBytes() {
  219 + b.discardSomeReadBytes();
  220 + return this;
  221 + }
  222 +
  223 + @Override
  224 + public ByteBuf ensureWritable(int minWritableBytes) {
  225 + b.ensureWritable(minWritableBytes);
  226 + return this;
  227 + }
  228 +
  229 + @Override
  230 + public int ensureWritable(int minWritableBytes, boolean force) {
  231 + return b.ensureWritable(minWritableBytes, false);
  232 + }
  233 +
  234 + @Override
  235 + public boolean getBoolean(int index) {
  236 + return b.getBoolean(index);
  237 + }
  238 +
  239 + @Override
  240 + public byte getByte(int index) {
  241 + return b.getByte(index);
  242 + }
  243 +
  244 + @Override
  245 + public short getUnsignedByte(int index) {
  246 + return b.getUnsignedByte(index);
  247 + }
  248 +