-
Notifications
You must be signed in to change notification settings - Fork 8
/
merge_db.py
63 lines (52 loc) · 1.94 KB
/
merge_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""
Merges multiple history sqlite databases into one
"""
import locale
import warnings
from datetime import datetime
from itertools import chain
from typing import Iterable, Iterator, Sequence, Set, Tuple

from .log import logger
from .model import Visit
from .common import PathIsh, expand_paths
from .parse_db import read_visits
# Switch to the user's default locale (taken from the environment) so that
# the "n" format specifier used by format_num produces locale-aware output.
# NOTE: this runs at import time and changes the process-wide locale.
# https://stackoverflow.com/a/10742904/9348376
locale.setlocale(locale.LC_ALL, "")
def format_num(num: int) -> str:
    """
    Formats a number with the locale-aware "n" presentation type,
    so separators follow whichever locale is currently active
    """
    return format(num, "n")
# Both functions below take variadic arguments: read_and_merge accepts
# PathIsh values and merge_visits accepts iterables of Visits.
# An annotation on a *args parameter describes each individual argument,
# not the tuple that collects them.
def read_and_merge(*input_databases: PathIsh) -> Iterator[Visit]:
    """
    Receives variable amount of PathIsh as input,
    reads Visits from each of those databases,
    and merges them together (removing duplicates)

    The given paths are passed through expand_paths, each resulting path
    is handed to read_visits, and the per-database results are then
    de-duplicated by merge_visits.

    This is a generator function, so none of the above starts
    until the returned iterator is first advanced.
    """
    database_histories: Sequence[Iterator[Visit]] = [
        read_visits(database) for database in expand_paths(input_databases)
    ]
    yield from merge_visits(*database_histories)
def merge_visits(*sources: Iterable[Visit]) -> Iterator[Visit]:
    """
    Removes duplicate Visit items from multiple sources

    Each positional argument is one iterable of Visits. The sources are
    consumed lazily, in the order given. A Visit is yielded the first time
    its (url, visit_date, visit_type) combination is seen; any later Visit
    with the same combination is dropped.

    If no sources are passed, a warning is issued and nothing is yielded.
    Once every source is exhausted, the number of removed duplicates and
    the number of returned visits are logged at debug level.
    """
    if not sources:
        warnings.warn("merge_visits received no sources!")
    else:
        logger.debug("merging information from {} databases...".format(len(sources)))
    # use combination of URL, visit date and visit type to uniquely identify visits
    emitted: Set[Tuple[str, datetime, int]] = set()
    duplicates = 0
    for vs in chain(*sources):
        key = (vs.url, vs.visit_date, vs.visit_type)
        if key in emitted:
            duplicates += 1
            continue
        yield vs
        emitted.add(key)
    # len(emitted) equals the number of visits yielded, since every
    # yielded visit adds exactly one new key
    logger.debug("Summary: removed {} duplicates...".format(format_num(duplicates)))
    logger.debug(
        "Summary: returning {} visit entries...".format(format_num(len(emitted)))
    )