Skip to content

Commit

Permalink
Test: Symmetric marshal API
Browse files Browse the repository at this point in the history
  • Loading branch information
mcastorina committed Jun 8, 2023
1 parent 28a7b65 commit f51c64a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 42 deletions.
46 changes: 18 additions & 28 deletions pkg/sources/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package git
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"net/url"
Expand Down Expand Up @@ -877,28 +878,33 @@ func handleBinary(ctx context.Context, repo *git.Repository, chunksChan chan *so
}

func (s *Source) UnitChunks(ctx context.Context, unit sources.SourceUnit, chunksChan chan *sources.Chunk) error {
if repo, ok := unit.(RepositoryUnit); ok {
return s.chunkRepo(ctx, repo, chunksChan)
var m map[string]string
if err := json.Unmarshal(unit.Data, &m); err != nil {
return err
}
if dir, ok := unit.(DirectoryUnit); ok {
return s.chunkDir(ctx, dir, chunksChan)
switch m["type"] {
case "directory":
return s.chunkDir(ctx, unit.ID, chunksChan)
case "repository":
return s.chunkRepo(ctx, unit.ID, chunksChan)
default:
return fmt.Errorf("unrecognized unit")
}
return fmt.Errorf("unrecognized unit")
}

func (s *Source) chunkRepo(ctx context.Context, unit RepositoryUnit, chunksChan chan *sources.Chunk) error {
func (s *Source) chunkRepo(ctx context.Context, repoURI string, chunksChan chan *sources.Chunk) error {
var path string
var repo *git.Repository
var err error
switch cred := s.conn.GetCredential().(type) {
case *sourcespb.Git_BasicAuth:
user := cred.BasicAuth.Username
token := cred.BasicAuth.Password
path, repo, err = CloneRepoUsingToken(ctx, token, unit.Repository, user)
path, repo, err = CloneRepoUsingToken(ctx, token, repoURI, user)
case *sourcespb.Git_Unauthenticated:
path, repo, err = CloneRepoUsingUnauthenticated(ctx, unit.Repository)
path, repo, err = CloneRepoUsingUnauthenticated(ctx, repoURI)
case *sourcespb.Git_SshAuth:
path, repo, err = CloneRepoUsingSSH(ctx, unit.Repository)
path, repo, err = CloneRepoUsingSSH(ctx, repoURI)
default:
// Note: should be unreachable at this point.
return errors.New("invalid connection type for git source")
Expand All @@ -916,9 +922,7 @@ func (s *Source) chunkRepo(ctx context.Context, unit RepositoryUnit, chunksChan
return nil
}

func (s *Source) chunkDir(ctx context.Context, unit DirectoryUnit, chunksChan chan *sources.Chunk) error {
gitDir := unit.Directory

func (s *Source) chunkDir(ctx context.Context, gitDir string, chunksChan chan *sources.Chunk) error {
repo, err := RepoFromPath(gitDir)
if err != nil {
return fmt.Errorf("error scanning git directory: %w", err)
Expand All @@ -939,31 +943,17 @@ func (s *Source) chunkDir(ctx context.Context, unit DirectoryUnit, chunksChan ch
func (s *Source) Enumerate(ctx context.Context, unitsChan chan sources.SourceUnit) error {
for _, item := range s.conn.GetDirectories() {
select {
case unitsChan <- DirectoryUnit{item}:
case unitsChan <- sources.SourceUnit{ID: item, Data: []byte(`{"type":"directory"}`)}:
case <-ctx.Done():
return ctx.Err()
}
}
for _, item := range s.conn.GetRepositories() {
select {
case unitsChan <- RepositoryUnit{item}:
case unitsChan <- sources.SourceUnit{ID: item, Data: []byte(`{"type":"repository"}`)}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

func (s *Source) UnmarshalSourceUnit(data []byte) (sources.SourceUnit, error) {
// Try RepositoryUnit before DirectoryUnit.
if repo, err := sources.UnmarshalUnit[RepositoryUnit](data); err == nil && repo.Repository != "" {
return repo, nil
}
return sources.UnmarshalUnit[DirectoryUnit](data)
}

type DirectoryUnit struct{ Directory string }
type RepositoryUnit struct{ Repository string }

func (u DirectoryUnit) SourceUnitID() string { return u.Directory }
func (u RepositoryUnit) SourceUnitID() string { return u.Repository }
12 changes: 0 additions & 12 deletions pkg/sources/source_unit.go

This file was deleted.

5 changes: 3 additions & 2 deletions pkg/sources/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type Source interface {

// SourceUnit is an object that represents a Source's unit of work. This is
// used for source configuration, progress reporting, and job distribution.
type SourceUnit interface {
type SourceUnit struct {
// SourceUnitID uniquely identifies a source unit.
SourceUnitID() string
ID string
Data []byte
}

// GCSConfig defines the optional configuration for a GCS source.
Expand Down

0 comments on commit f51c64a

Please sign in to comment.