diff --git a/src/main/java/su/interference/core/GenericObject.java b/src/main/java/su/interference/core/GenericObject.java index 16bad32..6283160 100644 --- a/src/main/java/su/interference/core/GenericObject.java +++ b/src/main/java/su/interference/core/GenericObject.java @@ -35,13 +35,14 @@ this software and associated documentation files (the "Software"), to deal in */ public class GenericObject implements Serializable, GenericResult { - private final Map vmap; + private final static long serialVersionUID = 4330809121118587364L; + private final Map vmap; - protected GenericObject(Map vmap) { + protected GenericObject(Map vmap) { this.vmap = vmap; } - public Object getValueByName(String name) { + public Object getValueByName(String name) { return vmap.get(name); } } diff --git a/src/main/java/su/interference/core/IndexFrame.java b/src/main/java/su/interference/core/IndexFrame.java index 43aecb3..5161950 100644 --- a/src/main/java/su/interference/core/IndexFrame.java +++ b/src/main/java/su/interference/core/IndexFrame.java @@ -24,6 +24,8 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import su.interference.exception.*; import su.interference.persistent.*; import su.interference.serialize.ByteString; @@ -36,22 +38,32 @@ this software and associated documentation files (the "Software"), to deal in */ public class IndexFrame extends Frame { + private final static Logger logger = LoggerFactory.getLogger(IndexFrame.class); private boolean sorted = false; + private final boolean terminate; public static final int INDEX_FRAME_NODE = 2; public static final int INDEX_FRAME_LEAF = 1; public static final int INITIALIZE_DURING_CONSTRUCT = 1; public IndexFrame(int file, long pointer, int size, int objectId, Table t) throws InternalException { super(file, pointer, size, t); + this.terminate = false; } public IndexFrame(FrameData bd, int frameType, Table t) throws InternalException { super(bd, t); this.setType(frameType); + this.terminate = false; + } + + public IndexFrame() { + super(0, 0, 0, null); + this.terminate = true; } public IndexFrame(int file, long pointer, int size, FrameData bd, Table t, Class c, List uframes) throws Exception { super(null, file, pointer, size, bd, t, c); + this.terminate = false; Map ucs = new HashMap<>(); for (FrameData uframe : uframes) { @@ -97,6 +109,7 @@ public IndexFrame(int file, long pointer, int size, FrameData bd, Table t, Class public IndexFrame(byte[] b, int file, long pointer, Map imap, Map hmap, Table t) { super(b, file, pointer, t); int ptr = FRAME_HEADER_SIZE; + this.terminate = false; final ByteString bs = new ByteString(this.b); while (ptr imap, Map 0) { //IOT does not contains frameptr final long allocId = imap.get(h.getFramePtr()); final long bptr = hmap.get(allocId) != null ? hmap.get(allocId) : Instance.getInstance().getFrameByAllocId(allocId).getFrameId(); - h.getFramePtrRowId().setFileId((int) bptr % 4096); - h.getFramePtrRowId().setFramePointer(bptr - (bptr % 4096)); + final long fbptr = bptr%4096; + h.getFramePtrRowId().setFileId((int) fbptr); + h.getFramePtrRowId().setFramePointer(bptr - fbptr); } final DataChunk dc = new DataChunk(bs.substring(ptr, ptr+INDEX_HEADER_SIZE+h.getLen()), this.getFile(), this.getPointer(), INDEX_HEADER_SIZE, this.getDataObject(), this.getEntityClass()); dc.setHeader(h); @@ -402,11 +416,16 @@ public synchronized ValueSet getMaxValue() throws InternalException { } public HashMap getAllocateMap() { - final HashMap imap = new HashMap(); + final HashMap imap = new HashMap<>(); for (Chunk c : data.getChunks()) { if (c.getHeader().getFramePtr() > 0) { //IOT does not contains frameptr - final long allocId = Instance.getInstance().getFrameById(c.getHeader().getFramePtr()).getAllocId(); - imap.put(c.getHeader().getFramePtr(), allocId); + final FrameData bd = Instance.getInstance().getFrameById(c.getHeader().getFramePtr()); + if (bd != null) { + final long allocId = bd.getAllocId(); + imap.put(c.getHeader().getFramePtr(), allocId); + } else { + logger.error("getAllocaleMap found null data frame for frame id " + c.getHeader().getFramePtr()); + } } } return imap; @@ -461,4 +480,8 @@ public synchronized void setLcId(long lcId) { this.setRes05((int)lcF); this.setRes07(lcId - lcF); } + + public boolean isTerminate() { + return terminate; + } } diff --git a/src/main/java/su/interference/core/Instance.java b/src/main/java/su/interference/core/Instance.java index 5fea3c6..ffe3921 100644 --- a/src/main/java/su/interference/core/Instance.java +++ b/src/main/java/su/interference/core/Instance.java @@ -135,6 +135,7 @@ public void rollback (Session s) { public static boolean initParams (String[] params) { final Config cfg = Config.getConfig(); + /* try { new HTTPServer(cfg.MMPORT); @@ -145,6 +146,7 @@ public static boolean initParams (String[] params) { return false; } */ + return true; } @@ -625,7 +627,7 @@ public DataFile getDataFileById (int id) { public ArrayList getDataFilesByType (int id) { final Table t = getTableByName("su.interference.persistent.DataFile"); - final ArrayList r = new ArrayList(); + final ArrayList r = new ArrayList<>(); for (Object o : t.getIndexFieldByColumn("type").getIndex().getObjectsByKey(id)) { r.add((DataFile)((DataChunk)o).getEntity()); } @@ -649,13 +651,22 @@ public synchronized Session getSessionBySid (long sid) { public synchronized Transaction getTransactionById (long transId) { if (transId == 0) { return null; } Table t = getTableByName("su.interference.persistent.Transaction"); - DataChunk dc = ((DataChunk)t.getIndexFieldByColumn("transId").getIndex().getObjectByKey(transId)); + DataChunk dc = ((DataChunk)t.getMapFieldByColumn("transId").getMap().get(transId)); if (dc==null) { return null; } return (Transaction)dc.getEntity(); } + public List getTransactionsBySid (long id) { + final Table t = getTableByName("su.interference.persistent.Transaction"); + final ArrayList r = new ArrayList<>(); + for (Object o : t.getIndexFieldByColumn("sid").getIndex().getObjectsByKey(id)) { + r.add((Transaction)((DataChunk)o).getEntity()); + } + return r; + } + public FreeFrame getFreeFrameById (long id) { final Table t = getTableByName("su.interference.persistent.FreeFrame"); final DataChunk dc = (DataChunk)t.getIndexFieldByColumn("frameId").getIndex().getObjectByKey(id); @@ -722,6 +733,17 @@ public List getTransFramesByTransId(long transId) { return res; } + public String getEventSubscriberByEntityId (String id) { + final Table t = getTableByName("su.interference.persistent.EventSubscriber"); + final MapField ixf = t.getMapFieldByColumn("entityId"); + final Map ixl = ixf.getMap(); + final DataChunk dc = (DataChunk)ixl.get(id); + if (dc != null) { + return ((EventSubscriber) dc.getEntity()).getSubscriberId(); + } + return null; + } + //used in unlock table mechanism @Deprecated public synchronized ArrayList getTransFrameByObjectId (int objectId) { @@ -752,7 +774,7 @@ public RetrieveLock getRetrieveLockById(int obj, long tran) { } public ArrayList getRetrieveLocksByObjectId(int obj) { - final ArrayList r = new ArrayList(); + final ArrayList r = new ArrayList<>(); final Table t = getTableByName("su.interference.persistent.RetrieveLock"); for (Object o : t.getIndexFieldByColumn("objectId").getIndex().getObjectsByKey(obj)) { final RetrieveLock rl = (RetrieveLock)((DataChunk)o).getEntity(); @@ -763,9 +785,9 @@ public ArrayList getRetrieveLocksByObjectId(int obj) { public synchronized List getTransactions() { final Table t = getTableByName("su.interference.persistent.Transaction"); - final ArrayList res = new ArrayList(); - for (Object o : t.getIndexFieldByColumn("transId").getIndex().getContent()) { - res.add((Transaction)((DataChunk)o).getEntity()); + final ArrayList res = new ArrayList<>(); + for (Object o : t.getMapFieldByColumn("transId").getMap().entrySet()) { + res.add((Transaction)((DataChunk)((Map.Entry)o).getValue()).getEntity()); } return res; } diff --git a/src/main/java/su/interference/core/SyncFrame.java b/src/main/java/su/interference/core/SyncFrame.java index 899754c..c34aff2 100644 --- a/src/main/java/su/interference/core/SyncFrame.java +++ b/src/main/java/su/interference/core/SyncFrame.java @@ -55,6 +55,7 @@ public class SyncFrame implements Comparable, Serializable, AllowRPredicate { private final boolean allowR; private final boolean proc; private final boolean started; + private final boolean distributed; private final Map imap; private final Map rtran; private final Map> uframes; @@ -73,6 +74,7 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb, boolean proc) throws Exce final FrameData bd = Instance.getInstance().getFrameById(frame.getPtr()); allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false; this.proc = bd == null ? false : proc; + distributed = t.isDistributed(); if (bd == null && allowR) { final FreeFrame fframe = Instance.getInstance().getFreeFrameById(frame.getPtr()); @@ -243,6 +245,10 @@ public Map> getUFrames() { return uframes; } + public boolean isDistributed() { + return distributed; + } + public boolean equals (SyncFrame bl) { return (this.getFile() == bl.getFile()) && (this.getPointer() == bl.getPointer()); } diff --git a/src/main/java/su/interference/core/SyncQueue.java b/src/main/java/su/interference/core/SyncQueue.java index 9a5bb50..aeca4ae 100644 --- a/src/main/java/su/interference/core/SyncQueue.java +++ b/src/main/java/su/interference/core/SyncQueue.java @@ -26,6 +26,7 @@ this software and associated documentation files (the "Software"), to deal in import java.util.concurrent.*; import java.util.*; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,8 +118,11 @@ private synchronized boolean syncFramesFromQueue() throws Exception { for (FreeFrame fb : fframes) { s.persist(fb); } + //todo async process must depends from stop() method - pool2.submit(new TransportSyncTask(frames)); + + final List dframes = frames.stream().filter(p -> p.isDistributed()).collect(Collectors.toList()); + pool2.submit(new TransportSyncTask(dframes)); running = false; return true; diff --git a/src/main/java/su/interference/core/SystemCleanUp.java b/src/main/java/su/interference/core/SystemCleanUp.java index 2d49fbd..8ac7a9f 100644 --- a/src/main/java/su/interference/core/SystemCleanUp.java +++ b/src/main/java/su/interference/core/SystemCleanUp.java @@ -92,7 +92,7 @@ private void cleanUpFrames() { final long frameAmount = f.getDataObject().getFrameAmount(); if (f.getDataFile().isData() && cleanupDataEnabled()) { f.decreasePriority(); - if (f.isSynced() && f.getObjectId() > 999 && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { + if (f.isSynced() && f.getObjectId() > 999 && frameAmount > getThr()) { if (f.clearFrame()) { d++; } @@ -107,7 +107,7 @@ private void cleanUpFrames() { if (f.isSynced() && f.getFrameType() == IndexFrame.INDEX_FRAME_NODE) { xn++; } - if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && !f.isRbck() && frameAmount > Config.getConfig().IX_CLEANUP_PROTECTION_THR) { + if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && !f.isRbck() && frameAmount > getIxThr()) { if (f.clearFrame()) { x++; } @@ -117,7 +117,7 @@ private void cleanUpFrames() { } } if (f.getDataFile().isTemp() && cleanupTempEnabled()) { - if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { + if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && frameAmount > getThr()) { if (f.clearFrame()) { i++; } @@ -127,7 +127,7 @@ private void cleanUpFrames() { } } if (f.getDataFile().isUndo() && cleanupUndoEnabled()) { - if (f.isSynced() && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { + if (f.isSynced() && frameAmount > getThr()) { if (f.clearFrame()) { u++; } @@ -185,4 +185,12 @@ private boolean cleanupTempEnabled() { // todo cleanup affects temp indices, disable until fix is released return false; } + + private int getThr() { + return Config.getConfig().CLEANUP_PROTECTION_THR/(Config.getConfig().FRAMESIZE/4096); + } + + private int getIxThr() { + return Config.getConfig().IX_CLEANUP_PROTECTION_THR/(Config.getConfig().FRAMESIZE/4096); + } } diff --git a/src/main/java/su/interference/core/SystemInit.java b/src/main/java/su/interference/core/SystemInit.java index 1209380..5e2de6e 100644 --- a/src/main/java/su/interference/core/SystemInit.java +++ b/src/main/java/su/interference/core/SystemInit.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -44,7 +44,7 @@ this software and associated documentation files (the "Software"), to deal in */ public class SystemInit { - private final static int INITIAL_CLASSES_AMT = 15; + private final static int INITIAL_CLASSES_AMT = 16; private final static Logger logger = LoggerFactory.getLogger(SystemInit.class); public static Table initSystem (boolean initStorage, int nodeType, Session s) throws Exception { @@ -69,6 +69,7 @@ public static String[] getInitialClasses() { initialClasses[12] = "su.interference.persistent.MgmtModule"; initialClasses[13] = "su.interference.persistent.Cursor"; initialClasses[14] = "su.interference.persistent.FrameSync"; + initialClasses[15] = "su.interference.persistent.EventSubscriber"; return initialClasses; } diff --git a/src/main/java/su/interference/core/TransFrameId.java b/src/main/java/su/interference/core/TransFrameId.java index 8bd333a..a57652b 100644 --- a/src/main/java/su/interference/core/TransFrameId.java +++ b/src/main/java/su/interference/core/TransFrameId.java @@ -1,8 +1,38 @@ +/** + The MIT License (MIT) + + Copyright (c) 2010-2021 head systems, ltd + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + */ + package su.interference.core; import java.io.Serializable; +/** + * @author Yuriy Glotanov + * @since 1.0 + */ + public class TransFrameId implements Serializable { + private final static long serialVersionUID = 2122989901000213555L; private final long cframeId; private final long uframeId; private final long transId; diff --git a/src/main/java/su/interference/core/ValueSet.java b/src/main/java/su/interference/core/ValueSet.java index dcb38e1..46ba112 100644 --- a/src/main/java/su/interference/core/ValueSet.java +++ b/src/main/java/su/interference/core/ValueSet.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -46,7 +46,7 @@ public Object[] getValueSet() { } public int compareTo (Object obj) { - return compare(obj, vs.length); + return compare(obj, ((ValueSet) obj).getValueSet().length); } //used in partial compare datachunks in sql group algorithm @@ -55,12 +55,12 @@ public int compareTo (Object obj) { public int compare (Object obj, int thr) { final ValueSet j = (ValueSet)obj; - for (int i=0; i trans = Instance.getInstance().getTransactionsBySid(this.sid); + for (Transaction tran : trans) { + this.delete(tran); + } + } + public long getSid() { return sid; } diff --git a/src/main/java/su/interference/persistent/Table.java b/src/main/java/su/interference/persistent/Table.java index 500b54c..f1dccb6 100644 --- a/src/main/java/su/interference/persistent/Table.java +++ b/src/main/java/su/interference/persistent/Table.java @@ -42,6 +42,7 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.core.MapField; import su.interference.core.*; import su.interference.exception.*; +import su.interference.metrics.Meter; import su.interference.metrics.Metrics; import su.interference.mgmt.MgmtClassIdColumn; import su.interference.mgmt.MgmtColumn; @@ -149,6 +150,10 @@ public class Table implements ResultSet { public String[] fieldtypes; @Transient private static final ExecutorService framepool = Executors.newCachedThreadPool(); + @Transient + private volatile Meter persistMeter; + @Transient + private volatile Meter findMeter; public Class getSc() { return sc; @@ -251,6 +256,15 @@ public boolean isIndex() { } public boolean isDistributed() { + if (isNoTran()) { + if (this.name.equals(UndoChunk.class.getName())) { + return true; + } + return false; + } + if (isIndex()) { + return Instance.getInstance().getTableById(this.parentId).isDistributed(); + } final NoDistribute ca = (NoDistribute)this.genericClass.getAnnotation(NoDistribute.class); return ca == null; } @@ -264,10 +278,10 @@ public String getSystemName() { return this.name; } - public LinkedBlockingQueue getFrames(Session s) { + public LinkedBlockingQueue getFrames(Session s, String caller) { if (this.isIndex()) { //get ordered frame sequence - return this.getLeafFrames(s); + return this.getLeafFrames(s, caller); } final LinkedBlockingQueue q = new LinkedBlockingQueue<>(1000); final IndexList ixl = Instance.getInstance().getFrameDataTable().getIndexFieldByColumn("objectId").getIndex(); @@ -275,7 +289,7 @@ public LinkedBlockingQueue getFrames(Session s) { Runnable r = new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference-retrieve-frames-thread-"+Thread.currentThread().getId()); + Thread.currentThread().setName("interference-retrieve-frames-"+caller+"-"+Thread.currentThread().getId()); try { for (Object o : ixl.getObjectsByKey(id)) { q.put((FrameData) ((DataChunk) o).getEntity(s)); @@ -1345,6 +1359,14 @@ protected void delete (final Object o, final Session s, LLT extllt, boolean igno if (bd == null) { logger.error("cannot found frame " + dc.getHeader().getRowID().getFileId() + ":" + dc.getHeader().getRowID().getFramePointer() + " during delete " + o.getClass().getSimpleName() + Thread.currentThread().getName()); } + + if (noTran) { + if (o instanceof OnDelete) { + Method m = this.genericClass.getMethod("onDelete", null); + m.invoke(o, null); + } + } + if (ignoreTransaction || udc == null) { deleteIndexes(dc, noTran, true, s, llt); bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); @@ -1946,7 +1968,7 @@ public synchronized void storeFrames(List frames, int sourceNodeId, L @Deprecated public synchronized List getContent(Session s) throws Exception { - ArrayList res = new ArrayList(); + ArrayList res = new ArrayList<>(); res.addAll(getLocalContent(this.fileStart+this.frameStart, s)); for (Map.Entry entry : ixstartfs.entrySet()) { res.addAll(getLocalContent(entry.getValue(), s)); @@ -1957,7 +1979,7 @@ public synchronized List getContent(Session s) throws Exception { @Deprecated private synchronized List getLocalContent(long start, Session s) throws Exception { ArrayList res = new ArrayList(); - ArrayList levelNodes = new ArrayList(); + ArrayList levelNodes = new ArrayList<>(); boolean cnue = true; final FrameData bd = Instance.getInstance().getFrameById(start); if (bd == null) { @@ -1971,7 +1993,7 @@ private synchronized List getLocalContent(long start, Session s) throws E el.sort(); levelNodes.add(el); while (cnue) { - ArrayList inNodes = new ArrayList(); + ArrayList inNodes = new ArrayList<>(); for (int k=0; k getLocalContent(long start, Session s) throws E return res; } - private synchronized LinkedBlockingQueue getLeafFrames (Session s) throws InternalException, EmptyFrameHeaderFound { + private synchronized LinkedBlockingQueue getLeafFrames (Session s, String caller) throws InternalException, EmptyFrameHeaderFound { final LinkedBlockingQueue q = new LinkedBlockingQueue<>(1000); final ArrayList startfs = new ArrayList<>(); startfs.add(this.fileStart+this.frameStart); @@ -2008,7 +2030,7 @@ private synchronized LinkedBlockingQueue getLeafFrames (Session s) th Runnable r = new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference-retrieve-index-frames-thread-"+Thread.currentThread().getId()); + Thread.currentThread().setName("interference-retrieve-index-frames-"+caller+"-"+Thread.currentThread().getId()); try { for (Long start : startfs) { getLocalLeafFrames(q, start, s); @@ -2024,8 +2046,9 @@ public void run() { return q; } + // todo which possibility of refactor IndexFrame -> FrameData to decrease heap load private synchronized LinkedBlockingQueue getLocalLeafFrames (LinkedBlockingQueue q, long start, Session s) throws Exception { - ArrayList levelNodes = new ArrayList(); + ArrayList levelNodes = new ArrayList<>(); boolean cnue = true; FrameData bd = Instance.getInstance().getFrameById(start); //frame must be local or remote chain started (RCS) @@ -2036,31 +2059,44 @@ private synchronized LinkedBlockingQueue getLocalLeafFrames (LinkedBl el.sort(); levelNodes.add(el); while (cnue) { - ArrayList inNodes = new ArrayList(); - for (int k=0; k inNodes = new ArrayList<>(); + IndexFrame ixf = null; + for (int k = 0; k < levelNodes.size(); k++) { //levelNodes.get(k).sort(); - if (levelNodes.get(k).getType()==1) { + if (levelNodes.get(k).getType() == 1) { cnue = false; q.put(Instance.getInstance().getFrameById(levelNodes.get(k).getPtr())); } else { - for (int i=0; i0) { + if (lcId > 0) { inNodes.add(Instance.getInstance().getFrameById(lcId).getIndexFrame()); } } } } + if (ixf != null && ixf.getType() == 1 && inNodes.size() == 0) { + final long lcId = ixf.getLcId(); + if (lcId > 0) { + q.put(Instance.getInstance().getFrameById(lcId)); + cnue = false; + } + } levelNodes = inNodes; } return q; } public synchronized String getInfo(Session s) throws Exception { - ArrayList levelNodes = new ArrayList(); + ArrayList levelNodes = new ArrayList<>(); boolean cnue = true; IndexFrame el = Instance.getInstance().getFrameById(this.fileStart+this.frameStart).getIndexFrame(); el.sort(); @@ -2070,7 +2106,7 @@ public synchronized String getInfo(Session s) throws Exception { int nodeamt = 0; int leafamt = 0; while (cnue) { - ArrayList inNodes = new ArrayList(); + ArrayList inNodes = new ArrayList<>(); for (int k=0; k { private final static Logger logger = LoggerFactory.getLogger(FrameApiJoin.class); + private final static long serialVersionUID = 2355717070212234856L; private final int nodeId; private final transient FrameApi bd1; private final transient FrameApi bd2; @@ -48,8 +49,9 @@ public class FrameApiJoin implements Serializable, Callable { private final long leftAllocId; private final long rightAllocId; private final transient CountDownLatch latch = new CountDownLatch(1); - private List result; + private BlockingQueue result; private boolean failed; + private final boolean terminate; public FrameApiJoin(int nodeId, SQLCursor cur, FrameApi bd1, FrameApi bd2) { this.nodeId = nodeId; @@ -58,10 +60,21 @@ public FrameApiJoin(int nodeId, SQLCursor cur, FrameApi bd1, FrameApi bd2) { this.leftAllocId = bd1.getAllocId(); this.rightAllocId = bd2 == null ? 0 : bd2 instanceof SQLIndexFrame ? 0 : bd2.getAllocId(); if (nodeId == Config.getConfig().LOCAL_NODE_ID) { - frameJoinTask = cur.buildFrameJoinTask(nodeId, bd1, bd2); + this.frameJoinTask = cur.buildFrameJoinTask(nodeId, bd1, bd2); } else { - frameJoinTask = null; + this.frameJoinTask = null; } + this.terminate = false; + } + + public FrameApiJoin() { + this.nodeId = 0; + this.bd1 = null; + this.bd2 = null; + this.leftAllocId = 0; + this.rightAllocId = 0; + this.frameJoinTask = null; + this.terminate = true; } public FrameApiJoin call() throws InterruptedException { @@ -119,15 +132,15 @@ public FrameApi getBd2() { return bd2; } - public List getResult() { + public BlockingQueue getResult() { return result; } - public void setResult(List result) { + public void setResult(BlockingQueue result) { this.result = result; } - public void setResultWithCountDown(List result) { + public void setResultWithCountDown(BlockingQueue result) { this.result = result; latch.countDown(); } @@ -140,4 +153,8 @@ public void setFailed(boolean failed) { this.failed = failed; latch.countDown(); } + + public boolean isTerminate() { + return terminate; + } } diff --git a/src/main/java/su/interference/sql/FrameHolder.java b/src/main/java/su/interference/sql/FrameHolder.java index cd6ca9b..99a7ec5 100644 --- a/src/main/java/su/interference/sql/FrameHolder.java +++ b/src/main/java/su/interference/sql/FrameHolder.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -46,15 +46,14 @@ public class FrameHolder { @SuppressWarnings("unchecked") public FrameHolder(ResultSet rs) { final Table t = rs instanceof ResultSetImpl ? ((ResultSetImpl)rs).getTarget() : null; + persistent = rs.isPersistent(); if (t != null && t instanceof Table) { cframes = new AtomicReference[((Table)t).getLbs().length]; for (int i = 0; i< ((Table)t).getLbs().length; i++) { - cframes[i] = new AtomicReference(); + cframes[i] = new AtomicReference<>(); cframes[i].set(((Table)t).getLbs()[i].getBd()); } - persistent = true; } else { - persistent = false; cframes = null; } } diff --git a/src/main/java/su/interference/sql/FrameJoinTask.java b/src/main/java/su/interference/sql/FrameJoinTask.java index d5b38ce..31c6fb7 100644 --- a/src/main/java/su/interference/sql/FrameJoinTask.java +++ b/src/main/java/su/interference/sql/FrameJoinTask.java @@ -32,18 +32,17 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.api.GenericResult; import java.util.List; -import java.util.concurrent.Callable; +import java.util.concurrent.*; import java.util.ArrayList; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.concurrent.ConcurrentHashMap; /** * @author Yuriy Glotanov * @since 1.0 */ -public class FrameJoinTask implements Callable> { +public class FrameJoinTask implements Callable> { private final static Logger logger = LoggerFactory.getLogger(FrameJoinTask.class); private final FrameApi bd1; @@ -52,7 +51,7 @@ public class FrameJoinTask implements Callable> { private final List cols; private final NestedCondition nc; private final Session s; - private final List res; + private final LinkedBlockingQueue q; private final int sqlcid; private final boolean last; private final static ConcurrentHashMap cache = new ConcurrentHashMap(); @@ -73,7 +72,7 @@ public FrameJoinTask(Cursor cur, FrameApi bd1, FrameApi bd2, ResultSet target, L this.cols = cols; this.nc = nc; this.s = s; - this.res = new ArrayList(); + this.q = new LinkedBlockingQueue<>(Config.getConfig().RETRIEVE_QUEUE_SIZE); this.sqlcid = sqlcid; this.last = last; this.j = j; @@ -81,7 +80,7 @@ public FrameJoinTask(Cursor cur, FrameApi bd1, FrameApi bd2, ResultSet target, L } @SuppressWarnings("unchecked") - public List call() throws Exception { + public BlockingQueue call() throws Exception { final Thread thread = Thread.currentThread(); thread.setName("interference-sql-join-task-" + thread.getId()); final Class r = target instanceof ResultSetImpl ? ((ResultSetImpl)target).getTableClass() : target instanceof StreamQueue ? ((StreamQueue) target).getRstable().getSc() : null; @@ -94,63 +93,7 @@ public List call() throws Exception { if (bd2 != null && bd2.getImpl() == FrameApi.IMPL_INDEX && bd1.getImpl() == FrameApi.IMPL_INDEX) { if (((SQLIndexFrame) bd2).isMerged()) { - if (hmap.getJoin() == SQLJoinDispatcher.RIGHT_MERGE) { - final ArrayList drs1 = bd1.getFrameEntities(s); - final ArrayList drs2 = bd2 == null ? null : bd2.getFrameEntities(s); - final int s1 = drs1.size(); - final int s2 = drs2.size(); - int p1 = 0; - int p2 = 0; - boolean cnue = true; - while (cnue) { - if (s1 > 0 && s2 > 0) { - final Comparable o1 = getKeyValue(drs1.get(p1), ((SQLIndexFrame) bd2).getLkey(), s); - final Comparable o2 = getKeyValue(drs2.get(p2), ((SQLIndexFrame) bd2).getRkey(), s); - final int cmp = o1.compareTo(o2); - if (cmp < 0) { - p1++; - } else if (cmp > 0) { - p2++; - } else { - Object j = null; - final IndexChunk ib1 = (IndexChunk) drs1.get(p1); - final IndexChunk ib2 = (IndexChunk) drs2.get(p2); - if (ib1.getDataChunk() == null) { - logger.error(bd1.getAllocId() + " " + bd1.getFrameId() + " found ib1chunk = null, drs1.size = " + drs1.size() + " p1 = " + p1); - } - if (ib2.getDataChunk() == null) { - logger.error(bd2.getAllocId() + " " + bd2.getFrameId() + " found ib2chunk = null, drs2.size = " + drs2.size() + " p2 = " + p2); - } - final Object oo1 = ib1.getDataChunk() == null ? null : ib1.getDataChunk().getEntity(s); - final Object oo2 = ib2.getDataChunk() == null ? null : ib2.getDataChunk().getEntity(s); - j = joinDataRecords(r, c1, c2, t1, t2, oo1, oo2, cols, c1rs, s); - if (hmap.skipCheckNC()) { - res.add(j); - } else { - if (nc.checkNC(j, sqlcid, last, s)) { - res.add(j); - } - } - final int n = p2 + 1; - if (n == s2) { - p1++; - } else { - final Comparable next = getKeyValue(drs2.get(n), ((SQLIndexFrame) bd2).getRkey(), s); - if (o1.compareTo(next) < 0) { - p1++; - } else { - p2++; - } - } - } - if (p1 == s1 || p2 == s2) { - cnue = false; - } - } else { - cnue = false; - } - } - } else if (hmap.getJoin() == SQLJoinDispatcher.MERGE) { + if (hmap.getJoin() == SQLJoinDispatcher.MERGE) { IndexChunk ib1 = (IndexChunk) ((SQLIndexFrame) bd1).poll(s); IndexChunk ib2 = (IndexChunk) ((SQLIndexFrame) bd2).poll(s); boolean cnue = true; @@ -174,28 +117,20 @@ public List call() throws Exception { final Object oo2 = ib2.getDataChunk() == null ? null : ib2.getDataChunk().getEntity(s); Object j = joinDataRecords(r, c1, c2, t1, t2, oo1, oo2, cols, c1rs, s); if (hmap.skipCheckNC()) { - res.add(j); + q.put(j); } else { if (nc.checkNC(j, sqlcid, last, s)) { - res.add(j); + q.put(j); } } - IndexChunk nc = (IndexChunk) ((SQLIndexFrame) bd2).poll(s); - if (nc == null) { + IndexChunk nc2 = (IndexChunk) ((SQLIndexFrame) bd2).poll(s); + if (nc2 == null) { ib1 = (IndexChunk) ((SQLIndexFrame) bd1).poll(s); } else { - final Comparable next = getKeyValue(nc, ((SQLIndexFrame) bd2).getRkey(), s); - ib2 = nc; - - /* - if (o1.compareTo(next) < 0) { - ib1 = (IndexChunk) ((SQLIndexFrame) bd1).poll(s); - } else { - ib2 = nc; - } -*/ + ib2 = nc2; } + } if (ib1 == null || ib2 == null) { cnue = false; @@ -208,7 +143,7 @@ public List call() throws Exception { } } else { final ArrayList drs1 = bd1.getFrameEntities(s); - final ArrayList drs2 = bd2 == null ? null : bd2.getFrameEntities(s); + final ArrayList drs2 = bd2 == null ? null : bd2.getImpl() == FrameApi.IMPL_HASH ? null : bd2.getImpl() == FrameApi.IMPL_INDEX ? null : bd2.getFrameEntities(s); for (Object o1 : drs1) { if (bd1.getImpl() == FrameApi.IMPL_INDEX) { @@ -224,10 +159,10 @@ public List call() throws Exception { if (!(o2.get(0) == null && last)) { Object j = joinDataRecords(r, c1, c2, t1, t2, o1, o2.get(0), cols, c1rs, s); if (hmap.skipCheckNC()) { - res.add(j); + q.put(j); } else { if (nc.checkNC(j, sqlcid, last, s)) { - res.add(j); + q.put(j); } } } @@ -242,10 +177,10 @@ public List call() throws Exception { final Object oo2 = ib.getDataChunk().getEntity(s); final Object j = joinDataRecords(r, c1, c2, t1, t2, o1, oo2, cols, c1rs, s); if (hmap.skipCheckNC()) { - res.add(j); + q.put(j); } else { if (nc.checkNC(j, sqlcid, last, s)) { - res.add(j); + q.put(j); } } } @@ -256,12 +191,12 @@ public List call() throws Exception { if (r == null || target instanceof StreamQueue) { //todo need to cast o1 to RS type if (nc.checkNC(o1, sqlcid, last, s)) { - res.add(o1); //target table is null -> result class is null -> returns generic entities + q.put(o1); //target table is null -> result class is null -> returns generic entities } } else { Object j = joinDataRecords(r, c1, c2, t1, t2, o1, null, cols, c1rs, s); if (nc.checkNC(j, sqlcid, last, s)) { - res.add(j); + q.put(j); } } } @@ -270,17 +205,18 @@ public List call() throws Exception { for (Object o2 : drs2) { Object j = joinDataRecords(r, c1, c2, t1, t2, o1, o2, cols, c1rs, s); if (nc.checkNC(j, sqlcid, last, s)) { - res.add(j); + q.put(j); } } } } } - Metrics.get("recordLCount").put(res.size()); + Metrics.get("recordLCount").put(q.size()); + q.put(new ResultSetTerm()); if (j != null) { - j.setResult(res); + j.setResult(q); } - return res; + return q; } private Object joinDataRecords (Class r, Class c1, Class c2, int t1, int t2, Object o1, Object o2, List cols, boolean isrs, Session s) diff --git a/src/main/java/su/interference/sql/ResultSet.java b/src/main/java/su/interference/sql/ResultSet.java index 2a071b5..79655df 100644 --- a/src/main/java/su/interference/sql/ResultSet.java +++ b/src/main/java/su/interference/sql/ResultSet.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -26,11 +26,8 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.core.Chunk; import su.interference.core.DataChunk; -import su.interference.exception.InternalException; import su.interference.persistent.Session; -import java.net.MalformedURLException; - /** * @author Yuriy Glotanov * @since 1.0 @@ -41,9 +38,10 @@ public interface ResultSet { Object poll(Session s) throws Exception; Chunk cpoll(Session s) throws Exception; int getObjectId(); - boolean isIndex() throws ClassNotFoundException, MalformedURLException; - Class getTableClass() throws ClassNotFoundException, MalformedURLException; - java.lang.reflect.Field[] getFields() throws ClassNotFoundException, InternalException, MalformedURLException; + boolean isIndex(); + Class getTableClass(); + java.lang.reflect.Field[] getFields(); void deallocate(Session s) throws Exception; + boolean isPersistent(); } diff --git a/src/main/java/su/interference/sql/ResultSetImpl.java b/src/main/java/su/interference/sql/ResultSetImpl.java index b5a81d8..4671048 100644 --- a/src/main/java/su/interference/sql/ResultSetImpl.java +++ b/src/main/java/su/interference/sql/ResultSetImpl.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -33,7 +33,6 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.persistent.Session; import su.interference.persistent.Table; -import java.net.MalformedURLException; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -48,7 +47,7 @@ public class ResultSetImpl implements ResultSet { private final Table target; private final boolean persistent; private final LinkedBlockingQueue q = new LinkedBlockingQueue(Config.getConfig().RETRIEVE_QUEUE_SIZE); - private boolean started; + private boolean started = true; private CountDownLatch latch; private final SQLCursor sqlc; private Queue q_; @@ -66,7 +65,6 @@ public DataChunk persist(Object o, Session s) throws Exception { } else { final boolean success = q.offer(o); if (!success) { - latch.countDown(); q.put(o); } } @@ -74,24 +72,25 @@ public DataChunk persist(Object o, Session s) throws Exception { } public Object poll(Session s) throws Exception { - if (!started) { - if (sqlc != null) { - sqlc.flushTarget(); + if (persistent) { + if (sqlc != null && latch == null) { latch = new CountDownLatch(1); + sqlc.flushTarget(); } if (latch != null) { latch.await(); } - started = true; - } - if (persistent) { return target.poll(s); } else { - final Object o = q.take(); - if (o instanceof ResultSetTerm) { - started = false; + if (sqlc != null && !sqlc.isFlush()) { + sqlc.flushTarget(); } if (started) { + final Object o = q.take(); + if (o instanceof ResultSetTerm) { + started = false; + return null; + } return o; } else { return null; @@ -100,17 +99,14 @@ public Object poll(Session s) throws Exception { } public Chunk cpoll(Session s) throws Exception { - if (!started) { - if (sqlc != null) { - sqlc.flushTarget(); + if (persistent) { + if (sqlc != null && latch == null) { latch = new CountDownLatch(1); + sqlc.flushTarget(); } if (latch != null) { latch.await(); } - started = true; - } - if (persistent) { return target.cpoll(s); } else { return null; @@ -135,15 +131,15 @@ public int getObjectId() { return target.getObjectId(); } - public boolean isIndex() throws ClassNotFoundException, MalformedURLException { + public boolean isIndex() { return target.isIndex(); } - public Class getTableClass() throws ClassNotFoundException, MalformedURLException { + public Class getTableClass() { return target.getTableClass(); } - public java.lang.reflect.Field[] getFields() throws ClassNotFoundException, InternalException, MalformedURLException { + public java.lang.reflect.Field[] getFields() throws InternalException { return target.getFields(); } diff --git a/src/main/java/su/interference/sql/ResultSetTerm.java b/src/main/java/su/interference/sql/ResultSetTerm.java index 1c9b088..974ee86 100644 --- a/src/main/java/su/interference/sql/ResultSetTerm.java +++ b/src/main/java/su/interference/sql/ResultSetTerm.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -24,10 +24,13 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.sql; +import java.io.Serializable; + /** * @author Yuriy Glotanov * @since 1.0 */ -public class ResultSetTerm { +public class ResultSetTerm implements Serializable { + private final static long serialVersionUID = 7302274901293874556L; } diff --git a/src/main/java/su/interference/sql/SQLCursor.java b/src/main/java/su/interference/sql/SQLCursor.java index 51fff33..a007a9c 100644 --- a/src/main/java/su/interference/sql/SQLCursor.java +++ b/src/main/java/su/interference/sql/SQLCursor.java @@ -52,13 +52,13 @@ public class SQLCursor implements FrameIterator { private static ExecutorService streampool = Executors.newCachedThreadPool(); private static ExecutorService groupspool = Executors.newCachedThreadPool(); private static ExecutorService rspool = Executors.newCachedThreadPool(); - private List ltasks; - private List rtasks; - private List tasks_; + private final LinkedBlockingQueue ltasks; + private final LinkedBlockingQueue rtasks; + private final LinkedBlockingQueue rtasks_; + boolean ldone = false; + boolean rdone = false; private FrameData bdnext; private FrameHolder current; - private int lptr = 0; - private int rptr = 0; private boolean sent; private Cursor cur; private Session s; @@ -79,6 +79,7 @@ public class SQLCursor implements FrameIterator { private static Map>> sfmap = new ConcurrentHashMap<>(); private static final int BATCH_SIZE = 8; + private static final int REMOTE_TASK_SIZE = 1000; private final static Logger logger = LoggerFactory.getLogger(SQLCursor.class); public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition nc, List rscols, boolean ixflag, boolean last, Cursor cur, Session s) throws Exception { @@ -122,9 +123,9 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition } current = new FrameHolder(target); - ltasks = new ArrayList<>(); - rtasks = new ArrayList<>(); - tasks_ = new ArrayList<>(); + ltasks = new LinkedBlockingQueue<>(); + rtasks = new LinkedBlockingQueue<>(); + rtasks_ = new LinkedBlockingQueue<>(); //rebuild column set for sqlcursor iterator final SQLCursor cursor_ = lbi.getType() == FrameIterator.TYPE_CURSOR ? (SQLCursor) lbi : rbi != null && rbi.getType() == FrameIterator.TYPE_CURSOR ? (SQLCursor) rbi : null; @@ -150,7 +151,7 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition final ValueCondition vc = nc.getIndexVC(lbi, null); if (vc != null) { final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); - this.lbi = new SQLIndex(vc.getConditionColumn().getIndex(), lt, false, vc.getConditionColumn(), vc.getConditionColumn(), true, nc, 0, s); + this.lbi = new SQLIndex(vc.getConditionColumn().getIndex(), lt, true, vc.getConditionColumn(), vc.getConditionColumn(), false, nc, 0, s); this.rbi = rbi; } else { this.lbi = lbi; @@ -170,7 +171,7 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition this.lbi.setLeftfs(true); } if (this.lbi instanceof SQLCursor) { - logger.info("use full scan for "+lt.getName()); + logger.info("use full scan for " + lt.getName() +" persistent = "+((SQLCursor) lbi).getTarget().isPersistent()); } if (this.rbi != null) { @@ -179,7 +180,7 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition logger.info("use full scan for "+rt.getName()); } if (this.rbi instanceof SQLCursor) { - logger.info("use full scan for "+rt.getName()); + logger.info("use full scan for " + rt.getName() +" persistent = "+((SQLCursor) lbi).getTarget().isPersistent()); } } @@ -198,67 +199,119 @@ protected FrameJoinTask buildFrameJoinTask(int nodeId, FrameApi bd1, FrameApi bd return new FrameJoinTask(cur, bd1, bd2, target, rscols, nc, id, nodeId, last, lbi.isLeftfs(), hmap, null, s); } - public void build() throws Exception { + public void build() { final Integer[] ns = Config.getConfig().TEST_DISTRIBUTE_MODE == 0 ? TransportContext.getInstance().getOnlineNodesWithLocal() : TransportContext.getInstance().getNodesWithLocal(); - int i = 0; - boolean isc = this.lbi instanceof SQLCursor || this.rbi instanceof SQLCursor; - //int tnode = 0; + final boolean isc = this.lbi instanceof SQLCursor || this.rbi instanceof SQLCursor; + final SQLCursor c1 = this; + final LinkedBlockingQueue rtasks__ = new LinkedBlockingQueue<>(); - while (lbi.hasNextFrame()) { - final FrameApi bd1 = lbi.nextFrame(); + Runnable build = new Runnable() { + @Override + public void run() { + Thread.currentThread().setName("interference sql cursor build "+Thread.currentThread().getId()); + int i = 0; + try { + while (lbi.hasNextFrame()) { + final FrameApi bd1 = lbi.nextFrame(); - if (bd1 != null) { - if (rbi == null) { - if (ns[i] == Config.getConfig().LOCAL_NODE_ID) { - ltasks.add(new FrameApiJoin(ns[i], this, bd1, null)); - } else { - rtasks.add(new FrameApiJoin(ns[i], this, bd1, null)); - } - i++; - if (i == ns.length) { - i = 0; - } - } else { - while (rbi.hasNextFrame()) { - final FrameApi bd2 = rbi.nextFrame(); - if (bd2 != null) { - if (rightType == null) { - rightType = bd2.getClass().getSimpleName(); - } - if (lbi.noDistribute() || rbi.noDistribute()) { - ltasks.add(new FrameApiJoin(Config.getConfig().LOCAL_NODE_ID, this, bd1, bd2)); - } else { + if (bd1 != null) { + if (rbi == null) { if (ns[i] == Config.getConfig().LOCAL_NODE_ID) { - ltasks.add(new FrameApiJoin(ns[i], this, bd1, bd2)); + ltasks.put(new FrameApiJoin(ns[i], c1, bd1, null)); } else { - rtasks.add(new FrameApiJoin(ns[i], this, bd1, bd2)); + rtasks__.put(new FrameApiJoin(ns[i], c1, bd1, null)); } - } - i++; - if (i == ns.length) { - i = 0; + i++; + if (i == ns.length) { + i = 0; + } + } else { + while (rbi.hasNextFrame()) { + final FrameApi bd2 = rbi.nextFrame(); + if (bd2 != null) { + if (rightType == null) { + rightType = bd2.getClass().getSimpleName(); + } + if (lbi.noDistribute() || rbi.noDistribute()) { + ltasks.put(new FrameApiJoin(Config.getConfig().LOCAL_NODE_ID, c1, bd1, bd2)); + } else { + if (ns[i] == Config.getConfig().LOCAL_NODE_ID) { + ltasks.put(new FrameApiJoin(ns[i], c1, bd1, bd2)); + } else { + rtasks__.put(new FrameApiJoin(ns[i], c1, bd1, bd2)); + } + } + i++; + if (i == ns.length) { + i = 0; + } + } + } + rbi.resetIterator(); } } } - rbi.resetIterator(); + ltasks.put(new FrameApiJoin()); + rtasks__.put(new FrameApiJoin()); + } catch (Exception e) { + logger.error("exception occured during cursor build", e); } } - } - logger.info("SQL cursor is build: ltasks/rtasks amount = "+ltasks.size()+"/" + rtasks.size() + ", use NC check = " + last); + }; + remotepool.submit(build); + if (!sent) { - for (Integer nodeId : ns) { - if (nodeId != Config.getConfig().LOCAL_NODE_ID) { - final Map joins = new HashMap<>(); - for (FrameApiJoin j : rtasks) { - if (j.getNodeId() == nodeId) { - joins.put(j.getKey(), j); + Runnable send = new Runnable() { + @Override + public void run() { + Thread.currentThread().setName("interference sql cursor remote send "+Thread.currentThread().getId()); + try { + final Map> joins = new HashMap<>(); + for (Integer nodeId : ns) { + if (nodeId != Config.getConfig().LOCAL_NODE_ID) { + joins.put(nodeId, new HashMap<>()); + } } + + boolean cnue = true; + while (cnue) { + final FrameApiJoin j = rtasks__.take(); + if (j.isTerminate()) { + cnue = false; + } else { + if (j.getNodeId() != Config.getConfig().LOCAL_NODE_ID) { + joins.get(j.getNodeId()).put(j.getKey(), j); + if (joins.get(j.getNodeId()).size() == REMOTE_TASK_SIZE) { + final RemoteTask rt = new RemoteTask(cur, j.getNodeId(), joins.get(j.getNodeId()), rightType, target.getTableClass() == null ? null : target.getTableClass().getName()); + remotepool.submit(rt); + for (Map.Entry entry : joins.get(j.getNodeId()).entrySet()) { + rtasks.put(entry.getValue()); + } + joins.put(j.getNodeId(), new HashMap<>()); + } + } + } + } + + for (Integer nodeId : ns) { + if (nodeId != Config.getConfig().LOCAL_NODE_ID) { + if (joins.get(nodeId).size() > 0) { + final RemoteTask rt = new RemoteTask(cur, nodeId, joins.get(nodeId), rightType, target.getTableClass() == null ? null : target.getTableClass().getName()); + remotepool.submit(rt); + for (Map.Entry entry : joins.get(nodeId).entrySet()) { + rtasks.put(entry.getValue()); + } + } + } + } + rtasks.put(new FrameApiJoin()); + } catch (Exception e) { + logger.error("exception occured during remote send of tasks", e); } - final RemoteTask rt = new RemoteTask(cur, nodeId, joins, rightType, target.getTableClass() == null ? null : target.getTableClass().getName()); - remotepool.submit(rt); + sent = true; } - } - sent = true; + }; + remotepool.submit(send); } } @@ -302,13 +355,20 @@ public void run() { if (flist.size() > 0) { final ContainerFrame cf = new ContainerFrame(lbi.getObjectId(), flist); final FrameJoinTask task = new FrameJoinTask(cur, cf, null, target, ((StreamQueue) target).getRscols(), nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), null, null, s); - final Future> ft = exec.submit(task); - - if (cur.getSqlStmt().isGroupedResult()) { - q_in.addAll(ft.get()); - } else { - for (Object o : ft.get()) { - target.persist(o, s); + final Future> ft = exec.submit(task); + final BlockingQueue q = ft.get(); + boolean cnue = true; + + while (cnue) { + final Object o = q.take(); + if (o instanceof ResultSetTerm) { + cnue = false; + } else { + if (cur.getSqlStmt().isGroupedResult()) { + q_in.add(o); + } else { + target.persist(o, s); + } } } } @@ -317,13 +377,20 @@ public void run() { FrameApi f = q.poll(); if (f != null) { final FrameJoinTask task = new FrameJoinTask(cur, f, null, target, ((StreamQueue) target).getRscols(), nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), null, null, s); - final Future> ft = exec.submit(task); - - if (cur.getSqlStmt().isGroupedResult()) { - q_in.addAll(ft.get()); - } else { - for (Object o : ft.get()) { - target.persist(o, s); + final Future> ft = exec.submit(task); + final BlockingQueue q = ft.get(); + boolean cnue = true; + + while (cnue) { + final Object o = q.take(); + if (o instanceof ResultSetTerm) { + cnue = false; + } else { + if (cur.getSqlStmt().isGroupedResult()) { + q_in.add(o); + } else { + target.persist(o, s); + } } } } @@ -368,7 +435,7 @@ public List execute(FrameApi bd1, FrameApi bd2) throws Exception { } */ - public Future> execute(FrameApi bd1, FrameApi bd2, FrameApiJoin j) { + public Future> execute(FrameApi bd1, FrameApi bd2, FrameApiJoin j) { final FrameJoinTask task = new FrameJoinTask(cur, bd1, bd2, target, rscols, nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), hmap, j, s); return exec.submit(task); } @@ -378,94 +445,123 @@ public synchronized FrameData nextFrame() { } private synchronized FrameData nextFrame2() throws InternalException { - boolean ldone = lptr >= ltasks.size(); - boolean rdone = rptr >= rtasks.size(); boolean done = ldone && rdone; + boolean prcrj = true; FrameData ret = current.getFrame(ldone && rdone); - while (!done && ret==null) { - final ArrayList> flist = new ArrayList<>(); - final ArrayList> flist2 = new ArrayList<>(); - try { - for (int i=0; i < BATCH_SIZE; i++) { - if (ldone) { - if (!rdone) { - if ((rptr + i) < rtasks.size()) { - final FrameApiJoin j = rtasks.get(rptr + i); - flist2.add(exec2.submit(j)); - } - } - } else { - if ((lptr + i) < ltasks.size()) { - final FrameApiJoin j = ltasks.get(lptr + i); - flist.add(exec.submit(j)); + if (ret != null) { + return ret; + } + + try { + + while (!done) { + final ArrayList> flist = new ArrayList<>(); + final ArrayList> flist2 = new ArrayList<>(); + + for (int i = 0; i < BATCH_SIZE; i++) { + if (!ldone) { + final FrameApiJoin j = ltasks.take(); + if (j.isTerminate()) { + ldone = true; + } else { + flist.add(exec2.submit(j)); } } } - if (ldone) { + + for (int i = 0; i < BATCH_SIZE; i++) { if (!rdone) { - for (Future f : flist2) { - try { - final FrameApiJoin j = f.get(); - if (j.isFailed()) { - FrameApiJoin j_ = new FrameApiJoin(Config.getConfig().LOCAL_NODE_ID, this, j.getBd1(), j.getBd2()); - tasks_.add(j_); - } else { - for (Object o : j.getResult()) { - target.persist(o, s); - } - } - } catch (Exception e) { - if (e instanceof ExecutionException) { - e.getCause().printStackTrace(); - } else { - e.printStackTrace(); - } - } + final FrameApiJoin j = rtasks.take(); + if (j.isTerminate()) { + rdone = true; + } else { + flist2.add(exec2.submit(j)); } } - } else { - for (Future f : flist) { - try { - final FrameApiJoin j = f.get(); - for (Object o : j.getResult()) { - target.persist(o, s); - } - } catch (Exception e) { - if (e instanceof ExecutionException) { - e.getCause().printStackTrace(); + } + +// for (int i=0; i < BATCH_SIZE; i++) { + if (!prcrj) { + final FrameApiJoin j = rtasks_.take(); + if (j.isTerminate()) { + prcrj = true; + } else { + flist.add(exec2.submit(j)); + } + } +// } + + for (Future f : flist) { + try { + final FrameApiJoin j = f.get(); + final BlockingQueue q = j.getResult(); + boolean cnue = true; + while (cnue) { + final Object o = q.take(); + if (o instanceof ResultSetTerm) { + cnue = false; } else { - e.printStackTrace(); + target.persist(o, s); } } + } catch (Exception e) { + if (e instanceof ExecutionException) { + e.getCause().printStackTrace(); + } else { + e.printStackTrace(); + } } } - if (ldone) { - if (!rdone) { - rptr = rptr + BATCH_SIZE; + + for (Future f : flist2) { + try { + final FrameApiJoin j = f.get(); + if (j.isFailed()) { + FrameApiJoin j_ = new FrameApiJoin(Config.getConfig().LOCAL_NODE_ID, this, j.getBd1(), j.getBd2()); + rtasks_.put(j_); + prcrj = false; + } else { + final BlockingQueue q = j.getResult(); + boolean cnue = true; + while (cnue) { + final Object o = q.take(); + if (o instanceof ResultSetTerm) { + cnue = false; + } else { + target.persist(o, s); + } + } + } + } catch (Exception e) { + if (e instanceof ExecutionException) { + e.getCause().printStackTrace(); + } else { + e.printStackTrace(); + } } - } else { - lptr = lptr + BATCH_SIZE; } - ldone = lptr >= ltasks.size(); - rdone = rptr >= rtasks.size(); - - if (ldone && rdone && tasks_.size() > 0) { - ldone = false; - ltasks = tasks_; - tasks_ = new ArrayList<>(); - lptr = 0; + done = ldone && rdone && prcrj; + if (rdone && !prcrj) { + rtasks_.put(new FrameApiJoin()); } + } + + ret = current.getFrame(ldone && rdone); - done = ldone && rdone; - ret = current.getFrame(ldone && rdone); - } catch (Exception e) { - e.printStackTrace(); + if (ret != null) { + logger.debug("SQL cursor " + cur.getCursorId() + " returned next frame " + ret.getFrameId()); } - logger.debug("SQL cursor "+cur.getCursorId()+" next frame returned frame "+(ret==null?"null":ret.getFrameId())); + if (!target.isPersistent()) { + target.persist(new ResultSetTerm(), s); + logger.info("non-persistent cursor terminated"); + } + } catch (Exception e) { + logger.error("exception occured during cursor processing ", e); } + return ret; } @@ -514,7 +610,7 @@ public void run() { ((ResultSetImpl)target).release(); - if (!((ResultSetImpl)target).isPersistent()) { + if (!target.isPersistent()) { target.persist(new ResultSetTerm(), s); } } catch (Exception e) { @@ -528,6 +624,10 @@ public void run() { return target; } + public boolean isFlush() { + return flush; + } + public int getId() { return id; } diff --git a/src/main/java/su/interference/sql/SQLHashMap.java b/src/main/java/su/interference/sql/SQLHashMap.java index 079b6b7..53f2c76 100644 --- a/src/main/java/su/interference/sql/SQLHashMap.java +++ b/src/main/java/su/interference/sql/SQLHashMap.java @@ -24,14 +24,10 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.sql; -import su.interference.core.IndexChunk; import su.interference.exception.InternalException; import su.interference.persistent.Session; import su.interference.persistent.Table; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,7 +42,6 @@ public class SQLHashMap implements FrameIterator { private final SQLColumn cmap; private final SQLColumn ckey; private final FrameIterator rbi; - private final AtomicBoolean complete; private final AtomicBoolean returned; private final Table t; private final Class c; @@ -57,41 +52,17 @@ public SQLHashMap(SQLColumn cmap, SQLColumn ckey, FrameIterator rbi, Table t, Se this.cmap = cmap; this.ckey = ckey; this.rbi = rbi; - this.complete = new AtomicBoolean(false); this.returned = new AtomicBoolean(false); this.t = t; this.c = t.getTableClass(); this.s = s; } - @SuppressWarnings("unchecked") - public Comparable getKeyValue(Class c, Object o, SQLColumn sqlc, Session s) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { - Method y = c.getMethod("get"+sqlc.getColumn().getName().substring(0,1).toUpperCase()+sqlc.getColumn().getName().substring(1,sqlc.getColumn().getName().length()), null); - return (Comparable)y.invoke(o, null); - } - public FrameApi nextFrame() throws Exception { if (!returned.get()) { synchronized (this) { if (hframe == null) { - if (!complete.get()) { - while (rbi.hasNextFrame()) { - FrameApi bd = rbi.nextFrame(); - if (bd != null) { - ArrayList drs = bd.getFrameEntities(s); - for (Object o : drs) { - if (bd.getImpl() == FrameApi.IMPL_INDEX) { - final IndexChunk ib1 = (IndexChunk) o; - o = ib1.getDataChunk().getEntity(s); - } - - hmap.put(getKeyValue(c, o, cmap, s), o); - } - } - } - complete.compareAndSet(false, true); - } - hframe = new SQLHashMapFrame(hmap, cmap, ckey, t); + hframe = new SQLHashMapFrame(rbi, cmap, ckey, t, c, s); } returned.compareAndSet(false, true); return hframe; diff --git a/src/main/java/su/interference/sql/SQLHashMapFrame.java b/src/main/java/su/interference/sql/SQLHashMapFrame.java index c23cdbd..3487d1a 100644 --- a/src/main/java/su/interference/sql/SQLHashMapFrame.java +++ b/src/main/java/su/interference/sql/SQLHashMapFrame.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -25,10 +25,13 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.sql; import su.interference.core.Chunk; -import su.interference.exception.InternalException; +import su.interference.core.DataChunk; +import su.interference.core.IndexChunk; import su.interference.persistent.Session; import su.interference.persistent.Table; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -44,22 +47,66 @@ public class SQLHashMapFrame implements FrameApi, Finder { private final SQLColumn cmap; private final SQLColumn ckey; private final Table t; + private final SQLIndexFrame ix; - public SQLHashMapFrame(ConcurrentHashMap hmap, SQLColumn cmap, SQLColumn ckey, Table t) { - this.hmap = hmap; + public SQLHashMapFrame(FrameIterator rbi, SQLColumn cmap, SQLColumn ckey, Table t, Class c, Session s) throws Exception { + this.hmap = new ConcurrentHashMap<>(); this.cmap = cmap; this.ckey = ckey; this.t = t; + if (rbi instanceof SQLIndex) { + if (rbi.hasNextFrame()) { + FrameApi fa = rbi.nextFrame(); + if (fa instanceof SQLIndexFrame) { + ix = (SQLIndexFrame) fa; + } else { + throw new RuntimeException("internal issue occured during sql processing"); + } + } else { + throw new RuntimeException("internal issue occured during sql processing"); + } + } else { + ix = null; + while (rbi.hasNextFrame()) { + FrameApi bd = rbi.nextFrame(); + if (bd != null) { + ArrayList drs = bd.getFrameEntities(s); + for (Object o : drs) { + hmap.put(getKeyValue(c, o, cmap, s), o); + } + } + } + } } public int getImpl() { return FrameApi.IMPL_HASH; } - public List get(Object key, Session s) { - return Arrays.asList(new Object[]{hmap.get(key)}); + public List get(Object key, Session s) throws Exception { + Object o = hmap.get(key); + if (o == null && ix != null) { + List l = ix.get(key, s); + if (l != null) { + for (Object o_ : l) { + if (o_ != null) { + Object e = ((IndexChunk) ((DataChunk) o_).getEntity()).getDataChunk().getEntity(); + hmap.put((Comparable) key, e); + return Arrays.asList(new Object[]{e}); + } + } + } + } + return o == null ? null : Arrays.asList(new Object[]{o}); } + @SuppressWarnings("unchecked") + public Comparable getKeyValue(Class c, Object o, SQLColumn sqlc, Session s) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Method y = c.getMethod("get"+sqlc.getColumn().getName().substring(0,1).toUpperCase()+sqlc.getColumn().getName().substring(1,sqlc.getColumn().getName().length()), null); + return (Comparable)y.invoke(o, null); + } + + public SQLColumn getCmap() { return cmap; } diff --git a/src/main/java/su/interference/sql/SQLIndex.java b/src/main/java/su/interference/sql/SQLIndex.java index 677eed3..6514aba 100644 --- a/src/main/java/su/interference/sql/SQLIndex.java +++ b/src/main/java/su/interference/sql/SQLIndex.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -25,7 +25,6 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.sql; import su.interference.core.Instance; -import su.interference.core.ValueSet; import su.interference.exception.InternalException; import su.interference.persistent.FrameData; import su.interference.persistent.Session; @@ -33,7 +32,6 @@ this software and associated documentation files (the "Software"), to deal in import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @@ -44,7 +42,7 @@ this software and associated documentation files (the "Software"), to deal in * @since 1.0 */ -public class SQLIndex implements FrameIterator, Finder { +public class SQLIndex implements FrameIterator { private final Table t; private final Session s; private final int join; @@ -59,6 +57,7 @@ public class SQLIndex implements FrameIterator, Finder { private final AtomicBoolean terminate; private final ValueCondition vc; private SQLIndexFrame mframe; + private SQLIndexFrame rframe; public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn rkey, boolean merged, NestedCondition nc, int join, Session s) throws InternalException, IOException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { if (!t.isIndex()) throw new InternalException(); @@ -71,40 +70,43 @@ public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn r this.left = left; this.unique = left?lkey.isUnique():rkey.isUnique(); this.merged = merged; - this.frames = join == SQLJoinDispatcher.MERGE ? null : left ? t.getFrames(s) : null; + this.frames = join == SQLJoinDispatcher.MERGE ? null : join == SQLJoinDispatcher.RIGHT_INDEX ? null : left == false ? null : t.getFrames(s, t.getTableClass().getSimpleName()); this.returned = new AtomicBoolean(false); this.terminate = new AtomicBoolean(false); this.vc = nc.getIndexVC(this, t); } - public List get(Object key, Session s) throws Exception { - List res = new ArrayList<>(); - if (!left&&unique) { - res.add(t.getObjectByKey(new ValueSet(key), s)); - return res; - } else if (!left) { - res.addAll(t.getObjectsByKey(new ValueSet(key), s)); - return res; - } - return null; - } - //may returns null public FrameApi nextFrame() throws Exception { - if (!left || join == SQLJoinDispatcher.MERGE) { + if (join == SQLJoinDispatcher.MERGE) { if (!returned.get()) { returned.compareAndSet(false, true); if (merged) { synchronized (this) { if (mframe == null) { - mframe = new SQLIndexFrame(t, parent, null, lkey, rkey, vc, left, unique, merged, join); + mframe = new SQLIndexFrame(t, parent, null, lkey, rkey, vc, left, unique, merged, join, s); } return mframe; } } else { - return new SQLIndexFrame(t, parent, null, lkey, rkey, vc, left, unique, merged, join); + return new SQLIndexFrame(t, parent, null, lkey, rkey, vc, left, unique, merged, join, s); } } + } else if (join == SQLJoinDispatcher.RIGHT_INDEX) { + terminate.compareAndSet(false, true); + if (rframe == null) { + rframe = new SQLIndexFrame(t, parent, null, lkey, rkey, vc, left, unique, merged, join, s); + } + return rframe; + } else if (join == SQLJoinDispatcher.RIGHT_HASH && !left) { + terminate.compareAndSet(false, true); + if (rframe == null) { + rframe = new SQLIndexFrame(t, parent, null, lkey, rkey, vc, left, unique, merged, join, s); + } + return rframe; + } else if ((join == 0 || left) && vc != null && (vc.getCondition() == Condition.C_EQUAL || vc.getCondition() == Condition.C_IN)) { + terminate.compareAndSet(false, true); + return new SQLIndexFrame(t, parent, null, lkey, rkey, vc, left, unique, merged, join, s); } else { if (hasNextFrame()) { final FrameData bd = frames.take(); @@ -112,27 +114,20 @@ public FrameApi nextFrame() throws Exception { terminate.compareAndSet(false, true); return null; } - return new SQLIndexFrame(t, parent, bd, lkey, rkey, vc, left, unique, merged, join); + return new SQLIndexFrame(t, parent, bd, lkey, rkey, vc, left, unique, merged, join, s); } } return null; } - public FrameApi getFrameByAllocId(long allocId) { + public FrameApi getFrameByAllocId(long allocId) throws Exception { final FrameData bd = Instance.getInstance().getFrameByAllocId(allocId); - return new SQLIndexFrame(t, parent, bd, lkey, rkey, vc, left, unique, merged, join); + return new SQLIndexFrame(t, parent, bd, lkey, rkey, vc, left, unique, merged, join, s); } public void resetIterator() { - if (left) { -/* optional (possibly will be need) - try { - frames = t.getFrames(s); - terminate.compareAndSet(true, false); - } catch (Exception e) { - throw new RuntimeException(); - } -*/ + if (terminate.get()) { + terminate.compareAndSet(true, false); } if (returned.get()) { returned.compareAndSet(true, false); @@ -140,7 +135,7 @@ public void resetIterator() { } public boolean hasNextFrame() { - if (!left || join == SQLJoinDispatcher.MERGE) { + if (join == SQLJoinDispatcher.MERGE) { return !returned.get(); } else { return !terminate.get(); diff --git a/src/main/java/su/interference/sql/SQLIndexFrame.java b/src/main/java/su/interference/sql/SQLIndexFrame.java index 5eaf5da..b2e7fd6 100644 --- a/src/main/java/su/interference/sql/SQLIndexFrame.java +++ b/src/main/java/su/interference/sql/SQLIndexFrame.java @@ -32,9 +32,7 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.persistent.Session; import su.interference.persistent.Table; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.*; /** * @author Yuriy Glotanov @@ -50,14 +48,14 @@ public class SQLIndexFrame implements FrameApi, Finder { private final boolean left; private final boolean unique; private final boolean merged; + private final boolean vccheck; private final int join; - private ArrayList mcontent; private RetrieveQueue rqueue; - private AtomicInteger mptr = new AtomicInteger(); private final ValueCondition vc; + private final Map vcmap; - public SQLIndexFrame(Table t, Table parent, FrameData bd, SQLColumn lkey, SQLColumn rkey, ValueCondition vc, boolean left, boolean unique, boolean merged, int join) - throws InternalException { + public SQLIndexFrame(Table t, Table parent, FrameData bd, SQLColumn lkey, SQLColumn rkey, ValueCondition vc, boolean left, boolean unique, boolean merged, int join, Session s) + throws Exception { if (!t.isIndex()) { throw new InternalException(); } @@ -71,6 +69,18 @@ public SQLIndexFrame(Table t, Table parent, FrameData bd, SQLColumn lkey, SQLCol this.unique = unique; this.merged = merged; this.join = join; + this.vcmap = new HashMap<>(); + if (vc != null && (vc.getCondition() == Condition.C_EQUAL || vc.getCondition() == Condition.C_IN)) { + this.vccheck = true; + for (Object o : vc.getValues()) { + final ValueSet vs = new ValueSet(o); + for (Chunk c : t.getObjectsByKey(vs, s)) { + vcmap.put(vs, c.getEntity(s)); + } + } + } else { + vccheck = false; + } } public int getImpl() { @@ -78,15 +88,19 @@ public int getImpl() { } public List get(Object key, Session s) throws Exception { + if (vccheck) { + return Arrays.asList(vcmap.get(new ValueSet(key))); + } List res = new ArrayList<>(); - if (!left&&unique) { - res.add(t.getObjectByKey(new ValueSet(key), s)); - return res; - } else if (!left) { + if (unique) { + Object o = t.getObjectByKey(new ValueSet(key), s); + if (o != null) { + res.add(o); + } + } else { res.addAll(t.getObjectsByKey(new ValueSet(key), s)); - return res; } - return null; + return res; } public long getFrameId() { @@ -112,7 +126,7 @@ public ArrayList getFrameChunks(Session s) { public ArrayList getFrameEntities(Session s) throws Exception { //todo need refactor on SQLIndex level - returns one to many frames by value synchronized (this) { - if (vc != null && (vc.getCondition() == Condition.C_EQUAL || vc.getCondition() == Condition.C_IN)) { + if (vccheck) { ArrayList res = new ArrayList<>(); for (Object o : vc.getValues()) { for (Chunk c : t.getObjectsByKey(new ValueSet(o), s)) { @@ -121,45 +135,19 @@ public ArrayList getFrameEntities(Session s) throws Exception { } return res; } else { - if (left) { - return bd.getFrameEntities(s); - } else { - if (merged) { - //todo deprecated implementation - if (mcontent == null) { - mcontent = new ArrayList<>(); - for (Chunk c : t.getContent(s)) { - mcontent.add(c.getEntity(s)); - } - } - return mcontent; - } - } - return null; + return bd.getFrameEntities(s); } } } - public Object poll(Session s) throws Exception { + public Object poll(Session s) { if (!left || join == SQLJoinDispatcher.MERGE) { if (rqueue == null) { rqueue = s.getContentQueue(t); } return rqueue.poll(s); } else { - synchronized (this) { - if (mcontent == null) { - mcontent = new ArrayList<>(); - for (Chunk c : t.getContent(s)) { - mcontent.add(c.getEntity(s)); - } - } - } - if (mptr.get() == mcontent.size()) { - return null; - } else { - return mcontent.get(mptr.getAndIncrement()); - } + throw new RuntimeException("Wrong issue of internal join mechanism occured"); } } diff --git a/src/main/java/su/interference/sql/SQLJoinDispatcher.java b/src/main/java/su/interference/sql/SQLJoinDispatcher.java index e89a50b..4df34d4 100644 --- a/src/main/java/su/interference/sql/SQLJoinDispatcher.java +++ b/src/main/java/su/interference/sql/SQLJoinDispatcher.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -76,35 +76,21 @@ public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQL FrameIterator rbi_ = null; //deprecated - this.joinedCC = lbi.getType()== FrameIterator.TYPE_CURSOR?c1:c2; + this.joinedCC = lbi.getType() == FrameIterator.TYPE_CURSOR?c1:c2; if (c1.isIndexOrUnique()||c2.isIndexOrUnique()) { - if (merged && ix1 != null && ix2 != null) { - if (c1.isUnique()) { - this.join = MERGE; - final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); - final Table rt = Instance.getInstance().getTableById(rbi.getObjectId()); - logger.info("use merge join for " + lt.getName() + "." + c1.getColumn().getName() + " * " + rt.getName() + "." + c2.getColumn().getName()); - lbi_ = new SQLIndex(ix1, lt, true, c1, c2, true, nc, MERGE, s); - rbi_ = new SQLIndex(ix2, rt, false, c1, c2, true, nc, MERGE, s); - this.weight = 100; - } else { - this.join = RIGHT_MERGE; - final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); - final Table rt = Instance.getInstance().getTableById(rbi.getObjectId()); - logger.info("use right merge join for " + lt.getName() + "." + c1.getColumn().getName() + " * " + rt.getName() + "." + c2.getColumn().getName()); - lbi_ = new SQLIndex(ix1, lt, true, c1, c2, true, nc, RIGHT_MERGE, s); - rbi_ = new SQLIndex(ix2, rt, false, c1, c2, true, nc, RIGHT_MERGE, s); - this.weight = 90; - } - } else if (c1.isUnique() || c2.isUnique()) { + if (c1.isUnique() || c2.isUnique()) { this.join = RIGHT_HASH; final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); final Table rt = Instance.getInstance().getTableById(rbi.getObjectId()); - final FrameIterator lbi__ = ix1 == null ? lbi : new SQLIndex(ix1, lt, false, c1, c2, true, nc, RIGHT_HASH, s); - final FrameIterator rbi__ = ix2 == null ? rbi : new SQLIndex(ix2, rt, false, c1, c2, true, nc, RIGHT_HASH, s); - final FrameIterator hbi = c1.isUnique() ? lbi__ : c2.isUnique() ? rbi__ : null; - lbi_ = hbi == lbi__ ? rbi__ : lbi__; + FrameIterator hbi = null; + if (c1.isUnique()) { + lbi_ = ix2 == null ? rbi : new SQLIndex(ix2, rt, true, c1, c2, false, nc, RIGHT_HASH, s); + hbi = ix1 == null ? lbi : new SQLIndex(ix1, lt, false, c1, c2, false, nc, RIGHT_HASH, s); + } else if (c2.isUnique()) { + lbi_ = ix1 == null ? lbi : new SQLIndex(ix1, lt, true, c1, c2, false, nc, RIGHT_HASH, s); + hbi = ix2 == null ? rbi : new SQLIndex(ix2, rt, false, c1, c2, false, nc, RIGHT_HASH, s); + } final Table lt_ = Instance.getInstance().getTableById(lbi_.getObjectId()); final Table rt_ = Instance.getInstance().getTableById(hbi.getObjectId()); SQLColumn cmap_ = hbi.getObjectId() == c1.getObjectId() ? c1 : c2; @@ -112,6 +98,22 @@ public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQL logger.info("use right hash join for " + lt_.getName() + "." + c1.getColumn().getName() + " * " + rt_.getName() + "." + c2.getColumn().getName()); rbi_ = new SQLHashMap(cmap_, ckey_, hbi, rt_, s); this.weight = 60; + } else if (c1.isUnique() && merged && ix1 != null && ix2 != null) { + this.join = MERGE; + final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); + final Table rt = Instance.getInstance().getTableById(rbi.getObjectId()); + logger.info("use merge join for " + lt.getName() + "." + c1.getColumn().getName() + " * " + rt.getName() + "." + c2.getColumn().getName()); + lbi_ = new SQLIndex(ix1, lt, true, c1, c2, true, nc, MERGE, s); + rbi_ = new SQLIndex(ix2, rt, false, c1, c2, true, nc, MERGE, s); + this.weight = 100; + } else if (c2.isUnique() && merged && ix1 != null && ix2 != null) { + this.join = MERGE; + final Table lt = Instance.getInstance().getTableById(rbi.getObjectId()); + final Table rt = Instance.getInstance().getTableById(lbi.getObjectId()); + logger.info("use merge join for " + lt.getName() + "." + c2.getColumn().getName() + " * " + rt.getName() + "." + c1.getColumn().getName()); + lbi_ = new SQLIndex(ix2, lt, true, c2, c1, true, nc, MERGE, s); + rbi_ = new SQLIndex(ix1, rt, false, c2, c1, true, nc, MERGE, s); + this.weight = 100; } else { this.join = RIGHT_INDEX; final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); diff --git a/src/main/java/su/interference/sql/SQLTable.java b/src/main/java/su/interference/sql/SQLTable.java index 7375288..21e42fa 100644 --- a/src/main/java/su/interference/sql/SQLTable.java +++ b/src/main/java/su/interference/sql/SQLTable.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -79,7 +79,7 @@ public int getObjectId() { } public List getObjectIds() { - final List objectIds = new ArrayList(); + final List objectIds = new ArrayList<>(); objectIds.add(table.getObjectId()); return objectIds; } diff --git a/src/main/java/su/interference/sql/StreamQueue.java b/src/main/java/su/interference/sql/StreamQueue.java index a858aad..28c4db3 100644 --- a/src/main/java/su/interference/sql/StreamQueue.java +++ b/src/main/java/su/interference/sql/StreamQueue.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -102,19 +102,23 @@ public int getObjectId() { return 0; } - public boolean isIndex() throws ClassNotFoundException, MalformedURLException { + public boolean isIndex() { return false; } - public Class getTableClass() throws ClassNotFoundException, MalformedURLException { + public Class getTableClass() { return null; } - public java.lang.reflect.Field[] getFields() throws ClassNotFoundException, InternalException, MalformedURLException { + public java.lang.reflect.Field[] getFields() { return null; } public void deallocate(Session s) throws Exception { //todo } + public boolean isPersistent() { + return false; + } + } diff --git a/src/main/java/su/interference/transport/EventResult.java b/src/main/java/su/interference/transport/EventResult.java index 37139a9..5d62e2b 100644 --- a/src/main/java/su/interference/transport/EventResult.java +++ b/src/main/java/su/interference/transport/EventResult.java @@ -35,6 +35,7 @@ this software and associated documentation files (the "Software"), to deal in */ public class EventResult implements Serializable { + private final static long serialVersionUID = 3376632100784532102L; private final int result; private final Object resultObject; private final long slaveCursorid; diff --git a/src/main/java/su/interference/transport/SQLEvent.java b/src/main/java/su/interference/transport/SQLEvent.java index 5e94617..b68b5ab 100644 --- a/src/main/java/su/interference/transport/SQLEvent.java +++ b/src/main/java/su/interference/transport/SQLEvent.java @@ -82,11 +82,7 @@ public EventResult process() { FrameApi b = null; FrameApi b_ = null; if (c.getLbi() instanceof SQLIndex) { - if (d != null && d.getJoin() == SQLJoinDispatcher.RIGHT_MERGE) { - b = ((SQLIndex) c.getLbi()).getFrameByAllocId(j.getLeftAllocId()); - } else { - b = c.getLbi().nextFrame(); - } + b = ((SQLIndex) c.getLbi()).getFrameByAllocId(j.getLeftAllocId()); } if (j.getRightAllocId() == 0 && c.getRbi() != null && (rightType.equals("SQLHashMapFrame") || rightType.equals("SQLIndexFrame"))) { c.getRbi().resetIterator(); diff --git a/src/main/java/su/interference/transport/TransportCallback.java b/src/main/java/su/interference/transport/TransportCallback.java index 996fc67..ab57ce4 100644 --- a/src/main/java/su/interference/transport/TransportCallback.java +++ b/src/main/java/su/interference/transport/TransportCallback.java @@ -34,6 +34,7 @@ this software and associated documentation files (the "Software"), to deal in public class TransportCallback implements Serializable { public static final int SUCCESS = 0; public static final int FAILURE = 1; + private final static long serialVersionUID = 1010992982563811125L; private final int nodeId; private final String messageUUID; diff --git a/src/main/java/su/interference/transport/TransportChannel.java b/src/main/java/su/interference/transport/TransportChannel.java index 640d3fd..f541d0a 100644 --- a/src/main/java/su/interference/transport/TransportChannel.java +++ b/src/main/java/su/interference/transport/TransportChannel.java @@ -150,6 +150,7 @@ public void run() { e_.printStackTrace(); } logger.error("channel id = " + channelId + " stopped by connection failure"); + logger.error("failure cause: ", e); } } } catch (IOException e) { diff --git a/src/main/java/su/interference/transport/TransportMessage.java b/src/main/java/su/interference/transport/TransportMessage.java index a8f0a7f..09ac29f 100644 --- a/src/main/java/su/interference/transport/TransportMessage.java +++ b/src/main/java/su/interference/transport/TransportMessage.java @@ -48,6 +48,8 @@ public class TransportMessage implements Serializable, Delayed { private final static Logger logger = LoggerFactory.getLogger(TransportMessage.class); private final static Field[] fields = TransportMessage.class.getDeclaredFields(); private final static SerializerApi sr = new CustomSerializer(); + private final static long serialVersionUID = 8226547655108763221L; + private final int type; private final TransportEvent transportEvent; private final TransportCallback transportCallback; diff --git a/src/main/java/su/interference/transport/TransportServer.java b/src/main/java/su/interference/transport/TransportServer.java index 73930df..73e566f 100644 --- a/src/main/java/su/interference/transport/TransportServer.java +++ b/src/main/java/su/interference/transport/TransportServer.java @@ -109,10 +109,11 @@ public void run() { transportContext.onMessage((TransportMessage) transportMessage, inetAddress); } catch (EOFException eof) { running = false; - logger.warn("event server will be restarted due to " + eof.getMessage()); + logger.warn("event server will be restarted due to " + eof); } catch (Exception e) { running = false; - logger.warn("event server will be restarted due to " + e.getMessage()); + logger.warn("event server will be restarted due to " + e); + logger.error("exception occured during message process ", e); } } } diff --git a/src/main/java/su/interference/transport/TransportSyncTask.java b/src/main/java/su/interference/transport/TransportSyncTask.java index 1a0f70b..094f578 100644 --- a/src/main/java/su/interference/transport/TransportSyncTask.java +++ b/src/main/java/su/interference/transport/TransportSyncTask.java @@ -44,10 +44,10 @@ public class TransportSyncTask implements Runnable { private final static Logger logger = LoggerFactory.getLogger(TransportSyncTask.class); public final static int REMOTE_SYNC_DEFERRED_AMOUNT = 10000; - private final ArrayList frames; + private final List frames; private final Session s; - public TransportSyncTask(ArrayList frames) { + public TransportSyncTask(List frames) { this.frames = frames; this.s = Session.getDntmSession(); }