diff --git a/README.md b/README.md index 681f7db..25a5f0f 100644 --- a/README.md +++ b/README.md @@ -1,33 +1,111 @@ -# interference - -##### java-based distributed database platform -###### (c) 2010 - 2021 head systems, ltd -###### current revision: release 2021.1 -###### for detailed information see: -###### http://io.digital and doc/InterferenceManual.pdf -###### contacts: info@inteference.su +# interference open cluster + +##### pure-java opensource distributed database platform +(c) 2010 - 2021 head systems, ltd + +current revision: release 2021.1 +for detailed information see doc/InterferenceManual.pdf + +contacts: info@inteference.su ##### https://github.com/interference-project/interference +##### http://io.digital ## Concepts & features +######i.o.cluster also known as interference open cluster is a simple java framework enables you to launch a distributed database and complex event processing service within your java application, using JPA-like interface and annotations for structure mapping and data operations. This software inherits its name from the interference project, within which its mechanisms were developed. + +######i.o.cluster is a opensource, pure java software. + +The basic unit of the i.o.cluster service is a node - it can be a standalone running service, or a service running within some java application. + +Each i.o.cluster node has own persistent storage and can considered and used as a local database with following basic features: + - runs in the same JVM with your application - operates with simple objects (POJOs) - uses base JPA annotations for object mapping directly to persistent storage -- supports horizontal scaling SQL queries +- supports SQL queries with READ COMMITTED isolation level - supports transactions -- supports complex event processing (CEP) and simple streaming SQL +- supports complex event processing and streaming SQL - can be used as a local or distributed SQL database -- allows you to inserts data and run SQL queries from any node included in the cluster -- does not require the launch of any additional coordinators - uses the simple and fast serialization -- uses indices for fast access to data and increase performance of SQL joins +- uses persistent indices for fast access to data and increase performance of SQL joins +- allows flexible management of data in memory for stable operation of a node at any ratio of storage size / available memory, which allows, depending on the problem being solved, how to allocate all data directly in memory with a sufficient heap size, or use access to huge storages with a minimum heap size of java application + +Nodes can be joined into a cluster, at the cluster level with inter-node interactions, we get the following features: +- allows you to insert data and run SQL queries from any node included in the cluster +- support of horizontal scaling SQL queries with READ COMMITTED isolation level +- support of transparent cluster-level transactions +- support of complex event processing (CEP) and simple streaming SQL +- i.o.cluster nodes does not require the launch of any additional coordinators + +##Overview + +Initially, the service was designed in such a way that each node is a java application that can be launched both by sharing one JVM with the client application using the service, or autonomously. + +Each node uses its own storage and, being included in the cluster, replicates to other nodes all changes made on it, and also reflects changes made on other nodes. + +You can start the service of each specific node inside an application and use fast access to data inside the node, as well as execute queries that will automatically scale to other nodes in the cluster. + +Also from your java application, you can use remote client connections to the nodes of an existing cluster without the need to deploy a full service with its own storage (see Remote Client). + +Each of the nodes includes several mechanisms that ensure its operation: + +- core algorithms (supports structured persistent storage, supports indices, custom serialization, heap management, local and distributed sync processes) +- SQL and CEP processor +- event transport, which is used to exchange messages between nodes, as well as between a node and a client application +a brief diagram of the internal implementation of the service on the example of one node: + +![Screenshot](doc/interference.png) + +##Distributed persistent model -## NOTE: +To include a node in the cluster, you must specify the full list of cluster nodes (excluding this one) in the cluster.nodes configuration parameter. The minimum number of cluster nodes is 2, and the maximum is 64 (for more details, see cluster configuration rules below). -Interference is not a RDBMS in the classical sense, and it does -not support ddl operations (the table structure is created on the basis -of @Entity class JPA-compatible annotations). +After such configuration, we may start all configured nodes as cluster. In this case, all nodes will be use specific messages (events) for provide inter-node data consistency and horizontal-scaling queries. + +Interference open cluster is a decentralized system. This means that the cluster does not use any coordination nodes; instead, each node follows to a set of some formal rules of behavior that guarantee the integrity and availability of data within a certain interaction framework. + +Within the framework of these rules, all nodes of the Interference open cluster are equivalent. This means that there is no separation in the system of master and slave nodes - changes to user tables can be made from any node, also all changes are replicated to all nodes, regardless of which node they were made on. + +Running commit in a local user session automatically ensures that the changed data is visible on all nodes in the cluster. + +##Distribute rules + +The concept of interference open cluster is based on a simple basic requirement, which can be literally expressed as follows: we must allow insertion and modification of data at the cluster level from any node, and we must allow data retrieval from any node, using as much as possible the computing resources of the cluster as a whole. Further, we accept the condition that all cluster nodes must be healthy and powered on, if any of the nodes has been turned off for a while, it will not be turned on to receive data until her storage is synchronized with other nodes. In practice, in the absence of changes in the moment, this means that there are identical copies of the storage on the cluster nodes. To prevent conflicts of changes in cluster, several lock modes are used: + +- table level (a session on a node locks the entire table) +- frame level (a session on a node locks a frame) +- disallowed changes for non-owner nodes + +here it is necessary to explain in more detail: all data inserts on a certain node are performed into a frame which was allocated on the same node, for which, in turn, the node is the owner. This is done so that when there are simultaneous inserts into a table from several nodes at once, there are no conflicts during replication. Subsequently, this distinction allows us to understand whether or not to request permission to change the data in the frame at the cluster level or not. Moreover, it allows us to implement a mode when changes to frames on a non-owner node are prohibited. This mode is used on cluster nodes if one or more other nodes become unavailable (we cannot know for certain whether the node is down or there is a problem in the network connection). + +Thus, let's repeat again: + +- all cluster nodes should be equivalent +- all changes on any of the nodes are mapped to other nodes immediately +- data inserts are performed in local storage structure, and then the changes are replicated to other nodes. +- if replication is not possible (the node is unavailable or the connection is broken), a persistent change queue is created for this node +- the owner of any data frame is the node on which this frame has been allocated +- data changes in node own dataframe are performed immediately, else, performed distributed lock for dataframe on cluster level +- if cluster is failed (some node are offline or connection broken), all data changes are not allowed or changes in only node own dataframes allowed +- the cluster uses the generation of unique identifiers for entities (@DistributedId annotation) so that the identifier is unique within the cluster, but not just within the same node +- the cluster does not use any additional checks for uniqueness, requiring locks at the cluster level + +##SQL horizontal-scaling queries + +All SQL queries called on any of the cluster nodes will be automatically distributed among the cluster nodes for parallel processing, if such a decision is made by the node based on the analysis of the volume of tasks (the volume of the query tables is large enough, etc.) +If during the processing of a request a node is unavailable, the task distributed for this node will be automatically rescheduled to another available node. + +##Complex event processing concepts + +So, we must allow insertion and modification of data at the cluster level from any node, and we must allow data retrieval from any node, using as much as possible the computing resources of the cluster as a whole. + +The next concept of interference open cluster is that any table is at the same time a queue, in particular, using the SELECT STREAM clause, we can retrieve records in exactly the same order in which they were added. In general, at the cluster level, the session.persist() operation can be considered as publishing a persistent event. Based on our basic distribution rules, we send this event to all nodes. + +Interference open cluster does not currently support the standard DML UPDATE and DELETE operations, instead for bulk table processing (including the optional WHERE clause) we have implemented PROCESS and PROCESS STREAM clauses that allow us to process each record from a selection of one of the EventProcessor interface implementations. + +On the one hand, this approach allows us to obtain results similar to those that we would achieve using UPDATE and DELETE, on the other hand, it significantly expands the possibilities for custom processing of records, allowing full event processing. For the sake of fairness, it is need noting that you can get similar results using standard SELECT and SELECT STREAM, using some custom code to process the result set, but PROCESS and PROCESS STREAM implement processing at the core level of the cluster, which significantly improve the performance, second, this statements are launched at the cluster level and provide a ready-made implementation for distributed event processing. ## Quick Start Application @@ -60,15 +138,21 @@ Next, specify the necessary set of keys in the project ``` -Dsu.interference.config=interference.properties +-verbose:gc +-Xloggc:/ioclustergc.log +-XX:+PrintGCDetails +-XX:+PrintGCDateStamps +-XX:+AggressiveOpts +-Xms1G +-Xmx4G +-XX:MaxMetaspaceSize=256m +-XX:+UseStringDeduplication +-XX:ParallelGCThreads=4 +-XX:ConcGCThreads=2 +-Dlogback.configurationFile=config/app-log-config.xml -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8888 --Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false --Xms1g --Xmx4g --XX:MaxMetaspaceSize=256m --XX:ParallelGCThreads=8 --XX:ConcGCThreads=4 ``` diff --git a/config/properties b/config/properties index 47cc649..5c0a6fb 100644 --- a/config/properties +++ b/config/properties @@ -49,6 +49,7 @@ sync.lock.enable=true # size of blocking queue, which use in SQL retrieve # mechanism for prevent of heap overload +# NOTE: too small values for large frame sizes may cause locks on queue.put() during distributed processing retrieve.queue.size=100000 # the number of threads for parallel processing of the SQL query diff --git a/doc/InterferenceManual.pdf b/doc/InterferenceManual.pdf index 805b7d9..c11ccff 100644 Binary files a/doc/InterferenceManual.pdf and b/doc/InterferenceManual.pdf differ diff --git a/doc/interference.png b/doc/interference.png new file mode 100644 index 0000000..44ad77c Binary files /dev/null and b/doc/interference.png differ diff --git a/src/main/java/su/interference/core/Config.java b/src/main/java/su/interference/core/Config.java index 8214a99..3ee86cc 100644 --- a/src/main/java/su/interference/core/Config.java +++ b/src/main/java/su/interference/core/Config.java @@ -106,6 +106,8 @@ public class Config { private static final int HEAP_USE_THR_INDX_DEFAULT = 60; private static final int HEAP_USE_THR_TEMP_DEFAULT = 40; private static final int HEAP_USE_THR_UNDO_DEFAULT = 50; + // locks and processing + private static final boolean IGNORE_COMMAND_CHANNEL_FAILURES_DEFAULT = true; public final int LOCAL_NODE_ID; public final String DB_PATH; @@ -141,6 +143,8 @@ public class Config { // internal public final int TEST_DISTRIBUTE_MODE = 1; public final int CHECK_AVAIL_FRAME_TIMEOUT = 3000; + // locks and processing + public final boolean IGNORE_COMMAND_CHANNEL_FAILURES = true; private final Properties p; diff --git a/src/main/java/su/interference/core/Frame.java b/src/main/java/su/interference/core/Frame.java index 6535862..c03da68 100644 --- a/src/main/java/su/interference/core/Frame.java +++ b/src/main/java/su/interference/core/Frame.java @@ -450,9 +450,11 @@ public synchronized int updateChunk(DataChunk chunk, Object o, Session s, LLT ll return chunk.getBytesAmount(); } - public synchronized void deleteChunk (int ptr, Session s, LLT llt) { + public synchronized void deleteChunk (int ptr, Session s, LLT llt, boolean ignoreNoLocal) { if (!this.isLocal()) { - throw new CannotAccessToForeignRecord(); + if (!ignoreNoLocal) { + throw new CannotAccessToForeignRecord(); + } } final Transaction tran = s.getTransaction(); final long sync = LLT.getSyncId(); diff --git a/src/main/java/su/interference/core/IndexFrame.java b/src/main/java/su/interference/core/IndexFrame.java index 5161950..417b823 100644 --- a/src/main/java/su/interference/core/IndexFrame.java +++ b/src/main/java/su/interference/core/IndexFrame.java @@ -424,7 +424,7 @@ public HashMap getAllocateMap() { final long allocId = bd.getAllocId(); imap.put(c.getHeader().getFramePtr(), allocId); } else { - logger.error("getAllocaleMap found null data frame for frame id " + c.getHeader().getFramePtr()); + logger.error("getAllocateMap found null data frame for frame id " + c.getHeader().getFramePtr()); } } } diff --git a/src/main/java/su/interference/core/Instance.java b/src/main/java/su/interference/core/Instance.java index 4ba7cf7..d43e66f 100644 --- a/src/main/java/su/interference/core/Instance.java +++ b/src/main/java/su/interference/core/Instance.java @@ -49,7 +49,7 @@ this software and associated documentation files (the "Software"), to deal in public class Instance implements Interference { public static final String RELEASE = "2021.1"; - public static final int SYSTEM_VERSION = 20210905; + public static final int SYSTEM_VERSION = 20210926; public static final String DATA_FILE = "datafile"; public static final String INDX_FILE = "indxfile"; diff --git a/src/main/java/su/interference/core/RetrieveQueue.java b/src/main/java/su/interference/core/RetrieveQueue.java index 84cecfe..aab5716 100644 --- a/src/main/java/su/interference/core/RetrieveQueue.java +++ b/src/main/java/su/interference/core/RetrieveQueue.java @@ -24,7 +24,10 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import su.interference.persistent.Session; + import java.util.concurrent.LinkedBlockingQueue; /** @@ -33,6 +36,7 @@ this software and associated documentation files (the "Software"), to deal in */ public class RetrieveQueue { + private final static Logger logger = LoggerFactory.getLogger(RetrieveQueue.class); private final LinkedBlockingQueue q; private final ManagedCallable r; private volatile boolean retrieve = true; @@ -53,7 +57,7 @@ public synchronized Object poll(Session s) { return c.getEntity(s); } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during retrieveQueue.poll", e); } return null; } @@ -69,7 +73,7 @@ public synchronized Chunk cpoll() { return c; } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during retrieveQueue.cpoll", e); } return null; } diff --git a/src/main/java/su/interference/core/SyncFrame.java b/src/main/java/su/interference/core/SyncFrame.java index c34aff2..5ec5eb0 100644 --- a/src/main/java/su/interference/core/SyncFrame.java +++ b/src/main/java/su/interference/core/SyncFrame.java @@ -72,7 +72,8 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception { public SyncFrame(Frame frame, Session s, FreeFrame fb, boolean proc) throws Exception { final Table t = Instance.getInstance().getTableById(frame.getObjectId()); final FrameData bd = Instance.getInstance().getFrameById(frame.getPtr()); - allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false; + //allowR = frame.isLocal() || bd.isLockedLocally() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false; + allowR = !t.isNoTran() || (frame.isLocal() && t.getName().equals("su.interference.persistent.UndoChunk")); this.proc = bd == null ? false : proc; distributed = t.isDistributed(); @@ -113,7 +114,7 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb, boolean proc) throws Exce logger.info("evicted frame caused an NPE during SyncFrame construction id = " + frame.getPtr()); } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during SyncFrame.init", e); } parentId = 0; lcId = 0; diff --git a/src/main/java/su/interference/core/SyncQueue.java b/src/main/java/su/interference/core/SyncQueue.java index aeca4ae..047732b 100644 --- a/src/main/java/su/interference/core/SyncQueue.java +++ b/src/main/java/su/interference/core/SyncQueue.java @@ -134,7 +134,7 @@ public void commit() throws Exception { try { Thread.sleep(5); } catch (InterruptedException e) { - e.printStackTrace(); + logger.error("exception occured", e); } } } @@ -146,14 +146,14 @@ public void run () { try { syncFramesFromQueue(); } catch(Exception e) { - e.printStackTrace(); + logger.error("exception occured during sync queue process", e); } try { final int period = Config.getConfig().SYNC_PERIOD; Thread.sleep(period); } catch (InterruptedException e) { - e.printStackTrace(); + logger.error("exception occured", e); } latch.countDown(); } diff --git a/src/main/java/su/interference/core/SyncTask.java b/src/main/java/su/interference/core/SyncTask.java index a0c5fda..54ccd43 100644 --- a/src/main/java/su/interference/core/SyncTask.java +++ b/src/main/java/su/interference/core/SyncTask.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -24,6 +24,8 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import su.interference.metrics.Metrics; import su.interference.persistent.DataFile; @@ -40,6 +42,7 @@ public class SyncTask implements Callable { @SuppressWarnings("unchecked") private final PriorityBlockingQueue pq = new PriorityBlockingQueue(); private final DataFile df; + private final static Logger logger = LoggerFactory.getLogger(SyncTask.class); public SyncTask(DataFile df) { this.df = df; @@ -67,7 +70,7 @@ public Integer call() { Metrics.get("syncFrames").stop(); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during sync task process", e); } return 0; } diff --git a/src/main/java/su/interference/persistent/FrameData.java b/src/main/java/su/interference/persistent/FrameData.java index 259f0a4..319b0a4 100644 --- a/src/main/java/su/interference/persistent/FrameData.java +++ b/src/main/java/su/interference/persistent/FrameData.java @@ -29,6 +29,8 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.core.*; import su.interference.mgmt.MgmtColumn; import su.interference.sql.FrameApi; +import su.interference.transport.CommandEvent; +import su.interference.transport.TransportSyncTask; import javax.persistence.Entity; import javax.persistence.Column; @@ -196,7 +198,11 @@ public ArrayList getFrameChunks(Session s) throws Exception { public ArrayList getFrameEntities(Session s) throws Exception { final ArrayList res = new ArrayList<>(); for (Chunk c : getFrameChunks(s)) { - res.add(((DataChunk)c).getEntity()); + if (isIndex() || isNoTran()) { + res.add(c.getEntity()); + } else { + res.add(((EntityContainer) c.getEntity()).getEntity(s)); + } } return res; } @@ -229,6 +235,10 @@ public boolean isIndex() { return getDataObject().isIndex(); } + public boolean isNoTran() { + return getDataObject().isNoTran(); + } + public boolean isFrame() { return frame != null; } @@ -349,8 +359,8 @@ public void removeChunk(int ptr, Session s, LLT llt) throws Exception { this.getFrame().removeChunk(ptr, llt, false); } - public void deleteChunk(int ptr, Session s, LLT llt) throws Exception { - this.getFrame().deleteChunk(ptr, s, llt); + public void deleteChunk(int ptr, Session s, LLT llt, boolean ignoreNoLocal) throws Exception { + this.getFrame().deleteChunk(ptr, s, llt, ignoreNoLocal); } public DataFile getDataFile() { @@ -520,6 +530,11 @@ public synchronized void rollbackTransaction(Transaction tran, ArrayList 0; + } + public java.lang.reflect.Field getIdField() { return this.idfield; } @@ -848,7 +856,7 @@ public Table (DataChunk chunk, IndexList ixl) throws IllegalAccessException, Cla } } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during Table.init", e); } } @@ -1074,7 +1082,7 @@ public void usedSpace (final FrameData bd, final int used, final boolean persist s.persist(this, llt); logger.debug("deallocate frame " + bd.getObjectId() + ":" + bd.getFile() + ":" + bd.getPtr() + " " + Thread.currentThread().getName()); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during usedSpace", e); } } } else { @@ -1086,7 +1094,7 @@ public void usedSpace (final FrameData bd, final int used, final boolean persist bd_.updateChunk(dc, bd, s, llt); } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during usedSpace", e); } } } @@ -1386,10 +1394,7 @@ protected void delete (final Object o, final Session s, LLT extllt, boolean igno final FrameData bd = Instance.getInstance().getFrameById(dc.getHeader().getRowID().getFileId()+dc.getHeader().getRowID().getFramePointer()); if (!bd.getFrame().isLocal()) { - if (ignoreNoLocal) { - //todo - throw new CannotAccessToForeignRecord(); - } else { + if (!ignoreNoLocal) { throw new CannotAccessToForeignRecord(); } } @@ -1413,11 +1418,11 @@ protected void delete (final Object o, final Session s, LLT extllt, boolean igno } if (ignoreTransaction || udc == null) { - deleteIndexes(dc, noTran, true, s, llt); + deleteIndexes(dc, noTran, true, s, llt, ignoreNoLocal); bd.removeChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); } else { - deleteIndexes(dc, noTran, false, s, llt); - bd.deleteChunk(dc.getHeader().getRowID().getRowPointer(), s, llt); + deleteIndexes(dc, noTran, false, s, llt, ignoreNoLocal); + bd.deleteChunk(dc.getHeader().getRowID().getRowPointer(), s, llt, ignoreNoLocal); } removeIndexValue(dc); @@ -1440,7 +1445,7 @@ private void persistIndexes(DataChunk c, Session s, LLT llt) throws Exception { } } - private void deleteIndexes(DataChunk dc, boolean noTran, boolean remove, Session s, LLT llt) throws Exception { + private void deleteIndexes(DataChunk dc, boolean noTran, boolean remove, Session s, LLT llt, boolean ignoreNoLocal) throws Exception { for (IndexDescript ids : this.getIndexNames()) { final DataChunk ic = dc.getIc(ids, s); final int iclen = ic.getBytesAmount(); @@ -1449,7 +1454,7 @@ private void deleteIndexes(DataChunk dc, boolean noTran, boolean remove, Session if (remove) { ibd.removeChunk(ic.getHeader().getRowID().getRowPointer(), s, llt); } else { - ibd.deleteChunk(ic.getHeader().getRowID().getRowPointer(), s, llt); + ibd.deleteChunk(ic.getHeader().getRowID().getRowPointer(), s, llt, ignoreNoLocal); } if (noTran) { usedSpace(ibd, ibd.getUsed() - iclen, true, s, llt); @@ -1517,7 +1522,7 @@ public void lockTable(Session s) { try { s.persist(new RetrieveLock(this.objectId, s.getTransaction().getTransId())); //insert } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during table lock", e); } } @@ -1542,7 +1547,7 @@ public void unlockTable(Session s) { s.delete(rl); } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during table unlock", e); } } @@ -1697,7 +1702,7 @@ public Object poll(Session s) { final RetrieveQueue rq = s.getContentQueue(this); return rq.poll(s); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during table poll", e); } return null; } @@ -1707,7 +1712,7 @@ public Chunk cpoll(Session s) { final RetrieveQueue rq = s.getContentQueue(this); return rq.cpoll(); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during table cpoll", e); } return null; } @@ -2338,6 +2343,42 @@ private synchronized void removeObjects(ValueSet key, Object o, Session s, LLT l } } + public synchronized boolean lock(long transId) { + if (this.lock.compareAndSet(0, transId)) { + logger.info(this.getName()+" successfully locked"); + return true; + } + return false; + } + + public synchronized boolean unlock(long transId) { + if (this.lock.compareAndSet(transId, 0)) { + logger.info(this.getName()+" successfully unlocked"); + return true; + } + return false; + } + + public synchronized boolean clusterlock(long transId) throws Exception { + if (this.lock(transId)) { + if (TransportSyncTask.sendNoPersistBroadcastCommand(CommandEvent.LOCK_TABLE, transId, this.objectId)) { + return true; + } + this.unlock(transId); + } + return false; + } + + public synchronized boolean clusterunlock(long transId) throws Exception { + if (this.unlock(transId)) { + if (TransportSyncTask.sendNoPersistBroadcastCommand(CommandEvent.UNLOCK_TABLE, transId, this.objectId)) { + return true; + } + this.lock(transId); + } + return false; + } + public int getObjectId() { return objectId; } diff --git a/src/main/java/su/interference/persistent/Transaction.java b/src/main/java/su/interference/persistent/Transaction.java index 31f3f12..87337c7 100644 --- a/src/main/java/su/interference/persistent/Transaction.java +++ b/src/main/java/su/interference/persistent/Transaction.java @@ -74,7 +74,7 @@ public class Transaction implements Serializable { private long timeStamp; @Column @MgmtColumn(width=10, show=true, form=false, edit=false) - private int transType; // 0 - READ COMMITTED, 1 - SERIALIZABLE, 9 - THR + private int transType; // 0 - READ COMMITTED, 1 - SERIALIZABLE, 9 - THR @Column @MgmtColumn(width=10, show=true, form=false, edit=false) private long mTran; @@ -83,13 +83,13 @@ public class Transaction implements Serializable { private long cid; @Transient - private final List tframes = new CopyOnWriteArrayList<>(); + private final transient List tframes = new CopyOnWriteArrayList<>(); @Transient - private final Set rframes = new HashSet<>(); + private final transient Set rframes = new HashSet<>(); @Transient private final transient WaitFrame[] lbs; @Transient - private final AtomicInteger avframeStart = new AtomicInteger(0); + private final transient AtomicInteger avframeStart = new AtomicInteger(0); @Transient private transient SQLJoin join; @Transient @@ -114,6 +114,19 @@ public Transaction () { this.timeStamp = new Date().getTime(); } + public Transaction (Transaction tran) { + this.lbs = new WaitFrame[Config.getConfig().FILES_AMOUNT]; + for (int i=0; i 0) { // undo transframe record - final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); - if (rls.size() == 0) { - final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); - cb.decreaseTcounter(this.transId); - } - } else { //change transframe record - final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); - cb.setUsed(cb.getUsed() + tb.getDiff()); - cb.decreaseTcounter(this.transId); - if (cb.getUsed() == 0) { - freeFrames(cb, s); - } else { - s.persist(cb); //update new size value to dataframe - } - } + } else { + logger.warn("commit should not be applied to transaction on remote node"); + } + } + + try { + for (Long frameId : rframes) { + Instance.getInstance().getFrameById(frameId).decreaseTcounter(this.transId); + } + } catch (Exception e) { + logger.error("exception occured during transaction commit", e); + } + + try { + for (TransFrame tb : tframes) { + if (tb.getUframeId() > 0) { // undo transframe record + final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); + if (rls.size() == 0) { + final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); + cb.decreaseTcounter(this.transId); } + } else { //change transframe record + final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); + cb.setUsed(cb.getUsed() + tb.getDiff()); + cb.decreaseTcounter(this.transId); + if (cb.getUsed() == 0) { + freeFrames(cb, s); + } else { + s.persist(cb); //update new size value to dataframe + } + } + } - this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); - this.transType = TRAN_THR; - s.persist(this); - syncq.commit(); + } catch (Exception e) { + logger.error("exception occured during transaction commit", e); + } + + try { + this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); + this.transType = TRAN_THR; + s.persist(this); + syncq.commit(); + if (!remote) { + if (isLocal()) { if (this.join != null) { join.deallocate(s); } - } catch (Exception e) { - e.printStackTrace(); } - } else { - logger.warn("commit cannot be applied to transaction on remote node"); } + } catch (Exception e) { + logger.error("exception occured during transaction commit", e); } + + rframes.clear(); tframes.clear(); started = false; logger.info("Transaction committed"); @@ -274,115 +294,124 @@ public synchronized void commit (Session s, boolean remote) { public synchronized void rollback (Session s, boolean remote) { final ArrayList ubd1 = new ArrayList<>(); final ArrayList ubd2 = new ArrayList<>(); + if (remote) { + if (isLocal()) { + logger.warn("remote rollback should not be applied to transaction on init node"); + } + } else { if (!isLocal()) { - try { - for (Long frameId : rframes) { - Instance.getInstance().getFrameById(frameId).decreaseTcounter(this.transId); - } - this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); - this.transType = TRAN_THR; - s.persist(this); //update - rframes.clear(); + logger.warn("rollback should not be applied to transaction on remote node"); + } + } + + try { + for (Long frameId : rframes) { + Instance.getInstance().getFrameById(frameId).decreaseTcounter(this.transId); + } /* if (this.join != null) { join.deallocate(s); } */ - } catch (Exception e) { - e.printStackTrace(); + } catch (Exception e) { + logger.error("exception occured during transaction rollback", e); + } + + try { + Collections.sort(tframes); + + for (TransFrame tb : tframes) { + final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); + if (cb.isIndex()) { + if (!ubd2.contains(cb)) { + ubd2.add(cb); + } + } else { + if (!ubd1.contains(cb)) { + ubd1.add(cb); + } } - } else { - logger.warn("remote rollback cannot be applied to transaction on init node"); } - } else { - if (isLocal()) { - try { - Collections.sort(tframes); - - for (TransFrame tb : tframes) { - final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); - if (cb.isIndex()) { - if (!ubd2.contains(cb)) { - ubd2.add(cb); - } - } else { - if (!ubd1.contains(cb)) { - ubd1.add(cb); - } + for (FrameData ub : ubd2) { + final ArrayList ubs = new ArrayList<>(); + for (TransFrame tb : tframes) { + if (ub.getFrameId() == tb.getCframeId()) { + if (tb.getUframeId() > 0) { + final FrameData ubb = Instance.getInstance().getFrameById(tb.getUframeId()); + ubs.add(ubb); } } - for (FrameData ub : ubd2) { - final ArrayList ubs = new ArrayList<>(); - for (TransFrame tb : tframes) { - if (ub.getFrameId() == tb.getCframeId()) { - if (tb.getUframeId() > 0) { - final FrameData ubb = Instance.getInstance().getFrameById(tb.getUframeId()); - ubs.add(ubb); - } - } - } + } - if (ub.isIndex()) { - ub.setRbck(true); - ub.rollbackTransaction(this, ubs, s); + if (ub.isIndex()) { + ub.setRbck(true); + ub.rollbackTransaction(this, ubs, s); + } + } + for (FrameData ub : ubd1) { + final ArrayList ubs = new ArrayList<>(); + for (TransFrame tb : tframes) { + if (ub.getFrameId() == tb.getCframeId()) { + if (tb.getUframeId() > 0) { + final FrameData ubb = Instance.getInstance().getFrameById(tb.getUframeId()); + ubs.add(ubb); } } - for (FrameData ub : ubd1) { - final ArrayList ubs = new ArrayList<>(); - for (TransFrame tb : tframes) { - if (ub.getFrameId() == tb.getCframeId()) { - if (tb.getUframeId() > 0) { - final FrameData ubb = Instance.getInstance().getFrameById(tb.getUframeId()); - ubs.add(ubb); - } - } - } + } - if (!ub.isIndex()) { - ub.rollbackTransaction(this, ubs, s); - } - } - for (FrameData ub : ubd2) { - final Frame frame_ = ub.getFrame(); - if (frame_ instanceof IndexFrame) { - ub.setRbck(false); - ((IndexFrame) frame_).cleanICEntities(); - } - } + if (!ub.isIndex()) { + ub.rollbackTransaction(this, ubs, s); + } + } + for (FrameData ub : ubd2) { + final Frame frame_ = ub.getFrame(); + if (frame_ instanceof IndexFrame) { + ub.setRbck(false); + ((IndexFrame) frame_).cleanICEntities(); + } + } - for (TransFrame tb : tframes) { - if (tb.getUframeId() > 0) { // undo transframe record - final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); - if (rls.size() == 0) { - final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); - if (cb != null) { - cb.decreaseTcounter(this.transId); - } - } - } else { //change transframe record - final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); + for (TransFrame tb : tframes) { + if (tb.getUframeId() > 0) { // undo transframe record + final List rls = Instance.getInstance().getRetrieveLocksByObjectId(tb.getObjectId()); + if (rls.size() == 0) { + final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); + if (cb != null) { cb.decreaseTcounter(this.transId); - if (cb.getUsed() == 0) { - logger.info("rollback freeing frame " + cb.getFile() + " " + cb.getPtr()); - freeFrames(cb, s); - } } } - this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); - this.transType = TRAN_THR; - s.persist(this); //update - if (this.join != null) { - join.deallocate(s); + } else { //change transframe record + final FrameData cb = Instance.getInstance().getFrameById(tb.getCframeId()); + cb.decreaseTcounter(this.transId); + if (cb.getUsed() == 0) { + logger.info("rollback freeing frame " + cb.getFile() + " " + cb.getPtr()); + freeFrames(cb, s); } - } catch (Exception e) { - e.printStackTrace(); } + } + if (this.join != null) { + join.deallocate(s); + } + } catch (Exception e) { + logger.error("exception occured during transaction rollback", e); + } + + try { + this.cid = Instance.getInstance().getTableByName(this.getClass().getName()).getIncValue(s, null); + this.transType = TRAN_THR; + s.persist(this); + } catch (Exception e) { + logger.error("exception occured during transaction rollback", e); + } + + if (!remote) { + if (isLocal()) { sendBroadcastEvents(CommandEvent.ROLLBACK, s); - } else { - logger.warn("rollback cannot be applied to transaction on remote node"); } } + + rframes.clear(); tframes.clear(); started = false; logger.info("Transaction rolled back"); @@ -419,7 +448,7 @@ public void unlockUndoFrames (int objectId, Session s) throws InternalException } } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during unlock undo frames", e); } } @@ -488,7 +517,7 @@ public synchronized void startTransaction(Session s, LLT llt) throws Exception { try { s.persist(this, llt); //update } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during start transaction", e); } } started = true; @@ -499,7 +528,7 @@ protected void startStatement (final Session s) { try { startTransaction(s, null); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during start statement", e); } } if (this.transType==TRAN_READ_COMMITTED) { @@ -508,7 +537,7 @@ protected void startStatement (final Session s) { this.mTran = t.getIncValue(s, null); s.persist(this); //update } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during start statement", e); } } } @@ -519,7 +548,7 @@ protected void startStatement (final Session s, LLT llt) { try { startTransaction(s, llt); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during start statement", e); } } if (this.transType==TRAN_READ_COMMITTED) { @@ -527,7 +556,7 @@ protected void startStatement (final Session s, LLT llt) { this.mTran = t.getIncValue(s, llt); s.persist(this, llt); //update } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during start statement", e); } } } @@ -541,7 +570,7 @@ protected void storeFrame (final FrameData cb, final FrameData ub, final int len try { s.persist(tb, llt); //update } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during store frame", e); } return; } @@ -555,7 +584,7 @@ protected void storeFrame (final FrameData cb, final FrameData ub, final int len this.tframes.add(ntb); } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during store frame", e); } } @@ -563,7 +592,7 @@ private void sendBroadcastEvents(int command, Session s) { try { TransportSyncTask.sendBroadcastCommand(command, this.transId, s); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during send broadcast events", e); } } diff --git a/src/main/java/su/interference/sql/NestedCondition.java b/src/main/java/su/interference/sql/NestedCondition.java index 02002d3..58e08f8 100644 --- a/src/main/java/su/interference/sql/NestedCondition.java +++ b/src/main/java/su/interference/sql/NestedCondition.java @@ -595,7 +595,7 @@ public ArrayList getAllConditions() { return res; } - public SQLJoinDispatcher getJoinDispatcher(FrameIterator lbi, FrameIterator rbi, List rscols, IndexDescript leadingIndex, Session s) throws Exception { + public SQLJoinDispatcher getJoinDispatcher(FrameIterator lbi, FrameIterator rbi, List rscols, IndexDescript leadingIndex, boolean process, Session s) throws Exception { ArrayList cs = this.conditions; int ctype = this.type; //in cursor we can not guaranteed uniqueness of base unique field @@ -618,9 +618,9 @@ public SQLJoinDispatcher getJoinDispatcher(FrameIterator lbi, FrameIterator rbi, final SQLColumn ccr = getSQLColumnByAlias(rscols, jc.getConditionColumnRight().getAlias()); //if (jc.getConditionColumn().isIndexOrUnique()||jc.getConditionColumnRight().isIndexOrUnique()) { if (lbi.getObjectIds().contains(jc.getConditionColumn().getObjectId()) && rbi.getObjectIds().contains(jc.getConditionColumnRight().getObjectId())) { - jlist.add(new SQLJoinDispatcher(lbi, rbi, cc, ccr, getSkipCheck(jc), this, leadingIndex, s)); + jlist.add(new SQLJoinDispatcher(lbi, rbi, cc, ccr, getSkipCheck(jc), this, leadingIndex, process, s)); } else if (lbi.getObjectIds().contains(jc.getConditionColumnRight().getObjectId()) && rbi.getObjectIds().contains(jc.getConditionColumn().getObjectId())) { - jlist.add(new SQLJoinDispatcher(rbi, lbi, cc, ccr, getSkipCheck(jc), this, leadingIndex, s)); + jlist.add(new SQLJoinDispatcher(rbi, lbi, cc, ccr, getSkipCheck(jc), this, leadingIndex, process, s)); } //} } diff --git a/src/main/java/su/interference/sql/SQLCursor.java b/src/main/java/su/interference/sql/SQLCursor.java index 99dcb8e..7c1025e 100644 --- a/src/main/java/su/interference/sql/SQLCursor.java +++ b/src/main/java/su/interference/sql/SQLCursor.java @@ -153,18 +153,18 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition this.rscols = rscols; } - hmap = nc.getJoinDispatcher(lbi, rbi, this.rscols, this.leadingIndex, s); + hmap = nc.getJoinDispatcher(lbi, rbi, this.rscols, this.leadingIndex, this.process, s); //todo move to SQLJoinDispatcher if (hmap == null && rbi == null) { final ValueCondition vc = nc.getIndexVC(lbi, null); if (vc != null) { final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); - this.lbi = new SQLIndex(vc.getConditionColumn().getIndex(), lt, true, vc.getConditionColumn(), vc.getConditionColumn(), false, nc, 0, s); + this.lbi = new SQLIndex(vc.getConditionColumn().getIndex(), lt, true, vc.getConditionColumn(), vc.getConditionColumn(), false, nc, 0, this.process, s); this.rbi = rbi; } else if (this.leadingIndex != null && lbi.getObjectId() == leadingIndex.getT().getObjectId()) { final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); - this.lbi = new SQLIndex(leadingIndex.getIndex(), lt, true, null, null, false, nc, 0, true, s); + this.lbi = new SQLIndex(leadingIndex.getIndex(), lt, true, null, null, false, nc, 0, true, this.process, s); this.leadingIndex.accept(); this.rbi = null; } else { @@ -235,10 +235,14 @@ public void run() { if (bd1 != null) { if (rbi == null) { - if (ns[i] == Config.getConfig().LOCAL_NODE_ID) { - ltasks.put(new FrameApiJoin(ns[i], c1, lbi, bd1, null)); + if (lbi.noDistribute()) { + ltasks.put(new FrameApiJoin(Config.getConfig().LOCAL_NODE_ID, c1, lbi, bd1, null)); } else { - rtasks__.put(new FrameApiJoin(ns[i], c1, lbi, bd1, null)); + if (ns[i] == Config.getConfig().LOCAL_NODE_ID) { + ltasks.put(new FrameApiJoin(ns[i], c1, lbi, bd1, null)); + } else { + rtasks__.put(new FrameApiJoin(ns[i], c1, lbi, bd1, null)); + } } i++; if (i == ns.length) { @@ -283,7 +287,7 @@ public void run() { Runnable send = new Runnable() { @Override public void run() { - Thread.currentThread().setName("interference sql cursor remote send "+Thread.currentThread().getId()); + Thread.currentThread().setName("interference-sql-cursor-remote-send-"+Thread.currentThread().getId()); try { final Map> joins = new HashMap<>(); for (Integer nodeId : ns) { @@ -420,7 +424,7 @@ public void run() { target.persist(new ResultSetTerm(), s); } catch (Exception e) { ((StreamQueue) target).stop(); - e.printStackTrace(); + logger.error("exception occured during sql stream", e); } } }; @@ -620,6 +624,7 @@ private Field getTargetColumn(SQLColumn c) throws NoSuchFieldException { } public ResultSet flushTarget() throws InternalException { + final boolean process = this.isProcess(); if (!flush) { Runnable r = new Runnable() { @Override @@ -635,6 +640,10 @@ public void run() { if (!target.isPersistent()) { target.persist(new ResultSetTerm(), s); } + + if (process) { + logger.info("Process of cursor records complete"); + } } catch (Exception e) { logger.error("Exception thrown during flush target operation", e); } @@ -662,6 +671,10 @@ public FrameIterator getRbi() { return rbi; } + public FrameIterator getPbi() { + return pbi; + } + public ResultSet getTarget() { return target; } diff --git a/src/main/java/su/interference/sql/SQLIndex.java b/src/main/java/su/interference/sql/SQLIndex.java index a04979b..281f6e3 100644 --- a/src/main/java/su/interference/sql/SQLIndex.java +++ b/src/main/java/su/interference/sql/SQLIndex.java @@ -51,6 +51,7 @@ public class SQLIndex implements FrameIterator { private final boolean unique; private final boolean merged; private final boolean leading; + private final boolean process; private final LinkedBlockingQueue frames; private final AtomicBoolean returned; private final AtomicBoolean terminate; @@ -58,11 +59,11 @@ public class SQLIndex implements FrameIterator { private SQLIndexFrame mframe; private SQLIndexFrame rframe; - public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn rkey, boolean merged, NestedCondition nc, int join, Session s) throws Exception { - this(t, parent, left, lkey, rkey, merged, nc, join, false, s); + public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn rkey, boolean merged, NestedCondition nc, int join, boolean process, Session s) throws Exception { + this(t, parent, left, lkey, rkey, merged, nc, join, false, process, s); } - public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn rkey, boolean merged, NestedCondition nc, int join, boolean leading, Session s) throws Exception { + public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn rkey, boolean merged, NestedCondition nc, int join, boolean leading, boolean process, Session s) throws Exception { if (!t.isIndex()) throw new InternalException(); this.t = t; this.s = s; @@ -74,6 +75,7 @@ public SQLIndex(Table t, Table parent, boolean left, SQLColumn lkey, SQLColumn r this.unique = parent.getIndexDescriptByObjectId(t.getObjectId()).isUnique(); this.merged = merged; this.leading = leading; + this.process = process; this.frames = join == SQLJoinDispatcher.MERGE ? null : join == SQLJoinDispatcher.RIGHT_INDEX ? null : left == false ? null : leading ? null : t.getFrames(s, t.getTableClass().getSimpleName()); this.returned = new AtomicBoolean(false); this.terminate = new AtomicBoolean(false); @@ -181,7 +183,7 @@ public void setLeftfs(boolean leftfs) { @Override public boolean noDistribute() { - return this.vc != null || this.join == SQLJoinDispatcher.MERGE || this.leading; + return this.vc != null || this.join == SQLJoinDispatcher.MERGE || this.leading || this.process; } @Override diff --git a/src/main/java/su/interference/sql/SQLJoin.java b/src/main/java/su/interference/sql/SQLJoin.java index 7daf294..5649f50 100644 --- a/src/main/java/su/interference/sql/SQLJoin.java +++ b/src/main/java/su/interference/sql/SQLJoin.java @@ -24,6 +24,8 @@ this software and associated documentation files (the "Software"), to deal in package su.interference.sql; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import su.interference.metrics.Metrics; import su.interference.persistent.*; import su.interference.core.*; @@ -37,6 +39,7 @@ this software and associated documentation files (the "Software"), to deal in public class SQLJoin { + private final static Logger logger = LoggerFactory.getLogger(SQLJoin.class); private final ArrayList tables; private final CList columns; private final NestedCondition nc; @@ -138,6 +141,17 @@ public ResultSet executeJoin(Session s, int mode, ArrayList uset) t sqlc.stream(); } else { sqlc.build(); + if (sqlc.isProcess()) { + if (this.tables.size() > 1) { + throw new RuntimeException("PROCESS statement should operate with one table"); + } + final ResultSet rs = sqlc.flushTarget(); + //wait until all records will be processed + rs.poll(s); + if (!this.tables.get(0).getTable().clusterunlock(s.getTransaction().getTransId())) { + throw new RuntimeException("Unable to unlock "+this.tables.get(0).getTable().getName()+" after processing"); + } + } } } diff --git a/src/main/java/su/interference/sql/SQLJoinDispatcher.java b/src/main/java/su/interference/sql/SQLJoinDispatcher.java index cedf968..08073ab 100644 --- a/src/main/java/su/interference/sql/SQLJoinDispatcher.java +++ b/src/main/java/su/interference/sql/SQLJoinDispatcher.java @@ -47,6 +47,7 @@ public class SQLJoinDispatcher implements Comparable { private final boolean merged; private final boolean skipCheckNC; private final boolean furtherUseUC; + private final boolean process; private final SQLColumn joinedCC; private final int weight; private final int join; @@ -58,7 +59,7 @@ public class SQLJoinDispatcher implements Comparable { public final static int RIGHT_INDEX = 4; public final static int NESTED_LOOPS = 10; - public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQLColumn c2, boolean skip, NestedCondition nc, IndexDescript leadingIndex, Session s) throws Exception { + public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQLColumn c2, boolean skip, NestedCondition nc, IndexDescript leadingIndex, boolean process, Session s) throws Exception { //depends on join columns is unique or index, standard iterators (Table, Cursor) must replace by additional //iterators - HashMap and Index. HashMap may be only RBI, Index may be LBI in case if both indexes exists @@ -70,6 +71,7 @@ public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQL ix2 = vc2 == null ? c2.getIndex() : vc2.getCondition() == Condition.C_EQUAL || vc1.getCondition() == Condition.C_IN ? vc2.getConditionColumn().getIndex() : c2.getIndex(); FrameIterator lbi_ = null; FrameIterator rbi_ = null; + this.process = process; //deprecated this.joinedCC = lbi.getType() == FrameIterator.TYPE_CURSOR?c1:c2; @@ -80,8 +82,8 @@ public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQL final Table lt = Instance.getInstance().getTableById(lbi.getObjectId()); final Table rt = Instance.getInstance().getTableById(rbi.getObjectId()); logger.info("use merge join for " + lt.getName() + "." + c1.getColumn().getName() + " * " + rt.getName() + "." + c2.getColumn().getName()); - lbi_ = new SQLIndex(ix1, lt, true, c1, c2, true, nc, MERGE, s); - rbi_ = new SQLIndex(ix2, rt, false, c1, c2, true, nc, MERGE, s); + lbi_ = new SQLIndex(ix1, lt, true, c1, c2, true, nc, MERGE, this.process, s); + rbi_ = new SQLIndex(ix2, rt, false, c1, c2, true, nc, MERGE, this.process, s); if (leadingIndex != null) { if (ix1.getObjectId() == leadingIndex.getIndex().getObjectId()) { leadingIndex.accept(); @@ -93,8 +95,8 @@ public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQL final Table lt = Instance.getInstance().getTableById(rbi.getObjectId()); final Table rt = Instance.getInstance().getTableById(lbi.getObjectId()); logger.info("use merge join for " + lt.getName() + "." + c2.getColumn().getName() + " * " + rt.getName() + "." + c1.getColumn().getName()); - lbi_ = new SQLIndex(ix2, lt, true, c2, c1, true, nc, MERGE, s); - rbi_ = new SQLIndex(ix1, rt, false, c2, c1, true, nc, MERGE, s); + lbi_ = new SQLIndex(ix2, lt, true, c2, c1, true, nc, MERGE, this.process, s); + rbi_ = new SQLIndex(ix1, rt, false, c2, c1, true, nc, MERGE, this.process, s); if (leadingIndex != null) { if (ix2.getObjectId() == leadingIndex.getIndex().getObjectId()) { leadingIndex.accept(); @@ -107,11 +109,11 @@ public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQL final Table rt = Instance.getInstance().getTableById(rbi.getObjectId()); FrameIterator hbi = null; if (c1.isUnique()) { - lbi_ = ix2 == null ? rbi : new SQLIndex(ix2, rt, true, c1, c2, false, nc, RIGHT_HASH, s); - hbi = ix1 == null ? lbi : new SQLIndex(ix1, lt, false, c1, c2, false, nc, RIGHT_HASH, s); + lbi_ = ix2 == null ? rbi : new SQLIndex(ix2, rt, true, c1, c2, false, nc, RIGHT_HASH, this.process, s); + hbi = ix1 == null ? lbi : new SQLIndex(ix1, lt, false, c1, c2, false, nc, RIGHT_HASH, this.process, s); } else if (c2.isUnique()) { - lbi_ = ix1 == null ? lbi : new SQLIndex(ix1, lt, true, c1, c2, false, nc, RIGHT_HASH, s); - hbi = ix2 == null ? rbi : new SQLIndex(ix2, rt, false, c1, c2, false, nc, RIGHT_HASH, s); + lbi_ = ix1 == null ? lbi : new SQLIndex(ix1, lt, true, c1, c2, false, nc, RIGHT_HASH, this.process, s); + hbi = ix2 == null ? rbi : new SQLIndex(ix2, rt, false, c1, c2, false, nc, RIGHT_HASH, this.process, s); } final Table lt_ = Instance.getInstance().getTableById(lbi_.getObjectId()); final Table rt_ = Instance.getInstance().getTableById(hbi.getObjectId()); @@ -133,11 +135,11 @@ public SQLJoinDispatcher(FrameIterator lbi, FrameIterator rbi, SQLColumn c1, SQL final Table rt = Instance.getInstance().getTableById(rbi.getObjectId()); logger.info("use index scan for " + lt.getName() + "." + c1.getColumn().getName() + " * " + rt.getName() + "." + c2.getColumn().getName()); if (ix1 != null) { - lbi_ = ix2 == null ? rbi : new SQLIndex(ix1, lt, true, c1, c2, false, nc, RIGHT_INDEX, s); - rbi_ = new SQLIndex(ix1, lt, false, c2, c1, false, nc, RIGHT_INDEX, s); + lbi_ = ix2 == null ? rbi : new SQLIndex(ix1, lt, true, c1, c2, false, nc, RIGHT_INDEX, this.process, s); + rbi_ = new SQLIndex(ix1, lt, false, c2, c1, false, nc, RIGHT_INDEX, this.process, s); } else if (ix2 != null) { lbi_ = lbi; - rbi_ = new SQLIndex(ix2, rt, false, c1, c2, false, nc, RIGHT_INDEX, s); + rbi_ = new SQLIndex(ix2, rt, false, c1, c2, false, nc, RIGHT_INDEX, this.process, s); } if (lbi_ instanceof SQLIndex) { if (leadingIndex != null) { diff --git a/src/main/java/su/interference/sql/SQLSelect.java b/src/main/java/su/interference/sql/SQLSelect.java index 5aef097..3e448a1 100644 --- a/src/main/java/su/interference/sql/SQLSelect.java +++ b/src/main/java/su/interference/sql/SQLSelect.java @@ -92,7 +92,6 @@ public SQLSelect (String sql, Cursor cur, Session s) { } catch (Exception e) { sqlException = new SQLException(e.getMessage()); logger.error(e.getClass().getSimpleName()+" thrown during parse of sql statement: "+sql); - e.printStackTrace(); } } @@ -144,9 +143,9 @@ public DataSet executeSQL (Session s) { } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during sql execute", e); } - ArrayList result = new ArrayList(); + ArrayList result = new ArrayList<>(); Metrics.get("executeQuery").stop(); return new DataSet(result.toArray(new Object[]{}), t, message); } @@ -267,15 +266,24 @@ private final void parseSQL (String s, Cursor cur, Session sn) throws Exception for (int i=0; i 1) { + throw new RuntimeException("PROCESS statement should operate with one table"); + } + if (!this.tables.get(0).getTable().lock(sn.getTransaction().getTransId())) { + throw new UnableToLockTableForProcess(); + } + } + this.cols = new CList (this.tables, clds); this.nc = new NestedCondition(wpos>=0?sql.substring(wpos+7, baselen).trim():"",this,null); diff --git a/src/main/java/su/interference/sql/SQLTable.java b/src/main/java/su/interference/sql/SQLTable.java index 6fd4b66..7096f09 100644 --- a/src/main/java/su/interference/sql/SQLTable.java +++ b/src/main/java/su/interference/sql/SQLTable.java @@ -26,10 +26,12 @@ this software and associated documentation files (the "Software"), to deal in import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import su.interference.persistent.Session; import su.interference.persistent.Table; import su.interference.persistent.FrameData; import su.interference.core.Instance; import su.interference.sqlexception.InvalidTableDescription; +import su.interference.sqlexception.UnableToLockTableForProcess; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; @@ -53,7 +55,7 @@ public class SQLTable implements Comparable, FrameIterator { private final boolean process; private final Class evtprc; - public SQLTable (String table, String alias, boolean process, Class evtprc) throws InvalidTableDescription { + public SQLTable (String table, String alias, boolean process, Class evtprc, Session s) throws InvalidTableDescription, UnableToLockTableForProcess { this.alias = alias; String[] tblss = table.trim().split("\\."); if (tblss.length==1) { //without schema prefix - use user default schema @@ -65,7 +67,7 @@ public SQLTable (String table, String alias, boolean process, Class evtprc) thro throw new InvalidTableDescription(); } - frames = Instance.getInstance().getTableById(this.table.getObjectId()).getFrames(); + this.frames = Instance.getInstance().getTableById(this.table.getObjectId()).getFrames(); this.terminate = new AtomicBoolean(false); this.process = process; this.evtprc = evtprc; @@ -154,7 +156,7 @@ public void setLeftfs(boolean leftfs) { @Override public boolean noDistribute() { - return false; + return this.table.isIndexed() && this.process; } public boolean isIndexOrdered() { diff --git a/src/main/java/su/interference/sqlexception/UnableToLockTableForProcess.java b/src/main/java/su/interference/sqlexception/UnableToLockTableForProcess.java new file mode 100644 index 0000000..c321b9f --- /dev/null +++ b/src/main/java/su/interference/sqlexception/UnableToLockTableForProcess.java @@ -0,0 +1,34 @@ +/** + The MIT License (MIT) + + Copyright (c) 2010-2021 head systems, ltd + + Permission is hereby granted, free of charge, to any person obtaining a copy of + this software and associated documentation files (the "Software"), to deal in + the Software without restriction, including without limitation the rights to + use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + the Software, and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + */ + +package su.interference.sqlexception; + +/** + * @author Yuriy Glotanov + * @since 1.0 + */ + +public class UnableToLockTableForProcess extends SQLException { + +} diff --git a/src/main/java/su/interference/transport/CommandEvent.java b/src/main/java/su/interference/transport/CommandEvent.java index 3220eb7..2b793ef 100644 --- a/src/main/java/su/interference/transport/CommandEvent.java +++ b/src/main/java/su/interference/transport/CommandEvent.java @@ -29,6 +29,7 @@ this software and associated documentation files (the "Software"), to deal in import su.interference.core.Instance; import su.interference.persistent.FrameData; import su.interference.persistent.Session; +import su.interference.persistent.Table; import su.interference.persistent.Transaction; /** @@ -43,18 +44,30 @@ public class CommandEvent extends TransportEventImpl implements PersistentEvent public static final int INITTRAN = 1; public static final int COMMIT = 2; public static final int ROLLBACK = 3; - public static final int LOCK = 4; - public static final int UNLOCK = 5; + public static final int LOCK_TABLE = 4; + public static final int UNLOCK_TABLE = 5; + public static final int LOCK_FRAME = 6; + public static final int UNLOCK_FRAME = 7; public static final int MAX_COMMAND = 10; private final int command; private final int nodeId; private final long id; + private final long id2; public CommandEvent(int command, int nodeId, long id, int channelId) { super(channelId); this.command = command; this.nodeId = nodeId; this.id = id; + this.id2 = 0; + } + + public CommandEvent(int command, int nodeId, long id, long id2, int channelId) { + super(channelId); + this.command = command; + this.nodeId = nodeId; + this.id = id; + this.id2 = id2; } @Override @@ -68,7 +81,7 @@ public EventResult process() { if (t != null) { t.commit(s, true); } else { - return new EventResult(TransportCallback.FAILURE, null, 0, null, new RuntimeException("transaction in null"), null); + return new EventResult(TransportCallback.FAILURE, null, 0, null, new RuntimeException(TransportContext.TRANSACTION_ISNULL_MESSAGE), null); } } if (command == ROLLBACK) { @@ -76,16 +89,30 @@ public EventResult process() { if (t != null) { t.rollback(s, true); } else { - return new EventResult(TransportCallback.FAILURE, null, 0, null, new RuntimeException("transaction in null"), null); + return new EventResult(TransportCallback.FAILURE, null, 0, null, new RuntimeException(TransportContext.TRANSACTION_ISNULL_MESSAGE), null); } } - if (command == LOCK) { - final FrameData bd = Instance.getInstance().getFrameById(id); - bd.lock(this.nodeId); + if (command == LOCK_TABLE) { + final Table t = Instance.getInstance().getTableById((int)id2); + if (!t.lock(id)) { + return new EventResult(TransportCallback.FAILURE, null, 0, null, new RuntimeException(TransportContext.UNABLE_LOCK_TABLE_MESSAGE), null); + } + } + if (command == UNLOCK_TABLE) { + final Table t = Instance.getInstance().getTableById((int)id2); + if (!t.unlock(id)) { + return new EventResult(TransportCallback.FAILURE, null, 0, null, new RuntimeException(TransportContext.UNABLE_UNLOCK_TABLE_MESSAGE), null); + } + } + if (command == LOCK_FRAME) { + final FrameData bd = Instance.getInstance().getFrameById(id2); + if (!bd.rlock(id, this.nodeId)) { + return new EventResult(TransportCallback.FAILURE, null, 0, null, new RuntimeException(TransportContext.UNABLE_LOCK_FRAME_MESSAGE), null); + } } - if (command == UNLOCK) { - final FrameData bd = Instance.getInstance().getFrameById(id); - bd.unlock(this.nodeId); + if (command == UNLOCK_FRAME) { + //final FrameData bd = Instance.getInstance().getFrameById(id); + //bd.unlock(this.nodeId); } return new EventResult(TransportCallback.SUCCESS, null, 0, null, null, null); } diff --git a/src/main/java/su/interference/transport/HeartBeatProcess.java b/src/main/java/su/interference/transport/HeartBeatProcess.java index ec7243c..099d8c9 100644 --- a/src/main/java/su/interference/transport/HeartBeatProcess.java +++ b/src/main/java/su/interference/transport/HeartBeatProcess.java @@ -1,7 +1,7 @@ /** The MIT License (MIT) - Copyright (c) 2010-2019 head systems, ltd + Copyright (c) 2010-2021 head systems, ltd Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -78,7 +78,7 @@ public void run () { logger.debug("heartbeat failed"); } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during heartbeat process", e); } } else { logger.info(entry.getValue()+" not connected"); diff --git a/src/main/java/su/interference/transport/SQLEvent.java b/src/main/java/su/interference/transport/SQLEvent.java index aa52257..bed803b 100644 --- a/src/main/java/su/interference/transport/SQLEvent.java +++ b/src/main/java/su/interference/transport/SQLEvent.java @@ -90,8 +90,7 @@ public EventResult process() { } final FrameApi bd1 = c.getLbi() instanceof SQLIndex ? b : Instance.getInstance().getFrameByAllocId(j.getLeftAllocId()); final FrameApi bd2 = j.getRightAllocId() == 0 ? b_ : Instance.getInstance().getFrameByAllocId(j.getRightAllocId()); - final FrameIterator pbi = c.getRbi() != null && c.getRbi().isProcess() ? c.getRbi() : c.getLbi(); - flist.add(c.execute(pbi, bd1, bd2, j)); + flist.add(c.execute(c.getPbi(), bd1, bd2, j)); res.add(j); } boolean cnue = true; diff --git a/src/main/java/su/interference/transport/SyncFrameEvent.java b/src/main/java/su/interference/transport/SyncFrameEvent.java index e34c12d..26d59b3 100644 --- a/src/main/java/su/interference/transport/SyncFrameEvent.java +++ b/src/main/java/su/interference/transport/SyncFrameEvent.java @@ -61,7 +61,7 @@ public EventResult process() { try { rframe2(this.sb); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during sync event process", e); return new EventResult(TransportCallback.FAILURE, null, 0, null, e, null); } return new EventResult(TransportCallback.SUCCESS, null, 0, null, null, null); @@ -153,7 +153,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { } } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during sync event process", e); } } @@ -215,7 +215,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { } } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during sync event process", e); } } @@ -251,14 +251,14 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { } } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during sync event process", e); } } for (Map.Entry> entry : storemap.entrySet()) { final Table t = Instance.getInstance().getTableById(entry.getKey()); if (this.getCallbackNodeId() == 0) { - throw new RuntimeException("wrong callback node id"); + throw new RuntimeException(TransportContext.WRONG_CALLBACK_NODE_MESSAGE); } final LLT llt_ = LLT.getLLT(); t.storeFrames(entry.getValue(), this.getCallbackNodeId(), llt_, s); @@ -289,11 +289,12 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception { private void updateTransactions(Map rtran, Session s) { for (Map.Entry entry : rtran.entrySet()) { - Transaction tran = Instance.getInstance().getTransactionById(entry.getKey()); - if (tran==null) { + final Transaction tran = Instance.getInstance().getTransactionById(entry.getKey()); + if (tran == null) { + final Transaction tran_ = new Transaction(entry.getValue()); try { - s.persist(entry.getValue()); - } catch(Exception e) { + s.persist(tran_); + } catch (Exception e) { logger.error("unable to persist remote transaction", e); } } diff --git a/src/main/java/su/interference/transport/TransportChannel.java b/src/main/java/su/interference/transport/TransportChannel.java index f541d0a..07ab381 100644 --- a/src/main/java/su/interference/transport/TransportChannel.java +++ b/src/main/java/su/interference/transport/TransportChannel.java @@ -105,7 +105,7 @@ public void run() { logger.debug("channel id = " + channelId + " sent " + transportMessage + " message with UUID: " + transportMessage.getUuid()); } else { if (transportMessage.getType() == TransportMessage.TRANSPORT_MESSAGE || transportMessage.getType() == TransportMessage.HEARTBEAT_MESSAGE) { - transportMessage.getTransportEvent().failure(channelId, new RuntimeException("Channel failure")); + transportMessage.getTransportEvent().failure(channelId, new RuntimeException(TransportContext.CHANNEL_FAILURE_MESSAGE)); if (transportMessage.getTransportEvent().getLatch() != null) { transportMessage.getTransportEvent().getLatch().countDown(); } @@ -120,7 +120,7 @@ public void run() { try { sock.close(); } catch (Exception e_) { - e_.printStackTrace(); + logger.error("transport channel failure: ", e_); } logger.error("channel id = " + channelId + " stopped by connection failure"); } @@ -128,7 +128,7 @@ public void run() { try { Thread.sleep(1); } catch (InterruptedException ie) { - ie.printStackTrace(); + logger.error("transport channel failure: ", ie); } } } catch (IOException e) { @@ -147,7 +147,7 @@ public void run() { try { sock.close(); } catch (Exception e_) { - e_.printStackTrace(); + logger.error("transport channel failure: ", e_); } logger.error("channel id = " + channelId + " stopped by connection failure"); logger.error("failure cause: ", e); @@ -155,12 +155,12 @@ public void run() { } } catch (IOException e) { connected.set(false); - e.printStackTrace(); + logger.error("transport channel failure: ", e); } logger.info("channel "+channelId+" closed"); } } catch (Exception e) { - e.printStackTrace(); + logger.error("transport channel failure: ", e); } finally { started.set(false); } @@ -172,7 +172,7 @@ private void cleanQueueOnFail() { while (mq.peek() != null) { final TransportMessage transportMessage = mq.poll(); if (transportMessage.getType() == TransportMessage.TRANSPORT_MESSAGE || transportMessage.getType() == TransportMessage.HEARTBEAT_MESSAGE) { - transportMessage.getTransportEvent().failure(channelId, new RuntimeException("Channel failure")); + transportMessage.getTransportEvent().failure(channelId, new RuntimeException(TransportContext.CHANNEL_FAILURE_MESSAGE)); transportMessage.getTransportEvent().getLatch().countDown(); } } @@ -182,7 +182,7 @@ private void cleanWaitMsgsOnFail() { for (Map.Entry entry : mmap.entrySet()) { final TransportMessage transportMessage = entry.getValue(); if (transportMessage.getType() == TransportMessage.TRANSPORT_MESSAGE || transportMessage.getType() == TransportMessage.HEARTBEAT_MESSAGE) { - transportMessage.getTransportEvent().failure(channelId, new RuntimeException("Channel failure")); + transportMessage.getTransportEvent().failure(channelId, new RuntimeException(TransportContext.CHANNEL_FAILURE_MESSAGE)); transportMessage.getTransportEvent().getLatch().countDown(); } mmap.remove(entry.getKey()); @@ -206,7 +206,7 @@ protected void send(TransportMessage transportMessage) { mq.offer(transportMessage); } else { if (transportMessage.getType() == TransportMessage.TRANSPORT_MESSAGE || transportMessage.getType() == TransportMessage.HEARTBEAT_MESSAGE) { - transportMessage.getTransportEvent().failure(channelId, new RuntimeException("Channel failure")); + transportMessage.getTransportEvent().failure(channelId, new RuntimeException(TransportContext.CHANNEL_FAILURE_MESSAGE)); transportMessage.getTransportEvent().getLatch().countDown(); } else if (transportMessage.getType() == TransportMessage.CALLBACK_MESSAGE) { cbq.add(transportMessage); diff --git a/src/main/java/su/interference/transport/TransportContext.java b/src/main/java/su/interference/transport/TransportContext.java index 0c59ece..2e50e75 100644 --- a/src/main/java/su/interference/transport/TransportContext.java +++ b/src/main/java/su/interference/transport/TransportContext.java @@ -46,11 +46,20 @@ public class TransportContext implements TransportApi { private static TransportContext transportContext; private final AtomicBoolean started = new AtomicBoolean(true); private final ConcurrentLinkedQueue mq = new ConcurrentLinkedQueue<>(); + private final LinkedBlockingQueue inq = new LinkedBlockingQueue<>(10000); private final ExecutorService pool = Executors.newFixedThreadPool(1); + private final ExecutorService pool2 = Executors.newFixedThreadPool(2); private final Map mmap = new ConcurrentHashMap<>(); private final int callbackPort; private final TransportChannel clientChannel; private TransportServer transportServer; + protected static final String CHANNEL_FAILURE_MESSAGE = "Channel failure"; + protected static final String TRANSACTION_ISNULL_MESSAGE = "Transaction is null"; + protected static final String WRONG_CALLBACK_NODE_MESSAGE = "Wrong callback node id"; + protected static final String UNABLE_LOCK_TABLE_MESSAGE = "Unable to lock table"; + protected static final String UNABLE_UNLOCK_TABLE_MESSAGE = "Unable to unlock table"; + protected static final String UNABLE_LOCK_FRAME_MESSAGE = "Unable to lock frame"; + protected static final String UNABLE_UNLOCK_FRAME_MESSAGE = "Unable to unlock frame"; private TransportContext() { this.clientChannel = null; @@ -64,6 +73,7 @@ private TransportContext(TransportChannel clientChannel, int callbackPort) { public void start() { startServer(); + startIncomingMessageProcess(); startClient(); } @@ -100,11 +110,7 @@ public void run() { if (channel != null) { channel.send(transportMessage); } else { - try { - throw new InternalException(); - } catch(Exception e) { - e.printStackTrace(); - } + logger.error("unable to find trasport channel by id="+transportEvent.getChannelId(), new RuntimeException()); } } else { if (clientChannel != null) { @@ -126,7 +132,7 @@ public void run() { try { Thread.sleep(1); } catch (InterruptedException ie) { - ie.printStackTrace(); + logger.error("exception occured", ie); } } } @@ -135,9 +141,44 @@ public void run() { } // process incoming events - protected void onMessage(TransportMessage transportMessage, InetAddress inetAddress) throws InternalException { + protected void onMessage(TransportMessage transportMessage, InetAddress inetAddress) throws Exception { + inq.put(transportMessage); + } + + private void startIncomingMessageProcess() { + pool2.submit(new Runnable() { + @Override + public void run() { + try { + while (started.get()) { + final TransportMessage transportMessage = inq.take(); + onMessage(transportMessage); + } + } catch (InterruptedException ie) { + logger.error("exception occured", ie); + } + } + }); +/* + pool2.submit(new Runnable() { + @Override + public void run() { + try { + while (started.get()) { + final TransportMessage transportMessage = inq.take(); + onMessage(transportMessage); + } + } catch (InterruptedException ie) { + logger.error("exception occured", ie); + } + } + }); +*/ + } + + private void onMessage(TransportMessage transportMessage) throws InternalException { if (transportMessage.getType() == TransportMessage.HEARTBEAT_MESSAGE) { - TransportCallback transportCallback = new TransportCallback(Config.getConfig().LOCAL_NODE_ID, transportMessage.getUuid(), + final TransportCallback transportCallback = new TransportCallback(Config.getConfig().LOCAL_NODE_ID, transportMessage.getUuid(), new EventResult(TransportCallback.SUCCESS, null, 0, null, null, null)); sendCallback(transportMessage.getSender(), new TransportMessage(TransportMessage.CALLBACK_MESSAGE, Config.getConfig().LOCAL_NODE_ID, null, transportCallback)); logger.debug("heartbeat callback " + transportCallback.getMessageUUID() + " sent to node "+transportMessage.getSender()); @@ -148,7 +189,7 @@ protected void onMessage(TransportMessage transportMessage, InetAddress inetAddr logger.debug("transport message received with UUID: " + transportMessage.getUuid() + ", type = " + transportMessage.getTransportEvent().getClass()); transportMessage.getTransportEvent().setCallbackNodeId(transportMessage.getSender()); final EventResult result = transportMessage.getTransportEvent().process(); - TransportCallback transportCallback = new TransportCallback(Config.getConfig().LOCAL_NODE_ID, transportMessage.getUuid(), result); + final TransportCallback transportCallback = new TransportCallback(Config.getConfig().LOCAL_NODE_ID, transportMessage.getUuid(), result); sendCallback(transportMessage.getSender(), new TransportMessage(TransportMessage.CALLBACK_MESSAGE, Config.getConfig().LOCAL_NODE_ID, null, transportCallback)); logger.debug("callback sent with UUID: " + transportMessage.getUuid() + ", type = " + transportMessage.getTransportEvent().getClass()+", destination="+transportMessage.getSender()); } @@ -173,7 +214,7 @@ public void send(TransportEvent transportEvent) { transportEvent.setLatch(new CountDownLatch(1)); mq.offer(transportEvent); } else { - transportEvent.failure(transportEvent.getChannelId(), new RuntimeException("Channel failure")); + transportEvent.failure(transportEvent.getChannelId(), new RuntimeException(CHANNEL_FAILURE_MESSAGE)); } } diff --git a/src/main/java/su/interference/transport/TransportServer.java b/src/main/java/su/interference/transport/TransportServer.java index 73e566f..1d01b1e 100644 --- a/src/main/java/su/interference/transport/TransportServer.java +++ b/src/main/java/su/interference/transport/TransportServer.java @@ -58,7 +58,7 @@ private TransportServer() { final int serverPort = Config.getConfig().RMPORT + i; startServer(pool, serverPort, started); } catch(IOException e){ - e.printStackTrace(); + logger.error("exception occured during TransportServer.init", e); } } } @@ -124,7 +124,7 @@ public void run() { try { Thread.sleep(1000); } catch (InterruptedException ie) { - ie.printStackTrace(); + logger.error("exception occured", ie); } */ logger.debug("server socket timeout exception"); diff --git a/src/main/java/su/interference/transport/TransportSyncTask.java b/src/main/java/su/interference/transport/TransportSyncTask.java index 4a6222e..ab00a58 100644 --- a/src/main/java/su/interference/transport/TransportSyncTask.java +++ b/src/main/java/su/interference/transport/TransportSyncTask.java @@ -27,7 +27,6 @@ this software and associated documentation files (the "Software"), to deal in import org.slf4j.Logger; import org.slf4j.LoggerFactory; import su.interference.core.*; -import su.interference.exception.InternalException; import su.interference.metrics.Metrics; import su.interference.persistent.*; @@ -116,9 +115,8 @@ public void run () { final boolean sent = event.getLatch().await(Config.getConfig().REMOTE_SYNC_TIMEOUT, TimeUnit.MILLISECONDS); if (event.isFail() || !sent) { if (event.getProcessException() != null) { - event.getProcessException().printStackTrace(); + logger.error("exception occured during remote SFE process: ", event.getProcessException()); } - throw new InternalException(); } logger.info(sb.length + " frame(s) were sent and synced (node id = " + channel.getChannelId() + ")"); } @@ -146,7 +144,7 @@ public void run () { } } } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured during sync process", e); } } @@ -165,6 +163,32 @@ public static synchronized void sendBroadcastCommand(int command, long id, Sessi } } + public static synchronized boolean sendNoPersistBroadcastCommand(int command, long id, long id2) throws Exception { + for (Map.Entry entry : HeartBeatProcess.channels.entrySet()) { + final TransportChannel channel = entry.getValue(); + final CommandEvent event = new CommandEvent(command, Config.getConfig().LOCAL_NODE_ID, id, id2, channel.getChannelId()); + TransportContext.getInstance().send(event); + event.getLatch().await(); + if (event.isFail()) { + if (Config.getConfig().IGNORE_COMMAND_CHANNEL_FAILURES && event.getProcessException().getMessage().equals(TransportContext.CHANNEL_FAILURE_MESSAGE)) { + continue; + } + return false; + } + } + return true; + } + + public static synchronized boolean sendNoPersistCommand(int channelId, int command, long id, long id2) throws Exception { + final CommandEvent event = new CommandEvent(command, Config.getConfig().LOCAL_NODE_ID, id, id2, channelId); + TransportContext.getInstance().send(event); + event.getLatch().await(); + if (event.isFail()) { + return false; + } + return true; + } + private void createInitTranCommands(SyncFrame[] sb, int channelId, Session s) throws Exception { final Map tmap = new HashMap<>(); final List events = new ArrayList<>();