-
-
Notifications
You must be signed in to change notification settings - Fork 216
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(api): allow legacy auth from api
- Loading branch information
Showing
6 changed files
with
318 additions
and
5 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import logging | ||
from datetime import datetime | ||
from typing import Optional | ||
|
||
from django.db import models | ||
from django.utils.crypto import get_random_string | ||
|
||
from ..core.models import User | ||
from .vendor import phpserialize | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
LEGACY_SESSION_LIFETIME = 1440 | ||
|
||
|
||
def legacy_session_decode(value: Optional[str]) -> dict: | ||
if not value: | ||
return {} | ||
return phpserialize.loads(value, object_hook=phpserialize.phpobject) | ||
|
||
|
||
def legacy_session_encode(data: Optional[dict]) -> str: | ||
if not data: | ||
return "" | ||
return phpserialize.dumps(data, object_hook=phpserialize.phpobject) | ||
|
||
|
||
class LegacySession(models.Model): | ||
id = models.CharField(primary_key=True, max_length=32) | ||
modified = models.IntegerField(blank=True, null=True) | ||
lifetime = models.IntegerField(blank=True, null=True) | ||
data = models.TextField(blank=True, null=True) | ||
|
||
class Meta: | ||
managed = False | ||
db_table = "sessions" | ||
|
||
@classmethod | ||
def login(cls, old_session_id: str, user: User) -> Optional["LegacySession"]: | ||
try: | ||
old_session = cls.objects.get(id=old_session_id) | ||
except cls.DoesNotExist: | ||
return None | ||
|
||
# Check session expiration time | ||
old_session_expires = (old_session.modified or 0) + (old_session.lifetime or 0) | ||
if old_session_expires < datetime.now().timestamp(): | ||
old_session.delete() | ||
return None | ||
|
||
session_data = legacy_session_decode(old_session.data) | ||
|
||
def _datetime_format(value: Optional[datetime]) -> Optional[str]: | ||
return value.strftime("%Y-%m-%d %H:%M:%S.%f") if value else None | ||
|
||
user_data = phpserialize.phpobject( | ||
name="stdClass", | ||
d={ | ||
"id": user.id, | ||
"login": user.username, | ||
"email": user.email, | ||
"type": user.role, | ||
"first_name": user.first_name, | ||
"last_name": user.last_name, | ||
"lastlogin": _datetime_format(user.last_login), | ||
"lastfail": _datetime_format(user.last_failed_login), | ||
"login_attempts": user.login_attempts, | ||
"skype_contact": user.skype, | ||
"jabber_contact": user.jabber, | ||
"cell_phone": user.phone, | ||
}, | ||
) | ||
if session_data["libretime"]["storage"] is not None: | ||
logger.warning("overwriting data in legacy session") | ||
|
||
session_data["libretime"]["storage"] = user_data | ||
|
||
# https://github.com/zf1s/zend-session/blob/f33edeaabeda8def65e18cfb13be2c12664f03c3/library/Zend/Session.php#L532-L541 | ||
new_session_id = get_random_string( | ||
length=26, | ||
allowed_chars="0123456789abcdef", | ||
) | ||
new_session = cls( | ||
id=new_session_id, | ||
modified=datetime.now().timestamp(), | ||
lifetime=LEGACY_SESSION_LIFETIME, | ||
data=legacy_session_encode(session_data), | ||
) | ||
|
||
old_session.delete() | ||
new_session.save() | ||
|
||
return new_session | ||
|
||
@classmethod | ||
def logout(cls, old_session_id: str) -> None: | ||
cls.objects.filter(id=old_session_id).delete() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
from datetime import datetime | ||
from typing import Optional | ||
|
||
import pytest | ||
|
||
from ...core.models import Role, User | ||
from ..models import ( | ||
LEGACY_SESSION_LIFETIME, | ||
LegacySession, | ||
legacy_session_decode, | ||
legacy_session_encode, | ||
) | ||
from ..vendor import phpserialize | ||
|
||
|
||
def make_legacy_session_data(session_data: Optional[dict] = None): | ||
if session_data is not None: | ||
storage = phpserialize.phpobject(name="stdClass", d=session_data) | ||
else: | ||
storage = None | ||
|
||
return { | ||
"__ZF": {"csrf_namespace": {"ENT": 1686060245}}, | ||
"csrf_namespace": {"authtoken": "985a17a922c86a12f28c3ebd1abd09375e1dfde4"}, | ||
"libretime": {"storage": storage}, | ||
} | ||
|
||
|
||
TEST_SESSION_DATA_RAW = """a:3:{s:4:"__ZF";a:1:{s:14:"csrf_namespace";a:1:{s:3:"ENT";i:1686060245;}}s:14:"csrf_namespace";a:1:{s:9:"authtoken";s:40:"985a17a922c86a12f28c3ebd1abd09375e1dfde4";}s:9:"libretime";a:1:{s:7:"storage";O:8:"stdClass":13:{s:2:"id";i:1;s:5:"login";s:5:"admin";s:4:"pass";s:32:"21232f297a57a5a743894a0e4a801fc3";s:4:"type";s:1:"A";s:10:"first_name";s:0:"";s:9:"last_name";s:0:"";s:9:"lastlogin";N;s:8:"lastfail";N;s:13:"skype_contact";N;s:14:"jabber_contact";N;s:5:"email";N;s:10:"cell_phone";N;s:14:"login_attempts";i:0;}}}""" # pylint: disable=line-too-long | ||
TEST_SESSION_DATA = make_legacy_session_data( | ||
{ | ||
"id": 1, | ||
"login": "admin", | ||
"pass": "21232f297a57a5a743894a0e4a801fc3", | ||
"type": "A", | ||
"first_name": "", | ||
"last_name": "", | ||
"lastlogin": None, | ||
"lastfail": None, | ||
"skype_contact": None, | ||
"jabber_contact": None, | ||
"email": None, | ||
"cell_phone": None, | ||
"login_attempts": 0, | ||
} | ||
) | ||
|
||
|
||
def test_legacy_session_decode(): | ||
assert legacy_session_decode(TEST_SESSION_DATA_RAW) == TEST_SESSION_DATA | ||
|
||
|
||
def test_legacy_session_encode(): | ||
print(legacy_session_encode(TEST_SESSION_DATA)) | ||
assert legacy_session_encode(TEST_SESSION_DATA) == TEST_SESSION_DATA_RAW | ||
|
||
|
||
@pytest.fixture(name="old_session") | ||
def old_session_fixture(): | ||
return LegacySession.objects.create( | ||
id="jikjr9dlrjl9b0jn9r6tnci47a", | ||
modified=datetime.now().timestamp(), | ||
lifetime=LEGACY_SESSION_LIFETIME, | ||
data=legacy_session_encode(make_legacy_session_data()), | ||
) | ||
|
||
|
||
@pytest.mark.django_db | ||
def test_legacy_session_login(old_session: LegacySession): | ||
user = User.objects.create_user( | ||
role=Role.HOST, | ||
username="test", | ||
password="test", | ||
email="test@example.com", | ||
first_name="test", | ||
last_name="user", | ||
) | ||
|
||
new_session = LegacySession.login(old_session.id, user) | ||
assert new_session is not None | ||
assert not LegacySession.objects.filter(id=old_session.id).exists() | ||
assert old_session.id != new_session.id | ||
|
||
new_session_data = legacy_session_decode(new_session.data) | ||
assert new_session_data == make_legacy_session_data( | ||
{ | ||
"id": user.id, | ||
"login": "test", | ||
"email": "test@example.com", | ||
"type": "H", | ||
"first_name": "test", | ||
"last_name": "user", | ||
"lastlogin": None, | ||
"lastfail": None, | ||
"login_attempts": None, | ||
"skype_contact": None, | ||
"jabber_contact": None, | ||
"cell_phone": None, | ||
}, | ||
) | ||
|
||
|
||
@pytest.mark.django_db | ||
def test_legacy_session_login_expired(): | ||
user = User.objects.create_user( | ||
role=Role.HOST, | ||
username="test", | ||
password="test", | ||
email="test@example.com", | ||
first_name="test", | ||
last_name="user", | ||
) | ||
|
||
old_session = LegacySession.objects.create( | ||
id="jikjr9dlrjl9b0jn9r6tnci47a", | ||
modified=datetime.now().timestamp() - LEGACY_SESSION_LIFETIME - 1, | ||
lifetime=LEGACY_SESSION_LIFETIME, | ||
data=legacy_session_encode(make_legacy_session_data()), | ||
) | ||
|
||
new_session = LegacySession.login(old_session.id, user) | ||
|
||
assert new_session is None | ||
assert not LegacySession.objects.filter(id=old_session.id).exists() | ||
|
||
|
||
@pytest.mark.django_db | ||
def test_legacy_session_logout(old_session: LegacySession): | ||
LegacySession.logout(old_session.id) | ||
assert not LegacySession.objects.filter(id=old_session.id).exists() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters