Skip to content

Commit

Permalink
Improve policy rules locking during update
Browse files Browse the repository at this point in the history
  • Loading branch information
Waldz committed Jan 24, 2020
1 parent e56cfbc commit 2d91355
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 63 deletions.
4 changes: 2 additions & 2 deletions config/flags_service_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ var (
Name: "agreed-terms-and-conditions",
Usage: "Agree with terms & conditions",
}
// FlagAccessPolicyAddress Policy oracle URL for retrieving access policies.
// FlagAccessPolicyAddress Trust oracle URL for retrieving access policies.
FlagAccessPolicyAddress = cli.StringFlag{
Name: "access-policy.address",
Usage: "URL of policy oracle endpoint for retrieving lists of access policies",
Usage: "URL of trust oracle endpoint for retrieving lists of access policies",
Value: metadata.DefaultNetwork.AccessPolicyOracleAddress,
}
// FlagAccessPolicyList a comma-separated list of access policies that determines allowed identities to use the service.
Expand Down
28 changes: 15 additions & 13 deletions core/policy/policy_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/mysteriumnetwork/node/logconfig/httptrace"
"github.com/mysteriumnetwork/node/market"
"github.com/mysteriumnetwork/node/requests"
"github.com/pkg/errors"
Expand Down Expand Up @@ -59,15 +60,14 @@ func NewPolicyRepository(client *requests.HTTPClient, policyURL string, interval
}
}

// Start begins fetching proposals to repository
// Start begins fetching policies to repository
func (pr *PolicyRepository) Start() {
pr.fetchShutdown = make(chan struct{})
go pr.fetchLoop()
}

// Stop ends fetching proposals to repository
// Stop ends fetching policies to repository
func (pr *PolicyRepository) Stop() {
close(pr.fetchShutdown)
pr.fetchShutdown <- struct{}{}
}

// Policy converts given value to valid policy rule
Expand All @@ -94,7 +94,8 @@ func (pr *PolicyRepository) Policies(policyIDs []string) []market.AccessPolicy {
// AddPolicies adds given policy to repository. Also syncs policy rules from TrustOracle
func (pr *PolicyRepository) AddPolicies(policies []market.AccessPolicy) error {
pr.policyLock.Lock()
policyListNew := pr.policyList
policyListNew := make([]policyMetadata, len(pr.policyList))
copy(policyListNew, pr.policyList)
pr.policyLock.Unlock()

for _, policy := range policies {
Expand All @@ -105,8 +106,7 @@ func (pr *PolicyRepository) AddPolicies(policies []market.AccessPolicy) error {
}

var err error
err = pr.fetchPolicyRules(&policyListNew[index])
if err != nil {
if err = pr.fetchPolicyRules(&policyListNew[index]); err != nil {
return errors.Wrap(err, "initial fetch failed")
}
}
Expand Down Expand Up @@ -170,17 +170,19 @@ func (pr *PolicyRepository) fetchPolicyRules(policyMeta *policyMetadata) error {
}
defer res.Body.Close()

httptrace.TraceRequestResponse(req, res)

if res.StatusCode == http.StatusNotModified {
return nil
}
if err := requests.ParseResponseError(res); err != nil {
return errors.Wrap(err, "cannot parse proposals response")
return errors.Wrapf(err, "failed to fetch policy rule %s", policyMeta.policy)
}

policyMeta.rules = market.AccessPolicyRuleSet{}
err = pr.client.DoRequestAndParseResponse(req, &policyMeta.rules)
err = requests.ParseResponseJSON(res, &policyMeta.rules)
if err != nil {
return errors.Wrapf(err, "failed fetch policy rule %s", policyMeta.policy)
return errors.Wrapf(err, "failed to parse policy rule %s", policyMeta.policy)
}
policyMeta.eTag = res.Header.Get("ETag")
return nil
Expand All @@ -193,13 +195,13 @@ func (pr *PolicyRepository) fetchLoop() {
return
case <-time.After(pr.fetchInterval):
pr.policyLock.Lock()
policyListActive := pr.policyList
policyListActive := make([]policyMetadata, len(pr.policyList))
copy(policyListActive, pr.policyList)
pr.policyLock.Unlock()

for index := range policyListActive {
var err error
pr.fetchPolicyRules(&policyListActive[index])
if err != nil {
if err = pr.fetchPolicyRules(&policyListActive[index]); err != nil {
log.Warn().Err(err).Msg("synchronise fetch failed")
}

Expand Down
173 changes: 125 additions & 48 deletions core/policy/policy_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,32 @@ var (
{Type: market.AccessPolicyTypeIdentity, Value: "0x1"},
},
}
policyOneRulesUpdated = market.AccessPolicyRuleSet{
ID: "1",
Title: "One (updated)",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeIdentity, Value: "0x1"},
},
}

policyTwoRules = market.AccessPolicyRuleSet{
ID: "2",
Title: "Two",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeDNSHostname, Value: "ipinfo.io"},
},
}
policyThreeRules = market.AccessPolicyRuleSet{
policyTwoRulesUpdated = market.AccessPolicyRuleSet{
ID: "2",
Title: "Two (updated)",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeDNSHostname, Value: "ipinfo.io"},
},
}

policyThreeRulesUpdated = market.AccessPolicyRuleSet{
ID: "3",
Title: "Three",
Title: "Three (updated)",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeDNSZone, Value: "ipinfo.io"},
},
Expand Down Expand Up @@ -96,7 +112,7 @@ func Test_PolicyRepository_RulesForPolicy(t *testing.T) {
assert.EqualError(t, err, "unknown policy: {1 http://policy.localhost/1}")
assert.Equal(t, market.AccessPolicyRuleSet{}, policyRules)

repo = createFullRepo("http://policy.localhost")
repo = createFullRepo("http://policy.localhost", time.Minute)
policyRules, err = repo.RulesForPolicy(repo.Policy("1"))
assert.NoError(t, err)
assert.Equal(t, policyOneRules, policyRules)
Expand All @@ -111,7 +127,7 @@ func Test_PolicyRepository_RulesForPolicies(t *testing.T) {
assert.EqualError(t, err, "unknown policy: {1 http://policy.localhost/1}")
assert.Equal(t, []market.AccessPolicyRuleSet{}, policiesRules)

repo = createFullRepo("http://policy.localhost")
repo = createFullRepo("http://policy.localhost", time.Minute)
policiesRules, err = repo.RulesForPolicies([]market.AccessPolicy{
repo.Policy("1"),
repo.Policy("2"),
Expand All @@ -134,37 +150,13 @@ func Test_PolicyRepository_AddPolicies_WhenEndpointFails(t *testing.T) {
assert.EqualError(
t,
err,
fmt.Sprintf("initial fetch failed: failed fetch policy rule {1 %s/1}: server response invalid: 500 Internal Server Error (%s/1)", server.URL, server.URL),
fmt.Sprintf("initial fetch failed: failed to fetch policy rule {1 %s/1}: server response invalid: 500 Internal Server Error (%s/1)", server.URL, server.URL),
)
assert.Equal(t, []policyMetadata{}, repo.policyList)
}

func Test_PolicyRepository_AddPolicies_WhenEndpointSucceeds(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/1" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "1",
"title": "One",
"description": "",
"allow": [
{"type": "identity", "value": "0x1"}
]
}`))
} else if r.URL.Path == "/3" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "3",
"title": "Three",
"description": "",
"allow": [
{"type": "dns_zone", "value": "ipinfo.io"}
]
}`))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
server := mockPolicyServer()
defer server.Close()

repo := createEmptyRepo(server.URL)
Expand All @@ -176,13 +168,13 @@ func Test_PolicyRepository_AddPolicies_WhenEndpointSucceeds(t *testing.T) {
assert.Equal(
t,
[]policyMetadata{
{policy: repo.Policy("1"), rules: policyOneRules},
{policy: repo.Policy("3"), rules: policyThreeRules},
{policy: repo.Policy("1"), rules: policyOneRulesUpdated},
{policy: repo.Policy("3"), rules: policyThreeRulesUpdated},
},
repo.policyList,
)

repo = createFullRepo(server.URL)
repo = createFullRepo(server.URL, time.Minute)
err = repo.AddPolicies([]market.AccessPolicy{
repo.Policy("1"),
repo.Policy("3"),
Expand All @@ -191,29 +183,114 @@ func Test_PolicyRepository_AddPolicies_WhenEndpointSucceeds(t *testing.T) {
assert.Equal(
t,
[]policyMetadata{
{policy: repo.Policy("1"), rules: policyOneRules},
{policy: repo.Policy("1"), rules: policyOneRulesUpdated},
{policy: repo.Policy("2"), rules: policyTwoRules},
{policy: repo.Policy("3"), rules: policyThreeRules},
{policy: repo.Policy("3"), rules: policyThreeRulesUpdated},
},
repo.policyList,
)
}

func Test_PolicyRepository_StartSyncsPolicies(t *testing.T) {
server := mockPolicyServer()
defer server.Close()

repo := createFullRepo(server.URL, 1*time.Millisecond)
repo.Start()
defer repo.Stop()

time.Sleep(10 * time.Millisecond)
policiesRules, err := repo.RulesForPolicies([]market.AccessPolicy{
repo.Policy("1"),
repo.Policy("2"),
})
assert.NoError(t, err)
assert.Equal(t, []market.AccessPolicyRuleSet{policyOneRulesUpdated, policyTwoRulesUpdated}, policiesRules)
}

func Test_PolicyRepository_StartMultipleTimes(t *testing.T) {
repo := NewPolicyRepository(requests.NewHTTPClient("0.0.0.0", time.Second), "http://policy.localhost", time.Minute)
repo.Start()
repo.Stop()

repo.Start()
repo.Stop()
}

func createEmptyRepo(mockServerURL string) *PolicyRepository {
return &PolicyRepository{
client: requests.NewHTTPClient("0.0.0.0", time.Second),
policyURL: mockServerURL + "/",
policyList: []policyMetadata{},
}
return NewPolicyRepository(
requests.NewHTTPClient("0.0.0.0", 100*time.Millisecond),
mockServerURL+"/",
time.Minute,
)
}

func createFullRepo(mockServerURL string) *PolicyRepository {
repo := &PolicyRepository{
client: requests.NewHTTPClient("0.0.0.0", time.Second),
policyURL: mockServerURL + "/",
policyList: make([]policyMetadata, 2),
}
repo.policyList[0] = policyMetadata{policy: repo.Policy("1"), rules: policyOneRules}
repo.policyList[1] = policyMetadata{policy: repo.Policy("2"), rules: policyTwoRules}
func createFullRepo(mockServerURL string, interval time.Duration) *PolicyRepository {
repo := NewPolicyRepository(
requests.NewHTTPClient("0.0.0.0", time.Second),
mockServerURL+"/",
interval,
)
repo.policyList = append(
repo.policyList,
policyMetadata{
policy: repo.Policy("1"),
rules: market.AccessPolicyRuleSet{
ID: "1",
Title: "One",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeIdentity, Value: "0x1"},
},
},
},
policyMetadata{
policy: repo.Policy("2"),
rules: market.AccessPolicyRuleSet{
ID: "2",
Title: "Two",
Allow: []market.AccessRule{
{Type: market.AccessPolicyTypeDNSHostname, Value: "ipinfo.io"},
},
},
},
)
return repo
}

func mockPolicyServer() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/1" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "1",
"title": "One (updated)",
"description": "",
"allow": [
{"type": "identity", "value": "0x1"}
]
}`))
} else if r.URL.Path == "/2" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "2",
"title": "Two (updated)",
"description": "",
"allow": [
{"type": "dns_hostname", "value": "ipinfo.io"}
]
}`))
} else if r.URL.Path == "/3" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"id": "3",
"title": "Three (updated)",
"description": "",
"allow": [
{"type": "dns_zone", "value": "ipinfo.io"}
]
}`))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
}

0 comments on commit 2d91355

Please sign in to comment.