/
portfolio.go
147 lines (136 loc) · 5.08 KB
/
portfolio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package service
import (
"github.com/jeremyhahn/tradebot/common"
"github.com/jeremyhahn/tradebot/dto"
"github.com/shopspring/decimal"
)
// PortfolioServiceImpl builds user portfolios and streams them over channels.
// It keeps, per user id, the most recently built portfolio and a stop-signal
// channel. The maps are accessed without any locking in the visible code.
type PortfolioServiceImpl struct {
	// ctx supplies the logger used by every method.
	ctx common.Context

	// stopChans maps a user id to its stop-signal channel. Build creates the
	// channel, Stop sends on it and the Stream goroutine receives from it.
	stopChans map[uint]chan bool

	// portfolios maps a user id to the last portfolio Build produced for it.
	// IsStreaming reports membership in this map.
	portfolios map[uint]common.Portfolio

	// marketcapService is only referenced from the commented-out section of
	// Build in the code visible here.
	marketcapService common.MarketCapService

	// userService provides exchange summaries, wallets, tokens and the
	// current user for Build.
	userService UserService

	// ethereumService is only referenced from the commented-out section of
	// Build in the code visible here.
	ethereumService EthereumService

	// PortfolioService is embedded as an interface; NewPortfolioService does
	// not assign it, so it is nil on instances created by that constructor.
	PortfolioService
}
// NewPortfolioService wires the given collaborators into a new
// PortfolioServiceImpl and returns it as a PortfolioService. Both per-user
// maps (stop channels and built portfolios) start out empty.
func NewPortfolioService(ctx common.Context, marketcapService common.MarketCapService,
	userService UserService, ethereumService EthereumService) PortfolioService {

	svc := new(PortfolioServiceImpl)
	svc.ctx = ctx
	svc.stopChans = map[uint]chan bool{}
	svc.portfolios = map[uint]common.Portfolio{}
	svc.marketcapService = marketcapService
	svc.userService = userService
	svc.ethereumService = ethereumService
	return svc
}
// Build assembles a portfolio for user and records it as that user's current
// portfolio.
//
// Net worth is the sum of every exchange total, wallet value and token value
// reported by the user service, truncated to two decimal places. A fresh
// stop-signal channel is registered for the user on every call, replacing any
// previous one.
//
// An error from GetExchangeSummary aborts the build and is returned. Errors
// from GetTokens and GetCurrentUser are logged and the build continues with
// whatever those calls returned, so the resulting portfolio may lack tokens
// or carry an unset user.
func (ps *PortfolioServiceImpl) Build(user common.UserContext, currencyPair *common.CurrencyPair) (common.Portfolio, error) {
	ps.ctx.GetLogger().Debugf("[PortfolioService.Build] Building portfolio for %s", user.GetUsername())
	var netWorth decimal.Decimal

	exchangeList, err := ps.userService.GetExchangeSummary(currencyPair)
	if err != nil {
		ps.ctx.GetLogger().Errorf("[PortfolioService.Build] Error: %s", err.Error())
		return nil, err
	}
	for _, ex := range exchangeList {
		netWorth = netWorth.Add(ex.GetTotal())
	}

	walletList := ps.userService.GetWallets()
	for _, w := range walletList {
		netWorth = netWorth.Add(w.GetValue())
	}

	tokenList, err := ps.userService.GetTokens()
	if err != nil {
		// Previously this error was silently overwritten by the
		// GetCurrentUser call below. Token lookup stays best-effort, but
		// the failure is now visible in the log.
		ps.ctx.GetLogger().Errorf("[PortfolioService.Build] Error getting tokens: %s", err.Error())
	}
	for _, t := range tokenList {
		netWorth = netWorth.Add(t.GetValue())
	}
	/*
		accounts, err := ps.ethereumService.GetAccounts()
		if err != nil {
			ps.ctx.GetLogger().Errorf("[PortfolioService.Build] Error getting local Ethereum accounts: %s", err.Error())
		}
		for _, acct := range accounts {
			etherbase := acct.GetEtherbase()
			wallet, err := ps.ethereumService.GetWallet(etherbase)
			if err != nil {
				ps.ctx.GetLogger().Errorf("[PortfolioService.Build] Error getting Ethereum account balance for address %s: %s",
					etherbase, err.Error())
			}
			priceUSD, err := strconv.ParseFloat(ps.marketcapService.GetMarket("ETH").PriceUSD, 64)
			if err != nil {
				ps.ctx.GetLogger().Errorf("[PortfolioService.Build] Error parsing MarketCap ETH response to float for address %s: %s",
					etherbase, err.Error())
			}
			total := wallet.GetBalance() * priceUSD
			walletList = append(walletList, &dto.UserCryptoWalletDTO{
				Address:  etherbase,
				Balance:  wallet.GetBalance(),
				Currency: "ETH",
				Value:    total})
			netWorth += total
			tokens, err := ps.userService.GetTokens(ps.ctx.GetUser(), etherbase)
			if err != nil {
				ps.ctx.GetLogger().Errorf("[PortfolioService.Build] Error getting current user: %s", err.Error())
			}
			for _, token := range tokens {
				tokenList = append(tokenList, token)
				netWorth += token.GetBalance() * priceUSD
			}
		}*/

	currentUser, err := ps.userService.GetCurrentUser()
	if err != nil {
		// Logged only: the portfolio is still built with whatever user
		// value GetCurrentUser returned alongside the error.
		ps.ctx.GetLogger().Errorf("[PortfolioService.Build] Error getting current user: %s", err.Error())
	}

	portfolio := &dto.PortfolioDTO{
		User:      currentUser,
		NetWorth:  netWorth.Truncate(2),
		Exchanges: exchangeList,
		Wallets:   walletList,
		Tokens:    tokenList}

	ps.portfolios[user.GetId()] = portfolio
	// Buffer of one so a single Stop can be queued without a receiver.
	ps.stopChans[user.GetId()] = make(chan bool, 1)
	return portfolio, nil
}
// Queue builds the user's portfolio once, priced against the fixed BTC/USD
// pair with USD as local currency, and hands it back on a one-slot buffered
// channel that already contains the result. The channel is not closed. A
// failure from Build is logged at debug level and returned with a nil channel.
func (ps *PortfolioServiceImpl) Queue(user common.UserContext) (<-chan common.Portfolio, error) {
	ps.ctx.GetLogger().Debugf("[PortfolioService.Queue] Adding portfolio to queue on behalf of %s", user.GetUsername())

	snapshot, buildErr := ps.Build(user, &common.CurrencyPair{
		Base:          "BTC",
		Quote:         "USD",
		LocalCurrency: "USD",
	})
	if buildErr != nil {
		ps.ctx.GetLogger().Debugf("[PortfolioService.Queue] Error: %s", buildErr.Error())
		return nil, buildErr
	}
	ps.ctx.GetLogger().Debugf("[PortfolioService.Queue] portfolio=%+v\n", snapshot)

	// The single buffer slot lets the send below complete without a reader.
	out := make(chan common.Portfolio, 1)
	out <- snapshot
	return out, nil
}
// Stream builds the user's portfolio and starts a goroutine that repeatedly
// sends that same portfolio on the returned channel (buffer of 10) until a
// stop signal arrives on the user's stop channel.
//
// On a stop signal the goroutine removes the user's stop channel from the map
// and exits. The returned channel is not closed, so receivers must not rely
// on a close to detect the end of the stream. A failure from Build is logged
// and returned with a nil channel.
func (ps *PortfolioServiceImpl) Stream(user common.UserContext, currencyPair *common.CurrencyPair) (<-chan common.Portfolio, error) {
	portfolio, err := ps.Build(user, currencyPair)
	if err != nil {
		ps.ctx.GetLogger().Errorf("[PortfolioService.Stream] Error: %s", err.Error())
		return nil, err
	}
	// Log the caller-supplied user, as the sibling methods do. The user
	// inside the portfolio may be unset when Build could only log a failed
	// current-user lookup.
	ps.ctx.GetLogger().Debugf("[PortfolioService.Stream] Starting stream for %s", user.GetUsername())

	portChan := make(chan common.Portfolio, 10)
	go func() {
		for {
			// Wait on the stop signal and the send together, so a full
			// buffer cannot hide a pending stop. The stop channel is looked
			// up on every pass because Build replaces it on each call.
			select {
			case <-ps.stopChans[user.GetId()]:
				ps.ctx.GetLogger().Debug("[PortfolioService.Stream] Stopping stream")
				delete(ps.stopChans, user.GetId())
				// Without this return the loop would carry on with a nil
				// stop channel and broadcast forever.
				return
			case portChan <- portfolio:
				ps.ctx.GetLogger().Debugf("[PortfolioService.Stream] Broadcasting portfolio: %+v\n", portfolio)
			}
		}
	}()
	return portChan, nil
}
// Stop asks the user's stream to stop by queuing a signal on the user's stop
// channel. It does nothing when IsStreaming is false for the user or when no
// stop channel is registered. It never blocks: if a stop signal is already
// pending in the channel's buffer, the extra one is dropped.
func (ps *PortfolioServiceImpl) Stop(user common.UserContext) {
	ps.ctx.GetLogger().Debugf("[PortfolioService.Stop] Stopping stream for %s\n", user.GetUsername())
	if !ps.IsStreaming(user) {
		return
	}
	// The Stream goroutine deletes the stop channel when it exits, while the
	// portfolio entry checked by IsStreaming stays. A missing key would yield
	// a nil channel, and sending on that blocks forever.
	stop, ok := ps.stopChans[user.GetId()]
	if !ok {
		return
	}
	select {
	case stop <- true:
	default:
		// A signal is already queued; one is enough.
	}
}
// IsStreaming reports whether a portfolio is currently recorded for the
// user's id, i.e. whether Build has stored one in the portfolios map.
func (ps *PortfolioServiceImpl) IsStreaming(user common.UserContext) bool {
	id := user.GetId()
	_, recorded := ps.portfolios[id]
	return recorded
}