Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/main/java/su/interference/core/GenericObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ this software and associated documentation files (the "Software"), to deal in
*/

public class GenericObject implements Serializable, GenericResult {
private final Map<String, Object> vmap;
private final static long serialVersionUID = 4330809121118587364L;
private final Map<String, Object> vmap;

protected GenericObject(Map<String, Object> vmap) {
protected GenericObject(Map<String, Object> vmap) {
this.vmap = vmap;
}

public Object getValueByName(String name) {
public Object getValueByName(String name) {
return vmap.get(name);
}
}
33 changes: 28 additions & 5 deletions src/main/java/su/interference/core/IndexFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ this software and associated documentation files (the "Software"), to deal in

package su.interference.core;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import su.interference.exception.*;
import su.interference.persistent.*;
import su.interference.serialize.ByteString;
Expand All @@ -36,22 +38,32 @@ this software and associated documentation files (the "Software"), to deal in
*/

public class IndexFrame extends Frame {
private final static Logger logger = LoggerFactory.getLogger(IndexFrame.class);
private boolean sorted = false;
private final boolean terminate;
public static final int INDEX_FRAME_NODE = 2;
public static final int INDEX_FRAME_LEAF = 1;
public static final int INITIALIZE_DURING_CONSTRUCT = 1;

public IndexFrame(int file, long pointer, int size, int objectId, Table t) throws InternalException {
super(file, pointer, size, t);
this.terminate = false;
}

public IndexFrame(FrameData bd, int frameType, Table t) throws InternalException {
super(bd, t);
this.setType(frameType);
this.terminate = false;
}

public IndexFrame() {
super(0, 0, 0, null);
this.terminate = true;
}

public IndexFrame(int file, long pointer, int size, FrameData bd, Table t, Class c, List<FrameData> uframes) throws Exception {
super(null, file, pointer, size, bd, t, c);
this.terminate = false;

Map<Integer, UndoChunk> ucs = new HashMap<>();
for (FrameData uframe : uframes) {
Expand Down Expand Up @@ -97,6 +109,7 @@ public IndexFrame(int file, long pointer, int size, FrameData bd, Table t, Class
public IndexFrame(byte[] b, int file, long pointer, Map<Long, Long> imap, Map<Long, Long> hmap, Table t) {
super(b, file, pointer, t);
int ptr = FRAME_HEADER_SIZE;
this.terminate = false;

final ByteString bs = new ByteString(this.b);
while (ptr<this.b.length) {
Expand All @@ -107,8 +120,9 @@ public IndexFrame(byte[] b, int file, long pointer, Map<Long, Long> imap, Map<Lo
if (h.getFramePtr() > 0) { //IOT does not contains frameptr
final long allocId = imap.get(h.getFramePtr());
final long bptr = hmap.get(allocId) != null ? hmap.get(allocId) : Instance.getInstance().getFrameByAllocId(allocId).getFrameId();
h.getFramePtrRowId().setFileId((int) bptr % 4096);
h.getFramePtrRowId().setFramePointer(bptr - (bptr % 4096));
final long fbptr = bptr%4096;
h.getFramePtrRowId().setFileId((int) fbptr);
h.getFramePtrRowId().setFramePointer(bptr - fbptr);
}
final DataChunk dc = new DataChunk(bs.substring(ptr, ptr+INDEX_HEADER_SIZE+h.getLen()), this.getFile(), this.getPointer(), INDEX_HEADER_SIZE, this.getDataObject(), this.getEntityClass());
dc.setHeader(h);
Expand Down Expand Up @@ -402,11 +416,16 @@ public synchronized ValueSet getMaxValue() throws InternalException {
}

public HashMap<Long, Long> getAllocateMap() {
final HashMap<Long, Long> imap = new HashMap<Long, Long>();
final HashMap<Long, Long> imap = new HashMap<>();
for (Chunk c : data.getChunks()) {
if (c.getHeader().getFramePtr() > 0) { //IOT does not contains frameptr
final long allocId = Instance.getInstance().getFrameById(c.getHeader().getFramePtr()).getAllocId();
imap.put(c.getHeader().getFramePtr(), allocId);
final FrameData bd = Instance.getInstance().getFrameById(c.getHeader().getFramePtr());
if (bd != null) {
final long allocId = bd.getAllocId();
imap.put(c.getHeader().getFramePtr(), allocId);
} else {
logger.error("getAllocaleMap found null data frame for frame id " + c.getHeader().getFramePtr());
}
}
}
return imap;
Expand Down Expand Up @@ -461,4 +480,8 @@ public synchronized void setLcId(long lcId) {
this.setRes05((int)lcF);
this.setRes07(lcId - lcF);
}

public boolean isTerminate() {
return terminate;
}
}
34 changes: 28 additions & 6 deletions src/main/java/su/interference/core/Instance.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public void rollback (Session s) {

public static boolean initParams (String[] params) {
final Config cfg = Config.getConfig();

/*
try {
new HTTPServer(cfg.MMPORT);
Expand All @@ -145,6 +146,7 @@ public static boolean initParams (String[] params) {
return false;
}
*/

return true;
}

Expand Down Expand Up @@ -625,7 +627,7 @@ public DataFile getDataFileById (int id) {

public ArrayList<DataFile> getDataFilesByType (int id) {
final Table t = getTableByName("su.interference.persistent.DataFile");
final ArrayList<DataFile> r = new ArrayList<DataFile>();
final ArrayList<DataFile> r = new ArrayList<>();
for (Object o : t.getIndexFieldByColumn("type").getIndex().getObjectsByKey(id)) {
r.add((DataFile)((DataChunk)o).getEntity());
}
Expand All @@ -649,13 +651,22 @@ public synchronized Session getSessionBySid (long sid) {
public synchronized Transaction getTransactionById (long transId) {
if (transId == 0) { return null; }
Table t = getTableByName("su.interference.persistent.Transaction");
DataChunk dc = ((DataChunk)t.getIndexFieldByColumn("transId").getIndex().getObjectByKey(transId));
DataChunk dc = ((DataChunk)t.getMapFieldByColumn("transId").getMap().get(transId));
if (dc==null) {
return null;
}
return (Transaction)dc.getEntity();
}

public List<Transaction> getTransactionsBySid (long id) {
final Table t = getTableByName("su.interference.persistent.Transaction");
final ArrayList<Transaction> r = new ArrayList<>();
for (Object o : t.getIndexFieldByColumn("sid").getIndex().getObjectsByKey(id)) {
r.add((Transaction)((DataChunk)o).getEntity());
}
return r;
}

public FreeFrame getFreeFrameById (long id) {
final Table t = getTableByName("su.interference.persistent.FreeFrame");
final DataChunk dc = (DataChunk)t.getIndexFieldByColumn("frameId").getIndex().getObjectByKey(id);
Expand Down Expand Up @@ -722,6 +733,17 @@ public List<TransFrame> getTransFramesByTransId(long transId) {
return res;
}

public String getEventSubscriberByEntityId (String id) {
final Table t = getTableByName("su.interference.persistent.EventSubscriber");
final MapField ixf = t.getMapFieldByColumn("entityId");
final Map ixl = ixf.getMap();
final DataChunk dc = (DataChunk)ixl.get(id);
if (dc != null) {
return ((EventSubscriber) dc.getEntity()).getSubscriberId();
}
return null;
}

//used in unlock table mechanism
@Deprecated
public synchronized ArrayList<TransFrame> getTransFrameByObjectId (int objectId) {
Expand Down Expand Up @@ -752,7 +774,7 @@ public RetrieveLock getRetrieveLockById(int obj, long tran) {
}

public ArrayList<RetrieveLock> getRetrieveLocksByObjectId(int obj) {
final ArrayList<RetrieveLock> r = new ArrayList<RetrieveLock>();
final ArrayList<RetrieveLock> r = new ArrayList<>();
final Table t = getTableByName("su.interference.persistent.RetrieveLock");
for (Object o : t.getIndexFieldByColumn("objectId").getIndex().getObjectsByKey(obj)) {
final RetrieveLock rl = (RetrieveLock)((DataChunk)o).getEntity();
Expand All @@ -763,9 +785,9 @@ public ArrayList<RetrieveLock> getRetrieveLocksByObjectId(int obj) {

public synchronized List<Transaction> getTransactions() {
final Table t = getTableByName("su.interference.persistent.Transaction");
final ArrayList<Transaction> res = new ArrayList<Transaction>();
for (Object o : t.getIndexFieldByColumn("transId").getIndex().getContent()) {
res.add((Transaction)((DataChunk)o).getEntity());
final ArrayList<Transaction> res = new ArrayList<>();
for (Object o : t.getMapFieldByColumn("transId").getMap().entrySet()) {
res.add((Transaction)((DataChunk)((Map.Entry)o).getValue()).getEntity());
}
return res;
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/su/interference/core/SyncFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class SyncFrame implements Comparable, Serializable, AllowRPredicate {
private final boolean allowR;
private final boolean proc;
private final boolean started;
private final boolean distributed;
private final Map<Long, Long> imap;
private final Map<Long, Transaction> rtran;
private final Map<Long, List<Long>> uframes;
Expand All @@ -73,6 +74,7 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb, boolean proc) throws Exce
final FrameData bd = Instance.getInstance().getFrameById(frame.getPtr());
allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false;
this.proc = bd == null ? false : proc;
distributed = t.isDistributed();

if (bd == null && allowR) {
final FreeFrame fframe = Instance.getInstance().getFreeFrameById(frame.getPtr());
Expand Down Expand Up @@ -243,6 +245,10 @@ public Map<Long, List<Long>> getUFrames() {
return uframes;
}

public boolean isDistributed() {
return distributed;
}

public boolean equals (SyncFrame bl) {
return (this.getFile() == bl.getFile()) && (this.getPointer() == bl.getPointer());
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/su/interference/core/SyncQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ this software and associated documentation files (the "Software"), to deal in

import java.util.concurrent.*;
import java.util.*;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -117,8 +118,11 @@ private synchronized boolean syncFramesFromQueue() throws Exception {
for (FreeFrame fb : fframes) {
s.persist(fb);
}

//todo async process must depends from stop() method
pool2.submit(new TransportSyncTask(frames));

final List<SyncFrame> dframes = frames.stream().filter(p -> p.isDistributed()).collect(Collectors.toList());
pool2.submit(new TransportSyncTask(dframes));

running = false;
return true;
Expand Down
16 changes: 12 additions & 4 deletions src/main/java/su/interference/core/SystemCleanUp.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void cleanUpFrames() {
final long frameAmount = f.getDataObject().getFrameAmount();
if (f.getDataFile().isData() && cleanupDataEnabled()) {
f.decreasePriority();
if (f.isSynced() && f.getObjectId() > 999 && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) {
if (f.isSynced() && f.getObjectId() > 999 && frameAmount > getThr()) {
if (f.clearFrame()) {
d++;
}
Expand All @@ -107,7 +107,7 @@ private void cleanUpFrames() {
if (f.isSynced() && f.getFrameType() == IndexFrame.INDEX_FRAME_NODE) {
xn++;
}
if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && !f.isRbck() && frameAmount > Config.getConfig().IX_CLEANUP_PROTECTION_THR) {
if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && !f.isRbck() && frameAmount > getIxThr()) {
if (f.clearFrame()) {
x++;
}
Expand All @@ -117,7 +117,7 @@ private void cleanUpFrames() {
}
}
if (f.getDataFile().isTemp() && cleanupTempEnabled()) {
if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) {
if (f.isSynced() && f.getFrameType() != IndexFrame.INDEX_FRAME_NODE && frameAmount > getThr()) {
if (f.clearFrame()) {
i++;
}
Expand All @@ -127,7 +127,7 @@ private void cleanUpFrames() {
}
}
if (f.getDataFile().isUndo() && cleanupUndoEnabled()) {
if (f.isSynced() && frameAmount > Config.getConfig().CLEANUP_PROTECTION_THR) {
if (f.isSynced() && frameAmount > getThr()) {
if (f.clearFrame()) {
u++;
}
Expand Down Expand Up @@ -185,4 +185,12 @@ private boolean cleanupTempEnabled() {
// todo cleanup affects temp indices, disable until fix is released
return false;
}

private int getThr() {
return Config.getConfig().CLEANUP_PROTECTION_THR/(Config.getConfig().FRAMESIZE/4096);
}

private int getIxThr() {
return Config.getConfig().IX_CLEANUP_PROTECTION_THR/(Config.getConfig().FRAMESIZE/4096);
}
}
5 changes: 3 additions & 2 deletions src/main/java/su/interference/core/SystemInit.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
The MIT License (MIT)

Copyright (c) 2010-2019 head systems, ltd
Copyright (c) 2010-2021 head systems, ltd

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down Expand Up @@ -44,7 +44,7 @@ this software and associated documentation files (the "Software"), to deal in
*/

public class SystemInit {
private final static int INITIAL_CLASSES_AMT = 15;
private final static int INITIAL_CLASSES_AMT = 16;
private final static Logger logger = LoggerFactory.getLogger(SystemInit.class);

public static Table initSystem (boolean initStorage, int nodeType, Session s) throws Exception {
Expand All @@ -69,6 +69,7 @@ public static String[] getInitialClasses() {
initialClasses[12] = "su.interference.persistent.MgmtModule";
initialClasses[13] = "su.interference.persistent.Cursor";
initialClasses[14] = "su.interference.persistent.FrameSync";
initialClasses[15] = "su.interference.persistent.EventSubscriber";
return initialClasses;
}

Expand Down
30 changes: 30 additions & 0 deletions src/main/java/su/interference/core/TransFrameId.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,38 @@
/**
The MIT License (MIT)

Copyright (c) 2010-2021 head systems, ltd

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

*/

package su.interference.core;

import java.io.Serializable;

/**
* @author Yuriy Glotanov
* @since 1.0
*/

public class TransFrameId implements Serializable {
private final static long serialVersionUID = 2122989901000213555L;
private final long cframeId;
private final long uframeId;
private final long transId;
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/su/interference/core/ValueSet.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
The MIT License (MIT)

Copyright (c) 2010-2019 head systems, ltd
Copyright (c) 2010-2021 head systems, ltd

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
Expand Down Expand Up @@ -46,7 +46,7 @@ public Object[] getValueSet() {
}

public int compareTo (Object obj) {
return compare(obj, vs.length);
return compare(obj, ((ValueSet) obj).getValueSet().length);
}

//used in partial compare datachunks in sql group algorithm
Expand All @@ -55,12 +55,12 @@ public int compareTo (Object obj) {
public int compare (Object obj, int thr) {
final ValueSet j = (ValueSet)obj;

for (int i=0; i<vs.length; i++) {
final int ct = ((Comparable)this.vs[i]).compareTo(j.getValueSet()[i]);
for (int i = 0; i < vs.length; i++) {
final int ct = ((Comparable) this.vs[i]).compareTo(j.getValueSet()[i]);
if (ct != 0) {
return ct;
}
if (i==thr-1) {
if (i == thr - 1) {
break;
}
}
Expand Down
Loading