From 3059bc4cb78c3f537f5fdfcfbd9a05a67a17b7c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romain=20Vimont=20=28=C2=AEom=29?= Date: Fri, 8 Mar 2013 13:42:56 +0100 Subject: [PATCH] Walkie-talkie service --- AndroidManifest.xml | 9 + .../walkietalkie/AudioReceiver.java | 256 ++++++++++++++++++ .../walkietalkie/AudioSender.java | 237 ++++++++++++++++ .../walkietalkie/Compression.java | 18 ++ src/org/servalproject/walkietalkie/G711.java | 91 +++++++ src/org/servalproject/walkietalkie/Mixer.java | 250 +++++++++++++++++ .../walkietalkie/StreamBuffer.java | 125 +++++++++ .../walkietalkie/WalkieTalkieRecipient.java | 80 ++++++ .../walkietalkie/WalkieTalkieService.java | 207 ++++++++++++++ 9 files changed, 1273 insertions(+) create mode 100644 src/org/servalproject/walkietalkie/AudioReceiver.java create mode 100644 src/org/servalproject/walkietalkie/AudioSender.java create mode 100644 src/org/servalproject/walkietalkie/Compression.java create mode 100644 src/org/servalproject/walkietalkie/G711.java create mode 100644 src/org/servalproject/walkietalkie/Mixer.java create mode 100644 src/org/servalproject/walkietalkie/StreamBuffer.java create mode 100644 src/org/servalproject/walkietalkie/WalkieTalkieRecipient.java create mode 100644 src/org/servalproject/walkietalkie/WalkieTalkieService.java diff --git a/AndroidManifest.xml b/AndroidManifest.xml index 05b34ce7..8f202134 100644 --- a/AndroidManifest.xml +++ b/AndroidManifest.xml @@ -253,6 +253,15 @@ android:resource="@xml/contacts" /> + + + + + + + + + diff --git a/src/org/servalproject/walkietalkie/AudioReceiver.java b/src/org/servalproject/walkietalkie/AudioReceiver.java new file mode 100644 index 00000000..f429bac9 --- /dev/null +++ b/src/org/servalproject/walkietalkie/AudioReceiver.java @@ -0,0 +1,256 @@ +package org.servalproject.walkietalkie; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.servalproject.servald.mdp.MeshPacket; +import org.servalproject.servald.mdp.MeshSocket; +import org.servalproject.servald.mdp.MeshSocketException; + +import android.media.AudioFormat; +import android.media.AudioManager; +import android.media.AudioTrack; +import android.os.SystemClock; +import android.util.Log; + +/** + * Receive audio packets, bufferize and mix them, then play. + * + * @author Romain Vimont (®om) + * + */ +public class AudioReceiver { + + private final ExecutorService bufferizerExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService playerExecutor = Executors.newSingleThreadExecutor(); + + private static final String TAG = "AudioReceiver"; + + public static final Compression COMPRESSION = AudioSender.COMPRESSION; + + public static final int PACKET_SIZE = AudioSender.PACKET_SIZE; + public static final int HEADER_SIZE = AudioSender.HEADER_SIZE; + public static final int PAYLOAD_SIZE = AudioSender.PAYLOAD_SIZE; + + private static final int RATE = AudioSender.RATE; // Hz + private static final int CHANNEL = AudioFormat.CHANNEL_OUT_MONO; + private static final int FORMAT = AudioFormat.ENCODING_PCM_16BIT; + private static final int PLAYER_BUFFER_SIZE = AudioTrack + .getMinBufferSize(RATE, CHANNEL, FORMAT) * 2; + + private static final int BUFFER_MS = 1000; + private static final int DELAY_IN_MS = 100; + private static final int DELAY_IN_SAMPLES = Mixer.toSamples(DELAY_IN_MS, RATE); + + private MeshSocket socket; + private int port; + + private Bufferizer bufferizer; + private Player player; + private Future bufferizerFuture; + private Future playerFuture; + + public AudioReceiver(int port) { + this.port = port; + } + + public synchronized void start() { + Mixer mixer = new Mixer(RATE, BUFFER_MS, DELAY_IN_SAMPLES); + + bufferizer = new Bufferizer(mixer); + bufferizerFuture = bufferizerExecutor.submit(bufferizer); + + player = new Player(mixer); + playerFuture = playerExecutor.submit(player); + } + + public synchronized void stop() { + if (isRunning()) { + bufferizerFuture.cancel(true); + bufferizer.stopSocket(); + bufferizer = null; + playerFuture.cancel(true); + player.stopMixer(); + player = null; + } + } + + private synchronized boolean isRunning() { + return bufferizer != null; + } + + private class Bufferizer implements Runnable { + + private Mixer mixer; + + private volatile boolean stopped; + + Bufferizer(Mixer mixer) { + this.mixer = mixer; + } + + public synchronized void stopSocket() { + stopped = true; + if (socket != null) { + /* close socket for unblocking receive */ + socket.close(); + } + } + + @Override + public void run() { + try { + int attempts = 0; + do { + try { + synchronized (this) { + if (stopped) { + return; + } + /* try to initialize mesh socket */ + socket = new MeshSocket(port); + } + } catch (MeshSocketException e) { + /* + * attempt failed, maybe because servald is not started yet (State becomes + * "on" *before* servald is really on, need to fix it upstream on batphone) + */ + + if (++attempts >= 5) { + /* definitively fail after 5 tries */ + throw e; + } + + Log.e(TAG, "Receiver socket creation failed, retrying...", e); + + /* retry after 1.5 s */ + try { + Thread.sleep(1500); + } catch (InterruptedException ie) { + /* do nothing, but sleep() is interrupted */ + } + } + } while (socket == null); + + byte[] buf = new byte[PACKET_SIZE]; + byte[] writeBuf = new byte[COMPRESSION.ratio * PAYLOAD_SIZE]; + MeshPacket packet = new MeshPacket(buf, PACKET_SIZE); + + while (!stopped) { + try { + socket.receive(packet); + + int seq = (buf[0] & 0xff) << 8 | buf[1] & 0xff; + int timestamp = (buf[2] & 0xff) << 24 | (buf[3] & 0xff) << 16 + | (buf[4] & 0xff) << 8 | buf[5] & 0xff; + int ssrc = (buf[6] & 0xff) << 24 | (buf[7] & 0xff) << 16 + | (buf[8] & 0xff) << 8 | buf[9] & 0xff; + Log.i(TAG, "ssrc=" + ssrc + ", " + buf[6] + ":" + buf[7] + ":" + buf[8] + + ":" + buf[9]); + + int writeBufLength = decompress(buf, writeBuf, HEADER_SIZE, + packet.getLength() - HEADER_SIZE); + int written = mixer.write(ssrc, timestamp, writeBuf, 0, writeBufLength); + Log.i(TAG, "(" + ssrc + ") Packet " + seq + "[" + packet.getBuf().length + + "] " + written); + + } catch (IOException e) { + if (!stopped) { + Log.e(TAG, "Cannot receive data", e); + } + } + } + } catch (MeshSocketException e) { + Log.e(TAG, "Cannot create receiver socket", e); + } finally { + + } + } + }; + + private class Player implements Runnable { + + private Mixer mixer; + + private AudioTrack audioTrack; + + private volatile boolean stopped; + + Player(Mixer mixer) { + this.mixer = mixer; + } + + public synchronized void stopMixer() { + stopped = true; + mixer.close(); + } + + @Override + public void run() { + try { + synchronized (this) { + if (stopped) { + return; + } + audioTrack = new AudioTrack(AudioManager.STREAM_MUSIC, RATE, CHANNEL, FORMAT, + PLAYER_BUFFER_SIZE, AudioTrack.MODE_STREAM); + } + + byte[] buf = new byte[PAYLOAD_SIZE]; + + while (!stopped) { + int read; + synchronized (mixer) { + read = mixer.read(buf, 0, buf.length); + + if (stopped || read == 0) { + return; + } + + Log.i(TAG, "mixerPlayer.read() : " + SystemClock.elapsedRealtime() + " [" + + read + "]"); + mixer.move(read); + } + audioTrack.write(buf, 0, read); + /* play and stop after playing this packet (unless another call play() again) */ + audioTrack.play(); + audioTrack.stop(); + } + } finally { + if (audioTrack == null) { + audioTrack.release(); + } + } + } + }; + + private static int decompress(byte[] buf, byte[] writeBuf, int bufOffset, int bufPayloadLength) { + switch (COMPRESSION) { + case NONE: + System.arraycopy(buf, bufOffset, writeBuf, 0, bufPayloadLength); + return bufPayloadLength; + case TO_8_BITS: + /* convert 8 bits to 16 bits */ + for (int i = 0; i < bufPayloadLength; i++) { + /* recreate lower bits read 8 bits (little endian) */ + writeBuf[2 * i + 1] = buf[bufOffset + i]; + } + return bufPayloadLength * 2; + case A_LAW: + for (int i = 0; i < bufPayloadLength; i++) { + byte alaw = buf[i + bufOffset]; + int sample = G711.decodeALaw(alaw); + byte msb = (byte) (sample >> 8); + byte lsb = (byte) sample; + writeBuf[2 * i + 1] = msb; + writeBuf[2 * i] = lsb; + } + return bufPayloadLength * 2; + default: + throw new UnsupportedOperationException(COMPRESSION + " not implemented"); + } + } + +} diff --git a/src/org/servalproject/walkietalkie/AudioSender.java b/src/org/servalproject/walkietalkie/AudioSender.java new file mode 100644 index 00000000..538a0d8f --- /dev/null +++ b/src/org/servalproject/walkietalkie/AudioSender.java @@ -0,0 +1,237 @@ +package org.servalproject.walkietalkie; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.servalproject.servald.mdp.MeshPacket; +import org.servalproject.servald.mdp.MeshSocket; +import org.servalproject.servald.mdp.MeshSocketAddress; +import org.servalproject.servald.mdp.MeshSocketException; + +import android.media.AudioFormat; +import android.media.AudioRecord; +import android.media.MediaRecorder; +import android.os.SystemClock; +import android.util.Log; + +/** + * Record from micropohne, packetize and send audio packets. + * + * @author Romain Vimont (®om) + * + */ +public class AudioSender { + + private final ExecutorService workerExecutor = Executors.newSingleThreadExecutor(); + + private static final String TAG = "AudioSender"; + + private static final Random RANDOM = new Random(); + + public static final Compression COMPRESSION = Compression.A_LAW; + + public static final int PACKET_SIZE = 512 / COMPRESSION.ratio; + public static final int HEADER_SIZE = 10; + public static final int PAYLOAD_SIZE = PACKET_SIZE - HEADER_SIZE; + + public static final int RATE = 8000; // Hz + private static final int CHANNEL = AudioFormat.CHANNEL_IN_MONO; + private static final int FORMAT = AudioFormat.ENCODING_PCM_16BIT; + private static final int RECORDER_BUFFER_SIZE = AudioRecord.getMinBufferSize(RATE, CHANNEL, + FORMAT) * 2; + + private int localPort; + + private Worker worker; + private Future future; + + public AudioSender(int localPort) { + this.localPort = localPort; + } + + public synchronized void start(MeshSocketAddress... recipients) { + worker = new Worker(recipients); + future = workerExecutor.submit(worker); + } + + public synchronized void stop() { + if (isRunning()) { + future.cancel(true); + worker.stopRecording(); + worker = null; + future = null; + } + } + + private synchronized boolean isRunning() { + return worker != null; + } + + private class Worker implements Runnable { + + private MeshSocketAddress[] recipients; + + private AudioRecord audioRecord; + + private volatile boolean stopped; + + Worker(MeshSocketAddress... recipients) { + this.recipients = recipients; + } + + public synchronized void stopRecording() { + stopped = true; + if (audioRecord != null) { + try { + /* stop now for unblocking read */ + audioRecord.stop(); + } catch (IllegalStateException e) { + /* do nothing */ + } + } + } + + @Override + public void run() { + MeshSocket socket = null; + try { + /* initialize mesh socket */ + socket = new MeshSocket(localPort); + + synchronized (this) { + if (stopped) { + return; + } + /* init audioRecord only if not already stopped */ + audioRecord = new AudioRecord(MediaRecorder.AudioSource.MIC, RATE, + AudioFormat.CHANNEL_IN_MONO, FORMAT, RECORDER_BUFFER_SIZE); + + /* start recording microphone */ + audioRecord.startRecording(); + } + + /* packet headers */ + short seq = 0; + int timestamp = 0; + int ssrc = RANDOM.nextInt(); + + /* packet buffer */ + byte[] buf = new byte[PACKET_SIZE]; + + /* always read 16 bits / sample, but send 8 or 16 bits / sample. */ + byte[] readBuf = new byte[COMPRESSION.ratio * PAYLOAD_SIZE]; + + /* static headers */ + buf[6] = (byte) (ssrc >> 24); + buf[7] = (byte) (ssrc >> 16); + buf[8] = (byte) (ssrc >> 8); + buf[9] = (byte) ssrc; + + /* mesh packet */ + MeshPacket packet = new MeshPacket(buf, PACKET_SIZE); + packet.setQos(MeshPacket.OQ_ISOCHRONOUS_VOICE); + packet.setFlags(MeshPacket.FLAG_MDP_NOCRYPT | MeshPacket.FLAG_MDP_NOSIGN); + + long start = SystemClock.elapsedRealtime(); + long fromStart = 0; /* in milliseconds */ + int fromStartTU = 0; /* in timestamp units (for example 1 TU = 1 sample = 2 bytes) */ + + while (!stopped) { + Log.i(TAG, "Reading packet " + seq); + + /* dynamic headers */ + buf[0] = (byte) (seq >> 8); + buf[1] = (byte) seq; + buf[2] = (byte) (timestamp >> 24); + buf[3] = (byte) (timestamp >> 16); + buf[4] = (byte) (timestamp >> 8); + buf[5] = (byte) timestamp; + + /* read from microphone */ + int read = audioRecord.read(readBuf, 0, COMPRESSION.ratio * PAYLOAD_SIZE); + + /* compress data */ + int bufLength = compress(readBuf, buf, HEADER_SIZE, read); + + packet.setLength(HEADER_SIZE + bufLength); + + /* send the packet to all recipients */ + for (MeshSocketAddress to : recipients) { + packet.setSid(to.getSid()); + packet.setPort(to.getPort()); + try { + Log.i(TAG, "Prepare packet " + seq + " [" + read / 2 + "] to send to " + + to.getSid() + ":" + to.getPort()); + socket.send(packet); + Log.i(TAG, + "Paquet " + seq + " sent to " + to.getSid() + ":" + + to.getPort()); + } catch (IOException e) { + Log.e(TAG, "Cannot send data", e); + } + } + + long now = SystemClock.elapsedRealtime(); + fromStart = now - start; + fromStartTU = (int) Math.max(0, fromStart * RATE / 1000); + + if (Math.abs(fromStartTU - timestamp) > 1000) { + /* + * micro does not record exactly at the right rate, we have to correct it + * (here every 1k samples of deviation) + */ + Log.i(TAG, "--MICROPHONE DEVIATION CORRECTION-- timestamp was " + timestamp + + ", timestamp = " + fromStartTU); + timestamp = fromStartTU; + } + Log.i(TAG, "theoretical-timestamp(" + fromStartTU + ") - recorded-timestamp(" + + timestamp + ") = " + (fromStartTU - timestamp)); + + seq++; + timestamp += read / 2; /* 2 bytes per sample */ + } + } catch (MeshSocketException e) { + Log.e(TAG, "Cannot create mesh socket", e); + } finally { + if (audioRecord != null) { + audioRecord.release(); + } + if (socket != null) { + socket.close(); + } + } + } + }; + + private static int compress(byte[] readBuf, byte[] buf, int bufOffset, int readBufLength) { + switch (COMPRESSION) { + case NONE: + System.arraycopy(readBuf, 0, buf, bufOffset, readBufLength); + return readBufLength; + case TO_8_BITS: + /* convert 16 bits to 8 bits */ + for (int i = 0; i < readBufLength / 2; i++) { + byte b = readBuf[2 * i + 1]; + if (b < 0) { + b++; + } + /* discard lower bits read 8 bits (little endian) */ + buf[bufOffset + i] = b; + } + return readBufLength / 2; + case A_LAW: + for (int i = 0; i < readBufLength / 2; i++) { + int sample = readBuf[2 * i] & 0xff | readBuf[2 * i + 1] << 8; /* 16 bits signed */ + byte alaw = G711.encodeALaw(sample); + buf[bufOffset + i] = alaw; + } + return readBufLength / 2; + default: + throw new UnsupportedOperationException(COMPRESSION + " not implemented"); + } + } + +} diff --git a/src/org/servalproject/walkietalkie/Compression.java b/src/org/servalproject/walkietalkie/Compression.java new file mode 100644 index 00000000..7084caa0 --- /dev/null +++ b/src/org/servalproject/walkietalkie/Compression.java @@ -0,0 +1,18 @@ +package org.servalproject.walkietalkie; + +/** + * Compression type. + * + * @author Romain Vimont (®om) + * + */ +public enum Compression { + + NONE(1), TO_8_BITS(2), A_LAW(2); + + int ratio; + + Compression(int ratio) { + this.ratio = ratio; + } +} diff --git a/src/org/servalproject/walkietalkie/G711.java b/src/org/servalproject/walkietalkie/G711.java new file mode 100644 index 00000000..ee159f6e --- /dev/null +++ b/src/org/servalproject/walkietalkie/G711.java @@ -0,0 +1,91 @@ +package org.servalproject.walkietalkie; + +/** + * Encode and decode linear to A-law. + * + * Implementation deduced from principle explained on Wikipedia: + * http://fr.wikipedia.org/wiki/Loi_A#Transformation_discr.C3.A8te + * + * @author Romain Vimont (®om) + * + */ +public class G711 { + + /** + * Encode a linear sample to A-Law. + * + * @param linear + * Linear sample. + * @return A-Law sample. + */ + public static byte encodeALaw(int linear) { + /* @formatter:off + * + * (if s=1, then the remaining of "linear" is 2's complement) + * in out + * s0000000wxyz---- s000wxyz + * s0000001wxyz---- s001wxyz + * s000001wxyz----- s010wxyz + * s00001wxyz------ s011wxyz + * s0001wxyz------- s100wxyz + * s001wxyz-------- s101wxyz + * s01wxyz--------- s110wxyz + * s1wxyz---------- s111wxyz + */ + int sign = linear < 0 ? 0x80 : 0; + if (linear < 0) { + linear = -linear; + } + int sample11 = (linear >> 4) & 0x7ff; /* 11 most significant bits unsigned */ + int prefix = 7; + int tmp = sample11; + while (prefix > 0 && (tmp & 0x400) == 0) { + prefix--; + tmp <<= 1; + } + int wxyz; + if (prefix == 0) { + wxyz = sample11 & 0xf; + } else { + wxyz = (tmp >> 6) & 0xf; + } + byte res = (byte) (sign | (prefix << 4) | wxyz); + return res; + } + + /** + * Decode an A-Law sample to linear. + * + * @param alaw + * A-Law sample. + * @return Linear sample. + */ + public static int decodeALaw(byte alaw) { + /* @formatter:off + * + * (if s=1, then the remaining of "linear" is 2's complement) + * in out + * s000wxyz s0000000wxyz0000 + * s001wxyz s0000001wxyz0000 + * s010wxyz s000001wxyz00000 + * s011wxyz s00001wxyz000000 + * s100wxyz s0001wxyz0000000 + * s101wxyz s001wxyz00000000 + * s110wxyz s01wxyz000000000 + * s111wxyz s1wxyz0000000000 + */ + int prefix = (alaw >> 4) & 7; + int wxyz = alaw & 0xf; + int res; + if (prefix == 0) { + res = wxyz << 4; + } else { + res = (0x10 | wxyz) << (prefix + 3); + } + if (alaw < 0) { + res = -res; + } + return res; + } + +} diff --git a/src/org/servalproject/walkietalkie/Mixer.java b/src/org/servalproject/walkietalkie/Mixer.java new file mode 100644 index 00000000..602cebfc --- /dev/null +++ b/src/org/servalproject/walkietalkie/Mixer.java @@ -0,0 +1,250 @@ +package org.servalproject.walkietalkie; + +import java.util.ArrayList; +import java.util.List; + +import android.os.SystemClock; +import android.util.Log; + +/** + * Audio stream mixer. + * + * Supports only 2 byte per sample (it is always the case once the audio sample is decoded). + * + * @author Romain Vimont (®om) + * + */ +public class Mixer { + + private static final String TAG = "Mixer"; + + private static final int BUFFER_MAX_IDLE_TIME = 600; /* ms without writes */ + private static final int MAX_PLAYING_LAG = 50; /* ms */ + + private int rate; + private int bufferMs; + private long origin; /* timestamp of cursor=0, in milliseconds */ + private int cursor; /* number of next mix sample to read */ + private int delayInSamples; /* delay of read */ + + private boolean closed; + + public Mixer(int rate, int bufferMs, int delayInSamples) { + this.rate = rate; + this.bufferMs = bufferMs; + this.delayInSamples = delayInSamples; + } + + public static int toSamples(int delayInMs, int rate) { + return delayInMs * rate / 1000; + } + + public synchronized int getRate() { + return rate; + } + + public synchronized int getBufferMs() { + return bufferMs; + } + + public long getOrigin() { + return origin; + } + + public int getCursor() { + return cursor; + } + + private class Source { + + private int ssrc; + private StreamBuffer streamBuffer; + private long lastTouch; + + private Source(int ssrc, int sampleOffset) { + this.ssrc = ssrc; + this.streamBuffer = new StreamBuffer(2 * rate * bufferMs / 1000, 2 * sampleOffset); + } + + private void touch() { + lastTouch = SystemClock.elapsedRealtime(); + } + } + + public List sources = new ArrayList(); + + private int getSourceIndex(int ssrc) { + int len = sources.size(); + int i = 0; + while (i < len && sources.get(i).ssrc != ssrc) { + i++; + } + return i == len ? -1 : i; + } + + private long nextGCTimestamp; + + private void removeOldSources() { + long now = SystemClock.elapsedRealtime(); + if (nextGCTimestamp > now) + return; // later + long notOlderThan = now - BUFFER_MAX_IDLE_TIME; + int i = sources.size() - 1; + long min = Long.MAX_VALUE; + while (i >= 0) { + if (sources.get(i).lastTouch < notOlderThan) { + sources.remove(i); + } else { + min = Math.min(sources.get(i).lastTouch, min); + nextGCTimestamp = min + BUFFER_MAX_IDLE_TIME; + } + i--; + } + } + + public synchronized void flush() { + sources.clear(); + } + + private synchronized Source createSource(int ssrc, int sampleOffset) { + int sourceIndex; + boolean wasEmpty; + if (sources.isEmpty()) { + wasEmpty = true; + sourceIndex = -1; + cursor = 0; + origin = SystemClock.elapsedRealtime(); + } else { + wasEmpty = false; + sourceIndex = getSourceIndex(ssrc); + } + Source source; + Log.i(TAG, "sourceIndex[" + ssrc + "] = " + sourceIndex); + if (sourceIndex == -1) { + source = new Source(ssrc, sampleOffset); + sources.add(source); + } else { + source = sources.get(sourceIndex); + } + if (wasEmpty) { + notify(); + } + return source; + } + + public synchronized int write(int ssrc, int sampleOffset, byte[] data, int dataOffset, + int dataLength) { + Log.i(TAG, "write(ssrc=" + ssrc + ", sampleOffset=" + sampleOffset + ", ...)"); + Source source = createSource(ssrc, sampleOffset - delayInSamples); + source.touch(); + return source.streamBuffer.write(2 * sampleOffset, data, dataOffset, dataLength); + } + + public synchronized int read(byte[] data, int dataOffset, int dataLength) { + removeOldSources(); + + /* blocking read() */ + while (!closed && sources.size() == 0) { + try { + wait(); + } catch (InterruptedException e) { + return 0; + } + } + + if (closed) { + return 0; + } + + /* wait for reading "at real time" */ + long now = SystemClock.elapsedRealtime(); + long target = origin + (cursor + dataLength / 2) * 1000 / rate; + if (target > now) { + try { + Thread.sleep(target - now); + } catch (InterruptedException e) { + return 0; + } + } else if (target < now - MAX_PLAYING_LAG) { + /* playing lag, eat bytes */ + int msToEat = (int) (now - target); + int samplesToEat = toSamples(msToEat, rate); + move(2 * samplesToEat); + Log.i(TAG, "Playing lag: eat " + msToEat + " ms (" + samplesToEat + " samples)"); + } + + dataLength &= ~1; /* make dataLength even */ + int[] sum = new int[dataLength / 2]; + int n = sources.size(); + if (n == 0) { + return 0; + } + + int maxRead = 0; + + /* for each source, add value to the sum */ + for (Source src : sources) { + byte[] buf = new byte[dataLength]; + int read = src.streamBuffer.read(buf, 0, dataLength); + if (read > maxRead) { + maxRead = read; + } + /* for each sample */ + for (int j = 0; j < read / 2; j++) { + /* little endian */ + int sample = buf[2 * j] & 0xff | buf[2 * j + 1] << 8; + sum[j] += sample; + } + } + + /* mix */ + for (int i = 0; i < maxRead / 2; i++) { + /* z is the mix result between -1 and 1 */ + double z = sum[i] / (n * 32768.); + /* use mix_f from http://blog.rom1v.com/2013/01/le-mixage-audio/ */ + int sgn = z >= 0 ? 1 : -1; + double g = sgn * (1 - Math.pow(1 - sgn * z, n)); + int mix = toInt(g, 16); + // int mix = sum[i] / n; + + /* little endian */ + data[2 * i + dataOffset] = (byte) mix; + data[2 * i + 1 + dataOffset] = (byte) (mix >> 8); + } + + return maxRead; + } + + public synchronized void move(int bytes) { + bytes &= ~1; /* make bytes even */ + for (int i = 0; i < sources.size(); i++) { + sources.get(i).streamBuffer.move(bytes); + } + + cursor += bytes / 2; + } + + public synchronized void close() { + closed = true; + notifyAll(); + } + + /** + * Convert a sample having value between -1 and 1 to a signed int value on {@code bits} bits. + * + * @param x + * Sample value between -1 and 1. + * @param bits + * Number of bits + * @return Signed int sample value. + */ + private static int toInt(double x, int bits) { + int maxAmpl = 1 << (bits - 1); // 2^(bits-1) + int res = (int) (x * maxAmpl); + if (res == maxAmpl) { + res = maxAmpl - 1; + } + return res; + } + +} diff --git a/src/org/servalproject/walkietalkie/StreamBuffer.java b/src/org/servalproject/walkietalkie/StreamBuffer.java new file mode 100644 index 00000000..1738de9f --- /dev/null +++ b/src/org/servalproject/walkietalkie/StreamBuffer.java @@ -0,0 +1,125 @@ +package org.servalproject.walkietalkie; + +import java.util.Arrays; + +import android.util.Log; + +/** + * Stream buffer. + * + * Contains a window (implemented as circular buffer) of {@code length} bytes of a stream, starting + * at {@code streamOffset}. + * + * @author Romain Vimont (®om) + * + */ +public class StreamBuffer { + + private byte[] buf; /* Stream buffer */ + private int length; /* data.length */ + private int streamOffset; /* data[head] = stream[streamOffset] */ + private int head; + + public StreamBuffer(int length) { + buf = new byte[length]; + this.length = length; + } + + public StreamBuffer(int length, int streamOffset) { + this(length); + this.streamOffset = streamOffset; + } + + public synchronized int write(int streamOffset, byte[] data, int dataOffset, int dataLength) { + int start = streamOffset - this.streamOffset; + + Log.i("StreamBuffer", "--> " + (streamOffset - this.streamOffset)); + if (start > length) { + Log.i("StreamBuffer", "Buffer overflow : " + this.streamOffset + " - " + streamOffset); + /* buffer overflow (write too far on the right) */ + return 0; + } + + if (start < 0) { + Log.i("StreamBuffer", "Buffer underrun : " + this.streamOffset + " - " + streamOffset); + /* buffer underrun (write too far on the left) */ + dataOffset -= start; + dataLength += start; + start = 0; + } + + if (dataLength <= 0) { + /* no data to write (out of range) */ + return 0; + } + + if (dataLength > length) { + /* ignore first bytes */ + dataOffset += dataLength - length; + dataLength = length; + } + + if (start + dataLength > length) { + /* ignore last bytes */ + dataLength = length - start; + } + + /* written bytes to array (dataLength can change below) */ + int written = dataLength; + + /* byte count for the right part (or -offset for the left part) */ + int r = Math.min(dataLength, length - head - start); + if (r > 0) { + System.arraycopy(data, dataOffset, buf, head + start, r); /* right part */ + dataOffset += r; + dataLength -= r; + } + if (dataLength > 0) { + int bufOffset = Math.max(0, -r); + System.arraycopy(data, dataOffset, buf, bufOffset, dataLength); /* left part */ + } + + return written; + } + + public synchronized int read(byte[] data, int dataOffset, int dataLength) { + int r = Math.min(dataLength, length - head); + System.arraycopy(buf, head, data, dataOffset, r); + if (dataLength - r > 0) { + System.arraycopy(buf, 0, data, dataOffset + r, dataLength - r); + } + return dataLength; + } + + public synchronized void move(int bytes) { + streamOffset += bytes; + if (Math.abs(bytes) >= length) { + Arrays.fill(buf, (byte) 0); + head = 0; + return; + } + if (bytes > 0) { + /* byte count for the right part */ + int r = Math.min(bytes, length - head); + Arrays.fill(buf, head, head + r, (byte) 0); /* right part */ + if (bytes - r > 0) { + Arrays.fill(buf, 0, bytes - r, (byte) 0); /* left part */ + } + head = (head + bytes) % length; + } else if (bytes < 0) { + int r = Math.min(-bytes, head); + Arrays.fill(buf, head - r, head, (byte) 0); /* left part */ + if (bytes + r < 0) { + Arrays.fill(buf, length + bytes + r, length, (byte) 0); /* right part */ + } + head = (length + head + bytes) % length; + } + } + + public synchronized void flush() { + Arrays.fill(buf, (byte) 0); + streamOffset = 0; + head = 0; + } + +} diff --git a/src/org/servalproject/walkietalkie/WalkieTalkieRecipient.java b/src/org/servalproject/walkietalkie/WalkieTalkieRecipient.java new file mode 100644 index 00000000..99aef48d --- /dev/null +++ b/src/org/servalproject/walkietalkie/WalkieTalkieRecipient.java @@ -0,0 +1,80 @@ +package org.servalproject.walkietalkie; + +import org.servalproject.servald.AbstractId.InvalidBinaryException; +import org.servalproject.servald.SubscriberId; +import org.servalproject.servald.mdp.MeshSocketAddress; + +import android.os.Parcel; +import android.os.Parcelable; + +/** + * Wrapper of {@link MeshSocketAddress} implementing {@link Parcelable}. + * + * @author Romain Vimont (®om) + * + */ +public class WalkieTalkieRecipient implements Parcelable { + + private MeshSocketAddress addr; + + public WalkieTalkieRecipient(MeshSocketAddress addr) { + this.addr = addr; + } + + public MeshSocketAddress getAddr() { + return addr; + } + + public static WalkieTalkieRecipient[] wrap(MeshSocketAddress... addr) { + int len = addr.length; + WalkieTalkieRecipient[] recipients = new WalkieTalkieRecipient[len]; + for (int i = 0; i < len; i++) { + recipients[i] = new WalkieTalkieRecipient(addr[i]); + } + return recipients; + } + + public static MeshSocketAddress[] unwrap(T... recipients) { + int len = recipients.length; + MeshSocketAddress[] addr = new MeshSocketAddress[len]; + for (int i = 0; i < len; i++) { + addr[i] = ((WalkieTalkieRecipient) recipients[i]).getAddr(); + } + return addr; + } + + @Override + public int describeContents() { + return 0; + } + + @Override + public void writeToParcel(Parcel dest, int flags) { + dest.writeByteArray(addr.getSid().binary); + dest.writeInt(addr.getPort()); + } + + public static final Parcelable.Creator CREATOR = new Parcelable.Creator() { + + @Override + public WalkieTalkieRecipient createFromParcel(Parcel source) { + try { + byte[] rawSid = source.createByteArray(); + SubscriberId sid = new SubscriberId(rawSid); + int port = source.readInt(); + MeshSocketAddress addr = new MeshSocketAddress(sid, port); + return new WalkieTalkieRecipient(addr); + } catch (InvalidBinaryException e) { + throw new RuntimeException(e); + } + + } + + @Override + public WalkieTalkieRecipient[] newArray(int size) { + return new WalkieTalkieRecipient[size]; + } + + }; + +} diff --git a/src/org/servalproject/walkietalkie/WalkieTalkieService.java b/src/org/servalproject/walkietalkie/WalkieTalkieService.java new file mode 100644 index 00000000..bd95f2c9 --- /dev/null +++ b/src/org/servalproject/walkietalkie/WalkieTalkieService.java @@ -0,0 +1,207 @@ +package org.servalproject.walkietalkie; + +import org.servalproject.ServalBatPhoneApplication; +import org.servalproject.ServalBatPhoneApplication.State; +import org.servalproject.servald.mdp.MeshSocketAddress; + +import android.app.Service; +import android.content.BroadcastReceiver; +import android.content.Context; +import android.content.Intent; +import android.content.IntentFilter; +import android.os.IBinder; +import android.os.Parcelable; + +/** + * Walkie-talkie Android service. + * + * @author Romain Vimont (®om) + * + */ +public class WalkieTalkieService extends Service { + + /** + * Start speaking action. + * + * Parameters: + *
    + *
  • {@link EXTRA_RECIPIENTS}
  • + *
+ */ + public static final String ACTION_START_SPEAKING = "org.servalproject.walkietalkie.START_SPEAKING"; + + /** Stop speaking action. */ + public static final String ACTION_STOP_SPEAKING = "org.servalproject.walkietalkie.STOP_SPEAKING"; + + /** Start listening action. */ + public static final String ACTION_START_LISTENING = "org.servalproject.walkietalkie.START_LISTENING"; + + /** Stop listening action. */ + public static final String ACTION_STOP_LISTENING = "org.servalproject.walkietalkie.STOP_LISTENING"; + + /** Value is {@link WalkieTalkieRecipient[]}. */ + public static final String EXTRA_RECIPIENTS = "recipients"; + + /** Mesh socket server port for walkie-talkie communication. */ + public static final int WALKIE_TALKIE_SERVER_PORT = 4444; + + /** Mesh socket client port for walkie-talkie communication. */ + public static final int WALKIE_TALKIE_CLIENT_PORT = 5555; + + private AudioSender sender; + private AudioReceiver receiver; + + private boolean state; + + private boolean listeningAsked; + private boolean speakingAsked; + private MeshSocketAddress[] recipients = {}; + + private BroadcastReceiver stateChangeReceiver = new BroadcastReceiver() { + + @Override + public void onReceive(Context context, Intent intent) { + final int stateOrd = intent.getIntExtra(ServalBatPhoneApplication.EXTRA_STATE, 0); + updateState(State.values()[stateOrd]); + } + }; + + @Override + public void onCreate() { + super.onCreate(); + + sender = new AudioSender(WALKIE_TALKIE_CLIENT_PORT); + receiver = new AudioReceiver(WALKIE_TALKIE_SERVER_PORT); + + /* read the current state */ + updateState(getApplicationContext().getState()); + /* listen servald start/stop */ + registerReceiver(stateChangeReceiver, new IntentFilter( + ServalBatPhoneApplication.ACTION_STATE)); + } + + @Override + public void onDestroy() { + super.onDestroy(); + /* do not listen servald start/stop anymore */ + unregisterReceiver(stateChangeReceiver); + /* stop everything */ + stopSpeaking(); + stopListening(); + } + + @Override + public int onStartCommand(Intent intent, int flags, int startId) { + if (intent != null) { + String action = intent.getAction(); + if (ACTION_START_SPEAKING.equals(action)) { + /* extract recipient list */ + Parcelable[] recipients = intent.getParcelableArrayExtra(EXTRA_RECIPIENTS); + /* unwrap socket addresses */ + MeshSocketAddress[] addrList = WalkieTalkieRecipient.unwrap(recipients); + /* start speaking to the addresses when possible */ + startSpeakingWhenPossible(addrList); + } else if (ACTION_STOP_SPEAKING.equals(action)) { + stopSpeaking(); + } else if (ACTION_START_LISTENING.equals(action)) { + startListeningWhenPossible(); + } else if (ACTION_STOP_LISTENING.equals(action)) { + stopListening(); + } + return Service.START_STICKY; + } + return super.onStartCommand(intent, flags, startId); + } + + private void startSpeakingWhenPossible(MeshSocketAddress... recipients) { + if (state) { + startSpeaking(recipients); + } else { + speakingAsked = true; + } + this.recipients = recipients; + } + + private void startListeningWhenPossible() { + if (state) { + startListening(); + } else { + listeningAsked = true; + } + } + + private void startSpeaking(MeshSocketAddress... recipients) { + sender.start(recipients); + } + + private void stopSpeaking() { + if (sender != null) { + sender.stop(); + } + speakingAsked = false; + } + + private void startListening() { + receiver.start(); + } + + private void stopListening() { + if (receiver != null) { + receiver.stop(); + } + listeningAsked = false; + } + + private void updateState(State state) { + boolean newState = state == State.On; + this.state = newState; + if (newState) { + /* execute pending actions */ + if (listeningAsked) { + startListening(); + } + if (speakingAsked) { + startSpeaking(recipients); + } + } else { + if (sender != null) { + sender.stop(); + } + if (receiver != null) { + receiver.stop(); + } + } + } + + @Override + public IBinder onBind(Intent intent) { + return null; + } + + @Override + public ServalBatPhoneApplication getApplicationContext() { + return (ServalBatPhoneApplication) super.getApplicationContext(); + } + + public static void startSpeaking(Context context, MeshSocketAddress... recipients) { + Intent intent = new Intent(ACTION_START_SPEAKING); + intent.putExtra(EXTRA_RECIPIENTS, WalkieTalkieRecipient.wrap(recipients)); + context.startService(intent); + } + + public static void stopSpeaking(Context context) { + Intent intent = new Intent(ACTION_STOP_SPEAKING); + context.startService(intent); + } + + public static void startListening(Context context) { + Intent intent = new Intent(ACTION_START_LISTENING); + context.startService(intent); + } + + public static void stopListening(Context context) { + Intent intent = new Intent(ACTION_STOP_LISTENING); + context.startService(intent); + } + +}