Skip to content

Commit

Permalink
Add basic handling of service-agreement request
Browse files Browse the repository at this point in the history
  • Loading branch information
dimroc authored and alexcoventry committed Nov 14, 2018
1 parent 443c106 commit 49ba710
Show file tree
Hide file tree
Showing 25 changed files with 616 additions and 212 deletions.
2 changes: 1 addition & 1 deletion cmd/remote_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestClient_CreateServiceAgreement(t *testing.T) {
t.Run(test.name, func(t *testing.T) {

set := flag.NewFlagSet("create", 0)
set.Parse([]string{test.input})
assert.NoError(t, set.Parse([]string{test.input}))
c := cli.NewContext(nil, set, nil)

err := client.CreateServiceAgreement(c)
Expand Down
29 changes: 26 additions & 3 deletions integration/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func TestIntegration_HelloWorld(t *testing.T) {
eth.Context("app.Start()", func(eth *cltest.EthMock) {
eth.RegisterSubscription("newHeads", newHeads)
eth.Register("eth_getTransactionCount", `0x0100`) // TxManager.ActivateAccount()
eth.Register("eth_getBlockByNumber", models.BlockHeader{})
})
assert.NoError(t, app.Start())
eth.EventuallyAllCalled(t)
Expand Down Expand Up @@ -524,12 +523,36 @@ func TestIntegration_CreateServiceAgreement(t *testing.T) {
app, cleanup := cltest.NewApplicationWithConfigAndUnlockedAccount(config)
defer cleanup()

sa := cltest.FixtureCreateServiceAgreementViaWeb(t, app, "../internal/fixtures/web/hello_world_agreement.json")
eth := app.MockEthClient()
logs := make(chan store.Log, 1)
eth.Context("app.Start()", func(eth *cltest.EthMock) {
eth.RegisterSubscription("logs", logs)
eth.Register("eth_getBlockByNumber", models.BlockHeader{}) // services.(*HeadTracker).fastForwardHeadFromEth
eth.Register("eth_getTransactionCount", `0x100`)
})
assert.NoError(t, app.Start())
sa := cltest.FixtureCreateServiceAgreementViaWeb(t, app, "../internal/fixtures/web/noop_agreement.json")

assert.NotEqual(t, "", sa.ID)
cltest.FindJob(app.Store, sa.JobSpecID)
j := cltest.FindJob(app.Store, sa.JobSpecID)

assert.Equal(t, assets.NewLink(1000000000000000000), sa.Encumbrance.Payment)
assert.Equal(t, uint64(300), sa.Encumbrance.Expiration)

assert.Equal(t, time.Unix(1571523439, 0).UTC(), sa.Encumbrance.EndAt.Time)
assert.NotEqual(t, "", sa.ID)

// Request execution of the job associated with this ServiceAgreement
logs <- cltest.NewServiceAgreementExecutionLog(
j.ID,
cltest.NewAddress(),
cltest.NewAddress(),
1,
`{}`)

runs := cltest.WaitForRuns(t, j, app.Store, 1)
cltest.WaitForJobRunToComplete(t, app.Store, runs[0])

eth.EventuallyAllCalled(t)

}
23 changes: 23 additions & 0 deletions internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,29 @@ func NewRunLog(
}
}

// NewServiceAgreementExecutionLog creates a log event for the given jobid,
// address, block, and json, to simulate a request for execution on a service
// agreement.
func NewServiceAgreementExecutionLog(
jobID string,
logEmitter common.Address,
executionRequester common.Address,
blockHeight int,
serviceAgreementJSON string,
) strpkg.Log {
return strpkg.Log{
Address: logEmitter,
BlockNumber: uint64(blockHeight),
Data: StringToVersionedLogData("internalID", serviceAgreementJSON),
Topics: []common.Hash{
services.ServiceAgreementExecutionLogTopic,
StringToHash(jobID),
executionRequester.Hash(),
minimumContractPayment.ToHash(),
},
}
}

// StringToVersionedLogData encodes a string to the log data field.
func StringToVersionedLogData(internalID, str string) []byte {
buf := bytes.NewBuffer(hexutil.MustDecode(StringToHash(internalID).Hex()))
Expand Down
2 changes: 1 addition & 1 deletion internal/fixtures/web/hello_world_agreement.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"initiators": [{ "type": "web" }],
"initiators": [{ "type": "execagreement" }],
"tasks": [
{ "type": "HttpGet", "params": { "get": "https://bitstamp.net/api/ticker/" }},
{ "type": "JsonParse", "params": { "path": ["last"] }},
Expand Down
8 changes: 8 additions & 0 deletions internal/fixtures/web/noop_agreement.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"initiators": [{ "type": "execagreement" }],
"tasks": [{ "type": "noOp" }],
"payment": "1000000000000000000",
"expiration": 300,
"oracles": ["0x3cb8e3FD9d27e39a5e9e6852b0e96160061fd4ea", "0xa0788FC17B1dEe36f057c42B6F373A34B014687e"],
"endAt": "2019-10-19T22:17:19Z"
}
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"start": "cd gui && react-static start",
"build": "cd gui && react-static build",
"serve": "cd gui && serve dist -p 3000",
"lint": "standard --fix --verbose | snazzy",
"lint-ci": "standard --verbose | snazzy",
"lint": "standard --fix --verbose --global web3 --global assert --global context --global contract --global artifacts | snazzy",
"lint-ci": "standard --verbose --global web3 --global assert --global context --global contract --global artifacts | snazzy",
"test": "NODE_ENV=test jest",
"test-ci": "NODE_ENV=test yarn jest gui/ --coverage --reporters jest-silent-reporter",
"test-e2e": "NODE_ENV=test yarn jest e2e/ -i",
Expand Down
113 changes: 77 additions & 36 deletions services/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ const (
// RunLogTopic is the signature for the RunRequest(...) event
// which Chainlink RunLog initiators watch for.
// See https://github.com/smartcontractkit/chainlink/blob/master/solidity/contracts/Oracle.sol
// If updating this, be sure to update the truffle suite's "expected event signature" test.
var RunLogTopic = mustHash("RunRequest(bytes32,address,uint256,uint256,uint256,bytes)")

// ServiceAgreementExecutionLogTopic is the signature for the
// Coordinator.RunRequest(...) events which Chainlink nodes watch for. See
// https://github.com/smartcontractkit/chainlink/blob/master/solidity/contracts/Coordinator.sol#RunRequest
var ServiceAgreementExecutionLogTopic = mustHash("ServiceAgreementExecution(bytes32,address,uint256,uint256,uint256,bytes)")

// OracleFulfillmentFunctionID is the function id of the oracle fulfillment
// method used by EthTx: bytes4(keccak256("fulfillData(uint256,bytes32)"))
// Kept in sync with solidity/contracts/Oracle.sol
Expand All @@ -50,33 +54,52 @@ type JobSubscription struct {
unsubscribers []Unsubscriber
}

// StartJobSubscription is the constructor of JobSubscription that to starts
// listening to and keeps track of event logs corresponding to a job.
func StartJobSubscription(job models.JobSpec, head *models.IndexableBlockNumber, store *strpkg.Store) (JobSubscription, error) {
var merr error
var initSubs []Unsubscriber
for _, initr := range job.InitiatorsFor(models.InitiatorEthLog) {
sub, err := StartEthLogSubscription(initr, job, head, store)
merr = multierr.Append(merr, err)
if err == nil {
initSubs = append(initSubs, sub)
}
}

for _, initr := range job.InitiatorsFor(models.InitiatorRunLog) {
sub, err := StartRunLogSubscription(initr, job, head, store)
merr = multierr.Append(merr, err)
// scanInitiatorsLogsStartSubscriptions attempts to subscribe to each type of
// initiator log, and adds the unsubscriber to unsubscribers if successful, or
// the resulting error to merr, if not. Returns the extended unsubscribers, merr
func scanInitiatorLogsStartSubscriptions(
initiators []models.Initiator,
subscribe func(models.Initiator) (Unsubscriber, error),
unsubscribers []Unsubscriber, merr error) ([]Unsubscriber, error) {
for _, initr := range initiators {
unsubscriber, err := subscribe(initr)
if err == nil {
initSubs = append(initSubs, sub)
unsubscribers = append(unsubscribers, unsubscriber)
} else {
merr = multierr.Append(merr, err)
}
}
return unsubscribers, merr
}

if len(initSubs) == 0 {
return JobSubscription{}, multierr.Append(merr, errors.New("Job must have a valid log initiator"))
}

js := JobSubscription{Job: job, unsubscribers: initSubs}
return js, merr
// StartJobSubscription constructs a JobSubscription which listens for and
// tracks event logs corresponding to the specified job. Ignores any errors if
// there is at least one successful subscription to an initiator log.
func StartJobSubscription(job models.JobSpec, head *models.IndexableBlockNumber, store *strpkg.Store) (JobSubscription, error) {
var merr error
var unsubscribers []Unsubscriber
unsubscribers, merr = scanInitiatorLogsStartSubscriptions(
job.InitiatorsFor(models.InitiatorEthLog),
func(initr models.Initiator) (Unsubscriber, error) {
return StartEthLogSubscription(initr, job, head, store)
}, unsubscribers, merr)
unsubscribers, merr = scanInitiatorLogsStartSubscriptions(
job.InitiatorsFor(models.InitiatorRunLog),
func(initr models.Initiator) (Unsubscriber, error) {
return StartRunLogSubscription(initr, job, head, store)
}, unsubscribers, merr)
unsubscribers, merr = scanInitiatorLogsStartSubscriptions(
job.InitiatorsFor(models.InitiatorServiceAgreementExecutionLog),
func(initr models.Initiator) (Unsubscriber, error) {
return StartSALogSubscription(initr, job, head, store)
}, unsubscribers, merr)

if len(unsubscribers) == 0 {
return JobSubscription{}, multierr.Append(
merr, errors.New(
"unable to subscribe to any logs, check earlier errors in this message, and the initiator types"))
}
return JobSubscription{Job: job, unsubscribers: unsubscribers}, merr
}

// Unsubscribe stops the subscription and cleans up associated resources.
Expand Down Expand Up @@ -149,26 +172,41 @@ func (sub InitiatorSubscription) dispatchLog(log strpkg.Log) {
}

// TopicFiltersForRunLog generates the two variations of RunLog IDs that could
// possibly be entered. There is the ID, hex encoded and the ID zero padded.
func TopicFiltersForRunLog(jobID string) [][]common.Hash {
// possibly be entered on a RunLog or a ServiceAgreementExecutionLog. There is the ID,
// hex encoded and the ID zero padded.
func TopicFiltersForRunLog(logTopic common.Hash, jobID string) [][]common.Hash {
hexJobID := common.BytesToHash([]byte(jobID))
jobIDZeroPadded := common.BytesToHash(common.RightPadBytes(hexutil.MustDecode("0x"+jobID), utils.EVMWordByteLen))
// RunLogTopic AND (0xHEXJOBID OR 0xJOBID0padded)
return [][]common.Hash{{RunLogTopic}, {hexJobID, jobIDZeroPadded}}
return [][]common.Hash{{logTopic}, {hexJobID, jobIDZeroPadded}}
}

// StartRunLogSubscription starts an InitiatorSubscription tailored for use with RunLogs.
func StartRunLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *strpkg.Store) (Unsubscriber, error) {
filter := NewInitiatorFilterQuery(initr, head, TopicFiltersForRunLog(job.ID))
return NewInitiatorSubscription(initr, job, store, filter, receiveRunLog)
// StartRunLogSubscription starts an InitiatorSubscription tailored for use with
// RunLogs
func StartRunLogSubscription(initr models.Initiator, job models.JobSpec,
head *models.IndexableBlockNumber, store *strpkg.Store) (Unsubscriber, error) {
filter := NewInitiatorFilterQuery(initr, head, TopicFiltersForRunLog(
RunLogTopic, job.ID))
return NewInitiatorSubscription(initr, job, store, filter, receiveRunOrSALog)
}

// StartEthLogSubscription starts an InitiatorSubscription tailored for use with EthLogs.
func StartEthLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *strpkg.Store) (Unsubscriber, error) {
func StartEthLogSubscription(initr models.Initiator, job models.JobSpec,
head *models.IndexableBlockNumber, store *strpkg.Store) (Unsubscriber, error) {
filter := NewInitiatorFilterQuery(initr, head, nil)
return NewInitiatorSubscription(initr, job, store, filter, receiveEthLog)
}

// StartSALogSubscription starts an InitiatorSubscription tailored for use with
// ServiceAgreementExecutionLogs.
func StartSALogSubscription(initr models.Initiator, job models.JobSpec,
head *models.IndexableBlockNumber, store *strpkg.Store) (Unsubscriber, error) {
filter := NewInitiatorFilterQuery(initr, head, TopicFiltersForRunLog(
ServiceAgreementExecutionLogTopic, job.ID))
return NewInitiatorSubscription(initr, job, store, filter,
receiveRunOrSALog)
}

func loggerLogListening(initr models.Initiator, blockNumber *big.Int) {
msg := fmt.Sprintf(
"Listening for %v from block %v for address %v for job %v",
Expand All @@ -179,8 +217,9 @@ func loggerLogListening(initr models.Initiator, blockNumber *big.Int) {
logger.Infow(msg)
}

// Parse the log and run the job specific to this initiator log event.
func receiveRunLog(le InitiatorSubscriptionLogEvent) {
// receiveRunOrSALog parses the log and runs the job indicated by a RunLog or
// ServiceAgreementExecutionLog. (Both log events have the same format.)
func receiveRunOrSALog(le InitiatorSubscriptionLogEvent) {
if !le.ValidateRunLog() {
return
}
Expand All @@ -195,7 +234,8 @@ func receiveRunLog(le InitiatorSubscriptionLogEvent) {
runJob(le, data, le.Initiator)
}

// Parse the log and run the job specific to this initiator log event.
// receiveEthLog parses the log and runs the job specific to this initiator log
// event.
func receiveEthLog(le InitiatorSubscriptionLogEvent) {
le.ToDebug()
data, err := le.EthLogJSON()
Expand Down Expand Up @@ -450,7 +490,8 @@ func decodeABIToJSON(data []byte) (models.JSON, error) {
}

func isRunLog(log strpkg.Log) bool {
return len(log.Topics) == 4 && log.Topics[0] == RunLogTopic
return len(log.Topics) == 4 && (log.Topics[0] == RunLogTopic ||
log.Topics[0] == ServiceAgreementExecutionLogTopic)
}

func jobIDFromHexEncodedTopic(log strpkg.Log) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion services/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestTopicFiltersForRunLog(t *testing.T) {
t.Parallel()

jobID := "4a1eb0e8df314cb894024a38991cff0f"
topics := services.TopicFiltersForRunLog(jobID)
topics := services.TopicFiltersForRunLog(services.RunLogTopic, jobID)

assert.Equal(t, 2, len(topics))
assert.Equal(
Expand Down
14 changes: 12 additions & 2 deletions services/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ func ValidateInitiator(i models.Initiator, j models.JobSpec) error {
return validateRunAtInitiator(i, j)
case models.InitiatorCron:
return validateCronInitiator(i)
default:
return models.NewJSONAPIErrorsWith(fmt.Sprintf("type %v does not exist", i.Type))
case models.InitiatorServiceAgreementExecutionLog:
return validateServiceAgreementInitiator(i, j)
case models.InitiatorWeb:
fallthrough
case models.InitiatorRunLog:
fallthrough
case models.InitiatorEthLog:
return nil
default:
return models.NewJSONAPIErrorsWith(fmt.Sprintf("type %v does not exist", i.Type))
}
}

Expand All @@ -86,6 +88,14 @@ func validateCronInitiator(i models.Initiator) error {
return nil
}

func validateServiceAgreementInitiator(i models.Initiator, j models.JobSpec) error {
fe := models.NewJSONAPIErrors()
if len(j.Initiators) != 1 {
fe.Add("ServiceAgreement should have at most one initiator")
}
return fe.CoerceEmptyToNil()
}

func validateTask(task models.TaskSpec, store *store.Store) error {
_, err := adapters.For(task, store)
return err
Expand Down
8 changes: 8 additions & 0 deletions services/validators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ func TestValidateServiceAgreement(t *testing.T) {
{"without being listed as an oracle", basic.Add("oracles", []string{}), true},
{"past allowed end at", basic.Add("endAt", "3000-06-19T22:17:19Z"), true},
{"before allowed end at", basic.Add("endAt", "2018-06-19T22:17:19Z"), true},
{"more than one initiator should fail",
basic.Add("initiators",
[]models.Initiator{
{0, "", models.InitiatorServiceAgreementExecutionLog,
models.InitiatorParams{}},
{0, "", models.InitiatorWeb, models.InitiatorParams{}},
}),
true},
}

for _, test := range tests {
Expand Down
37 changes: 37 additions & 0 deletions solidity/contracts/ChainlinkLib.sol
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ pragma solidity 0.4.24;
import "solidity-cborutils/contracts/CBOR.sol";

library ChainlinkLib {
bytes internal constant reqSig = "requestData(address,uint256,uint256,bytes32,address,bytes4,bytes32,bytes)";
bytes4 internal constant oracleRequestDataFid = bytes4(keccak256(reqSig));
bytes internal constant coordSig = "executeServiceAgreement(address,uint256,uint256,bytes32,address,bytes4,bytes32,bytes)";
bytes4 internal constant coordinatorRequestDataFid = bytes4(keccak256(coordSig));

using CBOR for Buffer.buffer;

struct Run {
Expand All @@ -27,6 +32,38 @@ library ChainlinkLib {
return self;
}

function encodeForOracle(
Run memory self,
uint256 _clArgsVersion
) internal pure returns (bytes memory) {
return abi.encodeWithSelector(
oracleRequestDataFid,
0, // overridden by onTokenTransfer
0, // overridden by onTokenTransfer
_clArgsVersion,
self.specId,
self.callbackAddress,
self.callbackFunctionId,
self.requestId,
self.buf.buf);
}

function encodeForCoordinator(
Run memory self,
uint256 _clArgsVersion
) internal pure returns (bytes memory) {
return abi.encodeWithSelector(
coordinatorRequestDataFid, // need to change
0, // overridden by onTokenTransfer
0, // overridden by onTokenTransfer
_clArgsVersion,
self.specId,
self.callbackAddress,
self.callbackFunctionId,
self.requestId,
self.buf.buf);
}

function add(Run memory self, string _key, string _value)
internal pure
{
Expand Down
Loading

0 comments on commit 49ba710

Please sign in to comment.