Skip to content

Commit

Permalink
Merge branch 'master' into newversion
Browse files Browse the repository at this point in the history
Conflicts:
	java/proj/zoie/impl/indexing/MemoryStreamDataProvider.java
	java/proj/zoie/impl/indexing/StreamDataProvider.java
	test/proj/zoie/test/ZoieTest.java
	test/proj/zoie/test/mock/MockDataLoader.java
  • Loading branch information
xiaoyanggu committed Dec 2, 2010
2 parents 100d558 + 271b6b8 commit 5726793
Show file tree
Hide file tree
Showing 6 changed files with 489 additions and 445 deletions.
9 changes: 4 additions & 5 deletions build.xml
Expand Up @@ -154,14 +154,13 @@
<jvmarg value="-server" />
<jvmarg value="-Xms512m" />
<jvmarg value="-Xmx1g" />
<jvmarg value="-XX:NewSize=1m" />
<jvmarg value="-Xloggc:${logs}/gc.log" />
<jvmarg value="-XX:+PrintGCTimeStamps" />
<jvmarg value="-XX:+PrintGCDetails" />
<jvmarg value="-XX:+AggressiveHeap" />
<jvmarg value="-XX:+UseAdaptiveSizePolicy" />
<!--jvmarg value="-XX:+UseConcMarkSweepGC" /-->
<!--jvmarg value="-XX:MaxTenuringThreshold=10" /-->
<!--jvmarg value="-XX:+UseParallelGC" /-->
<jvmarg value="-XX:+UseConcMarkSweepGC" />
<jvmarg value="-XX:+UseParNewGC" />
<jvmarg value="-XX:MaxTenuringThreshold=10" />
<jvmarg value="-XX:NewRatio=1" />
<jvmarg value="-XX:SurvivorRatio=2" />
<jvmarg value="-Dcom.sun.management.jmxremote" />
Expand Down
4 changes: 4 additions & 0 deletions java/proj/zoie/api/ZoieVersion.java
Expand Up @@ -4,4 +4,8 @@
public abstract class ZoieVersion implements Comparable<ZoieVersion>
{
public abstract String encodeToString();
public static <T extends ZoieVersion> T max(T a, T b)
{
return a == null ? b : ((a.compareTo(b) < 0) ? b : a);
}
}
8 changes: 5 additions & 3 deletions java/proj/zoie/dataprovider/jdbc/JDBCStreamDataProvider.java
Expand Up @@ -46,7 +46,7 @@ public DataEvent<T,V> next() {
DataEvent<T,V> event = null;
try
{
while(!_res.next())
if(!_res.next())
{
try{
_res.close();
Expand All @@ -61,9 +61,11 @@ public DataEvent<T,V> next() {
}
_stmt = _stmtBuilder.buildStatment(_conn, _version);
_res = _stmt.executeQuery();
} else
{
event = _stmtBuilder.buildDataEvent(_res);
_version = event.getVersion();
}
event = _stmtBuilder.buildDataEvent(_res);
_version = event.getVersion();
}
catch (SQLException sqle)
{
Expand Down
229 changes: 112 additions & 117 deletions java/proj/zoie/impl/indexing/MemoryStreamDataProvider.java
@@ -1,4 +1,5 @@
package proj.zoie.impl.indexing;

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
Expand All @@ -22,133 +23,127 @@
import org.apache.log4j.Logger;

import proj.zoie.api.DataConsumer.DataEvent;
import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieVersion;

public class MemoryStreamDataProvider<D,V extends ZoieVersion> extends StreamDataProvider<D,V> {
public class MemoryStreamDataProvider<D, V extends ZoieVersion> extends StreamDataProvider<D, V>
{

private List<DataEvent<D,V>> _list;
private int _count;
private boolean _stop;

// private static final double DEFAULT_ITERS_PER_SECOND=100.0;
private static final Logger log = Logger.getLogger(MemoryStreamDataProvider.class);

public MemoryStreamDataProvider()
{
super();
_list= new LinkedList<DataEvent<D,V>>();
_count=0;
_stop=false;
}

@Override
public void reset()
{
synchronized(this)
{
_list.clear();
this.notifyAll();
}
}

public void flush()
{
synchronized(this)
{
while(!_list.isEmpty() && !_stop)
{
this.notifyAll();
try
{
this.wait();
}
catch(InterruptedException e)
{
log.warn(e.getMessage());
}
}
}
}

public void addEvents(List<DataEvent<D,V>> list)
private List<DataEvent<D, V>> _list;
private int _count;
private volatile V _maxVersion = null;
private boolean _stop;

// private static final double DEFAULT_ITERS_PER_SECOND=100.0;
private static final Logger log = Logger.getLogger(MemoryStreamDataProvider.class);

public MemoryStreamDataProvider()
{
super();
_list = new LinkedList<DataEvent<D, V>>();
_count = 0;
_stop = false;
}

@Override
public void reset()
{
synchronized (this)
{
_list.clear();
this.notifyAll();
}
}

/**
* flush to the max version that has been added. We only guarantee whatever
* was already added. If more events are added after the beginning of this
* call, they may or may not be flushed at the return of this call. This
* method is not supposed to be called too often.
*/
public void flush()
{
try
{
V maxVersion = _maxVersion;
log.info("flushing version: " + maxVersion);
super.syncWithVersion(3600000, maxVersion);
log.info("flushing version: " + maxVersion + " done");
} catch (ZoieException e)
{
log.error("flush timeout", e);
}
}

public void addEvents(List<DataEvent<D, V>> list)
{
if (list != null && !list.isEmpty())
{
Iterator<DataEvent<D, V>> iter = list.iterator();
synchronized (this)
{
if (list!=null && !list.isEmpty())
while (iter.hasNext())
{
Iterator<DataEvent<D,V>> iter=list.iterator();
synchronized(this)
{
while(iter.hasNext())
{
DataEvent<D,V> obj=iter.next();
_count++;
_list.add(obj);
}
this.notifyAll();
}
DataEvent<D, V> obj = iter.next();
_maxVersion = ZoieVersion.max(_maxVersion, obj.getVersion());
_count++;
_list.add(obj);
}
this.notifyAll();
}
}
}

public void addEvent(DataEvent<D,V> event)
public void addEvent(DataEvent<D, V> event)
{
if (event != null)
{
synchronized (this)
{
if (event!=null)
{
synchronized(this)
{
_count++;
_list.add(event);
this.notifyAll();
}
}
_maxVersion = ZoieVersion.max(_maxVersion, event.getVersion());
_count++;
_list.add(event);
this.notifyAll();
}

@Override
public DataEvent<D,V> next()
{
DataEvent<D,V> obj=null;
synchronized(this)
{
while(_list.isEmpty() && !_stop)
{
try
{
this.wait();
}
catch (InterruptedException e)
{
log.warn(e.getMessage());
}
}
if (!_list.isEmpty())
{
obj=_list.remove(0);
this.notifyAll();
}
}
return obj;
}

public int getCount()
{
synchronized(this)
{
return _count;
}
}

@Override
public DataEvent<D, V> next()
{
DataEvent<D, V> obj = null;
synchronized (this)
{
if (!_list.isEmpty())
{
obj = _list.remove(0);
this.notifyAll();
}
}
}
return obj;
}

public int getCount()
{
synchronized (this)
{
return _count;
}
}

@Override
public void stop()
{
try
{
synchronized(this)
{
_stop=true;
this.notifyAll();
}
}
finally
{
super.stop();
}
}
@Override
public void stop()
{
try
{
synchronized (this)
{
_stop = true;
this.notifyAll();
}
} finally
{
super.stop();
}
}
}

0 comments on commit 5726793

Please sign in to comment.