Skip to content

Commit

Permalink
add s3 stream support
Browse files Browse the repository at this point in the history
  • Loading branch information
wanglei.w committed Apr 15, 2021
1 parent bb39562 commit 180149a
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 8 deletions.
25 changes: 22 additions & 3 deletions examples/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ package examples
import (
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/disintegration/imaging"
"github.com/stretchr/testify/assert"
ffmpeg "github.com/u2takey/ffmpeg-go"
)

//
// More simple examples please refer to ffmpeg_test.go
//

func TestExampleStream(t *testing.T) {
ExampleStream("./sample_data/in1.mp4", "./sample_data/out1.mp4", false)
}
Expand All @@ -26,7 +34,18 @@ func TestExampleShowProgress(t *testing.T) {
ExampleShowProgress("./sample_data/in1.mp4", "./sample_data/out2.mp4")
}

func TestExampleOpenCvFaceDetect(t *testing.T) {
ExampleFaceDetection("./sample_data/head-pose-face-detection-male-short.mp4",
"./sample_data/haarcascade_frontalface_default.xml")
func TestSimpleS3StreamExample(t *testing.T) {
err := ffmpeg.Input("./sample_data/in1.mp4", nil).
Output("s3://data-1251825869/test_out.ts", ffmpeg.KwArgs{
"aws_config": &aws.Config{
Credentials: credentials.NewStaticCredentials("xx", "yyy", ""),
//Endpoint: aws.String("xx"),
Region: aws.String("yyy"),
},
// outputS3 use stream output, so you can only use supported format
// if you want mp4 format for example, you can output it to a file, and then call s3 sdk to do upload
"format": "mpegts",
}).
Run()
assert.Nil(t, err)
}
7 changes: 7 additions & 0 deletions examples/opencv.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build gocv

package examples

import (
Expand All @@ -11,6 +13,11 @@ import (
"gocv.io/x/gocv"
)

func TestExampleOpenCvFaceDetect(t *testing.T) {
ExampleFaceDetection("./sample_data/head-pose-face-detection-male-short.mp4",
"./sample_data/haarcascade_frontalface_default.xml")
}

func readProcess(infileName string, writer io.WriteCloser) <-chan error {
log.Println("Starting ffmpeg process1")
done := make(chan error)
Expand Down
51 changes: 50 additions & 1 deletion ffmpeg.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package ffmpeg_go

import (
"context"
"errors"
"io"
"log"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// Input file URL (ffmpeg ``-i`` option)
Expand Down Expand Up @@ -86,7 +95,47 @@ func Output(streams []*Stream, fileName string, kwargs ...KwArgs) *Stream {

func (s *Stream) Output(fileName string, kwargs ...KwArgs) *Stream {
if s.Type != "FilterableStream" {
panic("cannot output on non-FilterableStream")
log.Panic("cannot output on non-FilterableStream")
}
if strings.HasPrefix(fileName, "s3://") {
return s.outputS3Stream(fileName, kwargs...)
}
return Output([]*Stream{s}, fileName, kwargs...)
}

func (s *Stream) outputS3Stream(fileName string, kwargs ...KwArgs) *Stream {
r, w := io.Pipe()
fileL := strings.SplitN(strings.TrimPrefix(fileName, "s3://"), "/", 2)
if len(fileL) != 2 {
log.Panic("s3 file format not valid")
}
args := MergeKwArgs(kwargs)
awsConfig := args.PopDefault("aws_config", &aws.Config{}).(*aws.Config)
bucket, key := fileL[0], fileL[1]
o := Output([]*Stream{s}, "pipe:", args).
WithOutput(w, os.Stdout)
done := make(chan struct{})
runHook := RunHook{
f: func() {
defer func() {
done <- struct{}{}
}()

sess, err := session.NewSession(awsConfig)
uploader := s3manager.NewUploader(sess)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: &bucket,
Key: &key,
Body: r,
})
//fmt.Println(ioutil.ReadAll(r))
if err != nil {
log.Println("upload fail", err)
}
},
done: done,
closer: w,
}
o.Context = context.WithValue(o.Context, "run_hook", &runHook)
return o
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/u2takey/ffmpeg-go
go 1.14

require (
github.com/aws/aws-sdk-go v1.38.20
github.com/disintegration/imaging v1.6.2
github.com/stretchr/testify v1.4.0
github.com/tidwall/gjson v1.6.3
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/aws/aws-sdk-go v1.38.20 h1:QbzNx/tdfATbdKfubBpkt84OM6oBkxQZRw6+bW2GyeA=
github.com/aws/aws-sdk-go v1.38.20/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -9,6 +11,9 @@ github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -17,6 +22,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/selinux v1.5.2/go.mod h1:yTcKuYAh6R95iDpefGLQaPaRwJFwyzAJufJyiTt7s0g=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
Expand All @@ -34,11 +40,19 @@ github.com/u2takey/go-utils v0.0.0-20200713025200-4704d09fc2c7 h1:PT7mE8HJE1mwaS
github.com/u2takey/go-utils v0.0.0-20200713025200-4704d09fc2c7/go.mod h1:ATqKFpgjUIlhGRs8j59gXmu8Cmpo1QQEHV6vwu1hs28=
gocv.io/x/gocv v0.25.0 h1:vM50jL3v9OEqWSi+urelX5M1ptZeFWA/VhGPvdTqsJU=
gocv.io/x/gocv v0.25.0/go.mod h1:Rar2PS6DV+T4FL+PM535EImD/h13hGVaHhnCu1xarBs=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 h1:hVwzHzIUGRjiF7EcUjqNxk3NCfkPxbDKRdnNE1Rpg0U=
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
8 changes: 8 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type Stream struct {
Context context.Context
}

type RunHook struct {
f func()
done <-chan struct{}
closer interface {
Close() error
}
}

func NewStream(node *Node, streamType string, label Label, selector Selector) *Stream {
return &Stream{
Node: node,
Expand Down
14 changes: 10 additions & 4 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,15 @@ func (s *Stream) Compile() *exec.Cmd {
}

func (s *Stream) Run() error {
err := s.Compile().Run()
if err != nil {
return err
if s.Context.Value("run_hook") != nil {
hook := s.Context.Value("run_hook").(*RunHook)
go hook.f()
defer func() {
if hook.closer != nil {
_ = hook.closer.Close()
}
<-hook.done
}()
}
return nil
return s.Compile().Run()
}
8 changes: 8 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ func (a KwArgs) GetDefault(k string, defaultV interface{}) interface{} {
return defaultV
}

func (a KwArgs) PopDefault(k string, defaultV interface{}) interface{} {
if v, ok := a[k]; ok {
defer delete(a, k)
return v
}
return defaultV
}

func ConvertKwargsToCmdLineArgs(kwargs KwArgs) []string {
var keys, args []string
for k := range kwargs {
Expand Down

0 comments on commit 180149a

Please sign in to comment.