@@ -5,9 +5,11 @@
import android.content.Intent ;
import android.os.IBinder ;
import android.support.annotation.Nullable ;
import android.support.v4.app.NotificationCompat ;
import android.util.Log ;
import org.thoughtcrime.securesms.ApplicationContext ;
import org.thoughtcrime.securesms.R ;
import org.thoughtcrime.securesms.dependencies.InjectableType ;
import org.thoughtcrime.securesms.gcm.GcmBroadcastReceiver ;
import org.thoughtcrime.securesms.jobs.PushContentReceiveJob ;
@@ -24,16 +26,20 @@
import java.util.List ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.TimeoutException ;
import java.util.concurrent.atomic.AtomicBoolean ;
import javax.inject.Inject ;
public class MessageRetrievalService extends Service implements Runnable , InjectableType , RequirementListener {
public class MessageRetrievalService extends Service implements InjectableType , RequirementListener {
private static final String TAG = MessageRetrievalService . class. getSimpleName();
public static final String ACTION_ACTIVITY_STARTED = " ACTIVITY_STARTED" ;
public static final String ACTION_ACTIVITY_FINISHED = " ACTIVITY_FINISHED" ;
public static final String ACTION_PUSH_RECEIVED = " PUSH_RECEIVED" ;
public static final String ACTION_INITIALIZE = " INITIALIZE" ;
public static final int FOREGROUND_ID = 313399 ;
private static final long REQUEST_TIMEOUT_MINUTES = 1 ;
private NetworkRequirement networkRequirement;
@@ -42,8 +48,9 @@
@Inject
public SignalServiceMessageReceiver receiver;
private int activeActivities = 0 ;
private List<Intent > pushPending = new LinkedList<> ();
private int activeActivities = 0 ;
private List<Intent > pushPending = new LinkedList<> ();
private MessageRetrievalThread retrievalThread = null ;
public static SignalServiceMessagePipe pipe = null ;
@@ -56,7 +63,11 @@ public void onCreate() {
networkRequirementProvider = new NetworkRequirementProvider (this );
networkRequirementProvider. setListener(this );
new Thread (this , " MessageRetrievalService" ). start();
retrievalThread = new MessageRetrievalThread ();
retrievalThread. start();
setForegroundIfNecessary();
}
public int onStartCommand (Intent intent , int flags , int startId ) {
@@ -70,44 +81,11 @@ public int onStartCommand(Intent intent, int flags, int startId) {
}
@Override
public void run () {
while (true ) {
Log . w(TAG , " Waiting for websocket state change...." );
waitForConnectionNecessary();
Log . w(TAG , " Making websocket connection...." );
pipe = receiver. createMessagePipe();
try {
while (isConnectionNecessary()) {
try {
Log . w(TAG , " Reading message..." );
pipe. read(REQUEST_TIMEOUT_MINUTES , TimeUnit . MINUTES ,
new SignalServiceMessagePipe .MessagePipeCallback () {
@Override
public void onMessage (SignalServiceEnvelope envelope ) {
Log . w(TAG , " Retrieved envelope! " + envelope. getSource());
PushContentReceiveJob receiveJob = new PushContentReceiveJob (MessageRetrievalService . this );
receiveJob. handle(envelope, false );
decrementPushReceived();
}
});
} catch (TimeoutException e) {
Log . w(TAG , " Application level read timeout..." );
} catch (InvalidVersionException e) {
Log . w(TAG , e);
}
}
} catch (Throwable e) {
Log . w(TAG , e);
} finally {
Log . w(TAG , " Shutting down pipe..." );
shutdown(pipe);
}
public void onDestroy () {
super . onDestroy();
Log . w(TAG , " Looping..." );
if (retrievalThread != null ) {
retrievalThread. stopThread();
}
}
@@ -123,6 +101,18 @@ public IBinder onBind(Intent intent) {
return null ;
}
private void setForegroundIfNecessary () {
if (TextSecurePreferences . isGcmDisabled(this )) {
NotificationCompat . Builder builder = new NotificationCompat .Builder (this );
builder. setContentTitle(getString(R . string. MessageRetrievalService_signal ));
builder. setContentText(getString(R . string. MessageRetrievalService_background_connection_enabled ));
builder. setPriority(NotificationCompat . PRIORITY_MIN );
builder. setWhen(0 );
builder. setSmallIcon(R . drawable. ic_signal_grey_24dp);
startForeground(FOREGROUND_ID , builder. build());
}
}
private synchronized void incrementActive () {
activeActivities++ ;
Log . w(TAG , " Active Count: " + activeActivities);
@@ -149,11 +139,13 @@ private synchronized void decrementPushReceived() {
}
private synchronized boolean isConnectionNecessary () {
Log . w(TAG , String . format(" Network requirement: %s, active activities: %s, push pending: %s" ,
networkRequirement. isPresent(), activeActivities, pushPending. size()));
boolean isGcmDisabled = TextSecurePreferences . isGcmDisabled(this );
Log . w(TAG , String . format(" Network requirement: %s, active activities: %s, push pending: %s, gcm disabled: %b" ,
networkRequirement. isPresent(), activeActivities, pushPending. size(), isGcmDisabled));
return TextSecurePreferences . isWebsocketRegistered(this ) &&
(activeActivities > 0 || ! pushPending. isEmpty()) &&
return TextSecurePreferences . isWebsocketRegistered(this ) &&
(activeActivities > 0 || ! pushPending. isEmpty() || isGcmDisabled ) &&
networkRequirement. isPresent();
}
@@ -188,4 +180,59 @@ public static void registerActivityStopped(Context activity) {
public static @Nullable SignalServiceMessagePipe getPipe () {
return pipe;
}
private class MessageRetrievalThread extends Thread {
private AtomicBoolean stopThread = new AtomicBoolean (false );
@Override
public void run () {
while (! stopThread. get()) {
Log . w(TAG , " Waiting for websocket state change...." );
waitForConnectionNecessary();
Log . w(TAG , " Making websocket connection...." );
pipe = receiver. createMessagePipe();
SignalServiceMessagePipe localPipe = pipe;
try {
while (isConnectionNecessary() && ! stopThread. get()) {
try {
Log . w(TAG , " Reading message..." );
localPipe. read(REQUEST_TIMEOUT_MINUTES , TimeUnit . MINUTES ,
new SignalServiceMessagePipe .MessagePipeCallback () {
@Override
public void onMessage (SignalServiceEnvelope envelope ) {
Log . w(TAG , " Retrieved envelope! " + envelope. getSource());
PushContentReceiveJob receiveJob = new PushContentReceiveJob (MessageRetrievalService . this );
receiveJob. handle(envelope, false );
decrementPushReceived();
}
});
} catch (TimeoutException e) {
Log . w(TAG , " Application level read timeout..." );
} catch (InvalidVersionException e) {
Log . w(TAG , e);
}
}
} catch (Throwable e) {
Log . w(TAG , e);
} finally {
Log . w(TAG , " Shutting down pipe..." );
shutdown(localPipe);
}
Log . w(TAG , " Looping..." );
}
Log . w(TAG , " Exiting..." );
}
public void stopThread () {
stopThread. set(true );
}
}
}