Skip to content

Commit

Permalink
Drop map locking from policy repository
Browse files Browse the repository at this point in the history
  • Loading branch information
Waldz committed Jan 23, 2020
1 parent 3c25027 commit 47727e5
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 19 deletions.
63 changes: 44 additions & 19 deletions core/policy/policy_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package policy

import (
"fmt"
"strings"
"sync"
"time"

Expand All @@ -28,13 +29,18 @@ import (
"github.com/rs/zerolog/log"
)

type policyMetadata struct {
policy market.AccessPolicy
rules market.AccessPolicyRuleSet
}

// PolicyRepository represents async policy fetcher from TrustOracle
type PolicyRepository struct {
client *requests.HTTPClient

policyURL string
policyLock sync.Mutex
policyRules map[market.AccessPolicy]market.AccessPolicyRuleSet
policyURL string
policyLock sync.Mutex
policyList []policyMetadata

fetchInterval time.Duration
fetchShutdown chan struct{}
Expand All @@ -45,7 +51,7 @@ func NewPolicyRepository(client *requests.HTTPClient, policyURL string, interval
return &PolicyRepository{
client: client,
policyURL: policyURL,
policyRules: make(map[market.AccessPolicy]market.AccessPolicyRuleSet),
policyList: make([]policyMetadata, 0),
fetchInterval: interval,
fetchShutdown: make(chan struct{}),
}
Expand All @@ -64,9 +70,13 @@ func (pr *PolicyRepository) Stop() {

// Policy converts given value to valid policy rule
func (pr *PolicyRepository) Policy(policyID string) market.AccessPolicy {
policyURL := pr.policyURL
if !strings.HasSuffix(policyURL, "/") {
policyURL += "/"
}
return market.AccessPolicy{
ID: policyID,
Source: fmt.Sprintf("%v%v", pr.policyURL, policyID),
Source: fmt.Sprintf("%v%v", policyURL, policyID),
}
}

Expand All @@ -82,7 +92,7 @@ func (pr *PolicyRepository) Policies(policyIDs []string) []market.AccessPolicy {
// AddPolicies adds given policy to repository. Also syncs policy rules from TrustOracle
func (pr *PolicyRepository) AddPolicies(policies []market.AccessPolicy) error {
pr.policyLock.Lock()
policyRulesNew := pr.policyRules
policyListNew := pr.policyList
pr.policyLock.Unlock()

for _, policy := range policies {
Expand All @@ -91,11 +101,15 @@ func (pr *PolicyRepository) AddPolicies(policies []market.AccessPolicy) error {
return errors.Wrap(err, "initial fetch failed")
}

policyRulesNew[policy] = policyRules
if index, exist := pr.getPolicyIndex(policyListNew, policy); exist {
policyListNew[index].rules = policyRules
} else {
policyListNew = append(policyListNew, policyMetadata{policy: policy, rules: policyRules})
}
}

pr.policyLock.Lock()
pr.policyRules = policyRulesNew
pr.policyList = policyListNew
pr.policyLock.Unlock()

return nil
Expand All @@ -106,11 +120,12 @@ func (pr *PolicyRepository) RulesForPolicy(policy market.AccessPolicy) (market.A
pr.policyLock.Lock()
defer pr.policyLock.Unlock()

policyRules, exist := pr.policyRules[policy]
index, exist := pr.getPolicyIndex(pr.policyList, policy)
if !exist {
return policyRules, fmt.Errorf("unknown policy: %s", policy)
return market.AccessPolicyRuleSet{}, fmt.Errorf("unknown policy: %s", policy)
}
return policyRules, nil

return pr.policyList[index].rules, nil
}

// RulesForPolicies gives list of rules of given policies
Expand All @@ -120,15 +135,25 @@ func (pr *PolicyRepository) RulesForPolicies(policies []market.AccessPolicy) ([]

policiesRules := make([]market.AccessPolicyRuleSet, len(policies))
for i, policy := range policies {
policyRules, exist := pr.policyRules[policy]
index, exist := pr.getPolicyIndex(pr.policyList, policy)
if !exist {
return policiesRules, fmt.Errorf("unknown policy: %s", policy)
return []market.AccessPolicyRuleSet{}, fmt.Errorf("unknown policy: %s", policy)
}
policiesRules[i] = policyRules
policiesRules[i] = pr.policyList[index].rules
}
return policiesRules, nil
}

func (pr *PolicyRepository) getPolicyIndex(policyList []policyMetadata, policy market.AccessPolicy) (int, bool) {
for i, policyMeta := range policyList {
if policyMeta.policy == policy {
return i, true
}
}

return 0, false
}

func (pr *PolicyRepository) fetchPolicyRules(policy market.AccessPolicy) (market.AccessPolicyRuleSet, error) {
var policyRules market.AccessPolicyRuleSet

Expand All @@ -151,17 +176,17 @@ func (pr *PolicyRepository) fetchLoop() {
break
case <-time.After(pr.fetchInterval):
pr.policyLock.Lock()
policyRulesActive := pr.policyRules
policyListActive := pr.policyList
pr.policyLock.Unlock()

for policy := range policyRulesActive {
policyRules, err := pr.fetchPolicyRules(policy)
if err != nil {
for i, policyMeta := range policyListActive {
var err error
if policyMeta.rules, err = pr.fetchPolicyRules(policyMeta.policy); err != nil {
log.Warn().Err(err).Msg("synchronise fetch failed")
}

pr.policyLock.Lock()
pr.policyRules[policy] = policyRules
pr.policyList[i] = policyMeta
pr.policyLock.Unlock()
}
}
Expand Down
204 changes: 204 additions & 0 deletions core/policy/policy_repository_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Copyright (C) 2020 The "MysteriumNetwork/node" Authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package policy

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/mysteriumnetwork/node/market"
"github.com/mysteriumnetwork/node/requests"
"github.com/stretchr/testify/assert"
)

var (
policyOneRules = market.AccessPolicyRuleSet{
ID: "1",
Title: "One",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeIdentity, Value: "0x1"},
},
}
policyTwoRules = market.AccessPolicyRuleSet{
ID: "2",
Title: "Two",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeDNSHostname, Value: "ipinfo.io"},
},
}
policyTreeRules = market.AccessPolicyRuleSet{
ID: "3",
Title: "Three",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeDNSZone, Value: "ipinfo.io"},
},
}
)

func Test_PolicyRepository_Policy(t *testing.T) {
repo := &PolicyRepository{policyURL: "http://policy.localhost"}
assert.Equal(
t,
market.AccessPolicy{ID: "1", Source: "http://policy.localhost/1"},
repo.Policy("1"),
)

repo = &PolicyRepository{policyURL: "http://policy.localhost/"}
assert.Equal(
t,
market.AccessPolicy{ID: "2", Source: "http://policy.localhost/2"},
repo.Policy("2"),
)
}

func Test_PolicyRepository_Policies(t *testing.T) {
repo := &PolicyRepository{policyURL: "http://policy.localhost"}
assert.Equal(
t,
[]market.AccessPolicy{
{ID: "1", Source: "http://policy.localhost/1"},
},
repo.Policies([]string{"1"}),
)

repo = &PolicyRepository{policyURL: "http://policy.localhost/"}
assert.Equal(
t,
[]market.AccessPolicy{
{ID: "2", Source: "http://policy.localhost/2"},
{ID: "3", Source: "http://policy.localhost/3"},
},
repo.Policies([]string{"2", "3"}),
)
}

func Test_PolicyRepository_RulesForPolicy(t *testing.T) {
repo := createEmptyRepo("http://policy.localhost")
policyRules, err := repo.RulesForPolicy(repo.Policy("1"))
assert.EqualError(t, err, "unknown policy: {1 http://policy.localhost/1}")
assert.Equal(t, market.AccessPolicyRuleSet{}, policyRules)

repo = createFullRepo("http://policy.localhost")
policyRules, err = repo.RulesForPolicy(repo.Policy("1"))
assert.NoError(t, err)
assert.Equal(t, policyOneRules, policyRules)
}

func Test_PolicyRepository_RulesForPolicies(t *testing.T) {
repo := createEmptyRepo("http://policy.localhost")
policiesRules, err := repo.RulesForPolicies([]market.AccessPolicy{
repo.Policy("1"),
repo.Policy("3"),
})
assert.EqualError(t, err, "unknown policy: {1 http://policy.localhost/1}")
assert.Equal(t, []market.AccessPolicyRuleSet{}, policiesRules)

repo = createFullRepo("http://policy.localhost")
policiesRules, err = repo.RulesForPolicies([]market.AccessPolicy{
repo.Policy("1"),
repo.Policy("3"),
})
assert.NoError(t, err)
assert.Equal(t, []market.AccessPolicyRuleSet{policyOneRules, policyTreeRules}, policiesRules)
}

func Test_PolicyRepository_AddPolicies_WhenEndpointFails(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()

repo := createEmptyRepo(server.URL)
err := repo.AddPolicies([]market.AccessPolicy{
repo.Policy("1"),
repo.Policy("3"),
})
assert.EqualError(
t,
err,
fmt.Sprintf("initial fetch failed: failed fetch policy rule {1 %s/1}: server response invalid: 500 Internal Server Error (%s/1)", server.URL, server.URL),
)
assert.Equal(t, []policyMetadata{}, repo.policyList)
}

func Test_PolicyRepository_AddPolicies_WhenEndpointSucceeds(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/1" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "1",
"title": "One",
"description": "",
"allow": [
{"type": "identity", "value": "0x1"}
]
}`))
} else if r.URL.Path == "/3" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "3",
"title": "Three",
"description": "",
"allow": [
{"type": "dns_zone", "value": "ipinfo.io"}
]
}`))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()

repo := createEmptyRepo(server.URL)
err := repo.AddPolicies([]market.AccessPolicy{
repo.Policy("1"),
repo.Policy("3"),
})
assert.NoError(t, err)
assert.Equal(
t,
[]policyMetadata{
{policy: repo.Policy("1"), rules: policyOneRules},
{policy: repo.Policy("3"), rules: policyTreeRules},
},
repo.policyList,
)
}

func createEmptyRepo(policyURL string) *PolicyRepository {
return &PolicyRepository{
client: requests.NewHTTPClient("0.0.0.0", time.Second),
policyURL: policyURL + "/",
policyList: []policyMetadata{},
}
}

func createFullRepo(policyURL string) *PolicyRepository {
repo := &PolicyRepository{
client: requests.NewHTTPClient("0.0.0.0", time.Second),
policyURL: policyURL + "/",
policyList: make([]policyMetadata, 3),
}
repo.policyList[0] = policyMetadata{policy: repo.Policy("1"), rules: policyOneRules}
repo.policyList[1] = policyMetadata{policy: repo.Policy("2"), rules: policyTwoRules}
repo.policyList[2] = policyMetadata{policy: repo.Policy("3"), rules: policyTreeRules}
return repo
}

0 comments on commit 47727e5

Please sign in to comment.