This repository has been archived by the owner on Dec 1, 2023. It is now read-only.

pull works, not well, but it works
mjpitz committed Oct 7, 2021
1 parent b39efae commit 6d52d5e
Showing 9 changed files with 212 additions and 65 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@
/gen/
/vendor/
/docs/openapiv2/
/out/
1 change: 1 addition & 0 deletions go.mod
@@ -11,6 +11,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0
github.com/minio/minio-go/v7 v7.0.14
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/rs/cors v1.8.0
github.com/stretchr/testify v1.7.0
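The only new direct dependency is github.com/pkg/errors, which pull.go now uses to annotate low-level failures before returning them. A minimal sketch of that wrapping pattern, assuming an illustrative helper and path (not code from this commit):

```go
package main

import (
	"fmt"
	"os"

	"github.com/pkg/errors"
)

// mkdir wraps the underlying failure so callers see both the added context
// and the original OS error, the same pattern pull.go adopts below.
func mkdir(dir string) error {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return errors.Wrapf(err, "failed to make path %s", dir)
	}
	return nil
}

func main() {
	// "/dev/null/sub" is an illustrative path that cannot be created on
	// Unix-like systems, so the wrapped error is printed.
	if err := mkdir("/dev/null/sub"); err != nil {
		fmt.Println(err)
	}
}
```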
11 changes: 11 additions & 0 deletions internal/commands/env.go
@@ -0,0 +1,11 @@
package commands

import "os"

func lookupHost() string {
host := os.Getenv("AETHERFS_HOST")
if host != "" {
return host
}
return "localhost:8080"
}
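For reference, a minimal usage sketch (not part of the commit) of the new helper: AETHERFS_HOST, when set, overrides the default localhost:8080 target that the push and pull commands now resolve through lookupHost(). The host value below is hypothetical.

```go
package commands

import (
	"os"
	"testing"
)

// TestLookupHost demonstrates the two branches of lookupHost: the
// environment variable wins when present, otherwise the default is used.
func TestLookupHost(t *testing.T) {
	os.Unsetenv("AETHERFS_HOST")
	if got := lookupHost(); got != "localhost:8080" {
		t.Fatalf("expected default target, got %s", got)
	}

	os.Setenv("AETHERFS_HOST", "aetherfs.example.com:8080")
	defer os.Unsetenv("AETHERFS_HOST")
	if got := lookupHost(); got != "aetherfs.example.com:8080" {
		t.Fatalf("expected override, got %s", got)
	}
}
```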
147 changes: 140 additions & 7 deletions internal/commands/pull.go
@@ -7,14 +7,28 @@
package commands

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"go.uber.org/zap"

blockv1 "github.com/mjpitz/aetherfs/api/aetherfs/block/v1"
datasetv1 "github.com/mjpitz/aetherfs/api/aetherfs/dataset/v1"
"github.com/mjpitz/aetherfs/internal/components"
"github.com/mjpitz/aetherfs/internal/flagset"
"github.com/mjpitz/aetherfs/internal/fs"
)

const (
filePermissions os.FileMode = 0644
dirPermissions os.FileMode = 0755
)

// PullConfig encapsulates all the configuration required to pull datasets from AetherFS.
@@ -38,20 +52,139 @@ func Pull() *cli.Command {
logger := ctxzap.Extract(ctx.Context)

args := ctx.Args().Slice()

if len(args) == 0 {
return fmt.Errorf("missing path where we should download datasets")
switch len(args) {
case 0:
return fmt.Errorf("missing required path")
case 1:
return fmt.Errorf("missing datasets")
}

path := args[0]
datasets := args[1:]
path, err := filepath.Abs(args[0])
if err != nil {
return err
}

if path == "" {
path, _ = os.Getwd()
_ = os.MkdirAll(path, dirPermissions)
{
info, err := os.Stat(path)
switch {
case err != nil:
return errors.Wrap(err, "failed to make path")
case !info.IsDir():
return fmt.Errorf("path is not a directory")
}
}

aetherFSDir := filepath.Join(path, ".aetherfs")
_ = os.MkdirAll(aetherFSDir, dirPermissions)

{
info, err := os.Stat(aetherFSDir)
switch {
case err != nil:
return errors.Wrap(err, "failed to make aetherfs dir")
case !info.IsDir():
return fmt.Errorf(".aetherfs is a file")
}
}

conn, err := components.GRPCClient(ctx.Context, components.GRPCClientConfig{
Target: lookupHost(),
})
if err != nil {
return err
}
defer conn.Close()

datasetAPI := datasetv1.NewDatasetAPIClient(conn)
blockAPI := blockv1.NewBlockAPIClient(conn)

tags := make([]*datasetv1.Tag, 0, len(datasets))
snapshots := make([]*datasetv1.LookupResponse, 0, len(datasets))

for _, dataset := range datasets {
logger.Info("pulling dataset", zap.String("name", dataset), zap.String("path", path))
parts := strings.Split(dataset, ":")
if len(parts) < 2 {
parts = append(parts, "latest")
}

req := &datasetv1.LookupRequest{
Tag: &datasetv1.Tag{
Name: parts[0],
Version: parts[1],
},
}

resp, err := datasetAPI.Lookup(ctx.Context, req)
if err != nil {
return err
}

tags = append(tags, req.Tag)
snapshots = append(snapshots, resp)
}

// save snapshots
for i, snapshot := range snapshots {
tag := tags[i]

metadataFile := tag.Name + "." + tag.Version + ".snapshot.afs.json"
metadataFile = filepath.Join(aetherFSDir, metadataFile)

datasetDir := tag.Name + "." + tag.Version
datasetDir = filepath.Join(path, datasetDir)

_, err := os.Stat(metadataFile)
if err == nil {
continue
}

// download files
// this could definitely be done in a more efficient way, but this is a good start

logger.Info("downloading dataset", zap.String("name", tag.Name), zap.String("tag", tag.Version))

_ = os.MkdirAll(datasetDir, dirPermissions)
for _, file := range snapshot.Dataset.Files {
filePath := filepath.Join(datasetDir, file.Name)
fileDir := filepath.Dir(filePath)

_ = os.MkdirAll(fileDir, dirPermissions)

logger.Info("downloading file", zap.String("file", file.Name))

datasetFile := &fs.DatasetFile{
Context: ctx.Context,
BlockAPI: blockAPI,
Dataset: snapshot.Dataset,
CurrentPath: file.Name,
File: file,
}

data := make([]byte, file.Size)
n, err := datasetFile.Read(data)
if err != nil {
return errors.Wrap(err, "failed to download file")
}

err = ioutil.WriteFile(filePath, data[:n], filePermissions)
if err != nil {
return errors.Wrap(err, "failed to write file")
}
}

// save snapshot

data, err := json.MarshalIndent(snapshot, "", " ")
if err != nil {
return err
}

err = ioutil.WriteFile(metadataFile, data, filePermissions)
if err != nil {
return errors.Wrap(err, "failed to write metadata file")
}
}

return nil
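The loop above accepts dataset references of the form name[:version] and falls back to the latest tag when no version is supplied. A standalone sketch of that parsing (helper name and invocation are illustrative, not code from this commit):

```go
package main

import (
	"fmt"
	"strings"
)

// parseTag mirrors the reference handling in the pull loop: a missing
// version defaults to the "latest" tag.
func parseTag(dataset string) (name, version string) {
	parts := strings.Split(dataset, ":")
	if len(parts) < 2 {
		parts = append(parts, "latest")
	}
	return parts[0], parts[1]
}

func main() {
	fmt.Println(parseTag("my-dataset"))        // my-dataset latest
	fmt.Println(parseTag("my-dataset:v1.2.0")) // my-dataset v1.2.0
}
```

So an invocation along the lines of `aetherfs pull ./out my-dataset` would resolve to the my-dataset:latest snapshot and download it beneath ./out (the command name and paths shown are illustrative).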
2 changes: 1 addition & 1 deletion internal/commands/push.go
@@ -77,7 +77,7 @@ func Push() *cli.Command {
}

conn, err := components.GRPCClient(ctx.Context, components.GRPCClientConfig{
Target: "localhost:8080",
Target: lookupHost(),
})
if err != nil {
return err
63 changes: 32 additions & 31 deletions internal/fs/dataset_file.go
@@ -19,18 +19,19 @@ import (
datasetv1 "github.com/mjpitz/aetherfs/api/aetherfs/dataset/v1"
)

type datasetFile struct {
ctx context.Context
type DatasetFile struct {
Context context.Context

blockAPI blockv1.BlockAPIClient
BlockAPI blockv1.BlockAPIClient

Dataset *datasetv1.Dataset
CurrentPath string
File *datasetv1.File

dataset *datasetv1.Dataset
filePath string
file *datasetv1.File
fileOffset int64
}

func (f *datasetFile) Close() error {
func (f *DatasetFile) Close() error {
return nil
}

@@ -41,20 +42,20 @@ func min(a, b int64) int64 {
return b
}

func (f *datasetFile) Read(p []byte) (n int, err error) {
if f.file == nil {
func (f *DatasetFile) Read(p []byte) (n int, err error) {
if f.File == nil {
return 0, os.ErrInvalid
}

if f.fileOffset >= f.file.Size {
if f.fileOffset >= f.File.Size {
return 0, io.EOF
}

blockSize := int64(f.dataset.BlockSize)
blockSize := int64(f.Dataset.BlockSize)
fileOffset := f.fileOffset

// factor in fileOffset which can reduce the total number of bytes that can be read
numBytesToRead := min(int64(len(p)), f.file.Size-fileOffset)
numBytesToRead := min(int64(len(p)), f.File.Size-fileOffset)

var numBlocksToRead int64
if numBytesToRead%blockSize > 0 {
@@ -63,8 +64,8 @@ func (f *datasetFile) Read(p []byte) (n int, err error) {
numBlocksToRead += numBytesToRead / blockSize

var datasetFileOffset int64
for _, file := range f.dataset.Files {
if file.Name == f.file.Name {
for _, file := range f.Dataset.Files {
if file.Name == f.File.Name {
break
}

@@ -79,8 +80,8 @@

bytesRead := 0
for i := startingBlock; i < startingBlock+numBlocksToRead; i++ {
stream, err := f.blockAPI.Download(f.ctx, &blockv1.DownloadRequest{
Signature: f.dataset.Blocks[i],
stream, err := f.BlockAPI.Download(f.Context, &blockv1.DownloadRequest{
Signature: f.Dataset.Blocks[i],
Offset: blockOffset,
Size: min(blockSize, numBytesToRead-int64(bytesRead)),
})
@@ -93,16 +94,16 @@ func (f *datasetFile) Read(p []byte) (n int, err error) {
LOOP:
for {
resp, err = stream.Recv()
copy(p[bytesRead:], resp.GetPart())
bytesRead += len(resp.GetPart())

switch {
case err == io.EOF:
break LOOP
case err != nil:
// translate err
return bytesRead, translateError(err)
}

copy(p[bytesRead:], resp.Part)
bytesRead += len(resp.Part)
}

// every subsequent block should be read from the start
@@ -116,8 +117,8 @@ func (f *datasetFile) Read(p []byte) (n int, err error) {
return bytesRead, err
}

func (f *datasetFile) Seek(offset int64, whence int) (int64, error) {
if f.file == nil {
func (f *DatasetFile) Seek(offset int64, whence int) (int64, error) {
if f.File == nil {
return 0, os.ErrInvalid
}

@@ -128,29 +129,29 @@ func (f *datasetFile) Seek(offset int64, whence int) (int64, error) {
case io.SeekCurrent:
next = f.fileOffset + offset
case io.SeekEnd:
next = f.file.Size + offset
next = f.File.Size + offset
default:
return 0, errors.New("daemons.datasetFile.Seek: invalid whence")
return 0, errors.New("daemons.DatasetFile.Seek: invalid whence")
}

if next < 0 {
return 0, errors.New("daemons.datasetFile.Seek: negative position")
return 0, errors.New("daemons.DatasetFile.Seek: negative position")
}

f.fileOffset = next
return next, nil
}

func (f *datasetFile) Readdir(count int) ([]fs.FileInfo, error) {
func (f *DatasetFile) Readdir(count int) ([]fs.FileInfo, error) {
seen := make(map[string]bool)
var infos []fs.FileInfo

prefix := strings.TrimSuffix(f.filePath, "/")
prefix := strings.TrimSuffix(f.CurrentPath, "/")
if prefix != "" {
prefix = prefix + "/"
}

for _, file := range f.dataset.GetFiles() {
for _, file := range f.Dataset.GetFiles() {
if strings.HasPrefix(file.Name, prefix) {
remaining := strings.TrimPrefix(file.Name, prefix)
remaining = strings.TrimPrefix(remaining, "/")
@@ -175,13 +176,13 @@ func (f *datasetFile) Readdir(count int) ([]fs.FileInfo, error) {
return infos, nil
}

func (f *datasetFile) Stat() (fs.FileInfo, error) {
name := f.filePath[strings.LastIndex(f.filePath, "/")+1:]
func (f *DatasetFile) Stat() (fs.FileInfo, error) {
name := f.CurrentPath[strings.LastIndex(f.CurrentPath, "/")+1:]

return &fileInfo{
name: name,
file: f.file,
file: f.File,
}, nil
}

var _ http.File = &datasetFile{}
var _ http.File = &DatasetFile{}
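DatasetFile.Read sizes its download loop by ceiling-dividing the number of requested bytes by the dataset's block size before streaming blocks through BlockAPI.Download. A standalone sketch of that arithmetic (function name is illustrative, not code from this commit):

```go
package main

import "fmt"

// blocksToRead mirrors the ceiling division in DatasetFile.Read: the number
// of fixed-size blocks needed to cover numBytesToRead bytes.
func blocksToRead(numBytesToRead, blockSize int64) int64 {
	n := numBytesToRead / blockSize
	if numBytesToRead%blockSize > 0 {
		n++
	}
	return n
}

func main() {
	fmt.Println(blocksToRead(10, 4)) // 3 blocks cover 10 bytes
	fmt.Println(blocksToRead(8, 4))  // 2 blocks cover 8 bytes exactly
}
```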
2 changes: 1 addition & 1 deletion internal/fs/file_info.go
@@ -27,7 +27,7 @@ func (f *fileInfo) Size() int64 {
return 0
}

return int64(f.file.GetSize())
return f.file.GetSize()
}

func (f *fileInfo) Mode() fs.FileMode {
