Skip to content

Commit

Permalink
Merge pull request #124 from phenobarbital/new-version
Browse files Browse the repository at this point in the history
New version
  • Loading branch information
phenobarbital committed May 24, 2023
2 parents 93c99f7 + 1d1fa6e commit 0bcbaa4
Show file tree
Hide file tree
Showing 15 changed files with 704 additions and 494 deletions.
4 changes: 2 additions & 2 deletions examples/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

def top_words(url, n):
"""Returns top n words from text specified by url."""
text = requests.get(url, timeout=10).text.split()
text = requests.get(url, timeout=5).text.split()
return {url: Counter(text).most_common(n)}

async def get_top_words(urls, n):
Expand All @@ -39,7 +39,7 @@ async def get_top_words(urls, n):
start_time = time.time()
loop = asyncio.get_event_loop()
top = loop.run_until_complete(
get_top_words(URLS, 20)
get_top_words(URLS, 50)
)
end_time = time.time() - start_time
print(top)
Expand Down
12 changes: 6 additions & 6 deletions examples/test_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from notify import Notify
from qw.decorators import dispatch

stmp_host_user=config.get('stmp_host_user')
stmp_host_password=config.get('stmp_host_password')
stmp_host=config.get('stmp_host')
stmp_port=config.get('stmp_port')
stmp_host_user = config.get('stmp_host_user')
stmp_host_password = config.get('stmp_host_password')
stmp_host = config.get('stmp_host')
stmp_port = config.get('stmp_port')

@dispatch
async def send_email(sender):
Expand All @@ -29,7 +29,7 @@ async def send_email(sender):

async def create_user():
user = {
"name": "Jesus Lara",
"name": "Jesus Lara",
"account": {
"address": "jesuslarag@gmail.com",
}
Expand All @@ -47,4 +47,4 @@ async def create_user():
create_user()
)
finally:
loop.stop()
loop.close()
4 changes: 2 additions & 2 deletions examples/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(get_server_discovery(loop)) # Server starts listening
loop.run_until_complete(get_server_discovery(loop)) # Server starts listening
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.stop()
loop.close()
10 changes: 5 additions & 5 deletions examples/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from notify import Notify
from qw.client import QClient

stmp_host_user=config.get('stmp_host_user')
stmp_host_password=config.get('stmp_host_password')
stmp_host=config.get('stmp_host')
stmp_port=config.get('stmp_port')
stmp_host_user = config.get('stmp_host_user')
stmp_host_password = config.get('stmp_host_password')
stmp_host = config.get('stmp_host')
stmp_port = config.get('stmp_port')

user = {
"name": "Jesus Lara",
Expand Down Expand Up @@ -45,4 +45,4 @@ async def send_email():
qw.queue(send_email)
)
finally:
loop.stop()
loop.close()
2 changes: 0 additions & 2 deletions examples/test_keep.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
"""
import asyncio
import warnings
import time
import random
import cloudpickle
from navconfig.logging import logging
# from qw.wrappers import FuncWrapper, TaskWrapper
Expand Down
2 changes: 1 addition & 1 deletion examples/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def very_long_task(seconds: int):
async def queue_tasks():
"""Returns top n words in documents specified by URLs."""
x = [randint(10, 20) for p in range(0, 100)]

print(f'Pushed {len(x)} tasks to worker queue.')
result = await asyncio.gather(
*[qw.queue(very_long_task, n) for n in x]
)
Expand Down
72 changes: 49 additions & 23 deletions qw/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import argparse
import uvloop
import warnings
from .conf import (
WORKER_DEFAULT_HOST,
WORKER_DEFAULT_PORT,
Expand All @@ -10,54 +11,79 @@
WORKER_DISCOVERY_PORT

)
from .process import spawn_process
from .process import SpawnProcess
from .utils import cPrint

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
uvloop.install()

warnings.simplefilter("default", ResourceWarning)

def main():
"""Main Worker Function."""
asyncio.set_event_loop_policy(
uvloop.EventLoopPolicy()
)
uvloop.install()
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('--host', dest='host', type=str,
default=WORKER_DEFAULT_HOST,
help='set server host'
parser.add_argument(
'--host', dest='host', type=str,
default=WORKER_DEFAULT_HOST,
help='set server host'
)
parser.add_argument('--port', dest='port', type=int,
default=WORKER_DEFAULT_PORT,
help='set server port'
parser.add_argument(
'--port', dest='port', type=int,
default=WORKER_DEFAULT_PORT,
help='set server port'
)
parser.add_argument('--workers', dest='workers', type=int,
default=WORKER_DEFAULT_QTY,
help='max number of workers'
parser.add_argument(
'--workers', dest='workers', type=int,
default=WORKER_DEFAULT_QTY,
help='max number of workers'
)
parser.add_argument('--queue', dest='queue', type=int,
default=WORKER_QUEUE_SIZE,
help='Size of Queue on Worker'
parser.add_argument(
'--queue', dest='queue', type=int,
default=WORKER_QUEUE_SIZE,
help='Size of Queue on Worker'
)
parser.add_argument('--wkname', dest='wkname', type=str, default='Worker',
help='Worker Name'
parser.add_argument(
'--wkname', dest='wkname', type=str,
default='Worker',
help='Worker Name'
)
parser.add_argument('--discovery', dest='discovery', type=str, default=WORKER_DISCOVERY_PORT,
help='UDP Port for Service discovery'
parser.add_argument(
'--enable-discovery', dest='enable_discovery',
type=str.lower,
choices=["true", "false"],
default='true',
help='Start Discovery Service on this Worker'
)
parser.add_argument('--debug', action="store_true", default=False,
help="Start workers in Debug Mode"
parser.add_argument(
'--discovery', dest='discovery', type=str,
default=WORKER_DISCOVERY_PORT,
help='UDP Port for Service discovery'
)
parser.add_argument(
'--debug', action="store_true",
default=False,
help="Start workers in Debug Mode"
)
args = parser.parse_args()
try:
loop = asyncio.get_event_loop()
cPrint('::: Starting Workers ::: ')
process = spawn_process(args, event_loop=loop)
process = SpawnProcess(args)
process.start()
loop.run_forever()
except KeyboardInterrupt:
process.terminate()
except Exception as ex:
# log the unexpected error
print(f"Unexpected error: {ex}")
process.terminate()
finally:
cPrint('Shutdown all workers ...', level='WARN')
loop.close()
loop.close() # close the event loop


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 0bcbaa4

Please sign in to comment.