@@ -51,7 +51,7 @@ void IncParser::Insert(const List& inode, const List* pos) {
m_ilistPool.Insert (node);
std::pair<input_iter, bool > p = m_inputMap.insert (std::make_pair (&inode, n));
if (!p.second ) {
throw SError (" Cannot double-insert input node " + string ( inode));
throw SError (" Cannot double-insert input node " + inode. Print ( ));
}
List* ipos = NULL ;
input_iter ipos_i = m_inputMap.find (pos);
@@ -60,14 +60,15 @@ void IncParser::Insert(const List& inode, const List* pos) {
}
if (ipos) {
if (n == ipos) {
throw SError (" Cannot Insert node " + string ( inode) + " after itself" );
throw SError (" Cannot Insert node " + inode. Print ( ) + " after itself" );
}
n->left = ipos;
n->right = ipos->right ;
if (n->right ) {
n->right ->left = n;
}
ipos->right = n;
g_log.debug () << " INSERT:::::: for pos " << *pos << " we have ipos " << *ipos << " which has right " << *ipos->right ;
} else {
List* first = m_firstINode;
if (first) {
@@ -82,7 +83,7 @@ void IncParser::Insert(const List& inode, const List* pos) {
void IncParser::Delete (const List& inode) {
input_mod_iter n = m_inputMap.find (&inode);
if (m_inputMap.end () == n) {
throw SError (" Cannot delete input node " + string ( inode) + " : node not found" );
throw SError (" Cannot delete input node " + inode. Print ( ) + " : node not found" );
}
List& node = *n->second ;
if (node.left ) {
@@ -102,7 +103,7 @@ void IncParser::Delete(const List& inode) {
void IncParser::Update (const List& inode, const string& value) {
input_iter n = m_inputMap.find (&inode);
if (m_inputMap.end () == n) {
throw SError (" Cannot update input node " + string ( inode) + " : node not found" );
throw SError (" Cannot update input node " + inode. Print ( ) + " : node not found" );
}
List& node = *n->second ;
node.value = value;
@@ -133,7 +134,7 @@ void IncParser::ExtractChanges(Batch& out_batch) {
g_log.debug () << " EXTRACT START: " << m_name;
if (m_touchedNodes.find (&m_root) != m_touchedNodes.end ()) {
const List* behind_node = NULL ;
ComputeOutput_Update (m_root, out_batch, behind_node);
ComputeOutput_Update (m_root, out_batch, behind_node, State::ST_COMPLETE );
}
g_log.debug () << " EXTRACT DONE: " << m_name;
m_touchedNodes.clear ();
@@ -177,7 +178,7 @@ void IncParser::UnlistenAll(STree& x) {
}
STree* IncParser::OwnNode (auto_ptr<STree> node) {
g_log.debug () << " IncParser " << m_name << " : Owning node " << string ( *node) ;
g_log.debug () << " IncParser " << m_name << " : Owning node " << *node;
return m_nodePool.Insert (node);
}
@@ -354,12 +355,9 @@ void IncParser::ProcessActions() {
}
}
void IncParser::ComputeOutput_Update (const STree& node, Batch& out_batch, const List*& behind_node) {
void IncParser::ComputeOutput_Update (const STree& node, Batch& out_batch, const List*& behind_node, State::Station worst_station ) {
g_log.debug () << " IncParser " << m_name << " : Computing output UPDATE for node " << node;
if (node.GetState ().IsPending () || node.GetState ().IsBad ()) {
g_log.debug () << " - state is " << node.GetState () << " , so not computing output" ;
return ;
}
worst_station = std::min (node.GetState ().GetStation (), worst_station);
node.GetOutputFunc () ();
OutputList& output = node.GetOutputFunc ().GetOutput ();
@@ -372,7 +370,7 @@ void IncParser::ComputeOutput_Update(const STree& node, Batch& out_batch, const
return ;
}
g_log.debug () << " - was not emitting, but is now; inserting all" ;
return ComputeOutput_Insert (node, out_batch, behind_node);
return ComputeOutput_Insert (node, out_batch, behind_node, worst_station );
}
const OutputList& prev_output = prev_output_i->second ;
@@ -384,13 +382,14 @@ void IncParser::ComputeOutput_Update(const STree& node, Batch& out_batch, const
OutputList::const_iterator prev_item = prev_output.begin ();
while (item != output.end () && prev_item != prev_output.end ()) {
if (*item == *prev_item) {
g_log.debug () << " case 1" ;
UpdateOutput (*item, out_batch, behind_node);
g_log.debug () << " CO_Update linear pass, case 1 (lists match); before call, have batch=" << out_batch;
UpdateOutput (*item, out_batch, behind_node, worst_station);
g_log.debug () << " case 1 inner call gave us batch: " << out_batch << " at item " << *item;
behind_node = GetOEnd (*item);
++item;
++prev_item;
} else {
g_log.debug () << " case 2" ;
g_log.debug () << " CO_Update linear pass, case 2 (lists don't match) " ;
// Buffer both sides, and keep a lookup-set so we can identify if/when we
// find a node in one list that was surpassed in the other.
typedef set<OutputItem> node_set;
@@ -406,12 +405,14 @@ void IncParser::ComputeOutput_Update(const STree& node, Batch& out_batch, const
// insert nodes up to find_ins
do {
g_log.debug () << " do1" ;
InsertOutput (*item, out_batch, behind_node);
InsertOutput (*item, out_batch, behind_node, worst_station );
behind_node = GetOEnd (*item);
++item;
} while (*find_ins != *item);
// then remove the nodes up to find_ins from ins_set, and advance item to find_ins+1
// - actually, we can just nix the ins_set, since we'll break. meh
// - nope? because if we don't break and just finish the list(s(?)), we'll want to Insert/Remove the unprocessed intermediaries
ins_set.clear ();
++item;
// prev_item stays where it is
// then break
@@ -421,9 +422,10 @@ void IncParser::ComputeOutput_Update(const STree& node, Batch& out_batch, const
if (find_del != del_set.end ()) {
do {
g_log.debug () << " do2" ;
RemoveOutput (*prev_item, out_batch);
RemoveOutput (*prev_item, out_batch, worst_station );
++prev_item;
} while (*find_del != *prev_item);
del_set.clear ();
++prev_item;
break ;
}
@@ -433,18 +435,36 @@ void IncParser::ComputeOutput_Update(const STree& node, Batch& out_batch, const
++del_item;
g_log.debug () << " woop" ;
}
item = ins_item;
prev_item = del_item;
// grab any remaining items in the ins_set / del_set (no-break case)
for (; item != ins_item; ++item) {
g_log.debug () << " ins wow" ;
InsertOutput (*item, out_batch, behind_node, worst_station);
behind_node = GetOEnd (*item);
}
for (; prev_item != del_item; ++prev_item) {
g_log.debug () << " del wow" ;
RemoveOutput (*prev_item, out_batch, worst_station);
}
g_log.debug () << " outwhile" ;
if (item != output.end ()) {
g_log.debug () << " have item" ;
} else {
g_log.debug () << " do not have item" ;
}
if (prev_item != prev_output.end ()) {
g_log.debug () << " have prev_item" ;
} else {
g_log.debug () << " do not have prev_item" ;
}
}
}
g_log.info () << " ok..." ;
for (; item != output.end (); ++item) {
InsertOutput (*item, out_batch, behind_node);
InsertOutput (*item, out_batch, behind_node, worst_station );
behind_node = GetOEnd (*item);
}
for (; prev_item != prev_output.end (); ++prev_item) {
RemoveOutput (*prev_item, out_batch);
RemoveOutput (*prev_item, out_batch, worst_station );
}
if (output.empty ()) {
@@ -457,10 +477,11 @@ void IncParser::ComputeOutput_Update(const STree& node, Batch& out_batch, const
node.GetOutputFunc ().Sync ();
}
void IncParser::ComputeOutput_Insert (const STree& node, Batch& out_batch, const List*& behind_node) {
void IncParser::ComputeOutput_Insert (const STree& node, Batch& out_batch, const List*& behind_node, State::Station worst_station ) {
g_log.debug () << " IncParser " << m_name << " : Computing output INSERT for node " << node;
if (node.GetState ().IsPending () || node.GetState ().IsBad ()) {
g_log.debug () << " - state is " << node.GetState () << " , so not computing output" ;
worst_station = std::min (node.GetState ().GetStation (), worst_station);
if (State::ST_PENDING == worst_station || State::ST_BAD == worst_station) {
g_log.debug () << " NOT computing output insert to node " << node << " since worst station is " << worst_station;
return ;
}
@@ -471,7 +492,7 @@ void IncParser::ComputeOutput_Insert(const STree& node, Batch& out_batch, const
// previous output
for (OutputList::const_iterator item = output.begin ();
item != output.end (); ++item) {
InsertOutput (*item, out_batch, behind_node);
InsertOutput (*item, out_batch, behind_node, worst_station );
g_log.info () << " insert woo..." ;
behind_node = GetOEnd (*item);
}
@@ -485,12 +506,9 @@ void IncParser::ComputeOutput_Insert(const STree& node, Batch& out_batch, const
node.GetOutputFunc ().Sync ();
}
void IncParser::ComputeOutput_Delete (const STree& node, Batch& out_batch) {
void IncParser::ComputeOutput_Delete (const STree& node, Batch& out_batch, State::Station worst_station ) {
g_log.debug () << " IncParser " << m_name << " : Computing output DELETE for node " << node;
if (node.GetState ().IsPending () || node.GetState ().IsBad ()) {
g_log.debug () << " - state is " << node.GetState () << " , so not computing output" ;
return ;
}
worst_station = std::min (node.GetState ().GetStation (), worst_station);
output_mod_iter prev_output_i = m_outputPerNode.find (&node);
if (m_outputPerNode.end () == prev_output_i) {
@@ -502,52 +520,60 @@ void IncParser::ComputeOutput_Delete(const STree& node, Batch& out_batch) {
// current output
for (OutputList::const_iterator prev_item = prev_output.begin ();
prev_item != prev_output.end (); ++prev_item) {
RemoveOutput (*prev_item, out_batch);
RemoveOutput (*prev_item, out_batch, worst_station );
}
m_outputPerNode.erase (&node);
g_log.debug () << " Done computing DELETE output for node " << node << " . Batch so far: " << out_batch;
}
void IncParser::Cleanup () {
g_log.debug () << " IncParser " << m_name << " - cleaning up" ;
g_san.debug () << " Cleaning up node pool: " << string ( m_nodePool) ;
g_san.debug () << " Cleaning up node pool: " << m_nodePool;
m_nodePool.Cleanup ();
g_san.debug () << " Cleaning up IList pool: " << string ( m_ilistPool) ;
g_san.debug () << " Cleaning up IList pool: " << m_ilistPool;
m_ilistPool.Cleanup ();
SanityCheck ();
}
void IncParser::InsertOutput (const OutputItem& item, Batch& out_batch, const List*& behind_node) {
void IncParser::InsertOutput (const OutputItem& item, Batch& out_batch, const List*& behind_node, State::Station worst_station ) {
if (item.onode ) {
g_log.debug () << " InsertOutput for item onode " << *item.onode ;
if (behind_node) {
g_log.debug () << " - at pos " << *behind_node;
}
if (State::ST_PENDING == worst_station || State::ST_BAD == worst_station) {
g_log.debug () << " NOT emitting onode " << *item.onode << " since worst station is " << worst_station;
return ;
}
out_batch.Insert (*item.onode , behind_node);
} else {
g_log.debug () << " InsertOutput for item child " << *item.child ;
ComputeOutput_Insert (*item.child , out_batch, behind_node);
ComputeOutput_Insert (*item.child , out_batch, behind_node, worst_station );
}
}
void IncParser::RemoveOutput (const OutputItem& item, Batch& out_batch) {
void IncParser::RemoveOutput (const OutputItem& item, Batch& out_batch, State::Station worst_station ) {
if (item.onode ) {
g_log.debug () << " RemoveOutput for item onode " << *item.onode ;
out_batch.Delete (*item.onode );
} else {
g_log.debug () << " RemoveOutput for item child " << *item.child ;
ComputeOutput_Delete (*item.child , out_batch);
ComputeOutput_Delete (*item.child , out_batch, worst_station );
}
}
void IncParser::UpdateOutput (OutputItem& item, Batch& out_batch, const List*& behind_node) {
void IncParser::UpdateOutput (OutputItem& item, Batch& out_batch, const List*& behind_node, State::Station worst_station ) {
if (item.onode && item.onode ->value != item.prev_value ) {
g_log.debug () << " UpdateOutput for item onode " << *item.onode ;
if (State::ST_PENDING == worst_station || State::ST_BAD == worst_station) {
g_log.debug () << " NOT emitting update to onode " << *item.onode << " since worst station is " << worst_station;
return ;
}
item.prev_value = item.onode ->value ;
out_batch.Update (*item.onode );
} else if (item.child && m_touchedNodes.find (item.child ) != m_touchedNodes.end ()) {
g_log.debug () << " UpdateOutput for item child " << *item.child ;
ComputeOutput_Update (*item.child , out_batch, behind_node);
ComputeOutput_Update (*item.child , out_batch, behind_node, worst_station );
}
}
0 comments on commit
11365bb