Permalink
Browse files

Add ForkJoin framework example

  • Loading branch information...
sayembd committed Dec 16, 2018
1 parent 03eca86 commit 01b92b168c8f359649664cd6725157aa353f3f17
@@ -22,6 +22,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.9</java.version>
<apache.httpclient.version>4.5.6</apache.httpclient.version>
</properties>

<dependencyManagement>
@@ -49,6 +50,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${apache.httpclient.version}</version>
</dependency>

<!-- Metrics -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
@@ -0,0 +1,42 @@
package com.sayemahmed.example.forkjoin;

import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.RecursiveTask;

@Slf4j
public class ListSummer extends RecursiveTask<Integer> {
private final List<Integer> listToSum;

ListSummer(List<Integer> listToSum) {
this.listToSum = listToSum;
}

@Override
protected Integer compute() {
if (listToSum.isEmpty()) {
log.info("Found empty list, sum is 0");
return 0;
}

int middleIndex = listToSum.size() / 2;
log.info("List {}, middle Index: {}", listToSum, middleIndex);

List<Integer> leftSublist = listToSum.subList(0, middleIndex);
List<Integer> rightSublist = listToSum.subList(middleIndex + 1, listToSum.size());

ListSummer leftSummer = new ListSummer(leftSublist);
ListSummer rightSummer = new ListSummer(rightSublist);

leftSummer.fork();
rightSummer.fork();

Integer leftSum = leftSummer.join();
Integer rightSum = rightSummer.join();
int total = leftSum + listToSum.get(middleIndex) + rightSum;
log.info("Left sum is {}, right sum is {}, total is {}", leftSum, rightSum, total);

return total;
}
}
@@ -0,0 +1,102 @@
package com.sayemahmed.example.forkjoin;

import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

@Slf4j
public class ResponseLengthCalculator extends RecursiveTask<Map<String, Integer>> {
private final List<String> links;

ResponseLengthCalculator(List<String> links) {
this.links = links;
}

@Override
protected Map<String, Integer> compute() {
if (links.isEmpty()) {
log.info("No more links to fetch");
return Collections.emptyMap();
}

int middle = links.size() / 2;
log.info("Middle index: {}", links, middle);
ResponseLengthCalculator leftPartition = new ResponseLengthCalculator(links.subList(0, middle));
ResponseLengthCalculator rightPartition = new ResponseLengthCalculator(links.subList(middle + 1, links.size()));

log.info("Forking left partition");
leftPartition.fork();
log.info("Left partition forked, now forking right partition");
rightPartition.fork();
log.info("Right partition forked");

String middleLink = links.get(middle);
HttpRequester httpRequester = new HttpRequester(middleLink);
String response;
try {
log.info("Calling managedBlock for {}", middleLink);
ForkJoinPool.managedBlock(httpRequester);
response = httpRequester.response;
} catch (InterruptedException ex) {
log.error("Error occurred while trying to implement blocking link fetcher", ex);
response = "";
}

Map<String, Integer> responseMap = new HashMap<>(links.size());

Map<String, Integer> leftLinks = leftPartition.join();
responseMap.putAll(leftLinks);
responseMap.put(middleLink, response.length());
Map<String, Integer> rightLinks = rightPartition.join();
responseMap.putAll(rightLinks);

log.info("Left map {}, middle length {}, right map {}", leftLinks, response.length(), rightLinks);

return responseMap;
}

private static class HttpRequester implements ForkJoinPool.ManagedBlocker {
private final String link;
private String response;

private HttpRequester(String link) {
this.link = link;
}

@Override
public boolean block() {
HttpGet headRequest = new HttpGet(link);
CloseableHttpClient client = HttpClientBuilder
.create()
.disableRedirectHandling()
.build();
try {
log.info("Executing blocking request for {}", link);
CloseableHttpResponse response = client.execute(headRequest);
log.info("HTTP request for link {} has been executed", link);
this.response = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
log.error("Error while trying to fetch response from link {}: {}", link, e.getMessage());
this.response = "";
}
return true;
}

@Override
public boolean isReleasable() {
return false;
}
}
}

@@ -0,0 +1,46 @@
package com.sayemahmed.example.forkjoin;

import org.junit.Test;

import java.util.List;
import java.util.concurrent.ForkJoinPool;

import static org.assertj.core.api.Assertions.assertThat;

public class ListSummerTest {

@Test
public void shouldSumEmptyList() {
ListSummer summer = new ListSummer(List.of());
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(summer);

int result = summer.join();

assertThat(result).isZero();
}

@Test
public void shouldSumListWithOneElement() {
ListSummer summer = new ListSummer(List.of(5));
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(summer);

int result = summer.join();

assertThat(result).isEqualTo(5);
}

@Test
public void shouldSumListWithMultipleElements() {
ListSummer summer = new ListSummer(List.of(
1, 2, 3, 4, 5, 6, 7, 8, 9
));
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(summer);

int result = summer.join();

assertThat(result).isEqualTo(45);
}
}
@@ -0,0 +1,57 @@
package com.sayemahmed.example.forkjoin;

import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;

import static org.assertj.core.api.Assertions.assertThat;

public class ResponseLengthCalculatorTest {

@Test
public void shouldReturnEmptyMapForEmptyList() {
ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(Collections.emptyList());
ForkJoinPool pool = new ForkJoinPool();

pool.submit(responseLengthCalculator);

Map<String, Integer> result = responseLengthCalculator.join();
assertThat(result).isEmpty();
}

@Test
public void shouldHandle200Ok() {
ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
"http://httpstat.us/200"
));
ForkJoinPool pool = new ForkJoinPool();

pool.submit(responseLengthCalculator);

Map<String, Integer> result = responseLengthCalculator.join();
assertThat(result)
.hasSize(1)
.containsKeys("http://httpstat.us/200")
.containsValue(0);
}

@Test
public void shouldFetchResponseForDifferentResponseStatus() {
ResponseLengthCalculator responseLengthCalculator = new ResponseLengthCalculator(List.of(
"http://httpstat.us/200",
"http://httpstat.us/302",
"http://httpstat.us/404",
"http://httpstat.us/502"
));
ForkJoinPool pool = new ForkJoinPool();

pool.submit(responseLengthCalculator);

Map<String, Integer> result = responseLengthCalculator.join();
assertThat(result)
.hasSize(4);
}
}

0 comments on commit 01b92b1

Please sign in to comment.