-
Notifications
You must be signed in to change notification settings - Fork 1
/
sde_updater.py
129 lines (98 loc) · 4.79 KB
/
sde_updater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import os
import contextlib
import sys
import tempfile
import subprocess
from bz2 import BZ2File
from typing import Optional
from dateutil import parser
from datetime import datetime, timezone
import httpx
from tqdm import tqdm
# Log of successful dump restores: set_update_timestamp appends one ISO-8601
# timestamp per line, and get_last_update_timestamp reads the last one back.
# The path is relative, so it resolves against the current working directory.
UPDATE_TIMESTAMP_LOG = ".update.log"
def _last_line_in_file(file_path) -> Optional[str]:
    """Return the last non-blank line of *file_path*, stripped of whitespace.

    The file is read backwards in fixed-size chunks, so only its tail is
    loaded no matter how long the file grows. Trailing newlines and
    whitespace-only lines are skipped, so a stray empty line at the end of
    the file does not hide the last real entry.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The stripped last non-blank line, or ``None`` when the file does not
        exist, is empty, or contains only whitespace.
    """
    if not os.path.isfile(file_path):
        return None
    chunk_size = 4096
    with open(file_path, "rb") as f:
        position = f.seek(0, os.SEEK_END)
        tail = b""
        while position > 0:
            step = min(chunk_size, position)
            position -= step
            f.seek(position)
            tail = f.read(step) + tail
            content = tail.rstrip()
            if not content:
                # Everything read so far is whitespace; keep walking backwards.
                continue
            newline_at = content.rfind(b"\n")
            # The last line is complete once we see the newline before it,
            # or once we have reached the start of the file.
            if newline_at != -1 or position == 0:
                return content[newline_at + 1:].decode().strip() or None
    return None
def get_last_update_timestamp() -> datetime:
    """Return the time of the last successful restore as a UTC-aware datetime.

    The value comes from the last line of ``UPDATE_TIMESTAMP_LOG``. Naive
    timestamps are interpreted as UTC. Timestamps that carry an offset are
    converted to UTC rather than having their offset overwritten.

    Returns:
        The logged timestamp in UTC. If the log is missing or empty, a fixed
        early date (2003-05-06 UTC) is returned instead.
    """
    date_from_log = _last_line_in_file(UPDATE_TIMESTAMP_LOG)
    if not date_from_log:
        # File does not exist or is empty
        return datetime(2003, 5, 6, tzinfo=timezone.utc)
    parsed = parser.parse(date_from_log)
    if parsed.tzinfo is None:
        # set_update_timestamp writes naive timestamps that are already in UTC.
        return parsed.replace(tzinfo=timezone.utc)
    # Convert instead of replace: replace() would silently drop a real offset.
    return parsed.astimezone(timezone.utc)
def set_update_timestamp() -> None:
    """Append the current UTC time to ``UPDATE_TIMESTAMP_LOG``.

    Each call writes one naive ISO-8601 timestamp (UTC, no offset) followed
    by a newline. The file is opened in append mode, so earlier entries are
    kept.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo from
    # the aware "now" keeps the exact same naive ISO format in the log.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with open(UPDATE_TIMESTAMP_LOG, mode="a", encoding="utf-8") as update_log:
        update_log.write(f"{timestamp}\n")
def get_dump_timestamp(dump_url: Optional[str] = None) -> datetime:
    """Return the ``Last-Modified`` time the server reports for the dump.

    Args:
        dump_url: URL of the dump. Falls back to the ``DB_DUMP_URL``
            environment variable when omitted or empty.

    Returns:
        The ``Last-Modified`` header of a HEAD request, parsed by dateutil.

    Raises:
        ValueError: if no URL is given and ``DB_DUMP_URL`` is unset or empty.
        httpx.HTTPStatusError: if the HEAD request returns a non-success status.
        Exception: if the response has no ``Last-Modified`` header.
    """
    if not dump_url:
        dump_url = os.getenv("DB_DUMP_URL")
    if not dump_url:
        raise ValueError("No dump URL given and DB_DUMP_URL is not set.")
    res = httpx.head(dump_url)
    # Report HTTP failures (404, 5xx, ...) as what they are, rather than as a
    # missing header.
    res.raise_for_status()
    last_modified_header = res.headers.get("Last-Modified")
    if not last_modified_header:
        raise Exception("Last-Modified header missing.")
    return parser.parse(last_modified_header)
def is_out_of_date() -> bool:
    """Report whether the published dump is newer than the last local restore.

    Returns:
        True when the dump's ``Last-Modified`` time is strictly later than
        the timestamp returned by ``get_last_update_timestamp``.
    """
    local_stamp = get_last_update_timestamp()
    remote_stamp = get_dump_timestamp()
    remote_is_newer = remote_stamp > local_stamp
    return remote_is_newer
def get_dump_checksum(dump_url: str) -> Optional[str]:
    """Fetch the checksum published next to the dump at ``<dump_url>.md5``.

    Args:
        dump_url: URL of the dump. ``.md5`` is appended to locate the
            checksum file.

    Returns:
        The first whitespace-separated token of the checksum file, presumably
        an md5sum-style "<digest>  <filename>" line (TODO confirm). Returns
        ``None`` when the request does not return 200 or the body is blank.
    """
    res = httpx.get(f"{dump_url}.md5")
    if res.status_code != 200:
        return None
    tokens = res.text.split()
    # A whitespace-only body gives no tokens; indexing [0] used to raise
    # IndexError here.
    return tokens[0] if tokens else None
@contextlib.contextmanager
def download_dump() -> str:
    """Download the dump at ``DB_DUMP_URL`` into a temporary file.

    A tqdm bar shows download progress in bytes. The download is requested
    with ``Accept-Encoding: identity`` so the bytes are stored as served.

    Yields:
        Path of the temporary ``.dmp.bz2`` file. The file is deleted when the
        context exits.

    Raises:
        httpx.HTTPStatusError: if the server answers with a non-success status.
    """
    with tempfile.NamedTemporaryFile(suffix=".dmp.bz2") as download_file:
        with httpx.stream("GET", os.getenv("DB_DUMP_URL"), headers={"Accept-Encoding": "identity"}) as response:
            # Fail before an error page can be written into the dump file.
            response.raise_for_status()
            # Content-Length may be absent. tqdm then shows a running byte
            # count instead of a percentage.
            content_length = response.headers.get("Content-Length")
            total = int(content_length) if content_length is not None else None
            with tqdm(total=total, unit_scale=True, unit_divisor=1024, unit="B") as progress:
                num_bytes_downloaded = response.num_bytes_downloaded
                for chunk in response.iter_bytes():
                    download_file.write(chunk)
                    progress.update(response.num_bytes_downloaded - num_bytes_downloaded)
                    num_bytes_downloaded = response.num_bytes_downloaded
        # Consumers reopen the file by name. Flush Python's write buffer,
        # otherwise the tail of the download would be missing for them.
        download_file.flush()
        # Yield outside the stream context so the HTTP connection is released
        # while the caller works with the file.
        yield download_file.name
@contextlib.contextmanager
def decompressed(bz2_file_path: str) -> str:
    """Decompress a bzip2 file into a temporary file and yield its path.

    Data is copied in 100 KiB chunks. The compressed input is closed before
    yielding. The decompressed ``.dmp`` file is deleted when the context exits.

    Args:
        bz2_file_path: Path of the ``.bz2`` file to decompress.

    Yields:
        Path of the decompressed temporary file.
    """
    chunk_size = 100 * 1024
    with tempfile.NamedTemporaryFile(suffix=".dmp") as out_file:
        with BZ2File(bz2_file_path, "r") as bz2_file:
            for data in iter(lambda: bz2_file.read(chunk_size), b""):
                out_file.write(data)
        # Callers reopen the file by name (e.g. through a shell redirect).
        # Without a flush, the last buffered chunk never reaches the disk and
        # they would see a truncated file.
        out_file.flush()
        yield out_file.name
def _prepare_database(docker_executable: str, sde_container_name: str, sde_db_username: str, sde_db_name: str):
    """Drop and recreate the target database so the restore starts clean.

    Runs ``dropdb --if-exists`` followed by ``createdb -T template0``. When
    *docker_executable* is non-empty, both commands are run through
    ``<docker_executable> exec -i <sde_container_name>``.

    Args:
        docker_executable: Container CLI to use, or empty/None to run locally.
        sde_container_name: Container to exec into (used only with Docker).
        sde_db_username: PostgreSQL role passed via ``-U``.
        sde_db_name: Database to drop and recreate.

    Raises:
        subprocess.CalledProcessError: if either command exits non-zero. The
            captured stdout/stderr bytes are attached to the exception.
    """
    import shlex

    # Recreates the database with template0 to avoid any conflicts
    # https://www.postgresql.org/docs/current/app-pgrestore.html#APP-PGRESTORE-EXAMPLES
    drop_command = ["dropdb", "-U", sde_db_username, "--if-exists", sde_db_name]
    create_command = ["createdb", "-U", sde_db_username, "-T", "template0", sde_db_name]
    if docker_executable:
        # Argument lists avoid shell quoting/injection issues. shlex.split
        # keeps multi-word executables (e.g. "sudo docker") working as they
        # did under the previous shell string.
        prefix = [*shlex.split(docker_executable), "exec", "-i", sde_container_name]
        drop_command = prefix + drop_command
        create_command = prefix + create_command
    for command in (drop_command, create_command):
        # check=True: stop before pg_restore runs against a database that
        # could not be recreated. Output is captured so restore_dump's
        # CalledProcessError handler can print e.stdout / e.stderr.
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def restore_dump(dump_file: str) -> None:
    """Restore a bzip2-compressed PostgreSQL dump into the SDE database.

    Steps:
    1. Decompress the dump to a temporary file.
    2. Drop and recreate the target database with ``_prepare_database``.
    3. Pipe the dump into ``pg_restore --no-owner``.

    When ``DOCKER_EXECUTABLE`` is set, the commands run through
    ``<DOCKER_EXECUTABLE> exec -i <SDE_CONTAINER_NAME>``. On success, the
    update log is stamped via ``set_update_timestamp``.

    Environment variables read: ``DOCKER_EXECUTABLE`` (optional),
    ``SDE_CONTAINER_NAME`` (required when Docker is used),
    ``SDE_DB_USERNAME`` and ``SDE_DB_NAME``.

    Args:
        dump_file: Path of the ``.bz2``-compressed dump.

    Raises:
        ValueError: if a required environment variable is missing or empty.
        SystemExit: if a database command fails. Its captured output is
            printed to stderr and its return code becomes the exit status.
    """
    import shlex

    docker_executable = os.getenv("DOCKER_EXECUTABLE")
    sde_container_name = os.getenv("SDE_CONTAINER_NAME")
    sde_db_username = os.getenv("SDE_DB_USERNAME")
    sde_db_name = os.getenv("SDE_DB_NAME")

    # Validate configuration before the (potentially slow) decompression.
    required = {"SDE_DB_USERNAME": sde_db_username, "SDE_DB_NAME": sde_db_name}
    if docker_executable:
        required["SDE_CONTAINER_NAME"] = sde_container_name
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise ValueError(f"Missing required environment variable(s): {', '.join(missing)}")

    # Argument list instead of a shell string: no quoting/injection issues.
    # shlex.split keeps multi-word executables (e.g. "sudo docker") working.
    restore_cmd = ["pg_restore", "--no-owner", "-U", sde_db_username, "-v", "-d", sde_db_name]
    if docker_executable:
        restore_cmd = [*shlex.split(docker_executable), "exec", "-i", sde_container_name, *restore_cmd]

    with decompressed(dump_file) as dump:
        try:
            _prepare_database(docker_executable, sde_container_name, sde_db_username, sde_db_name)
            # No input file is named, so pg_restore reads the archive from
            # stdin. This replaces the old "< dump" shell redirect.
            with open(dump, "rb") as dump_stream:
                subprocess.run(restore_cmd, stdin=dump_stream, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            # Output may be None (not captured) or not valid UTF-8; the error
            # report itself must never crash.
            stdout = (e.stdout or b"").decode("utf-8", errors="replace")
            stderr = (e.stderr or b"").decode("utf-8", errors="replace")
            print("--- FAILED TO RESTORE DUMP ---", file=sys.stderr)
            print(f"Error output:\n{stdout}\n{stderr}", file=sys.stderr)
            # sys.exit rather than the site-provided exit() helper, which is
            # unavailable when Python runs with -S.
            sys.exit(e.returncode)

    set_update_timestamp()
    print("Database dump restored successfully!")