Skip to content
This repository has been archived by the owner on Oct 31, 2021. It is now read-only.

Commit

Permalink
Improvements + progress on jobs.
Browse files Browse the repository at this point in the history
Building out pull account balances job.
  • Loading branch information
elliotcourant committed Feb 24, 2021
1 parent c4f1343 commit 029bf7a
Show file tree
Hide file tree
Showing 21 changed files with 836 additions and 56 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/nyaruka/phonenumbers v1.0.66
github.com/pkg/errors v0.9.2-0.20201214064552-5dd12d0cfe7f
github.com/plaid/plaid-go v0.0.0-20210216195344-700b8cfc627d
github.com/prometheus/client_golang v1.9.0
github.com/robfig/cron v1.2.0 // indirect
github.com/sirupsen/logrus v1.8.0
github.com/spf13/viper v1.7.1
Expand Down
180 changes: 180 additions & 0 deletions go.sum

Large diffs are not rendered by default.

16 changes: 2 additions & 14 deletions pkg/controller/login.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package controller

import (
"crypto/sha256"
"fmt"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/hash"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -40,7 +39,7 @@ func (c *Controller) loginEndpoint(ctx *context.Context) {
return
}

hashedPassword := c.hashPassword(loginRequest.Email, loginRequest.Password)
hashedPassword := hash.HashPassword(loginRequest.Email, loginRequest.Password)
var login models.Login
if err := c.db.RunInTransaction(ctx.Request().Context(), func(txn *pg.Tx) error {
return txn.Model(&login).
Expand Down Expand Up @@ -102,17 +101,6 @@ func (c *Controller) validateLogin(email, password string) error {
return nil
}

// hashPassword will return a one way hash of the provided user's credentials.
// The email is always converted to lowercase for this hash but the password is
// not modified.
func (c *Controller) hashPassword(email, password string) string {
email = strings.ToLower(email)
hash := sha256.New()
hash.Write([]byte(email))
hash.Write([]byte(password))
return fmt.Sprintf("%X", hash.Sum(nil))
}

func (c *Controller) generateToken(loginId, userId, accountId uint64) (string, error) {
now := time.Now()
claims := &HarderClaims{
Expand Down
8 changes: 1 addition & 7 deletions pkg/controller/main_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller_test

import (
"fmt"
"github.com/brianvoe/gofakeit/v6"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/application"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/config"
Expand All @@ -11,7 +10,6 @@ import (
"github.com/plaid/plaid-go/plaid"
"github.com/stretchr/testify/require"
"net/http"
"strings"
"testing"
)

Expand Down Expand Up @@ -66,7 +64,7 @@ func GivenIHaveToken(t *testing.T, e *httptest.Expect) string {
FirstName string `json:"firstName"`
LastName string `json:"lastName"`
}
registerRequest.Email = GivenIHaveAnEmail(t)
registerRequest.Email = testutils.GivenIHaveAnEmail(t)
registerRequest.Password = gofakeit.Password(true, true, true, true, false, 32)
registerRequest.FirstName = gofakeit.FirstName()
registerRequest.LastName = gofakeit.LastName()
Expand All @@ -81,7 +79,3 @@ func GivenIHaveToken(t *testing.T, e *httptest.Expect) string {

return token
}

func GivenIHaveAnEmail(t *testing.T) string {
return fmt.Sprintf("%s@testing.harderthanitneedstobe.com", strings.ReplaceAll(gofakeit.UUID(), "-", ""))
}
3 changes: 2 additions & 1 deletion pkg/controller/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"github.com/dgrijalva/jwt-go"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/hash"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -67,7 +68,7 @@ func (c *Controller) registerEndpoint(ctx *context.Context) {
}

// Hash the user's password so that we can store it securely.
hashedPassword := c.hashPassword(
hashedPassword := hash.HashPassword(
registerRequest.Email, registerRequest.Password,
)

Expand Down
18 changes: 18 additions & 0 deletions pkg/hash/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package hash

import (
"crypto/sha256"
"fmt"
"strings"
)

// HashPassword will return a one way hash of the provided user's credentials.
// The email is always converted to lowercase for this hash but the password is
// not modified.
func HashPassword(email, password string) string {
email = strings.ToLower(email)
hash := sha256.New()
hash.Write([]byte(email))
hash.Write([]byte(password))
return fmt.Sprintf("%X", hash.Sum(nil))
}
39 changes: 39 additions & 0 deletions pkg/internal/mock_plaid/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,52 @@ package mock_plaid

import (
"encoding/json"
"github.com/brianvoe/gofakeit/v6"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/internal/mock_http_helper"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/testutils"
"github.com/plaid/plaid-go/plaid"
"github.com/stretchr/testify/require"
"net/http"
"testing"
)

func MockGetAccountsExtended(t *testing.T, plaidData *testutils.MockPlaidData) {
mock_http_helper.NewHttpMockJsonResponder(t, "POST", Path(t, "/accounts/get"), func(t *testing.T, request *http.Request) (interface{}, int) {
var getAccountsRequest struct {
ClientId string `json:"client_id"`
Secret string `json:"secret"`
AccessToken string `json:"access_token"`
Options struct {
AccountIds []string `json:"account_ids"`
} `json:"options"`
}
require.NoError(t, json.NewDecoder(request.Body).Decode(&getAccountsRequest), "must decode request")

accounts, ok := plaidData.BankAccounts[getAccountsRequest.AccessToken]
if !ok {
panic("invalid access token mocking not implemented")
}

response := plaid.GetAccountsResponse{
APIResponse: plaid.APIResponse{
RequestID: gofakeit.UUID(),
},
Accounts: make([]plaid.Account, 0),
Item: plaid.Item{}, // Not yet populating this.
}
for _, accountId := range getAccountsRequest.Options.AccountIds {
account, ok := accounts[accountId]
if !ok {
panic("bad account id handling not yet implemented")
}

response.Accounts = append(response.Accounts, account)
}

return response, http.StatusOK
})
}

func MockGetAccounts(t *testing.T, accounts []plaid.Account) {
mock_http_helper.NewHttpMockJsonResponder(t, "POST", Path(t, "/accounts/get"), func(t *testing.T, request *http.Request) (interface{}, int) {
var getAccountsRequest struct {
Expand Down
10 changes: 10 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/go-pg/pg/v10"
"github.com/gocraft/work"
"github.com/gomodule/redigo/redis"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/metrics"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/repository"
"github.com/pkg/errors"
"github.com/plaid/plaid-go/plaid"
Expand All @@ -23,6 +24,7 @@ type jobManagerBase struct {
queue *work.Enqueuer
db *pg.DB
plaidClient *plaid.Client
stats *metrics.Stats
}

func NewJobManager(log *logrus.Entry, pool *redis.Pool, db *pg.DB, plaidClient *plaid.Client) JobManager {
Expand Down Expand Up @@ -55,6 +57,14 @@ func NewJobManager(log *logrus.Entry, pool *redis.Pool, db *pg.DB, plaidClient *
return manager
}

func (j *jobManagerBase) enqueueUniqueJob(name string, arguments map[string]interface{}) (*work.Job, error) {
if j.stats != nil {
j.stats.JobEnqueued(name)
}

return j.queue.EnqueueUnique(name, arguments)
}

func (j *jobManagerBase) TriggerPullInitialTransactions(accountId, userId, linkId uint64) (jobId string, err error) {
job, err := j.queue.EnqueueUnique(PullInitialTransactions, map[string]interface{}{
"accountId": accountId,
Expand Down
150 changes: 126 additions & 24 deletions pkg/jobs/pull_account_balances.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package jobs

import (
"github.com/sirupsen/logrus"

"github.com/gocraft/work"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/models"
"github.com/harderthanitneedstobe/rest-api/v0/pkg/repository"
"github.com/pkg/errors"
"github.com/plaid/plaid-go/plaid"
)

const (
Expand All @@ -12,24 +16,22 @@ const (
)

type PullAccountBalanceWorkItem struct {
AccountID uint64 `pg:"account_id"`
BankAccountIDs []uint64 `pg:"bank_account_ids,type:bigint[]"`
AccountID uint64 `pg:"account_id"`
LinkIDs []uint64 `pg:"link_ids,type:bigint[]"`
}

func (j *jobManagerBase) getPlaidBankAccountsByAccount() ([]PullAccountBalanceWorkItem, error) {
func (j *jobManagerBase) getPlaidLinksByAccount() ([]PullAccountBalanceWorkItem, error) {
// We need an accountId, and all of the bank accounts for that account that can be updated.
var accounts []PullAccountBalanceWorkItem

// Query the database for all accounts with bank accounts that have a link type of plaid.
_, err := j.db.Query(&accounts, `
SELECT
"accounts"."account_id",
array_agg("bank_accounts"."bank_account_id") "bank_account_ids"
FROM "accounts"
INNER JOIN "bank_accounts" ON "bank_accounts"."account_id" = "accounts"."account_id"
INNER JOIN "links" ON "links"."link_id" = "bank_accounts"."link_id" AND "links"."account_id" = "bank_accounts"."account_id"
WHERE "links"."link_type" = ?
GROUP BY "accounts"."account_id"
SELECT
"links"."account_id"
array_agg("links"."link_id") "link_ids"
FROM "links"
WHERE "links"."link_type" = ? AND "links"."plaid_link_id" IS NOT NULL
GROUP BY "links"."account_id"
`, models.PlaidLinkType)
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve accounts to update balances")
Expand All @@ -41,7 +43,7 @@ func (j *jobManagerBase) getPlaidBankAccountsByAccount() ([]PullAccountBalanceWo
func (j *jobManagerBase) enqueuePullAccountBalances(job *work.Job) error {
log := j.getLogForJob(job)

accounts, err := j.getPlaidBankAccountsByAccount()
accounts, err := j.getPlaidLinksByAccount()
if err != nil {
log.WithError(err).Errorf("failed to retrieve bank accounts that need to by synced")
return err
Expand All @@ -50,18 +52,23 @@ func (j *jobManagerBase) enqueuePullAccountBalances(job *work.Job) error {
log.Infof("enqueueing %d account(s) for sync", len(accounts))

for _, account := range accounts {
accountLog := log.WithField("accountId", account.AccountID)
accountLog.Trace("enqueueing for account balance update")
_, err := j.queue.EnqueueUnique(PullAccountBalances, map[string]interface{}{
"accountId": account.AccountID,
"bankAccountIds": account.BankAccountIDs,
})
if err != nil {
err = errors.Wrap(err, "failed to enqueue account")
accountLog.WithError(err).Error("could not enqueue account, data will not be synced")
continue
for _, linkId := range account.LinkIDs {
accountLog := log.WithFields(logrus.Fields{
"accountId": account.AccountID,
"linkId": linkId,
})
accountLog.Trace("enqueueing for account balance update")

_, err = j.enqueueUniqueJob(PullAccountBalances, map[string]interface{}{
"accountId": account.AccountID,
"linkId": linkId,
})
if err != nil {
accountLog.WithError(err).Error("could not enqueue account, data will not be synced")
continue
}
accountLog.Trace("successfully enqueued account for account balance update")
}
accountLog.Trace("successfully enqueued account for account balance update")
}

return nil
Expand All @@ -70,5 +77,100 @@ func (j *jobManagerBase) enqueuePullAccountBalances(job *work.Job) error {
func (j *jobManagerBase) pullAccountBalances(job *work.Job) error {
log := j.getLogForJob(job)
log.Infof("pulling account balances")
return nil

accountId, err := j.getAccountId(job)
if err != nil {
log.WithError(err).Error("could not run job, no account Id")
return err
}

linkId := uint64(job.ArgInt64("linkId"))

return j.getRepositoryForJob(job, func(repo repository.Repository) error {
link, err := repo.GetLink(linkId)
if err != nil {
log.WithError(err).Error("failed to retrieve link details to pull balances")
return err
}

if link.PlaidLink == nil {
err = errors.Errorf("cannot pull account balanaces for link without plaid info")
log.WithError(err).Errorf("failed to pull balances")
return err
}

bankAccounts, err := repo.GetBankAccountsByLinkId(linkId)
if err != nil {
log.WithError(err).Error("failed to retrieve bank account details to pull balances")
return err
}

groupedByPlaidAccessToken := map[string][]models.BankAccount{}

for _, bankAccount := range bankAccounts {
if bankAccount.Link == nil || bankAccount.Link.PlaidLink == nil {
// TODO (elliotcourant) Log something here maybe? This shouldn't happen so we might want to try to keep track
// of it if it does?
continue
}

accounts, ok := groupedByPlaidAccessToken[bankAccount.Link.PlaidLink.AccessToken]
if !ok {
// If the access token is not present, store it and create a new array.
groupedByPlaidAccessToken[bankAccount.Link.PlaidLink.AccessToken] = []models.BankAccount{
bankAccount,
}

// Keep moving along.
continue
}

// If the access token is already present, simply append this account.
accounts = append(accounts, bankAccount)
}

for accessToken, banks := range groupedByPlaidAccessToken {
// Gather the plaid account Ids so we can precisely query plaid.
plaidIdsToBankIds := map[string]uint64{}
itemBankAccountIds := make([]string, len(banks))
for i, bankAccount := range banks {
itemBankAccountIds[i] = bankAccount.PlaidAccountId
plaidIdsToBankIds[bankAccount.PlaidAccountId] = bankAccount.BankAccountId
}

log.Tracef("requesting information for %d bank account(s)", len(itemBankAccountIds))

result, err := j.plaidClient.GetAccountsWithOptions(
accessToken,
plaid.GetAccountsOptions{
AccountIDs: itemBankAccountIds,
},
)
if err != nil {
log.WithError(err).Error("failed to retrieve bank accounts from plaid")
continue // We don't want it to prevent others from being processed.
}

// TODO (elliotcourant) If we lift this array out of this loop then we could update all the bank accounts for all
// of an account's links in a single query which would be more efficient.
updatedBankAccounts := make([]models.BankAccount, len(result.Accounts))
for i, item := range result.Accounts {
// TODO (elliotcourant) Maybe add something here to compare balances to the existing account record? If there
// are no changes there is no need to update the account at all.
updatedBankAccounts[i] = models.BankAccount{
BankAccountId: plaidIdsToBankIds[item.AccountID],
AccountId: accountId,
AvailableBalance: int64(item.Balances.Available * 100),
CurrentBalance: int64(item.Balances.Current * 100),
}
}

if err := repo.UpdateBankAccounts(updatedBankAccounts); err != nil {
log.WithError(err).Error("failed to update bank account balances")
return err
}
}

return nil
})
}
Loading

0 comments on commit 029bf7a

Please sign in to comment.