Skip to content

Commit

Permalink
Applied linters
Browse files Browse the repository at this point in the history
  • Loading branch information
lorinkoz committed Jul 1, 2023
1 parent ec67a09 commit 54ab037
Show file tree
Hide file tree
Showing 41 changed files with 517 additions and 146 deletions.
16 changes: 15 additions & 1 deletion django_pgschemas/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,15 @@
from .schema import SchemaDescriptor, activate, activate_public, deactivate, get_current_schema # noqa
from .schema import (
SchemaDescriptor,
activate,
activate_public,
deactivate,
get_current_schema,
)

__all__ = [
"SchemaDescriptor",
"activate",
"activate_public",
"deactivate",
"get_current_schema",
]
26 changes: 19 additions & 7 deletions django_pgschemas/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,56 @@ def _check_default_schemas(self):
if "DOMAINS" in settings.TENANTS["default"]:
raise ImproperlyConfigured("TENANTS['default'] cannot contain a 'DOMAINS' key.")
if "FALLBACK_DOMAINS" in settings.TENANTS["default"]:
raise ImproperlyConfigured("TENANTS['default'] cannot contain a 'FALLBACK_DOMAINS' key.")
raise ImproperlyConfigured(
"TENANTS['default'] cannot contain a 'FALLBACK_DOMAINS' key."
)
if (
"CLONE_REFERENCE" in settings.TENANTS["default"]
and settings.TENANTS["default"]["CLONE_REFERENCE"] in settings.TENANTS
):
raise ImproperlyConfigured("TENANTS['default']['CLONE_REFERENCE'] must be a unique schema name.")
raise ImproperlyConfigured(
"TENANTS['default']['CLONE_REFERENCE'] must be a unique schema name."
)

def _check_overall_schemas(self):
for schema in settings.TENANTS:
if schema not in ["public", "default"]:
if not is_valid_schema_name(schema):
raise ImproperlyConfigured("'%s' is not a valid schema name." % schema)
if not isinstance(settings.TENANTS[schema].get("DOMAINS"), list):
raise ImproperlyConfigured("TENANTS['%s'] must contain a 'DOMAINS' list." % schema)
raise ImproperlyConfigured(
"TENANTS['%s'] must contain a 'DOMAINS' list." % schema
)

def _check_complementary_settings(self):
if "django_pgschemas.routers.SyncRouter" not in settings.DATABASE_ROUTERS:
raise ImproperlyConfigured("DATABASE_ROUTERS setting must contain 'django_pgschemas.routers.SyncRouter'.")
raise ImproperlyConfigured(
"DATABASE_ROUTERS setting must contain 'django_pgschemas.routers.SyncRouter'."
)

def _check_extra_search_paths(self):
if hasattr(settings, "PGSCHEMAS_EXTRA_SEARCH_PATHS"):
TenantModel = get_tenant_model()
cursor = connection.cursor()
cursor.execute(
"SELECT 1 FROM information_schema.tables WHERE table_name = %s;", [TenantModel._meta.db_table]
"SELECT 1 FROM information_schema.tables WHERE table_name = %s;",
[TenantModel._meta.db_table],
)
dynamic_tenants = []
if "CLONE_REFERENCE" in settings.TENANTS["default"]:
dynamic_tenants.append(settings.TENANTS["default"]["CLONE_REFERENCE"])
if cursor.fetchone():
dynamic_tenants += list(TenantModel.objects.all().values_list("schema_name", flat=True))
dynamic_tenants += list(
TenantModel.objects.all().values_list("schema_name", flat=True)
)
cursor.close()
invalid_schemas = set(settings.PGSCHEMAS_EXTRA_SEARCH_PATHS).intersection(
set(settings.TENANTS.keys()).union(dynamic_tenants)
)
if invalid_schemas:
raise ImproperlyConfigured(
"Do not include '%s' on PGSCHEMAS_EXTRA_SEARCH_PATHS." % ", ".join(invalid_schemas)
"Do not include '%s' on PGSCHEMAS_EXTRA_SEARCH_PATHS."
% ", ".join(invalid_schemas)
)

def ready(self):
Expand Down
22 changes: 17 additions & 5 deletions django_pgschemas/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,17 @@ def check_principal_apps(app_configs, **kwargs):
domain_app = get_domain_app()
if tenant_app not in settings.TENANTS["public"].get("APPS", []):
errors.append(
checks.Error("Your tenant app '%s' must be on the 'public' schema." % tenant_app, id="pgschemas.W001")
checks.Error(
"Your tenant app '%s' must be on the 'public' schema." % tenant_app,
id="pgschemas.W001",
)
)
if domain_app not in settings.TENANTS["public"].get("APPS", []):
errors.append(
checks.Error("Your domain app '%s' must be on the 'public' schema." % domain_app, id="pgschemas.W001")
checks.Error(
"Your domain app '%s' must be on the 'public' schema." % domain_app,
id="pgschemas.W001",
)
)
for schema in settings.TENANTS:
schema_apps = settings.TENANTS[schema].get("APPS", [])
Expand Down Expand Up @@ -97,14 +103,20 @@ def check_other_apps(app_configs, **kwargs):
if session_app in schema_apps and user_app not in schema_apps:
errors.append(
checks.Warning(
"'%s' must be together with '%s' in TENANTS['%s']['APPS']." % (user_app, session_app, schema),
"'%s' must be together with '%s' in TENANTS['%s']['APPS']."
% (user_app, session_app, schema),
id="pgschemas.W003",
)
)
elif user_app in schema_apps and session_app not in schema_apps and session_app in settings.INSTALLED_APPS:
elif (
user_app in schema_apps
and session_app not in schema_apps
and session_app in settings.INSTALLED_APPS
):
errors.append(
checks.Warning(
"'%s' must be together with '%s' in TENANTS['%s']['APPS']." % (session_app, user_app, schema),
"'%s' must be together with '%s' in TENANTS['%s']['APPS']."
% (session_app, user_app, schema),
id="pgschemas.W003",
)
)
Expand Down
4 changes: 3 additions & 1 deletion django_pgschemas/contrib/channels2/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ def get_user(scope):
If no user is retrieved, return an instance of `AnonymousUser`.
"""
if "session" not in scope:
raise ValueError("Cannot find session in scope. You should wrap your consumer in SessionMiddleware.")
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in SessionMiddleware."
)
user = None
session = scope["session"]
with scope["tenant"]:
Expand Down
14 changes: 11 additions & 3 deletions django_pgschemas/contrib/channels2/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,24 @@ def get_tenant_scope(self, scope):
DomainModel = get_domain_model()
prefix = scope["path"].split("/")[1]
try:
domain = DomainModel.objects.select_related("tenant").get(domain=hostname, folder=prefix)
domain = DomainModel.objects.select_related("tenant").get(
domain=hostname, folder=prefix
)
except DomainModel.DoesNotExist:
try:
domain = DomainModel.objects.select_related("tenant").get(domain=hostname, folder="")
domain = DomainModel.objects.select_related("tenant").get(
domain=hostname, folder=""
)
except DomainModel.DoesNotExist:
return None, "", []
tenant = domain.tenant
tenant.domain_url = hostname
ws_urlconf = settings.TENANTS["default"]["WS_URLCONF"]
return tenant, prefix if prefix == domain.folder else "", import_string(ws_urlconf + ".urlpatterns")
return (
tenant,
prefix if prefix == domain.folder else "",
import_string(ws_urlconf + ".urlpatterns"),
)

def get_protocol_type_router(self, tenant_prefix, ws_urlconf):
"""
Expand Down
4 changes: 3 additions & 1 deletion django_pgschemas/contrib/channels3/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ def get_user(scope):
If no user is retrieved, return an instance of `AnonymousUser`.
"""
if "session" not in scope:
raise ValueError("Cannot find session in scope. You should wrap your consumer in SessionMiddleware.")
raise ValueError(
"Cannot find session in scope. You should wrap your consumer in SessionMiddleware."
)
user = None
session = scope["session"]
with scope["tenant"]:
Expand Down
14 changes: 11 additions & 3 deletions django_pgschemas/contrib/channels3/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,24 @@ def get_tenant_scope(self, scope):
DomainModel = get_domain_model()
prefix = scope["path"].split("/")[1]
try:
domain = DomainModel.objects.select_related("tenant").get(domain=hostname, folder=prefix)
domain = DomainModel.objects.select_related("tenant").get(
domain=hostname, folder=prefix
)
except DomainModel.DoesNotExist:
try:
domain = DomainModel.objects.select_related("tenant").get(domain=hostname, folder="")
domain = DomainModel.objects.select_related("tenant").get(
domain=hostname, folder=""
)
except DomainModel.DoesNotExist:
return None, "", []
tenant = domain.tenant
tenant.domain_url = hostname
ws_urlconf = settings.TENANTS["default"]["WS_URLCONF"]
return tenant, prefix if prefix == domain.folder else "", import_string(ws_urlconf + ".urlpatterns")
return (
tenant,
prefix if prefix == domain.folder else "",
import_string(ws_urlconf + ".urlpatterns"),
)

def get_protocol_type_router(self, tenant_prefix, ws_urlconf):
"""
Expand Down
43 changes: 33 additions & 10 deletions django_pgschemas/management/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db.models import CharField, Q
from django.db.models import Value as V
from django.db.models import CharField, Q, Value as V
from django.db.models.functions import Concat
from django.db.utils import ProgrammingError

Expand Down Expand Up @@ -49,7 +48,11 @@ def add_arguments(self, parser):
help="Tells Django to NOT prompt the user for input of any kind.",
)
parser.add_argument(
"-s", "--schema", nargs="+", dest="schemas", help="Schema(s) to execute the current command"
"-s",
"--schema",
nargs="+",
dest="schemas",
help="Schema(s) to execute the current command",
)
parser.add_argument(
"-x",
Expand Down Expand Up @@ -151,7 +154,8 @@ def _get_schemas_from_options(self, **options):
include_all_schemas = True
elif options.get("interactive", True):
schema = input(
"Enter schema to run command (leave blank for running on '%s' schemas): " % self.get_scope_display()
"Enter schema to run command (leave blank for running on '%s' schemas): "
% self.get_scope_display()
).strip()

if schema:
Expand All @@ -162,9 +166,13 @@ def _get_schemas_from_options(self, **options):
raise CommandError("No schema provided")

TenantModel = get_tenant_model()
static_schemas = [x for x in settings.TENANTS.keys() if x != "default"] if allow_static else []
static_schemas = (
[x for x in settings.TENANTS.keys() if x != "default"] if allow_static else []
)
dynamic_schemas = (
TenantModel.objects.values_list("schema_name", flat=True) if dynamic_ready and allow_dynamic else []
TenantModel.objects.values_list("schema_name", flat=True)
if dynamic_ready and allow_dynamic
else []
)
if clone_reference and allow_static:
static_schemas.append(clone_reference)
Expand Down Expand Up @@ -195,7 +203,11 @@ def find_schema_by_reference(reference, as_excluded=False):
return reference
elif reference == clone_reference:
return reference
elif dynamic_ready and TenantModel.objects.filter(schema_name=reference).exists() and allow_dynamic:
elif (
dynamic_ready
and TenantModel.objects.filter(schema_name=reference).exists()
and allow_dynamic
):
return reference
else:
local = []
Expand All @@ -209,16 +221,27 @@ def find_schema_by_reference(reference, as_excluded=False):
if dynamic_ready and allow_dynamic:
local += (
TenantModel.objects.annotate(
route=Concat("domains__domain", V("/"), "domains__folder", output_field=CharField())
route=Concat(
"domains__domain",
V("/"),
"domains__folder",
output_field=CharField(),
)
)
.filter(
Q(schema_name=reference) | Q(domains__domain__istartswith=reference) | Q(route=reference)
Q(schema_name=reference)
| Q(domains__domain__istartswith=reference)
| Q(route=reference)
)
.distinct()
.values_list("schema_name", flat=True)
)
if not local:
message = "No schema found for '%s' (excluded)" if as_excluded else "No schema found for '%s'"
message = (
"No schema found for '%s' (excluded)"
if as_excluded
else "No schema found for '%s'"
)
raise CommandError(message % reference)
if len(local) > 1:
message = (
Expand Down
8 changes: 6 additions & 2 deletions django_pgschemas/management/commands/_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ def __call__(self, message):

if schema_name in settings.TENANTS:
domains = settings.TENANTS[schema_name].get("DOMAINS", [])
schema = SchemaDescriptor.create(schema_name=schema_name, domain_url=domains[0] if domains else None)
schema = SchemaDescriptor.create(
schema_name=schema_name, domain_url=domains[0] if domains else None
)
elif schema_name == get_clone_reference():
schema = SchemaDescriptor.create(schema_name=schema_name)
else:
Expand All @@ -89,7 +91,9 @@ def __call__(self, message):
return schema_name


def sequential(schemas, command, function_name, args=None, kwargs=None, pass_schema_in_kwargs=False):
def sequential(
schemas, command, function_name, args=None, kwargs=None, pass_schema_in_kwargs=False
):
runner = functools.partial(
run_on_schema,
executor_codename="sequential",
Expand Down
24 changes: 19 additions & 5 deletions django_pgschemas/management/commands/cloneschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ def _run_checks(self, **kwargs): # pragma: no cover

def add_arguments(self, parser):
super().add_arguments(parser)
parser.add_argument("source", help="The name of the schema you want to clone")
parser.add_argument("destination", help="The name of the schema you want to create as clone")
parser.add_argument(
"source",
help="The name of the schema you want to clone",
)
parser.add_argument(
"destination",
help="The name of the schema you want to create as clone",
)
parser.add_argument(
"--noinput",
"--no-input",
Expand Down Expand Up @@ -61,10 +67,16 @@ def _check_required_field(self, field, exclude=None):
)

def _get_constructed_instance(self, model_class, data):
fields = [field for field in model_class._meta.fields if self._check_required_field(field, data.keys())]
fields = [
field
for field in model_class._meta.fields
if self._check_required_field(field, data.keys())
]
instance = model_class(**data)
if fields:
self.stdout.write(self.style.WARNING(f"We need some data for model '{model_class._meta.model_name}':"))
self.stdout.write(
self.style.WARNING(f"We need some data for model '{model_class._meta.model_name}':")
)
for field in fields:
while field.name not in data:
raw_value = input(f"Value for field '{field.name}': ")
Expand All @@ -88,7 +100,9 @@ def get_dynamic_tenant(self, **options):
if self._ask(
"You are cloning a schema for a dynamic tenant. Would you like to create a database entry for it?"
):
tenant = self._get_constructed_instance(get_tenant_model(), {"schema_name": options["destination"]})
tenant = self._get_constructed_instance(
get_tenant_model(), {"schema_name": options["destination"]}
)
domain = self._get_constructed_instance(get_domain_model(), {"is_primary": True})
if options["verbosity"] >= 1:
self.stdout.write(self.style.WARNING("Looks good! Let's get to it!"))
Expand Down
11 changes: 9 additions & 2 deletions django_pgschemas/management/commands/createrefschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ def _run_checks(self, **kwargs): # pragma: no cover

def add_arguments(self, parser):
super().add_arguments(parser)
parser.add_argument("--recreate", action="store_true", dest="recreate", help="Recreate reference schema.")
parser.add_argument(
"--recreate",
action="store_true",
dest="recreate",
help="Recreate reference schema.",
)

def handle(self, *args, **options):
clone_reference = get_clone_reference()
Expand All @@ -24,7 +29,9 @@ def handle(self, *args, **options):
drop_schema(clone_reference, check_if_exists=True, verbosity=options["verbosity"])
if options["verbosity"] >= 1:
self.stdout.write("Destroyed existing reference schema.")
created = create_schema(clone_reference, check_if_exists=True, verbosity=options["verbosity"])
created = create_schema(
clone_reference, check_if_exists=True, verbosity=options["verbosity"]
)
if options["verbosity"] >= 1:
if created:
self.stdout.write("Reference schema successfully created!")
Expand Down
4 changes: 3 additions & 1 deletion django_pgschemas/management/commands/whowill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ class Command(TenantCommand):
def handle_tenant(self, tenant, *args, **options):
if options["verbosity"] >= 1:
self.stdout.write(
str(tenant.get_primary_domain()) if tenant.is_dynamic else tenant.domain_url or tenant.schema_name
str(tenant.get_primary_domain())
if tenant.is_dynamic
else tenant.domain_url or tenant.schema_name
)
Loading

0 comments on commit 54ab037

Please sign in to comment.