Skip to content

Commit

Permalink
feat: Use requests AuthBase classes
Browse files Browse the repository at this point in the history
  • Loading branch information
uda authored and nejch committed Oct 12, 2023
1 parent 3b83d5d commit 5f46cfd
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 32 deletions.
16 changes: 15 additions & 1 deletion gitlab/_backends/__init__.py
Expand Up @@ -2,7 +2,21 @@
Defines http backends for processing http requests
"""

from .requests_backend import RequestsBackend, RequestsResponse
from .requests_backend import (
JobTokenAuth,
OAuthTokenAuth,
PrivateTokenAuth,
RequestsBackend,
RequestsResponse,
)

DefaultBackend = RequestsBackend
DefaultResponse = RequestsResponse

__all__ = [
"DefaultBackend",
"DefaultResponse",
"JobTokenAuth",
"OAuthTokenAuth",
"PrivateTokenAuth",
]
31 changes: 31 additions & 0 deletions gitlab/_backends/requests_backend.py
Expand Up @@ -4,12 +4,43 @@
from typing import Any, Dict, Optional, TYPE_CHECKING, Union

import requests
from requests import PreparedRequest
from requests.auth import AuthBase
from requests.structures import CaseInsensitiveDict
from requests_toolbelt.multipart.encoder import MultipartEncoder # type: ignore

from . import protocol


class TokenAuth:
def __init__(self, token: str):
self.token = token


class OAuthTokenAuth(TokenAuth, AuthBase):
def __call__(self, r: PreparedRequest) -> PreparedRequest:
r.headers["Authorization"] = f"Bearer {self.token}"
r.headers.pop("PRIVATE-TOKEN", None)
r.headers.pop("JOB-TOKEN", None)
return r


class PrivateTokenAuth(TokenAuth, AuthBase):
def __call__(self, r: PreparedRequest) -> PreparedRequest:
r.headers["PRIVATE-TOKEN"] = self.token
r.headers.pop("JOB-TOKEN", None)
r.headers.pop("Authorization", None)
return r


class JobTokenAuth(TokenAuth, AuthBase):
def __call__(self, r: PreparedRequest) -> PreparedRequest:
r.headers["JOB-TOKEN"] = self.token
r.headers.pop("PRIVATE-TOKEN", None)
r.headers.pop("Authorization", None)
return r


@dataclasses.dataclass
class SendData:
content_type: str
Expand Down
22 changes: 8 additions & 14 deletions gitlab/client.py
Expand Up @@ -485,30 +485,24 @@ def _set_auth_info(self) -> None:
not self.http_username and self.http_password
):
raise ValueError("Both http_username and http_password should be defined")
if self.oauth_token and self.http_username:
if tokens and self.http_username:
raise ValueError(
"Only one of oauth authentication or http "
"Only one of token authentications or http "
"authentication should be defined"
)

self._http_auth = None
self._auth: Optional[requests.auth.AuthBase] = None
if self.private_token:
self.headers.pop("Authorization", None)
self.headers["PRIVATE-TOKEN"] = self.private_token
self.headers.pop("JOB-TOKEN", None)
self._auth = _backends.PrivateTokenAuth(self.private_token)

if self.oauth_token:
self.headers["Authorization"] = f"Bearer {self.oauth_token}"
self.headers.pop("PRIVATE-TOKEN", None)
self.headers.pop("JOB-TOKEN", None)
self._auth = _backends.OAuthTokenAuth(self.oauth_token)

if self.job_token:
self.headers.pop("Authorization", None)
self.headers.pop("PRIVATE-TOKEN", None)
self.headers["JOB-TOKEN"] = self.job_token
self._auth = _backends.JobTokenAuth(self.job_token)

if self.http_username and self.http_password:
self._http_auth = requests.auth.HTTPBasicAuth(
self._auth = requests.auth.HTTPBasicAuth(
self.http_username, self.http_password
)

Expand All @@ -527,7 +521,7 @@ def enable_debug() -> None:
def _get_session_opts(self) -> Dict[str, Any]:
return {
"headers": self.headers.copy(),
"auth": self._http_auth,
"auth": self._auth,
"timeout": self.timeout,
"verify": self.ssl_verify,
}
Expand Down
49 changes: 32 additions & 17 deletions tests/unit/test_gitlab_auth.py
@@ -1,7 +1,9 @@
import pytest
import requests
from requests import PreparedRequest

from gitlab import Gitlab
from gitlab._backends import JobTokenAuth, OAuthTokenAuth, PrivateTokenAuth
from gitlab.config import GitlabConfigParser


Expand Down Expand Up @@ -39,51 +41,64 @@ def test_invalid_auth_args():

def test_private_token_auth():
gl = Gitlab("http://localhost", private_token="private_token", api_version="4")
p = PreparedRequest()
p.prepare(url=gl.url, auth=gl._auth)
assert gl.private_token == "private_token"
assert gl.oauth_token is None
assert gl.job_token is None
assert gl._http_auth is None
assert "Authorization" not in gl.headers
assert gl.headers["PRIVATE-TOKEN"] == "private_token"
assert "JOB-TOKEN" not in gl.headers
assert isinstance(gl._auth, PrivateTokenAuth)
assert gl._auth.token == "private_token"
assert p.headers["PRIVATE-TOKEN"] == "private_token"
assert "JOB-TOKEN" not in p.headers
assert "Authorization" not in p.headers


def test_oauth_token_auth():
gl = Gitlab("http://localhost", oauth_token="oauth_token", api_version="4")
p = PreparedRequest()
p.prepare(url=gl.url, auth=gl._auth)
assert gl.private_token is None
assert gl.oauth_token == "oauth_token"
assert gl.job_token is None
assert gl._http_auth is None
assert gl.headers["Authorization"] == "Bearer oauth_token"
assert "PRIVATE-TOKEN" not in gl.headers
assert "JOB-TOKEN" not in gl.headers
assert isinstance(gl._auth, OAuthTokenAuth)
assert gl._auth.token == "oauth_token"
assert p.headers["Authorization"] == "Bearer oauth_token"
assert "PRIVATE-TOKEN" not in p.headers
assert "JOB-TOKEN" not in p.headers


def test_job_token_auth():
gl = Gitlab("http://localhost", job_token="CI_JOB_TOKEN", api_version="4")
p = PreparedRequest()
p.prepare(url=gl.url, auth=gl._auth)
assert gl.private_token is None
assert gl.oauth_token is None
assert gl.job_token == "CI_JOB_TOKEN"
assert gl._http_auth is None
assert "Authorization" not in gl.headers
assert "PRIVATE-TOKEN" not in gl.headers
assert gl.headers["JOB-TOKEN"] == "CI_JOB_TOKEN"
assert isinstance(gl._auth, JobTokenAuth)
assert gl._auth.token == "CI_JOB_TOKEN"
assert p.headers["JOB-TOKEN"] == "CI_JOB_TOKEN"
assert "PRIVATE-TOKEN" not in p.headers
assert "Authorization" not in p.headers


def test_http_auth():
gl = Gitlab(
"http://localhost",
private_token="private_token",
http_username="foo",
http_password="bar",
api_version="4",
)
assert gl.private_token == "private_token"
p = PreparedRequest()
p.prepare(url=gl.url, auth=gl._auth)
assert gl.private_token is None
assert gl.oauth_token is None
assert gl.job_token is None
assert isinstance(gl._http_auth, requests.auth.HTTPBasicAuth)
assert gl.headers["PRIVATE-TOKEN"] == "private_token"
assert "Authorization" not in gl.headers
assert isinstance(gl._auth, requests.auth.HTTPBasicAuth)
assert gl._auth.username == "foo"
assert gl._auth.password == "bar"
assert p.headers["Authorization"] == "Basic Zm9vOmJhcg=="
assert "PRIVATE-TOKEN" not in p.headers
assert "JOB-TOKEN" not in p.headers


@pytest.mark.parametrize(
Expand Down

0 comments on commit 5f46cfd

Please sign in to comment.