from __future__ import annotations
import abc
import asyncio
import enum
import filecmp
import json
import os
import re
import shutil
import socket
import subprocess
import tempfile
import textwrap
import time
import uuid
from collections import defaultdict
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from enum import Flag, auto
from functools import cached_property
from itertools import chain, product
from pathlib import Path
from types import TracebackType
from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
import backoff # type: ignore
import boto3
import jwt
import psycopg2
import pytest
import requests
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from typing_extensions import Literal
from fixtures.log_helper import log
from fixtures.metrics import Metrics, parse_metrics
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
Fn,
allure_attach_from_dir,
get_self_dir,
subprocess_capture,
)
"""
This file contains pytest fixtures. A fixture is a test resource that can be
summoned by placing its name in the test's arguments.
A fixture is created with the @pytest.fixture decorator.
See docs: https://docs.pytest.org/en/6.2.x/fixture.html
There are several environment variables that can control the running of tests:
NEON_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.
There's no need to import this file to use it. It should be declared as a plugin
inside conftest.py, and that makes it available to all tests.
Don't import functions from this file, or pytest will emit warnings. Instead
put directly-importable functions into utils.py or another separate file.
"""
Env = Dict[str, str]
DEFAULT_OUTPUT_DIR: str = "test_output"
DEFAULT_BRANCH_NAME: str = "main"
DEFAULT_PG_VERSION_DEFAULT: str = "14"
BASE_PORT: int = 15000
WORKER_PORT_NUM: int = 1000
def pytest_configure(config: Config):
"""
Check that we do not overflow available ports range.
"""
numprocesses = config.getoption("numprocesses")
if (
numprocesses is not None and BASE_PORT + numprocesses * WORKER_PORT_NUM > 32768
): # do not use ephemeral ports
raise Exception("Too many workers configured. Cannot distribute ports for services.")
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
log.info(f"base_dir is {base_dir}")
yield base_dir
@pytest.fixture(scope="session")
def neon_binpath(base_dir: Path) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
else:
build_type = os.environ.get("BUILD_TYPE", "debug")
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="session")
def pg_version() -> Iterator[str]:
if env_default_pg_version := os.environ.get("DEFAULT_PG_VERSION"):
version = env_default_pg_version
else:
version = DEFAULT_PG_VERSION_DEFAULT
log.info(f"pg_version is {version}")
yield version
@pytest.fixture(scope="session")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: str) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / f"v{pg_version}"
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "function"]:
"""Return either session of function scope, depending on TEST_SHARED_FIXTURES envvar.
This function can be used as a scope like this:
@pytest.fixture(scope=shareable_scope)
def myfixture(...)
...
"""
return "function" if os.environ.get("TEST_SHARED_FIXTURES") is None else "session"
@pytest.fixture(scope="session")
def worker_seq_no(worker_id: str) -> int:
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == "master":
return 0
assert worker_id.startswith("gw")
return int(worker_id[2:])
@pytest.fixture(scope="session")
def worker_base_port(worker_seq_no: int) -> int:
    # Divide the port space into ranges of WORKER_PORT_NUM (1000) ports,
    # so that each worker gets a disjoint set of ports for its services.
return BASE_PORT + worker_seq_no * WORKER_PORT_NUM
def get_dir_size(path: str) -> int:
"""Return size in bytes."""
totalbytes = 0
for root, dirs, files in os.walk(path):
for name in files:
totalbytes += os.path.getsize(os.path.join(root, name))
return totalbytes
def can_bind(host: str, port: int) -> bool:
"""
Check whether a host:port is available to bind for listening
Inspired by the can_bind() perl function used in Postgres tests, in
vendor/postgres-v14/src/test/perl/PostgresNode.pm
"""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
# TODO: The pageserver and safekeepers don't use SO_REUSEADDR at the
        # moment. If that changes, we should start using SO_REUSEADDR here
# too, to allow reusing ports more quickly.
# See https://github.com/neondatabase/neon/issues/801
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((host, port))
sock.listen()
return True
except socket.error:
log.info(f"Port {port} is in use, skipping")
return False
finally:
sock.close()
class PortDistributor:
def __init__(self, base_port: int, port_number: int):
self.iterator = iter(range(base_port, base_port + port_number))
self.port_map: Dict[int, int] = {}
def get_port(self) -> int:
for port in self.iterator:
if can_bind("localhost", port):
return port
raise RuntimeError(
"port range configured for test is exhausted, consider enlarging the range"
)
def replace_with_new_port(self, value: Union[int, str]) -> Union[int, str]:
"""
Returns a new port for a port number in a string (like "localhost:1234") or int.
Replacements are memorised, so a substitution for the same port is always the same.
"""
# TODO: replace with structural pattern matching for Python >= 3.10
if isinstance(value, int):
return self._replace_port_int(value)
if isinstance(value, str):
return self._replace_port_str(value)
raise TypeError(f"unsupported type {type(value)} of {value=}")
def _replace_port_int(self, value: int) -> int:
known_port = self.port_map.get(value)
if known_port is None:
known_port = self.port_map[value] = self.get_port()
return known_port
def _replace_port_str(self, value: str) -> str:
# Use regex to find port in a string
# urllib.parse.urlparse produces inconvenient results for cases without scheme like "localhost:5432"
# See https://bugs.python.org/issue27657
ports = re.findall(r":(\d+)(?:/|$)", value)
assert len(ports) == 1, f"can't find port in {value}"
port_int = int(ports[0])
return value.replace(f":{port_int}", f":{self._replace_port_int(port_int)}")
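# Illustrative sketch (hypothetical helper, not used by the fixtures): rewrite
# the port inside a connection string to a free one. Substitutions of the same
# original port are memorised, so repeated calls yield the same replacement.
def _example_port_substitution(port_distributor: PortDistributor) -> str:
    connstr = "postgresql://cloud_admin@localhost:5432/postgres"
    return cast(str, port_distributor.replace_with_new_port(connstr))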
@pytest.fixture(scope="session")
def port_distributor(worker_base_port: int) -> PortDistributor:
return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
test_output_dir: Path,
neon_binpath: Path,
) -> Iterator[NeonBroker]:
    # Multiple pytest sessions can be launched in parallel; give them different ports/datadirs.
client_port = port_distributor.get_port()
broker_logfile = test_output_dir / "repo" / "storage_broker.log"
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
yield broker
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@pytest.fixture(scope="session")
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
mock_s3_server = MockS3Server(port_distributor.get_port())
yield mock_s3_server
mock_s3_server.kill()
class PgProtocol:
"""Reusable connection logic"""
def __init__(self, **kwargs: Any):
self.default_options = kwargs
def connstr(self, **kwargs: Any) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""
return str(make_dsn(**self.conn_options(**kwargs)))
def conn_options(self, **kwargs: Any) -> Dict[str, Any]:
"""
Construct a dictionary of connection options from default values and extra parameters.
        An option can be dropped from the returned dictionary by passing None as the extra parameter's value.
"""
result = self.default_options.copy()
if "dsn" in kwargs:
result.update(parse_dsn(kwargs["dsn"]))
result.update(kwargs)
result = {k: v for k, v in result.items() if v is not None}
# Individual statement timeout in seconds. 2 minutes should be
        # enough for our tests, but if you need longer, you can
# change it by calling "SET statement_timeout" after
# connecting.
options = result.get("options", "")
if "statement_timeout" not in options:
options = f"-cstatement_timeout=120s {options}"
result["options"] = options
return result
# autocommit=True here by default because that's what we need most of the time
def connect(self, autocommit: bool = True, **kwargs: Any) -> PgConnection:
"""
Connect to the node.
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn
@contextmanager
def cursor(self, autocommit: bool = True, **kwargs: Any) -> Iterator[PgCursor]:
"""
Shorthand for pg.connect().cursor().
The cursor and connection are closed when the context is exited.
"""
with closing(self.connect(autocommit=autocommit, **kwargs)) as conn:
yield conn.cursor()
async def connect_async(self, **kwargs: Any) -> asyncpg.Connection:
"""
Connect to the node from async python.
Returns asyncpg's connection object.
"""
# asyncpg takes slightly different options than psycopg2. Try
# to convert the defaults from the psycopg2 format.
        # The psycopg2 option 'dbname' is called 'database' in asyncpg.
conn_options = self.conn_options(**kwargs)
if "dbname" in conn_options:
conn_options["database"] = conn_options.pop("dbname")
# Convert options='-c<key>=<val>' to server_settings
if "options" in conn_options:
options = conn_options.pop("options")
for match in re.finditer(r"-c(\w*)=(\w*)", options):
key = match.group(1)
val = match.group(2)
if "server_options" in conn_options:
conn_options["server_settings"].update({key: val})
else:
conn_options["server_settings"] = {key: val}
return await asyncpg.connect(**conn_options)
def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
"""
Execute query against the node and return all rows.
This method passes all extra params to connstr.
"""
return self.safe_psql_many([query], **kwargs)[0]
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
This method passes all extra params to connstr.
"""
result: List[List[Any]] = []
with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
for query in queries:
log.info(f"Executing query: {query}")
cur.execute(query)
if cur.description is None:
result.append([]) # query didn't return data
else:
result.append(cur.fetchall())
return result
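# Illustrative sketch (hypothetical helper) of typical PgProtocol usage:
# safe_psql() opens a fresh connection, runs the query and returns all rows,
# while a None-valued kwarg drops that option from the generated DSN.
def _example_pg_protocol_usage(pg: PgProtocol) -> None:
    rows = pg.safe_psql("SELECT 1")
    assert rows == [(1,)]
    dsn_without_password = pg.connstr(password=None)
    log.info(f"dsn without password: {dsn_without_password}")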
@dataclass
class AuthKeys:
pub: str
priv: str
def generate_token(self, *, scope: str, **token_data: str) -> str:
token = jwt.encode({"scope": scope, **token_data}, self.priv, algorithm="EdDSA")
        # jwt.encode can return 'bytes' or 'str' depending on the PyJWT version.
        # If it returned 'bytes', convert it to 'str' explicitly.
if isinstance(token, bytes):
token = token.decode()
return token
def generate_pageserver_token(self) -> str:
return self.generate_token(scope="pageserverapi")
def generate_safekeeper_token(self) -> str:
return self.generate_token(scope="safekeeperdata")
def generate_tenant_token(self, tenant_id: TenantId) -> str:
return self.generate_token(scope="tenant", tenant_id=str(tenant_id))
class MockS3Server:
"""
    Starts a mock S3 server for testing on the given port; errors if the server fails to start or exits prematurely.
    Relies on `poetry` and the `moto` server being installed, since that is how the tests are run.
    Also provides methods to derive the connection properties from it, and a method to kill the underlying server.
"""
def __init__(
self,
port: int,
):
self.port = port
        # XXX: do not use `shell=True` and do not prepend `exec ` to the command here.
        # We use `self.subprocess.kill()` to shut down the server, which would not "just"
        # work on Linux if the process were started through a shell.
self.subprocess = subprocess.Popen(["poetry", "run", "moto_server", "s3", f"-p{port}"])
error = None
try:
return_code = self.subprocess.poll()
if return_code is not None:
error = f"expected mock s3 server to run but it exited with code {return_code}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
except Exception as e:
error = f"expected mock s3 server to start but it failed with exception: {e}. stdout: '{self.subprocess.stdout}', stderr: '{self.subprocess.stderr}'"
if error is not None:
log.error(error)
self.kill()
raise RuntimeError("failed to start s3 mock server")
def endpoint(self) -> str:
return f"http://127.0.0.1:{self.port}"
def region(self) -> str:
return "us-east-1"
def access_key(self) -> str:
return "test"
def secret_key(self) -> str:
return "test"
def kill(self):
self.subprocess.kill()
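# Illustrative sketch (hypothetical helper): deriving a boto3 client from the
# mock server's connection properties, mirroring what
# NeonEnvBuilder.enable_mock_s3_remote_storage() does further below.
def _example_mock_s3_client(server: MockS3Server) -> Any:
    return boto3.client(
        "s3",
        endpoint_url=server.endpoint(),
        region_name=server.region(),
        aws_access_key_id=server.access_key(),
        aws_secret_access_key=server.secret_key(),
    )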
@enum.unique
class RemoteStorageKind(str, enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
    # Pass to tests that are generic over remote storage
    # to ensure they pass both with and without remote storage.
NOOP = "noop"
def available_remote_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages
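# Illustrative sketch (hypothetical test, kept as a comment so that pytest does
# not collect it): storage-generic tests typically parametrize over the
# available kinds and enable the selected one on the builder:
#
#     @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
#     def test_something(neon_env_builder, remote_storage_kind):
#         neon_env_builder.enable_remote_storage(remote_storage_kind, "test_something")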
@dataclass
class LocalFsStorage:
root: Path
@dataclass
class S3Storage:
bucket_name: str
bucket_region: str
access_key: str
secret_key: str
endpoint: Optional[str] = None
prefix_in_bucket: Optional[str] = None
def access_env_vars(self) -> Dict[str, str]:
return {
"AWS_ACCESS_KEY_ID": self.access_key,
"AWS_SECRET_ACCESS_KEY": self.secret_key,
}
RemoteStorage = Union[LocalFsStorage, S3Storage]
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if isinstance(remote_storage, LocalFsStorage):
remote_storage_config = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
remote_storage_config = f"bucket_name='{remote_storage.bucket_name}',\
bucket_region='{remote_storage.bucket_region}'"
if remote_storage.prefix_in_bucket is not None:
remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"
if remote_storage.endpoint is not None:
remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
else:
raise Exception("invalid remote storage type")
return f"{{{remote_storage_config}}}"
class RemoteStorageUsers(Flag):
PAGESERVER = auto()
SAFEKEEPER = auto()
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
You should use the `neon_env_builder` or `neon_simple_env` pytest
fixture to create the NeonEnv object. That way, the repository is
created in the right directory, based on the test name, and it's properly
cleaned up after the test has finished.
"""
def __init__(
self,
repo_dir: Path,
port_distributor: PortDistributor,
broker: NeonBroker,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
safekeepers_enable_fsync: bool = False,
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name: str = DEFAULT_BRANCH_NAME,
preserve_database_files: bool = False,
initial_tenant: Optional[TenantId] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.remote_storage_users = remote_storage_users
self.broker = broker
self.run_id = run_id
self.mock_s3_server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.safekeepers_id_start = safekeepers_id_start
self.safekeepers_enable_fsync = safekeepers_enable_fsync
self.auth_enabled = auth_enabled
self.default_branch_name = default_branch_name
self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True
self.neon_binpath = neon_binpath
self.pg_distrib_dir = pg_distrib_dir
self.pg_version = pg_version
self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate()
def init_configs(self) -> NeonEnv:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
self.env = NeonEnv(self)
return self.env
def start(self):
        assert self.env is not None, "environment is not initialized, call init_configs() first"
self.env.start()
def init_start(self) -> NeonEnv:
env = self.init_configs()
self.start()
        # Prepare the default branch to start postgres on later.
        # The pageserver does not create tenants or timelines by itself; it must be
        # started first and then asked to do so via the HTTP API.
log.info(
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(tenant_id=env.initial_tenant)
env.initial_timeline = initial_timeline
log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully")
return env
def enable_remote_storage(
self,
remote_storage_kind: RemoteStorageKind,
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.REAL_S3:
self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
else:
raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
def enable_local_fs_remote_storage(self, force_enable: bool = True):
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
        Errors if the pageserver already has some remote storage configured, unless `force_enable` is set to `True`.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))
def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
"""
        Sets up the pageserver to use the S3 mock server (started by the `mock_s3_server` fixture),
        creating the bucket if it's not present already.
        Errors if the pageserver already has some remote storage configured, unless `force_enable` is set to `True`.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
mock_endpoint = self.mock_s3_server.endpoint()
mock_region = self.mock_s3_server.region()
self.remote_storage_client = boto3.client(
"s3",
endpoint_url=mock_endpoint,
region_name=mock_region,
aws_access_key_id=self.mock_s3_server.access_key(),
aws_secret_access_key=self.mock_s3_server.secret_key(),
)
self.remote_storage_client.create_bucket(Bucket=bucket_name)
self.remote_storage = S3Storage(
bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region,
access_key=self.mock_s3_server.access_key(),
secret_key=self.mock_s3_server.secret_key(),
)
def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
"""
        Sets up configuration to use a real S3 endpoint without a mock server.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
access_key = os.getenv("AWS_ACCESS_KEY_ID")
assert access_key, "no aws access key provided"
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
        assert secret_key, "no aws secret key provided"
# session token is needed for local runs with sso auth
session_token = os.getenv("AWS_SESSION_TOKEN")
bucket_name = os.getenv("REMOTE_STORAGE_S3_BUCKET")
assert bucket_name, "no remote storage bucket name provided"
region = os.getenv("REMOTE_STORAGE_S3_REGION")
assert region, "no remote storage region provided"
# do not leave data in real s3
self.keep_remote_storage_contents = False
# construct a prefix inside bucket for the particular test case and test run
self.remote_storage_prefix = f"{self.run_id}/{test_name}"
self.remote_storage_client = boto3.client(
"s3",
region_name=region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
aws_session_token=session_token,
)
self.remote_storage = S3Storage(
bucket_name=bucket_name,
bucket_region=region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket=self.remote_storage_prefix,
)
def cleanup_local_storage(self):
if self.preserve_database_files:
return
directories_to_clean: List[Path] = []
for test_entry in Path(self.repo_dir).glob("**/*"):
if test_entry.is_file():
test_file = test_entry
if ATTACHMENT_NAME_REGEX.fullmatch(test_file.name):
continue
if SMALL_DB_FILE_NAME_REGEX.fullmatch(test_file.name):
continue
log.debug(f"Removing large database {test_file} file")
test_file.unlink()
elif test_entry.is_dir():
directories_to_clean.append(test_entry)
for directory_to_clean in reversed(directories_to_clean):
if not os.listdir(directory_to_clean):
log.debug(f"Removing empty directory {directory_to_clean}")
directory_to_clean.rmdir()
def cleanup_remote_storage(self):
        # Here we check for real remote storage, not the local one.
        # Local cleanup is not needed after a test because in CI the whole environment
        # is destroyed anyway.
if self.remote_storage_prefix is None:
log.info("no remote storage was set up, skipping cleanup")
return
        # Make mypy happy by allowing only `S3Storage` from here on.
        # `self.remote_storage_prefix` is coupled with the `S3Storage` storage type,
        # so this line is effectively a no-op.
assert isinstance(self.remote_storage, S3Storage)
if self.keep_remote_storage_contents:
log.info("keep_remote_storage_contents skipping remote storage cleanup")
return
log.info(
"removing data from test s3 bucket %s by prefix %s",
self.remote_storage.bucket_name,
self.remote_storage_prefix,
)
paginator = self.remote_storage_client.get_paginator("list_objects_v2")
pages = paginator.paginate(
Bucket=self.remote_storage.bucket_name,
Prefix=self.remote_storage_prefix,
)
# Using Any because DeleteTypeDef (from boto3-stubs) doesn't fit our case
objects_to_delete: Any = {"Objects": []}
cnt = 0
for item in pages.search("Contents"):
# weirdly when nothing is found it returns [None]
if item is None:
break
objects_to_delete["Objects"].append({"Key": item["Key"]})
            # flush once the AWS limit of 1000 keys per delete request is reached
if len(objects_to_delete["Objects"]) >= 1000:
self.remote_storage_client.delete_objects(
Bucket=self.remote_storage.bucket_name,
Delete=objects_to_delete,
)
objects_to_delete = {"Objects": []}
cnt += 1
# flush rest
if len(objects_to_delete["Objects"]):
self.remote_storage_client.delete_objects(
Bucket=self.remote_storage.bucket_name,
Delete=objects_to_delete,
)
log.info(f"deleted {cnt} objects from remote storage")
def __enter__(self) -> "NeonEnvBuilder":
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
):
# Stop all the nodes.
if self.env:
log.info("Cleaning up all storage and compute nodes")
self.env.endpoints.stop_all()
for sk in self.env.safekeepers:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
cleanup_error = None
try:
self.cleanup_remote_storage()
except Exception as e:
log.error(f"Error during remote storage cleanup: {e}")
cleanup_error = e
try:
self.cleanup_local_storage()
except Exception as e:
log.error(f"Error during local storage cleanup: {e}")
                if cleanup_error is None:
                    cleanup_error = e
if cleanup_error is not None:
raise cleanup_error
self.env.pageserver.assert_no_errors()
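# Illustrative sketch (hypothetical test body): the typical flow with the
# builder. init_start() creates the configs, starts all services, and creates
# the initial tenant together with its initial timeline.
def _example_env_builder_flow(neon_env_builder: NeonEnvBuilder) -> None:
    neon_env_builder.enable_remote_storage(RemoteStorageKind.LOCAL_FS, "example_test")
    env = neon_env_builder.init_start()
    assert env.initial_timeline is not None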
class NeonEnv:
"""
An object representing the Neon runtime environment. It consists of
the page server, 0-N safekeepers, and the compute nodes.
NeonEnv contains functions for stopping/starting nodes in the
environment, checking their status, creating tenants, connecting to the
nodes, creating and destroying compute nodes, etc. The page server and
the safekeepers are considered fixed in the environment, you cannot
create or destroy them after the environment is initialized. (That will
likely change in the future, as we start supporting multiple page
servers and adding/removing safekeepers on the fly).
Some notable functions and fields in NeonEnv:
    endpoints - A factory object for creating postgres compute nodes (endpoints).
pageserver - An object that contains functions for manipulating and
connecting to the pageserver
safekeepers - An array containing objects representing the safekeepers
pg_bin - pg_bin.run() can be used to execute Postgres client binaries,
like psql or pg_dump
initial_tenant - tenant ID of the initial tenant created in the repository
neon_cli - can be used to run the 'neon' CLI tool
create_tenant() - initializes a new tenant in the page server, returns
the tenant id
"""
def __init__(self, config: NeonEnvBuilder):
self.repo_dir = config.repo_dir
self.rust_log_override = config.rust_log_override
self.port_distributor = config.port_distributor
self.s3_mock_server = config.mock_s3_server
self.neon_cli = NeonCli(env=self)
self.endpoints = EndpointFactory(self)
self.safekeepers: List[Safekeeper] = []
self.broker = config.broker
self.remote_storage = config.remote_storage
self.remote_storage_users = config.remote_storage_users
self.pg_version = config.pg_version
self.neon_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
self.initial_tenant = config.initial_tenant
self.initial_timeline: Optional[TimelineId] = None
# Create a config file corresponding to the options
toml = textwrap.dedent(
f"""
default_tenant_id = '{config.initial_tenant}'
"""
)
toml += textwrap.dedent(
f"""
[broker]
listen_addr = '{self.broker.listen_addr()}'
"""
)
# Create config for pageserver
pageserver_port = PageserverPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
toml += textwrap.dedent(
f"""
[pageserver]
id=1
listen_pg_addr = 'localhost:{pageserver_port.pg}'
listen_http_addr = 'localhost:{pageserver_port.http}'
pg_auth_type = '{pg_auth_type}'
http_auth_type = '{http_auth_type}'
"""
)
# Create a corresponding NeonPageserver object
self.pageserver = NeonPageserver(
self, port=pageserver_port, config_override=config.pageserver_config_override
)
# Create config and a Safekeeper object for each safekeeper
for i in range(1, config.num_safekeepers + 1):
port = SafekeeperPort(
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
id = config.safekeepers_id_start + i # assign ids sequentially
toml += textwrap.dedent(
f"""
[[safekeepers]]
id = {id}
pg_port = {port.pg}
http_port = {port.http}
sync = {'true' if config.safekeepers_enable_fsync else 'false'}"""
)
if config.auth_enabled:
toml += textwrap.dedent(
"""
auth_enabled = true
"""
)
if (
bool(self.remote_storage_users & RemoteStorageUsers.SAFEKEEPER)
and self.remote_storage is not None
):
toml += textwrap.dedent(
f"""
remote_storage = "{remote_storage_to_toml_inline_table(self.remote_storage)}"
"""
)
safekeeper = Safekeeper(env=self, id=id, port=port)
self.safekeepers.append(safekeeper)
log.info(f"Config: {toml}")
self.neon_cli.init(toml)
def start(self):
# Start up broker, pageserver and all safekeepers
self.broker.try_start()
self.pageserver.start()
for safekeeper in self.safekeepers:
safekeeper.start()
def get_safekeeper_connstrs(self) -> str:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def timeline_dir(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)