From 8d9baab8eb89df920254dde06bd8ce8ae91963e9 Mon Sep 17 00:00:00 2001 From: "yuriy.glotanov" Date: Mon, 12 Apr 2021 19:44:09 +0300 Subject: [PATCH] RELEASE 2021.1 hotfix patch 12.04 --- README.md | 3 +- .../java/su/interference/core/DataChunk.java | 28 ++-- .../java/su/interference/core/DataFrame.java | 7 +- src/main/java/su/interference/core/Frame.java | 28 ++-- .../java/su/interference/core/IndexChunk.java | 3 +- .../su/interference/core/IndexDescript.java | 20 ++- .../java/su/interference/core/IndexFrame.java | 34 +++-- .../java/su/interference/core/Instance.java | 60 ++++----- src/main/java/su/interference/core/LLT.java | 24 ++-- .../java/su/interference/core/Storage.java | 20 +-- .../java/su/interference/core/SyncFrame.java | 20 ++- .../java/su/interference/core/SyncQueue.java | 115 +++++++++-------- .../su/interference/core/SystemCleanUp.java | 121 +++++++++--------- .../su/interference/persistent/DataFile.java | 9 +- .../su/interference/persistent/FrameData.java | 41 ++++-- .../su/interference/persistent/FrameSync.java | 6 +- .../su/interference/persistent/FreeFrame.java | 13 +- .../su/interference/persistent/Table.java | 47 ++++--- .../interference/persistent/TransFrame.java | 8 +- .../interference/persistent/Transaction.java | 21 +-- .../interference/proxy/IOTProxyFactory.java | 13 ++ .../su/interference/proxy/RSProxyFactory.java | 2 + .../transport/SyncFrameEvent.java | 40 +++--- .../transport/TransportChannel.java | 4 +- .../transport/TransportContext.java | 3 +- .../transport/TransportSyncTask.java | 12 +- 26 files changed, 396 insertions(+), 306 deletions(-) diff --git a/README.md b/README.md index 3f20331..681f7db 100644 --- a/README.md +++ b/README.md @@ -65,8 +65,7 @@ Next, specify the necessary set of keys in the project -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false --Xms256m --Xmn512m +-Xms1g -Xmx4g -XX:MaxMetaspaceSize=256m -XX:ParallelGCThreads=8 diff --git a/src/main/java/su/interference/core/DataChunk.java b/src/main/java/su/interference/core/DataChunk.java index 93cc951..2389aa8 100644 --- a/src/main/java/su/interference/core/DataChunk.java +++ b/src/main/java/su/interference/core/DataChunk.java @@ -43,8 +43,6 @@ this software and associated documentation files (the "Software"), to deal in import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static su.interference.persistent.Table.SYSTEM_PKG_PREFIX; - /** * @author Yuriy Glotanov * @since 1.0 @@ -355,7 +353,7 @@ public DataChunk (byte[] b, int file, long frame, int hsize, Table t, Class c) this.chunk = bsc.getBytes(); } - //constructor for clone method - de-serialize chunk only without header + //constructor for clone method - de-serialize chunk only without header public DataChunk (byte[] b, Table t, RowHeader h, DataChunk source) { this.chunk = b; this.header = h; @@ -695,13 +693,13 @@ public DataChunk cloneEntity(Session s) throws InternalException { return dc_; } - public DataChunk restore(UndoChunk uc) throws Exception { + public DataChunk restore(UndoChunk uc, Session s, LLT llt) throws Exception { // since we are using a dirty hack with changing the header in a source chunk for rollback his, // we need to first delete the source chunk in the target block to prevent inconsistency within ChunkMap / chunk headers if (!(this.getHeader().getRowID().getFileId() == uc.getFile() && this.getHeader().getRowID().getFramePointer() == uc.getFrame())) { final long srcFrameId = uc.getFile() + uc.getFrame(); final Frame srcFrame = Instance.getInstance().getFrameById(srcFrameId).getFrame(); - srcFrame.removeChunk(uc.getPtr(), null, true); + srcFrame.removeChunk(uc.getPtr(), llt, true); } return this; } @@ -727,7 +725,6 @@ public void setHeader(Header header) { } //lock mechanism - private synchronized DataChunk insertUC (UndoChunk uc, Session s, LLT llt) throws Exception { final WaitFrame ubw = s.getTransaction().getAvailableFrame(uc, true); final FrameData ub = ubw.getBd(); @@ -827,13 +824,12 @@ protected void cleanUpIcs() { } public DataChunk getIc(IndexDescript ids, Session s) throws Exception { - final Table ixt = Instance.getInstance().getTableByName(SYSTEM_PKG_PREFIX + ids.getName()); - final DataChunk ic = this.ics.get(ixt.getObjectId()); + final DataChunk ic = this.ics.get(ids.getIndex().getObjectId()); if (ic == null) { final ValueSet key = this.getValueByColumnName(ids.getColumns(), s); - final DataChunk ic_ = ixt.getObjectByKey(key, s); + final DataChunk ic_ = ids.getIndex().getObjectByKey(key, s); if (ic_ != null) { - this.ics.put(ixt.getObjectId(), ic_); + this.ics.put(ids.getIndex().getObjectId(), ic_); return ic_; } else { throw new RuntimeException("Unable to retrieve index chunk: " + ids.getName()); @@ -842,6 +838,13 @@ public DataChunk getIc(IndexDescript ids, Session s) throws Exception { return ic; } + public DataChunk getIcForUpdate(IndexDescript ids, Session s, LLT llt) throws Exception { + final DataChunk ic = getIc(ids, s); + llt.add(ic.getFrameData().getFrame()); + this.ics.remove(ids.getIndex().getObjectId()); + return getIc(ids, s); + } + public ValueSet getValueByColumnName(String[] columns, Session s) throws Exception { final Class c = this.getEntity().getClass(); final SystemEntity sa = (SystemEntity)c.getAnnotation(SystemEntity.class); @@ -874,4 +877,9 @@ public synchronized String getHexByChunk() { return sb.toString(); } + public FrameData getFrameData() { + final long ptr = this.getHeader().getRowID().getFileId() + this.getHeader().getRowID().getFramePointer(); + return Instance.getInstance().getFrameById(ptr); + } + } diff --git a/src/main/java/su/interference/core/DataFrame.java b/src/main/java/su/interference/core/DataFrame.java index 3368d4e..c44703d 100644 --- a/src/main/java/su/interference/core/DataFrame.java +++ b/src/main/java/su/interference/core/DataFrame.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -130,8 +130,9 @@ public DataFrame(byte[] b, int file, long pointer, Map imap, Map getFrameChunks (Session s) { public synchronized void rollbackTransaction(Transaction tran, ArrayList ubs, Session s) throws Exception { data.check(); + final LLT llt = LLT.getLLT(); + llt.add(this); //rollback inserted records final ArrayList r = new ArrayList<>(); @@ -585,7 +591,7 @@ public synchronized void rollbackTransaction(Transaction tran, ArrayList imap, Map ubs, Session s) throws Exception { data.check(); + final LLT llt = LLT.getLLT(); + llt.add(this); final Map ucmap = new HashMap(); if (ubs!=null) { @@ -143,20 +145,29 @@ public synchronized void rollbackTransaction(Transaction tran, ArrayList r = new ArrayList<>(); for (Chunk c : data.getChunks()) { if (c.getHeader().getTran().getTransId() == tran.getTransId()) { final DataChunk dc = ucmap.get(c.getHeader().getPtr()); - final DataChunk dc_ = ((IndexChunk) c.getEntity()).getDataChunk().getUndoChunk().getDataChunk(); - ((DataChunk) c).setUndoChunk(null); - ((DataChunk) c).cleanUpIcs(); - ((IndexChunk)c.getEntity()).setDataChunk(dc_); - ((IndexChunk)c.getEntity()).setFramePtrRowId(dc_.getHeader().getRowID()); - ((DataChunk)c).getHeader().setFramePtr(dc_.getHeader().getRowID()); + if (dc != null) { + final DataChunk dc_ = ((IndexChunk) c.getEntity()).getDataChunk().getUndoChunk().getDataChunk(); + ((DataChunk) c).setUndoChunk(null); + ((DataChunk) c).cleanUpIcs(); + ((IndexChunk) c.getEntity()).setDataChunk(dc_); + ((IndexChunk) c.getEntity()).setFramePtrRowId(dc_.getHeader().getRowID()); + ((DataChunk) c).getHeader().setFramePtr(dc_.getHeader().getRowID()); + c.getHeader().setState(Header.RECORD_NORMAL_STATE); + } else { + if (c.getHeader().getState() == Header.RECORD_NORMAL_STATE) { + r.add(c.getHeader().getPtr()); + } + } } } + for (Integer i : r) { + data.removeByPtr(i); + } - final LLT llt = LLT.getLLT(); - llt.add(this); llt.commit(); } @@ -445,7 +456,8 @@ public synchronized long getLcId() { } public synchronized void setLcId(long lcId) { - this.setRes05((int)lcId%4096); - this.setRes07(lcId - lcId%4096); + final long lcF = lcId%4096; + this.setRes05((int)lcF); + this.setRes07(lcId - lcF); } } diff --git a/src/main/java/su/interference/core/Instance.java b/src/main/java/su/interference/core/Instance.java index ead02ba..ddc119f 100644 --- a/src/main/java/su/interference/core/Instance.java +++ b/src/main/java/su/interference/core/Instance.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -39,6 +39,7 @@ this software and associated documentation files (the "Software"), to deal in import org.slf4j.LoggerFactory; import org.slf4j.Logger; import su.interference.transport.TransportContext; +import su.interference.transport.TransportSyncTask; /** * @author Yuriy Glotanov @@ -48,7 +49,7 @@ this software and associated documentation files (the "Software"), to deal in public class Instance implements Interference { public static final String RELEASE = "2021.1"; - public static final int SYSTEM_VERSION = 20210201; + public static final int SYSTEM_VERSION = 20210412; public static final String DATA_FILE = "datafile"; public static final String INDX_FILE = "indxfile"; @@ -578,6 +579,19 @@ public FrameData getFrameById (long id) { return null; } + public FrameData getFrameByIdForUpdate (long id, LLT llt) { + final Table t = getTableByName("su.interference.persistent.FrameData"); + final MapField ixf = t.getMapFieldByColumn("frameId"); + final Map ixl = ixf.getMap(); + final DataChunk dc = (DataChunk)ixl.get(id); + if (dc != null) { + final FrameData bd = (FrameData) dc.getEntity(); + llt.add(bd); + return bd; + } + return null; + } + public FrameData getFrameByAllocId (long id) { final Table t = getTableByName("su.interference.persistent.FrameData"); final MapField ixf = t.getMapFieldByColumn("allocId"); @@ -647,46 +661,26 @@ public FreeFrame getFreeFrameById (long id) { return (FreeFrame)dc.getEntity(); } - @Deprecated - public ArrayList getSyncFrames(int nodeId, int amount, boolean bulk) { + public ArrayList getSyncFrames(int nodeId) { final Table t = getTableByName("su.interference.persistent.FrameSync"); final ArrayList r = new ArrayList<>(); - for (Object o : t.getIndexFieldByColumn("nodeId").getIndex().getObjectsByKey(nodeId, amount)) { - r.add((FrameSync)((DataChunk)o).getEntity()); - } - return r; - } - - public ArrayList getSyncFrames(int nodeId, int amount) { - final Table t = getTableByName("su.interference.persistent.FrameSync"); - final ArrayList r = new ArrayList<>(); - for (Object o : t.getIndexFieldByColumn("syncId").getIndex().getContent(amount)) { + String uuid = null; + for (Object o : t.getIndexFieldByColumn("syncId").getIndex().getContent(TransportSyncTask.REMOTE_SYNC_DEFERRED_AMOUNT)) { final FrameSync fs = (FrameSync)((DataChunk)o).getEntity(); if (fs.getNodeId() == nodeId) { - r.add(fs); + if (uuid == null) { + uuid = fs.getSyncUUID(); + r.add(fs); + } else { + if (uuid.equals(fs.getSyncUUID())) { + r.add(fs); + } + } } } return r; } - public ArrayList getSyncFramesByUUID(String UUID) { - final Table t = getTableByName("su.interference.persistent.FrameSync"); - final ArrayList r = new ArrayList<>(); - for (Object o : t.getIndexFieldByColumn("syncUUID").getIndex().getObjectsByKey(UUID)) { - r.add((FrameSync)((DataChunk)o).getEntity()); - } - return r; - } - - public ArrayList getSyncFramesById(long frameId) { - final Table t = getTableByName("su.interference.persistent.FrameSync"); - final ArrayList r = new ArrayList<>(); - for (Object o : t.getIndexFieldByColumn("frameId").getIndex().getObjectsByKey(frameId)) { - r.add((FrameSync)((DataChunk)o).getEntity()); - } - return r; - } - public synchronized Process getProcessByName (String name) { final Table t = getTableByName("su.interference.persistent.Process"); return (Process)((DataChunk)t.getIndexFieldByColumn("processName").getIndex().getObjectByKey(name)).getEntity(); diff --git a/src/main/java/su/interference/core/LLT.java b/src/main/java/su/interference/core/LLT.java index ab9727f..76b2073 100644 --- a/src/main/java/su/interference/core/LLT.java +++ b/src/main/java/su/interference/core/LLT.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -26,6 +26,7 @@ this software and associated documentation files (the "Software"), to deal in import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import su.interference.persistent.FrameData; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -43,7 +44,7 @@ public class LLT { private static final AtomicLong sync = new AtomicLong(); private static final ReentrantLock rlck = new ReentrantLock(); private static final ConcurrentHashMap pool = new ConcurrentHashMap(); - private static final ConcurrentHashMap frames = new ConcurrentHashMap(); + private static final ConcurrentHashMap frames = new ConcurrentHashMap<>(); private final static Logger logger = LoggerFactory.getLogger(LLT.class); private final boolean lock; private final long id; @@ -104,16 +105,21 @@ private static boolean poolNotEmpty() { } public void add(Frame b) { - b.getFrameData().setSynced(false); - frames.put(b.getFrameData().getFrameId(), b); + b.getFrameData().setUnsynced(); + frames.put(b.getFrameData().getFrameId(), b.getFrameData()); } - public void commit() { + public void add(FrameData b) { + b.setUnsynced(); + frames.put(b.getFrameId(), b); + } + + public void commit() throws Exception { pool.remove(this.id); if (this.lock) { - for (Map.Entry entry : frames.entrySet()) { - entry.getValue().getFrameData().setSynced(true); - entry.getValue().clearSnaps(this.id); + for (Map.Entry entry : frames.entrySet()) { + entry.getValue().setSynced(); + entry.getValue().getFrame().clearSnaps(this.id); } frames.clear(); sync.compareAndSet(this.id, 0); @@ -123,7 +129,7 @@ public void commit() { } } - public static ConcurrentHashMap getFrames() { + public static ConcurrentHashMap getFrames() { return frames; } diff --git a/src/main/java/su/interference/core/Storage.java b/src/main/java/su/interference/core/Storage.java index 90ba607..220f659 100644 --- a/src/main/java/su/interference/core/Storage.java +++ b/src/main/java/su/interference/core/Storage.java @@ -349,7 +349,7 @@ public void openDataFiles () throws Exception { } } - public void closeDataFiles () throws IOException, InvalidFrame, ClassNotFoundException, InstantiationException, IllegalAccessException { + public void closeDataFiles () throws IOException, InvalidFrame { for (Map.Entry e : ifs.entrySet()) { if (((DataFile)e.getValue()).isOpen()) { ((DataFile)e.getValue()).closeFile(); @@ -357,7 +357,7 @@ public void closeDataFiles () throws IOException, InvalidFrame, ClassNotFoundExc } } - public DataFile getInitFile() throws ClassNotFoundException, InternalException, IllegalAccessException, InstantiationException { + public DataFile getInitFile() throws InternalException { for (Map.Entry e : ifs.entrySet()) { if (((DataFile)e.getValue()).getFileId()==INITFILE_ID) { if (!((DataFile)e.getValue()).isOpen()) { @@ -369,7 +369,7 @@ public DataFile getInitFile() throws ClassNotFoundException, InternalException, return null; } - public int openStorage (DataFile[] files) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException { + public int openStorage (DataFile[] files) throws InternalException { if (state==STORAGE_STATE_OPEN) { return state; } @@ -456,17 +456,17 @@ public int writeFrame (Frame b) throws IOException, InternalException, ClassNotF } public int writeFrame (byte[] b, long frameId) throws IOException, InternalException, ClassNotFoundException, IllegalAccessException, InstantiationException { - final int file = (int)frameId%4096; - final long ptr = frameId - (frameId%4096); - final DataFile f = getDataFileById(file); + final long file = frameId%4096; + final long ptr = frameId - file; + final DataFile f = getDataFileById((int)file); f.writeFrame(ptr, b); return 0; } public int writeFrameWithBackup (byte[] b, long frameId) throws IOException, InternalException, ClassNotFoundException, IllegalAccessException, InstantiationException { - final int file = (int)frameId%4096; - final long ptr = frameId - (frameId%4096); - final DataFile f = getDataFileById(file); + final long file = frameId%4096; + final long ptr = frameId - file; + final DataFile f = getDataFileById((int)file); final byte[] bb = f.readDataFromPtr(ptr, b.length); if (bb!=null) { jrnFile.seek(jrnptr.getAndAdd(bb.length)); @@ -476,7 +476,7 @@ public int writeFrameWithBackup (byte[] b, long frameId) throws IOException, Int return 0; } - public int writeFrameWithBackup (final DataFile f, final byte[] b, final long ptr) throws IOException, InternalException, ClassNotFoundException, IllegalAccessException, InstantiationException { + public int writeFrameWithBackup (final DataFile f, final byte[] b, final long ptr) throws IOException, InternalException { final byte[] bb = f.readDataFromPtr(ptr, b.length); if (bb != null) { jrnFile.seek(jrnptr.getAndAdd(bb.length)); diff --git a/src/main/java/su/interference/core/SyncFrame.java b/src/main/java/su/interference/core/SyncFrame.java index f4bfdb6..899754c 100644 --- a/src/main/java/su/interference/core/SyncFrame.java +++ b/src/main/java/su/interference/core/SyncFrame.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -117,12 +117,10 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb, boolean proc) throws Exce lcId = 0; this.frameType = t.getName().equals("su.interference.persistent.UndoChunk") ? 99 : frame.getType(); } else if (frame.getClass().getName().equals("su.interference.core.IndexFrame")) { - if (frame.getType()==0|| frame.getType()>2) { + if (frame.getType()==0 || frame.getType()>2) { throw new InternalException(); } - final int fileId = (int) frame.getPtr()%4096; - final long ptr = frame.getPtr() - frame.getPtr()%4096; - started = t.getFileStart() == fileId && t.getFrameStart() == ptr; + started = t.getFileStart() == frame.getFile() && t.getFrameStart() == frame.getPointer(); final IndexFrame ib = (IndexFrame) frame; imap = allowR ? ib.getAllocateMap() : null; prevId = 0; @@ -161,6 +159,14 @@ public long getAllocId() { return allocId; } + public long getAllocFile() { + return this.allocId%4096; + } + + public long getAllocPtr() { + return this.allocId - (this.allocId%4096); + } + public int getFileType() { return fileType; } @@ -169,8 +175,8 @@ public int getFrameType() { return frameType; } - public int getFile() { - return (int)frameId%4096; + public long getFile() { + return frameId%4096; } public long getPointer() { diff --git a/src/main/java/su/interference/core/SyncQueue.java b/src/main/java/su/interference/core/SyncQueue.java index 000ac0b..9a5bb50 100644 --- a/src/main/java/su/interference/core/SyncQueue.java +++ b/src/main/java/su/interference/core/SyncQueue.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -49,77 +49,80 @@ public class SyncQueue implements Runnable, ManagedProcess { private final ExecutorService pool2 = Executors.newFixedThreadPool(1); CountDownLatch latch; private final static Logger logger = LoggerFactory.getLogger(SyncQueue.class); + protected final static Object synclock = new Object(); public SyncQueue() { } private synchronized boolean syncFramesFromQueue() throws Exception { - if (running) { - return false; - } - running = true; - final LLT llt = LLT.getLLTAndLock(); - final int famt = Storage.getStorage().getFiles()==null?0:Storage.getStorage().getFiles().size(); - logger.debug("sync procedure was started with frames amount=" + LLT.getFrames().size()); - - final ArrayList frames = new ArrayList<>(); - final Map> frames_ = new HashMap<>(); - final ArrayList fframes = new ArrayList<>(); - final Session s = Session.getDntmSession(); - - for (Map.Entry entry : LLT.getFrames().entrySet()) { - FreeFrame fb = null; - try { - final Frame f = entry.getValue(); - frames.add(new SyncFrame(f, s, fb)); - if (f.isLocal()) { - if (frames_.get(f.getObjectId()) == null) { - frames_.put(f.getObjectId(), new ArrayList<>()); + synchronized (synclock) { + if (running) { + return false; + } + running = true; + final LLT llt = LLT.getLLTAndLock(); + final int famt = Storage.getStorage().getFiles() == null ? 0 : Storage.getStorage().getFiles().size(); + logger.debug("sync procedure was started with frames amount=" + LLT.getFrames().size()); + + final ArrayList frames = new ArrayList<>(); + final Map> frames_ = new HashMap<>(); + final ArrayList fframes = new ArrayList<>(); + final Session s = Session.getDntmSession(); + + for (Map.Entry entry : LLT.getFrames().entrySet()) { + FreeFrame fb = null; + try { + final Frame f = entry.getValue().getFrame(); + frames.add(new SyncFrame(f, s, fb)); + if (f.isLocal()) { + if (frames_.get(f.getObjectId()) == null) { + frames_.put(f.getObjectId(), new ArrayList<>()); + } + frames_.get(f.getObjectId()).add(f.getFrameData()); } - frames_.get(f.getObjectId()).add(f.getFrameData()); + } catch (MissingSyncFrameException e) { + logger.debug("Unable to sync frame " + ((FrameData) entry.getValue()).getPtr() + " because removed by freeing"); + } + if (fb != null) { + fframes.add(fb); } - } catch (MissingSyncFrameException e) { - logger.debug("Unable to sync frame "+((Frame) entry.getValue()).getPtr()+" because removed by freeing"); - } - if (fb!=null) { - fframes.add(fb); } - } - for (Map.Entry> entry: frames_.entrySet()) { - SQLCursor.addStreamFrame(new ContainerFrame(entry.getKey(), entry.getValue())); - } + for (Map.Entry> entry : frames_.entrySet()) { + SQLCursor.addStreamFrame(new ContainerFrame(entry.getKey(), entry.getValue())); + } - SyncTask[] tasklist = new SyncTask[famt]; + SyncTask[] tasklist = new SyncTask[famt]; - int cnt = 0; - for (Map.Entry e : Storage.getStorage().getFiles().entrySet()) { - tasklist[cnt] = new SyncTask((DataFile)e.getValue()); - cnt++; - } + int cnt = 0; + for (Map.Entry e : Storage.getStorage().getFiles().entrySet()) { + tasklist[cnt] = new SyncTask((DataFile) e.getValue()); + cnt++; + } - for (SyncTask task : tasklist) { - for (SyncFrame bd : frames) { - if (task.getDataFile().getFileId()==bd.getFile()) { - task.add(bd); + for (SyncTask task : tasklist) { + for (SyncFrame bd : frames) { + if (task.getDataFile().getFileId() == (int) bd.getFile()) { + task.add(bd); + } } } - } - long t1 = System.currentTimeMillis(); - pool.invokeAll(Arrays.asList(tasklist)); - long t2 = System.currentTimeMillis(); - logger.info("sync procedure was completed in "+(t2-t1)+"ms"); - llt.commit(); - Storage.getStorage().clearJournal(); - for (FreeFrame fb : fframes) { - s.persist(fb); - } - //todo async process must depends from stop() method - pool2.submit(new TransportSyncTask(frames)); + long t1 = System.currentTimeMillis(); + pool.invokeAll(Arrays.asList(tasklist)); + long t2 = System.currentTimeMillis(); + logger.info("sync procedure was completed in " + (t2 - t1) + "ms"); + llt.commit(); + Storage.getStorage().clearJournal(); + for (FreeFrame fb : fframes) { + s.persist(fb); + } + //todo async process must depends from stop() method + pool2.submit(new TransportSyncTask(frames)); - running = false; - return true; + running = false; + return true; + } } public void commit() throws Exception { diff --git a/src/main/java/su/interference/core/SystemCleanUp.java b/src/main/java/su/interference/core/SystemCleanUp.java index e0bafc5..2415ba3 100644 --- a/src/main/java/su/interference/core/SystemCleanUp.java +++ b/src/main/java/su/interference/core/SystemCleanUp.java @@ -74,76 +74,77 @@ public void stop() throws InterruptedException{ } } - private void cleanUpFrames() throws Exception { - Metrics.get("systemCleanUp").start(); - int i = 0; - int d = 0; - int x = 0; - int xn = 0; - int xall = 0; - int u = 0; - int i_ = 0; - int d_ = 0; - int x_ = 0; - int u_ = 0; - for (Object entry : Instance.getInstance().getFramesMap().entrySet()) { - final FrameData f = (FrameData) ((DataChunk) ((Map.Entry) entry).getValue()).getEntity(); - final long frameAmount = f.getDataObject().getFrameAmount(); - if (f.getDataFile().isData() && cleanupDataEnabled()) { - f.decreasePriority(); - if (f.isSynced() && f.getObjectId() > 999 && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { - if (f.clearFrame()) { - d++; + private void cleanUpFrames() { + synchronized (SyncQueue.synclock) { + Metrics.get("systemCleanUp").start(); + int i = 0; + int d = 0; + int x = 0; + int xn = 0; + int xall = 0; + int u = 0; + int i_ = 0; + int d_ = 0; + int x_ = 0; + int u_ = 0; + for (Object entry : Instance.getInstance().getFramesMap().entrySet()) { + final FrameData f = (FrameData) ((DataChunk) ((Map.Entry) entry).getValue()).getEntity(); + final long frameAmount = f.getDataObject().getFrameAmount(); + if (f.getDataFile().isData() && cleanupDataEnabled()) { + f.decreasePriority(); + if (f.isSynced() && f.getObjectId() > 999 && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { + if (f.clearFrame()) { + d++; + } } - } - if (f.isFrame()) { - d_++; - } - } - if (f.getDataFile().isIndex() && cleanupIndxEnabled()) { - f.decreasePriority(); - xall++; - if (f.isSynced() && f.getFrameType() == IndexFrame.INDEX_FRAME_NODE) { - xn++; - } - if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && !f.isRbck() && frameAmount > Config.getConfig().IX_CLEANUP_PROTECTION_THR) { - if (f.clearFrame()) { - x++; + if (f.isFrame()) { + d_++; } } - if (f.isFrame()) { - x_++; - } - } - if (f.getDataFile().isTemp() && cleanupTempEnabled()) { - if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { - if (f.clearFrame()) { - i++; + if (f.getDataFile().isIndex() && cleanupIndxEnabled()) { + f.decreasePriority(); + xall++; + if (f.isSynced() && f.getFrameType() == IndexFrame.INDEX_FRAME_NODE) { + xn++; + } + if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && !f.isRbck() && frameAmount > Config.getConfig().IX_CLEANUP_PROTECTION_THR) { + if (f.clearFrame()) { + x++; + } + } + if (f.isFrame()) { + x_++; } } - if (f.isFrame()) { - i_++; - } - } - if (f.getDataFile().isUndo() && cleanupUndoEnabled()) { - if (f.isSynced() && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { - if (f.clearFrame()) { - u++; + if (f.getDataFile().isTemp() && cleanupTempEnabled()) { + if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { + if (f.clearFrame()) { + i++; + } + } + if (f.isFrame()) { + i_++; } } - if (f.isFrame()) { - u_++; + if (f.getDataFile().isUndo() && cleanupUndoEnabled()) { + if (f.isSynced() && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) { + if (f.clearFrame()) { + u++; + } + } + if (f.isFrame()) { + u_++; + } } } + Metrics.get("сleanUpDataFrames").put(d); + Metrics.get("сleanUpIndexFrames").put(x); + Metrics.get("сleanUpUndoFrames").put(u); + Metrics.get("imDataFrames").put(d_); + Metrics.get("imIndexFrames").put(x_); + Metrics.get("imUndoFrames").put(u_); + Metrics.get("systemCleanUp").stop(); } - Metrics.get("сleanUpDataFrames").put(d); - Metrics.get("сleanUpIndexFrames").put(x); - Metrics.get("сleanUpUndoFrames").put(u); - Metrics.get("imDataFrames").put(d_); - Metrics.get("imIndexFrames").put(x_); - Metrics.get("imUndoFrames").put(u_); - Metrics.get("systemCleanUp").stop(); - } public static void forceCleanUp() { diff --git a/src/main/java/su/interference/persistent/DataFile.java b/src/main/java/su/interference/persistent/DataFile.java index 6a9bb50..20efa95 100644 --- a/src/main/java/su/interference/persistent/DataFile.java +++ b/src/main/java/su/interference/persistent/DataFile.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -491,8 +491,10 @@ public synchronized void writeFrame(final long ptr, final byte[] b) throws IOExc logger.error("Wrong write frame operation with file = " + this.file + " ptr = " + ptr + ", internal file = " + file_ + " ptr = " + ptr_); } - this.file.seek(ptr); - this.file.write(b); + synchronized (this) { + this.file.seek(ptr); + this.file.write(b); + } } public void writeFrame(FrameData bd, final long ptr, final byte[] b, LLT llt, Session s) throws Exception { @@ -508,6 +510,7 @@ public void writeFrame(FrameData bd, final long ptr, final byte[] b, LLT llt, Se if (ptr != ptr_) { logger.error("Wrong write frame operation with file = " + this.file + " ptr = " + ptr + ", internal file = " + file_ + " ptr = " + ptr_); } + synchronized (this) { this.file.seek(ptr); this.file.write(b); diff --git a/src/main/java/su/interference/persistent/FrameData.java b/src/main/java/su/interference/persistent/FrameData.java index 18ec278..8c88e66 100644 --- a/src/main/java/su/interference/persistent/FrameData.java +++ b/src/main/java/su/interference/persistent/FrameData.java @@ -37,6 +37,7 @@ this software and associated documentation files (the "Software"), to deal in import java.io.Serializable; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -105,7 +106,7 @@ public class FrameData implements Serializable, Comparable, FrameApi, FilePartit @Transient private AtomicInteger priority = new AtomicInteger(2); @Transient - private volatile boolean synced = true; + private AtomicBoolean synced = new AtomicBoolean(true); @Transient private volatile boolean rbck; @Transient @@ -208,6 +209,10 @@ public Frame getFrame() throws Exception { return frame; } + public synchronized Chunk getChunkByPtr(int ptr) throws Exception { + return getFrame().getChunkByPtr(ptr); + } + public int getFrameType() { return frameType; } @@ -236,6 +241,14 @@ public long getAllocId() { return this.allocId; } + public long getAllocFile() { + return this.allocId%4096; + } + + public long getAllocPtr() { + return this.allocId - (this.allocId%4096); + } + public void setAllocId(long allocId) { this.allocId = allocId; } @@ -250,7 +263,7 @@ public long getNextFrameId() { //real current transactional value //skip negative differences for prevent frame oversize when many transactions change data - public int getFrameUsed() { + public synchronized int getFrameUsed() { int tdiff = 0; for (Map.Entry> entry : tcounter.entrySet()) { for (Map.Entry entry_ : entry.getValue().entrySet()) { @@ -438,7 +451,7 @@ public void setNextFrame(long nextFrame) { } public synchronized boolean clearFrame() { - if (this.frame != null) { + if (this.frame != null && this.isSynced()) { this.frame.cleanUpIcs(); this.frame = null; return true; @@ -454,7 +467,7 @@ public void setEntityClass(Class entityClass) { this.entityClass = entityClass; } - public boolean isFrameBusy() { + public synchronized boolean isFrameBusy() { for (Map.Entry> entry : tcounter.entrySet()) { for (Map.Entry entry_ : entry.getValue().entrySet()) { if (entry_.getValue().getCframeId() == getFrameId()) { @@ -476,8 +489,8 @@ public void decreaseTcounter(long id) { tcounter.remove(id); } - public Map> getLiveUFrameAllocIds() { - Map> uframes = new HashMap<>(); + public synchronized Map> getLiveUFrameAllocIds() { + final Map> uframes = new HashMap<>(); for (Map.Entry> entry : tcounter.entrySet()) { uframes.put(entry.getKey(), new ArrayList<>()); for (Map.Entry entry_ : entry.getValue().entrySet()) { @@ -493,7 +506,7 @@ public Map> getLiveUFrameAllocIds() { return uframes; } - public void updateTCounter(long transId, List ulist) { + public synchronized void updateTCounter(long transId, List ulist) { if (this.frameId == this.allocId) { throw new RuntimeException("cannot update tcounter for local frame"); } @@ -503,6 +516,10 @@ public void updateTCounter(long transId, List ulist) { } } + public synchronized void rollbackTransaction(Transaction tran, ArrayList ubs, Session s) throws Exception { + this.getFrame().rollbackTransaction(tran, ubs, s); + } + public int getPriority() { return priority.get(); } @@ -514,11 +531,15 @@ public void decreasePriority() { } public boolean isSynced() { - return synced; + return synced.get(); + } + + public void setSynced() { + this.synced.set(true); } - public void setSynced(boolean synced) { - this.synced = synced; + public void setUnsynced() { + this.synced.set(false); } public boolean isRbck() { diff --git a/src/main/java/su/interference/persistent/FrameSync.java b/src/main/java/su/interference/persistent/FrameSync.java index 45c24d4..41ab808 100644 --- a/src/main/java/su/interference/persistent/FrameSync.java +++ b/src/main/java/su/interference/persistent/FrameSync.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -50,19 +50,15 @@ public class FrameSync implements Comparable { @MgmtColumn(width=20, show=true, form=false, edit=false) private long syncId; @Column - @IndexColumn @MgmtColumn(width=20, show=true, form=false, edit=false) private long allocId; @Column - @IndexColumn @MgmtColumn(width=20, show=true, form=false, edit=false) private int nodeId; @Column - @IndexColumn @MgmtColumn(width=20, show=true, form=false, edit=false) private String syncUUID; @Column - @IndexColumn @MgmtColumn(width=20, show=true, form=false, edit=false) private long frameId; @Transient diff --git a/src/main/java/su/interference/persistent/FreeFrame.java b/src/main/java/su/interference/persistent/FreeFrame.java index cff06e7..a67d5bc 100644 --- a/src/main/java/su/interference/persistent/FreeFrame.java +++ b/src/main/java/su/interference/persistent/FreeFrame.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -80,9 +80,10 @@ public FreeFrame() { public FreeFrame(int objectId, long frame, int size) { this.objectId = objectId; - this.frameId = frame; - this.size = size; - this.fileId = (int)frame%4096; + this.frameId = frame; + this.size = size; + final long file_ = frame%4096; + this.fileId = (int) file_; } public int compareTo(Object obj) { @@ -123,11 +124,11 @@ public int hashCode() { } public int getFile() { - return (int)this.frameId%4096; + return this.fileId; } public long getPtr() { - return this.frameId - (this.frameId%4096); + return this.frameId - this.fileId; } public long getFrameId() { diff --git a/src/main/java/su/interference/persistent/Table.java b/src/main/java/su/interference/persistent/Table.java index b568766..1caf447 100644 --- a/src/main/java/su/interference/persistent/Table.java +++ b/src/main/java/su/interference/persistent/Table.java @@ -1036,7 +1036,7 @@ public void usedSpace (final FrameData bd, final int used, final boolean persist try { if (persist) { final Table t = Instance.getInstance().getTableByName(FrameData.class.getName()); - final DataChunk dc = t.getChunkByEntity(bd, s); + final DataChunk dc = t.getChunkByEntity(bd, s, llt); final FrameData bd_ = Instance.getInstance().getFrameById(dc.getHeader().getRowID().getFileId() + dc.getHeader().getRowID().getFramePointer()); bd_.updateChunk(dc, bd, s, llt); } @@ -1120,7 +1120,7 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) } Metrics.get("persistGetChunk").start(); - final DataChunk dc = isIdFieldNoCheck() ? null : this.getChunkByEntity(o, s); + final DataChunk dc = isIdFieldNoCheck() ? null : this.getChunkByEntity(o, s, null); Metrics.get("persistGetChunk").stop(); if (dc == null) { @@ -1217,7 +1217,7 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) } Metrics.get("persistGetChunk").start(); - final DataChunk dc = isIdFieldNoCheck() ? null : this.getChunkByEntity(o, s); + final DataChunk dc = isIdFieldNoCheck() ? null : this.getChunkByEntity(o, s, llt); Metrics.get("persistGetChunk").stop(); if (dc == null) { @@ -1262,7 +1262,7 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) final int diff = newlen - len - bd.getFrameFree(); if (diff > 0) { - lockIndexes(dc, s, llt); + final List ics = lockIndexes(dc, s, llt); bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); final WaitFrame ibw = getAvailableFrame(o, fpart); final FrameData ib = ibw.getBd(); @@ -1279,7 +1279,7 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) s.getTransaction().storeFrame(ib, udc == null ? null : udc.getUframe(), newlen, s, llt); s.getTransaction().storeFrame(ib, newlen, s, llt); } - updateIndexesPtr(dc, s, llt); + updateIndexesPtr(ics, dc); //update rowid ((EntityContainer) o).setRowId(dc.getHeader().getRowID()); dc.getUndoChunk().setFile(dc.getHeader().getRowID().getFileId()); @@ -1324,7 +1324,8 @@ protected void delete (final Object o, final Session s, LLT extllt, boolean igno } } - final DataChunk dc = this.getChunkByEntity(o, s); + final LLT llt = extllt==null?LLT.getLLT():extllt; + final DataChunk dc = this.getChunkByEntity(o, s, llt); if (dc == null) { throw new CannotAccessToDeletedRecord(); } @@ -1333,7 +1334,6 @@ protected void delete (final Object o, final Session s, LLT extllt, boolean igno } final int len = dc.getBytesAmount(); final FrameData bd = Instance.getInstance().getFrameById(dc.getHeader().getRowID().getFileId()+dc.getHeader().getRowID().getFramePointer()); - final LLT llt = extllt==null?LLT.getLLT():extllt; DataChunk udc = null; @@ -1395,21 +1395,19 @@ private void deleteIndexes(DataChunk dc, boolean noTran, boolean remove, Session } } - private void lockIndexes(DataChunk dc, Session s, LLT llt) throws Exception { + private List lockIndexes(DataChunk dc, Session s, LLT llt) throws Exception { + final List res = new ArrayList<>(); for (IndexDescript ids : this.getIndexNames()) { - final DataChunk ic = dc.getIc(ids, s); - final int iclen = ic.getBytesAmount(); - final FrameData ibd = Instance.getInstance().getFrameById(ic.getHeader().getRowID().getFileId() + ic.getHeader().getRowID().getFramePointer()); + final DataChunk ic = dc.getIcForUpdate(ids, s, llt); final DataChunk udc = ic.lock(s, llt); - s.getTransaction().storeFrame(ibd, udc == null ? null : udc.getUframe(),0 - iclen, s, llt); + s.getTransaction().storeFrame(ic.getFrameData(), udc == null ? null : udc.getUframe(),0 - ic.getBytesAmount(), s, llt); + res.add(ic); } + return res; } - private void updateIndexesPtr(DataChunk dc, Session s, LLT llt) throws Exception { - for (IndexDescript ids : this.getIndexNames()) { - final DataChunk ic = dc.getIc(ids, s); - final FrameData xf = Instance.getInstance().getFrameById(ic.getHeader().getRowID().getFileId() + ic.getHeader().getRowID().getFramePointer()); - llt.add(xf.getFrame()); + private void updateIndexesPtr(List ics, DataChunk dc) { + for (DataChunk ic : ics) { final IndexChunk ic_ = (IndexChunk) ic.getEntity(); ic_.setFramePtrRowId(dc.getHeader().getRowID()); ic_.setDataChunk(dc); @@ -1674,7 +1672,7 @@ protected ArrayList getStream (Map retrieved, Session s) return r; } - public DataChunk getChunkByEntity (Object o, Session s) throws Exception { + public DataChunk getChunkByEntity (Object o, Session s, LLT llt) throws Exception { final Class c = o.getClass(); final ResultSetEntity ca = (ResultSetEntity)c.getAnnotation(ResultSetEntity.class); if (ca!=null) { //ResultSet entities ALWAYS insert only, then, no neccessary for find datachunk @@ -1689,9 +1687,13 @@ public DataChunk getChunkByEntity (Object o, Session s) throws Exception { final MapField mf = this.getMapFieldByColumn(idf.getName()); final IndexField ix = this.getIndexFieldByColumn(idf.getName()); if (mf != null) { - return (DataChunk) mf.getMap().get(idmethod.invoke(o, null)); + final DataChunk dc = (DataChunk) mf.getMap().get(idmethod.invoke(o, null)); + if (llt != null) { llt.add(dc.getFrameData().getFrame()); } + return dc; } else if (ix != null) { - return (DataChunk) ix.getIndex().getObjectByKey(new IndexElementKey(new Object[]{idMethod.invoke(o, null)})); + final DataChunk dc = (DataChunk) ix.getIndex().getObjectByKey(new IndexElementKey(new Object[]{idMethod.invoke(o, null)})); + if (llt != null) { llt.add(dc.getFrameData().getFrame()); } + return dc; } else { final byte[] id = new DataChunkId(o, this, s).getIdBytes(); if (id!=null) { @@ -1704,6 +1706,7 @@ public DataChunk getChunkByEntity (Object o, Session s) throws Exception { } else { for (Chunk dc : b.getDataFrame().getFrameChunks(s)) { if (Arrays.equals(id, ((DataChunk) dc).getSerializedId(s))) { + if (llt != null) { llt.add(((DataChunk) dc).getFrameData().getFrame()); } return (DataChunk) dc; } } @@ -1723,7 +1726,7 @@ public DataChunk getChunkByEntity (Object o, Session s) throws Exception { return null; } final IndexChunk ibx = (IndexChunk) idc.getEntity(); - return ibx.getDataChunk(); + return ibx.getDataChunkForUpdate(llt); } else { final byte[] id = dcid.getIdBytes(); if (id != null) { @@ -1736,6 +1739,7 @@ public DataChunk getChunkByEntity (Object o, Session s) throws Exception { } else { for (Chunk dc : b.getDataFrame().getFrameChunks(s)) { if (Arrays.equals(id, ((DataChunk) dc).getSerializedId(s))) { + if (llt != null) { llt.add(((DataChunk) dc).getFrameData().getFrame()); } return (DataChunk) dc; } } @@ -1744,6 +1748,7 @@ public DataChunk getChunkByEntity (Object o, Session s) throws Exception { } } } else { + if (llt != null) { llt.add(to.getDataChunk().getFrameData().getFrame()); } return to.getDataChunk(); } } diff --git a/src/main/java/su/interference/persistent/TransFrame.java b/src/main/java/su/interference/persistent/TransFrame.java index 5d9cfe0..7f62819 100644 --- a/src/main/java/su/interference/persistent/TransFrame.java +++ b/src/main/java/su/interference/persistent/TransFrame.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -126,7 +126,11 @@ public TransFrame(DataChunk chunk) throws ClassNotFoundException, IllegalAccessE } public int getFile() { - return (int)this.cframeId%4096; + return (int) this.getFile_(); + } + + private long getFile_() { + return this.cframeId%4096; } public long getTransId() { diff --git a/src/main/java/su/interference/persistent/Transaction.java b/src/main/java/su/interference/persistent/Transaction.java index 9040afc..dd8cf2e 100644 --- a/src/main/java/su/interference/persistent/Transaction.java +++ b/src/main/java/su/interference/persistent/Transaction.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -152,7 +152,7 @@ public void createUndoFrames(Session s, LLT llt) throws Exception { final Table t = Instance.getInstance().getTableByName("su.interference.persistent.UndoChunk"); for (DataFile f : Storage.getStorage().getUndoFiles()) { final FrameData ub = t.createNewFrame(null, null, f.getFileId(), 0, 0, false, true, false, s, llt); - ub.setSynced(false); + ub.setUnsynced(); setNewLB(null, ub); } } @@ -218,6 +218,7 @@ public synchronized void commit (Session s, boolean remote) { this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); this.transType = TRAN_THR; s.persist(this); + rframes.clear(); syncq.commit(); } catch (Exception e) { e.printStackTrace(); @@ -281,6 +282,7 @@ public synchronized void rollback (Session s, boolean remote) { this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); this.transType = TRAN_THR; s.persist(this); //update + rframes.clear(); /* if (this.join != null) { join.deallocate(s); @@ -320,10 +322,9 @@ public synchronized void rollback (Session s, boolean remote) { } } - final Frame frame_ = ub.getFrame(); - if (frame_ instanceof IndexFrame) { + if (ub.isIndex()) { ub.setRbck(true); - frame_.rollbackTransaction(this, ubs, s); + ub.rollbackTransaction(this, ubs, s); } } for (FrameData ub : ubd1) { @@ -337,9 +338,8 @@ public synchronized void rollback (Session s, boolean remote) { } } - final Frame frame_ = ub.getFrame(); - if (frame_ instanceof DataFrame) { - frame_.rollbackTransaction(this, ubs, s); + if (!ub.isIndex()) { + ub.rollbackTransaction(this, ubs, s); } } for (FrameData ub : ubd2) { @@ -392,7 +392,7 @@ public void unlockUndoFrames (int objectId, Session s) throws InternalException if (this.getTransType()!=TRAN_THR) { throw new InternalException(); //ONLY FOR FIXED TRANSACTIONS } - final ArrayList fptr = new ArrayList(); + final ArrayList fptr = new ArrayList<>(); try { for (TransFrame tb : tframes) { if (tb.getObjectId()==objectId) { @@ -567,7 +567,8 @@ private void sendBroadcastEvents(int command, Session s) { } public int getNodeId() { - return (int)this.transId%Storage.MAX_NODES; + final long n = this.transId%Storage.MAX_NODES; + return (int)n; } public boolean isLocal() { diff --git a/src/main/java/su/interference/proxy/IOTProxyFactory.java b/src/main/java/su/interference/proxy/IOTProxyFactory.java index 460e027..c7e83a8 100644 --- a/src/main/java/su/interference/proxy/IOTProxyFactory.java +++ b/src/main/java/su/interference/proxy/IOTProxyFactory.java @@ -76,12 +76,15 @@ public synchronized Class register (Field[] cs, String name, String pname) throw sb.append("import javax.persistence.Column;\n"); sb.append("import javax.persistence.Transient;\n"); sb.append("import javax.persistence.GeneratedValue;\n"); + sb.append("import su.interference.core.Instance;\n"); sb.append("import su.interference.core.IndexEntity;\n"); sb.append("import su.interference.core.IndexChunk;\n"); sb.append("import su.interference.core.EntityContainer;\n"); sb.append("import su.interference.core.ResultSetEntity;\n"); + sb.append("import su.interference.core.LLT;\n"); sb.append("import su.interference.mgmt.MgmtColumn;\n"); sb.append("import su.interference.persistent.Transaction;\n"); + sb.append("import su.interference.persistent.FrameData;\n"); sb.append("\n"); sb.append("@Entity\n"); sb.append("@IndexEntity\n"); @@ -110,6 +113,16 @@ public synchronized Class register (Field[] cs, String name, String pname) throw sb.append(" }\n"); sb.append(" return dc;\n"); sb.append(" }\n"); + sb.append(" public su.interference.core.DataChunk getDataChunkForUpdate(LLT llt) {\n"); + sb.append(" final long framePtr = framePtrRowId.getFileId() + framePtrRowId.getFramePointer();\n"); + sb.append(" final FrameData bd = Instance.getInstance().getFrameByIdForUpdate(framePtr, llt);\n"); + sb.append(" try {\n"); + sb.append(" dc = (su.interference.core.DataChunk) bd.getChunkByPtr(framePtrRowId.getRowPointer());\n"); + sb.append(" } catch (java.lang.Exception e) {\n"); + sb.append(" throw new java.lang.RuntimeException(e);\n"); + sb.append(" }\n"); + sb.append(" return dc;\n"); + sb.append(" }\n"); sb.append(" public boolean getReceived() { return received; }\n"); sb.append(" public void setReceived(boolean received) { this.received = received; }\n"); sb.append(" public Object getEntity(su.interference.persistent.Session s) {\n"); diff --git a/src/main/java/su/interference/proxy/RSProxyFactory.java b/src/main/java/su/interference/proxy/RSProxyFactory.java index f4df1a4..21da2ee 100644 --- a/src/main/java/su/interference/proxy/RSProxyFactory.java +++ b/src/main/java/su/interference/proxy/RSProxyFactory.java @@ -80,6 +80,7 @@ public synchronized Class register (List cs, String name, boolean ixf sb.append("import su.interference.core.ResultSetEntity;\n"); sb.append("import su.interference.core.IndexEntity;\n"); sb.append("import su.interference.core.DisableSync;\n"); + sb.append("import su.interference.core.LLT;\n"); sb.append("import su.interference.mgmt.MgmtColumn;\n"); sb.append("\n"); sb.append("@Entity\n"); @@ -106,6 +107,7 @@ public synchronized Class register (List cs, String name, boolean ixf sb.append(" @Transient\n"); sb.append(" public su.interference.core.DataChunk dc;\n"); sb.append(" public su.interference.core.DataChunk getDataChunk() { return dc; }\n"); + sb.append(" public su.interference.core.DataChunk getDataChunkForUpdate(LLT llt) { return dc; }\n"); sb.append(" public void setDataChunk(su.interference.core.DataChunk c) { dc = c; }\n"); sb.append(" public su.interference.core.RowId getFramePtrRowId() { return framePtrRowId; }\n"); sb.append(" public void setFramePtrRowId(su.interference.core.RowId r) { framePtrRowId = r; }\n"); diff --git a/src/main/java/su/interference/transport/SyncFrameEvent.java b/src/main/java/su/interference/transport/SyncFrameEvent.java index eec5034..e34c12d 100644 --- a/src/main/java/su/interference/transport/SyncFrameEvent.java +++ b/src/main/java/su/interference/transport/SyncFrameEvent.java @@ -86,7 +86,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { s.delete(bd); } else { if (bd == null) { - final int allocFileId = (int) b.getAllocId() % 4096; + final int allocFileId = (int) b.getAllocFile(); final int allocOrder = (allocFileId % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT; ArrayList dfs = Instance.getInstance().getDataFilesByType(b.getFileType()); for (DataFile f : dfs) { @@ -128,18 +128,18 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { final Table t = Instance.getInstance().getTableByName(b.getClassName()); final long prevId_ = b.getPrevId() == 0 ? 0 : hmap.get(b.getPrevId()) != null ? hmap.get(b.getPrevId()) : Instance.getInstance().getFrameByAllocId(b.getPrevId()).getFrameId(); final long nextId_ = b.getNextId() == 0 ? 0 : hmap.get(b.getNextId()) != null ? hmap.get(b.getNextId()) : Instance.getInstance().getFrameByAllocId(b.getNextId()).getFrameId(); - final int prevF = (int) prevId_ % 4096; - final long prevB = prevId_ - prevId_ % 4096; - final int nextF = (int) nextId_ % 4096; - final long nextB = nextId_ - nextId_ % 4096; + final long prevF = prevId_ % 4096; + final long prevB = prevId_ - prevF; + final long nextF = nextId_ % 4096; + final long nextB = nextId_ - nextF; if (b.getBd() == null) { throw new InternalException(); } else { if (b.getFrameType() == 99) { Frame frame = new DataFrame(b.getBytes(), b.getBd().getFile(), b.getBd().getPtr(), b.getImap(), hmap, t, s); - frame.setRes01(prevF); - frame.setRes02(nextF); + frame.setRes01((int)prevF); + frame.setRes02((int)nextF); frame.setRes06(prevB); frame.setRes07(nextB); frame.setFrameData(b.getBd()); @@ -190,18 +190,18 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { final Table t = Instance.getInstance().getTableByName(b.getClassName()); final long prevId_ = b.getPrevId() == 0 ? 0 : hmap.get(b.getPrevId()) != null ? hmap.get(b.getPrevId()) : Instance.getInstance().getFrameByAllocId(b.getPrevId()).getFrameId(); final long nextId_ = b.getNextId() == 0 ? 0 : hmap.get(b.getNextId()) != null ? hmap.get(b.getNextId()) : Instance.getInstance().getFrameByAllocId(b.getNextId()).getFrameId(); - final int prevF = (int) prevId_ % 4096; - final long prevB = prevId_ - prevId_ % 4096; - final int nextF = (int) nextId_ % 4096; - final long nextB = nextId_ - nextId_ % 4096; + final long prevF = prevId_ % 4096; + final long prevB = prevId_ - prevF; + final long nextF = nextId_ % 4096; + final long nextB = nextId_ - nextF; if (b.getBd() == null) { throw new InternalException(); } else { if (b.getFrameType() == 0) { Frame frame = new DataFrame(b.getBytes(), b.getBd().getFile(), b.getBd().getPtr(), t); - frame.setRes01(prevF); - frame.setRes02(nextF); + frame.setRes01((int)prevF); + frame.setRes02((int)nextF); frame.setRes06(prevB); frame.setRes07(nextB); frame.setFrameData(b.getBd()); @@ -227,18 +227,18 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { final Table t = Instance.getInstance().getTableByName(b.getClassName()); final long parentId_ = b.getParentId() == 0 ? 0 : hmap.get(b.getParentId()) != null ? hmap.get(b.getParentId()) : Instance.getInstance().getFrameByAllocId(b.getParentId()).getFrameId(); final long lcId_ = b.getLcId() == 0 ? 0 : hmap.get(b.getLcId()) != null ? hmap.get(b.getLcId()) : Instance.getInstance().getFrameByAllocId(b.getLcId()).getFrameId(); - final int parentF = (int) parentId_ % 4096; - final long parentB = parentId_ - parentId_ % 4096; - final int lcF = (int) lcId_ % 4096; - final long lcB = lcId_ - lcId_ % 4096; + final long parentF = parentId_ % 4096; + final long parentB = parentId_ - parentF; + final long lcF = lcId_ % 4096; + final long lcB = lcId_ - lcF; if (b.getBd() == null) { throw new InternalException(); } else { if (b.getFrameType() == 1 || b.getFrameType() == 2) { IndexFrame frame = new IndexFrame(b.getBytes(), b.getBd().getFile(), b.getBd().getPtr(), b.getImap(), hmap, t); - frame.setRes04(parentF); - frame.setRes05(lcF); + frame.setRes04((int)parentF); + frame.setRes05((int)lcF); frame.setRes06(parentB); frame.setRes07(lcB); b.setRFrame(frame); @@ -268,6 +268,8 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { final Map> frames_ = new HashMap<>(); for (SyncFrame f : sb) { if (f.isAllowR() && f.isProc()) { + f.getBd().setSynced(); + final Table t = Instance.getInstance().getTableByName(f.getClassName()); if (frames_.get(t.getObjectId()) == null) { frames_.put(t.getObjectId(), new ArrayList<>()); diff --git a/src/main/java/su/interference/transport/TransportChannel.java b/src/main/java/su/interference/transport/TransportChannel.java index 5d09c3a..640d3fd 100644 --- a/src/main/java/su/interference/transport/TransportChannel.java +++ b/src/main/java/su/interference/transport/TransportChannel.java @@ -99,7 +99,9 @@ public void run() { oos.flush(); oos.reset(); transportMessage.setSendChannel(TransportChannel.this); - mmap.put(transportMessage.getUuid(), transportMessage); + if (transportMessage.getType() == TransportMessage.TRANSPORT_MESSAGE || transportMessage.getType() == TransportMessage.HEARTBEAT_MESSAGE) { + mmap.put(transportMessage.getUuid(), transportMessage); + } logger.debug("channel id = " + channelId + " sent " + transportMessage + " message with UUID: " + transportMessage.getUuid()); } else { if (transportMessage.getType() == TransportMessage.TRANSPORT_MESSAGE || transportMessage.getType() == TransportMessage.HEARTBEAT_MESSAGE) { diff --git a/src/main/java/su/interference/transport/TransportContext.java b/src/main/java/su/interference/transport/TransportContext.java index dc3447b..0c59ece 100644 --- a/src/main/java/su/interference/transport/TransportContext.java +++ b/src/main/java/su/interference/transport/TransportContext.java @@ -149,7 +149,7 @@ protected void onMessage(TransportMessage transportMessage, InetAddress inetAddr transportMessage.getTransportEvent().setCallbackNodeId(transportMessage.getSender()); final EventResult result = transportMessage.getTransportEvent().process(); TransportCallback transportCallback = new TransportCallback(Config.getConfig().LOCAL_NODE_ID, transportMessage.getUuid(), result); - sendCallback(transportMessage.getSender(), new TransportMessage(TransportMessage.CALLBACK_MESSAGE, Config.getConfig().LOCAL_NODE_ID, transportMessage.getTransportEvent(), transportCallback)); + sendCallback(transportMessage.getSender(), new TransportMessage(TransportMessage.CALLBACK_MESSAGE, Config.getConfig().LOCAL_NODE_ID, null, transportCallback)); logger.debug("callback sent with UUID: " + transportMessage.getUuid() + ", type = " + transportMessage.getTransportEvent().getClass()+", destination="+transportMessage.getSender()); } } @@ -182,7 +182,6 @@ private void sendCallback(int channelId, TransportMessage transportMessage) thro if (channel != null) { if (transportMessage.getType() == TransportMessage.CALLBACK_MESSAGE) { channel.send(transportMessage); - mmap.put(transportMessage.getUuid(), transportMessage); } } else { logger.error("Unable to find channel for channelId = "+channelId); diff --git a/src/main/java/su/interference/transport/TransportSyncTask.java b/src/main/java/su/interference/transport/TransportSyncTask.java index 242055e..1a0f70b 100644 --- a/src/main/java/su/interference/transport/TransportSyncTask.java +++ b/src/main/java/su/interference/transport/TransportSyncTask.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2020 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -43,7 +43,7 @@ this software and associated documentation files (the "Software"), to deal in public class TransportSyncTask implements Runnable { private final static Logger logger = LoggerFactory.getLogger(TransportSyncTask.class); - private final static int REMOTE_SYNC_DEFERRED_AMOUNT = 10000; + public final static int REMOTE_SYNC_DEFERRED_AMOUNT = 10000; private final ArrayList frames; private final Session s; @@ -62,13 +62,9 @@ public void run () { for (Map.Entry entry : HeartBeatProcess.channels.entrySet()) { final TransportChannel channel = entry.getValue(); if (channel.isStarted() && channel.isConnected()) { - final List lbs_ = Instance.getInstance().getSyncFrames(channel.getChannelId(), REMOTE_SYNC_DEFERRED_AMOUNT); - final String lbsUUID = lbs_.size() > 0 ? lbs_.get(0).getSyncUUID() : "NONE"; - if (lbs_.size() > 0) { - logger.debug("retrieve first framesync: " + lbs_.get(0)); - } - final List lbs = lbs_.size() == 0 ? new ArrayList<>() : Instance.getInstance().getSyncFramesByUUID(lbs_.get(0).getSyncUUID()); + final List lbs = Instance.getInstance().getSyncFrames(channel.getChannelId()); Collections.sort(lbs); + String lbsUUID = lbs.size() == 0 ? "NONE" : lbs.get(0).getSyncUUID(); logger.info(lbs.size() + " persisted sync frame(s) found (node id = " + channel.getChannelId() + ", UUID = " + lbsUUID + ")"); final List psb = new ArrayList<>(); for (FrameSync bs : lbs) {