Merge branch 'master' of github.com:ooici/coi-services
rajvikram committed Mar 16, 2012
2 parents b7d159d + a08e357 commit 301113e
Showing 13 changed files with 539 additions and 205 deletions.
226 changes: 184 additions & 42 deletions ion/core/containerui.py

Large diffs are not rendered by default.

23 changes: 12 additions & 11 deletions ion/processes/bootstrap/ion_loader.py
@@ -474,15 +474,15 @@ def _load_PlatformDevice(self, row):
         for ass_id in ass_ids:
             ims_client.deploy_platform_device_to_logical_platform(res_id, self.resource_ids[ass_id])
 
-        ass_id = row["primary_deployment_lp_id"]
-        if ass_id:
-            ims_client.deploy_as_primary_platform_device_to_logical_platform(res_id, self.resource_ids[ass_id])
-
         ass_id = row["platform_model_id"]
         if ass_id:
             ims_client.assign_platform_model_to_platform_device(self.resource_ids[ass_id], res_id)
 
-        self._resource_advance_lcs(row, res_id)
+        self._resource_advance_lcs(row, res_id, "PlatformDevice")
+
+        ass_id = row["primary_deployment_lp_id"]
+        if ass_id:
+            ims_client.deploy_as_primary_platform_device_to_logical_platform(res_id, self.resource_ids[ass_id])
 
     def _load_InstrumentDevice(self, row):
         res_id = self._basic_resource_create(row, "InstrumentDevice", "id/",
@@ -495,10 +495,6 @@ def _load_InstrumentDevice(self, row):
         for ass_id in ass_ids:
             ims_client.deploy_instrument_device_to_logical_instrument(res_id, self.resource_ids[ass_id])
 
-        ass_id = row["primary_deployment_li_id"]
-        if ass_id:
-            ims_client.deploy_as_primary_instrument_device_to_logical_instrument(res_id, self.resource_ids[ass_id])
-
         ass_id = row["instrument_model_id"]
         if ass_id:
             ims_client.assign_instrument_model_to_instrument_device(self.resource_ids[ass_id], res_id)
@@ -507,7 +503,12 @@ def _load_InstrumentDevice(self, row):
         if ass_id:
             ims_client.assign_instrument_device_to_platform_device(res_id, self.resource_ids[ass_id])
 
-        self._resource_advance_lcs(row, res_id)
+        self._resource_advance_lcs(row, res_id, "InstrumentDevice")
+
+        ass_id = row["primary_deployment_li_id"]
+        if ass_id:
+            ims_client.deploy_as_primary_instrument_device_to_logical_instrument(res_id, self.resource_ids[ass_id])
 
 
     def _load_InstrumentAgent(self, row):
         res_id = self._basic_resource_create(row, "InstrumentAgent", "ia/",
@@ -521,7 +522,7 @@ def _load_InstrumentAgent(self, row):
         for im_id in im_ids:
             svc_client.assign_instrument_model_to_instrument_agent(self.resource_ids[im_id], res_id)
 
-        self._resource_advance_lcs(row, res_id)
+        self._resource_advance_lcs(row, res_id, "InstrumentAgent")
 
     def _load_InstrumentAgentInstance(self, row):
         ia_id = row["instrument_agent_id"]
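Two things change in ion_loader.py: _resource_advance_lcs now receives the resource type, presumably so lifecycle handling can be type-aware, and the "deploy as primary" associations move after the lifecycle advance, likely because declaring a primary deployment expects the device to already be in its target lifecycle state. A minimal sketch of that ordering, assuming a hypothetical row layout and stubbed-out clients (the real loader's column names and client calls differ):

def _resource_advance_lcs(row, res_id, restype=None):
    # With the type passed in, lifecycle handling can be type-aware,
    # e.g. selecting a type-specific column or state machine.
    # The "<type>_lcstate" column name is hypothetical.
    key = "%s_lcstate" % restype if restype else "lcstate"
    target_state = row.get(key)
    if target_state:
        print("advancing %s to %s" % (res_id, target_state))

def load_platform_device(row, res_id, deploy_primary):
    # Advance the lifecycle state first ...
    _resource_advance_lcs(row, res_id, "PlatformDevice")
    # ... then declare the primary deployment, which may expect the
    # device to already be in its target state.
    lp_id = row.get("primary_deployment_lp_id")
    if lp_id:
        deploy_primary(res_id, lp_id)

load_platform_device(
    {"PlatformDevice_lcstate": "DEPLOYED", "primary_deployment_lp_id": "lp1"},
    "pd1",
    lambda res, lp: print("primary deployment: %s -> %s" % (res, lp)),
)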
2 changes: 1 addition & 1 deletion ion/processes/data/cache_launcher.py
@@ -57,4 +57,4 @@ def on_start(self):
         )
 
 
-        tms_cli.activate_transform(transform_id=transform_id)
\ No newline at end of file
+        tms_cli.activate_transform(transform_id=transform_id)
21 changes: 11 additions & 10 deletions ion/processes/data/ingestion/ingestion_launcher.py
@@ -4,27 +4,28 @@
 @file ion/processes/data/ingestion/ingestion_launcher.py
 @description Ingestion Launcher
 """
-from interface.objects import CouchStorage
+from interface.objects import CouchStorage, HdfStorage
 
 from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient
-from pyon.service.service import BaseService
+from pyon.ion.process import StandaloneProcess
 
-class IngestionLauncher(BaseService):
+class IngestionLauncher(StandaloneProcess):
     def on_start(self):
         super(IngestionLauncher,self).on_start()
-        exchange_point = self.CFG.get('process',{}).get('exchange_point','science_data')
-        couch_storage = self.CFG.get('process',{}).get('couch_storage',{})
-        couch_storage = CouchStorage(**couch_storage)
-        hdf_storage = self.CFG.get('process',{}).get('hdf_storage',{})
-        number_of_workers = self.CFG.get('process',{}).get('number_of_workers',2)
+
+        exchange_point = self.CFG.get_safe('ingestion.exchange_point','science_data')
+        couch_opts = self.CFG.get_safe('ingestion.couch_storage',{})
+        couch_storage = CouchStorage(**couch_opts)
+        hdf_opts = self.CFG.get_safe('ingestion.hdf_storage',{})
+        hdf_storage = HdfStorage(**hdf_opts)
+        number_of_workers = self.CFG.get_safe('ingestion.number_of_workers',2)
 
         ingestion_management_service = IngestionManagementServiceClient(node=self.container.node)
         ingestion_id = ingestion_management_service.create_ingestion_configuration(
             exchange_point_id=exchange_point,
             couch_storage=couch_storage,
             hdf_storage=hdf_storage,
-            number_of_workers=number_of_workers,
-            default_policy={}
+            number_of_workers=number_of_workers
         )
         ingestion_management_service.activate_ingestion_configuration(ingestion_id)
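Besides swapping BaseService for StandaloneProcess and adding HdfStorage, the launcher now reads its settings through CFG.get_safe with dotted keys under a top-level ingestion namespace, replacing the chained .get calls on the process block. A dotted-key lookup over nested dicts behaves roughly like the stand-in below; get_safe itself lives in pyon, and the datastore_name key is purely illustrative:

# Rough stand-in for pyon's CFG.get_safe: walk a nested dict along a
# dotted key path, returning a default instead of raising KeyError.
def get_safe(cfg, dotted_key, default=None):
    value = cfg
    for part in dotted_key.split('.'):
        try:
            value = value[part]
        except (KeyError, TypeError):
            return default
    return value

cfg = {'ingestion': {'exchange_point': 'science_data',
                     'couch_storage': {'datastore_name': 'dm_datastore'}}}

print(get_safe(cfg, 'ingestion.exchange_point'))        # science_data
print(get_safe(cfg, 'ingestion.number_of_workers', 2))  # missing key -> 2

The advantage over chained .get calls is that a missing level anywhere along the path falls back to the default instead of raising, and the key reads as a single path.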

88 changes: 77 additions & 11 deletions ion/processes/data/simple_dispatcher.py
@@ -5,9 +5,43 @@
 @file ion/processes/data/ctd_stream_publisher.py
 @description A simple example process which publishes prototype ctd data
 To Run:
-bin/pycc --rel res/deploy/r2dm.yml dispatcher.data_product_id='abc123'
-cc.spawn_process(name="dispatcher_process", module="ion.processes.data.simple_dispatcher", cls="SimpleDispatcher")
+$ bin/pycc --rel res/deploy/r2sa.yml
+
+### To Create a data product and get some data on the stream copy this and use %paste
+"""
+from interface.services.sa.idata_product_management_service import DataProductManagementServiceClient
+from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient
+from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient
+from prototype.sci_data.stream_defs import SBE37_CDM_stream_definition
+from interface.objects import DataProduct
+from pyon.public import PRED, log
+
+definition = SBE37_CDM_stream_definition()
+
+dpm_cli = DataProductManagementServiceClient(node=cc.node)
+pubsub_cli = PubsubManagementServiceClient(node=cc.node)
+rr_cli = ResourceRegistryServiceClient(node=cc.node)
+
+stream_def_id = pubsub_cli.create_stream_definition(container=definition)
+
+dp = DataProduct(name='dp1')
+
+data_product_id = dpm_cli.create_data_product(data_product=dp, stream_definition_id=stream_def_id)
+
+stream_ids, garbage = rr_cli.find_objects(data_product_id, PRED.hasStream, id_only=True)
+stream_id = stream_ids[0]
+
+pid = cc.spawn_process(name='ctd_test',module='ion.processes.data.ctd_stream_publisher',cls='SimpleCtdPublisher',config={'process':{'stream_id':stream_id}})
+"""
+
+Get the data product id from the variable and use it to start a separate container running the dispatcher
+
+To run the dispatcher:
+bin/pycc --rel res/deploy/examples/dispatcher.yml dispatcher.data_product_id=<data product id>
 
 '''


@@ -16,8 +50,9 @@
 from pyon.ion.process import StandaloneProcess
 from interface.services.coi.iresource_registry_service import ResourceRegistryServiceProcessClient
 from pyon.public import log, StreamSubscriberRegistrar, PRED
-
+from pyon.util.containers import get_datetime
 from interface.objects import StreamQuery
+from prototype.sci_data.stream_parser import PointSupplementStreamParser
 
 from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient
 import os
@@ -34,17 +69,16 @@ def on_start(self):
         # Get the stream(s)
         data_product_id = self.CFG.get_safe('dispatcher.data_product_id','')
 
-        log.warn('CFG: %s' % str(self.CFG))
-
         stream_ids,_ = rr_cli.find_objects(subject=data_product_id, predicate=PRED.hasStream, id_only=True)
 
+        log.warn('Got Stream Ids: "%s"', stream_ids)
+        assert stream_ids, 'No streams found for this data product!'
 
-        query = StreamQuery(stream_ids)
+        query = StreamQuery(stream_ids=stream_ids)
 
         exchange_name = 'dispatcher_%s' % str(os.getpid())
 
-        self.ctd_subscription_id = pubsub_cli.create_subscription(
+        subscription_id = pubsub_cli.create_subscription(
             query = query,
             exchange_name = exchange_name,
             name = "SampleSubscription",
@@ -53,9 +87,41 @@ def on_start(self):
 
         stream_subscriber = StreamSubscriberRegistrar(process=self, node=self.container.node)
 
-        def message_received(m, h):
-            log.info('Received a message from the stream')
+        stream_defs = {}
+
+        def message_received(granule, h):
+
+            stream_id = granule.stream_resource_id
+
+            data_stream_id = granule.data_stream_id
+            data_stream = granule.identifiables[data_stream_id]
+
+            tstamp = get_datetime(data_stream.timestamp.value)
+
+            records = granule.identifiables['record_count'].value
+
+            log.warn('Received a message from stream %s with time stamp %s and %d records' % (stream_id, tstamp, records))
+
+            if stream_id not in stream_defs:
+                stream_defs[stream_id] = pubsub_cli.find_stream_definition(stream_id, id_only=False).container
+            stream_def = stream_defs.get(stream_id)
+
+            sp = PointSupplementStreamParser(stream_definition=stream_def, stream_granule=granule)
+
+            last_data = {}
+            for field in sp.list_field_names():
+                last_data[field] = sp.get_values(field)[-1]
+
+            log.warn('Last values in the message: %s' % str(last_data))
 
         subscriber = stream_subscriber.create_subscriber(exchange_name=exchange_name, callback=message_received)
-        subscriber.start()
\ No newline at end of file
+        subscriber.start()
+
+        pubsub_cli.activate_subscription(subscription_id)
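The rewritten callback does real work: it pulls the timestamp and record count out of the granule, lazily caches each stream's definition in stream_defs so find_stream_definition is only called once per stream, and uses PointSupplementStreamParser to read the last value of every field. The lazy-cache idiom generalizes beyond pyon; a self-contained sketch, with a stand-in for the pubsub lookup:

# Sketch of the per-stream caching idiom in message_received: fetch an
# expensive stream definition once per stream_id, then reuse it.
def make_handler(fetch_definition):
    stream_defs = {}  # stream_id -> cached definition

    def handle(stream_id, values):
        if stream_id not in stream_defs:
            stream_defs[stream_id] = fetch_definition(stream_id)
        definition = stream_defs[stream_id]
        # Report the last value of every field, as the dispatcher does.
        last_data = {field: vals[-1] for field, vals in values.items()}
        print("stream %s (%s): last values %s" % (stream_id, definition, last_data))

    return handle

handler = make_handler(lambda sid: "definition-for-%s" % sid)  # stand-in lookup
handler("s1", {"temp": [10.1, 10.2], "salinity": [35.0, 35.1]})
handler("s1", {"temp": [10.3]})  # cached: fetch_definition is not called again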

