questions: how can I avoid main thread exits when run actors with multiprocQueueBase #63
Comments
Hi @carlos-lm, thanks for the appreciation: it's nice to know this library has been useful. You will almost certainly want to use one of the multi-process bases; I would actually recommend the Because a multiproc system has multiple processes, logging is coordinated centrally and written to files as specified by the Hopefully this will help for your needs.
Thank you for the detailed answer, @kquick. I was checking the hellogoodbye sample, and it seems that switching between system bases is just a matter of passing "multiprocTCPBase" as an argument to ActorSystem. I did that:
actor_mode = "simpleSystemBase" if self.test else "multiprocTCPBase"
fetcher = ActorSystem(actor_mode, logDefs=logcfg).createActor(DataFetcher)
but I'm getting this error
Another question: I'm passing a complex object in my message... I have my doubts about this but for thank you
That warning is described here: kquick#11 Thespian uses the Python
@kquick right now my code works fine when I run it with simpleSystemBase, but when I move to another system base the actors don't seem to work, and I don't get any error or message. This is my code. This actor creates other actors and sends them the message:
import logging
from bqbbdb.basedb import BaseDB
from thespian.actors import Actor, ActorSystem
from typing import List
from bqbbexchanges.exchange.actors.data_fetcher import DataFetcher
from bqbbexchanges.exchange.actors.message import Message, SetPropertiesMessage, GetHistoricalData, \
ResponseDataFetch, RequestDataFetch
from bqbbexchanges.settings import Settings, settings
from bqbbexchanges.utils.has_key import has_key
from bqbbexchanges.utils.my_logging import logcfg
from bqbbexchanges.utils.types import OHLCVData

class Dispatcher(Actor):
    """Dispatcher
    this actor create new actors according to the tickers that needs to fetch, when the data_fetcher
    return a response, this handle the response, that is, save in the db, send new data to the data_fetcher,etc
    """
    db: BaseDB = None
    exchange = None

    # some code here....

    def receiveMessage(self, message: Message, sender):
        if isinstance(message, SetPropertiesMessage):
            logging.info("initializing dispatcher with required db and exchange")
            self.exchange = message.exchange
            self.timeframes_mapper = message.timeframes_mapper
            self.test = message.test
            # here I pass the exchange and DB handler to the actor in order that this can make request to get data and store it
            self.db = message.DB
            self.exchange_prefix = message.exchange_prefix
            # self.send(sender, f"Hello, World! {self.name}")
        elif isinstance(message, GetHistoricalData):
            logging.debug("start getting historical data")
            # THIS IS THE IMPORTANT CODE ==================================== <----------
            for ticker in message.tickers:
                actor_mode = "simpleSystemBase" if self.test else "multiprocTCPBase"
                fetcher = ActorSystem(actor_mode, logDefs=logcfg).createActor(DataFetcher)
                init_message = SetPropertiesMessage()
                init_message.exchange = self.exchange
                init_message.timeframes_mapper = self.timeframes_mapper
                self.send(fetcher, init_message)
                self.send(fetcher,
                          RequestDataFetch(ticker_and_date=ticker, wait_time=0))
        elif isinstance(message, ResponseDataFetch):
            self._handle_received_data(message, sender)
        else:
            logging.error("Dispatcher> UNKNOWN MESSAGE, returning properties")
            message = SetPropertiesMessage()
            message.DB = self.db
            message.exchange = self.exchange
            message.timeframes_mapper = self.timeframes_mapper
            message.stored_data = self.stored_data
            self.send(sender, message)

# dispatcher = ActorSystem().createActor(Dispatcher)
# print(ActorSystem().ask(dispatcher, 'hi', 1))
And these are the fetcher actors:
import logging
import time
from thespian.actors import Actor
from typing import List
from bqbbexchanges.exchange.actors.message import Message, RequestDataFetch, SetPropertiesMessage, ResponseDataFetch
from bqbbexchanges.settings import settings
from bqbbexchanges.utils.types import OHLCVData

class DataFetcher(Actor):
    """
    This actor only handle data fetcher for historical data,
    if there is a problem retrieving the data returns to the caller an empty list
    """
    exchange = None
    timeframes_mapper = {"1M": "1M", "1h": "1h",
                         "1m": "1m"}  # override this method allows you use another symbols for timeframe

    def __init__(self):
        super().__init__()
        print("initializing datafetcher")

    def _format_ohlcv_data(self, tick: list) -> OHLCVData:
        return {
            "time": str(tick[0]),
            "open": tick[1],
            "high": tick[2],
            "low": tick[3],
            "close": tick[4],
            "volume": tick[5]
        }

    def _fetch_ohlcv(self, pair, unix_date: int) -> List[OHLCVData]:
        try:
            if self.exchange is None or self.timeframes_mapper is None:
                logging.error("not initialized properties, please initialize first")
            if not self.exchange.has["fetchOHLCV"]:
                error = "This exchange doesn't have fetchOHLCV method implemented"
                logging.error(error)
                raise NotImplementedError(error)
            logging.debug(f"fetching data {pair} {unix_date}")
            data = self.exchange.fetch_ohlcv(pair,
                                             self.timeframes_mapper["1m"],
                                             since=unix_date,
                                             limit=settings.ACTOR_MAX_CANDLES_TO_RETRIEVE_BY_CALL)
            formated_data = []
            if len(data) == 0 or data is None:
                return formated_data
            for tick in data:
                if len(tick) == 6:
                    ohlcv_tick = self._format_ohlcv_data(tick)
                    formated_data.append(ohlcv_tick)
            return formated_data
        except Exception as err:
            logging.error(f"ERROR IN ACTOR DataFetcher> {err}")
            return []

    def receiveMessage(self, message: Message, sender):
        logging.debug(f"DataFetcher> Receive {message}")
        if isinstance(message, SetPropertiesMessage):
            logging.info("initializing exchange")
            self.exchange = message.exchange
        if isinstance(message, RequestDataFetch):
            logging.debug(f"fetcher: received to fetch: {message.ticker_and_date}")
            pair = message.ticker_and_date["pair"]
            time.sleep(message.wait_time or 0)
            formated_data = self._fetch_ohlcv(pair=pair,
                                              unix_date=message.ticker_and_date["init_date"])
            self.send(sender, ResponseDataFetch(pair=pair, data=formated_data, request=message))

from dataclasses import dataclass
from typing import List
from bqbbexchanges.utils.types import TickerAndDate, OHLCVData

@dataclass
class Message:
    pass

@dataclass
class SetPropertiesMessage(Message):
    DB = None
    exchange = None
    timeframes_mapper = None
    test = False
    stored_data = None
    exchange_prefix = None

@dataclass
class GetHistoricalData(Message):
    tickers: List[TickerAndDate]

@dataclass
class RequestDataFetch(Message):
    ticker_and_date: TickerAndDate
    wait_time: int

@dataclass
class ResponseDataFetch(Message):
    pair: str
    data: List[OHLCVData]
    request: RequestDataFetch

I don't get any error or warning except those address info warnings.
My only suspicion is that the serialization of the db handler or the ccxt instance could be failing, but I don't get any warning about it. If pickle can't pickle an object, would it give me a warning or an error message?
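One way to test that suspicion outside the actor system is to try a pickle round trip on the message contents directly. This is a minimal sketch using only the standard library, on the understanding that the multi-process bases serialize messages with pickle. `pickle_problem` and `FakeHandler` are hypothetical names for illustration; `FakeHandler` stands in for a DB or exchange handle that holds a lock or socket.

```python
import pickle
import threading


def pickle_problem(obj):
    """Return None if obj survives a pickle round trip, else the error text."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception as err:  # pickling failures surface as several exception types
        return f"{type(err).__name__}: {err}"
    return None


class FakeHandler:
    """Hypothetical stand-in for a DB or exchange handle that owns a lock."""

    def __init__(self):
        self.lock = threading.Lock()


# Plain data pickles cleanly, so this prints None.
print(pickle_problem({"pair": "BTC/USD", "init_date": 0}))
# An object holding a lock does not pickle, so this prints an error description.
print(pickle_problem(FakeHandler()))
```

Running the same check on the real db handler and ccxt instance before putting them in a message would show whether they can cross a process boundary at all. If they cannot, one option is to send only plain configuration values and let each actor build its own handle.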
It's hard to say what your issue might be from the code fragments above, but I can make some observations about things that you should probably change: The actor's
If the You have The An actor should never call The ActorSystem should be started already (probably in your In
@kquick thank you so much for your answer and your patience 😅 This has been a bit difficult for me. I have some doubts about your last reply, so I wrote this basic example:
import logging
from dataclasses import dataclass
from typing import List, Union
from thespian.actors import *

@dataclass
class Message:
    pass

@dataclass
class Plus(Message):
    a: int
    b: int

@dataclass
class Substract(Message):
    a: int
    b: int

@dataclass
class Result(Message):
    response: int

@dataclass
class ListRequest(Message):
    op: List[Union[Plus, Substract]]

class ActorTwo(ActorTypeDispatcher):
    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def receiveMessage(self, msg: Message, sender):
        logging.info(f"ACTORTWO: received {msg}")
        if isinstance(msg, Plus):
            self.send(sender, Result(response=msg.a + msg.b))
        elif isinstance(msg, Substract):
            self.send(sender, Result(response=msg.a - msg.b))

class ActorOne(ActorTypeDispatcher):
    results = []
    req_list = []
    requester = None  # this is the actor who send the original request

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    def receiveMessage(self, msg: Union[ListRequest, Result], sender):
        logging.info(f"received message {msg}")
        if isinstance(msg, ListRequest):
            self.req_list = msg.op
            self.requester = sender
            for op in msg.op:
                actor = self.createActor(ActorTwo)
                self.send(actor, op)
        elif isinstance(msg, Result):
            self.results.append(msg.response)
            if len(self.results) == len(self.req_list):
                logging.info(f"ready to answer {self.results}")
                self.send(self.requester, self.results)
        else:
            logging.error(f"UNKNOWN MESSAGE {msg}")

class actorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' in logrecord.__dict__

class notActorLogFilter(logging.Filter):
    def filter(self, logrecord):
        return 'actorAddress' not in logrecord.__dict__

logcfg = {'version': 1,
          'formatters': {
              'normal': {'format': '%(levelname)-8s %(message)s'},
              'actor': {'format': '%(levelname)-8s %(actorAddress)s => %(message)s'}},
          'filters': {'isActorLog': {'()': actorLogFilter},
                      'notActorLog': {'()': notActorLogFilter}},
          'handlers': {'h1': {'class': 'logging.FileHandler',
                              'filename': 'example.log',
                              'formatter': 'normal',
                              'filters': ['notActorLog'],
                              'level': logging.INFO},
                       'h2': {'class': 'logging.FileHandler',
                              'filename': 'example.log',
                              'formatter': 'actor',
                              'filters': ['isActorLog'],
                              'level': logging.INFO}, },
          'loggers': {'': {'handlers': ['h1', 'h2'], 'level': logging.DEBUG}}
          }

# main
if __name__ == '__main__':
    debug = True
    actor_mode = "simpleSystemBase" if debug else "multiprocTCPBase"
    actor_one = ActorSystem(actor_mode, logDefs=logcfg).createActor(ActorOne)
    print(ActorSystem().ask(actor_one, ListRequest(op=[Plus(a=10, b=12), Substract(a=5, b=2)])))

Here I could replicate the two problems.
Thanks so much, I appreciate your patience with this. I hope this last post can resolve my doubts.
When you startup an When the I believe this is the problem you are encountering. If you ensure that the previous Please let me know if you still have difficulty after trying the above and ensuring the previous ActorSystem is shut down.
Hi @kquick, sorry for the late response. I don't know if this is the right way, but I added a
but I'm afraid I'm not getting any logs yet 😬 Update:
Thank you so much
Killing the other app on port 1900 should open it up for use by Thespian, but that assumes the other app is not needed (and if so, it might be better to remove it from whatever startup script is running). Alternatively you can specify an alternate port for Thespian to use by setting the
Hello, thanks for this lib, it's great. Right now I have a small system with some actors, and everything works as expected when I use simpleSystemBase, but now I want to run this truly asynchronously, so I'd like to use "multiprocQueueBase".
Basically I have a normal Python script which starts a central actor (Dispatcher).
This central actor creates new actors based on the number of items passed in a message.
These actors (DataFetcher) must be async because they make requests to an API.
Is this architecture correct? Must the dispatcher use "multiprocQueueBase", or is it OK to start it using the default "simpleSystemBase"?
How can I keep my main process (the script that creates the dispatcher actor) from ending before my actors do (this actor system must actually run forever)? Is using a while True in the main script OK, or is there a better approach?
Also, will the logs from the actors be displayed in the running script? Right now (using a while True in the main script) I can't get any logs.
Thank you so much!!!
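On the `while True` question, one alternative that avoids a busy loop is to block the launching script on an event that a signal handler sets. This is a standard-library sketch and not part of the Thespian API; `stop_requested`, `request_stop`, `install_signal_handlers` and `wait_until_stopped` are hypothetical names.

```python
import signal
import threading

# Set when the script has been asked to stop (Ctrl-C, SIGTERM, or a direct call).
stop_requested = threading.Event()


def request_stop(signum=None, frame=None):
    """Signal-handler-compatible callback that asks the main loop to exit."""
    stop_requested.set()


def install_signal_handlers():
    """Route SIGINT and SIGTERM to request_stop; call this from the main thread."""
    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)


def wait_until_stopped(poll_seconds=1.0):
    """Block without spinning until a stop has been requested."""
    while not stop_requested.wait(timeout=poll_seconds):
        pass  # periodic housekeeping could go here
```

A launching script would create its actors, call `install_signal_handlers()`, then `wait_until_stopped()`, and shut the actor system down once that call returns.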