Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher {

private static final String SINCE = "since";
private static final String PREFIX = "segmentChangeFetcher";
private static final String NAME_CACHE = "Cache-Control";
private static final String VALUE_CACHE = "no-cache";

private final CloseableHttpClient _client;
private final URI _target;
Expand All @@ -49,7 +51,7 @@ private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, Metrics me
}

@Override
public SegmentChange fetch(String segmentName, long since) {
public SegmentChange fetch(String segmentName, long since, boolean addCacheHeader) {
long start = System.currentTimeMillis();

CloseableHttpResponse response = null;
Expand All @@ -58,6 +60,9 @@ public SegmentChange fetch(String segmentName, long since) {
String path = _target.getPath() + "/" + segmentName;
URI uri = new URIBuilder(_target).setPath(path).addParameter(SINCE, "" + since).build();
HttpGet request = new HttpGet(uri);
if(addCacheHeader) {
request.setHeader(NAME_CACHE, VALUE_CACHE);
}
response = _client.execute(request);

int statusCode = response.getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {

private static final String SINCE = "since";
private static final String PREFIX = "splitChangeFetcher";
private static final String NAME_CACHE = "Cache-Control";
private static final String VALUE_CACHE = "no-cache";

private final CloseableHttpClient _client;
private final URI _target;
Expand All @@ -49,7 +51,7 @@ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metr
}

@Override
public SplitChange fetch(long since) {
public SplitChange fetch(long since, boolean addCacheHeader) {

long start = System.currentTimeMillis();

Expand All @@ -59,6 +61,9 @@ public SplitChange fetch(long since) {
URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build();

HttpGet request = new HttpGet(uri);
if(addCacheHeader) {
request.setHeader(NAME_CACHE, VALUE_CACHE);
}
response = _client.execute(request);

int statusCode = response.getCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public SplitJmxMonitor(SplitClient splitClient, SplitFetcher featureFetcher, Spl

@Override
public boolean forceSyncFeatures() {
_featureFetcher.forceRefresh();
_featureFetcher.forceRefresh(true);
_log.info("Features successfully refreshed via JMX");
return true;
}
Expand All @@ -43,7 +43,7 @@ public boolean forceSyncFeatures() {
public boolean forceSyncSegment(String segmentName) {
SegmentFetcher fetcher = _segmentSynchronizationTask.getFetcher(segmentName);
try{
fetcher.fetch();
fetcher.fetch(true);
}
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
catch (NullPointerException np){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
@Override
public void syncAll() {
_syncAllScheduledExecutorService.schedule(() -> {
_splitFetcher.run();
_segmentSynchronizationTaskImp.run();
_splitFetcher.fetchAll(true);
_segmentSynchronizationTaskImp.fetchAll(true);
}, 0, TimeUnit.SECONDS);
}

Expand All @@ -70,7 +70,7 @@ public void stopPeriodicFetching() {
@Override
public void refreshSplits(long targetChangeNumber) {
if (targetChangeNumber > _splitCache.getChangeNumber()) {
_splitFetcher.forceRefresh();
_splitFetcher.forceRefresh(true);
}
}

Expand All @@ -87,7 +87,7 @@ public void refreshSegment(String segmentName, long changeNumber) {
if (changeNumber > _segmentCache.getChangeNumber(segmentName)) {
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
try{
fetcher.fetch();
fetcher.fetch(true);
}
//We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
catch (NullPointerException np){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ public interface SplitChangeFetcher {
* @return SegmentChange
* @throws java.lang.RuntimeException if there was a problem computing split changes
*/
SplitChange fetch(long since);
SplitChange fetch(long since, boolean addCacheHeader);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,11 @@ public interface SplitFetcher extends Runnable {
* Forces a sync of splits, outside of any scheduled
* syncs. This method MUST NOT throw any exceptions.
*/
void forceRefresh();
void forceRefresh(boolean addCacheHeader);

/**
* Forces a sync of ALL splits, outside of any scheduled
* syncs. This method MUST NOT throw any exceptions.
*/
void fetchAll(boolean addCacheHeader);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser
}

@Override
public void forceRefresh() {
public void forceRefresh(boolean addCacheHeader) {
_log.debug("Force Refresh splits starting ...");
try {
while (true) {
long start = _splitCache.getChangeNumber();
runWithoutExceptionHandling();
runWithoutExceptionHandling(addCacheHeader);
long end = _splitCache.getChangeNumber();

if (start >= end) {
Expand All @@ -65,28 +65,11 @@ public void forceRefresh() {

@Override
public void run() {
_log.debug("Fetch splits starting ...");
long start = _splitCache.getChangeNumber();
try {
runWithoutExceptionHandling();
_gates.splitsAreReady();
} catch (InterruptedException e) {
_log.warn("Interrupting split fetcher task");
Thread.currentThread().interrupt();
} catch (Throwable t) {
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
if (_log.isDebugEnabled()) {
_log.debug("Reason:", t);
}
} finally {
if (_log.isDebugEnabled()) {
_log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber());
}
}
this.fetchAll(false);
}

private void runWithoutExceptionHandling() throws InterruptedException {
SplitChange change = _splitChangeFetcher.fetch(_splitCache.getChangeNumber());
private void runWithoutExceptionHandling(boolean addCacheHeader) throws InterruptedException {
SplitChange change = _splitChangeFetcher.fetch(_splitCache.getChangeNumber(), addCacheHeader);

if (change == null) {
throw new IllegalStateException("SplitChange was null");
Expand Down Expand Up @@ -155,4 +138,25 @@ private void runWithoutExceptionHandling() throws InterruptedException {
_splitCache.setChangeNumber(change.till);
}
}
@Override
public void fetchAll(boolean addCacheHeader) {
_log.debug("Fetch splits starting ...");
long start = _splitCache.getChangeNumber();
try {
runWithoutExceptionHandling(addCacheHeader);
_gates.splitsAreReady();
} catch (InterruptedException e) {
_log.warn("Interrupting split fetcher task");
Thread.currentThread().interrupt();
} catch (Throwable t) {
_log.error("RefreshableSplitFetcher failed: " + t.getMessage());
if (_log.isDebugEnabled()) {
_log.debug("Reason:", t);
}
} finally {
if (_log.isDebugEnabled()) {
_log.debug("split fetch before: " + start + ", after: " + _splitCache.getChangeNumber());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ public interface SegmentChangeFetcher {
* @return SegmentChange
* @throws java.lang.RuntimeException if there was a problem fetching segment changes
*/
SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber);
SegmentChange fetch(String segmentName, long changesSinceThisChangeNumber, boolean addCacheHeader);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,9 @@ public interface SegmentFetcher {
/**
* fetch
*/
void fetch();
void fetch(boolean addCacheHeader);

void runWhitCacheHeader();

void fetchAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

public class SegmentFetcherImp implements Runnable, SegmentFetcher {
public class SegmentFetcherImp implements SegmentFetcher {
private static final Logger _log = LoggerFactory.getLogger(SegmentFetcherImp.class);

private final String _segmentName;
Expand All @@ -31,14 +31,9 @@ public SegmentFetcherImp(String segmentName, SegmentChangeFetcher segmentChangeF
}

@Override
public void run() {
public void fetch(boolean addCacheHeader){
try {
// Do this again in case the previous call errored out.
_gates.registerSegment(_segmentName);
callLoopRun(true);

_gates.segmentIsReady(_segmentName);

callLoopRun(false, addCacheHeader);
} catch (Throwable t) {
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
if (_log.isDebugEnabled()) {
Expand All @@ -47,20 +42,8 @@ public void run() {
}
}

@Override
public void fetch(){
try {
callLoopRun(false);
} catch (Throwable t) {
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
if (_log.isDebugEnabled()) {
_log.debug("Reason:", t);
}
}
}

private void runWithoutExceptionHandling() {
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName));
private void runWithoutExceptionHandling(boolean addCacheHeader) {
SegmentChange change = _segmentChangeFetcher.fetch(_segmentName, _segmentCache.getChangeNumber(_segmentName), addCacheHeader);

if (change == null) {
throw new IllegalStateException("SegmentChange was null");
Expand Down Expand Up @@ -126,10 +109,10 @@ private String summarize(List<String> changes) {
return bldr.toString();
}

private void callLoopRun(boolean isFetch){
private void callLoopRun(boolean isFetch, boolean addCacheHeader){
while (true) {
long start = _segmentCache.getChangeNumber(_segmentName);
runWithoutExceptionHandling();
runWithoutExceptionHandling(addCacheHeader);
long end = _segmentCache.getChangeNumber(_segmentName);
if (isFetch && _log.isDebugEnabled()) {
_log.debug(_segmentName + " segment fetch before: " + start + ", after: " + _segmentCache.getChangeNumber(_segmentName) /*+ " size: " + _concurrentKeySet.size()*/);
Expand All @@ -139,4 +122,36 @@ private void callLoopRun(boolean isFetch){
}
}
}

@Override
public void runWhitCacheHeader(){
this.fetchAndUpdate(true);
}

/**
* Calls callLoopRun and after fetchs segment.
* @param addCacheHeader indicates if CacheHeader is required
*/
private void fetchAndUpdate(boolean addCacheHeader) {
try {
// Do this again in case the previous call errored out.
_gates.registerSegment(_segmentName);
callLoopRun(true, addCacheHeader);

_gates.segmentIsReady(_segmentName);

} catch (Throwable t) {
_log.error("RefreshableSegmentFetcher failed: " + t.getMessage());
if (_log.isDebugEnabled()) {
_log.debug("Reason:", t);
}
}
}

@Override
public void fetchAll() {
this.fetchAndUpdate(false);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ public interface SegmentSynchronizationTask extends Runnable {
* stops the thread
*/
void stop();

/**
* fetch every Segment
* @param addCacheHeader
*/
void fetchAll(boolean addCacheHeader);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SegmentSynchronizationTaskImp implements SegmentSynchronizationTask
private final AtomicLong _refreshEveryNSeconds;
private final AtomicBoolean _running;
private final Object _lock = new Object();
private final ConcurrentMap<String, SegmentFetcherImp> _segmentFetchers = Maps.newConcurrentMap();
private final ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
private final SegmentCache _segmentCache;
private final SDKReadinessGates _gates;
private final ScheduledExecutorService _scheduledExecutorService;
Expand Down Expand Up @@ -58,20 +58,12 @@ public SegmentSynchronizationTaskImp(SegmentChangeFetcher segmentChangeFetcher,

@Override
public void run() {
for (Map.Entry<String, SegmentFetcherImp> entry : _segmentFetchers.entrySet()) {
SegmentFetcherImp fetcher = entry.getValue();

if (fetcher == null) {
continue;
}

_scheduledExecutorService.submit(fetcher);
}
this.fetchAll(false);
}

@Override
public void initializeSegment(String segmentName) {
SegmentFetcherImp segment = _segmentFetchers.get(segmentName);
SegmentFetcher segment = _segmentFetchers.get(segmentName);
if (segment != null) {
return;
}
Expand All @@ -94,7 +86,7 @@ public void initializeSegment(String segmentName) {
segment = new SegmentFetcherImp(segmentName, _segmentChangeFetcher, _gates, _segmentCache);

if (_running.get()) {
_scheduledExecutorService.submit(segment);
_scheduledExecutorService.submit(segment::fetchAll);
}

_segmentFetchers.putIfAbsent(segmentName, segment);
Expand Down Expand Up @@ -148,4 +140,21 @@ public void close() {
Thread.currentThread().interrupt();
}
}

@Override
public void fetchAll(boolean addCacheHeader) {
for (Map.Entry<String, SegmentFetcher> entry : _segmentFetchers.entrySet()) {
SegmentFetcher fetcher = entry.getValue();

if (fetcher == null) {
continue;
}

if(addCacheHeader) {
_scheduledExecutorService.submit(fetcher::runWhitCacheHeader);
continue;
}
_scheduledExecutorService.submit(fetcher::fetchAll);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testFetcherWithSpecialCharacters() throws URISyntaxException, IOExce
Metrics.NoopMetrics metrics = new Metrics.NoopMetrics();
HttpSegmentChangeFetcher fetcher = HttpSegmentChangeFetcher.create(httpClientMock, rootTarget, metrics);

SegmentChange change = fetcher.fetch("some_segment", 1234567);
SegmentChange change = fetcher.fetch("some_segment", 1234567, true);

Assert.assertNotNull(change);
Assert.assertEquals(1, change.added.size());
Expand Down
Loading