
Is it possible to write to WebAssembly.Memory in PHP that is exported to and read in JavaScript in parallel? #121

Closed
guest271314 opened this issue Aug 16, 2020 · 12 comments
Labels
❓ question Further information is requested

@guest271314

Summary

Consider

<?php 
  if (isset($_GET["start"])) {
    header("Access-Control-Allow-Origin: *");
    header("Content-Type: application/octet-stream");
    echo passthru("parec -v --raw -d alsa_output.pci-0000_00_1b.0.analog-stereo.monitor");
    exit();
  }

where, because Chromium refuses to capture monitor devices when getUserMedia({audio: true}) is executed on Linux, we use
php -S localhost:8000 to stream the output of passthru() to Response.body (a ReadableStream) from a fetch() call.

let controller, signal;
const [start, stop] = document.querySelectorAll('button');
start.onclick = async e => {
  start.disabled = true;
  // set parameters as arrays for potential "infinite" input, output stream
  let { track, port } = await new AudioWorkletStream({
    latencyHint: 1,
    workletOptions: {
      numberOfInputs: 1,
      numberOfOutputs: 2,
      channelCount: 2,
      processorOptions: {
        codec: 'audio/wav',
        offset: 0,
      },
    },
  }).promise;

  console.log(track, port);
  controller = new AbortController();
  signal = controller.signal;
  fetch('http://localhost:8000?start=true', { signal })
    .then(r => r.body)
    .then(readable => {
      const reader = readable.getReader();
      reader
        .read()
        .then(function process({ value, done }) {
          if (done) {
            console.log(done);
            return;
          }
          console.log(value);
          return reader.read().then(process);
        })
        .catch(console.error);
    });
};
stop.onclick = e => {
  controller.abort();
};

Ideally we should be able to write the output from the shell script directly to a WebAssembly Memory instance, grow the shared array buffer if necessary, and export that same Memory instance to JavaScript for the purpose of reading the shared memory in parallel with writing to (and possibly growing) the single buffer, in this case to stream audio in "real time" using an AudioWorklet, and finally flush the buffer once the stream closes.
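For reference, the JavaScript half of that write-grow-read cycle can be sketched with today's API. This is a minimal sketch in plain JavaScript only (whether the PHP side can create and export the same Memory instance is exactly the open question here); the `append()` helper and page arithmetic are illustrative assumptions, not this repository's API.

```javascript
// Minimal sketch of the grow-while-writing pattern, JavaScript only.
// A shared Memory starts at 1 page (64 KiB) and grows on demand up to
// `maximum`; a reader holding a view over the same backing store sees
// the bytes as they are written.
const memory = new WebAssembly.Memory({ initial: 1, maximum: 64, shared: true });
let offset = 0;

// Hypothetical writer-side helper: append a chunk, growing by whole pages.
function append(chunk) {
  const needed = offset + chunk.length;
  if (needed > memory.buffer.byteLength) {
    const pages = Math.ceil((needed - memory.buffer.byteLength) / 65536);
    memory.grow(pages); // throws RangeError once `maximum` is exceeded
  }
  new Uint8Array(memory.buffer).set(chunk, offset);
  offset += chunk.length;
}

append(Uint8Array.from([116, 101, 115, 116])); // "test"
// Reader side: a fresh view over memory.buffer reflects the write.
const written = new Uint8Array(memory.buffer, 0, offset);
```

Because the memory is shared, `memory.buffer` is a SharedArrayBuffer; growing does not detach existing views, it only makes more pages available to new views.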

Is this possible using the code in this repository?

Additional details

See https://github.com/WebAudio/web-audio-api-v2/issues/5#issuecomment-673831406

@guest271314 guest271314 added the ❓ question Further information is requested label Aug 16, 2020
@Hywan
Contributor

Hywan commented Aug 17, 2020

Why closing the issue? It starts getting fun!

@guest271314 guest271314 reopened this Aug 17, 2020
@guest271314
Author

@Hywan

Why closing the issue?

Some individuals and institutions do not welcome certain questions. I am not sure if writing to a single Memory instance in PHP or any other language actually solves the entire issue.

On the machine I am currently testing with, creating a WebAssembly.Memory() instance, SharedArrayBuffer, or ArrayBuffer with bytes set to 512 * 344 * 60 * 60 (60 minutes of audio data) throws a memory allocation error; I have to use 512 * 344 * 60 * 50 (50 minutes) at maximum. Creating multiple Memory instances and trying to divide input Uint8Arrays across multiple buffers is possible, but makes the task more complicated.
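The arithmetic behind those limits, expressed in bytes and in 64 KiB WebAssembly pages (these figures are derived from the byte counts quoted above, not independently measured limits):

```javascript
// 512 bytes per process() call × ~344 calls per second of raw PCM.
const bytesPerSecond = 512 * 344;               // 176,128 bytes/s
const sixtyMinutes = bytesPerSecond * 60 * 60;  // 634,060,800 bytes ≈ 605 MiB
const fiftyMinutes = bytesPerSecond * 60 * 50;  // 528,384,000 bytes ≈ 504 MiB
// As 64 KiB WebAssembly.Memory pages:
const maxPages = Math.ceil(sixtyMinutes / 65536);   // 9675
const fiftyPages = Math.ceil(fiftyMinutes / 65536); // 8063
```

The 9675 figure matches the `maximum` in the commented-out `WebAssembly.Memory` constructor later in this thread.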

I have achieved capturing data at localhost to a MediaStreamTrack. Now I am trying to perform signaling so that I can create a WebRTC connection with localhost from any web page, similar to how I dynamically set permissions for the Chromium app at https://github.com/guest271314/captureSystemAudio/blob/master/native_messaging/file_stream/app/set_externally_connectable.js.

The question still stands: is it possible to write directly from passthru() to WebAssembly.Memory and read that same memory (which grows when necessary without throwing RangeError) in JavaScript?

@guest271314
Author

@Hywan

It starts getting fun!

That is the scientific state of mind I have been anticipating by asking these questions, while carrying on nonetheless without waiting for answers. Current working code follows, where fetch() adds the element of CORS due to using localhost to get the data with passthru():

<!DOCTYPE html>
  <html>
    <body>
      <button>start</button><button>stop</button>
      <script>
        if (globalThis.gc) {
          gc();
        }

        let controller, signal;
        // throws memory allocation error, use multiple Memory instances
        // const memory = new WebAssembly.Memory({initial: 8062, maximum: 9675, shared: true});
        const [start, stop] = document.querySelectorAll('button');
        start.onclick = async e => {
          class AudioWorkletProcessor {}
          class AudioWorkletNativeFileStream extends AudioWorkletProcessor {
            constructor(options) {
              super(options);
              this.byteSize = 512 * 344 * 60 * 50;
              this.memory = new Uint8Array(this.byteSize);
              Object.assign(this, options.processorOptions);
              this.port.onmessage = this.appendBuffers.bind(this);
            }
            async appendBuffers({ data: readable }) {
              Object.assign(this, { readable });
              const source = {
                write: (value, c) => {
                  if (this.totalBytes < this.byteSize) {
                    this.memory.set(value, this.readOffset);
                    this.readOffset = this.readOffset + value.buffer.byteLength;
                    this.totalBytes = this.readOffset;
                  } else {
                    console.log(
                      value.buffer.byteLength,
                      this.readOffset,
                      this.totalBytes
                    );
                  }
                  if (this.totalBytes >= (344 * 512) / 2 && !this.started) {
                    this.started = true;
                    this.port.postMessage({ started: this.started });
                  }
                },
                close() {
                  console.log('stopped');
                },
              };
              try {
                await this.readable.pipeTo(new WritableStream(source));
              } catch (e) {
                console.warn(e);
                console.log(this.writeOffset, this.totalBytes);
                this.endOfStream();
              }
            }
            endOfStream() {
              this.port.postMessage({
                ended: true,
                currentTime,
                currentFrame,
                readOffset: this.readOffset,
                readIndex: this.readIndex,
                writeOffset: this.writeOffset,
                writeIndex: this.writeIndex,
                totalBytes: this.totalBytes,
              });
            }
            process(inputs, outputs) {
              const channels = outputs.flat();
              if (
                this.writeOffset >= this.totalBytes ||
                this.totalBytes === this.byteSize
              ) {
                console.log(this);
                this.endOfStream();
                return false;
              }

              const uint8 = new Uint8Array(512);
              try {
                for (let i = 0; i < 512; i++, this.writeOffset++) {
                  if (this.writeOffset === this.byteSize) {
                    break;
                  }
                  uint8[i] = this.memory[this.cursor];
                  // ++this.writeOffset;
                }
                const uint16 = new Uint16Array(uint8.buffer);
                // based on int16ToFloat32 function at https://stackoverflow.com/a/35248852
                for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
                  const int = uint16[i];
                  // If the high bit is on, then it is a negative number, and actually counts backwards.
                  const float =
                    int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
                  // interleave
                  channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
                }
              } catch (e) {
                console.error(e);
              }

              return true;
            }
          }
          // register processor in AudioWorkletGlobalScope
          function registerProcessor(name, processorCtor) {
            return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
          }
          const worklet = URL.createObjectURL(
            new Blob(
              [
                registerProcessor(
                  'audio-worklet-native-file-stream',
                  AudioWorkletNativeFileStream
                ),
              ],
              { type: 'text/javascript' }
            )
          );
          const ac = new AudioContext({
            latencyHint: 1,
            sampleRate: 44100,
            numberOfChannels: 2,
          });
          ac.onstatechange = e => console.log(ac.state);
          if (ac.state === 'running') {
            await ac.suspend();
          }
          await ac.audioWorklet.addModule(worklet);
          const aw = new AudioWorkletNode(
            ac,
            'audio-worklet-native-file-stream',
            {
              numberOfInputs: 1,
              numberOfOutputs: 2,
              channelCount: 2,
              processorOptions: {
                totalBytes: 0,
                cursor: 0,
                readIndex: 0,
                writeIndex: 0,
                readOffset: 0,
                writeOffset: 0,
                done: false,
                ended: false,
                started: false,
              },
            }
          );

          aw.onprocessorerror = e => {
            console.error(e);
            console.trace();
          };
          // comment MediaStream, MediaStreamTrack creation
          const msd = new MediaStreamAudioDestinationNode(ac);
          const { stream } = msd;
          const [track] = stream.getAudioTracks();
          aw.connect(msd);
          const recorder = new MediaRecorder(stream);
          recorder.ondataavailable = e =>
            console.log(URL.createObjectURL(e.data));
          aw.port.onmessage = async e => {
            console.log(e.data);
            if (
              e.data.started &&
              ac.state === 'suspended' &&
              recorder.state === 'inactive'
            ) {
              recorder.start();
              await ac.resume();
            } else if (recorder.state === 'recording') {
              track.stop();
              recorder.stop();
              aw.disconnect();
              msd.disconnect();
              await ac.close();
              console.log(track);
              gc();
            }
          };

          try {
            start.disabled = true;

            controller = new AbortController();
            signal = controller.signal;
            const { body: readable } = await fetch(
              'http://localhost:8000?start=true',
              {
                cache: 'no-store',
                signal,
              }
            );
            aw.port.postMessage(readable, [readable]);
          } catch (e) {
            console.warn(e);
          } finally {
            console.log('stream stopped');
          }
        };
        stop.onclick = e => {
          controller.abort();
          start.disabled = false;
        };
      </script>
    </body>
  </html>
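The interleaving loop inside process() in the snippet above can be isolated and checked on its own. A minimal sketch of the int16-to-float32 conversion it performs (the helper name is illustrative, not part of the code above):

```javascript
// Convert little-endian 16-bit PCM bytes into two planar Float32 channels,
// de-interleaving samples exactly as the process() callback does.
function pcm16ToPlanarFloat32(uint8) {
  const uint16 = new Uint16Array(
    uint8.buffer,
    uint8.byteOffset,
    uint8.byteLength / 2
  );
  const left = new Float32Array(uint16.length / 2);
  const right = new Float32Array(uint16.length / 2);
  for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
    const int = uint16[i];
    // If the high bit is on, it is a negative two's-complement sample.
    const float = int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
    // Alternate samples between the two channels.
    if ((n = ++n % 2) === 0) left[j] = float;
    else right[j++] = float;
  }
  return [left, right];
}
```

Full-scale positive (0x7fff) maps to 1 and full-scale negative (0x8000) maps to -1, matching the int16ToFloat32 approach cited in the code.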

@guest271314
Author

Ideally, we can create a Transferable Stream directly from PHP (Go, C++, R, bash, etc.) to JavaScript, where the readable at

await this.readable.pipeTo(new WritableStream(source));

is STDOUT directly from the native application or shell script (https://bugs.chromium.org/p/chromium/issues/detail?id=1115640); however, that concept requires some work to implement correctly, without trying to squeeze use cases into APIs that were not intended for that purpose.

@Hywan
Contributor

Hywan commented Aug 18, 2020

To start from the base, what you're trying to do is:

  • Generate audio bytes from PHP on the server side
  • Send the audio bytes to the client
  • Audio bytes are received by JS
  • JS sends audio bytes to Wasm memory bytes

Is that correct?

You have to be careful regarding the security of all of this, but it's fun. You don't need this php-ext-wasm extension to do that: Wasm on the server side and on the client side are two different instances; they don't talk to each other. Also, from what I understand, you have no Wasm on the PHP (server) side.

Is that correct?

@Hywan Hywan self-assigned this Aug 18, 2020
@Hywan Hywan added this to 📬 Backlog in Kanban via automation Aug 18, 2020
@Hywan Hywan moved this from 📬 Backlog to 🌱 In progress in Kanban Aug 18, 2020
@guest271314
Author

To start from the base, what you're trying to do is:

  • Generate audio bytes from PHP on the server side
  • Send the audio bytes to the client
  • Audio bytes are received by JS
  • JS sends audio bytes to Wasm memory bytes

Is that correct?

You have to be careful regarding the security of all of this, but it's fun. You don't need this php-ext-wasm extension to do that: Wasm on the server side and on the client side are two different instances; they don't talk to each other. Also, from what I understand, you have no Wasm on the PHP (server) side.

Is that correct?

No.

  • PHP, or any other programming language that is capable of executing the necessary code to stream raw bytes of system audio output (or the given use case) and of creating a WASM Memory instance
    • Creates such a Memory instance
    • Exports that WASM Memory instance to JavaScript from the PHP context
    • When the user decides, commences writing the data from STDOUT to the WASM Memory instance in the PHP (or any other programming language) context
    • The WASM Memory instance grows when necessary (in this case the user might decide to stream 1 minute of system audio, or 2 hours; WASM Memory growth is based on the dynamic requirements of the use case)
  • The bytes are written to WASM memory in real time in PHP (C, bash, R, APL, Jelly, Python, Pyth, etc.) while JavaScript reads the written bytes

The link to the Chromium bug requesting NativeTransferableStream (see https://bugs.chromium.org/p/chromium/issues/detail?id=89483; https://docs.google.com/document/d/1_KuZzg5c3pncLJPFa8SuVm23AP4tft6mzPCL5at3I9M/edit for description of Transferable Streams) takes the above described process a step further describing a ReadableStream <=> WritableStream between JavaScript in the browser and any programming language or shell at the OS.

For example, consider


// this creates a dynamic shell
// when writable writes from JavaScript that is STDIN to the program or script
const {writable, readable} = new NativeTransferableStream(options);
// this is STDOUT from the native program or shell script
const reader = readable.getReader();
reader.read().then(function processSTDOUT({value, done}) {
  if (done) { return reader.closed; }
  // do stuff with value Uint8Array which is a subarray of WASM Memory instance
  // alternatively, there could be no backing at all, just streams of STDIN and STDOUT
  // where STDOUT could then be written to a WASM Memory instance
  console.log(value); // Uint8Array(4) [116, 101, 115, 116]
  return reader.read().then(processSTDOUT);
}).catch(console.error);
// STDIN 
const writer = writable.getWriter();
writer.write(`php -r 'echo "test";'`);

"Security" is not an issue. Whatever code is being executed in the shell can only be performed with user permissions, similar to how Native Messaging and Native File System operate right now.

@guest271314
Author

The above is a description of the full context of what I am trying to achieve. For this issue, I am trying to determine if it is even possible to: create a WASM Memory instance in a given programming language outside of a JavaScript browsing context; export that single WASM Memory instance to the JavaScript context; write to that shared memory from the native application or shell script (for example, raw audio bytes); grow the WASM Memory dynamically when necessary, due to the content length of the binary data sent to STDOUT (which is actually the WASM Memory, not a terminal); and read that WASM Memory in JavaScript while the write is occurring.

There are two possible paths

  • Write data to single WASM Memory
  • Just stream the data without writing anything to WASM Memory, essentially a MessageChannel between native program or shell script and JavaScript in the browser that is a binary data stream from the program to JavaScript, and since no memory is stored, the pattern is truly a pass-through of raw data
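The second path can be sketched with the Streams API that exists today. This is an illustrative sketch only: an identity TransformStream stands in for the native-program boundary, which is the part that does not exist yet.

```javascript
// Path 2: a pure pass-through with no backing memory. Chunks flow from the
// writable side to the readable side without being accumulated anywhere;
// in the full design the writer would be fed by the native program's STDOUT.
const { readable, writable } = new TransformStream();

const writer = writable.getWriter();
writer.write(Uint8Array.from([116, 101, 115, 116])); // "test"
writer.close();

// Drain the readable side; resolves with the concatenated bytes.
const collected = (async () => {
  const reader = readable.getReader();
  const bytes = [];
  for (let r = await reader.read(); !r.done; r = await reader.read()) {
    bytes.push(...r.value);
  }
  return Uint8Array.from(bytes);
})();
```

The `readable` side is transferable, so it could be posted to a worker or AudioWorklet exactly as the transferable-stream code elsewhere in this thread does.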

@guest271314
Author

I am already able to achieve

  • Audio bytes are received by JS
  • JS sends audio bytes to Wasm memory bytes

within the limitations of WASM Memory and JavaScript implementations, with the code at #121 (comment), by substituting, for example (3000 is arbitrary below; find the number that does not throw a memory allocation error),

this.memory = new WebAssembly.Memory({initial: 3000, maximum: 3000, shared: true});

for

this.memory = new Uint8Array(this.byteSize);

though it is difficult to synchronize the write and the read when AudioWorkletProcessor.process() executes between 344 and 384 times per second and we need at least 512 bytes per process() call (which is synchronous) to convert raw PCM from Uint8Arrays to 2 channels of Float32Arrays; hence we store at least 1/2 second of data

if (this.totalBytes >= (344 * 512) / 2 && !this.started) {
  this.started = true;
  this.port.postMessage({ started: this.started });
}

before starting audio output in JavaScript, to keep the write ahead of the read.

I asked a similar question at the WASI repository, WebAssembly/WASI#307, where it appears that interaction of WASI with JavaScript (in the browser; web platform) is off-topic.

@guest271314
Author

Re "Security": all of the native applications or shell scripts that will be used in the procedure can be declared beforehand. All of the code to be used can be stored in a single directory for which the user grants read/write permission, similar to using Native File System. See https://github.com/guest271314/captureSystemAudio/blob/master/native_messaging/file_stream/app/captureSystemAudio.js for a version of the code that uses Native Messaging and Native File System to stream to one or more files and read the files instead of STDOUT directly:

    // select app/data folder in extension directory
    const dir = await showDirectoryPicker();
    // set mode to readwrite for app/data directory
    const status = await dir.requestPermission({ mode: 'readwrite' });

@guest271314
Author

I put together a minimal, complete, verifiable working example of streaming from fetch() to an AudioWorkletProcessor via Native Messaging, including the observable restrictions which led to filing this issue and the issues referenced above: specifically, RangeError being thrown, and limitations on the amount of memory that can be pre-allocated in an ArrayBuffer or SharedArrayBuffer.

I am posting the code here because these ongoing experiments not infrequently crash the browser and/or the OS.

Flow chart

  • Create Native Messaging Chromium app (app)
  • At an arbitrary web page, dynamically set permissions for the app by using Native File System to re-write manifest.json in the app/ folder, setting the value of the externally_connectable property
  • Connect to the background script, background.js, which passes messages between the Native Messaging host (in this case we use bash and execute PHP within the shell script) and the arbitrary web page
  • We do not use the Native Messaging protocol to pass STDOUT to background.js and then to the web page; instead we begin a PHP built-in server process on localhost to handle the subsequent fetch() call (with an AbortController signal set within the fetch() init object), where we read STDOUT as a ReadableStream, converting to and from JSON and Uint8Array
  • Create a MediaStreamAudioDestinationNode for the ability to record, and perform other tasks with the MediaStream and MediaStreamTrack of the live system audio capture
  • Use Transferable Streams to post response.body (a ReadableStream) to the AudioWorkletProcessor, where we pipe the stream to a WritableStream and write Uint8Arrays to a single Uint8Array pre-allocated to a set amount (344 * 512 * 60 * 60 throws "cannot allocate memory" here)
  • Store minimal data before process() is started by resuming the AudioContext
  • Pass a message to background.js to kill the PHP built-in server process
  • Close the ReadableStreamDefaultController
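The JSON conversion mentioned in the flow above works because a typed array stringifies to a plain object keyed by index; a minimal sketch of the round trip:

```javascript
// chrome.runtime ports clone JSON-serializable values (no transfer), so in
// background.js each Uint8Array chunk is stringified, then reassembled on
// the web page from the parsed object's values.
const chunk = Uint8Array.from([116, 101, 115, 116]); // "test"
const message = JSON.stringify(chunk); // '{"0":116,"1":101,"2":115,"3":116}'
const restored = Uint8Array.from(Object.values(JSON.parse(message)));
```

This is lossless but verbose; each byte costs several characters of JSON, which is one of the costs of the pass-through approach.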

Chromium Native Messaging extension.

app/

manifest.json

{
  // Extension ID: <id>
  "key": "<key>",
  "name": "native_messaging_stream",
  "version": "1.0",
  "manifest_version": 2,
  "description": "Capture system audio",
  "icons": {},
  "permissions": [
    "nativeMessaging", "*://localhost/"
  ],
  "app": {
    "background": {
      "scripts": [
        "background.js"
      ],
      "persistent": false
    }
  },

  "externally_connectable": {
    "matches": [
      "https://developer.mozilla.org/*",
      "http://localhost/*"
    ],
    "ids": [
      "lmkllhfhgnmbcmhdagfhejkpicehnida"
    ]
  },
  "author": "guest271314"
}

background.js

const id = 'native_messaging_stream';
let externalPort, controller, signal;

chrome.runtime.onConnectExternal.addListener(port => {
  console.log(port);
  externalPort = port;
  externalPort.onMessage.addListener(message => {
    if (message === 'start') {
      chrome.runtime.sendNativeMessage(id, {}, async _ => {
        console.log(_);
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        controller = new AbortController();
        signal = controller.signal;
        // wait until bash script completes, server starts
        for await (const _ of (async function* stream() {
          while (true) {
            try {
              if ((await fetch('http://localhost:8000', { method: 'HEAD' })).ok)
                break;
            } catch (e) {
              console.warn(e.message);
              yield;
            }
          }
        })());
        try {
          const response = await fetch('http://localhost:8000?start=true', {
            cache: 'no-store',
            mode: 'cors',
            method: 'get',
            signal
          });
          console.log(...response.headers);
          const readable = response.body;
          readable
            .pipeTo(
              new WritableStream({
                write: async value => {
                  // value is a Uint8Array, postMessage() here only supports cloning, not transfer
                  externalPort.postMessage(JSON.stringify(value));
                },
              })
            )
            .catch(err => {
              console.warn(err);
              externalPort.postMessage('done');
            });
        } catch (err) {
          console.error(err);
        }
      });
    }
    if (message === 'stop') {
      controller.abort();
      chrome.runtime.sendNativeMessage(id, {}, _ => {
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        console.log('everything should be done');
      });
    }
  });
});

set_externally_connectable.js

(async(set_externally_connectable = ["https://example.com/*"], unset_externally_connectable = true) => {
  const dir = await self.showDirectoryPicker();
  const status = await dir.requestPermission({writable: true});
  const fileHandle = await dir.getFileHandle("manifest.json", {create: false});
  const file = await fileHandle.getFile();
  const manifest_text = await file.text();
  const match_extension_id = /\/\/ Extension ID: \w{32}/;
  const [extension_id] = manifest_text.match(match_extension_id);
  let text = manifest_text.replace(match_extension_id, `"_": 0,`);
  const manifest_json = JSON.parse(text);
  manifest_json.externally_connectable.matches = unset_externally_connectable ? set_externally_connectable :
    [...manifest_json.externally_connectable.matches, ...set_externally_connectable];
  const writer = await fileHandle.createWritable({keepExistingData:false});
  await writer.write(JSON.stringify(manifest_json, null, 2).replace(/"_": 0,/, extension_id)); 
  return await writer.close();
})([`${location.origin}/*`]);

host/

native_messaging_stream.json ($ cp native_messaging_stream.json ~/.config/chromium/NativeMessagingHosts)

{
  "name": "native_messaging_stream",
  "description": "Capture system audio",
  "path": "/path/to/host/captureSystemAudio.sh",
  "type": "stdio",
  "allowed_origins": [
    "chrome-extension://<id>/"
  ],
  "author": "guest271314"
}

index.php

<?php 
  if($_SERVER['REQUEST_METHOD'] == 'HEAD') {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEAD");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");    
    header("X-Powered-By:");
    header("HTTP/1.1 200 OK");
    die();
  }
  if (isset($_GET["start"])) {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEAD");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");    
    header("Content-Type: text/plain");
    header("X-Powered-By:");
    echo passthru("parec -v --raw -d alsa_output.pci-0000_00_1b.0.analog-stereo.monitor");
    exit();
  }

captureSystemAudio.sh

#!/bin/bash
sendMessage() {
    # https://stackoverflow.com/a/24777120
    message='{"message": "ok"}'
    # Calculate the byte size of the string.
    # NOTE: This assumes that byte length is identical to the string length!
    # Do not use multibyte (unicode) characters, escape them instead, e.g.
    # message='"Some unicode character:\u1234"'
    messagelen=${#message}
    # Convert to an integer in native byte order.
    # If you see an error message in Chrome's stdout with
    # "Native Messaging host tried sending a message that is ... bytes long.",
    # then just swap the order, i.e. messagelen1 <-> messagelen4 and
    # messagelen2 <-> messagelen3
    messagelen1=$(( ($messagelen      ) & 0xFF ))               
    messagelen2=$(( ($messagelen >>  8) & 0xFF ))               
    messagelen3=$(( ($messagelen >> 16) & 0xFF ))               
    messagelen4=$(( ($messagelen >> 24) & 0xFF ))               
    # Print the message byte length followed by the actual message.
    printf "$(printf '\\x%x\\x%x\\x%x\\x%x' \
        $messagelen1 $messagelen2 $messagelen3 $messagelen4)%s" "$message"
}

captureSystemAudio() {
  if pgrep -f php > /dev/null; then
    killall -9 php & sendMessage  
  else
    php -S localhost:8000 -t /path/to/host/ & sendMessage
  fi
}
captureSystemAudio
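The length-prefix framing that sendMessage() implements in bash can be expressed in a few lines of JavaScript for comparison (an illustrative helper, not part of any API used here):

```javascript
// Native Messaging framing: a 32-bit little-endian byte length,
// followed by the UTF-8 encoded JSON message itself.
function frame(message) {
  const body = new TextEncoder().encode(JSON.stringify(message));
  const out = new Uint8Array(4 + body.length);
  new DataView(out.buffer).setUint32(0, body.length, true); // little-endian
  out.set(body, 4);
  return out;
}
```

For `{"message": "ok"}` the JSON body is 16 bytes, so the frame is 20 bytes, beginning with 0x10 0x00 0x00 0x00.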

At the arbitrary web page (in this case MDN), where we previously set the matching URL pattern in externally_connectable for permission to communicate with the Chromium Native Messaging application:

var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.byteSize = 512 * 384 * 60 * 1; // 1 minute of data
          this.memory = new Uint8Array(this.byteSize); // TODO: grow memory, dynamically
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          const source = {
            write: (value, controller) => {
              console.log(globalThis.currentTime % 3);
              if (this.totalBytes + value.byteLength < this.byteSize) {
                this.memory.set(value, this.readOffset);
                this.readOffset = this.readOffset + value.buffer.byteLength;
                this.totalBytes = this.readOffset;
              } else {
                const last = value.subarray(0, this.byteSize - this.totalBytes);
                this.memory.set(last, this.readOffset);
                this.readOffset = this.readOffset + last.length;
                this.totalBytes = this.readOffset;
                console.log(
                  value.buffer.byteLength,
                  this.readOffset,
                  this.totalBytes
                );
                controller.close();

              }
              // await 250 milliseconds of audio data
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stopped');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (
            this.writeOffset >= this.totalBytes ||
            this.totalBytes === this.byteSize
          ) {
            console.log(this);
            this.endOfStream();
            return false;
          }

          const uint8 = new Uint8Array(512);
          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = this.memory[this.writeOffset];
            }
            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float =
                int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // interleave
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }

          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(ac, 'audio-worklet-native-message-stream', {
        numberOfInputs: 1,
        numberOfOutputs: 2,
        channelCount: 2,
        processorOptions: {
          totalBytes: 0,
          readOffset: 0,
          writeOffset: 0,
          done: false,
          ended: false,
          started: false,
        },
      });

      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };

      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 1);

        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };

      aw.port.postMessage(readable, [readable]);

    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});

port.postMessage('start');

A version using WebAssembly.Memory.grow() to dynamically grow memory during the stream. The expected result is 30 minutes of audio being written to the SharedArrayBuffer backing the memory.

Testing on a legacy 4.15.0-20-lowlatency 32-bit kernel, the code attempted to grow the memory to 30 minutes of data, with the maximum set to 1 hour of data. An error was thrown when calling grow(1) with the current offset at 177143808:

before grow 177012736
after grow 177078272
before grow 177078272
after grow 177143808
before grow 177143808
e4056867-5209-4bea-8f76-a86106ab83af:75 RangeError: WebAssembly.Memory.grow(): Unable to grow instance memory.
    at Memory.grow (<anonymous>)
    at Object.write (e4056867-5209-4bea-8f76-a86106ab83af:26)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:75
async function (async)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:73
177111040 177139656
{ended: true, currentTime: 1004.0308390022676, currentFrame: 44277760, readOffset: 177139656, writeOffset: 177111040, …}
MediaStreamTrack {kind: "audio", id: "e0958602-f0ea-4565-9e34-f624afce7c12", label: "MediaStreamAudioDestinationNode", enabled: true, muted: false, …}
blob:https://developer.mozilla.org/f9d716ff-c65e-4ddf-b7c7-5e385d0602ec
closed

resulting in 16 minutes and 43 seconds of data being written to memory, observable in the recorded audio
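For reference, the logged offsets can be related to 64 KiB pages and to playback time. This is a back-of-the-envelope sketch; the 176400 bytes-per-second rate is assumed from the parec defaults of 44100 Hz, 16-bit, stereo, and is not stated explicitly above.

```javascript
// Relate the logged byte offsets to WebAssembly pages and seconds of audio,
// assuming 44100 Hz * 2 channels * 2 bytes = 176400 bytes per second.
const PAGE = 65536;
const BYTES_PER_SECOND = 44100 * 2 * 2;

const pages = bytes => bytes / PAGE;
const seconds = bytes => bytes / BYTES_PER_SECOND;

console.log(pages(177143808)); // 2703 pages at the failed grow(1)
console.log(seconds(177111040).toFixed(2)); // 1004.03 s, matching the logged currentTime
```

The ~1004 seconds computed from the final writeOffset agrees with the 16-minute-43-second recording.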

(screenshot of the recorded audio: Screenshot_2020-08-23_21-16-22)

Asked at https://bugs.chromium.org/p/v8/issues/detail?id=7881 whether 1 GB is the maximum the underlying memory can grow to on 32-bit architectures.
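The grow-on-demand pattern, and treating RangeError as the hard stop, can be exercised in isolation. A minimal sketch; the page counts here are arbitrary and much smaller than the ones used above:

```javascript
// Grow a shared WebAssembly.Memory one 64 KiB page at a time, treating
// RangeError as the signal that either the declared maximum or the engine's
// own ceiling (as observed above on 32-bit builds) has been reached.
const memory = new WebAssembly.Memory({ initial: 1, maximum: 4, shared: true });

function tryGrow(memory, pages = 1) {
  try {
    memory.grow(pages); // returns the previous size, in pages
    return true;
  } catch (e) {
    if (e instanceof RangeError) return false; // cannot grow any further
    throw e;
  }
}

console.log(tryGrow(memory), memory.buffer.byteLength); // true 131072
console.log(tryGrow(memory, 2), memory.buffer.byteLength); // true 262144
console.log(tryGrow(memory), memory.buffer.byteLength); // false 262144
```

A caller can use the boolean to fall back to truncating the stream, as the else branch in write() above does, instead of letting the exception reject the pipeTo() promise.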

if (globalThis.gc) gc();
var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      // Stub so the class below parses in this scope; the stringified code
      // extends the real AudioWorkletProcessor inside AudioWorkletGlobalScope
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.initial = (384 * 512) / 65536; // ~1 second of 44.1 kHz 16-bit stereo PCM
          this.maximum = (384 * 512 * 60 * 60) / 65536; // 1 hour
          this.byteSize = this.maximum * 65536;
          this.memory = new WebAssembly.Memory({
            initial: this.initial,
            maximum: this.maximum,
            shared: true,
          });
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          
          const source = {
            write: (value, controller) => {
              // Grow page by page until the chunk fits; a single grow(1) is
              // not enough when a chunk spans more than one 64 KiB page
              while (
                this.totalBytes + value.byteLength >
                  this.memory.buffer.byteLength &&
                this.totalBytes + value.byteLength < this.byteSize
              ) {
                console.log('before grow', this.memory.buffer.byteLength);
                this.memory.grow(1);
                console.log('after grow', this.memory.buffer.byteLength);
              }
              // View over the whole (possibly grown) buffer; this.readOffset
              // is an absolute index into the memory
              const uint8_sab = new Uint8Array(this.memory.buffer);
              // console.log(this.totalBytes, this.totalBytes % 65536, this.totalBytes / 65536, this.memory.buffer.byteLength);
              if (this.totalBytes + value.byteLength < this.byteSize) {
                for (
                  let i = 0;
                  i < value.byteLength;
                  i++, this.readOffset++
                ) {
                  uint8_sab[this.readOffset] = value[i];
                }
                this.totalBytes = this.readOffset;
              } else {
                const lastBytes = value.subarray(0, this.byteSize - this.totalBytes);
                for (
                  let i = 0;
                  i < lastBytes.byteLength;
                  i++, this.readOffset++
                ) {
                  uint8_sab[this.readOffset] = lastBytes[i];
                }
                this.totalBytes = this.readOffset;
                console.log(
                  lastBytes.byteLength,
                  this.readOffset,
                  this.totalBytes
                );
                this.readable.cancel();
              }
              // accumulate 250 milliseconds of data before resuming AudioContext
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stream closed');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (
            this.writeOffset >= this.totalBytes ||
            this.totalBytes === this.byteSize
          ) {
            console.log(this);
            this.endOfStream();
            return false;
          }

          const uint8 = new Uint8Array(512);
          // View over the whole buffer; this.writeOffset is an absolute index
          const uint8_sab = new Uint8Array(this.memory.buffer);

          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = uint8_sab[this.writeOffset];
            }

            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float =
                int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // interleave
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }

          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(
        ac,
        'audio-worklet-native-message-stream',
        {
          numberOfInputs: 1,
          numberOfOutputs: 2,
          channelCount: 2,
          processorOptions: {
            totalBytes: 0,
            readOffset: 0,
            writeOffset: 0,
            done: false,
            ended: false,
            started: false,
          },
        }
      );

      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };

      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 30);
        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };

      aw.port.postMessage(readable, [readable]);
    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});

port.postMessage('start');

Ideally, in this case, we should be able to stream directly to AudioWorkletProcessor.process() instead of writing to memory (the gist of the proposal at Chromium: Use Streams API for STDIN and STDOUT), flushing memory as we proceed.

One approach to reduce memory usage would be to write Opus audio to memory and decode it to raw PCM as Float32Array values in the outputs channels, which I have not yet tried.
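The 16-bit PCM to Float32 mapping and the interleaved-to-planar split performed in process() can be exercised outside the worklet. A sketch of the same arithmetic with hypothetical helper names:

```javascript
// Standalone version of the conversion in process() (via
// https://stackoverflow.com/a/35248852): 16-bit samples with the high bit
// set are negative two's-complement values, mapped into [-1, 1].
function int16ToFloat32(uint16) {
  const out = new Float32Array(uint16.length);
  for (let i = 0; i < uint16.length; i++) {
    const int = uint16[i];
    out[i] = int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
  }
  return out;
}

// Split interleaved stereo samples into two planar channels, as process()
// does when filling the outputs channels: even indices left, odd indices right.
function deinterleave(samples) {
  const left = new Float32Array(samples.length / 2);
  const right = new Float32Array(samples.length / 2);
  for (let i = 0; i < samples.length; i += 2) {
    left[i / 2] = samples[i];
    right[i / 2] = samples[i + 1];
  }
  return [left, right];
}

console.log(int16ToFloat32(Uint16Array.from([0x0000, 0x8000, 0x7fff])));
// silence -> 0, most negative -> -1, most positive -> 1
```

Pulling the conversion out this way also makes it easy to unit-test the edge samples (0x8000 and 0x7fff) that the in-worklet loop never logs.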

@guest271314
Author

Why closing the issue? It starts getting fun!

@Hywan

One issue with WebAssembly.Memory.grow() is that eventually an exception will be thrown when memory reaches the implementation maximum.

I solved the use case using existing web platform technologies https://github.com/guest271314/NativeTransferableStreams.

Thanks!

Kanban automation moved this from 🌱 In progress to 🎉 Done May 24, 2021
@Hywan
Contributor

Hywan commented May 25, 2021

:-)
