diff --git a/src/main/java/su/interference/core/Config.java b/src/main/java/su/interference/core/Config.java index 97c11ec..df82a61 100644 --- a/src/main/java/su/interference/core/Config.java +++ b/src/main/java/su/interference/core/Config.java @@ -107,18 +107,19 @@ public class Config { public final String CODEPAGE; public final String DATEFORMAT; public final int TEST_DISTRIBUTE_MODE = 1; - public final int CHECK_AVAIL_FRAME_TIMEOUT = 200; + public final int CHECK_AVAIL_FRAME_TIMEOUT = 3000; // transport public final int REMOTE_SYNC_TIMEOUT = 60000; public final int READ_BUFFER_SIZE = 33554432; public final int WRITE_BUFFER_SIZE = 33554432; // cleanup public final int TRANS_CLEANUP_TIMEOUT = 5000; + public final int CLEANUP_ENABLE = 0; public final int CLEANUP_TIMEOUT = 3000; public final int CLEANUP_PROTECTION_THR = 1000; - public final int IX_CLEANUP_PROTECTION_THR = 5000; + public final int IX_CLEANUP_PROTECTION_THR = 2000; public final int HEAP_USE_THR_DATA = 60; - public final int HEAP_USE_THR_INDX = 80; + public final int HEAP_USE_THR_INDX = 70; public final int HEAP_USE_THR_TEMP = 40; public final int HEAP_USE_THR_UNDO = 50; diff --git a/src/main/java/su/interference/core/DataChunk.java b/src/main/java/su/interference/core/DataChunk.java index a67a9d1..2d309e2 100644 --- a/src/main/java/su/interference/core/DataChunk.java +++ b/src/main/java/su/interference/core/DataChunk.java @@ -64,6 +64,7 @@ public class DataChunk implements Chunk { private Comparable id; private byte[] serializedId; private volatile Object entity; + private volatile ValueSet dcs; private Object undoentity; private Map ics = new HashMap<>(); private UndoChunk uc; @@ -73,10 +74,21 @@ public class DataChunk implements Chunk { //returns datacolumn set public ValueSet getDcs() { + if (t.isIndex() && dcs != null) { + return dcs; + } if (state == INIT_STATE) { + if (t.isIndex()) { + dcs = getDcsFromBytes(); + return dcs; + } return getDcsFromBytes(); } if (state == NORMAL_STATE) { + if (t.isIndex()) { + dcs = getDcsFromEntity(); + return dcs; + } return getDcsFromEntity(); } return null; @@ -176,6 +188,7 @@ public Table getT() { return t; } + @Deprecated public Comparable getId (Field idfield, Session s) throws InvocationTargetException, NoSuchMethodException, IllegalAccessException { if (serializedId==null) { if (entity==null) { @@ -220,30 +233,19 @@ public byte[] getSerializedId (Session s) throws InvocationTargetException, NoSu if (entity==null) { getEntity(); } - Class c = entity.getClass(); - final TransEntity ta = (TransEntity)c.getAnnotation(TransEntity.class); - final SystemEntity sa = (SystemEntity)c.getAnnotation(SystemEntity.class); - if (ta!=null) { - //for Transactional Wrapper Entity we must get superclass (original Entity class) - c = c.getSuperclass(); - } - Field[] f = c.getDeclaredFields(); - for (int i=0; i[]{Session.class}); - Object v = z.invoke(entity, new Object[]{s}); - id = (Comparable) v; - serializedId = sr.serialize(f[i].getType().getName(), v); - } + final Field idf = t.getIdField(); + if (idf != null) { + if (t.isNoTran()) { + final Method z = t.getIdmethod(); + id = (Comparable) z.invoke(entity, null); + serializedId = sr.serialize(idf.getType().getName(), id); + } else { + final Method z = t.getIdmethod_(); + id = (Comparable) z.invoke(entity, new Object[]{s}); + serializedId = sr.serialize(idf.getType().getName(), id); } } + } return serializedId; } @@ -693,8 +695,8 @@ private synchronized DataChunk insertUC (UndoChunk uc, Session s, LLT llt) throw final int p = ub.getDataFrame().insertChunk(dc, s, true, llt); if (p == 0) { final Table t = Instance.getInstance().getTableByName(UndoChunk.class.getName()); - final FrameData nb = t.createNewFrame(ub, ub.getFile(), 0, 0, false, false, false, s, llt); - s.getTransaction().setNewLB(ub, nb, false); + final FrameData nb = t.createNewFrame(ub, ubw, ub.getFile(), 0, 0, false, false, false, s, llt); + s.getTransaction().setNewLB(ub, nb); nb.getDataFrame().insertChunk(dc, s, true, llt); dc.uframe = nb; } else { diff --git a/src/main/java/su/interference/core/DataChunkId.java b/src/main/java/su/interference/core/DataChunkId.java index e385117..2327348 100644 --- a/src/main/java/su/interference/core/DataChunkId.java +++ b/src/main/java/su/interference/core/DataChunkId.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2020 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -26,10 +26,9 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.persistent.Session; import su.interference.exception.InternalException; +import su.interference.persistent.Table; import su.interference.serialize.CustomSerializer; -import javax.persistence.Entity; -import javax.persistence.Id; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.lang.reflect.Field; @@ -63,32 +62,17 @@ private byte[] getBytes(Field f, Object o) throws IllegalAccessException, Unsupp //serializer @SuppressWarnings("unchecked") - public DataChunkId (Object o, Session s) throws IOException, InvocationTargetException, NoSuchMethodException, InternalException, ClassNotFoundException, InstantiationException, IllegalAccessException { - Class c = o.getClass(); - final SystemEntity sa = (SystemEntity)c.getAnnotation(SystemEntity.class); - final TransEntity ta = (TransEntity)c.getAnnotation(TransEntity.class); - Entity ea = (Entity)c.getAnnotation(Entity.class); - if (ta!=null) { - //for Transactional Wrapper Entity we must get superclass (original Entity class) - c = c.getSuperclass(); - ea = (Entity)c.getAnnotation(Entity.class); - } - if (ea==null) { - throw new InternalException(); - } - final Field[] f = c.getDeclaredFields(); - for (int i=0; i[]{Session.class}); - id = z.invoke(o, new Object[]{s}); - idb = sr.serialize(f[i].getType().getName(), id); - } + public DataChunkId (Object o, Table t, Session s) throws IOException, InvocationTargetException, InternalException, ClassNotFoundException, InstantiationException, IllegalAccessException { + final Field idf = t.getIdField(); + if (idf != null) { + if (t.isNoTran()) { + final Method z = t.getIdmethod(); + id = z.invoke(o, null); + idb = sr.serialize(idf.getType().getName(), id); + } else { + final Method z = t.getIdmethod_(); + id = z.invoke(o, new Object[]{s}); + idb = sr.serialize(idf.getType().getName(), id); } } } diff --git a/src/main/java/su/interference/core/IndexFrame.java b/src/main/java/su/interference/core/IndexFrame.java index 9961dde..8aa9e18 100644 --- a/src/main/java/su/interference/core/IndexFrame.java +++ b/src/main/java/su/interference/core/IndexFrame.java @@ -136,11 +136,14 @@ public void cleanICEntities() { } } - public IndexFrame add (DataChunk e, Table t, Session s, LLT llt) throws Exception { + public synchronized IndexFrame add (DataChunk e, Table t, Session s, LLT llt) throws Exception { if (this.isFill(e)) { final int nfileId = t.getIndexFileId(this.getFrameData()); - final IndexFrame res = t.createNewFrame(this.getFrameData(), nfileId, this.getType(), 0, false, false, false, s, llt).getIndexFrame(); + final IndexFrame res = t.createNewFrame(this.getFrameData(), null, nfileId, this.getType(), 0, false, false, false, s, llt).getIndexFrame(); + //paranoid fix + llt.add(res); + llt.add(this); res.setParentF(this.getParentF()); res.setParentB(this.getParentB()); final ValueSet max = this.sort(); @@ -159,19 +162,18 @@ public IndexFrame add (DataChunk e, Table t, Session s, LLT llt) throws Exceptio } this.setHasMV(1); } - res.setLcF(this.getLcF()); - res.setLcB(this.getLcB()); - this.setLcF(0); - this.setLcB(0); + res.setLcId(this.getLcId()); + this.setLcId(0); } else { - final int cmv = e.getDcs().compareTo(this.getFrameData().getMv()); + //final int cmv = e.getDcs().compareTo(this.getFrameData().getMv()); + final int cmv = e.getDcs().compareTo(max); if (cmv > 0) { throw new InternalException(); } else { res.setDivided(1); this.insertChunk(e, s, false, llt); final ValueSet max2 = this.sort(); - final ArrayList nlist = new ArrayList(); + final ArrayList nlist = new ArrayList<>(); ValueSet pkey = null; boolean keyrpt = false; boolean norpt = false; @@ -222,7 +224,7 @@ private boolean isFill(DataChunk ie) { return ie.getBytesAmount() > this.getFrameFree(); } - public ValueSet sort() throws InternalException { + public synchronized ValueSet sort() throws InternalException { if (!this.data.isSorted()) { this.data.sort(); } @@ -236,7 +238,7 @@ public ValueSet sort() throws InternalException { //accepted only to node element lists //for unique indexes - public DataChunk getChildElementPtr(ValueSet value) throws InternalException { + public synchronized DataChunk getChildElementPtr(ValueSet value) throws InternalException { //todo if (!this.sorted) { this.sort(); //} @@ -348,7 +350,7 @@ public synchronized int removeObjects(ValueSet key, Object o) throws InternalExc return len; } - public ValueSet getMaxValue() throws InternalException { + public synchronized ValueSet getMaxValue() throws InternalException { this.sort(); return ((DataChunk)this.data.get(this.data.size()-1)).getDcs(); } @@ -364,59 +366,52 @@ public HashMap getAllocateMap() { return imap; } - public int getType() { + public synchronized int getType() { return this.getRes01(); } - public void setType(int type) { + public synchronized void setType(int type) { this.setRes01(type); } - public int getHasMV() { + public synchronized int getHasMV() { return this.getRes02(); } - public void setHasMV(int hasMV) { + public synchronized void setHasMV(int hasMV) { this.setRes02(hasMV); } - public int getDivided() { + public synchronized int getDivided() { return this.getRes03(); } - public void setDivided(int divided) { + public synchronized void setDivided(int divided) { this.setRes03(divided); } - public int getParentF() { + public synchronized int getParentF() { return this.getRes04(); } - public void setParentF(int parentF) { + public synchronized void setParentF(int parentF) { this.setRes04(parentF); } - public long getParentB() { + public synchronized long getParentB() { return this.getRes06(); } - public void setParentB(long parentB) { + public synchronized void setParentB(long parentB) { this.setRes06(parentB); } - public int getLcF() { - return this.getRes05(); + public synchronized long getLcId() { + return this.getRes05() + this.getRes07(); } - public void setLcF(int lcF) { - this.setRes05(lcF); - } - - public long getLcB() { - return this.getRes07(); - } - - public void setLcB(long lcB) { - this.setRes07(lcB); + public synchronized void setLcId(long lcId) { + this.setRes05((int)lcId%4096); + this.setRes07(lcId - lcId%4096); } } diff --git a/src/main/java/su/interference/core/Instance.java b/src/main/java/su/interference/core/Instance.java index 2df766e..77411a4 100644 --- a/src/main/java/su/interference/core/Instance.java +++ b/src/main/java/su/interference/core/Instance.java @@ -29,7 +29,6 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.persistent.Process; import su.interference.exception.*; -import java.beans.Transient; import java.io.IOException; import java.io.File; import java.net.URL; @@ -48,8 +47,8 @@ this software and associated documentation files (the "Software"), to deal in public class Instance implements Interference { - public static final String RELEASE = "2020.2"; - public static final int SYSTEM_VERSION = 20201205; + public static final String RELEASE = "2020.3"; + public static final int SYSTEM_VERSION = 20201220; public static final String DATA_FILE = "datafile"; public static final String INDX_FILE = "indxfile"; @@ -364,7 +363,7 @@ public void startupInstance(Session s) throws Exception, NoSuchMethodException, if (ok) { logger.info("interference is starting..."); - Thread.currentThread().setName("interference-main-thread"); + Thread.currentThread().setName("interference-main-thread-"+Thread.currentThread().getId()); Storage.getStorage().restoreJournal(); Storage.getStorage().openDataFiles(); initSystemTable(); @@ -433,7 +432,7 @@ private void checkInMemoryIndexes() { private void checkOpenTransactions(Session s) { for (Transaction t : getTransactions()) { - if (t.getCid() == 0 && t.getTransType() != Transaction.TRAN_THR) { + if (t.getCid() == 0 && t.getTransType() < Transaction.TRAN_THR) { logger.info("Rollback incomplete transaction id="+t.getTransId()); t.retrieveTframes(); if (t.isLocal()) { diff --git a/src/main/java/su/interference/core/LLT.java b/src/main/java/su/interference/core/LLT.java index e525f8d..ab9727f 100644 --- a/src/main/java/su/interference/core/LLT.java +++ b/src/main/java/su/interference/core/LLT.java @@ -64,7 +64,7 @@ public static long getSyncId() { return sync.get(); } - public static LLT getLLT() throws InterruptedException { + public static LLT getLLT() { final long id_ = Thread.currentThread().getId(); if (pool.get(id_) != null) { if (debug) { @@ -82,7 +82,7 @@ public static LLT getLLT() throws InterruptedException { return llt; } - public static LLT getLLTAndLock() throws InterruptedException { + public static LLT getLLTAndLock() { final long id_ = Thread.currentThread().getId(); if (pool.get(id_) != null) { logger.error("an unexpected attempt to get llt with id = "+id_+" which already exists"); diff --git a/src/main/java/su/interference/core/SyncFrame.java b/src/main/java/su/interference/core/SyncFrame.java index 055ad9f..f4bfdb6 100644 --- a/src/main/java/su/interference/core/SyncFrame.java +++ b/src/main/java/su/interference/core/SyncFrame.java @@ -72,18 +72,16 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb, boolean proc) throws Exce final Table t = Instance.getInstance().getTableById(frame.getObjectId()); final FrameData bd = Instance.getInstance().getFrameById(frame.getPtr()); allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false; - this.proc = proc; + this.proc = bd == null ? false : proc; if (bd == null && allowR) { final FreeFrame fframe = Instance.getInstance().getFreeFrameById(frame.getPtr()); if (fframe == null) { - logger.error(frame.getClass().getSimpleName()+" does not match any system objects"); - throw new InternalException(); + logger.warn(frame.getClass().getSimpleName()+" does not match any system objects"); } else { fframe.setPassed(1); fb = fframe; } - //throw new MissingSyncFrameException(); } className = bd == null ? null : t.getName(); @@ -130,7 +128,7 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb, boolean proc) throws Exce prevId = 0; nextId = 0; final long parentId_ = ib.getParentF()+ib.getParentB(); - final long lcId_ = ib.getLcF()+ib.getLcB(); + final long lcId_ = ib.getLcId(); parentId = parentId_==0?0:Instance.getInstance().getFrameById(parentId_).getAllocId(); lcId = lcId_==0?0:Instance.getInstance().getFrameById(lcId_).getAllocId(); this.frameType = frame.getType(); diff --git a/src/main/java/su/interference/core/SyncQueue.java b/src/main/java/su/interference/core/SyncQueue.java index 707c759..000ac0b 100644 --- a/src/main/java/su/interference/core/SyncQueue.java +++ b/src/main/java/su/interference/core/SyncQueue.java @@ -106,9 +106,6 @@ private synchronized boolean syncFramesFromQueue() throws Exception { } } - //todo async process must depends from stop() method - pool2.submit(new TransportSyncTask(frames)); - long t1 = System.currentTimeMillis(); pool.invokeAll(Arrays.asList(tasklist)); long t2 = System.currentTimeMillis(); @@ -118,6 +115,8 @@ private synchronized boolean syncFramesFromQueue() throws Exception { for (FreeFrame fb : fframes) { s.persist(fb); } + //todo async process must depends from stop() method + pool2.submit(new TransportSyncTask(frames)); running = false; return true; @@ -134,7 +133,7 @@ public void commit() throws Exception { } public void run () { - Thread.currentThread().setName("interference-sync-thread"); + Thread.currentThread().setName("interference-sync-thread-"+Thread.currentThread().getId()); while (f) { latch = new CountDownLatch(1); try { diff --git a/src/main/java/su/interference/core/SyncTask.java b/src/main/java/su/interference/core/SyncTask.java index f1310a6..a0c5fda 100644 --- a/src/main/java/su/interference/core/SyncTask.java +++ b/src/main/java/su/interference/core/SyncTask.java @@ -54,7 +54,7 @@ public DataFile getDataFile() { } public Integer call() { - Thread.currentThread().setName("interference-sync-task-thread"); + Thread.currentThread().setName("interference-sync-task-thread-"+Thread.currentThread().getId()); Metrics.get("syncFrames").start(); try { diff --git a/src/main/java/su/interference/core/SystemCleanUp.java b/src/main/java/su/interference/core/SystemCleanUp.java index adf885f..7ccb84b 100644 --- a/src/main/java/su/interference/core/SystemCleanUp.java +++ b/src/main/java/su/interference/core/SystemCleanUp.java @@ -45,11 +45,15 @@ public class SystemCleanUp implements Runnable, ManagedProcess { public static final int INDEX_RETRIEVED_PRIORITY = 9; public void run () { - Thread.currentThread().setName("interference-cleanup-thread"); + Thread.currentThread().setName("interference-cleanup-thread-"+Thread.currentThread().getId()); while (f) { latch = new CountDownLatch(1); try { - cleanUpFrames(); + if (Config.getConfig().CLEANUP_ENABLE == 1) { + cleanUpFrames(); + } else { + logger.warn("system cleanup currently disabled"); + } } catch(Exception e) { e.printStackTrace(); } @@ -70,7 +74,7 @@ public void stop() throws InterruptedException{ } } - private void cleanUpFrames() { + private void cleanUpFrames() throws Exception { Metrics.get("systemCleanUp").start(); int i = 0; int d = 0; @@ -96,7 +100,7 @@ private void cleanUpFrames() { } if (f.getDataFile().isIndex() && cleanupIndxEnabled()) { f.decreasePriority(); - if (f.getPriority() == 0 && frameAmount > Config.getConfig().IX_CLEANUP_PROTECTION_THR) { + if (f.getFrameType() == IndexFrame.INDEX_FRAME_LEAF && f.getPriority() == 0 && frameAmount > Config.getConfig().IX_CLEANUP_PROTECTION_THR) { if (f.clearFrame()) { x++; } diff --git a/src/main/java/su/interference/core/SystemInit.java b/src/main/java/su/interference/core/SystemInit.java index 7cd898f..1209380 100644 --- a/src/main/java/su/interference/core/SystemInit.java +++ b/src/main/java/su/interference/core/SystemInit.java @@ -238,6 +238,7 @@ public static Table createInitialFrames(boolean initStorage, int nodeType, Sessi ret[framesCntr].insertChunk(new DataChunk(new Process(1, "hbeat","su.interference.transport.HeartBeatProcess"), s), s, true, null); ret[framesCntr].insertChunk(new DataChunk(new Process(2, "lsync","su.interference.core.SyncQueue"), s), s, true, null); ret[framesCntr].insertChunk(new DataChunk(new Process(3, "clean","su.interference.core.SystemCleanUp"), s), s, true, null); + ret[framesCntr].insertChunk(new DataChunk(new Process(4, "trcln","su.interference.core.TransCleanUp"), s), s, true, null); //ret[framesCntr].insertChunk(new DataChunk(new Process(2, "rsync","su.interference.remote.RemoteSync"), s), s, true, null); } if (tables[i].equals("su.interference.persistent.Node")) { diff --git a/src/main/java/su/interference/core/TransCleanUp.java b/src/main/java/su/interference/core/TransCleanUp.java new file mode 100644 index 0000000..317b074 --- /dev/null +++ b/src/main/java/su/interference/core/TransCleanUp.java @@ -0,0 +1,115 @@ +/** + The MIT License (MIT) + + Copyright (c) 2010-2020 head systems, ltd + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + */ + +package su.interference.core; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import su.interference.persistent.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * @author Yuriy Glotanov + * @since 1.0 + */ + +public class TransCleanUp implements Runnable, ManagedProcess { + private Session session = Session.getSession(); + private volatile boolean f = true; + CountDownLatch latch; + private final static Logger logger = LoggerFactory.getLogger(TransCleanUp.class); + + public void run () { + Thread.currentThread().setName("interference-transactions-cleanup-thread-"+Thread.currentThread().getId()); + while (f) { + latch = new CountDownLatch(1); + try { + cleanUp(); + } catch(Exception e) { + e.printStackTrace(); + } + + try { + Thread.sleep(Config.getConfig().TRANS_CLEANUP_TIMEOUT); + } catch (InterruptedException e) { + e.printStackTrace(); + } + latch.countDown(); + } + } + + public void stop() throws InterruptedException{ + f = false; + if (latch != null) { + latch.await(); + } + } + + private void cleanUp() { + try { + for (Transaction t : Instance.getInstance().getTransactions()) { + ArrayList fptr = new ArrayList<>(); + if (t.getTransType() == Transaction.TRAN_THR && t.getCid() > 0) { + int i = 0; + for (TransFrame tf : Instance.getInstance().getTransFramesByTransId(t.getTransId())) { + if (t.isLocal()) { + boolean hasfb = false; + if (tf.getUframeId() > 0) { + for (Long f : fptr) { + if (f == tf.getUframeId()) { + hasfb = true; + break; + } + } + final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tf.getObjectId()); + if (rls.size() == 0) { + //deallocate undo frame + final FrameData ub = Instance.getInstance().getFrameById(tf.getUframeId()); + //store frame params as free + if (!hasfb) { + final FreeFrame fb = new FreeFrame(0, tf.getUframeId(), ub.getSize()); + session.persist(fb); //insert + session.delete(ub); + fptr.add(tf.getUframeId()); + } + } + } + } + session.delete(tf); + i++; + } + t.setTransType(Transaction.TRAN_LEGACY); + session.persist(t); + logger.info(i+" transaction frames removed for transaction id = "+t.getTransId()); + } + } + } catch (Exception e) { + logger.error("transaction cleanup process", e); + } + } + +} diff --git a/src/main/java/su/interference/core/WaitFrame.java b/src/main/java/su/interference/core/WaitFrame.java index 6898c31..9155d21 100644 --- a/src/main/java/su/interference/core/WaitFrame.java +++ b/src/main/java/su/interference/core/WaitFrame.java @@ -47,29 +47,21 @@ public WaitFrame(FrameData bd) { this.busy = new AtomicLong(0); } - public synchronized WaitFrame acquire(boolean force) { + public synchronized WaitFrame acquire() { if (this.bd == null) { return null; } - if (force) { - this.busy.set(Thread.currentThread().getId()); - return this; - } if (this.busy.compareAndSet(0, Thread.currentThread().getId())) { return this; } return null; } - public synchronized WaitFrame acquire(final int fileId, boolean force) { + public synchronized WaitFrame acquire(final int fileId) { if (this.bd == null) { return null; } if (this.bd.getFile() == fileId) { - if (force) { - this.busy.set(Thread.currentThread().getId()); - return this; - } if (this.busy.compareAndSet(0, Thread.currentThread().getId()) || this.busy.compareAndSet(Thread.currentThread().getId(), Thread.currentThread().getId())) { return this; } @@ -77,6 +69,10 @@ public synchronized WaitFrame acquire(final int fileId, boolean force) { return null; } + public synchronized void set(FrameData bd) { + this.bd = bd; + } + public synchronized boolean trySetBd(FrameData oldbd, FrameData newbd, int frameType) { if (oldbd==null) { if (this.bd == null) { @@ -90,20 +86,7 @@ public synchronized boolean trySetBd(FrameData oldbd, FrameData newbd, int frame return false; } - public synchronized boolean trySetBdAndAcquire(FrameData bd) { - if (this.busy.compareAndSet(0, Thread.currentThread().getId())) { - if (this.bd == null) { - this.bd = bd; - return true; - } else { - this.busy.compareAndSet(Thread.currentThread().getId(), 0); - return false; - } - } - return false; - } - - public FrameData getBd() { + public synchronized FrameData getBd() { return bd; } diff --git a/src/main/java/su/interference/persistent/DataFile.java b/src/main/java/su/interference/persistent/DataFile.java index 0091a4f..54e0ce5 100644 --- a/src/main/java/su/interference/persistent/DataFile.java +++ b/src/main/java/su/interference/persistent/DataFile.java @@ -161,72 +161,83 @@ public DataFile(int fileId, int type, String fileName) { //but, in case of allocate undo space this may looks as: //lock datafile - lock table - try lock datafile //todo deprecated started param - public synchronized FrameData createNewFrame(FrameData frame, int frameType, long allocId, boolean started, boolean external, Table t, Session s, LLT llt) throws Exception { + public FrameData createNewFrame(FrameData frame, WaitFrame wb, int frameType, long allocId, boolean started, boolean external, Table t, Session s, LLT llt) throws Exception { //deadlock bug fix //instead this.allocateFrame we lock Table first final FrameData bd = t.allocateFrame(this, t, s, llt); - final boolean setcurrenable = external?false:t.getName().equals("su.interference.persistent.UndoChunk")?false:true; - //allocated for rframe - if (allocId>0) { bd.setAllocId(allocId); } - - //todo deprecated - if (started) { bd.setStarted(1); } - - final Frame db = frameType == 0 ? new DataFrame(bd, t) : new IndexFrame(bd, frameType, t); - db.setObjectId(t.getObjectId()); - bd.setFrame(db); - if (setcurrenable) bd.markAsCurrent(); - int prevFile = 0; - long prevPtr = 0; - - if (frame != null && frameType == 0) { - frame.clearCurrent(); - prevFile = frame.getFile(); - prevPtr = frame.getPtr(); - frame.setNextFile(bd.getFile()); - frame.setNextFrame(bd.getPtr()); - frame.getDataFrame().setNextFile(bd.getFile()); - frame.getDataFrame().setNextFrame(bd.getPtr()); - s.persist(frame, llt); //update - } + bd.setFrameType(frameType); + synchronized (this) { + final boolean setcurrenable = external ? false : t.getName().equals("su.interference.persistent.UndoChunk") ? false : true; + //allocated for rframe + if (allocId > 0) { + bd.setAllocId(allocId); + } - bd.setPrevFile(prevFile); - bd.setPrevFrame(prevPtr); + //todo deprecated + if (started) { + bd.setStarted(1); + } - if (frameType==0) { - ((DataFrame)db).setPrevFile(prevFile); - ((DataFrame)db).setPrevFrame(prevPtr); - } + final Frame db = frameType == 0 ? new DataFrame(bd, t) : new IndexFrame(bd, frameType, t); + db.setObjectId(t.getObjectId()); + bd.setFrame(db); + if (setcurrenable) bd.markAsCurrent(); + int prevFile = 0; + long prevPtr = 0; + + if (frame != null && frameType == 0) { + frame.clearCurrent(); + prevFile = frame.getFile(); + prevPtr = frame.getPtr(); + frame.setNextFile(bd.getFile()); + frame.setNextFrame(bd.getPtr()); + frame.getDataFrame().setNextFile(bd.getFile()); + frame.getDataFrame().setNextFrame(bd.getPtr()); + s.persist(frame, llt); //update + } - if (t.getFileStart()==0&&t.getFrameStart()==0) { - t.setFileStart(bd.getFile()); - t.setFrameStart(bd.getPtr()); - } + bd.setPrevFile(prevFile); + bd.setPrevFrame(prevPtr); + + if (frameType == 0) { + ((DataFrame) db).setPrevFile(prevFile); + ((DataFrame) db).setPrevFrame(prevPtr); + } - t.setFileLast(bd.getFile()); - t.setFrameLast(bd.getPtr()); - t.incFrameAmount(); + if (t.getFileStart() == 0 && t.getFrameStart() == 0) { + t.setFileStart(bd.getFile()); + t.setFrameStart(bd.getPtr()); + } - if (!external) { - if (llt != null) { - llt.add(db); - if (frame != null) { - llt.add(frame.getFrame()); + t.setFileLast(bd.getFile()); + t.setFrameLast(bd.getPtr()); + t.incFrameAmount(); + + if (!external) { + if (llt != null) { + llt.add(db); + if (frame != null) { + llt.add(frame.getFrame()); + } } } - } - s.persist(t, llt); //update + s.persist(t, llt); //update - if (t.getName().equals("su.interference.persistent.FrameData")) { - DataChunk dc = new DataChunk(bd, s); - int len = dc.getBytesAmount(); - t.usedSpace(bd, len, false, s, llt); - //replace chunk after usedSpace - dc = new DataChunk(bd, s); - db.insertChunk(dc, s, true, llt); - t.addIndexValue(dc); - } else { + if (t.getName().equals("su.interference.persistent.FrameData")) { + DataChunk dc = new DataChunk(bd, s); + int len = dc.getBytesAmount(); + t.usedSpace(bd, len, false, s, llt); + //replace chunk after usedSpace + dc = new DataChunk(bd, s); + db.insertChunk(dc, s, true, llt); + t.addIndexValue(dc); + } + if (wb != null) { + wb.set(bd); + } + } + if (!t.getName().equals("su.interference.persistent.FrameData")) { //syncframe event should not persist new frame if (!external) { //fix deadlock by reorder access @@ -239,7 +250,6 @@ public synchronized FrameData createNewFrame(FrameData frame, int frameType, lon } } } - return bd; } @@ -415,7 +425,6 @@ public synchronized FrameData allocateFrame(Table t, Session s, LLT llt) throws final FrameData bd = new FrameData(this.getFileId(), ptr, size, t); // t should be updated mandatory during createFrame bd.setFrameOrder(t.getFrameOrder(s, llt)); - bd.setDistribution(t.isDistributed() ? 0 : Config.getConfig().LOCAL_NODE_ID); Metrics.get("allocateFrame").stop(); return bd; } diff --git a/src/main/java/su/interference/persistent/FrameData.java b/src/main/java/su/interference/persistent/FrameData.java index d23ce90..051e0dd 100644 --- a/src/main/java/su/interference/persistent/FrameData.java +++ b/src/main/java/su/interference/persistent/FrameData.java @@ -28,7 +28,6 @@ this software and associated documentation files (the "Software"), to deal in import org.slf4j.LoggerFactory; import su.interference.core.*; import su.interference.mgmt.MgmtColumn; -import su.interference.exception.InternalException; import su.interference.sql.FrameApi; import javax.persistence.Entity; @@ -90,7 +89,7 @@ public class FrameData implements Serializable, Comparable, FrameApi, FilePartit @MgmtColumn(width=10, show=true, form=false, edit=false) private volatile long allocId; //virtual Id field @Column - private int distribution; + private int frameType; @Column private AtomicInteger current; @Column @@ -102,16 +101,6 @@ public class FrameData implements Serializable, Comparable, FrameApi, FilePartit @Transient private long frameId; //virtual Id field @Transient - @MgmtColumn(width=10, show=true, form=false, edit=false) - private int frameUsed; - @Transient - @MgmtColumn(width=10, show=true, form=false, edit=false) - private int cUsed; - @Transient - private int undoId; //for UndoChunk: objectId - @Transient - private long transId; //for UndoChunk: transId - @Transient private final Map> tcounter = new ConcurrentHashMap<>(); @Transient private AtomicInteger priority = new AtomicInteger(2); @@ -174,7 +163,7 @@ public synchronized DataFrame getDataFrame() throws Exception { return (DataFrame) frame; } - public IndexFrame getIndexFrame() throws Exception { + public synchronized IndexFrame getIndexFrame() throws Exception { if (frame == null) { this.priority.set(SystemCleanUp.INDEX_RETRIEVED_PRIORITY); List uframes = new ArrayList<>(); @@ -222,6 +211,14 @@ public Frame getFrame() throws Exception { return frame; } + public int getFrameType() { + return frameType; + } + + public void setFrameType(int frameType) { + this.frameType = frameType; + } + public boolean isIndex() { return getDataObject().isIndex(); } @@ -269,16 +266,20 @@ public int getFrameUsed() { //real current non-tran amount of chunk bytes public int getCUsed() { - if (frame ==null) { + if (frame == null) { return -1; } return frame.getBytesAmount(); } - public int getFrameFree() throws InternalException { + public int getFrameFree() { return size - Frame.FRAME_HEADER_SIZE - getFrameUsed(); } + public int getFrameFreeNoTran() { + return size - Frame.FRAME_HEADER_SIZE - getUsed(); + } + public FrameData() { } @@ -439,7 +440,7 @@ public void setNextFrame(long nextFrame) { } } - public boolean clearFrame() { + public synchronized boolean clearFrame() { if (this.frame != null) { this.frame.cleanUpIcs(); this.frame = null; @@ -527,22 +528,10 @@ public void setSynced(boolean synced) { this.synced = synced; } - public int getDistribution() { - return distribution; - } - - public void setDistribution(int distribution) { - this.distribution = distribution; - } - public AtomicInteger getCurrent() { return current; } - public void setCurrent(AtomicInteger current) { - this.current = current; - } - public int getStarted() { return started; } @@ -559,11 +548,11 @@ public void setFrameOrder(long frameOrder) { this.frameOrder = frameOrder; } - public ValueSet getMv() { + public synchronized ValueSet getMv() { return mv; } - public void setMv(ValueSet mv) { + public synchronized void setMv(ValueSet mv) { this.mv = mv; } } diff --git a/src/main/java/su/interference/persistent/Session.java b/src/main/java/su/interference/persistent/Session.java index 68fcfd4..6426051 100644 --- a/src/main/java/su/interference/persistent/Session.java +++ b/src/main/java/su/interference/persistent/Session.java @@ -226,7 +226,7 @@ public Table registerTable (String n, Session s, List cols, java.lang final LLT llt = LLT.getLLT(); for (DataFile df : datafs) { - FrameData nb = w.createNewFrame(null, df.getFileId(), pt==null&&!ixflag?0:1, 0, false, setlbs, false, s, llt); + FrameData nb = w.createNewFrame(null, null, df.getFileId(), pt==null&&!ixflag?0:1, 0, false, setlbs, false, s, llt); } llt.commit(); diff --git a/src/main/java/su/interference/persistent/Table.java b/src/main/java/su/interference/persistent/Table.java index 5d44071..0b52194 100644 --- a/src/main/java/su/interference/persistent/Table.java +++ b/src/main/java/su/interference/persistent/Table.java @@ -130,6 +130,8 @@ public class Table implements ResultSet { @Transient private final java.lang.reflect.Method idmethod; @Transient + private final java.lang.reflect.Method idmethod_; + @Transient private final String idfieldtype; @Transient private final String idfieldgetter; @@ -274,7 +276,7 @@ public LinkedBlockingQueue getFrames(Session s) { Runnable r = new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference-retrieve-frames-thread"); + Thread.currentThread().setName("interference-retrieve-frames-thread-"+Thread.currentThread().getId()); try { for (Object o : ixl.getObjectsByKey(id)) { q.put((FrameData) ((DataChunk) o).getEntity()); @@ -297,7 +299,7 @@ public LinkedBlockingQueue getFrames() { Runnable r = new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference-retrieve-frames-thread"); + Thread.currentThread().setName("interference-retrieve-frames-thread-"+Thread.currentThread().getId()); try { for (Object o : ixl.getObjectsByKey(id)) { q.put((FrameData) ((DataChunk) o).getEntity()); @@ -536,6 +538,10 @@ public Method getIdmethod() { return idmethod; } + public Method getIdmethod_() { + return idmethod_; + } + public String getIdFieldType() { return this.idfieldtype; } @@ -607,6 +613,7 @@ public Table(String name) throws ClassNotFoundException, NoSuchMethodException, this.idfieldtype = getTableIdField() == null ? null : getTableIdField().getType().getName(); this.idfieldgetter = getTableIdField() == null ? null : ("get" + this.idfield.getName().substring(0, 1).toUpperCase() + this.idfield.getName().substring(1, this.idfield.getName().length())); this.idmethod = getTableIdField() == null ? null : ca == null ? null : getTableClass().getMethod(idfieldgetter, null); + this.idmethod_ = getTableIdField() == null ? null : ca != null ? null : getTableClass().getMethod(idfieldgetter, new Class[]{Session.class}); this.fields = getTableFields(); this.fieldtypes = getTableFieldTypes(); this.generatedfield = getGeneratedField(); @@ -634,6 +641,7 @@ public Table(String name, Class pclass) throws ClassNotFoundException, NoSuchMet this.idfieldtype = getTableIdField() == null ? null : getTableIdField().getType().getName(); this.idfieldgetter = getTableIdField() == null ? null : ("get" + this.idfield.getName().substring(0, 1).toUpperCase() + this.idfield.getName().substring(1, this.idfield.getName().length())); this.idmethod = getTableIdField() == null ? null : ca == null ? null : getTableClass().getMethod(idfieldgetter, null); + this.idmethod_ = getTableIdField() == null ? null : ca != null ? null : getTableClass().getMethod(idfieldgetter, new Class[]{Session.class}); this.fields = getTableFields(); this.fieldtypes = getTableFieldTypes(); this.generatedfield = getGeneratedField(); @@ -662,6 +670,7 @@ public Table(String name, String name_) throws ClassNotFoundException, NoSuchMet this.idfieldtype = getTableIdField() == null ? null : getTableIdField().getType().getName(); this.idfieldgetter = getTableIdField() == null ? null : ("get" + this.idfield.getName().substring(0, 1).toUpperCase() + this.idfield.getName().substring(1, this.idfield.getName().length())); this.idmethod = getTableIdField() == null ? null : ca == null ? null : getTableClass().getMethod(idfieldgetter, null); + this.idmethod_ = getTableIdField() == null ? null : ca != null ? null : getTableClass().getMethod(idfieldgetter, new Class[]{Session.class}); this.fields = getTableFields(); this.fieldtypes = getTableFieldTypes(); this.generatedfield = getGeneratedField(); @@ -688,6 +697,7 @@ public Table(boolean v) throws ClassNotFoundException, InstantiationException, I this.idfieldtype = getTableIdField() == null ? null : getTableIdField().getType().getName(); this.idfieldgetter = getTableIdField() == null ? null : ("get" + this.idfield.getName().substring(0, 1).toUpperCase() + this.idfield.getName().substring(1, this.idfield.getName().length())); this.idmethod = getTableIdField() == null ? null : ca == null ? null : getTableClass().getMethod(idfieldgetter, null); + this.idmethod_ = getTableIdField() == null ? null : ca != null ? null : getTableClass().getMethod(idfieldgetter, new Class[]{Session.class}); this.fields = getTableFields(); this.fieldtypes = getTableFieldTypes(); this.generatedfield = getGeneratedField(); @@ -710,6 +720,7 @@ public Table(int id, String name) throws MalformedURLException, ClassNotFoundExc this.idfieldtype = getTableIdField() == null ? null : getTableIdField().getType().getName(); this.idfieldgetter = getTableIdField() == null ? null : ("get" + this.idfield.getName().substring(0, 1).toUpperCase() + this.idfield.getName().substring(1, this.idfield.getName().length())); this.idmethod = getTableIdField() == null ? null : ca == null ? null : getTableClass().getMethod(idfieldgetter, null); + this.idmethod_ = getTableIdField() == null ? null : ca != null ? null : getTableClass().getMethod(idfieldgetter, new Class[]{Session.class}); this.generatedfield = getGeneratedField(); this.fields = getTableFields(); this.fieldtypes = getTableFieldTypes(); @@ -745,6 +756,7 @@ public Table (DataChunk chunk, IndexList ixl) throws IllegalAccessException, Cla this.idfieldtype = getTableIdField() == null ? null : getTableIdField().getType().getName(); this.idfieldgetter = getTableIdField() == null ? null : ("get" + this.idfield.getName().substring(0, 1).toUpperCase() + this.idfield.getName().substring(1, this.idfield.getName().length())); this.idmethod = getTableIdField() == null ? null : sa == null ? null : getTableClass().getMethod(idfieldgetter, null); + this.idmethod_ = getTableIdField() == null ? null : sa != null ? null : getTableClass().getMethod(idfieldgetter, new Class[]{Session.class}); this.fields = getTableFields(); this.fieldtypes = getTableFieldTypes(); this.generatedfield = getGeneratedField(); @@ -947,54 +959,45 @@ public void printIndexInfo() { } private void ident(final Object o, final Session s, final LLT extllt) throws Exception { - final Class c_ = o.getClass(); - final TransEntity ta = (TransEntity)c_.getAnnotation(TransEntity.class); - //for Transactional Wrapper Entity we must get superclass (original Entity class) - final Class c = ta != null ? o.getClass().getSuperclass() : o.getClass(); - final Entity ea = (Entity)c.getAnnotation(Entity.class); - if (ea == null) { - throw new InternalException(); - } - final java.lang.reflect.Field[] f = c.getDeclaredFields(); - for (int i=0; i 0 - if (f[i].getType().getName().equals("int")) { - final int exists = (int) f[i].get(o); - if (exists == 0) { - f[i].setInt(o, (int) (this.getIdValue(s, llt) * Storage.MAX_NODES) + Config.getConfig().LOCAL_NODE_ID); - } + final java.lang.reflect.Field idf = this.getIdField(); + if (idf == null) { + return; + } + final Transient tr = idf.getAnnotation(Transient.class); + final GeneratedValue ga = idf.getAnnotation(GeneratedValue.class); + final DistributedId ds = idf.getAnnotation(DistributedId.class); + if (tr == null) { + if (ga != null) { + int m = idf.getModifiers(); + if (Modifier.isPrivate(m)) { + idf.setAccessible(true); + } + final LLT llt = extllt == null ? LLT.getLLT() : extllt; + try { + if (ds == null) { + if (idf.getType().getName().equals("int")) { + idf.setInt(o, (int) this.getIdValue(s, llt)); + } + if (idf.getType().getName().equals("long")) { + idf.setLong(o, this.getIdValue(s, llt)); + } + } else { + //for distributed ids, id value don't replace exists > 0 + if (idf.getType().getName().equals("int")) { + final int exists = (int) idf.get(o); + if (exists == 0) { + idf.setInt(o, (int) (this.getIdValue(s, llt) * Storage.MAX_NODES) + Config.getConfig().LOCAL_NODE_ID); } - if (f[i].getType().getName().equals("long")) { - final long exists = (long) f[i].get(o); - if (exists == 0) { - f[i].setLong(o, (this.getIdValue(s, llt) * Storage.MAX_NODES) + Config.getConfig().LOCAL_NODE_ID); - } + } + if (idf.getType().getName().equals("long")) { + final long exists = (long) idf.get(o); + if (exists == 0) { + idf.setLong(o, (this.getIdValue(s, llt) * Storage.MAX_NODES) + Config.getConfig().LOCAL_NODE_ID); } } - } finally { - if (extllt == null) { llt.commit(); } } - break; + } finally { + if (extllt == null) { llt.commit(); } } } } @@ -1050,7 +1053,7 @@ private WaitFrame getAvailableFrame(final Object o, final boolean fpart) throws for (int i = 0; i < this.lbs.length; i++) { final int i_ = (a + i) % this.lbs.length; final WaitFrame wb = this.lbs[i_]; - final WaitFrame bd = fpart ? wb.acquire(getTargetFileId(((FilePartitioned) o).getFile()), false) : wb.acquire(false); + final WaitFrame bd = fpart ? wb.acquire(getTargetFileId(((FilePartitioned) o).getFile())) : wb.acquire(); if (bd != null) { avframeStart.getAndIncrement(); Metrics.get("getAvailableFrame").stop(); @@ -1063,17 +1066,6 @@ private WaitFrame getAvailableFrame(final Object o, final boolean fpart) throws } logger.warn("avframestart: "+avframeStart.get()); logger.warn("timeout occured during getavailableframe method: " + Config.getConfig().CHECK_AVAIL_FRAME_TIMEOUT); - for (int i = 0; i < this.lbs.length; i++) { - final int i_ = (a + i) % this.lbs.length; - final WaitFrame wb = this.lbs[i_]; - final WaitFrame bd = fpart ? wb.acquire(getTargetFileId(((FilePartitioned) o).getFile()), true) : wb.acquire(true); - if (bd != null) { - logger.warn("forced acquire: "+bd.getBd().getFrameId()); - avframeStart.getAndIncrement(); - Metrics.get("getAvailableFrame").stop(); - return bd; - } - } break; } } @@ -1118,26 +1110,9 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) fpart = true; } } - } else { - if (s.getTransaction() == null || !s.getTransaction().started || s.getTransaction().getMTran() == 0) { - s.startStatement(); - } - if (s.getTransaction() == null || !s.getTransaction().started || s.getTransaction().getMTran() == 0) { - throw new InternalException(); - } - final EntityContainer to = (EntityContainer)o; - if (to.getTran() != null && to.getTran().getCid() == 0) { - if (to.getTran().getTransId() != s.getTransaction().getTransId()) { - logger.error("unable to persist an object that has not been changed by current transaction"); - return null; - } - } - } - final LLT llt = extllt==null?LLT.getLLT():extllt; - try { if (this.isIndex()) { - this.add(new RowId(0, 0, 0), o, s, llt); + this.add(new RowId(0, 0, 0), o, s, extllt); return null; } @@ -1147,140 +1122,192 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) if (dc == null) { Metrics.get("persistInsertChunk").start(); - if (this.isNoTran()) { - this.ident(o, s, llt); //ident system entities during persist - } final DataChunk nc = new DataChunk(o, s, this); final int len = nc.getBytesAmount(); final WaitFrame bdw = getAvailableFrame(o, fpart); final FrameData bd = bdw.getBd(); - final int diff = len - bd.getFrameFree(); - if (isNoTran()) { + final LLT llt = extllt==null?LLT.getLLT():extllt; + try { + this.ident(o, s, llt); //ident system entities during persist final int p = bd.insertChunk(nc, s, true, llt); if (p == 0) { - final FrameData nb = this.createNewFrame(bd, bd.getFile(), 0, 0, false, true, false, s, llt); + final FrameData nb = this.createNewFrame(bd, bdw, bd.getFile(), 0, 0, false, false, false, s, llt); nb.getDataFrame().insertChunk(nc, s, true, llt); usedSpace(nb, nb.getUsed() + len, true, s, llt); } else { usedSpace(bd, bd.getUsed() + len, true, s, llt); } - } else { - if (diff > 0) { - final FrameData nb = this.createNewFrame(bd, bd.getFile(), 0, 0, false, true, false, s, llt); - nb.getDataFrame().insertChunk(nc, s, true, llt); - s.getTransaction().storeFrame(nb, len, s, llt); - } else { - final int p = bd.insertChunk(nc, s, true, llt); - if (p == 0) { - throw new InternalException(); - } - s.getTransaction().storeFrame(bd, len, s, llt); + } finally { + if (extllt == null) { + llt.commit(); } - ((EntityContainer) o).setTran(nc.getHeader().getTran()); - ((EntityContainer) o).setRowId(nc.getHeader().getRowID()); - ((EntityContainer) o).setDataChunk(nc); } //system-only table in-memory indexes - this.addIndexValue(nc); bdw.release(); + this.addIndexValue(nc); Metrics.get("persistInsertChunk").stop(); - Metrics.get("persistInsertIndex").start(); - //remove external llt for deadlock prevent - persistIndexes(nc, s, llt); - Metrics.get("persistInsertIndex").stop(); - return nc; - } else { - - DataChunk udc = null; - - if (!isNoTran()) { //save undo information - udc = dc.lock(s, llt); - } - final int len = dc.getBytesAmount(); final FrameData bd = Instance.getInstance().getFrameById(dc.getHeader().getRowID().getFileId() + dc.getHeader().getRowID().getFramePointer()); - final int newlen = bd.updateChunk(dc, o, s, llt); - final int diff = newlen - len - bd.getFrameFree(); + final LLT llt = extllt==null?LLT.getLLT():extllt; + try { - if (diff > 0) { - lockIndexes(dc, s, llt); - bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); - final WaitFrame ibw = getAvailableFrame(o, fpart); - final FrameData ib = ibw.getBd(); + final int newlen = bd.updateChunk(dc, o, s, llt); + final int diff = newlen - len - bd.getFrameFreeNoTran(); - final int p = ib.getDataFrame().insertChunk(dc, s, true, llt); - if (p == 0) { - final FrameData nb = this.createNewFrame(ib, ib.getFile(), 0, 0, false, true, false, s, llt); - nb.getDataFrame().insertChunk(dc, s, true, llt); - if (isNoTran()) { + if (diff > 0) { + bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); + final WaitFrame ibw = getAvailableFrame(o, fpart); + final FrameData ib = ibw.getBd(); + + final int p = ib.getDataFrame().insertChunk(dc, s, true, llt); + if (p == 0) { + final FrameData nb = this.createNewFrame(ib, ibw, ib.getFile(), 0, 0, false, false, false, s, llt); + nb.getDataFrame().insertChunk(dc, s, true, llt); usedSpace(bd, bd.getUsed() - len, true, s, llt); usedSpace(nb, newlen, true, s, llt); } else { - s.getTransaction().storeFrame(bd, udc == null ? null : udc.getUframe(),0 - len, s, llt); - s.getTransaction().storeFrame(nb, udc == null ? null : udc.getUframe(), newlen, s, llt); - s.getTransaction().storeFrame(nb, newlen, s, llt); - } - } else { - if (isNoTran()) { usedSpace(bd, bd.getUsed() - len, true, s, llt); usedSpace(ib, ib.getUsed() + newlen, true, s, llt); - } else { - s.getTransaction().storeFrame(bd, udc == null ? null : udc.getUframe(),0 - len, s, llt); - s.getTransaction().storeFrame(ib, udc == null ? null : udc.getUframe(), newlen, s, llt); - s.getTransaction().storeFrame(ib, newlen, s, llt); - } - } - updateIndexesPtr(dc, s, llt); - //update rowid - if (!isNoTran()) { -// logger.debug("updated: " + dc.getHeader().getRowID() + ", old: "+dc.getUndoChunk().getFile()+" "+dc.getUndoChunk().getFrame()+" "+dc.getUndoChunk().getPtr()); - ((EntityContainer) o).setRowId(dc.getHeader().getRowID()); - dc.getUndoChunk().setFile(dc.getHeader().getRowID().getFileId()); - dc.getUndoChunk().setFrame(dc.getHeader().getRowID().getFramePointer()); - dc.getUndoChunk().setPtr(dc.getHeader().getPtr()); - if (udc != null) { - udc.setEntity(dc.getUndoChunk()); - udc.getUframe().updateChunk(udc, dc.getUndoChunk(), s, llt); } + + ibw.release(); + return dc; + + } else { + usedSpace(bd, bd.getUsed() + newlen - len, true, s, llt); } - ibw.release(); + return dc; + } finally { if (extllt == null) { llt.commit(); } + } + } + } else { + synchronized (this) { + if (s.getTransaction() == null || !s.getTransaction().started || s.getTransaction().getMTran() == 0) { + s.startStatement(); + } + if (s.getTransaction() == null || !s.getTransaction().started || s.getTransaction().getMTran() == 0) { + throw new InternalException(); + } + final EntityContainer to = (EntityContainer) o; + if (to.getTran() != null && to.getTran().getCid() == 0) { + if (to.getTran().getTransId() != s.getTransaction().getTransId()) { + logger.error("unable to persist an object that has not been changed by current transaction"); + return null; + } + } - return dc; + final LLT llt = extllt == null ? LLT.getLLT() : extllt; + try { + if (this.isIndex()) { + this.add(new RowId(0, 0, 0), o, s, llt); + return null; + } + + Metrics.get("persistGetChunk").start(); + final DataChunk dc = isIdFieldNoCheck() ? null : this.getChunkByEntity(o, s); + Metrics.get("persistGetChunk").stop(); + + if (dc == null) { + Metrics.get("persistInsertChunk").start(); + final DataChunk nc = new DataChunk(o, s, this); + final int len = nc.getBytesAmount(); + final WaitFrame bdw = getAvailableFrame(o, fpart); + final FrameData bd = bdw.getBd(); + final int diff = len - bd.getFrameFree(); + + if (diff > 0) { + final FrameData nb = this.createNewFrame(bd, bdw, bd.getFile(), 0, 0, false, false, false, s, llt); + nb.getDataFrame().insertChunk(nc, s, true, llt); + s.getTransaction().storeFrame(nb, len, s, llt); + } else { + final int p = bd.insertChunk(nc, s, true, llt); + if (p == 0) { + throw new InternalException(); + } + s.getTransaction().storeFrame(bd, len, s, llt); + } + ((EntityContainer) o).setTran(nc.getHeader().getTran()); + ((EntityContainer) o).setRowId(nc.getHeader().getRowID()); + ((EntityContainer) o).setDataChunk(nc); + + bdw.release(); + Metrics.get("persistInsertChunk").stop(); + + Metrics.get("persistInsertIndex").start(); + //remove external llt for deadlock prevent + persistIndexes(nc, s, llt); + Metrics.get("persistInsertIndex").stop(); + + return nc; - } else { - if (isNoTran()) { - usedSpace(bd, bd.getUsed() + newlen - len, true, s, llt); } else { - s.getTransaction().storeFrame(bd, udc == null ? null : udc.getUframe(), newlen - len, s, llt); + + final DataChunk udc = dc.lock(s, llt); + final int len = dc.getBytesAmount(); + final FrameData bd = Instance.getInstance().getFrameById(dc.getHeader().getRowID().getFileId() + dc.getHeader().getRowID().getFramePointer()); + final int newlen = bd.updateChunk(dc, o, s, llt); + final int diff = newlen - len - bd.getFrameFree(); + + if (diff > 0) { + lockIndexes(dc, s, llt); + bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); + final WaitFrame ibw = getAvailableFrame(o, fpart); + final FrameData ib = ibw.getBd(); + + final int p = ib.getDataFrame().insertChunk(dc, s, true, llt); + if (p == 0) { + final FrameData nb = this.createNewFrame(ib, ibw, ib.getFile(), 0, 0, false, false, false, s, llt); + nb.getDataFrame().insertChunk(dc, s, true, llt); + s.getTransaction().storeFrame(bd, udc == null ? null : udc.getUframe(), 0 - len, s, llt); + s.getTransaction().storeFrame(nb, udc == null ? null : udc.getUframe(), newlen, s, llt); + s.getTransaction().storeFrame(nb, newlen, s, llt); + } else { + s.getTransaction().storeFrame(bd, udc == null ? null : udc.getUframe(), 0 - len, s, llt); + s.getTransaction().storeFrame(ib, udc == null ? null : udc.getUframe(), newlen, s, llt); + s.getTransaction().storeFrame(ib, newlen, s, llt); + } + updateIndexesPtr(dc, s, llt); + //update rowid + ((EntityContainer) o).setRowId(dc.getHeader().getRowID()); + dc.getUndoChunk().setFile(dc.getHeader().getRowID().getFileId()); + dc.getUndoChunk().setFrame(dc.getHeader().getRowID().getFramePointer()); + dc.getUndoChunk().setPtr(dc.getHeader().getPtr()); + if (udc != null) { + udc.setEntity(dc.getUndoChunk()); + udc.getUframe().updateChunk(udc, dc.getUndoChunk(), s, llt); + } + + ibw.release(); + if (extllt == null) { + llt.commit(); + } + + return dc; + + } else { + s.getTransaction().storeFrame(bd, udc == null ? null : udc.getUframe(), newlen - len, s, llt); + } + + return dc; + } + } finally { + if (extllt == null) { + llt.commit(); } } - - return dc; - } - } finally { - if (extllt == null) { - llt.commit(); } } - } - private void persistIndexes(DataChunk c, Session s, LLT llt) throws Exception { - for (IndexDescript ids : this.getIndexNames()) { - final Table ixt = Instance.getInstance().getTableByName(SYSTEM_PKG_PREFIX + ids.getName()); - //create IndexChunk implementation - final Object io = ixt.getTableClass().getConstructor(new Class[]{c.getClass(),s.getClass()}).newInstance(new Object[]{c, s}); - ixt.add(c.getHeader().getRowID(), io, s, llt); - } } protected void delete (final Object o, final Session s, LLT extllt, boolean ignoreTransaction) throws Exception { @@ -1334,6 +1361,15 @@ protected void delete (final Object o, final Session s, LLT extllt, boolean igno } } + private void persistIndexes(DataChunk c, Session s, LLT llt) throws Exception { + for (IndexDescript ids : this.getIndexNames()) { + final Table ixt = Instance.getInstance().getTableByName(SYSTEM_PKG_PREFIX + ids.getName()); + //create IndexChunk implementation + final Object io = ixt.getTableClass().getConstructor(new Class[]{c.getClass(),s.getClass()}).newInstance(new Object[]{c, s}); + ixt.add(c.getHeader().getRowID(), io, s, llt); + } + } + private void deleteIndexes(DataChunk dc, boolean noTran, boolean remove, Session s, LLT llt) throws Exception { for (IndexDescript ids : this.getIndexNames()) { final DataChunk ic = dc.getIc(ids, s); @@ -1379,42 +1415,28 @@ private void updateIndexesPtr(DataChunk dc, Session s, LLT llt) throws Exception } //todo deprecated started param - public FrameData createNewFrame(final FrameData frame, final int fileId, final int frameType, final long allocId, final boolean started, final boolean setlbs, final boolean external, final Session s, final LLT llt) throws Exception { + public FrameData createNewFrame(final FrameData frame, final WaitFrame wb, final int fileId, final int frameType, final long allocId, final boolean started, final boolean setlbs, final boolean external, final Session s, final LLT llt) throws Exception { final DataFile df = Storage.getStorage().getDataFileById(fileId); - final FrameData bd = df.createNewFrame(frame, frameType, allocId, started, external, this, s, llt); + final FrameData bd = df.createNewFrame(frame, wb, frameType, allocId, started, external, this, s, llt); + if (!external) { synchronized (this) { boolean done = true; if (setlbs && !this.getName().equals(UndoChunk.class.getName())) { done = false; - for (WaitFrame wb : this.lbs) { - if (wb.trySetBd(frame, bd, frameType)) { + for (WaitFrame w : this.lbs) { + if (w.trySetBd(frame, bd, frameType)) { done = true; break; } } } if (!done) { - // todo evicted frame -> metric - for (WaitFrame wb : this.lbs) { - if (wb.getBd().getFile() == bd.getFile()) { - // remove evicted ptr from prevframe - frame.setNextFrame(wb.getBd().getPtr()); - s.persist(frame, llt); - } - } - bd.clearCurrent(); - s.persist(bd, llt); - logger.info("evict frame " + bd.getObjectId() + ":" + bd.getFile() + ":" + bd.getPtr() + " " + Thread.currentThread().getName()); - for (WaitFrame wb : this.lbs) { - final WaitFrame bd_ = wb.acquire(fileId, false); - if (bd_ != null) { - return bd_.getBd(); - } - } + throw new RuntimeException("cannot set LBS frame"); } } } + return bd; } @@ -1422,6 +1444,7 @@ public synchronized FrameData allocateFrame(DataFile df, Table t, Session s, LLT return df.allocateFrame(t, s, llt); } + @Deprecated public void lockTable(Session s) { try { s.persist(new RetrieveLock(this.objectId, s.getTransaction().getTransId())); //insert @@ -1477,7 +1500,7 @@ protected synchronized RetrieveQueue getTableContentQueue(Session s) { final ManagedCallable r = new ManagedCallable() { @Override public Boolean call() throws Exception { - Thread.currentThread().setName("interference-retrieve-queue-thread"); + Thread.currentThread().setName("interference-retrieve-queue-thread-"+Thread.currentThread().getId()); synchronized (t) { final LinkedBlockingQueue bds = Instance.getInstance().getTableById(getObjectId()).getFrames(); boolean cnue = true; @@ -1517,7 +1540,7 @@ protected synchronized RetrieveQueue getIndexContentQueue(Session s) { final ManagedCallable r = new ManagedCallable() { @Override public Boolean call() throws Exception { - Thread.currentThread().setName("interference-retrieve-index-queue-thread"); + Thread.currentThread().setName("interference-retrieve-index-queue-thread-"+Thread.currentThread().getId()); synchronized (t) { List startframes = new ArrayList<>(); startframes.add(t.fileStart + t.frameStart); @@ -1552,10 +1575,9 @@ public Boolean call() throws Exception { inNodes.add(Instance.getInstance().getFrameById(levelNodes.get(k).getFrameChunks(s).get(i).getHeader().getFramePtr()).getIndexFrame()); } if (k == levelNodes.size() - 1) { - int lcf = levelNodes.get(k).getLcF(); - long lcb = levelNodes.get(k).getLcB(); - if (lcf > 0) { - inNodes.add(Instance.getInstance().getFrameById(lcf + lcb).getIndexFrame()); + final long lcId = levelNodes.get(k).getLcId(); + if (lcId > 0) { + inNodes.add(Instance.getInstance().getFrameById(lcId).getIndexFrame()); } } } @@ -1668,7 +1690,7 @@ public DataChunk getChunkByEntity (Object o, Session s) throws Exception { } else if (ix != null) { return (DataChunk) ix.getIndex().getObjectByKey(new IndexElementKey(new Object[]{idMethod.invoke(o, null)})); } else { - final byte[] id = new DataChunkId(o, s).getIdBytes(); + final byte[] id = new DataChunkId(o, this, s).getIdBytes(); if (id!=null) { final LinkedBlockingQueue bds = Instance.getInstance().getTableById(this.getObjectId()).getFrames(); boolean cnue = true; @@ -1691,7 +1713,7 @@ public DataChunk getChunkByEntity (Object o, Session s) throws Exception { final Table idt = getFirstIndexByIdColumn(); if (to.getDataChunk() == null) { //todo need further optimize - final DataChunkId dcid = new DataChunkId(o, s); + final DataChunkId dcid = new DataChunkId(o, this, s); if (idt != null) { final DataChunk idc = idt.getObjectByKey(new ValueSet(dcid.getId()), s); if (idc == null) { @@ -1818,12 +1840,12 @@ private synchronized void add (RowId rowid, Object o, Session s, LLT extllt) thr target = Instance.getInstance().getFrameById(cc.getHeader().getFramePtr()); target.setMv(cc.getDcs()); //set non-persitent maxvalue } else { - final long lcId = target.getIndexFrame().getLcF() + target.getIndexFrame().getLcB(); - target = Instance.getInstance().getFrameById(lcId); //get by last child + target = Instance.getInstance().getFrameById(target.getIndexFrame().getLcId()); //get by last child if (target == null) { - logger.info("null target returned for frame id " + lcId); + logger.error("null target returned for frame id " + target.getIndexFrame().getLcId()); } } + llt.add(target.getIndexFrame()); target.getIndexFrame().setParentF(parentF); target.getIndexFrame().setParentB(parentB); } @@ -1854,7 +1876,7 @@ private synchronized void add (RowId rowid, Object o, Session s, LLT extllt) thr if (prevtg.getIndexFrame().getParentF() == 0) { //add parent ElementList - always type 2 (node) final int nfileId = getIndexFileId(prevtg); // target = createNewFrame(prevtg, prevtg.getFile(), IndexFrame.INDEX_FRAME_NODE, s); - target = createNewFrame(prevtg, nfileId, IndexFrame.INDEX_FRAME_NODE, 0, false, false, false, s, llt); + target = createNewFrame(prevtg, null, nfileId, IndexFrame.INDEX_FRAME_NODE, 0, false, false, false, s, llt); this.setFileStart(target.getIndexFrame().getFile()); this.setFrameStart(target.getIndexFrame().getPointer()); s.persist(this, llt); @@ -1862,8 +1884,9 @@ private synchronized void add (RowId rowid, Object o, Session s, LLT extllt) thr target = Instance.getInstance().getFrameById(prevtg.getIndexFrame().getParentF() + prevtg.getIndexFrame().getParentB()); //get by last child } if (newlist.getDivided() == 0) { - target.getIndexFrame().setLcF(newlist.getFile()); //lc must be > 0 (0 is first leaf ElementList) - target.getIndexFrame().setLcB(newlist.getPointer()); //lc must be > 0 (0 is first leaf ElementList) + //paranoid fix + llt.add(target.getIndexFrame()); + target.getIndexFrame().setLcId(newlist.getPtr()); //lc must be > 0 (0 is first leaf ElementList) } } } @@ -1955,10 +1978,9 @@ private synchronized List getLocalContent(long start, Session s) throws E inNodes.add(Instance.getInstance().getFrameById(levelNodes.get(k).getFrameChunks(s).get(i).getHeader().getFramePtr()).getIndexFrame()); } if (k==levelNodes.size()-1) { - int lcf = levelNodes.get(k).getLcF(); - long lcb = levelNodes.get(k).getLcB(); - if (lcf>0) { - inNodes.add(Instance.getInstance().getFrameById(lcf+lcb).getIndexFrame()); + final long lcId = levelNodes.get(k).getLcId(); + if (lcId>0) { + inNodes.add(Instance.getInstance().getFrameById(lcId).getIndexFrame()); } } } @@ -1978,7 +2000,7 @@ private synchronized LinkedBlockingQueue getLeafFrames (Session s) th Runnable r = new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference-retrieve-index-frames-thread"); + Thread.currentThread().setName("interference-retrieve-index-frames-thread-"+Thread.currentThread().getId()); try { for (Long start : startfs) { getLocalLeafFrames(q, start, s); @@ -2017,10 +2039,9 @@ private synchronized LinkedBlockingQueue getLocalLeafFrames (LinkedBl inNodes.add(Instance.getInstance().getFrameById(levelNodes.get(k).getFrameChunks(s).get(i).getHeader().getFramePtr()).getIndexFrame()); } if (k==levelNodes.size()-1) { - int lcf = levelNodes.get(k).getLcF(); - long lcb = levelNodes.get(k).getLcB(); - if (lcf>0) { - inNodes.add(Instance.getInstance().getFrameById(lcf+lcb).getIndexFrame()); + final long lcId = levelNodes.get(k).getLcId(); + if (lcId>0) { + inNodes.add(Instance.getInstance().getFrameById(lcId).getIndexFrame()); } } } @@ -2059,10 +2080,9 @@ public synchronized String getInfo(Session s) throws Exception { inNodes.add(Instance.getInstance().getFrameById(levelNodes.get(k).getFrameChunks(s).get(i).getHeader().getFramePtr()).getIndexFrame()); } if (k==levelNodes.size()-1) { - int lcf = levelNodes.get(k).getLcF(); - long lcb = levelNodes.get(k).getLcB(); - if (lcf>0) { - inNodes.add(Instance.getInstance().getFrameById(lcf+lcb).getIndexFrame()); + final long lcId = levelNodes.get(k).getLcId(); + if (lcId>0) { + inNodes.add(Instance.getInstance().getFrameById(lcId).getIndexFrame()); } } } @@ -2110,10 +2130,9 @@ private synchronized DataChunk getLocalObjectByKey (long start, ValueSet key, Se if (cc!=null) { target = Instance.getInstance().getFrameById(cc.getHeader().getFramePtr()).getIndexFrame(); } else { - final long lcId = target.getLcF()+target.getLcB(); - target = Instance.getInstance().getFrameById(lcId).getIndexFrame(); //get by last child + target = Instance.getInstance().getFrameById(target.getLcId()).getIndexFrame(); //get by last child if (target == null) { - logger.info("null target returned for frame id "+lcId); + logger.error("null target returned for frame id "+target.getLcId()); } } } @@ -2140,8 +2159,8 @@ private synchronized List getLocalObjectsByKey (long start, ValueSet ntargets.add(Instance.getInstance().getFrameById(i).getIndexFrame()); } } - if (target.getLcF()>0) { - ntargets.add(Instance.getInstance().getFrameById(target.getLcF()+target.getLcB()).getIndexFrame()); //get by last child + if (target.getLcId()>0) { + ntargets.add(Instance.getInstance().getFrameById(target.getLcId()).getIndexFrame()); //get by last child } } } @@ -2170,8 +2189,8 @@ private synchronized void removeObjects(ValueSet key, Object o, Session s, LLT l ntargets.add(Instance.getInstance().getFrameById(i)); } } - if (target.getIndexFrame().getLcF()>0) { - ntargets.add(Instance.getInstance().getFrameById(target.getIndexFrame().getLcF()+target.getIndexFrame().getLcB())); //get by last child + if (target.getIndexFrame().getLcId()>0) { + ntargets.add(Instance.getInstance().getFrameById(target.getIndexFrame().getLcId())); //get by last child } } } diff --git a/src/main/java/su/interference/persistent/Transaction.java b/src/main/java/su/interference/persistent/Transaction.java index 504aaf0..5444fb4 100644 --- a/src/main/java/su/interference/persistent/Transaction.java +++ b/src/main/java/su/interference/persistent/Transaction.java @@ -55,6 +55,8 @@ public class Transaction implements Serializable { public static final int TRAN_SERIALIZABLE = 1; @Transient public static final int TRAN_THR = 9; + @Transient + public static final int TRAN_LEGACY = 10; @Id @Column @@ -149,9 +151,9 @@ public Transaction (DataChunk chunk) throws IllegalAccessException, InternalExce public void createUndoFrames(Session s, LLT llt) throws Exception { final Table t = Instance.getInstance().getTableByName("su.interference.persistent.UndoChunk"); for (DataFile f : Storage.getStorage().getUndoFiles()) { - final FrameData ub = t.createNewFrame(null, f.getFileId(), 0, 0, false, true, false, s, llt); + final FrameData ub = t.createNewFrame(null, null, f.getFileId(), 0, 0, false, true, false, s, llt); ub.setSynced(false); - setNewLB(null, ub, false); + setNewLB(null, ub); } } @@ -164,7 +166,7 @@ public WaitFrame getAvailableFrame(final FilePartitioned o, final boolean fpart) while (true) { // for (int i=0; i fptr = new ArrayList(); final Process lsync = Instance.getInstance().getProcessByName("lsync"); final SyncQueue syncq = (SyncQueue) lsync.getRunnable(); if (remote) { @@ -223,9 +215,6 @@ public synchronized void commit (Session s, boolean remote) { for (Long frameId : rframes) { Instance.getInstance().getFrameById(frameId).decreaseTcounter(this.transId); } - for (TransFrame tb : Instance.getInstance().getTransFramesByTransId(this.transId)) { - s.delete(tb); - } this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); this.transType = TRAN_THR; s.persist(this); @@ -240,34 +229,12 @@ public synchronized void commit (Session s, boolean remote) { if (isLocal()) { sendBroadcastEvents(CommandEvent.COMMIT, s); try { - this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); - this.transType = TRAN_THR; - s.persist(this); - syncq.commit(); - for (TransFrame tb : tframes) { - boolean hasfb = false; if (tb.getUframeId() > 0) { // undo transframe record - for (Long f : fptr) { - if (f == tb.getUframeId()) { - hasfb = true; - break; - } - } final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); if (rls.size() == 0) { final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); - //deallocate undo frame - final FrameData ub = Instance.getInstance().getFrameById(tb.getUframeId()); - //store frame params as free - if (!hasfb) { - final FreeFrame fb = new FreeFrame(0, tb.getUframeId(), ub.getSize()); - s.persist(fb); //insert - s.delete(ub); - fptr.add(tb.getUframeId()); - } cb.decreaseTcounter(this.transId); - s.delete(tb); } } else { //change transframe record final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); @@ -278,10 +245,14 @@ public synchronized void commit (Session s, boolean remote) { } else { s.persist(cb); //update new size value to dataframe } - s.delete(tb); } } + this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); + this.transType = TRAN_THR; + s.persist(this); + syncq.commit(); + if (this.join != null) { join.deallocate(s); } @@ -301,16 +272,12 @@ public synchronized void commit (Session s, boolean remote) { public synchronized void rollback (Session s, boolean remote) { final ArrayList ubd1 = new ArrayList<>(); final ArrayList ubd2 = new ArrayList<>(); - final ArrayList fptr = new ArrayList<>(); if (remote) { if (!isLocal()) { try { for (Long frameId : rframes) { Instance.getInstance().getFrameById(frameId).decreaseTcounter(this.transId); } - for (TransFrame tb : Instance.getInstance().getTransFramesByTransId(this.transId)) { - s.delete(tb); - } this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); this.transType = TRAN_THR; s.persist(this); //update @@ -333,15 +300,14 @@ public synchronized void rollback (Session s, boolean remote) { for (TransFrame tb : tframes) { final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); - if (cb.getFrame() instanceof DataFrame) { - if (!ubd1.contains(cb)) { - ubd1.add(cb); - } - } - if (cb.getFrame() instanceof IndexFrame) { + if (cb.isIndex()) { if (!ubd2.contains(cb)) { ubd2.add(cb); } + } else { + if (!ubd1.contains(cb)) { + ubd1.add(cb); + } } } for (FrameData ub : ubd2) { @@ -384,30 +350,13 @@ public synchronized void rollback (Session s, boolean remote) { } for (TransFrame tb : tframes) { - boolean hasfb = false; if (tb.getUframeId() > 0) { // undo transframe record - for (Long f : fptr) { - if (f == tb.getUframeId()) { - hasfb = true; - break; - } - } final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); if (rls.size() == 0) { final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); - //deallocate undo frame - final FrameData ub = Instance.getInstance().getFrameById(tb.getUframeId()); - //store frame params as free - if (!hasfb) { - final FreeFrame fb = new FreeFrame(0, tb.getUframeId(), ub.getSize()); - s.persist(fb); //insert - s.delete(ub); - fptr.add(tb.getUframeId()); - } if (cb != null) { cb.decreaseTcounter(this.transId); } - s.delete(tb); } } else { //change transframe record final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); @@ -416,7 +365,6 @@ public synchronized void rollback (Session s, boolean remote) { logger.info("rollback freeing frame " + cb.getFile() + " " + cb.getPtr()); freeFrames(cb, s); } - s.delete(tb); } } this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); diff --git a/src/main/java/su/interference/proxy/POJOProxyFactory.java b/src/main/java/su/interference/proxy/POJOProxyFactory.java index 37fd506..a9e797f 100644 --- a/src/main/java/su/interference/proxy/POJOProxyFactory.java +++ b/src/main/java/su/interference/proxy/POJOProxyFactory.java @@ -219,7 +219,7 @@ public synchronized Class register (String name) throws ClassNotFoundException, sb.append(sparam); sb.append("; }\n"); } - sb.append(" if (session.isStream()||tran==null||(tran!=null&&tran.getTransType()==su.interference.persistent.Transaction.TRAN_THR)||(tran!=null&&tran.getTransId() == "); + sb.append(" if (session.isStream()||tran==null||(tran!=null&&tran.getTransType()>=su.interference.persistent.Transaction.TRAN_THR)||(tran!=null&&tran.getTransId() == "); sb.append("session.getTransaction().getTransId())) {\n"); sb.append(" return super.get"); sb.append(f[i].getName().substring(0,1).toUpperCase()); diff --git a/src/main/java/su/interference/sql/ContainerFrame.java b/src/main/java/su/interference/sql/ContainerFrame.java index 16e9ed8..e689cef 100644 --- a/src/main/java/su/interference/sql/ContainerFrame.java +++ b/src/main/java/su/interference/sql/ContainerFrame.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2020 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -25,13 +25,9 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.sql; import su.interference.core.Chunk; -import su.interference.core.ChunkIdComparator; import su.interference.core.FrameOrderComparator; -import su.interference.exception.InternalException; import su.interference.persistent.Session; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collections; import java.util.List; diff --git a/src/main/java/su/interference/sql/RemoteTask.java b/src/main/java/su/interference/sql/RemoteTask.java index bf5db33..0f6c04b 100644 --- a/src/main/java/su/interference/sql/RemoteTask.java +++ b/src/main/java/su/interference/sql/RemoteTask.java @@ -56,7 +56,7 @@ public RemoteTask(Cursor cur, int nodeId, Map joins, Strin } public Boolean call() throws Exception { - Thread.currentThread().setName("interference-remote-task-thread"); + Thread.currentThread().setName("interference-remote-task-thread-"+Thread.currentThread().getId()); final TransportEvent transportEvent = new SQLEvent(nodeId, cur.getCursorId(), joins, rightType, 0, null, null, 0, true); TransportContext.getInstance().send(transportEvent); transportEvent.getLatch().await(); diff --git a/src/main/java/su/interference/sql/SQLCursor.java b/src/main/java/su/interference/sql/SQLCursor.java index 476f667..1e2ebe9 100644 --- a/src/main/java/su/interference/sql/SQLCursor.java +++ b/src/main/java/su/interference/sql/SQLCursor.java @@ -506,7 +506,7 @@ public ResultSet flushTarget() throws InternalException { Runnable r = new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference-sql-cursor-flush-thread"); + Thread.currentThread().setName("interference-sql-cursor-flush-thread-"+Thread.currentThread().getId()); try { while (hasNextFrame()) { nextFrame(); diff --git a/src/main/java/su/interference/transport/HeartBeatProcess.java b/src/main/java/su/interference/transport/HeartBeatProcess.java index a44d42e..ec7243c 100644 --- a/src/main/java/su/interference/transport/HeartBeatProcess.java +++ b/src/main/java/su/interference/transport/HeartBeatProcess.java @@ -56,7 +56,7 @@ public final class HeartBeatProcess implements Runnable, ManagedProcess { } public void run () { - Thread.currentThread().setName("interference-heartbeat-thread"); + Thread.currentThread().setName("interference-heartbeat-thread-"+Thread.currentThread().getId()); if (Instance.getInstance().getClusterState() == Instance.CLUSTER_STATE_DOWN) { f = false; } diff --git a/src/main/java/su/interference/transport/SyncFrameEvent.java b/src/main/java/su/interference/transport/SyncFrameEvent.java index 06a57ab..74015ef 100644 --- a/src/main/java/su/interference/transport/SyncFrameEvent.java +++ b/src/main/java/su/interference/transport/SyncFrameEvent.java @@ -93,7 +93,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { final int order = (f.getFileId() % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT; if (order == allocOrder) { final LLT llt_ = LLT.getLLT(); //df access reordering prevent deadlock - bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt_); + bd = t.createNewFrame(null, null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt_); llt_.commit(); bd.setFrame(null); b.setDf(f); diff --git a/src/main/java/su/interference/transport/TransportChannel.java b/src/main/java/su/interference/transport/TransportChannel.java index 6019e25..c180a53 100644 --- a/src/main/java/su/interference/transport/TransportChannel.java +++ b/src/main/java/su/interference/transport/TransportChannel.java @@ -72,7 +72,7 @@ protected void start(CountDownLatch latch) { @Override public void run() { Socket sock = null; - Thread.currentThread().setName("interference-transport-channel-"+channelId+"-thread"); + Thread.currentThread().setName("interference-transport-channel-"+channelId+"-thread-"+Thread.currentThread().getId()); try { try { sock = new Socket(); @@ -90,7 +90,7 @@ public void run() { final ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream(), Config.getConfig().WRITE_BUFFER_SIZE)); boolean running = true; started.set(true); - Thread.currentThread().setName("transport channel thread 2"); + Thread.currentThread().setName("transport-channel-thread-"+Thread.currentThread().getId()); while (running) { final TransportMessage transportMessage = mq.peek() == null ? cbq.poll() : mq.poll(); try { diff --git a/src/main/java/su/interference/transport/TransportContext.java b/src/main/java/su/interference/transport/TransportContext.java index 66b58dc..e22e9fe 100644 --- a/src/main/java/su/interference/transport/TransportContext.java +++ b/src/main/java/su/interference/transport/TransportContext.java @@ -72,7 +72,7 @@ protected void startClient() { pool.submit(new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference-transport-context-thread"); + Thread.currentThread().setName("interference-transport-context-thread-"+Thread.currentThread().getId()); boolean started_ = true; while (started_) { final TransportEvent transportEvent = mq.peek(); diff --git a/src/main/java/su/interference/transport/TransportServer.java b/src/main/java/su/interference/transport/TransportServer.java index a9a48a6..226661d 100644 --- a/src/main/java/su/interference/transport/TransportServer.java +++ b/src/main/java/su/interference/transport/TransportServer.java @@ -61,7 +61,7 @@ private TransportServer() { @Override public void run() { logger.info("event server started on port " + serverPort); - Thread.currentThread().setName("interference-event-server-thread-"+serverPort); + Thread.currentThread().setName("interference-event-server-thread-"+serverPort+"-"+Thread.currentThread().getId()); while (started.get()) { try { serverSocket.setSoTimeout(10000); @@ -83,7 +83,7 @@ protected Class resolveClass(ObjectStreamClass objectStreamClass) @Override public void run() { boolean running = true; - Thread.currentThread().setName("interference-transport-processor-thread-"+serverPort); + Thread.currentThread().setName("interference-transport-processor-thread-"+serverPort+"-"+Thread.currentThread().getId()); logger.info("transport message processor started on port "+serverPort); while (running) { try { diff --git a/src/main/java/su/interference/transport/TransportSyncTask.java b/src/main/java/su/interference/transport/TransportSyncTask.java index bbfe287..242055e 100644 --- a/src/main/java/su/interference/transport/TransportSyncTask.java +++ b/src/main/java/su/interference/transport/TransportSyncTask.java @@ -55,7 +55,7 @@ public TransportSyncTask(ArrayList frames) { //todo need refactor for MT, now pesisted frame algorithm restrict this, should running in ONE thread @SuppressWarnings("unchecked") public void run () { - Thread.currentThread().setName("interference-transport-sync-thread"); + Thread.currentThread().setName("interference-transport-sync-thread-"+Thread.currentThread().getId()); final SyncFrame[] sb = frames.stream().filter(b -> b.isAllowR()).collect(Collectors.toList()).toArray(new SyncFrame[]{}); try {