Skip to content

Commit

Permalink
aep: Allow merged surrogates to use cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ctessum committed Apr 20, 2020
1 parent 1c21fe4 commit a141333
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 61 deletions.
55 changes: 23 additions & 32 deletions emissions/aep/spatialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ import (
"net/url"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"

"github.com/ctessum/geom/proj"
"github.com/ctessum/requestcache/v2"
"github.com/ctessum/requestcache/v4"
"github.com/ctessum/sparse"
"github.com/ctessum/unit"

// Register sqlite drivers
_ "github.com/mattn/go-sqlite3"
)

Expand Down Expand Up @@ -104,65 +104,57 @@ func init() {

// UnmarshalBinary unmarshals the data from a byte array, decoding the
// gob-encoded bytes in b into the receiver.
func unmarshalGriddedSrgData(b []byte) (interface{}, error) {
func (data *GriddedSrgData) UnmarshalBinary(b []byte) error {
r := bytes.NewBuffer(b)
d := gob.NewDecoder(r)
var data GriddedSrgData
if err := d.Decode(&data); err != nil {
return nil, err
}
return &data, nil
return d.Decode(data)
}

// marshalGriddedSrgData marshals an interface to a byte array and fulfills
// the requirements for the Disk cache marshalFunc input.
func marshalGriddedSrgData(data interface{}) ([]byte, error) {
// MarshalBinary marshals the data to a byte array.
func (data *GriddedSrgData) MarshalBinary() ([]byte, error) {
w := bytes.NewBuffer(nil)
e := gob.NewEncoder(w)
d := *data.(*interface{})
dd := d.(*GriddedSrgData)
if err := e.Encode(dd); err != nil {
if err := e.Encode(data); err != nil {
return nil, err
}
return w.Bytes(), nil
}

func (sp *SpatialProcessor) load() error {
var err error
sp.cache, err = newCache(sp.DiskCachePath, sp.MemCacheSize, marshalGriddedSrgData, unmarshalGriddedSrgData)
sp.cache, err = newCache(sp.DiskCachePath, sp.MemCacheSize)
return err
}

func newCache(diskCachePath string, memCacheSize int, marshalFunc func(interface{}) ([]byte, error), unmarshalFunc func([]byte) (interface{}, error)) (*requestcache.Cache, error) {
func newCache(diskCachePath string, memCacheSize int) (*requestcache.Cache, error) {
dedup := requestcache.Deduplicate()
nprocs := runtime.GOMAXPROCS(-1)
mc := requestcache.Memory(memCacheSize)
if diskCachePath == "" {
return requestcache.NewCache(nprocs, dedup, mc), nil
return requestcache.NewCache(dedup, mc), nil
} else {
if strings.HasPrefix(diskCachePath, "gs://") {
loc, err := url.Parse(diskCachePath)
if err != nil {
return nil, err
}
cf, err := requestcache.GoogleCloudStorage(context.TODO(), loc.Host, strings.TrimLeft(loc.Path, "/"), marshalFunc, unmarshalFunc)
cf, err := requestcache.GoogleCloudStorage(context.TODO(), loc.Host, strings.TrimLeft(loc.Path, "/"))
if err != nil {
return nil, err
}
return requestcache.NewCache(nprocs, dedup, mc, cf), nil
return requestcache.NewCache(dedup, mc, cf), nil
} else if filepath.Ext(diskCachePath) == ".sqlite3" {
db, err := sql.Open("sqlite3", diskCachePath)
if err != nil {
return nil, err
}
cf, err := requestcache.SQL(context.Background(), db, marshalFunc, unmarshalFunc)
cf, err := requestcache.SQL(context.Background(), db)
if err != nil {
return nil, err
}
return requestcache.NewCache(nprocs, dedup, mc, cf), nil
return requestcache.NewCache(dedup, mc, cf), nil
} else {
return requestcache.NewCache(nprocs, dedup, mc,
requestcache.Disk(diskCachePath, marshalFunc, unmarshalFunc)), nil
return requestcache.NewCache(dedup, mc,
requestcache.Disk(diskCachePath)), nil
}
}
}
Expand All @@ -187,6 +179,7 @@ type RecordGridded interface {
emis map[Pollutant]*sparse.SparseArray, units map[Pollutant]unit.Dimensions, err error)
}

// GridRecord returns a record that can allocate the emissions to a grid.
func (sp *SpatialProcessor) GridRecord(r Record) RecordGridded {
return &recordGridded{
Record: r,
Expand Down Expand Up @@ -286,12 +279,11 @@ func (sp *SpatialProcessor) Surrogate(srgSpec SrgSpec, grid *GridDef, loc *Locat
}

s := &srgGrid{srg: srgSpec, gridData: grid, loc: loc, sp: sp}
req := sp.cache.NewRequest(context.Background(), s)
resultI, err := req.Result()
if err != nil {
req := sp.cache.NewRequestRecursive(context.Background(), s)
result := new(GriddedSrgData)
if err := req.Result(result); err != nil {
return nil, false, err
}
result := resultI.(*GriddedSrgData)
srg, coveredByGrid := result.ToGrid()
if srg != nil {
return srg, coveredByGrid, nil
Expand All @@ -303,12 +295,11 @@ func (sp *SpatialProcessor) Surrogate(srgSpec SrgSpec, grid *GridDef, loc *Locat
return nil, false, err
}
s := &srgGrid{srg: newSrgSpec, gridData: grid, loc: loc, sp: sp}
req := sp.cache.NewRequest(context.Background(), s)
resultI, err := req.Result()
if err != nil {
result := new(GriddedSrgData)
req := sp.cache.NewRequestRecursive(context.Background(), s)
if err := req.Result(result); err != nil {
return nil, false, err
}
result := resultI.(*GriddedSrgData)
srg, coveredByGrid := result.ToGrid()
if srg != nil {
return srg, coveredByGrid, nil
Expand Down
9 changes: 4 additions & 5 deletions emissions/aep/spatialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,15 @@ func TestCreateSurrogates(t *testing.T) {
for fips, covered := range coveredByGrid {
t.Run(fips, func(t *testing.T) {
sg := &srgGrid{srg: srgSpec, gridData: grid, loc: inputShapes[fips], sp: sp}
srgI, err := sg.Run(context.Background())
if err != nil {
srg := new(GriddedSrgData)
if err := sg.Run(context.Background(), nil, srg); err != nil {
t.Fatalf("creating surrogate %s, FIPS %s: %v", code, fips, err)
}
if srgI == nil {

if srg == nil {
t.Fatalf("county %s is not in surrogate %s", fips, code)
}

srg := srgI.(*GriddedSrgData)

if srg.CoveredByGrid != covered {
t.Errorf("county %s should %v be covered by the grid but it is %v",
fips, covered, srg.CoveredByGrid)
Expand Down
39 changes: 38 additions & 1 deletion emissions/aep/srgspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ package aep

import (
"context"
"database/sql"
"encoding/csv"
"fmt"
"io"
"log"
"math"
"net/url"
"path/filepath"
"runtime"
"strconv"
"strings"

Expand Down Expand Up @@ -84,6 +87,40 @@ type SrgSpecSMOKE struct {

const none = "NONE"

// newCacheV2 creates a request cache whose persistent layer still relies on
// explicit marshal/unmarshal callbacks (presumably the requestcache v2 API;
// confirm against this file's import block).
//
// Every cache is built from a deduplication layer followed by an in-memory
// layer sized by memCacheSize. diskCachePath selects an optional third layer:
//
//   - "":            no persistent layer.
//   - "gs://...":    Google Cloud Storage; the URL host and the path (with
//     leading slashes trimmed) are passed on to requestcache.GoogleCloudStorage.
//   - "*.sqlite3":   a SQLite database opened through database/sql.
//   - anything else: the path is handed to requestcache.Disk.
//
// marshalFunc and unmarshalFunc are forwarded unchanged to whichever
// persistent layer is selected. An error is returned if the location cannot be
// parsed or the persistent layer cannot be initialized.
func newCacheV2(diskCachePath string, memCacheSize int, marshalFunc func(interface{}) ([]byte, error), unmarshalFunc func([]byte) (interface{}, error)) (*requestcache.Cache, error) {
	dedup := requestcache.Deduplicate()
	// A negative argument only queries the current GOMAXPROCS setting; it
	// does not change it.
	nprocs := runtime.GOMAXPROCS(-1)
	mc := requestcache.Memory(memCacheSize)

	switch {
	case diskCachePath == "":
		return requestcache.NewCache(nprocs, dedup, mc), nil

	case strings.HasPrefix(diskCachePath, "gs://"):
		loc, err := url.Parse(diskCachePath)
		if err != nil {
			return nil, err
		}
		cf, err := requestcache.GoogleCloudStorage(context.TODO(), loc.Host, strings.TrimLeft(loc.Path, "/"), marshalFunc, unmarshalFunc)
		if err != nil {
			return nil, err
		}
		return requestcache.NewCache(nprocs, dedup, mc, cf), nil

	case filepath.Ext(diskCachePath) == ".sqlite3":
		db, err := sql.Open("sqlite3", diskCachePath)
		if err != nil {
			return nil, err
		}
		cf, err := requestcache.SQL(context.Background(), db, marshalFunc, unmarshalFunc)
		if err != nil {
			// The handle is never returned to the caller on this path, so
			// release it here instead of leaking it. The close error is
			// deliberately dropped in favor of the more informative
			// initialization error.
			_ = db.Close()
			return nil, err
		}
		return requestcache.NewCache(nprocs, dedup, mc, cf), nil

	default:
		return requestcache.NewCache(nprocs, dedup, mc,
			requestcache.Disk(diskCachePath, marshalFunc, unmarshalFunc)), nil
	}
}

// ReadSrgSpecSMOKE reads a SMOKE formatted spatial surrogate specification file.
// Results are returned as a map of surrogate specifications as indexed by
// their unique ID, which is Region+SurrogateCode. shapefileDir specifies the
Expand All @@ -104,7 +141,7 @@ func ReadSrgSpecSMOKE(fid io.Reader, shapefileDir string, checkShapefiles bool,
if err != nil {
return nil, fmt.Errorf("in ReadSrgSpec: %v", err)
}
cache, err := newCache(diskCachePath, memCacheSize, marshalSrgHolders, unmarshalSrgHolders)
cache, err := newCacheV2(diskCachePath, memCacheSize, marshalSrgHolders, unmarshalSrgHolders)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion emissions/aep/srgspec_osm.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func ReadSrgSpecOSM(r io.Reader, diskCachePath string, memCacheSize int) (*SrgSp
return nil, err
}
srgs := NewSrgSpecs()
cache, err := newCache(diskCachePath, memCacheSize, marshalSrgHolders, unmarshalSrgHolders)
cache, err := newCacheV2(diskCachePath, memCacheSize, marshalSrgHolders, unmarshalSrgHolders)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions emissions/aep/srgspec_osm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ func TestCreateSurrogates_osm(t *testing.T) {
t.Fatal(err)
}
sg := &srgGrid{srg: srgSpec, gridData: grid, loc: inputLoc, sp: sp}
srgsI, err := sg.Run(context.Background())
if err != nil {
srgs := new(GriddedSrgData)
if err := sg.Run(context.Background(), nil, srgs); err != nil {
t.Fatalf("creating surrogate %s: %v", code, err)
}
srgs := srgsI.(*GriddedSrgData)
griddedSrg, covered := srgs.ToGrid()
if covered {
t.Errorf("srg %s should not cover", code)
Expand Down
33 changes: 18 additions & 15 deletions emissions/aep/surrogate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ctessum/geom"
"github.com/ctessum/geom/encoding/shp"
"github.com/ctessum/geom/index/rtree"
"github.com/ctessum/requestcache/v4"
"github.com/ctessum/sparse"
"github.com/spatialmodel/inmap/internal/hash"
)
Expand Down Expand Up @@ -139,23 +140,24 @@ func ParseSurrogateFilter(filterFunction string) *SurrogateFilter {
}

// createMerged creates a surrogate by creating and merging other surrogates.
func (sp *SpatialProcessor) createMerged(srg SrgSpec, gridData *GridDef, loc *Location) (*GriddedSrgData, error) {
func (sp *SpatialProcessor) createMerged(srg SrgSpec, gridData *GridDef, loc *Location, result *GriddedSrgData) error {
mrgSrgs := make([]*GriddedSrgData, len(srg.mergeNames()))
for i, mrgName := range srg.mergeNames() {
newSrg, err := sp.SrgSpecs.GetByName(srg.region(), mrgName)
if err != nil {
return nil, err
return err
}
// If we use the cache here it is possible to end up with a channel deadlock,
// so we generate the surrogate from scratch here.
sg := &srgGrid{srg: newSrg, gridData: gridData, loc: loc, sp: sp}
data, err := sg.Run(context.Background())
if err != nil {
return nil, err
req := sp.cache.NewRequestRecursive(context.Background(), sg)
data := new(GriddedSrgData)
if err := req.Result(data); err != nil {
return err
}
mrgSrgs[i] = data.(*GriddedSrgData)
mrgSrgs[i] = data
}
return mergeSrgs(mrgSrgs, srg.mergeMultipliers()), nil
res := mergeSrgs(mrgSrgs, srg.mergeMultipliers())
*result = *res
return nil
}

// srgGrid holds a surrogate specification and a grid definition.
Expand All @@ -173,22 +175,22 @@ func (sg *srgGrid) Key() string {

// Run creates a new gridding surrogate based on a
// surrogate specification and grid definition.
func (sg *srgGrid) Run(_ context.Context) (interface{}, error) {
func (sg *srgGrid) Run(_ context.Context, _ *requestcache.Cache, res requestcache.Result) error {
srg := sg.srg
gridData := sg.gridData
sp := sg.sp
loc := sg.loc
if loc == nil {
return nil, fmt.Errorf("aep.SpatialProcessor.createSurrogate: missing location: %+v", gridData)
return fmt.Errorf("aep.SpatialProcessor.createSurrogate: missing location: %+v", gridData)
}
if len(srg.mergeNames()) != 0 {
return sp.createMerged(srg, gridData, loc)
return sp.createMerged(srg, gridData, loc, res.(*GriddedSrgData))
}
log.Printf("creating surrogate `%s` for location %s", srg.name(), loc)

srgData, err := srg.getSrgData(gridData, loc, sp.SimplifyTolerance)
if err != nil {
return nil, err
return err
}

// Start workers
Expand All @@ -213,10 +215,11 @@ func (sg *srgGrid) Run(_ context.Context) (interface{}, error) {
for i := 0; i < workersRunning; i++ {
err = <-errchan
if err != nil {
return nil, err
return err
}
}
return grdsrg, nil
*(res.(*GriddedSrgData)) = *grdsrg
return nil
}

// WriteToShp writes an individual gridding surrogate to a shapefile.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/ctessum/plotextra v0.0.0-20180623195436-96488e3f1996
github.com/ctessum/requestcache v1.0.1
github.com/ctessum/requestcache/v2 v2.0.0
github.com/ctessum/requestcache/v4 v4.0.0
github.com/ctessum/sparse v0.0.0-20181201011727-57d6234a2c9d
github.com/ctessum/unit v0.0.0-20160621200450-755774ac2fcb
github.com/davecgh/go-spew v1.1.1
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ github.com/ctessum/atmos v0.0.0-20170526022537-cba69f7ca647 h1:lUT29TZ1puayHZ2KL
github.com/ctessum/atmos v0.0.0-20170526022537-cba69f7ca647/go.mod h1:sPaXeGajd2kT9U0BHxsZRfon2LjMH3Ohs2VOF+ACNSY=
github.com/ctessum/cdf v0.0.0-20181201011353-edced208ea9d h1:mIQwc9ihMh23CRrvN5Vyv2T5E73NVALNguZizrV9XoI=
github.com/ctessum/cdf v0.0.0-20181201011353-edced208ea9d/go.mod h1:E45T61NEPsdFg0n3CqBwcG7jbwHTRdhOufhVTm3/Dtc=
github.com/ctessum/geom v0.2.9 h1:JHq9D1kJw/omHJfAOLNHUijVMAmti8+7K29CSj4EG4o=
github.com/ctessum/geom v0.2.9/go.mod h1:CdcpDcEdImJy9b20i+xEcYZZt9rl2oT3kCCGe0OVE3Q=
github.com/ctessum/geom v0.2.10-0.20200417141930-c1ad83ff7e0d h1:Z+MI31PiGuMxlejFqT6ibKT8JxEehLDrfGCvqprLSKI=
github.com/ctessum/geom v0.2.10-0.20200417141930-c1ad83ff7e0d/go.mod h1:E6Ji+jwCV2ejMYnDnmTjeY6SDNCbDDZVwQjbr+fQBoo=
github.com/ctessum/go-leaflet v0.0.0-20170724133759-2f9e4c38fb5e h1:IV59uAui/Q61m9UyCjpOCEl0Bg1g9kwsgxcF5Zt8yTQ=
Expand All @@ -61,14 +59,14 @@ github.com/ctessum/gobra v0.0.0-20180516235632-ddfa5eeb3017 h1:51Dc8x+DBWx1x+U9l
github.com/ctessum/gobra v0.0.0-20180516235632-ddfa5eeb3017/go.mod h1:Ny68iWp54ir4xyHjzCNgBVvpA6iKScCREYtd6+ZSBwE=
github.com/ctessum/plotextra v0.0.0-20180623195436-96488e3f1996 h1:kOR5vpmJ6tUaFpxj8Wd9zdoRpCBqhH07j+7DcwFAlV8=
github.com/ctessum/plotextra v0.0.0-20180623195436-96488e3f1996/go.mod h1:uhWEDow2kQayrGUrnESm3nIsGjq0Oc2unMZCGz21zW0=
github.com/ctessum/polyclip-go v1.0.1 h1:y+qTvBa6KLRTq/mZKOHciTaHv939nU5zl9mThLbK/m8=
github.com/ctessum/polyclip-go v1.0.1/go.mod h1:e/Lh1JOGyynZwLr0M4tZGIyx07wXw9T+pu6hFut+kFQ=
github.com/ctessum/polyclip-go v1.0.2-0.20200417141046-48e92ea36ddd h1:BVBbmu475OhEvEFXp17QEf0XHJhNHsfvGyNTjsshfOA=
github.com/ctessum/polyclip-go v1.0.2-0.20200417141046-48e92ea36ddd/go.mod h1:e/Lh1JOGyynZwLr0M4tZGIyx07wXw9T+pu6hFut+kFQ=
github.com/ctessum/requestcache v1.0.1 h1:YDoBJSQw+kQE0hEs+2x4CYB6IXHLZdcpYgKbh8qinok=
github.com/ctessum/requestcache v1.0.1/go.mod h1:xVlz88TkYBiJfpGe9Xo8KvAmfdmaGyvhBJyltQx4WVQ=
github.com/ctessum/requestcache/v2 v2.0.0 h1:rT1SSj8Yx2nQi19m6cPexlO59FuaX4JbhLYPlr74Fbg=
github.com/ctessum/requestcache/v2 v2.0.0/go.mod h1:5NbYIrgAaq500bz6iir80Byxex209I5gHg8IqVCO7w8=
github.com/ctessum/requestcache/v4 v4.0.0 h1:oWF8gb2FwPCURfR7I4+n25Rzv6mhOQN4VazBo8NTZoY=
github.com/ctessum/requestcache/v4 v4.0.0/go.mod h1:otYUsJaHa2jH15hZR2BnlcB/WayfAiW/QwHX0S/B7dQ=
github.com/ctessum/sparse v0.0.0-20181201011727-57d6234a2c9d h1:R6rUEdBcieof5DgG6UGkRH6FPBcfI60qRD6KusHIv+c=
github.com/ctessum/sparse v0.0.0-20181201011727-57d6234a2c9d/go.mod h1:UGiTsb4CQb8URion27d8H9aXu3UTbtCI0AgV1frKZsA=
github.com/ctessum/unit v0.0.0-20160621200450-755774ac2fcb h1:/S9w9JroTJ0Zo70VXUutYAI6YVBDwuYMUZt73fBq5r0=
Expand Down

0 comments on commit a141333

Please sign in to comment.