diff --git a/config/properties b/config/properties index e8d6117..8774849 100644 --- a/config/properties +++ b/config/properties @@ -21,7 +21,7 @@ frame.size=8192 frame.size.ix=8192 # system code page -codepage=Cp1251 +codepage=UTF8 ########################################### # runtime parameters diff --git a/interference-2020.1.jar b/interference-2020.1.jar deleted file mode 100644 index d7865a9..0000000 Binary files a/interference-2020.1.jar and /dev/null differ diff --git a/interference-2020.1.jar.md5 b/interference-2020.1.jar.md5 deleted file mode 100644 index 84a210a..0000000 --- a/interference-2020.1.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -9d28e75b094559c9bb7ceb1fcc145b7f *interference-2020.1.jar \ No newline at end of file diff --git a/src/main/java/su/interference/core/DataChunk.java b/src/main/java/su/interference/core/DataChunk.java index 1262b42..55f7cac 100644 --- a/src/main/java/su/interference/core/DataChunk.java +++ b/src/main/java/su/interference/core/DataChunk.java @@ -320,7 +320,8 @@ public DataChunk (ValueSet vs, Session s, Table t) { } catch (Exception e) { e.printStackTrace(); } - header.setLen(getChunkLen()); + header.setLen(getChunk().length); + //header.setLen(getChunkLen()); } //serializer INSERT ONLY!!! (with generate Id value) @@ -332,7 +333,7 @@ public DataChunk (Object o, Session s) throws IOException, InvocationTargetExcep public DataChunk (Object o, Session s, RowId r) throws IOException, InvocationTargetException, NoSuchMethodException, InternalException, ClassNotFoundException, InstantiationException, IllegalAccessException { this.entity = o; this.state = NORMAL_STATE; - this.header = new RowHeader(r, null, getChunkLen(), false); + this.header = new RowHeader(r, null, getChunk().length, false); } public DataChunk (byte[] b, int file, long frame, int hsize, DataObject t, Class c) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException, MalformedURLException { @@ -411,6 +412,7 @@ public byte[] getChunk () { return null; } + //todo public int getChunkLen () { if (state == INIT_STATE) { return chunk.length; @@ -480,7 +482,8 @@ public Object getEntity () { } } if (t.isIndex()) { - final DataChunk dc = (DataChunk) Instance.getInstance().getChunkByPointer(this.getHeader().getFramePtr(), this.getHeader().getFramePtrRowId().getRowPointer()); + final ResultSetEntity rsa = (ResultSetEntity) ((Table) this.t).getTableClass().getAnnotation(ResultSetEntity.class); + final DataChunk dc = rsa == null ? (DataChunk) Instance.getInstance().getChunkByPointer(this.getHeader().getFramePtr(), this.getHeader().getFramePtrRowId().getRowPointer()) : this; ((IndexChunk)o).setDataChunk(dc); if (dc == null) { // todo during rframe.IndexFrame.init system directory not yet contains replicated FrameData objects @@ -541,6 +544,8 @@ public Object getEntity (Class c, Object[] params) { } catch (Exception e) { e.printStackTrace(); } + this.chunk = null; + this.state = NORMAL_STATE; return entity; } @@ -598,6 +603,12 @@ public void updateEntity(Object o) throws InternalException, ClassNotFoundExcept } } + protected void setNormalState() { + //set chunk to normal state after update + this.chunk = null; + this.state = NORMAL_STATE; + } + //for UNDO processing public DataChunk cloneEntity(Session s) throws IOException, InvocationTargetException, NoSuchMethodException, InternalException, ClassNotFoundException, InstantiationException, IllegalAccessException { final byte[] b = this.getChunk(); @@ -644,8 +655,8 @@ public void setHeader(Header header) { //lock mechanism private synchronized DataChunk insertUC (FrameData cb, UndoChunk uc, Session s, LLT llt) throws Exception { - final FrameData ub = s.getTransaction().getAvailableFrame(uc, true); - + final WaitFrame ubw = s.getTransaction().getAvailableFrame(uc, true); + final FrameData ub = ubw.getBd(); if (ub == null) { //s.getTransaction().createUndoFrames(s); //ub = s.getTransaction().getAvailableFrame(uc, true); @@ -665,7 +676,7 @@ private synchronized DataChunk insertUC (FrameData cb, UndoChunk uc, Session s, } else { s.getTransaction().storeFrame(cb, ub, 0, s, llt); } - ub.release(); + ubw.release(); return dc; } diff --git a/src/main/java/su/interference/core/Frame.java b/src/main/java/su/interference/core/Frame.java index 0b2da34..9b442b1 100644 --- a/src/main/java/su/interference/core/Frame.java +++ b/src/main/java/su/interference/core/Frame.java @@ -96,6 +96,10 @@ public FrameData getFrameData() { return frameData; } + public void setFrameData(FrameData frameData) { + this.frameData = frameData; + } + public DataObject getDataObject() { return dataObject; } @@ -271,10 +275,11 @@ public Frame(byte[] bb, int file, long pointer, int size, FrameData bd, DataObje this.res09 = bs.getIntFromBytes(92); if ((this.file==0)&&(this.pointer==0)) { + logger.error("empty frame header frameId = " + (bd == null ? "N/A" : bd.getFrameId()) + " allocId = " + (bd == null ? "N/A" : bd.getAllocId())); throw new EmptyFrameHeaderFound(); } if ((this.file!=file)||(this.pointer!=pointer)) { - logger.error("InvalidFrameHeader: " + this.file + ":" + file + " " + this.pointer + ":" + pointer); + logger.error("invalid frame header frameId = " + (bd == null ? "N/A" : bd.getFrameId()) + " allocId = " + (bd == null ? "N/A" : bd.getAllocId())); throw new InvalidFrameHeader(); } if (this.objectId<0) { @@ -296,8 +301,10 @@ public synchronized byte[] getFrame() throws InvalidFrame { if (Config.getConfig().SYNC_LOCK_ENABLE||sync==0) { for (Chunk c : data.getChunks()) { + final byte[] chunk_ = c.getChunk(); + c.getHeader().setLen(chunk_.length); res2.append(c.getHeader().getHeader()); - res2.append(c.getChunk()); + res2.append(chunk_); used = used + c.getBytesAmount(); } } else { @@ -306,8 +313,10 @@ public synchronized byte[] getFrame() throws InvalidFrame { } for (Chunk c : data.getChunks()) { if (c.getHeader().getLltId() < sync) { + final byte[] chunk_ = c.getChunk(); +// c.getHeader().setLen(chunk_.length); res2.append(c.getHeader().getHeader()); - res2.append(c.getChunk()); + res2.append(chunk_); used = used + c.getBytesAmount(); } } @@ -352,32 +361,6 @@ public synchronized byte[] getFrame() throws InvalidFrame { return res.getBytes(); } - public synchronized byte[] flushHeader() { - final ByteString res = new ByteString(); - final ByteString bs = new ByteString(this.b); - res.addBytesFromInt(this.file); - res.addBytesFromLong(this.pointer); - res.addBytesFromInt(this.objectId); - res.addBytesFromInt(this.type); - res.addBytesFromInt(this.cptr); - res.addBytesFromInt(this.bytesAmount); - res.addBytesFromInt(this.rowCntr); - res.addBytesFromInt(this.sptr); - res.addBytesFromInt(this.allocFile); - res.addBytesFromLong(this.allocPointer); - res.addBytesFromInt(this.res01); - res.addBytesFromInt(this.res02); - res.addBytesFromInt(this.res03); - res.addBytesFromInt(this.res04); - res.addBytesFromInt(this.res05); - res.addBytesFromLong(this.res06); - res.addBytesFromLong(this.res07); - res.addBytesFromLong(this.res08); - res.addBytesFromInt(this.res09); - res.append(bs.substring(FRAME_HEADER_SIZE, this.b.length)); - return res.getBytes(); - } - public boolean equals (Frame bl) { if ((this.file==bl.getFile())&&(this.pointer==bl.getPointer())) { return true; @@ -461,7 +444,7 @@ public synchronized int updateChunk(DataChunk chunk, Object o, Session s, LLT ll if (chunk.getEntity()!=o) { //not same object chunk.updateEntity(o); } - + chunk.setNormalState(); final byte[] ncb = chunk.getChunk(); chunk.getHeader().setLen(ncb.length); final int newlen = chunk.getBytesAmount(); @@ -652,11 +635,6 @@ public HashMap getLiveTransactions() { return rtran; } - public ArrayList getLiveTransFrames() { - final long ptr = this.file + this.pointer; - return Instance.getInstance().getTransFrames(ptr); - } - public int getBytesAmount() { int used = FRAME_HEADER_SIZE; final int size = data.getChunks().size(); diff --git a/src/main/java/su/interference/core/IndexElement.java b/src/main/java/su/interference/core/IndexElement.java index 4992cae..8671332 100644 --- a/src/main/java/su/interference/core/IndexElement.java +++ b/src/main/java/su/interference/core/IndexElement.java @@ -64,8 +64,4 @@ public void setElement(Object element) { this.element = element; } - public String ElementToString () { - return "" + this.element; - } - } diff --git a/src/main/java/su/interference/core/IndexElementKey.java b/src/main/java/su/interference/core/IndexElementKey.java index 88e62e2..5080469 100644 --- a/src/main/java/su/interference/core/IndexElementKey.java +++ b/src/main/java/su/interference/core/IndexElementKey.java @@ -76,14 +76,6 @@ public boolean equals(final Object obj) { return false; } - public String keyToString() { - String res = ""; - for (int i=0; i getProcesses () { return r; } - //used in getUndoChunkByRowId - public ArrayList getTransFrames(long cframeId) { - final Table t = getTableByName("su.interference.persistent.TransFrame"); - final ArrayList r = new ArrayList(); - for (Object o : t.getIndexFieldByColumn("cframeId").getIndex().getObjectsByKey(cframeId)) { - r.add((TransFrame)((DataChunk)o).getEntity()); - } - return r; - } - //used in storeFrame public TransFrame getTransFrameById(long transId, long cframeId, long uframe) { final Table t = getTableByName("su.interference.persistent.TransFrame"); - for (Object o : t.getIndexFieldByColumn("cframeId").getIndex().getObjectsByKey(cframeId)) { - final TransFrame tb = (TransFrame)((DataChunk)o).getEntity(); - if (tb.getTransId()==transId&&tb.getUframeId()==uframe) { - return tb; - } + final TransFrameId id = new TransFrameId(cframeId, uframe, transId); + DataChunk dc = (DataChunk) t.getMapFieldByColumn("frameId").getMap().get(id); + if (dc != null) { + return (TransFrame) dc.getEntity(); } return null; } - //used in commit/rollback mechanism - public synchronized ArrayList getTransFrameByTransId (long transId) { - final Table t = getTableByName("su.interference.persistent.TransFrame"); - final ArrayList r = new ArrayList(); - for (Object o : t.getIndexFieldByColumn("transId").getIndex().getObjectsByKey(transId)) { - r.add((TransFrame)((DataChunk)o).getEntity()); - } - return r; - } - //used in unlock table mechanism + @Deprecated public synchronized ArrayList getTransFrameByObjectId (int objectId) { - final Table t = getTableByName("su.interference.persistent.TransFrame"); + //final Table t = getTableByName("su.interference.persistent.TransFrame"); final ArrayList r = new ArrayList(); - for (Object o : t.getIndexFieldByColumn("objectId").getIndex().getObjectsByKey(objectId)) { - r.add((TransFrame)((DataChunk)o).getEntity()); - } return r; } diff --git a/src/main/java/su/interference/core/LLT.java b/src/main/java/su/interference/core/LLT.java index 7e77a34..e525f8d 100644 --- a/src/main/java/su/interference/core/LLT.java +++ b/src/main/java/su/interference/core/LLT.java @@ -104,6 +104,7 @@ private static boolean poolNotEmpty() { } public void add(Frame b) { + b.getFrameData().setSynced(false); frames.put(b.getFrameData().getFrameId(), b); } @@ -111,6 +112,7 @@ public void commit() { pool.remove(this.id); if (this.lock) { for (Map.Entry entry : frames.entrySet()) { + entry.getValue().getFrameData().setSynced(true); entry.getValue().clearSnaps(this.id); } frames.clear(); @@ -129,7 +131,7 @@ public long getId() { return id; } - public StackTraceElement[] getTrace() { + private StackTraceElement[] getTrace() { return trace; } } diff --git a/src/main/java/su/interference/core/SyncFrame.java b/src/main/java/su/interference/core/SyncFrame.java index 51ba0bc..2253096 100644 --- a/src/main/java/su/interference/core/SyncFrame.java +++ b/src/main/java/su/interference/core/SyncFrame.java @@ -58,7 +58,6 @@ public class SyncFrame implements Comparable, Serializable, AllowRPredicate { private Frame rFrame; private final HashMap imap; private final HashMap rtran; - private final ArrayList tframes; private final static long serialVersionUID = 8712349857239487289L; public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception { @@ -80,7 +79,6 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception { className = bd == null ? null : t.getName(); rtran = frame.getLiveTransactions(); - tframes = frame.getLiveTransFrames(); if (frame.getClass().getName().equals("su.interference.core.DataFrame")) { if (frame.getType()!=0) { throw new InternalException(); @@ -92,8 +90,16 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception { final long nextId_ = db.getNextFrame()+db.getNextFile(); try { //todo NPE possibly by evicted frame - prevId = prevId_ == 0 ? 0 : Instance.getInstance().getFrameById(prevId_).getAllocId(); - nextId = nextId_ == 0 ? 0 : Instance.getInstance().getFrameById(nextId_).getAllocId(); + try { + prevId = prevId_ == 0 ? 0 : Instance.getInstance().getFrameById(prevId_).getAllocId(); + } catch (NullPointerException npe) { + logger.info("evicted frame caused an NPE during SyncFrame construction id = " + frame.getPtr()); + } + try { + nextId = nextId_ == 0 ? 0 : Instance.getInstance().getFrameById(nextId_).getAllocId(); + } catch (NullPointerException npe) { + logger.info("evicted frame caused an NPE during SyncFrame construction id = " + frame.getPtr()); + } } catch (Exception e) { e.printStackTrace(); } @@ -126,7 +132,7 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception { fileType = Instance.getInstance().getDataFileById(frame.getFile()).getType(); b = frame.getFrame(); frameId = frame.getPtr(); - this.allocId = frame.getAllocFile()+ frame.getAllocPointer(); + this.allocId = frame.getAllocFile() + frame.getAllocPointer(); } public Frame getRFrame() { @@ -217,10 +223,6 @@ public HashMap getRtran() { return rtran; } - public ArrayList getTframes() { - return tframes; - } - public boolean equals (SyncFrame bl) { if ((this.getFile()==bl.getFile())&&(this.getPointer()==bl.getPointer())) { return true; diff --git a/src/main/java/su/interference/core/SyncQueue.java b/src/main/java/su/interference/core/SyncQueue.java index b021b58..f8d710d 100644 --- a/src/main/java/su/interference/core/SyncQueue.java +++ b/src/main/java/su/interference/core/SyncQueue.java @@ -60,7 +60,7 @@ private synchronized boolean syncFramesFromQueue() throws Exception { running = true; final LLT llt = LLT.getLLTAndLock(); final int famt = Storage.getStorage().getFiles()==null?0:Storage.getStorage().getFiles().size(); - logger.debug("sync procedure was started with frames amount="+LLT.getFrames().size()); + logger.debug("sync procedure was started with frames amount=" + LLT.getFrames().size()); final ArrayList frames = new ArrayList<>(); final Map> frames_ = new HashMap<>(); @@ -72,10 +72,12 @@ private synchronized boolean syncFramesFromQueue() throws Exception { try { final Frame f = entry.getValue(); frames.add(new SyncFrame(f, s, fb)); - if (frames_.get(f.getObjectId()) == null) { - frames_.put(f.getObjectId(), new ArrayList<>()); + if (f.isLocal()) { + if (frames_.get(f.getObjectId()) == null) { + frames_.put(f.getObjectId(), new ArrayList<>()); + } + frames_.get(f.getObjectId()).add(f.getFrameData()); } - frames_.get(f.getObjectId()).add(f.getFrameData()); } catch (MissingSyncFrameException e) { logger.debug("Unable to sync frame "+((Frame) entry.getValue()).getPtr()+" because removed by freeing"); } diff --git a/src/main/java/su/interference/core/SystemCleanUp.java b/src/main/java/su/interference/core/SystemCleanUp.java new file mode 100644 index 0000000..185c390 --- /dev/null +++ b/src/main/java/su/interference/core/SystemCleanUp.java @@ -0,0 +1,56 @@ +package su.interference.core; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import su.interference.metrics.Metrics; +import su.interference.persistent.FrameData; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +public class SystemCleanUp implements Runnable, ManagedProcess { + private volatile boolean f = true; + CountDownLatch latch; + private final static Logger logger = LoggerFactory.getLogger(SystemCleanUp.class); + + public void run () { + while (f) { + latch = new CountDownLatch(1); + try { + cleanUpFrames(); + } catch(Exception e) { + e.printStackTrace(); + } + + try { + //final int period = Config.getConfig().SYNC_PERIOD; + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + latch.countDown(); + } + } + + public void stop() throws InterruptedException{ + f = false; + if (latch != null) { + latch.await(); + } + } + + private void cleanUpFrames() { + Metrics.get("systemCleanUp").start(); + int i = 0; + for (Object entry : Instance.getInstance().getFramesMap().entrySet()) { + final FrameData f = (FrameData) ((DataChunk) ((Map.Entry) entry).getValue()).getEntity(); + f.decreasePriority(); + if (f.isSynced() && f.getObjectId() > 999 && f.getPriority() <= 0) { + f.clearFrame(); + } + i++; + } + Metrics.get("сleanUpBlocks").put(i); + Metrics.get("systemCleanUp").stop(); + } +} diff --git a/src/main/java/su/interference/core/SystemInit.java b/src/main/java/su/interference/core/SystemInit.java index 9822743..4bd1795 100644 --- a/src/main/java/su/interference/core/SystemInit.java +++ b/src/main/java/su/interference/core/SystemInit.java @@ -235,7 +235,8 @@ public static Table createInitialFrames(boolean initStorage, int nodeType, Sessi if (tables[i].equals("su.interference.persistent.Process")) { //insert process ret[framesCntr].insertChunk(new DataChunk(new Process(1, "hbeat","su.interference.transport.HeartBeatProcess"), s), s, true, null); - ret[framesCntr].insertChunk(new DataChunk(new Process(3, "lsync","su.interference.core.SyncQueue"), s), s, true, null); + ret[framesCntr].insertChunk(new DataChunk(new Process(2, "lsync","su.interference.core.SyncQueue"), s), s, true, null); + ret[framesCntr].insertChunk(new DataChunk(new Process(3, "clean","su.interference.core.SystemCleanUp"), s), s, true, null); //ret[framesCntr].insertChunk(new DataChunk(new Process(2, "rsync","su.interference.remote.RemoteSync"), s), s, true, null); } if (tables[i].equals("su.interference.persistent.Node")) { diff --git a/src/main/java/su/interference/core/TransFrameId.java b/src/main/java/su/interference/core/TransFrameId.java new file mode 100644 index 0000000..761070c --- /dev/null +++ b/src/main/java/su/interference/core/TransFrameId.java @@ -0,0 +1,40 @@ +package su.interference.core; + +import java.io.Serializable; + +public class TransFrameId implements Serializable { + private final long cframeId; + private final long uframeId; + private final long transId; + + public TransFrameId(long cframeId, long uframeId, long transId) { + this.cframeId = cframeId; + this.uframeId = uframeId; + this.transId = transId; + } + + public boolean equals(final Object obj) { + final TransFrameId id = (TransFrameId)obj; + if (this.cframeId == id.cframeId && this.uframeId == id.uframeId && this.transId == id.transId) { return true; } + return false; + } + + public int hashCode() { + int hashCode = 1; + hashCode = 31 * hashCode + Long.valueOf(cframeId).hashCode(); + hashCode = 31 * hashCode + Long.valueOf(uframeId).hashCode(); + hashCode = 31 * hashCode + Long.valueOf(transId).hashCode(); + return hashCode; + } + public long getCframeId() { + return cframeId; + } + + public long getUframeId() { + return uframeId; + } + + public long getTransId() { + return transId; + } +} diff --git a/src/main/java/su/interference/core/WaitFrame.java b/src/main/java/su/interference/core/WaitFrame.java index 0c6079f..5e4c89a 100644 --- a/src/main/java/su/interference/core/WaitFrame.java +++ b/src/main/java/su/interference/core/WaitFrame.java @@ -47,25 +47,23 @@ public WaitFrame(FrameData bd) { this.busy = new AtomicLong(0); } - public synchronized FrameData acquire() { + public synchronized WaitFrame acquire() { if (this.bd == null) { return null; } if (this.busy.compareAndSet(0, Thread.currentThread().getId())) { - bd.setWaitFrame(this); - return bd; + return this; } return null; } - public synchronized FrameData acquire(final int fileId) { + public synchronized WaitFrame acquire(final int fileId) { if (this.bd == null) { return null; } if (this.bd.getFile() == fileId) { if (this.busy.compareAndSet(0, Thread.currentThread().getId()) || this.busy.compareAndSet(Thread.currentThread().getId(), Thread.currentThread().getId())) { - bd.setWaitFrame(this); - return bd; + return this; } } return null; @@ -75,12 +73,10 @@ public synchronized boolean trySetBd(FrameData oldbd, FrameData newbd, int frame if (oldbd==null) { if (this.bd == null) { this.bd = newbd; - this.bd.setWaitFrame(this); return true; } } else if ((this.bd.getFile() == oldbd.getFile()&&(busy.get() == Thread.currentThread().getId()))||frameType > 0) { this.bd = newbd; - this.bd.setWaitFrame(this); return true; } return false; @@ -90,7 +86,6 @@ public synchronized boolean trySetBdAndAcquire(FrameData bd) { if (this.busy.compareAndSet(0, Thread.currentThread().getId())) { if (this.bd == null) { this.bd = bd; - this.bd.setWaitFrame(this); return true; } else { this.busy.compareAndSet(Thread.currentThread().getId(), 0); @@ -108,14 +103,9 @@ public AtomicLong getBusy() { return busy; } - /* - public synchronized void setBd(FrameData bd) { - this.bd = bd; - } -*/ - public void release() { - this.busy.compareAndSet(Thread.currentThread().getId(), 0); + //this.busy.compareAndSet(Thread.currentThread().getId(), 0); + this.busy.set(0); } } diff --git a/src/main/java/su/interference/metrics/Meter.java b/src/main/java/su/interference/metrics/Meter.java index 01943c4..6c37c11 100644 --- a/src/main/java/su/interference/metrics/Meter.java +++ b/src/main/java/su/interference/metrics/Meter.java @@ -29,9 +29,10 @@ this software and associated documentation files (the "Software"), to deal in * @since 1.0 */ -public class Meter { +public class Meter implements MeterMBean { private final String name; + private long value; public Meter (String name) { this.name = name; @@ -42,11 +43,11 @@ public void put() { } public void put(final long value) { - + this.value = value; } public long getCnt() { - return 0; + return value; } public long getMin() { diff --git a/src/main/java/su/interference/metrics/MeterMBean.java b/src/main/java/su/interference/metrics/MeterMBean.java new file mode 100644 index 0000000..b36358b --- /dev/null +++ b/src/main/java/su/interference/metrics/MeterMBean.java @@ -0,0 +1,5 @@ +package su.interference.metrics; + +public interface MeterMBean { + long getCnt(); +} diff --git a/src/main/java/su/interference/metrics/Metrics.java b/src/main/java/su/interference/metrics/Metrics.java index 723c6ec..94c44a8 100644 --- a/src/main/java/su/interference/metrics/Metrics.java +++ b/src/main/java/su/interference/metrics/Metrics.java @@ -40,6 +40,7 @@ public class Metrics { public static final int COUNTER = 1; public static final int HISTOGRAM = 2; public static final int TIMER = 3; + public static final int METER = 10; private static final ConcurrentHashMap metrics = new ConcurrentHashMap(); private static final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -47,6 +48,7 @@ public static void register(int type, String name) throws Exception { if (type == COUNTER) { metrics.put(name, new Counter(name)); } if (type == HISTOGRAM) { metrics.put(name, new Histogram(name)); } if (type == TIMER) { metrics.put(name, new Timer(name)); } + if (type == METER) { metrics.put(name, new Meter(name)); } ObjectName obj = new ObjectName("su.interference:type="+name+metrics.get(name).getClass().getSimpleName()); mbs.registerMBean(metrics.get(name), obj); diff --git a/src/main/java/su/interference/persistent/DataFile.java b/src/main/java/su/interference/persistent/DataFile.java index 0f40da4..0af2570 100644 --- a/src/main/java/su/interference/persistent/DataFile.java +++ b/src/main/java/su/interference/persistent/DataFile.java @@ -196,6 +196,7 @@ public synchronized FrameData createNewFrame(FrameData frame, int frameType, lon if (setcurrenable) bd.markAsCurrent(); int prevFile = 0; long prevPtr = 0; + if (frame != null && frameType == 0) { frame.clearCurrent(); prevFile = frame.getFile(); @@ -206,26 +207,35 @@ public synchronized FrameData createNewFrame(FrameData frame, int frameType, lon frame.getDataFrame().setNextFrame(bd.getPtr()); s.persist(frame, llt); //update } + bd.setPrevFile(prevFile); bd.setPrevFrame(prevPtr); + if (frameType==0) { ((DataFrame)db).setPrevFile(prevFile); ((DataFrame)db).setPrevFrame(prevPtr); } + if (t.getFileStart()==0&&t.getFrameStart()==0) { t.setFileStart(bd.getFile()); t.setFrameStart(bd.getPtr()); } + t.setFileLast(bd.getFile()); t.setFrameLast(bd.getPtr()); t.incFrameAmount(); - if (llt!=null) { - llt.add(db); - if (frame!=null) { - llt.add(frame.getFrame()); + + if (!external) { + if (llt != null) { + llt.add(db); + if (frame != null) { + llt.add(frame.getFrame()); + } } } + s.persist(t, llt); //update + if (t.getName().equals("su.interference.persistent.FrameData")) { DataChunk dc = new DataChunk(bd, s); int len = dc.getBytesAmount(); @@ -450,11 +460,15 @@ public synchronized void writeFrame(final long ptr, final byte[] b) throws IOExc final long ptr_ = bs.getLongFromBytes(4); final int id_ = bs.getIntFromBytes(12); + if (file_ == 0) { + logger.error("Wrong write frame operation with file = 0 ptr = " + ptr_); + } + if (this.fileId != file_) { logger.error("Wrong write frame operation with file = " + this.file + ", internal file = " + file_ + " ptr = " + ptr_); } - if (ptr != ptr) { + if (ptr != ptr_) { logger.error("Wrong write frame operation with file = " + this.file + " ptr = " + ptr + ", internal file = " + file_ + " ptr = " + ptr_); } @@ -472,13 +486,13 @@ public synchronized void writeFrame(FrameData bd, final long ptr, final byte[] b logger.error("Wrong write frame operation with file = " + this.file + ", internal file = " + file_ + " ptr = " + ptr_); } - if (ptr != ptr) { + if (ptr != ptr_) { logger.error("Wrong write frame operation with file = " + this.file + " ptr = " + ptr + ", internal file = " + file_ + " ptr = " + ptr_); } this.file.seek(ptr); this.file.write(b); - + llt.add(bd.getFrame()); s.persist(bd, llt); } diff --git a/src/main/java/su/interference/persistent/FrameData.java b/src/main/java/su/interference/persistent/FrameData.java index e21bbc6..b611e93 100644 --- a/src/main/java/su/interference/persistent/FrameData.java +++ b/src/main/java/su/interference/persistent/FrameData.java @@ -39,6 +39,9 @@ this software and associated documentation files (the "Software"), to deal in import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -112,33 +115,22 @@ public class FrameData implements Serializable, Comparable, FrameApi, FilePartit @Transient private long transId; //for UndoChunk: transId @Transient + private final Map tcounter = new ConcurrentHashMap<>(); + @Transient + private volatile int priority = 2; + @Transient + private volatile boolean synced = true; + @Transient private volatile Frame frame; @Transient private DataObject dataObject; @Transient private Class entityClass; - @Transient - private volatile WaitFrame waitFrame; public int getImpl() { return FrameApi.IMPL_DATA; } - public void release() throws InternalException { - if (current == null || current.get() == 0) { - return; - } - if (waitFrame==null) { - throw new InternalException(); - } else { - waitFrame.release(); - } - } - - public void setWaitFrame(WaitFrame waitFrame) { - this.waitFrame = waitFrame; - } - public void markAsCurrent() { if (this.current==null) { this.current = new AtomicInteger(0); @@ -237,10 +229,9 @@ protected int getFrameSize() throws ClassNotFoundException, InternalException, I //real current transactional value //skip negative differences for prevent frame oversize when many transactions change data public int getFrameUsed() { - final ArrayList tbs = Instance.getInstance().getTransFrames(this.getFrameId()); int tdiff = 0; - for (TransFrame tb : tbs) { - tdiff = tdiff + tb.getDiff(); + for (Map.Entry entry : tcounter.entrySet()) { + tdiff = tdiff + entry.getValue().getDiff(); } return this.getUsed() + (tdiff>0?tdiff:0); } @@ -257,50 +248,6 @@ public int getFrameFree() throws ClassNotFoundException, InternalException, Inst return getFrameSize() - Frame.FRAME_HEADER_SIZE - getFrameUsed(); } - public boolean hasLiveTransaction(long transId) { - //Map tmap = frame.getLiveTransactions(); - final ArrayList tbs = Instance.getInstance().getTransFrames(this.getFrameId()); - for (TransFrame tb : tbs) { - if (tb.getTransId() == transId) { - return true; - } - } - return false; - } - - public boolean hasLocalTransactions() { - final ArrayList tbs = Instance.getInstance().getTransFrames(this.getFrameId()); - for (TransFrame tb : tbs) { - final Transaction tran = Instance.getInstance().getTransactionById(tb.getTransId()); - if (tran.getNodeId() == Config.getConfig().LOCAL_NODE_ID) { - return true; - } - } - return false; - } - - //returns node id - public int hasRemoteTransactions() throws InternalException { - final ArrayList tbs = Instance.getInstance().getTransFrames(this.getFrameId()); - boolean local = false; - int nodeId = 0; - for (TransFrame tb : tbs) { - final Transaction tran = Instance.getInstance().getTransactionById(tb.getTransId()); - if (tran.getNodeId() == Config.getConfig().LOCAL_NODE_ID) { - local = true; - } else { - if (nodeId > 0 && nodeId != tran.getNodeId()) { - throw new InternalException(); - } - nodeId = tran.getNodeId(); - } - if (local == true && nodeId > 0) { - throw new InternalException(); - } - } - return nodeId; - } - public FrameData() { } @@ -323,7 +270,7 @@ public FrameData(FrameData bd, DataObject tt) { this.size = bd.getSize(); this.allocId = bd.getAllocId(); this.started = bd.getStarted(); - this.current = bd.getCurrent(); + this.current = new AtomicInteger(bd.getCurrent().get()); this.used = bd.getUsed(); this.prevFrame = bd.getPrevFrame(); this.prevFile = bd.getPrevFile(); @@ -454,6 +401,10 @@ public void setNextFrame(long nextFrame) { } } + public void clearFrame() { + this.frame = null; + } + public Class getEntityClass() { return entityClass; } @@ -462,6 +413,38 @@ public void setEntityClass(Class entityClass) { this.entityClass = entityClass; } + public int getTcounterSize() { + return tcounter.size(); + } + + public void increaseTcounter(long id, TransFrame f) { + this.tcounter.put(id, f); + } + + public void decreaseTcounter(long id) { + this.tcounter.remove(id); + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public void decreasePriority() { + this.priority--; + } + + public boolean isSynced() { + return synced; + } + + public void setSynced(boolean synced) { + this.synced = synced; + } + public AtomicLong getDistribution() { return distribution; } diff --git a/src/main/java/su/interference/persistent/Session.java b/src/main/java/su/interference/persistent/Session.java index e7a9168..579dd31 100644 --- a/src/main/java/su/interference/persistent/Session.java +++ b/src/main/java/su/interference/persistent/Session.java @@ -306,6 +306,7 @@ public void closeQueue() { } public void closeStreamQueue() { + logger.info("Stream is closed"); SQLCursor.removeStreamQueue(this); } diff --git a/src/main/java/su/interference/persistent/Table.java b/src/main/java/su/interference/persistent/Table.java index cbf6ce5..894b0c4 100644 --- a/src/main/java/su/interference/persistent/Table.java +++ b/src/main/java/su/interference/persistent/Table.java @@ -684,8 +684,8 @@ public Table (DataChunk chunk, IndexList ixl) throws IllegalAccessException, Cla if (this.name.equals("su.interference.persistent.FrameData")) { getIndexFieldByColumn("objectId").setIndex(ixl); - final Map ixlb = new HashMap(); - final Map ixla = new HashMap(); + final Map ixlb = new ConcurrentHashMap<>(); + final Map ixla = new ConcurrentHashMap<>(); final IndexList ixls = new IndexList(); for (Object o : ixl.getContent()) { ixlb.put(((FrameData) ((DataChunk) o).getEntity()).getFrameId(), o); @@ -961,7 +961,7 @@ public void usedSpace (final FrameData bd, final int used, final boolean persist } } - private FrameData getAvailableFrame(final Object o, final boolean fpart) throws ClassNotFoundException, InstantiationException, InternalException, IllegalAccessException { + private WaitFrame getAvailableFrame(final Object o, final boolean fpart) throws ClassNotFoundException, InstantiationException, InternalException, IllegalAccessException { Metrics.get("getAvailableFrame").start(); final long st = System.currentTimeMillis(); @@ -976,7 +976,7 @@ private FrameData getAvailableFrame(final Object o, final boolean fpart) throws for (int i = 0; i < this.lbs.length; i++) { final int i_ = (a + i) % this.lbs.length; final WaitFrame wb = this.lbs[i_]; - final FrameData bd = fpart ? wb.acquire(getTargetFileId(((FilePartitioned) o).getFile())) : wb.acquire(); + final WaitFrame bd = fpart ? wb.acquire(getTargetFileId(((FilePartitioned) o).getFile())) : wb.acquire(); if (bd != null) { avframeStart.getAndIncrement(); Metrics.get("getAvailableFrame").stop(); @@ -1067,7 +1067,8 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) final DataChunk nc = new DataChunk(o, s); final int len = nc.getBytesAmount(); - final FrameData bd = getAvailableFrame(o, fpart); + final WaitFrame bdw = getAvailableFrame(o, fpart); + final FrameData bd = bdw.getBd(); final int diff = len - bd.getFrameFree(); if (isNoTran()) { @@ -1075,7 +1076,6 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) if (p==0) { final FrameData nb = this.createNewFrame(bd, bd.getFile(), 0, 0, false, true, false, s, llt); nb.getDataFrame().insertChunk(nc, s, true, llt); - nb.release(); usedSpace(nb,nb.getUsed()+len, true, s, llt); } else { usedSpace(bd,bd.getUsed()+len, true, s, llt); @@ -1084,7 +1084,6 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) if (diff > 0) { final FrameData nb = this.createNewFrame(bd, bd.getFile(), 0, 0, false, true, false, s, llt); nb.getDataFrame().insertChunk(nc, s, true, llt); - nb.release(); s.getTransaction().storeFrame(nb, len, s, llt); } else { final int p = bd.insertChunk(nc, s, true, llt); @@ -1100,7 +1099,7 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) //system-only table in-memory indexes this.addIndexValue(nc); - bd.release(); + bdw.release(); Metrics.get("persistInsertChunk").stop(); if (extllt == null) { llt.commit(); } @@ -1129,14 +1128,14 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) if (diff>0) { bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); // bd.deleteChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); - final FrameData ib = getAvailableFrame(o, fpart); + final WaitFrame ibw = getAvailableFrame(o, fpart); + final FrameData ib = ibw.getBd(); final int p = ib.getDataFrame().insertChunk(dc, s, true, llt); if (p==0) { // bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); final FrameData nb = this.createNewFrame(ib, ib.getFile(), 0, 0, false, true, false, s, llt); nb.getDataFrame().insertChunk(dc, s, true, llt); - nb.release(); if (isNoTran()) { usedSpace(bd,bd.getUsed()-len, true, s, llt); usedSpace(nb,newlen, true, s, llt); @@ -1163,7 +1162,7 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt) //logger.info("updated "+dc.getHeader().getRowID().getFileId()+" "+dc.getHeader().getRowID().getFramePointer()+" "+dc.getHeader().getRowID().getRowPointer()+" : "+dc.getUndoChunk().getFile()+" "+dc.getUndoChunk().getFrame()+" "+dc.getUndoChunk().getPtr()); } - ib.release(); + ibw.release(); if (extllt == null) { llt.commit(); } return dc; @@ -1243,16 +1242,16 @@ public synchronized FrameData createNewFrame(final FrameData frame, final int fi if (wb.getBd().getFile() == bd.getFile()) { // remove evicted ptr from prevframe frame.setNextFrame(wb.getBd().getPtr()); - s.persist(frame); + s.persist(frame, llt); } } bd.clearCurrent(); - s.persist(bd); + s.persist(bd, llt); logger.info("evict frame " + bd.getObjectId() + ":" + bd.getFile() + ":" + bd.getPtr() + " " + Thread.currentThread().getName()); for (WaitFrame wb : this.lbs) { - final FrameData bd_ = wb.acquire(fileId); + final WaitFrame bd_ = wb.acquire(fileId); if (bd_ != null) { - return bd_; + return bd_.getBd(); } } } @@ -1272,6 +1271,7 @@ public void lockTable(Session s) { } } + @Deprecated public void unlockTable(Session s) { try { final RetrieveLock rl = Instance.getInstance().getRetrieveLockById(this.objectId, s.getTransaction().getTransId()); @@ -1695,6 +1695,7 @@ public synchronized void remove (ValueSet key, Object o, Session s, LLT llt) thr public synchronized void storeFrames(List frames, int sourceNodeId, LLT llt, Session s) throws Exception { for (SyncFrame b : frames) { b.getBd().setStarted(0); + b.getRFrame().setFrameData(b.getBd()); b.getBd().setFrame(b.getRFrame()); if (b.isStarted()) { ixstartfs.put(sourceNodeId, b.getBd().getFrameId()); diff --git a/src/main/java/su/interference/persistent/TransFrame.java b/src/main/java/su/interference/persistent/TransFrame.java index 21d3f9a..1f8973a 100644 --- a/src/main/java/su/interference/persistent/TransFrame.java +++ b/src/main/java/su/interference/persistent/TransFrame.java @@ -44,15 +44,12 @@ this software and associated documentation files (the "Software"), to deal in public class TransFrame implements Comparable, FilePartitioned, Serializable { @Column - @IndexColumn @MgmtColumn(width=10, show=true, form=false, edit=false) private long transId; @Column - @IndexColumn @MgmtColumn(width=10, show=true, form=false, edit=false) private int objectId; @Column - @IndexColumn @MgmtColumn(width=10, show=true, form=false, edit=false) private long cframeId; @Column @@ -65,7 +62,7 @@ public class TransFrame implements Comparable, FilePartitioned, Serializable { @MapColumn @MgmtColumn(width=10, show=true, form=false, edit=false) @Transient - private transient String frameId; + private transient TransFrameId frameId; @Transient public static final int CLASS_ID = 8; @@ -76,8 +73,11 @@ public static int getCLASS_ID() { return CLASS_ID; } - public String getFrameId() { - return transId + "-" + cframeId + "-" + uframeId; + public TransFrameId getFrameId() { + if (frameId == null) { + frameId = new TransFrameId(cframeId, uframeId, transId); + } + return frameId; } public TransFrame() { @@ -85,10 +85,11 @@ public TransFrame() { } public TransFrame(long tr, int obj, long cp, long up) { - this.transId = tr; + this.transId = tr; this.objectId = obj; - this.cframeId = cp; - this.uframeId = up; + this.cframeId = cp; + this.uframeId = up; + this.frameId = new TransFrameId(cframeId, uframeId, transId); } public int compareTo(Object obj) { @@ -128,34 +129,18 @@ public long getTransId() { return transId; } - public void setTransId(long transId) { - this.transId = transId; - } - public int getObjectId() { return objectId; } - public void setObjectId(int objectId) { - this.objectId = objectId; - } - public long getCframeId() { return cframeId; } - public void setCframeId(long cframeId) { - this.cframeId = cframeId; - } - public long getUframeId() { return uframeId; } - public void setUframeId(long uframeId) { - this.uframeId = uframeId; - } - public int getDiff() { return diff; } diff --git a/src/main/java/su/interference/persistent/Transaction.java b/src/main/java/su/interference/persistent/Transaction.java index 0c0b38c..5e4c8c1 100644 --- a/src/main/java/su/interference/persistent/Transaction.java +++ b/src/main/java/su/interference/persistent/Transaction.java @@ -38,6 +38,7 @@ this software and associated documentation files (the "Software"), to deal in import java.util.*; import java.lang.reflect.Modifier; import java.net.MalformedURLException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; /** @@ -79,6 +80,8 @@ public class Transaction implements Serializable { @MgmtColumn(width=10, show=true, form=false, edit=false) private long cid; + @Transient + private final List tframes = new CopyOnWriteArrayList<>(); @Transient private final transient WaitFrame[] lbs; @Transient @@ -149,7 +152,7 @@ public void createUndoFrames(Session s, LLT llt) throws Exception { } } - public FrameData getAvailableFrame(final FilePartitioned o, final boolean fpart) throws ClassNotFoundException, InstantiationException, InternalException, IllegalAccessException { + public WaitFrame getAvailableFrame(final FilePartitioned o, final boolean fpart) throws ClassNotFoundException, InstantiationException, InternalException, IllegalAccessException { Metrics.get("getAvailableFrame").start(); final long st = System.currentTimeMillis(); final long timeout = 1000; @@ -158,7 +161,7 @@ public FrameData getAvailableFrame(final FilePartitioned o, final boolean fpart) while (true) { // for (int i=0; i0) { // undo transframe record for (Long f : fptr) { @@ -238,6 +241,7 @@ public synchronized void commit (Session s, boolean remote) { } final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); if (rls.size()==0) { + final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); //deallocate undo frame final FrameData ub = Instance.getInstance().getFrameById(tb.getUframeId()); //store frame params as free @@ -247,6 +251,7 @@ public synchronized void commit (Session s, boolean remote) { s.delete(ub); fptr.add(tb.getUframeId()); } + cb.decreaseTcounter(this.transId); s.delete(tb); } } else { //change transframe record @@ -257,6 +262,9 @@ public synchronized void commit (Session s, boolean remote) { } else { s.persist(cb); //update new size value to dataframe } + if (cb != null) { + cb.decreaseTcounter(this.transId); + } s.delete(tb); } } @@ -268,6 +276,7 @@ public synchronized void commit (Session s, boolean remote) { e.printStackTrace(); } } + tframes.clear(); started = false; logger.info("Transaction committed"); } @@ -291,9 +300,8 @@ public synchronized void rollback (Session s, boolean remote) { } else { sendBroadcastEvents(CommandEvent.ROLLBACK, s); try { - ArrayList tbs = Instance.getInstance().getTransFrameByTransId(this.transId); - Collections.sort(tbs); - for (TransFrame tb : tbs) { + Collections.sort(tframes); + for (TransFrame tb : tframes) { final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); if (!ubd.contains(cb)) { ubd.add(cb); @@ -301,7 +309,7 @@ public synchronized void rollback (Session s, boolean remote) { } for (FrameData ub : ubd) { final ArrayList ubs = new ArrayList(); - for (TransFrame tb : tbs) { + for (TransFrame tb : tframes) { if (ub.getFrameId()==tb.getCframeId()) { if (tb.getUframeId()>0) { final FrameData ubb = Instance.getInstance().getFrameById(tb.getUframeId()); @@ -313,7 +321,7 @@ public synchronized void rollback (Session s, boolean remote) { ub.getFrame().rollbackTransaction(this, ubs, s); } - for (TransFrame tb : tbs) { + for (TransFrame tb : tframes) { boolean hasfb = false; if (tb.getUframeId()>0) { // undo transframe record for (Long f : fptr) { @@ -323,6 +331,7 @@ public synchronized void rollback (Session s, boolean remote) { } final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); if (rls.size()==0) { + final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); //deallocate undo frame final FrameData ub = Instance.getInstance().getFrameById(tb.getUframeId()); //store frame params as free @@ -332,6 +341,7 @@ public synchronized void rollback (Session s, boolean remote) { s.delete(ub); fptr.add(tb.getUframeId()); } + cb.decreaseTcounter(this.transId); s.delete(tb); } } else { //change transframe record @@ -340,6 +350,9 @@ public synchronized void rollback (Session s, boolean remote) { logger.info("rollback freeing frame "+cb.getFile()+" "+cb.getPtr()); freeFrames(cb, s); } + if (cb != null) { + cb.decreaseTcounter(this.transId); + } s.delete(tb); } } @@ -353,18 +366,19 @@ public synchronized void rollback (Session s, boolean remote) { e.printStackTrace(); } } + tframes.clear(); started = false; logger.info("Transaction rolled back"); } + @Deprecated public void unlockUndoFrames (int objectId, Session s) throws InternalException { if (this.getTransType()!=TRAN_THR) { throw new InternalException(); //ONLY FOR FIXED TRANSACTIONS } - final ArrayList tbs = Instance.getInstance().getTransFrameByTransId(this.transId); final ArrayList fptr = new ArrayList(); try { - for (TransFrame tb : tbs) { + for (TransFrame tb : tframes) { if (tb.getObjectId()==objectId) { boolean hasfb = false; if (tb.getUframeId()>0) { // undo transframe record @@ -398,15 +412,10 @@ public void freeFrames (FrameData cb, Session s) throws Exception { final Table t = Instance.getInstance().getTableById(cb.getObjectId()); if (!t.checkLBS(cb)) { //LB can't deallocated!!! May be empty //check for other transactions, which locked this frame - final ArrayList tbs = Instance.getInstance().getTransFrames(cb.getFrameId()); - boolean hasOT = false; - for (TransFrame tb: tbs) { - if (tb.getTransId()!=this.transId) { - hasOT = true; - break; - } + if (cb.getTcounterSize() == 0) { + throw new RuntimeException("Zero tcounter in transactional frame"); } - if (!hasOT) { + if (cb.getTcounterSize() == 1) { final FreeFrame fb = new FreeFrame(0, cb.getFrameId(), cb.getSize()); final FrameData pb = cb.getPrevFrameId()>0 ? Instance.getInstance().getFrameById(cb.getPrevFrameId()) : null; final FrameData nb = Instance.getInstance().getFrameById(cb.getNextFrameId()); @@ -484,7 +493,6 @@ private void startStatement (final Session s, LLT llt) { } public void storeFrame (final FrameData cb, final FrameData ub, final int len, final Session s, LLT llt) { - final long uframeid = ub==null?0:ub.getFrameId(); final TransFrame tb = Instance.getInstance().getTransFrameById(this.transId, cb.getFrameId(), uframeid); @@ -499,9 +507,11 @@ public void storeFrame (final FrameData cb, final FrameData ub, final int len, f } final TransFrame ntb = new TransFrame(this.transId, cb.getObjectId(), cb.getFrameId(), uframeid); + cb.increaseTcounter(this.transId, ntb); ntb.setDiff(len); try { s.persist(ntb, llt); //insert + this.tframes.add(ntb); } catch (Exception e) { e.printStackTrace(); } diff --git a/src/main/java/su/interference/proxy/RSProxyFactory.java b/src/main/java/su/interference/proxy/RSProxyFactory.java index 6e20f5f..85b4768 100644 --- a/src/main/java/su/interference/proxy/RSProxyFactory.java +++ b/src/main/java/su/interference/proxy/RSProxyFactory.java @@ -75,6 +75,7 @@ public synchronized Class register (List cs, String name, boolean ixf sb.append("import javax.persistence.Column;\n"); sb.append("import javax.persistence.Transient;\n"); sb.append("import javax.persistence.GeneratedValue;\n"); + sb.append("import su.interference.core.IndexChunk;\n"); sb.append("import su.interference.core.SystemEntity;\n"); sb.append("import su.interference.core.ResultSetEntity;\n"); sb.append("import su.interference.core.IndexEntity;\n"); @@ -95,11 +96,15 @@ public synchronized Class register (List cs, String name, boolean ixf sb.append("@DisableSync\n"); sb.append("public class "); sb.append(sname); - sb.append(" extends su.interference.proxy.GenericResultImpl implements java.io.Serializable {\n"); + sb.append(" extends su.interference.proxy.GenericResultImpl implements IndexChunk, java.io.Serializable {\n"); sb.append("\n"); //todo serialVersionUID should be unique for every proxy class sb.append(" @Transient\n"); sb.append(" private final static long serialVersionUID = 6730871208437219890L;\n"); + sb.append(" @Transient\n"); + sb.append(" public su.interference.core.DataChunk dc;\n"); + sb.append(" public su.interference.core.DataChunk getDataChunk() { return dc; }\n"); + sb.append(" public void setDataChunk(su.interference.core.DataChunk c) { dc = c; }\n"); sb.append("\n"); for (int i=0; i getFrameChunks(Session s) throws IOException, ClassNotFoundException, InternalException, IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException; ArrayList getFrameEntities(Session s) throws IOException, ClassNotFoundException, InternalException, IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException; - boolean hasLiveTransaction(long transId); - boolean hasLocalTransactions(); - int hasRemoteTransactions() throws InternalException; } diff --git a/src/main/java/su/interference/sql/FrameGroupTask.java b/src/main/java/su/interference/sql/FrameGroupTask.java index 14e4df1..c0bb01e 100644 --- a/src/main/java/su/interference/sql/FrameGroupTask.java +++ b/src/main/java/su/interference/sql/FrameGroupTask.java @@ -60,6 +60,7 @@ public FrameGroupTask(Cursor cur, Queue q, ResultSet target, Table gtabl public void run() { final boolean ixflag = cur.getSqlStmt().getCols().getOrderColumns().size() > 0; + Thread.currentThread().setName("Group SQL thread "+Thread.currentThread().getId()); try { final SQLGroupContainer sqlg = new SQLGroupContainer(((StreamQueue) target).getRscols(), diff --git a/src/main/java/su/interference/sql/SQLIndex.java b/src/main/java/su/interference/sql/SQLIndex.java index 5018193..ca3fcad 100644 --- a/src/main/java/su/interference/sql/SQLIndex.java +++ b/src/main/java/su/interference/sql/SQLIndex.java @@ -68,8 +68,8 @@ public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn r this.left = left; this.unique = left?lkey.isUnique():rkey.isUnique(); this.merged = merged; - frames = t.getFrames(s); - amount = frames.size(); + frames = left ? t.getFrames(s) : null; + amount = left ? frames.size() : 0; cntrStart = 0; this.framecntr = new AtomicInteger(0); this.returned = new AtomicBoolean(false); diff --git a/src/main/java/su/interference/transport/SyncFrameEvent.java b/src/main/java/su/interference/transport/SyncFrameEvent.java index 4b06ea8..9e1b71e 100644 --- a/src/main/java/su/interference/transport/SyncFrameEvent.java +++ b/src/main/java/su/interference/transport/SyncFrameEvent.java @@ -97,11 +97,12 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { b.setDf(f); } } - logger.debug("create replicated frame with allocId " + b.getAllocId() + " ptr " + bd.getFrameId()); + logger.debug("create new frame allocId " + b.getAllocId() + " ptr " + bd.getFrameId()); } else { - if (t.getObjectId() == bd.getObjectId()) { + final Table t_ = Instance.getInstance().getTableById(bd.getObjectId()); + if (t.getName().equals(t_.getName())) { b.setDf(Instance.getInstance().getDataFileById(bd.getFile())); - logger.debug("rframe bd found with allocId=" + b.getAllocId()); + logger.debug("rframe bd " + bd.getFrameId() + " found with allocId=" + b.getAllocId()); } else { final FrameData bd_ = new FrameData(bd, t); s.delete(bd); @@ -140,10 +141,12 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { frame.setRes02(nextF); frame.setRes06(prevB); frame.setRes07(nextB); + frame.setFrameData(b.getBd()); + b.getBd().setFrame(frame); final LLT llt_ = LLT.getLLT(); //df access reordering prevent deadlock b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt_, s); llt_.commit(); - b.getBd().setFrame(null); +// b.getBd().setFrame(null); logger.debug("write undo frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId()); } } @@ -173,11 +176,13 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { frame.setRes02(nextF); frame.setRes06(prevB); frame.setRes07(nextB); + frame.setFrameData(b.getBd()); + b.getBd().setFrame(frame); final LLT llt_ = LLT.getLLT(); //df access reordering prevent deadlock b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt_, s); llt_.commit(); - b.getBd().setFrame(frame); - logger.debug("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId()); +// logger.debug("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId()+" size "+frame.getChunks().size()); + logger.debug("allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId()); } } } @@ -238,11 +243,13 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { final Map> frames_ = new HashMap<>(); for (SyncFrame f : sb) { - final Table t = Instance.getInstance().getTableByName(f.getClassName()); - if (frames_.get(t.getObjectId()) == null) { - frames_.put(t.getObjectId(), new ArrayList<>()); + if (f.isAllowR()) { + final Table t = Instance.getInstance().getTableByName(f.getClassName()); + if (frames_.get(t.getObjectId()) == null) { + frames_.put(t.getObjectId(), new ArrayList<>()); + } + frames_.get(t.getObjectId()).add(f.getBd()); } - frames_.get(t.getObjectId()).add(f.getBd()); } for (Map.Entry> entry: frames_.entrySet()) { SQLCursor.addStreamFrame(new ContainerFrame(entry.getKey(), entry.getValue())); @@ -267,22 +274,4 @@ private void updateTransactions(HashMap rtran, Session s) { } } - private void updateTransFrames(ArrayList tframes, HashMap hmap2, Session s) throws Exception { - for (TransFrame tb : tframes) { - final long cframeid = hmap2.get(tb.getCframeId()); - if (cframeid == 0) { - throw new InternalException(); - } - final long uframeid = tb.getUframeId() == 0 ? 0 : hmap2.get(tb.getUframeId()); - if (tb.getUframeId() > 0) { - if (uframeid == 0) { - throw new InternalException(); - } - } - tb.setCframeId(cframeid); - tb.setUframeId(uframeid); - s.persist(tb); - } - } - }