Skip to content

Commit

Permalink
Merge pull request #48 from src-d/progress
Browse files Browse the repository at this point in the history
Status table to add possibility to use those data for progress chart
  • Loading branch information
se7entyse7en committed Jun 21, 2019
2 parents 11dedef + 725187e commit 4c73faf
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 52 deletions.
39 changes: 39 additions & 0 deletions cmd/ghsync/subcmd/common.go
Expand Up @@ -22,6 +22,7 @@ import (
)

const maxVersion uint = 1560510971
const statusTableName = "status"

type PostgresOpt struct {
DB string `long:"postgres-db" env:"GHSYNC_POSTGRES_DB" description:"PostgreSQL DB" default:"ghsync"`
Expand Down Expand Up @@ -72,9 +73,47 @@ func (o PostgresOpt) initDB() (db *sql.DB, err error) {

log.With(log.Fields{"db-version": dbVersion}).Debugf("the DB version is up to date")
log.Infof("connection with the DB established")
if err = o.createStatusTable(); err != nil {
return db, err
}

return db, nil
}

func (o PostgresOpt) createStatusTable() error {
log.Debugf(fmt.Sprintf("creating status table '%s'", statusTableName))

db, err := sql.Open("postgres", o.URL())
if err != nil {
return err
}

defer func() {
if err != nil {
db.Close()
}
}()

stm := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s(
id serial PRIMARY KEY,
org VARCHAR (50) NOT NULL,
entity VARCHAR (20) NOT NULL,
done INTEGER NOT NULL DEFAULT 0,
failed INTEGER NOT NULL DEFAULT 0,
total INTEGER DEFAULT NULL,
UNIQUE (org, entity)
);`, statusTableName)
log.Debugf("running statement: %s", stm)
_, err = db.Exec(stm)
if err != nil {
return fmt.Errorf("an error occured while ensureing the status table: %v", err)
}

log.Infof("status table '%s' created", statusTableName)

return nil
}

func newMigrate(url string) (*migrate.Migrate, error) {
// wrap assets into Resource
s := bindata.Resource(migrations.AssetNames(),
Expand Down
34 changes: 32 additions & 2 deletions cmd/ghsync/subcmd/shallow.go
@@ -1,11 +1,14 @@
package subcmd

import (
"database/sql"
"fmt"
"strings"

"github.com/src-d/ghsync/shallow"

"gopkg.in/src-d/go-cli.v0"
"gopkg.in/src-d/go-log.v1"
)

type ShallowCommand struct {
Expand All @@ -29,8 +32,13 @@ func (c *ShallowCommand) Execute(args []string) error {
return err
}

orgSyncer := shallow.NewOrganizationSyncer(db, client)
for _, o := range strings.Split(c.Orgs, ",") {
orgs := strings.Split(c.Orgs, ",")
if err = c.initStatus(db, statusTableName, orgs); err != nil {
return err
}

orgSyncer := shallow.NewOrganizationSyncer(db, client, statusTableName)
for _, o := range orgs {
err = orgSyncer.Sync(o)
if err != nil {
return err
Expand All @@ -39,3 +47,25 @@ func (c *ShallowCommand) Execute(args []string) error {

return nil
}

func (c *ShallowCommand) initStatus(db *sql.DB, tableName string, orgs []string) error {
log.Debugf("initializing status table for orgs: %v", orgs)
var b strings.Builder

for _, o := range orgs[:len(orgs)-1] {
b.WriteString(fmt.Sprintf("('%s', 'repository'),", o))
b.WriteString(fmt.Sprintf("('%s', 'user'),", o))
}
b.WriteString(fmt.Sprintf("('%s', 'repository'),", orgs[len(orgs)-1]))
b.WriteString(fmt.Sprintf("('%s', 'user')", orgs[len(orgs)-1]))

stm := fmt.Sprintf("INSERT INTO %s (org, entity) VALUES %s ON CONFLICT (org, entity) DO UPDATE SET failed=0, done=0, total=NULL;", tableName, b.String())
log.Debugf("running statement: %s", stm)
_, err := db.Exec(stm)
if err != nil {
return fmt.Errorf(fmt.Sprintf(
"an error occured while initializing %s table: %v", tableName, err))
}

return nil
}
26 changes: 17 additions & 9 deletions shallow/organization.go
Expand Up @@ -13,16 +13,18 @@ import (
)

type OrganizationSyncer struct {
db *sql.DB
store *models.OrganizationStore
client *github.Client
db *sql.DB
store *models.OrganizationStore
client *github.Client
statusTableName string
}

func NewOrganizationSyncer(db *sql.DB, c *github.Client) *OrganizationSyncer {
func NewOrganizationSyncer(db *sql.DB, c *github.Client, statusTableName string) *OrganizationSyncer {
return &OrganizationSyncer{
db: db,
store: models.NewOrganizationStore(db),
client: c,
db: db,
store: models.NewOrganizationStore(db),
client: c,
statusTableName: statusTableName,
}
}

Expand All @@ -40,6 +42,12 @@ func (s *OrganizationSyncer) Sync(login string) error {

if err == nil {
logger.Infof("resource already exists, skipping")
stm := fmt.Sprintf("UPDATE %s SET total=0 WHERE org='%s'", s.statusTableName, login)
_, err = s.db.Exec(stm)
if err != nil {
return fmt.Errorf("unable to update status for org %s: %v", login, err)
}

return nil
}

Expand All @@ -48,13 +56,13 @@ func (s *OrganizationSyncer) Sync(login string) error {
return err
}

repoSyncer := NewRepositorySyncer(s.db, s.client)
repoSyncer := NewRepositorySyncer(s.db, s.client, s.statusTableName)
err = repoSyncer.Sync(login, logger)
if err != nil {
return err
}

userSyncer := NewUserSyncer(s.db, s.client)
userSyncer := NewUserSyncer(s.db, s.client, s.statusTableName)
err = userSyncer.Sync(login, logger)
if err != nil {
return err
Expand Down
46 changes: 39 additions & 7 deletions shallow/repository.go
Expand Up @@ -13,16 +13,18 @@ import (
)

type RepositorySyncer struct {
db *sql.DB
store *models.RepositoryStore
client *github.Client
db *sql.DB
store *models.RepositoryStore
client *github.Client
statusTableName string
}

func NewRepositorySyncer(db *sql.DB, c *github.Client) *RepositorySyncer {
func NewRepositorySyncer(db *sql.DB, c *github.Client, statusTableName string) *RepositorySyncer {
return &RepositorySyncer{
db: db,
store: models.NewRepositoryStore(db),
client: c,
db: db,
store: models.NewRepositoryStore(db),
client: c,
statusTableName: statusTableName,
}
}

Expand Down Expand Up @@ -52,10 +54,30 @@ func (s *RepositorySyncer) Sync(owner string, logger log.Logger) error {
opts.Page = r.NextPage
}

stm := fmt.Sprintf("UPDATE %s SET total=%d WHERE org='%s' AND entity='repository'",
s.statusTableName, len(repos), owner)
log.Debugf("running statement: %s", stm)
if _, err := s.db.Exec(stm); err != nil {
return fmt.Errorf("an error occured while updating %s table: %v",
s.statusTableName, err)
}

// Process each one of them
for _, repository := range repos {
err := s.doRepo(repository, logger)
if err != nil {
stm := fmt.Sprintf("UPDATE %s SET failed=failed + 1 WHERE org='%s' AND entity='repository'",
s.statusTableName, owner)
if err = s.updateStatus(stm); err != nil {
return err
}

return err
}

stm := fmt.Sprintf("UPDATE %s SET done=done + 1 WHERE org='%s' AND entity='repository'",
s.statusTableName, owner)
if err = s.updateStatus(stm); err != nil {
return err
}
}
Expand All @@ -65,6 +87,16 @@ func (s *RepositorySyncer) Sync(owner string, logger log.Logger) error {
return nil
}

func (s *RepositorySyncer) updateStatus(stm string) error {
log.Debugf("running statement: %s", stm)
if _, err := s.db.Exec(stm); err != nil {
return fmt.Errorf("an error occured while updating %s table: %v",
s.statusTableName, err)
}

return nil
}

func (s *RepositorySyncer) doRepo(repository *github.Repository, parentLogger log.Logger) error {
logger := parentLogger.With(log.Fields{"repository": repository.GetName()})

Expand Down
116 changes: 82 additions & 34 deletions shallow/user.go
Expand Up @@ -13,20 +13,23 @@ import (
)

type UserSyncer struct {
db *sql.DB
client *github.Client
db *sql.DB
store *models.UserStore
client *github.Client
statusTableName string
}

func NewUserSyncer(db *sql.DB, c *github.Client) *UserSyncer {
func NewUserSyncer(db *sql.DB, c *github.Client, statusTableName string) *UserSyncer {
return &UserSyncer{
db: db,
client: c,
db: db,
store: models.NewUserStore(db),
client: c,
statusTableName: statusTableName,
}
}

func (s *UserSyncer) Sync(org string, logger log.Logger) error {
store := models.NewUserStore(s.db)
return store.Transaction(func(store *models.UserStore) error {
return s.store.Transaction(func(store *models.UserStore) error {
return s.doUsers(store, org, logger)
})
}
Expand All @@ -37,51 +40,96 @@ func (s *UserSyncer) doUsers(store *models.UserStore, org string, logger log.Log

logger.Infof("starting to retrieve users")

allUsers := make([]*github.User, 0)

// Get the list of all users
for {
users, r, err := s.client.Organizations.ListMembers(context.TODO(), org, opts)
if err != nil {
return err
}

for _, user := range users {
logger := logger.With(log.Fields{"user": user.GetLogin()})

_, err := store.FindOne(models.NewUserQuery().
Where(kallax.And(
kallax.Eq(models.Schema.User.ID, user.GetID()),
)),
)
if err != nil && err != kallax.ErrNotFound {
logger.With(log.Fields{"user": user.GetLogin()}).Errorf(err, "failed to read the resource from the DB")
return fmt.Errorf("failed to read the resource from the DB: %v", err)
}
for _, u := range users {
allUsers = append(allUsers, u)
}

if err == nil {
logger.With(log.Fields{"user": user.GetLogin()}).Infof("resource already exists, skipping")
continue
}
if r.NextPage == 0 {
break
}

record := models.NewUser()
record.User = *user
opts.Page = r.NextPage
}

err = store.Insert(record)
if err != nil {
logger.Errorf(err, "failed to write the resource into the DB")
return fmt.Errorf("failed to write the resource into the DB: %v", err)
stm := fmt.Sprintf("UPDATE %s SET total=%d WHERE org='%s' AND entity='user'",
s.statusTableName, len(allUsers), org)
log.Debugf("running statement: %s", stm)
if _, err := s.db.Exec(stm); err != nil {
return fmt.Errorf("an error occured while updating %s table: %v",
s.statusTableName, err)
}

for _, user := range allUsers {
err := s.doUser(user, logger)
if err != nil {
stm := fmt.Sprintf("UPDATE %s SET failed=failed + 1 WHERE org='%s' AND entity='user'",
s.statusTableName, org)
if err = s.updateStatus(stm); err != nil {
return err
}

logger.Debugf("resource written in the DB")
return err
}

if r.NextPage == 0 {
break
stm := fmt.Sprintf("UPDATE %s SET done=done + 1 WHERE org='%s' AND entity='user'",
s.statusTableName, org)
if err = s.updateStatus(stm); err != nil {
return err
}

opts.Page = r.NextPage
}

logger.Infof("finished to retrieve users")

return nil
}

func (s *UserSyncer) updateStatus(stm string) error {
log.Debugf("running statement: %s", stm)
if _, err := s.db.Exec(stm); err != nil {
return fmt.Errorf("an error occured while updating %s table: %v",
s.statusTableName, err)
}

return nil
}

func (s *UserSyncer) doUser(user *github.User, parentLogger log.Logger) error {
logger := parentLogger.With(log.Fields{"user": user.GetLogin()})

_, err := s.store.FindOne(models.NewUserQuery().
Where(kallax.And(
kallax.Eq(models.Schema.User.ID, user.GetID()),
)),
)
if err != nil && err != kallax.ErrNotFound {
logger.With(log.Fields{"user": user.GetLogin()}).Errorf(err, "failed to read the resource from the DB")
return fmt.Errorf("failed to read the resource from the DB: %v", err)
}

if err == nil {
logger.With(log.Fields{"user": user.GetLogin()}).Infof("resource already exists, skipping")
return nil
}

record := models.NewUser()
record.User = *user

err = s.store.Insert(record)
if err != nil {
logger.Errorf(err, "failed to write the resource into the DB")
return fmt.Errorf("failed to write the resource into the DB: %v", err)
}

logger.Debugf("resource written in the DB")

return nil
}

0 comments on commit 4c73faf

Please sign in to comment.