Skip to content

Commit

Permalink
Concurrency
Browse files Browse the repository at this point in the history
Fix asset imports merge
  • Loading branch information
thinktwice13 committed Jun 12, 2022
1 parent fac57dc commit 4b9d4ba
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 34 deletions.
86 changes: 61 additions & 25 deletions import_results.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"sync"
"time"
)

Expand Down Expand Up @@ -37,6 +38,8 @@ type ImportResults struct {
// Mapped unique currencies and years found in any imported events
currencies map[string]bool
years map[int]bool

l *sync.Mutex
}

// Domicile extracts the instrument country code used for the ForeignIncome tax report
Expand Down Expand Up @@ -73,17 +76,22 @@ func NewImportResults() *ImportResults {
years: map[int]bool{
time.Now().Year(): true,
},
l: new(sync.Mutex),
}
}

// AddInstrumentInfo records the category cat on the asset identified by symbols.
//
// The asset is resolved through assets.bySymbols, which also registers the
// symbols when they have not been seen before. The results mutex is held for
// the whole call.
//
// The category is only written once: the call leaves the asset untouched when
// cat is empty or the asset already has a category. Calling it without any
// symbols is a no-op.
func (r *ImportResults) AddInstrumentInfo(symbols []string, cat string) {
	r.l.Lock()
	defer r.l.Unlock()

	a := r.assets.bySymbols(symbols...)
	// bySymbols returns nil when no symbols are given; without this guard the
	// Category access below would panic with a nil pointer dereference.
	if a == nil {
		return
	}
	if a.Category != "" || cat == "" {
		return
	}
	a.Category = cat
}
func (r *ImportResults) AddTrade(sym, ccy string, tm time.Time, qty, price, fee float64) {
r.l.Lock()
defer r.l.Unlock()
a := r.assets.bySymbols(sym)
t := Trade{}
t.Currency = ccy
Expand All @@ -98,6 +106,8 @@ func (r *ImportResults) AddTrade(sym, ccy string, tm time.Time, qty, price, fee

}
func (r *ImportResults) AddDividend(sym, ccy string, yr int, amt float64, isTax bool) {
r.l.Lock()
defer r.l.Unlock()
a := r.assets.bySymbols(sym)
d := Transaction{}
d.Currency = ccy
Expand All @@ -113,6 +123,8 @@ func (r *ImportResults) AddDividend(sym, ccy string, yr int, amt float64, isTax
r.years[yr] = true
}
func (r *ImportResults) AddFee(ccy string, amt float64, yr int) {
r.l.Lock()
defer r.l.Unlock()
f := Transaction{}
f.Currency = ccy
f.Year = yr
Expand All @@ -121,7 +133,6 @@ func (r *ImportResults) AddFee(ccy string, amt float64, yr int) {

r.currencies[ccy] = true
r.years[yr] = true

}

// bySymbols func looks up any assets already imported by incoming symbols
Expand All @@ -130,55 +141,80 @@ func (r *ImportResults) AddFee(ccy string, amt float64, yr int) {
//
// If incoming symbols are matched with more than one distinct asset, merges conflict and uses the first found match
func (as assets) bySymbols(ss ...string) *AssetImport {
if len(ss) == 0 {
if len(ss) < 1 {
return nil
}

var base *AssetImport
newSymbols := make([]string, 0, len(ss))
if len(ss) == 1 {
match, ok := as[ss[0]]
if !ok {
a := &AssetImport{Instrument: Instrument{Symbols: ss}}
as[ss[0]] = a
return a
}

return match
}

// Target is the merged asset combining all of the info of incoming symbols and previously included symbols for an instrument
// Processed symbols used to avoid processing symbols twice
// Unmatched symbols slice stores incoming symbols not matched with any existing assets. Once any match has been found, it is not needed anymore
var target *AssetImport
processed := make(map[string]bool, len(ss))
unmatched := make([]string, 0, len(ss))
for _, s := range ss {
if s == "" {
// Skip processing the same symbol twice
if _, ok := processed[s]; ok {
continue
}

// Find existing match
// If first match found (target), set target and include all unmatched symbols
// If match not found, but target already found, just add the symbol
// Skip if match found and equal to the target
// Merge info to target if match found not equal
match, ok := as[s]
if target == nil && !ok {
unmatched = append(unmatched, s)
continue
}

if !ok && base == nil {
newSymbols = append(newSymbols, s)
if target == nil {
target = match
target.Symbols = append(target.Symbols, unmatched...)
unmatched = nil
for _, matchedSymbol := range match.Symbols {
processed[matchedSymbol] = true
}
continue
}

if !ok {
base.Symbols = append(base.Symbols, s)
as[s] = match
target.Symbols = append(target.Symbols, s)
processed[s] = true
continue
}

if base == nil {
base = match
for _, s := range newSymbols {
base.Symbols = append(base.Symbols, s)
}
if match == target {
continue
}

if base != match {
// Resolve conflict
mergeAsset(*match, base)
continue
for _, matchedSymbol := range match.Symbols {
processed[matchedSymbol] = true
}
mergeAsset(*match, target)
}

if base == nil {
base = &AssetImport{Instrument: Instrument{Symbols: ss}}
// If still no matches found
if target == nil {
target = &AssetImport{Instrument: Instrument{Symbols: ss}}
}

for _, s := range base.Symbols {
as[s] = base
for _, s := range target.Symbols {
as[s] = target
}

return base
return target
}

// mergeAsset merges information on the assets and joins all found events: trades, dividends and withholding tax
Expand All @@ -199,12 +235,13 @@ func mergeAsset(src AssetImport, tgt *AssetImport) {
tgt.Symbols = list

// TODO improve and check for mismatch when not empty
if tgt.Category == "" {
if tgt.Category == "" && src.Category != "" {
tgt.Category = src.Category
}

tgt.Trades = append(tgt.Trades, src.Trades...)
tgt.Dividends = append(tgt.Dividends, src.Dividends...)
tgt.WithholdingTax = append(tgt.WithholdingTax, src.WithholdingTax...)
}

// assets maps imported asset information by symbol, for easier lookup while importing
Expand All @@ -221,7 +258,6 @@ func (a assets) list() []AssetImport {
list = append(list, *a)
listed[a] = true
}

return list
}

Expand Down
29 changes: 20 additions & 9 deletions read_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,45 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)

// readDir imports all data found in current directory
func readDir() ([]AssetImport, []Transaction, []int, []string) {
files := findFiles()
ir := NewImportResults()
for _, file := range files {
ReadStatement(file, ir)
var wg sync.WaitGroup
files := make(chan string, 10)
for w := 1; w <= maxWorkers; w++ {
wg.Add(1)
go func(wg *sync.WaitGroup, files <-chan string, r *ImportResults) {
for f := range files {
ReadStatement(f, ir)
// fmt.Println(f)
}
wg.Done()
}(&wg, files, ir)
}

findFiles(files)
wg.Wait()
return ir.assets.list(), ir.fees, list(ir.years), list(ir.currencies)
}

// findFiles walks the current directory and looks for .csv files
func findFiles() []string {
var files []string
func findFiles(files chan<- string) {
count := 0
filepath.WalkDir(os.Getenv("PWD"), func(path string, d fs.DirEntry, err error) error {
if d.IsDir() && d.Name()[:1] == "." {
return filepath.SkipDir
}
if filepath.Ext(d.Name()) == ".csv" {
files = append(files, path)
count++
files <- path
}
return nil
})
fmt.Printf("%d files found. Working... \n", len(files))
return files
close(files)
fmt.Printf("%d files found. Working... \n", count)
}

// ReadStatement reads single csv IBKR file
Expand Down

0 comments on commit 4b9d4ba

Please sign in to comment.