Skip to content

Commit

Permalink
made it possible the the config contains severall backends
Browse files Browse the repository at this point in the history
  • Loading branch information
Orbiter committed Sep 10, 2017
1 parent c764b5d commit 92dd7da
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 54 deletions.
2 changes: 1 addition & 1 deletion conf/config.properties
Expand Up @@ -115,7 +115,7 @@ DoS.servicereduction = 1000
# peer-to-peer back-end: this is used to assign a 'shadow' peer which
# receives all message data that this peer creates
# if you don't want a p2p operation,remove the backend value
backend=http://api.loklak.org
backend=http://api.loklak.org,http://root.loklak.org
backend.push.enabled=true

# peer-to-peer front peer: this is used to assign scraping activities to
Expand Down
20 changes: 10 additions & 10 deletions src/org/loklak/Caretaker.java
Expand Up @@ -75,7 +75,7 @@ public void shutdown() {
public void run() {
Thread.currentThread().setName("CARETAKER");
// send a message to other peers that I am alive
String[] remote = DAO.getConfig("backend", new String[0], ",");
String[] backends = DAO.getBackend();
int maxRetries = Integer.valueOf(DAO.getConfig("caretaker.backendpush.retries", "5"));
int backOffParameter = Integer.valueOf(DAO.getConfig("caretaker.backendpush.backoff", "3000"));

Expand All @@ -96,7 +96,7 @@ public void run() {
// check ping
if (System.currentTimeMillis() - helloPeriod > helloTime) {
helloTime = System.currentTimeMillis();
HelloService.propagate(remote);
HelloService.propagate(backends);
}

// clear caches
Expand All @@ -111,28 +111,28 @@ public void run() {

// peer-to-peer operation
Timeline tl = DAO.outgoingMessages.takeTimelineMin(Timeline.Order.CREATED_AT, TIMELINE_PUSH_MINSIZE, TIMELINE_PUSH_MAXSIZE);
if (tl != null && tl.size() > 0 && remote.length > 0) {
if (tl != null && tl.size() > 0 && backends.length > 0) {
// transmit the timeline
long start = System.currentTimeMillis();
boolean success = PushServlet.push(remote, tl);
boolean success = PushServlet.push(backends, tl);
if (success) {
DAO.log("success pushing " + tl.size() + " messages to backend " + Arrays.toString(remote) + " in 1st attempt in " + (System.currentTimeMillis() - start) + " ms");
DAO.log("success pushing " + tl.size() + " messages to backend " + Arrays.toString(backends) + " in 1st attempt in " + (System.currentTimeMillis() - start) + " ms");
}
if (!success) {
// we should try again.. but not an infinite number because then
// our timeline in RAM would fill up our RAM creating a memory leak
retrylook: for (int retry = 0; retry < maxRetries; retry++) {
// give back-end time to recover
try {Thread.sleep((retry + 1) * backOffParameter);} catch (InterruptedException e) {}
DAO.log("trying to push (again) " + tl.size() + " messages to backend " + Arrays.toString(remote) + ", attempt #" + (retry + 1) + "/5");
DAO.log("trying to push (again) " + tl.size() + " messages to backend " + Arrays.toString(backends) + ", attempt #" + (retry + 1) + "/5");
start = System.currentTimeMillis();
if (PushServlet.push(remote, tl)) {
DAO.log("success pushing " + tl.size() + " messages to backend " + Arrays.toString(remote) + " in " + (retry + 2) + ". attempt in " + (System.currentTimeMillis() - start) + " ms");
if (PushServlet.push(backends, tl)) {
DAO.log("success pushing " + tl.size() + " messages to backend " + Arrays.toString(backends) + " in " + (retry + 2) + ". attempt in " + (System.currentTimeMillis() - start) + " ms");
success = true;
break retrylook;
}
}
if (!success) DAO.log("failed pushing " + tl.size() + " messages to backend " + Arrays.toString(remote));
if (!success) DAO.log("failed pushing " + tl.size() + " messages to backend " + Arrays.toString(backends));
}
busy = true;
}
Expand All @@ -147,7 +147,7 @@ public void run() {
// run some harvesting steps
if (DAO.getConfig("retrieval.forbackend.enabled", false) &&
DAO.getConfig("backend.push.enabled", false) &&
(DAO.getConfig("backend", "").length() > 0) &&
(DAO.getBackend().length > 0) &&
DAO.outgoingMessages.timelineSize() < TIMELINE_PUSH_MAXSIZE) {
int retrieval_forbackend_concurrency = (int) DAO.getConfig("retrieval.forbackend.concurrency", 1);
int retrieval_forbackend_loops = (int) DAO.getConfig("retrieval.forbackend.loops", 10);
Expand Down
16 changes: 14 additions & 2 deletions src/org/loklak/api/admin/StatusService.java
Expand Up @@ -64,6 +64,18 @@ public JSONObject getDefaultPermissions(BaseUserRole baseUserRole) {
}


public static JSONObject status(final String[] protocolhostportstubs) throws IOException {
IOException e = null;
for (String protocolhostportstub: protocolhostportstubs) {
try {
return status(protocolhostportstub);
} catch (IOException ee) {
e = ee;
}
}
throw e == null ? new IOException("no url given") : e;
}

public static JSONObject status(final String protocolhostportstub) throws IOException {
final String urlstring = protocolhostportstub + "/api/status.json";
byte[] response = ClientConnection.downloadPeer(urlstring);
Expand Down Expand Up @@ -116,11 +128,11 @@ public static JSONObject getSystemConfig(Runtime runtime) throws Exception {
}

public static JSONObject getIndexConfig() throws Exception {
final String backend = DAO.getConfig("backend", "");
final String[] backend = DAO.getBackend();
final boolean backend_push = DAO.getConfig("backend.push.enabled", false);
JSONObject backend_status = null;
JSONObject backend_status_index_sizes = null;
if (backend.length() > 0 && !backend_push) {
if (backend.length > 0 && !backend_push) {
try {
backend_status = StatusService.status(backend);
backend_status_index_sizes = backend_status == null ? null : (JSONObject) backend_status.get("index_sizes");
Expand Down
4 changes: 2 additions & 2 deletions src/org/loklak/api/p2p/HelloService.java
Expand Up @@ -69,11 +69,11 @@ public static void propagate(final String[] hoststubs) {
String peername = (String) DAO.getConfig("peername", "anonymous");

// retrieve some simple statistics from the index
final String backend = DAO.getConfig("backend", "");
final String[] backend = DAO.getBackend();
final boolean backend_push = DAO.getConfig("backend.push.enabled", false);
JSONObject backend_status = null;
JSONObject backend_status_index_sizes = null;
if (backend.length() > 0 && !backend_push) {
if (backend.length > 0 && !backend_push) {
try {
backend_status = StatusService.status(backend);
} catch (IOException e) {
Expand Down
30 changes: 20 additions & 10 deletions src/org/loklak/api/search/SearchServlet.java
Expand Up @@ -72,21 +72,31 @@ public class SearchServlet extends HttpServlet {
public final static String frontpeer_hash = Integer.toHexString(Integer.MAX_VALUE - 1);

// possible values: cache, twitter, all
public static Timeline search(final String protocolhostportstub, final String query, final ArrayList<String> filterList, final Timeline.Order order, final String source, final int count, final int timezoneOffset, final String provider_hash, final long timeout) throws IOException {
public static Timeline search(final String[] protocolhostportstubs, final String query, final ArrayList<String> filterList, final Timeline.Order order, final String source, final int count, final int timezoneOffset, final String provider_hash, final long timeout) throws IOException {
Timeline tl = new Timeline(order);
if ("".equals(query)) return tl;
String urlstring = "";
String filterString = "";

try {
urlstring = protocolhostportstub + "/api/search.json?q=" + URLEncoder.encode(query.replace(' ', '+'), "UTF-8") + "&timezoneOffset=" + timezoneOffset + "&maximumRecords=" + count + "&source=" + (source == null ? "all" : source) + "&minified=true&shortlink=false&timeout=" + timeout;
byte[] jsonb = null;
IOException ee = null;
backendloop: for (String protocolhostportstub: protocolhostportstubs) {
ee = null;
try {
urlstring = protocolhostportstub + "/api/search.json?q=" + URLEncoder.encode(query.replace(' ', '+'), "UTF-8") + "&timezoneOffset=" + timezoneOffset + "&maximumRecords=" + count + "&source=" + (source == null ? "all" : source) + "&minified=true&shortlink=false&timeout=" + timeout;

if (!"".equals(filterString = String.join(", ", filterList))) {
urlstring = urlstring + "&filter=" + filterString;
}

if(!"".equals(filterString = String.join(", ", filterList))) {
urlstring = urlstring + "&filter=" + filterString;
jsonb = ClientConnection.downloadPeer(urlstring);
break backendloop;
} catch (IOException e) {
ee = e;
}
}

byte[] jsonb = ClientConnection.downloadPeer(urlstring);
if (jsonb == null || jsonb.length == 0) throw new IOException("empty content from " + protocolhostportstub);
if (jsonb == null || jsonb.length == 0) throw ee == null ? new IOException("empty content from " + protocolhostportstubs) : ee;
String jsons = UTF8.String(jsonb);
JSONObject json = new JSONObject(jsons);
if (json == null || json.length() == 0) return tl;
Expand Down Expand Up @@ -118,10 +128,10 @@ public static Timeline search(final String protocolhostportstub, final String qu
}

public static Timeline search(
final String protocolhostportstub, final String query, final Timeline.Order order,
final String[] protocolhostportstubs, final String query, final Timeline.Order order,
final String source, final int count, final int timezoneOffset,
final String provider_hash, final long timeout) throws IOException {
return search(protocolhostportstub, query, new ArrayList<>(), order, source, count, timezoneOffset, provider_hash, timeout);
return search(protocolhostportstubs, query, new ArrayList<>(), order, source, count, timezoneOffset, provider_hash, timeout);
}

@Override
Expand Down Expand Up @@ -520,7 +530,7 @@ private static void setTwitterMetaData(JSONObject metadata, final int startRecor

public static void main(String[] args) {
try {
Timeline tl = search("http://loklak.org", "beer", Timeline.Order.CREATED_AT, "cache", 20, -120, backend_hash, 10000);
Timeline tl = search(new String[]{"http://root.loklak.org"}, "beer", Timeline.Order.CREATED_AT, "cache", 20, -120, backend_hash, 10000);
System.out.println(tl.toJSON(false, "search_metadata", "statuses").toString(2));
} catch (IOException e) {
DAO.severe(e);
Expand Down
46 changes: 28 additions & 18 deletions src/org/loklak/api/search/SuggestServlet.java
Expand Up @@ -65,7 +65,7 @@ public class SuggestServlet extends HttpServlet {
public static Map<Integer, JSONObject> cache = new ConcurrentHashMap<>();

public static ResultList<QueryEntry> suggest(
final String protocolhostportstub,
final String[] protocolhostportstubs,
final String q,
final String source,
final int count,
Expand All @@ -80,22 +80,32 @@ public static ResultList<QueryEntry> suggest(
int httpsport = (int) DAO.getConfig("port.https", 9443);
String peername = (String) DAO.getConfig("peername", "anonymous");
ResultList<QueryEntry> rl = new ResultList<QueryEntry>();
String urlstring = "";
urlstring = protocolhostportstub + "/api/suggest.json?q=" + URLEncoder.encode(q.replace(' ', '+'), "UTF-8") +
"&timezoneOffset=" + timezoneOffset +
"&count=" + count +
"&source=" + (source == null ? "all" : source) +
(order == null ? "" : ("&order=" + order)) +
(orderby == null ? "" : ("&orderby=" + orderby)) +
(since == null ? "" : ("&since=" + since)) +
(until == null ? "" : ("&until=" + until)) +
(selectby == null ? "" : ("&selectby=" + selectby)) +
(random < 0 ? "" : ("&random=" + random)) +
"&minified=true" +
"&port.http=" + httpport +
"&port.https=" + httpsport +
"&peername=" + peername;
byte[] response = ClientConnection.downloadPeer(urlstring);
byte[] response = null;
IOException ee = null;
backendloop: for (String protocolhostportstub: protocolhostportstubs) {
try {
ee = null;
String urlstring = protocolhostportstub + "/api/suggest.json?q=" + URLEncoder.encode(q.replace(' ', '+'), "UTF-8") +
"&timezoneOffset=" + timezoneOffset +
"&count=" + count +
"&source=" + (source == null ? "all" : source) +
(order == null ? "" : ("&order=" + order)) +
(orderby == null ? "" : ("&orderby=" + orderby)) +
(since == null ? "" : ("&since=" + since)) +
(until == null ? "" : ("&until=" + until)) +
(selectby == null ? "" : ("&selectby=" + selectby)) +
(random < 0 ? "" : ("&random=" + random)) +
"&minified=true" +
"&port.http=" + httpport +
"&port.https=" + httpsport +
"&peername=" + peername;
response = ClientConnection.downloadPeer(urlstring);
break backendloop;
} catch (IOException e) {
ee = e;
}
}
if (response == null && ee != null) throw ee;
if (response == null || response.length == 0) return rl;
String responseString = UTF8.String(response);
if (responseString == null || responseString.length() == 0 || responseString.startsWith("<")) {
Expand Down Expand Up @@ -228,7 +238,7 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response) t

public static void main(String[] args) {
try {
ResultList<QueryEntry> rl = suggest("http://loklak.org", "","query",1000,"asc","retrieval_next",DateParser.getTimezoneOffset(),null,"now","retrieval_next",3);
ResultList<QueryEntry> rl = suggest(new String[]{"http://root.loklak.org"}, "","query",1000,"asc","retrieval_next",DateParser.getTimezoneOffset(),null,"now","retrieval_next",3);
for (QueryEntry qe: rl) {
System.out.println(UTF8.String(qe.toJSONBytes()));
}
Expand Down
8 changes: 6 additions & 2 deletions src/org/loklak/data/DAO.java
Expand Up @@ -1515,10 +1515,14 @@ public static ArrayList<String> getFrontPeers() {
return getBestPeers(testpeers);
}

public static String[] getBackend() {
return DAO.getConfig("backend", new String[]{"http://api.loklak.org"}, ",");
}

public static List<String> getBackendPeers() {
List<String> testpeers = new ArrayList<>();
if (backendPeerCache.size() == 0) {
String[] remote = DAO.getConfig("backend", new String[0], ",");
final String[] remote = DAO.getBackend();
for (String peer: remote) backendPeerCache.add(peer);
}
testpeers.addAll(backendPeerCache);
Expand All @@ -1545,7 +1549,7 @@ public static Timeline searchOnOtherPeers(final List<String> remote, final Strin
String peer = remote.get(pick);
long start = System.currentTimeMillis();
try {
Timeline tl = SearchServlet.search(peer, q, filterList, order, source, count, timezoneOffset, provider_hash, timeout);
Timeline tl = SearchServlet.search(new String[]{peer}, q, filterList, order, source, count, timezoneOffset, provider_hash, timeout);
peerLatency.put(peer, System.currentTimeMillis() - start);
// to show which peer was used for the retrieval, we move the picked peer to the front of the list
if (pick != 0) remote.add(0, remote.remove(pick));
Expand Down
4 changes: 2 additions & 2 deletions src/org/loklak/data/OutgoingMessageBuffer.java
Expand Up @@ -37,7 +37,7 @@ public OutgoingMessageBuffer() {
}

public void transmitTimelineToBackend(Timeline tl) {
if (DAO.getConfig("backend", new String[0], ",").length > 0) {
if (DAO.getBackend().length > 0) {
boolean clone = false;
for (TwitterTweet message: tl) {
if (!message.getSourceType().propagate()) {clone = true; break;}
Expand All @@ -56,7 +56,7 @@ public void transmitTimelineToBackend(Timeline tl) {

public void transmitMessage(final TwitterTweet tweet, final UserEntry user) {
if (!tweet.getSourceType().propagate()) return;
if (DAO.getConfig("backend", new String[0], ",").length <= 0) return;
if (DAO.getBackend().length <= 0) return;
if (!DAO.getConfig("backend.push.enabled", false)) return;
Timeline tl = this.pushToBackendTimeline.poll();
if (tl == null) tl = new Timeline(Timeline.Order.CREATED_AT);
Expand Down
8 changes: 4 additions & 4 deletions src/org/loklak/harvester/PushThread.java
Expand Up @@ -5,10 +5,10 @@
import org.loklak.objects.Timeline;

public class PushThread implements Runnable {
private String peer;
private String[] peers;
private Timeline tl;
public PushThread(String peer, Timeline tl) {
this.peer = peer;
public PushThread(String[] peers, Timeline tl) {
this.peers = peers;
this.tl = tl;
}
@Override
Expand All @@ -17,7 +17,7 @@ public void run() {
for (int i = 0; i < 5; i++) {
try {
long start = System.currentTimeMillis();
success = PushServlet.push(new String[]{peer}, tl);
success = PushServlet.push(this.peers, tl);
if (success) {
DAO.log("retrieval of " + tl.size() + " new messages for q = " + tl.getQuery() +
", pushed to backend synchronously in " + (System.currentTimeMillis() - start) + " ms; amount = " + tl.size());
Expand Down
2 changes: 1 addition & 1 deletion src/org/loklak/harvester/strategy/ClassicHarvester.java
Expand Up @@ -74,7 +74,7 @@ public void checkContext(String s, boolean front) {
}

public int harvest() {
String backend = DAO.getConfig("backend","http://api.loklak.org");
String[] backend = DAO.getBackend();

if (random.nextInt(100) != 0 && hitsOnBackend < HITS_LIMIT_4_QUERIES && pendingQueries.size() == 0 && pendingContext.size() > 0) {
// harvest using the collected keys instead using the queries
Expand Down
4 changes: 2 additions & 2 deletions src/org/loklak/harvester/strategy/KaizenHarvester.java
Expand Up @@ -35,7 +35,7 @@
*/
public class KaizenHarvester implements Harvester {

private final String BACKEND;
private final String[] BACKEND;
private final int SUGGESTIONS_COUNT;
private final int SUGGESTIONS_RANDOM;
private final int PLACE_RADIUS;
Expand All @@ -50,7 +50,7 @@ public class KaizenHarvester implements Harvester {
private Twitter twitter = null;

public KaizenHarvester(KaizenQueries queries) {
BACKEND = DAO.getConfig("backend", "http://loklak.org");
BACKEND = DAO.getBackend();
SUGGESTIONS_COUNT = DAO.getConfig("harvester.kaizen.suggestions_count", 1000);
SUGGESTIONS_RANDOM = DAO.getConfig("harvester.kaizen.suggestions_random", 5);
PLACE_RADIUS = DAO.getConfig("harvester.kaizen.place_radius", 5);
Expand Down

0 comments on commit 92dd7da

Please sign in to comment.