Skip to content

Commit

Permalink
Merge pull request #220 from sveneberth/feature/taskQueueEmulatorSupport
Browse files Browse the repository at this point in the history
Add support to use a task queue emulator
  • Loading branch information
tsteinruecken committed Mar 10, 2021
2 parents d205662 + dd34b0a commit 3af1fd3
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions tasks.py
Expand Up @@ -8,8 +8,10 @@
from time import sleep
from typing import Any, Callable, Dict

import grpc
import pytz
from google.cloud import tasks_v2
from google.cloud.tasks_v2.services.cloud_tasks.transports import CloudTasksGrpcTransport
from google.protobuf import timestamp_pb2

from viur.core import db, errors, utils
Expand Down Expand Up @@ -56,11 +58,17 @@ def jsonDecodeObjectHook(obj):
regionPrefix = _gaeApp.split("~")[0]
queueRegion = regionMap.get(regionPrefix)

if not queueRegion:
if not queueRegion and utils.isLocalDevelopmentServer and os.getenv("TASKS_EMULATOR") is None:
# Probably local development server
logging.warning("Taskqueue disabled, tasks will run inline!")

taskClient = tasks_v2.CloudTasksClient()
if not utils.isLocalDevelopmentServer or os.getenv("TASKS_EMULATOR") is None:
taskClient = tasks_v2.CloudTasksClient()
else:
taskClient = tasks_v2.CloudTasksClient(
transport=CloudTasksGrpcTransport(channel=grpc.insecure_channel(os.getenv("TASKS_EMULATOR")))
)
queueRegion = "local"

_periodicTasks: Dict[str, Dict[Callable, int]] = {}
_callableTasks = {}
Expand Down Expand Up @@ -188,7 +196,8 @@ def deferred(self, *args, **kwargs):
if 'X-AppEngine-TaskName' not in req.headers:
logging.critical('Detected an attempted XSRF attack. The header "X-AppEngine-Taskname" was not set.')
raise errors.Forbidden()
if req.environ.get("HTTP_X_APPENGINE_USER_IP") not in _appengineServiceIPs:
if req.environ.get("HTTP_X_APPENGINE_USER_IP") not in _appengineServiceIPs \
and (not utils.isLocalDevelopmentServer or os.getenv("TASKS_EMULATOR") is None):
logging.critical('Detected an attempted XSRF attack. This request did not originate from Task Queue.')
raise errors.Forbidden()
# Check if the retry count exceeds our warning threshold
Expand Down Expand Up @@ -436,7 +445,6 @@ def mkDefered(func, self=__undefinedFlag_, *args, **kwargs):
except:
usr = None
env = {"user": usr}
logging.error(env)
try:
env["lang"] = currentLanguage.get()
except AttributeError: # This isn't originating from a normal request
Expand Down

0 comments on commit 3af1fd3

Please sign in to comment.