-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcelery_task.py
49 lines (40 loc) · 1.48 KB
/
celery_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
"""
! Why:
? Some sections require an admin token, so the admin token has to be refreshed periodically.
! How does it work?
? 1:sudo docker run -d -p 6379:6379 redis
? 2:celery -A celery_task beat
? 3:celery -A celery_task worker -B
"""
from celery import Celery
from decouple import config
import requests
# Settings for the token-refresh request, loaded through python-decouple.
BASE_AUTH = config('BASE_AUTH')  # sent as the 'auth_basic' request header
HOST = config('HOST')  # base URL; "/update-token" is appended to it
REFRESH_TOKEN = config('REFRESH_TOKEN')  # refresh token for the update-token call
# The broker URL can be overridden with CELERY_BROKER_URL; the default keeps
# the original local Redis broker so existing deployments are unaffected.
app = Celery(
    'task',
    broker=config('CELERY_BROKER_URL', default='redis://localhost:6379/0'),
)
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Schedule ``change_token`` as a periodic task once the app is configured.

    Connected to ``app.on_after_configure``; ``sender`` is the object the
    periodic task is registered on. Extra signal keyword arguments are
    accepted and ignored.
    """
    interval_seconds = 5.0  # ! change it
    refresh_signature = change_token.s()
    sender.add_periodic_task(
        interval_seconds,
        refresh_signature,
        name='change_token every 5 sec',
    )
def _stored_refresh_token(lines, fallback):
    """Return the REFRESH_TOKEN value found in *lines*, or *fallback* if absent."""
    for line in lines:
        key, sep, value = line.partition("=")
        # Split on the first "=" only, so values containing "=" stay intact.
        if sep and key.strip().upper() == "REFRESH_TOKEN" and value.strip():
            return value.strip()
    return fallback


@app.task
def change_token():
    """Request a new admin token pair and store it in ``settings.ini``.

    Steps:
      1. Read every line of ``settings.ini``.
      2. POST the current refresh token to ``HOST + "/update-token"`` with
         ``BASE_AUTH`` in the ``auth_basic`` header.
      3. Rewrite ``settings.ini`` with line 2 replaced by the new ``TOKEN``
         and line 3 replaced by the new ``REFRESH_TOKEN``; every other line
         is written back unchanged.

    NOTE(review): the rewrite is positional, so it assumes TOKEN and
    REFRESH_TOKEN live on lines 2 and 3 of settings.ini -- confirm the layout.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on connection failures or timeout.
        KeyError: if the response JSON lacks ``data.token`` or
            ``data.refresh_token``.
    """
    # * open file and read all lines
    with open("settings.ini", 'r') as reader:
        settings_lines = reader.readlines()

    # The module-level REFRESH_TOKEN is read once at import time, so inside a
    # long-running worker it goes stale after the first rotation. Prefer the
    # value last written to settings.ini and fall back to the constant.
    current_refresh_token = _stored_refresh_token(settings_lines, REFRESH_TOKEN)

    # * send refresh token
    response = requests.post(
        url=HOST + "/update-token",
        data={"refresh_token": current_refresh_token},
        headers={'auth_basic': BASE_AUTH},
        timeout=10,  # never let a stalled server block the worker forever
    )
    # Fail loudly on 4xx/5xx instead of hitting an opaque KeyError below.
    response.raise_for_status()

    payload = response.json()["data"]  # parse the body once
    new_token = payload['token']
    new_refresh_token = payload['refresh_token']

    # * change token and refresh token value on settings.ini
    replacements = {
        2: "TOKEN = " + new_token + "\n",
        3: "REFRESH_TOKEN = " + new_refresh_token + "\n",
    }
    updated_lines = [
        replacements.get(line_no, line)
        for line_no, line in enumerate(settings_lines, 1)
    ]
    # The full content is built before the file is opened for writing, so the
    # truncated file is rewritten in a single call.
    with open('settings.ini', 'w') as writer:
        writer.writelines(updated_lines)
    print("Token Updated")