-
Notifications
You must be signed in to change notification settings - Fork 61
/
cachefile.py
266 lines (214 loc) · 8.47 KB
/
cachefile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
cachefile.py - file with classes handling cached TLDs (e.g. downloads, updates)
.. Licence MIT
.. codeauthor:: Jan Lipovský <janlipovsky@gmail.com>, janlipovsky.cz
.. contributors: https://github.com/lipoja/URLExtract/graphs/contributors
"""
import logging
import os
import tempfile
import urllib.request
from datetime import datetime
from urllib.error import URLError, HTTPError
import idna
import filelock
from platformdirs import user_cache_dir
class CacheFileError(Exception):
    """Error signalling a problem with the file that stores cached TLDs."""
class CacheFile:
    """
    Class for working with cached TLDs in file.

    The cache is a text file named ``_CACHE_FILE_NAME``. It is looked up in
    a user supplied directory or, when none is given, in the first writable
    directory out of: the ``data`` directory next to this module, the
    user's cache directory and the system temp directory. When no cache
    file exists at that location the list in the ``data`` directory next
    to this module is used instead.
    """

    # file name of cached list of TLDs downloaded from IANA
    _CACHE_FILE_NAME = "tlds-alpha-by-domain.txt"
    # name of the directory (next to this module) holding the default list
    _DATA_DIR = "data"
    # name used in appdir (and as the logger name)
    _URLEXTRACT_NAME = "urlextract"

    def __init__(self, cache_dir=None):
        """
        :param str cache_dir: base path for TLD cache, defaults to data dir
        :raises: CacheFileError when ``cache_dir`` is not writable, when no
            writable cache directory can be found, or when neither the
            cache file nor the default list of TLDs exists
        """
        self._logger = logging.getLogger(self._URLEXTRACT_NAME)
        self._user_defined_cache_dir = cache_dir
        # True when the cache path points into the package data directory
        self._default_cache_file = False

        # full path for cached file with list of TLDs
        self._tld_list_path = self._get_cache_file_path()
        if not os.access(self._tld_list_path, os.F_OK):
            self._logger.info(
                "Cache file not found in '%s'. "
                "Use URLExtract.update() to download newest version.",
                self._tld_list_path,
            )
            self._logger.info(
                "Using default list of TLDs provided in urlextract package."
            )
            self._tld_list_path = self._get_default_cache_file_path()
            self._default_cache_file = True

    def _get_default_cache_dir(self):
        """
        Returns default cache directory (data directory)

        The path is only computed, its existence is not checked here.

        :return: path to default cache directory
        :rtype: str
        """
        return os.path.join(os.path.dirname(__file__), self._DATA_DIR)

    def _get_default_cache_file_path(self):
        """
        Returns default cache file path

        :raises: CacheFileError when default cache file does not exist
        :return: default cache file path (to data directory)
        :rtype: str
        """
        default_list_path = os.path.join(
            self._get_default_cache_dir(), self._CACHE_FILE_NAME
        )

        if not os.access(default_list_path, os.F_OK):
            raise CacheFileError(
                "Default cache file does not exist '{}'!".format(default_list_path)
            )

        return default_list_path

    def _get_writable_cache_dir(self):
        """
        Get writable cache directory with fallback to user's cache directory
        and global temp directory

        Sets ``_default_cache_file`` to True when the package data directory
        is the one returned.

        :raises: CacheFileError when cached directory is not writable for user
        :return: path to cache directory
        :rtype: str
        """
        dir_path_data = self._get_default_cache_dir()
        if os.access(dir_path_data, os.W_OK):
            self._default_cache_file = True
            return dir_path_data

        dir_path_user = user_cache_dir(self._URLEXTRACT_NAME)
        try:
            # exist_ok=True makes a separate "does it exist" check needless
            os.makedirs(dir_path_user, exist_ok=True)
        except OSError as err:
            # Creating the directory can fail for more reasons than missing
            # permissions (read-only file system, a file in the way, ...).
            # Whatever the reason, continue and try the last fallback dir.
            self._logger.debug(
                "Can not create user cache directory '%s': %s", dir_path_user, err
            )

        if os.access(dir_path_user, os.W_OK):
            return dir_path_user

        dir_path_temp = tempfile.gettempdir()
        if os.access(dir_path_temp, os.W_OK):
            return dir_path_temp

        raise CacheFileError("Cache directories are not writable.")

    def _get_cache_file_path(self):
        """
        Get path for cache file

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file with TLDs
        :rtype: str
        """
        if self._user_defined_cache_dir is None:
            # Tries to get writable cache dir with fallback to users data dir
            # and temp directory
            cache_dir = self._get_writable_cache_dir()
        else:
            cache_dir = self._user_defined_cache_dir
            if not os.access(cache_dir, os.W_OK):
                raise CacheFileError(
                    "Cache directory {} is not writable.".format(cache_dir)
                )

        # get path for cached file
        return os.path.join(cache_dir, self._CACHE_FILE_NAME)

    def _get_cache_lock_file_path(self):
        """
        Get path for cache file lock

        The lock file lives next to the path returned by
        ``_get_cache_file_path()`` (cache file path with ``.lock`` appended).

        :raises: CacheFileError when cached directory is not writable for user
        :return: Full path to cached file lock
        :rtype: str
        """
        return self._get_cache_file_path() + ".lock"

    def _download_tlds_list(self):
        """
        Function downloads list of TLDs from IANA.
        LINK: https://data.iana.org/TLD/tlds-alpha-by-domain.txt

        The downloaded text is written to the cache file while holding the
        cache file lock.

        :return: True if list was downloaded, False in case of an error
        :rtype: bool
        """
        url_list = "https://data.iana.org/TLD/tlds-alpha-by-domain.txt"

        # Default cache file exist (set by _default_cache_file)
        # and we want to write permission
        if self._default_cache_file and not os.access(self._tld_list_path, os.W_OK):
            self._logger.info("Default cache file is not writable.")
            self._tld_list_path = self._get_cache_file_path()
            self._logger.info("Changed path of cache file to: %s", self._tld_list_path)

        # A missing file is fine (it will be created); an existing file
        # that can not be overwritten is an error.
        if os.path.exists(self._tld_list_path) and not os.access(
            self._tld_list_path, os.W_OK
        ):
            self._logger.error(
                "ERROR: Cache file is not writable for current user. (%s)",
                self._tld_list_path,
            )
            return False

        req = urllib.request.Request(url_list)
        req.add_header(
            "User-Agent",
            "Mozilla/5.0 (Windows NT 6.0; "
            "WOW64; rv:24.0) Gecko/20100101 "
            "Firefox/24.0",
        )
        try:
            with urllib.request.urlopen(req) as f:
                page = f.read().decode("utf-8")
        except HTTPError as e:
            # HTTPError has to be handled before its base class URLError
            self._logger.error(
                "ERROR: Can not download list of TLDs. (HTTPError: %s)", e.reason
            )
            return False
        except URLError as e:
            self._logger.error(
                "ERROR: Can not download list of TLDs. (URLError: %s)", e.reason
            )
            return False

        with filelock.FileLock(self._get_cache_lock_file_path()):
            # The payload was decoded as UTF-8, so store it as UTF-8 too
            # instead of depending on the locale's default encoding.
            with open(self._tld_list_path, "w", encoding="utf-8") as ftld:
                ftld.write(page)

        return True

    def _load_cached_tlds(self):
        """
        Loads TLDs from cached file to set.

        Every TLD is stored twice, each time prefixed with a dot: as written
        in the file (lower-cased) and in its IDNA-decoded form. Empty lines
        and lines starting with ``#`` are ignored. Lines that are not valid
        IDNA are skipped and reported in a single warning.

        :raises: CacheFileError when cached file is not readable for user
        :return: Set of current TLDs
        :rtype: set
        """
        # check if cached file is readable
        if not os.access(self._tld_list_path, os.R_OK):
            self._logger.error(
                "Cached file is not readable for current user. (%s)",
                self._tld_list_path,
            )
            raise CacheFileError("Cached file is not readable for current user.")

        set_of_tlds = set()
        skipped_lines = 0
        with filelock.FileLock(self._get_cache_lock_file_path()):
            with open(self._tld_list_path, "r", encoding="utf-8") as f_cache_tld:
                for line in f_cache_tld:
                    tld = line.strip().lower()
                    # skip empty lines and comments
                    if not tld or tld.startswith("#"):
                        continue

                    try:
                        decoded_tld = idna.decode(tld)
                    except idna.IDNAError:
                        # One malformed line must not make the whole list
                        # unusable; such a line can not be a TLD anyway.
                        skipped_lines += 1
                        continue

                    set_of_tlds.add("." + tld)
                    set_of_tlds.add("." + decoded_tld)

        if skipped_lines:
            self._logger.warning(
                "Skipped %d invalid line(s) in cache file '%s'.",
                skipped_lines,
                self._tld_list_path,
            )

        return set_of_tlds

    def _get_last_cachefile_modification(self):
        """
        Get last modification of cache file with TLDs.

        :return: Date and time of last modification or
                 None when file does not exist
        :rtype: datetime|None
        """
        try:
            mtime = os.path.getmtime(self._tld_list_path)
        except OSError:
            # os.path.getmtime raises OSError when the file is missing
            # or inaccessible
            return None

        return datetime.fromtimestamp(mtime)