Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 179 additions & 165 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 17 additions & 6 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = [
"basic_room",
"mobile",
"save_to_disk",
"play_from_disk",
"wgpu_room",
"webhooks",
]
1 change: 1 addition & 0 deletions examples/play_from_disk/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.wav filter=lfs diff=lfs merge=lfs -text
13 changes: 13 additions & 0 deletions examples/play_from_disk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "play_from_disk"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
livekit = { path = "../../livekit", version = "0.2.0" }
thiserror = "1.0.47"
log = "0.4.20"
env_logger = "0.10.0"
3 changes: 3 additions & 0 deletions examples/play_from_disk/change-sophie.wav
Git LFS file not shown
191 changes: 191 additions & 0 deletions examples/play_from_disk/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use livekit::{
options::TrackPublishOptions,
track::{LocalAudioTrack, LocalTrack, TrackSource},
webrtc::{
audio_source::native::NativeAudioSource,
prelude::{AudioFrame, AudioSourceOptions, RtcAudioSource},
},
Room, RoomOptions,
};
use std::{env, mem::size_of, sync::Arc, time::Duration};
use std::{error::Error, io};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};

#[derive(Debug, Error)]
pub enum WavError {
#[error("Invalid header: {0}")]
InvalidHeader(&'static str),
#[error("IO error: {0}")]
Io(#[from] io::Error),
}

pub struct WavReader<R: AsyncRead + Unpin> {
reader: R,
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct WavHeader {
file_size: u32,
data_size: u32,
format: String,
format_length: u32,
format_type: u16,
num_channels: u16,
sample_rate: u32,
byte_rate: u32,
block_align: u16,
bits_per_sample: u16,
}

impl<R: AsyncRead + Unpin> WavReader<R> {
pub fn new(reader: R) -> Self {
Self { reader }
}

pub async fn read_header(&mut self) -> Result<WavHeader, WavError> {
let mut header = [0u8; 4];
let mut format = [0u8; 4];
let mut chunk_marker = [0u8; 4];
let mut data_chunk = [0u8; 4];

self.reader.read_exact(&mut header).await?;

if &header != b"RIFF" {
return Err(WavError::InvalidHeader("Invalid RIFF header"));
}

let file_size = self.reader.read_u32_le().await?;
self.reader.read_exact(&mut format).await?;

if &format != b"WAVE" {
return Err(WavError::InvalidHeader("Invalid WAVE header"));
}

self.reader.read_exact(&mut chunk_marker).await?;

if &chunk_marker != b"fmt " {
return Err(WavError::InvalidHeader("Invalid fmt chunk"));
}

let format_length = self.reader.read_u32_le().await?;
let format_type = self.reader.read_u16_le().await?;
let num_channels = self.reader.read_u16_le().await?;
let sample_rate = self.reader.read_u32_le().await?;
let byte_rate = self.reader.read_u32_le().await?;
let block_align = self.reader.read_u16_le().await?;
let bits_per_sample = self.reader.read_u16_le().await?;
self.reader.read_exact(&mut data_chunk).await?;
let data_size = self.reader.read_u32_le().await?;

if &data_chunk != b"data" {
return Err(WavError::InvalidHeader("Invalid data chunk"));
}

Ok(WavHeader {
file_size,
data_size,
format: String::from_utf8_lossy(&format).to_string(),
format_length,
format_type,
num_channels,
sample_rate,
byte_rate,
block_align,
bits_per_sample,
})
}

pub async fn read_i16(&mut self) -> Result<i16, WavError> {
let i = self.reader.read_i16_le().await?;
Ok(i)
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

let url = env::var("LIVEKIT_URL").expect("LIVEKIT_URL is not set");
let token = env::var("LIVEKIT_TOKEN").expect("LIVEKIT_TOKEN is not set");

let file = tokio::fs::File::open("change-sophie.wav").await?;
let mut reader = WavReader::new(BufReader::new(file));
let header = reader.read_header().await?;
log::debug!("{:?}", header);

if header.bits_per_sample != 16 {
return Err("only 16-bit samples supported for this demo".into());
}

let (room, mut rx) = Room::connect(&url, &token, RoomOptions::default())
.await
.unwrap();
let room = Arc::new(room);
log::info!("Connected to room: {} - {}", room.name(), room.sid());

let source = NativeAudioSource::new(
AudioSourceOptions::default(),
header.sample_rate,
header.num_channels as u32,
);

let track = LocalAudioTrack::create_audio_track("file", RtcAudioSource::Native(source.clone()));

room.local_participant()
.publish_track(
LocalTrack::Audio(track),
TrackPublishOptions {
source: TrackSource::Microphone,
..Default::default()
},
)
.await?;

// Play the wav file and disconnect
tokio::spawn({
let room = room.clone();
async move {
const FRAME_DURATION: Duration = Duration::from_millis(1000); // Write 1s of audio at a time

let max_samples = header.data_size as usize / size_of::<i16>();
let ms = FRAME_DURATION.as_millis() as u32;
let num_samples = (header.sample_rate / 1000 * ms) as usize;

log::info!("sample_rate: {}", header.sample_rate);
log::info!("num_channels: {}", header.num_channels);
log::info!("max samples: {}", max_samples);
log::info!("chunk size: {}ms - {} samples", ms, num_samples);

let mut written_samples = 0;
while written_samples < max_samples {
Comment thread
theomonnom marked this conversation as resolved.
let available_samples = max_samples - written_samples;
let frame_size = num_samples.min(available_samples);

let mut audio_frame = AudioFrame {
data: vec![0i16; frame_size].into(),
num_channels: header.num_channels as u32,
sample_rate: header.sample_rate,
samples_per_channel: (frame_size / header.num_channels as usize) as u32,
};

for i in 0..frame_size {
let sample = reader.read_i16().await.unwrap();
audio_frame.data.to_mut()[i] = sample;
}

source.capture_frame(&audio_frame).await.unwrap();
written_samples += frame_size;
}

room.close().await.unwrap();
}
});

while let Some(msg) = rx.recv().await {
log::info!("Event: {:?}", msg);
}

Ok(())
}
15 changes: 7 additions & 8 deletions examples/wgpu_room/src/logo_track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct FrameData {
image: Arc<RgbaImage>,
framebuffer: Arc<Mutex<Vec<u8>>>,
video_frame: Arc<Mutex<VideoFrame<I420Buffer>>>,
pos: (u32, u32),
pos: (i32, i32),
direction: (i32, i32),
}

Expand Down Expand Up @@ -119,7 +119,7 @@ impl LogoTrack {

let mut data = FrameData {
image: Arc::new(image),
framebuffer: Arc::new(Mutex::new(vec![0u8; (FB_WIDTH * FB_HEIGHT * 4) as usize])),
framebuffer: Arc::new(Mutex::new(vec![0u8; FB_WIDTH * FB_HEIGHT * 4])),
video_frame: Arc::new(Mutex::new(VideoFrame {
rotation: VideoRotation::VideoRotation0,
buffer: I420Buffer::new(FB_WIDTH as u32, FB_HEIGHT as u32),
Expand All @@ -137,16 +137,16 @@ impl LogoTrack {
_ = interval.tick() => {}
}

data.pos.0 = (data.pos.0 as i32 + data.direction.0 * MOVE_SPEED) as u32;
data.pos.1 = (data.pos.1 as i32 + data.direction.1 * MOVE_SPEED) as u32;
data.pos.0 += data.direction.0 * MOVE_SPEED;
data.pos.1 += data.direction.1 * MOVE_SPEED;

if data.pos.0 >= (FB_WIDTH - data.image.width() as usize) as u32 {
if data.pos.0 >= (FB_WIDTH - data.image.width() as usize) as i32 {
data.direction.0 = -1;
} else if data.pos.0 <= 0 {
data.direction.0 = 1;
}

if data.pos.1 >= (FB_HEIGHT - data.image.height() as usize) as u32 {
if data.pos.1 >= (FB_HEIGHT - data.image.height() as usize) as i32 {
data.direction.1 = -1;
} else if data.pos.1 <= 0 {
data.direction.1 = 1;
Expand Down Expand Up @@ -189,8 +189,7 @@ impl LogoTrack {
stride_v,
FB_WIDTH as i32,
FB_HEIGHT as i32,
)
.unwrap();
);

source.capture_frame(&*video_frame);
}
Expand Down
7 changes: 5 additions & 2 deletions examples/wgpu_room/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{logo_track::LogoTrack, sine_track::SineTrack};
use crate::{
logo_track::LogoTrack,
sine_track::{SineParameters, SineTrack},
};
use livekit::{prelude::*, SimulateScenario};
use parking_lot::Mutex;
use std::sync::Arc;
Expand Down Expand Up @@ -120,7 +123,7 @@ async fn service_task(inner: Arc<ServiceInner>, mut cmd_rx: mpsc::UnboundedRecei
running_state = Some(RunningState {
room: new_room.clone(),
logo_track: LogoTrack::new(new_room.clone()),
sine_track: SineTrack::new(new_room.clone()),
sine_track: SineTrack::new(new_room.clone(), SineParameters::default()),
});

// Allow direct access to the room from the UI (Used for sync access)
Expand Down
Loading