Skip to content

Commit

Permalink
とりあえず、DummyでつくったFileChannelの読み込みで動作するようにしてみた。タイマーがひっかかる部分で微妙につっかえるみたい…
Browse files Browse the repository at this point in the history
…ですね。
  • Loading branch information
taktod committed Nov 23, 2012
1 parent a89e7e3 commit a429295
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 54 deletions.
115 changes: 115 additions & 0 deletions src/com/ttProject/flazr/FlvPacketReader.java
@@ -0,0 +1,115 @@
package com.ttProject.flazr;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.jboss.netty.buffer.ChannelBuffers;

import com.flazr.io.flv.FlvAtom;
import com.flazr.util.Utils;

/**
* byteBufferの中身を確認するReader
* @author taktod
*/
public class FlvPacketReader {
private boolean firstChecked = false;
private ByteBuffer buffer = null;
public List<FlvAtom> getPackets(ByteBuffer data) {
if(buffer != null) {
int position = buffer.position();
byte[] dat = new byte[11];
buffer.get(dat);
System.out.println(Utils.toHex(dat, true));
buffer.position(position);
System.out.println(buffer.remaining());
System.out.println(data.remaining());
int length = buffer.remaining() + data.remaining();
ByteBuffer newBuffer = ByteBuffer.allocate(length);
newBuffer.put(buffer);
buffer = newBuffer;
buffer.put(data);
buffer.flip();
System.out.println(buffer.remaining());
}
else {
buffer = data;
}
List<FlvAtom> result = new ArrayList<FlvAtom>();
while(buffer.remaining() > 0) {
FlvAtom flvAtom = analizeData(buffer);
if(flvAtom != null) {
result.add(flvAtom);
}
else {
break;
}
}
return result;
}
/**
* 内容を解析します
* @return
*/
private FlvAtom analizeData(ByteBuffer buffer) {
if(!firstChecked) {
System.out.println("check again");
if(buffer.remaining() < 13) {
// データがたりないので、次のデータ待ち
return null;
}
byte[] flvHeader = new byte[13];
buffer.get(flvHeader);
System.out.println(Utils.toHex(flvHeader, true));
// 内容を解析する。46 4C 56 01 05 00 00 00 09 00 00 00 00
if(flvHeader[0] == 0x46
&& flvHeader[1] == 0x4C
&& flvHeader[2] == 0x56
&& flvHeader[3] == 0x01
// && flvHeader[4] == 0x05
&& flvHeader[5] == 0x00
&& flvHeader[6] == 0x00
&& flvHeader[7] == 0x00
&& flvHeader[8] == 0x09
&& flvHeader[9] == 0x00
&& flvHeader[10] == 0x00
&& flvHeader[11] == 0x00
&& flvHeader[12] == 0x00) {
firstChecked = true;
}
else {
throw new RuntimeException("flvのヘッダ情報解析がうまくいきませんでした。");
}
}
if(buffer.remaining() < 11) {
return null;
}
// データの続きを解析する。
int position = buffer.position();
System.out.println(position);
// 11バイト読み込んでデータを調べ上げる。
byte[] packetHeader = new byte[11];
buffer.get(packetHeader);
System.out.println(Utils.toHex(packetHeader, true));
int size = getSizeFromHeader(packetHeader);
System.out.println(11+ size + 4);
if(buffer.remaining() < size + 4) {
// 読み込むのにはデータがたりなかった。
buffer.position(position);
byte[] dat = new byte[11];
buffer.get(dat);
System.out.println(Utils.toHex(dat, true));
buffer.position(position);
return null;
}
byte[] data = new byte[11 + size + 4];
buffer.position(position);
buffer.get(data);
System.out.println(buffer.position());
return new FlvAtom(ChannelBuffers.copiedBuffer(data));
}
private int getSizeFromHeader(byte[] header) {
return (((header[1] & 0xFF) << 16) + ((header[2] & 0xFF) << 8) + (header[3] & 0xFF));
}
}
5 changes: 3 additions & 2 deletions src/com/ttProject/flazr/RtmpClient.java
Expand Up @@ -12,8 +12,8 @@
import org.slf4j.LoggerFactory;

import com.flazr.rtmp.client.ClientOptions;
import com.flazr.rtmp.client.ClientPipelineFactory;
import com.flazr.util.Utils;
import com.ttProject.flazr.client.ClientPipelineFactoryEx;
import com.ttProject.flazr.io.flv.FlvLiveReader;

/**
Expand Down Expand Up @@ -74,7 +74,8 @@ private static ClientBootstrap getBootstrap(final Executor executor, final Clien
final ChannelFactory factory = new NioClientSocketChannelFactory(executor, executor);
final ClientBootstrap bootstrap = new ClientBootstrap(factory);
// clientPipelineFactoryをオーバーライドすることで、独自定義動作させます。
bootstrap.setPipelineFactory(new ClientPipelineFactoryEx(options));
// bootstrap.setPipelineFactory(new ClientPipelineFactoryEx(options));
bootstrap.setPipelineFactory(new ClientPipelineFactory(options));
bootstrap.setOption("tcpNoDelay" , true);
bootstrap.setOption("keepAlive", true);
return bootstrap;
Expand Down
Expand Up @@ -4,8 +4,8 @@

import com.flazr.io.BufferReader;

public class SydinChannelReader implements BufferReader {

public class StdinChannelReader implements BufferReader {
// はじめのデータがflvHeaderであることを確認する必要がある。
@Override
public long size() {
// TODO Auto-generated method stub
Expand Down
106 changes: 57 additions & 49 deletions src/com/ttProject/flazr/io/flv/FlvLiveReader.java
@@ -1,45 +1,66 @@
package com.ttProject.flazr.io.flv;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.flazr.io.BufferReader;
import com.flazr.io.FileChannelReader;
import com.flazr.io.flv.FlvAtom;
import com.flazr.io.flv.FlvReader;
import com.flazr.rtmp.RtmpMessage;
import com.flazr.rtmp.RtmpReader;
import com.flazr.rtmp.message.Aggregate;
import com.flazr.rtmp.message.MessageType;
import com.flazr.rtmp.message.Metadata;
import com.flazr.rtmp.message.MetadataAmf0;
import com.ttProject.flazr.FlvPacketReader;

public class FlvLiveReader implements RtmpReader {
private static final Logger logger = LoggerFactory.getLogger(FlvReader.class);
private final LinkedBlockingQueue<FlvAtom> dataQueue = new LinkedBlockingQueue<FlvAtom>();

private final BufferReader in;
// private final long mediaStartPosition;
private final Metadata metadata;
// private final BufferReader in;
private Metadata metadata;
private int aggregateDuration;
private final Thread stdinThread;

public FlvLiveReader(final String path) {
// FlvLiveReaderの作成の方がさきにくるので、metaデータはデフォルトとします。
// 標準入力を解析するThreadをつくっておく。
stdinThread = new Thread(new Runnable() {
@Override
public void run() {
try {
FlvPacketReader reader = new FlvPacketReader();
// ReadableByteChannel stdinChannel = Channels.newChannel(System.in);
// dummy
String targetFile = System.getProperty("user.home") + "/Sites/mario/mario.flv";
FileChannel stdinChannel = new FileInputStream(targetFile).getChannel();
// 実処理
// データを確認する。
while(true) {
// オブジェクトを変更しないと、getPacketsがうまく動作しないみたいです。
ByteBuffer buffer = ByteBuffer.allocate(65536);
if(stdinChannel.read(buffer) < 0) {
break;
}
buffer.flip();
List<FlvAtom> data = reader.getPackets(buffer);
dataQueue.addAll(data);
// System.out.println(data);
}
}
catch (Exception e) {
e.printStackTrace();
}
}
});
stdinThread.start();
// threadをつくって、stdinのデータを取り込みqueueにいれていく動作をつくっておく。
logger.info("FlvLiveReader");
in = new FileChannelReader(path);
in.position(13); // skip flv header
final RtmpMessage metadataAtom = next();
final RtmpMessage metadataTemp =
MessageType.decode(metadataAtom.getHeader(), metadataAtom.encode());
if(metadataTemp.getHeader().isMetadata()) {
metadata = (Metadata) metadataTemp;
// mediaStartPosition = in.position();
} else {
logger.warn("flv file does not start with 'onMetaData', using empty one");
metadata = new MetadataAmf0("onMetaData");
in.position(13);
// mediaStartPosition = 13;
}
metadata = new MetadataAmf0("onMetaData");
logger.debug("flv file metadata: {}", metadata);
}
@Override
Expand Down Expand Up @@ -86,45 +107,32 @@ public long seek(final long time) {
@Override
public boolean hasNext() {
logger.info("hasNext");
return in.position() < in.size();
return true; // 絶対にあることにしておく。
}


private static final int AGGREGATE_SIZE_LIMIT = 65536;

@Override
public RtmpMessage next() {
logger.info("next");
if(aggregateDuration <= 0) {
return new FlvAtom(in);
// aggregateDirationは0であることが見越されます。
// flvAtomを生成して、linkedBlockingQueueにいれておき、popして応答すればそれでいいと思う。
try {
return dataQueue.take();
}
catch (Exception e) {
logger.error("error", e);
throw new RuntimeException("takeに失敗しました。");
}
}
final ChannelBuffer out = ChannelBuffers.dynamicBuffer();
int firstAtomTime = -1;
while(hasNext()) {
final FlvAtom flvAtom = new FlvAtom(in);
final int currentAtomTime = flvAtom.getHeader().getTime();
if(firstAtomTime == -1) {
firstAtomTime = currentAtomTime;
}
final ChannelBuffer temp = flvAtom.write();
if(out.readableBytes() + temp.readableBytes() > AGGREGATE_SIZE_LIMIT) {
// この部分は必要になったら再度利用できるようにしなければいけないかも・・・
throw new RuntimeException("前のデータを読み直すことはとりあえず禁止しておく。");
// prev();
// break;
}
out.writeBytes(temp);
if(currentAtomTime - firstAtomTime > aggregateDuration) {
break;
}
else {
throw new RuntimeException("aggregateDurationが0よりおおきかった。");
}
return new Aggregate(firstAtomTime, out);
}

@Override
public void close() {
logger.info("close");
in.close();
stdinThread.interrupt();
}

}
4 changes: 3 additions & 1 deletion src/com/ttProject/flazr/rtmp/RtmpPublisherEx.java
Expand Up @@ -155,6 +155,7 @@ private void write(final Channel channel) {
}
} //====================================================================
if (message == null || playLength >= 0 && timePosition > (seekTime + playLength)) {
// ここで強制的に次のtic待ちにもっていく必要あり。
stop(channel);
return;
}
Expand All @@ -177,7 +178,8 @@ private void write(final Channel channel) {
header.setStreamId(streamId);
final ChannelFuture future = channel.write(message);
future.addListener(new ChannelFutureListener() {
@Override public void operationComplete(final ChannelFuture cf) {
@Override
public void operationComplete(final ChannelFuture cf) {
final long completedIn = System.currentTimeMillis() - writeTime;
if(completedIn > 2000) {
logger.warn("channel busy? time taken to write last message: {}", completedIn);
Expand Down

0 comments on commit a429295

Please sign in to comment.