/
ocean.py
270 lines (218 loc) 路 9.06 KB
/
ocean.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
#
# Copyright 2023 Ocean Protocol Foundation
# SPDX-License-Identifier: Apache-2.0
#
"""Ocean module."""
import json
import logging
from typing import Dict, List, Optional, Type, Union
from enforce_typing import enforce_types
from web3.datastructures import AttributeDict
from ocean_lib.assets.ddo import DDO
from ocean_lib.data_provider.data_service_provider import DataServiceProvider
from ocean_lib.example_config import config_defaults
from ocean_lib.models.compute_input import ComputeInput
from ocean_lib.models.data_nft import DataNFT
from ocean_lib.models.data_nft_factory import DataNFTFactoryContract
from ocean_lib.models.datatoken_base import DatatokenBase
from ocean_lib.models.df.df_rewards import DFRewards
from ocean_lib.models.df.df_strategy_v1 import DFStrategyV1
from ocean_lib.models.dispenser import Dispenser
from ocean_lib.models.factory_router import FactoryRouter
from ocean_lib.models.fixed_rate_exchange import FixedRateExchange
from ocean_lib.models.ve.smart_wallet_checker import SmartWalletChecker
from ocean_lib.models.ve.ve_allocate import VeAllocate
from ocean_lib.models.ve.ve_delegation import VeDelegation
from ocean_lib.models.ve.ve_fee_distributor import VeFeeDistributor
from ocean_lib.models.ve.ve_fee_estimate import VeFeeEstimate
from ocean_lib.models.ve.ve_ocean import VeOcean
from ocean_lib.ocean.ocean_assets import OceanAssets
from ocean_lib.ocean.ocean_compute import OceanCompute
from ocean_lib.ocean.util import get_address_of_type, get_ocean_token_address
from ocean_lib.services.service import Service
from ocean_lib.structures.algorithm_metadata import AlgorithmMetadata
logger = logging.getLogger("ocean")
class Ocean:
"""The Ocean class is the entry point into Ocean Protocol."""
@enforce_types
def __init__(self, config_dict: Dict, data_provider: Optional[Type] = None) -> None:
"""Initialize Ocean class.
Usage: Make a new Ocean instance
`ocean = Ocean({...})`
This class provides the main top-level functions in ocean protocol:
1. Publish assets metadata and associated services
- Each asset is assigned a unique DID and a DID Document (DDO)
- The DDO contains the asset's services including the metadata
- The DID is registered on-chain with a URL of the metadata store
to retrieve the DDO from
`ddo = ocean.assets.create(metadata, publisher_wallet)`
2. Discover/Search ddos via the current configured metadata store (Aquarius)
- Usage:
`ddos_list = ocean.assets.search('search text')`
An instance of Ocean is parameterized by a `Config` instance.
:param config_dict: variable definitions
:param data_provider: `DataServiceProvider` instance
"""
config_errors = {}
for key, value in config_defaults.items():
if key not in config_dict:
config_errors[key] = "required"
continue
if not isinstance(config_dict[key], type(value)):
config_errors[key] = f"must be {type(value).__name__}"
if "web3_instance" not in config_dict:
config_errors["web3_instance"] = "required"
if "NETWORK_NAME" not in config_dict:
config_errors["NETWORK_NAME"] = "required"
if config_errors:
raise Exception(json.dumps(config_errors))
self.config_dict = config_dict
if not data_provider:
data_provider = DataServiceProvider
self.assets = OceanAssets(self.config_dict, data_provider)
self.compute = OceanCompute(self.config_dict, data_provider)
logger.debug("Ocean instance initialized: ")
# ======================================================================
# OCEAN
@property
@enforce_types
def OCEAN_address(self) -> str:
return get_ocean_token_address(self.config)
@property
@enforce_types
def OCEAN_token(self) -> DatatokenBase:
return DatatokenBase.get_typed(self.config, self.OCEAN_address)
@property
@enforce_types
def OCEAN(self): # alias for OCEAN_token
return self.OCEAN_token
# ======================================================================
# objects for singleton smart contracts
@property
@enforce_types
def data_nft_factory(self) -> DataNFTFactoryContract:
return DataNFTFactoryContract(self.config, self._addr("ERC721Factory"))
@property
@enforce_types
def dispenser(self) -> Dispenser:
return Dispenser(self.config, self._addr("Dispenser"))
@property
@enforce_types
def fixed_rate_exchange(self) -> FixedRateExchange:
return FixedRateExchange(self.config, self._addr("FixedPrice"))
@property
@enforce_types
def factory_router(self) -> FactoryRouter:
return FactoryRouter(self.config, self._addr("Router"))
# ======================================================================
# token getters
@enforce_types
def get_nft_token(self, token_address: str) -> DataNFT:
"""
:param token_address: Token contract address, str
:return: `DataNFT` instance
"""
return DataNFT(self.config, token_address)
@enforce_types
def get_datatoken(self, token_address: str) -> DatatokenBase:
"""
:param token_address: Token contract address, str
:return: `Datatoken1` or `Datatoken2` instance
"""
return DatatokenBase.get_typed(self.config, token_address)
# ======================================================================
# orders
@enforce_types
def get_user_orders(self, address: str, datatoken: str) -> List[AttributeDict]:
"""
:return: List of orders `[Order]`
"""
dt = DatatokenBase.get_typed(self.config_dict, datatoken)
_orders = []
for log in dt.get_start_order_logs(address):
a = dict(log.args.items())
a["amount"] = int(log.args.amount)
a["address"] = log.address
a["transactionHash"] = log.transactionHash
a = AttributeDict(a.items())
_orders.append(a)
return _orders
# ======================================================================
# provider fees
@enforce_types
def retrieve_provider_fees(
self, ddo: DDO, access_service: Service, publisher_wallet
) -> dict:
initialize_response = DataServiceProvider.initialize(
ddo.did, access_service, consumer_address=publisher_wallet.address
)
initialize_data = initialize_response.json()
provider_fees = initialize_data["providerFee"]
return provider_fees
@enforce_types
def retrieve_provider_fees_for_compute(
self,
datasets: List[ComputeInput],
algorithm_data: Union[ComputeInput, AlgorithmMetadata],
consumer_address: str,
compute_environment: str,
valid_until: int,
) -> dict:
initialize_compute_response = DataServiceProvider.initialize_compute(
[x.as_dictionary() for x in datasets],
algorithm_data.as_dictionary(),
datasets[0].service.service_endpoint,
consumer_address,
compute_environment,
valid_until,
)
return initialize_compute_response.json()
# ======================================================================
# DF/VE properties (alphabetical)
@property
@enforce_types
def df_rewards(self) -> DFRewards:
return DFRewards(self.config, self._addr("DFRewards"))
@property
@enforce_types
def df_strategy_v1(self) -> DFStrategyV1:
return DFStrategyV1(self.config, self._addr("DFStrategyV1"))
@property
@enforce_types
def smart_wallet_checker(self) -> SmartWalletChecker:
return SmartWalletChecker(self.config, self._addr("SmartWalletChecker"))
@property
@enforce_types
def ve_allocate(self) -> VeAllocate:
return VeAllocate(self.config, self._addr("veAllocate"))
@property
@enforce_types
def ve_delegation(self) -> VeDelegation:
return VeDelegation(self.config, self._addr("veDelegation"))
@property
@enforce_types
def ve_fee_distributor(self) -> VeFeeDistributor:
return VeFeeDistributor(self.config, self._addr("veFeeDistributor"))
@property
@enforce_types
def ve_fee_estimate(self) -> VeFeeEstimate:
return VeFeeEstimate(self.config, self._addr("veFeeEstimate"))
@property
@enforce_types
def ve_ocean(self) -> VeOcean:
return VeOcean(self.config, self._addr("veOCEAN"))
@property
@enforce_types
def veOCEAN(self) -> VeOcean: # alias for ve_ocean
return self.ve_ocean
# ======================================================================
# helpers
@property
@enforce_types
def config(self) -> dict: # alias for config_dict
return self.config_dict
@enforce_types
def _addr(self, type_str: str) -> str:
return get_address_of_type(self.config, type_str)
def wallet_balance(self, w):
return self.config["web3_instance"].eth.get_balance(w.address)