-
Notifications
You must be signed in to change notification settings - Fork 7
/
utils_thirdparty.py
3205 lines (2674 loc) · 105 KB
/
utils_thirdparty.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# ScanCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/skeleton for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
from collections import defaultdict
import email
import itertools
import operator
import os
import re
import shutil
import subprocess
import tarfile
import tempfile
import time
import urllib
import attr
import license_expression
import packageurl
import requests
import saneyaml
import utils_pip_compatibility_tags
import utils_pypi_supported_tags
from commoncode import fileutils
from commoncode.hash import multi_checksums
from commoncode.text import python_safe_name
from packaging import tags as packaging_tags
from packaging import version as packaging_version
from utils_requirements import load_requirements
"""
Utilities to manage Python thirdparty libraries source, binaries and metadata in
local directories and remote repositories.
- update pip requirement files from installed packages for prod. and dev.
- build and save wheels for all required packages
- also build variants for wheels with native code for each supported
combination of operating system (Linux, macOS, Windows) and Python version
(3.x) using remote CI jobs
- collect source distributions for all required packages
- keep in sync wheels, distributions, ABOUT and LICENSE files to a PyPI-like
repository (using GitHub)
- create, update and fetch ABOUT, NOTICE and LICENSE metadata for all distributions
Approach
--------
The processing is organized around these key objects:
- A PyPiPackage represents a PyPI package with its name and version. It tracks
the downloadable Distribution objects for that version:
- one Sdist source Distribution object
- a list of Wheel binary Distribution objects
- A Distribution (either a Wheel or Sdist) is identified by and created from its
filename. It also has the metadata used to populate an .ABOUT file and
document origin and license. A Distribution can be fetched from Repository.
Metadata can be loaded from and dumped to ABOUT files and optionally from
DejaCode package data.
- An Environment is a combination of a Python version and operating system.
A Wheel Distribution also has Python/OS tags it supports and these can be
supported in a given Environment.
- Paths or URLs to "filenames" live in a Repository, either a plain
LinksRepository (an HTML page listing URLs or a local directory) or a
PypiRepository (a PyPI simple index where each package name has an HTML page
listing URLs to all distribution types and versions).
Repositories and Distributions are related through filenames.
The Wheel models code is partially derived from the mit-licensed pip and the
Distribution/Wheel/Sdist design has been heavily inspired by the packaging-
dists library https://github.com/uranusjr/packaging-dists by Tzu-ping Chung
"""
# Set to True to print extra diagnostic messages.
TRACE = False

# Supported environments: Python versions as compact, dot-less strings.
PYTHON_VERSIONS = ("36", "37", "38", "39", "310")

# Mapping of compact version -> dotted version. The major version is the first
# character and the minor version is everything after it.
PYTHON_DOT_VERSIONS_BY_VER = {ver: f"{ver[0]}.{ver[1:]}" for ver in PYTHON_VERSIONS}
def get_python_dot_version(version):
    """
    Return the dotted version string (such as "3.10") for a compact, dot-less
    ``version`` string (such as "310").

    Raise a KeyError if ``version`` is not a key of PYTHON_DOT_VERSIONS_BY_VER.
    """
    dot_version = PYTHON_DOT_VERSIONS_BY_VER[version]
    return dot_version
# ABI tags accepted for each compact Python version.
# Each version must be listed only once: a dict literal silently keeps only the
# last value of a repeated key, which previously dropped the "m"-suffixed ABIs.
ABIS_BY_PYTHON_VERSION = {
    "36": ["cp36", "cp36m", "abi3"],
    "37": ["cp37", "cp37m", "abi3"],
    "38": ["cp38", "cp38m", "abi3"],
    "39": ["cp39", "cp39m", "abi3"],
    "310": ["cp310", "cp310m", "abi3"],
}
# Platform tags accepted for each supported operating system.
PLATFORMS_BY_OS = {
    "linux": [
        "linux_x86_64",
        "manylinux1_x86_64",
        "manylinux2010_x86_64",
        "manylinux2014_x86_64",
    ],
    # Each listed macOS 10.x minor version accepts both "intel" and "x86_64"
    # tags; the macOS 11 tags do not follow that pattern and are listed as-is.
    "macos": [
        f"macosx_10_{minor}_{arch}"
        for minor in (6, 9, 10, 11, 12, 13, 14, 15)
        for arch in ("intel", "x86_64")
    ]
    + [
        "macosx_11_0_x86_64",
        "macosx_11_intel",
        # 'macosx_11_0_arm64',
    ],
    "windows": ["win_amd64"],
}
# Default local destination directory (the default ``dest_dir`` of the
# functions and methods in this module).
THIRDPARTY_DIR = "thirdparty"
CACHE_THIRDPARTY_DIR = ".cache/thirdparty"
# Default base URL of the remote links repository used for direct downloads.
REMOTE_LINKS_URL = "https://thirdparty.aboutcode.org/pypi"
# File name extensions, grouped by kind of file.
EXTENSIONS_APP = (".pyz",)
EXTENSIONS_SDIST = (
".tar.gz",
".tar.bz2",
".zip",
".tar.xz",
)
EXTENSIONS_INSTALLABLE = EXTENSIONS_SDIST + (".whl",)
EXTENSIONS_ABOUT = (
".ABOUT",
".LICENSE",
".NOTICE",
)
# All the extensions above combined.
EXTENSIONS = EXTENSIONS_INSTALLABLE + EXTENSIONS_ABOUT + EXTENSIONS_APP
PYPI_SIMPLE_URL = "https://pypi.org/simple"
# Base URL used as a fallback source to fetch "<key>.LICENSE" files.
LICENSEDB_API_URL = "https://scancode-licensedb.aboutcode.org"
# Shared Licensing object used to extract license keys from expressions.
LICENSING = license_expression.Licensing()
# Time to wait for a build, in seconds, as a string.
# "0" means no wait.
DEFAULT_ROMP_BUILD_WAIT = "5"
################################################################################
#
# Fetch remote wheels and sources locally
#
################################################################################
def fetch_wheels(
    environment=None,
    requirements_file="requirements.txt",
    allow_unpinned=False,
    dest_dir=THIRDPARTY_DIR,
    remote_links_url=REMOTE_LINKS_URL,
):
    """
    Fetch into the ``dest_dir`` directory a wheel for each package listed in
    the ``requirements_file`` requirements file, using exclusively the remote
    repository at ``remote_links_url`` (a URL, or a path to a directory of
    links).

    ``environment`` is passed to each package ``fetch_wheel()`` call to select
    a wheel; when None, the current Python interpreter environment is meant to
    be used implicitly.

    Requirements must be pinned (e.g. have a version) unless
    ``allow_unpinned`` is True.

    Yield (PypiPackage, error) tuples where error is None on success. For a
    requirement missing from the remote repository, yield (None, message).

    Raise an Exception once all requirements have been processed if any of
    them was missing from the remote repository.
    """
    lookup_args = dict(
        requirements_file=requirements_file,
        force_pinned=not allow_unpinned,
        remote_links_url=remote_links_url,
    )
    try:
        required = list(get_required_remote_packages(**lookup_args))
    except Exception as e:
        # report which arguments were used for the failed lookup
        raise Exception(lookup_args) from e

    missing = []
    already_fetched = set()

    for name, version, package in required:
        if package:
            wheel_filename = package.fetch_wheel(
                environment=environment,
                fetched_filenames=already_fetched,
                dest_dir=dest_dir,
            )
            if wheel_filename:
                already_fetched.add(wheel_filename)
                yield package, None
            elif wheel_filename in already_fetched:
                yield package, None
            else:
                yield package, "Failed to fetch"
            continue

        missing.append((name, version))
        requirement = f"{name}=={version}" if version else name
        yield None, f"fetch_wheels: Missing package in remote repo: {requirement}"

    if not missing:
        return

    # list which versions are available remotely for each missing package
    remote_repo = get_remote_repo()
    print()
    print("===> fetch_wheels: Missed some packages")
    for missing_name, missing_version in missing:
        nv = f"{missing_name}=={missing_version}" if missing_version else missing_name
        print(f"Missed package {nv} in remote repo, has only:")
        for available in remote_repo.get_versions(missing_name):
            print(" ", available)
    raise Exception("Missed some packages in remote repo")
def fetch_sources(
    requirements_file="requirements.txt",
    allow_unpinned=False,
    dest_dir=THIRDPARTY_DIR,
    remote_links_url=REMOTE_LINKS_URL,
):
    """
    Download all of the dependent package sources listed in the
    ``requirements_file`` requirements file into the ``dest_dir`` destination
    directory.

    Use direct downloads to achieve this (not pip download). Use exclusively
    the packages found from a remote repo at URL ``remote_links_url``. If
    ``remote_links_url`` is a path, use this as a directory of links instead
    of a URL.

    Only accept pinned requirements (e.g. with a version) unless
    ``allow_unpinned`` is True.

    Yield tuples of (PypiPackage, error message) for each package where the
    error message is None on success. For a requirement missing from the
    remote repository, yield (None, message).

    Raise an Exception listing the missing (name, version) tuples once all
    requirements have been processed if any of them was missing.
    """
    force_pinned = not allow_unpinned

    rrp = list(
        get_required_remote_packages(
            requirements_file=requirements_file,
            force_pinned=force_pinned,
            remote_links_url=remote_links_url,
        )
    )

    missed = []
    for name, version, package in rrp:
        if not package:
            # track the version too (and not the name twice) so that the final
            # exception reports which pinned version is missing
            missed.append((name, version))
            nv = f"{name}=={version}" if version else name
            yield None, f"fetch_sources: Missing package in remote repo: {nv}"

        elif not package.sdist:
            yield package, "Missing sdist in links"

        else:
            fetched = package.fetch_sdist(dest_dir=dest_dir)
            error = "Failed to fetch" if not fetched else None
            yield package, error

    if missed:
        raise Exception(f"Missing source packages in {remote_links_url}", missed)
################################################################################
#
# Core models
#
################################################################################
@attr.attributes
class NameVer:
    """
    A package name and version pair with name normalization and sorting
    helpers.
    """

    name = attr.ib(
        type=str,
        metadata={"help": "Python package name, lowercase and normalized."},
    )

    version = attr.ib(
        type=str,
        metadata={"help": "Python package version string."},
    )

    @property
    def normalized_name(self):
        """
        Return this name normalized with ``normalize_name()``.
        """
        return NameVer.normalize_name(self.name)

    @staticmethod
    def normalize_name(name):
        """
        Return a package ``name`` normalized per PEP503: lowercased and with
        each run of "-", "_" and "." replaced by a single "-".
        See https://www.python.org/dev/peps/pep-0503/#id4
        Return an empty or None ``name`` unchanged.
        """
        if not name:
            return name
        return re.sub(r"[-_.]+", "-", name).lower() or name

    @staticmethod
    def standardize_name(name):
        """
        Return a standardized package ``name``: lowercased and with each run
        of "-" and "_" replaced by a single "-". Dots are kept as-is.
        Return an empty or None ``name`` unchanged.
        """
        if not name:
            return name
        return re.sub(r"[-_]+", "-", name).lower() or name

    @property
    def name_ver(self):
        """
        Return a "<name>-<version>" string.
        """
        return "{}-{}".format(self.name, self.version)

    def sortable_name_version(self):
        """
        Return a (normalized name, parsed version) tuple suitable to use as a
        key to sort NameVer instances by name, then by version.
        """
        parsed_version = packaging_version.parse(self.version)
        return self.normalized_name, parsed_version

    @classmethod
    def sorted(cls, namevers):
        """
        Return a new list of ``namevers`` sorted by name, then by version.
        """
        ordered = list(namevers)
        ordered.sort(key=cls.sortable_name_version)
        return ordered
@attr.attributes
class Distribution(NameVer):
    """
    A downloadable distribution file of a package version, with the metadata
    used to document its origin and license.
    """

    # field names that can be updated from another dist or mapping
    updatable_fields = [
        "license_expression",
        "copyright",
        "description",
        "homepage_url",
        "primary_language",
        "notice_text",
        "extra_data",
    ]

    filename = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="File name."),
    )

    path_or_url = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Path or download URL."),
    )

    sha256 = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="SHA256 checksum."),
    )

    sha1 = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="SHA1 checksum."),
    )

    # a checksum is a hex digest string like the sha1 and sha256 fields
    md5 = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="MD5 checksum."),
    )

    type = attr.ib(
        repr=False,
        type=str,
        default="pypi",
        metadata=dict(help="Package type"),
    )

    namespace = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Package URL namespace"),
    )

    qualifiers = attr.ib(
        repr=False,
        type=dict,
        default=attr.Factory(dict),
        metadata=dict(help="Package URL qualifiers"),
    )

    subpath = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Package URL subpath"),
    )

    # a size is a number of bytes, not a string
    size = attr.ib(
        repr=False,
        type=int,
        default=0,
        metadata=dict(help="Size in bytes."),
    )

    primary_language = attr.ib(
        repr=False,
        type=str,
        default="Python",
        metadata=dict(help="Primary Programming language."),
    )

    description = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Description."),
    )

    homepage_url = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Homepage URL"),
    )

    notes = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Notes."),
    )

    copyright = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Copyright."),
    )

    license_expression = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="License expression"),
    )

    licenses = attr.ib(
        repr=False,
        type=list,
        default=attr.Factory(list),
        metadata=dict(help="List of license mappings."),
    )

    notice_text = attr.ib(
        repr=False,
        type=str,
        default="",
        metadata=dict(help="Notice text"),
    )

    extra_data = attr.ib(
        repr=False,
        type=dict,
        default=attr.Factory(dict),
        metadata=dict(help="Extra data"),
    )
@property
def package_url(self):
    """
    Return the Package URL string of this distribution, built from its
    non-empty purl identifiers.
    """
    purl = packageurl.PackageURL(**self.purl_identifiers())
    return str(purl)
@property
def download_url(self):
    """
    Return the download URL of this distribution: its ``path_or_url`` when
    this is an https URL, otherwise the result of get_best_download_url().
    """
    is_https = self.path_or_url and self.path_or_url.startswith("https://")
    if not is_https:
        return self.get_best_download_url()
    return self.path_or_url
@property
def about_filename(self):
    """
    Return the name of the .ABOUT file of this distribution: its filename
    with an added ".ABOUT" suffix.
    """
    return "{}.ABOUT".format(self.filename)
def has_about_file(self, dest_dir=THIRDPARTY_DIR):
    """
    Return True if the .ABOUT file of this distribution exists in the
    ``dest_dir`` directory.
    """
    about_location = os.path.join(dest_dir, self.about_filename)
    return os.path.exists(about_location)
@property
def about_download_url(self):
    """
    Return the remote download URL of the .ABOUT file of this distribution.
    """
    about_name = self.about_filename
    return self.build_remote_download_url(about_name)
@property
def notice_filename(self):
    """
    Return the name of the .NOTICE file of this distribution: its filename
    with an added ".NOTICE" suffix.
    """
    return "{}.NOTICE".format(self.filename)
@property
def notice_download_url(self):
    """
    Return the remote download URL of the .NOTICE file of this distribution.
    """
    notice_name = self.notice_filename
    return self.build_remote_download_url(notice_name)
@classmethod
def from_path_or_url(cls, path_or_url):
    """
    Return a distribution built from the file name found in the last segment
    of a ``path_or_url`` string, ignoring leading and trailing slashes. The
    unmodified ``path_or_url`` is stored on the returned distribution.
    Exceptions raised by from_filename() for an invalid file name are not
    caught here.
    """
    trimmed = path_or_url.strip("/")
    dist = cls.from_filename(os.path.basename(trimmed))
    dist.path_or_url = path_or_url
    return dist
@classmethod
def get_dist_class(cls, filename):
    """
    Return the Distribution subclass to use for a ``filename``: Wheel for a
    ".whl" file and Sdist for a ".zip" or ".tar.gz" file.
    Raise an InvalidDistributionFilename for any other file name.
    """
    sdist_extensions = (".zip", ".tar.gz")
    if filename.endswith(".whl"):
        return Wheel
    if filename.endswith(sdist_extensions):
        return Sdist
    raise InvalidDistributionFilename(filename)
@classmethod
def from_filename(cls, filename):
    """
    Return a distribution built from a ``filename`` string by the class
    that get_dist_class() selects for this file name. Exceptions raised for
    an invalid file name are not caught here.
    """
    dist_class = cls.get_dist_class(filename)
    dist = dist_class.from_filename(filename)
    return dist
@classmethod
def from_data(cls, data, keep_extra=False):
    """
    Return a distribution built from the "filename" of a ``data`` mapping
    and then updated with this ``data``. ``keep_extra`` is passed as-is to
    the distribution update() method.
    Raise a KeyError if ``data`` has no "filename" key.
    """
    dist = cls.from_filename(data["filename"])
    dist.update(data, keep_extra=keep_extra)
    return dist
@classmethod
def from_dist(cls, data, dist):
    """
    Return a distribution built from a ``data`` mapping supplemented with
    the updatable data of another ``dist`` Distribution.

    Print a message and return None if ``data`` and ``dist`` do not have the
    same type, namespace and name, or if ``data`` has no value for one of
    its type, name or version.
    """
    # We can only create from a dist of the same package
    key_matches = [
        data.get(kf) == getattr(dist, kf, None) for kf in ("type", "namespace", "name")
    ]
    if not all(key_matches):
        print(
            f"Missing key fields: Cannot derive a new dist from data: {data} and dist: {dist}"
        )
        return

    key_values = [data.get(kf) for kf in ("type", "name", "version")]
    if not all(key_values):
        print(
            f"Missing key field values: Cannot derive a new dist from data: {data} and dist: {dist}"
        )
        return

    # work on a copy: the provided data is left untouched
    merged = dict(data)
    # only supplement: a non-empty value of data is never overwritten with
    # the value of the other dist
    for key, value in dist.get_updatable_data().items():
        if not merged.get(key):
            merged[key] = value
    return cls.from_data(merged)
@classmethod
def build_remote_download_url(cls, filename, base_url=REMOTE_LINKS_URL):
    """
    Return a direct download URL for a ``filename`` in our remote repo: the
    ``base_url`` and the ``filename`` joined with a slash.
    """
    return "{}/{}".format(base_url, filename)
def get_best_download_url(self):
    """
    Return the best download URL for this distribution where best means that
    PyPI is better and our own remote repo URLs are second.
    If none is found, return a synthetic remote URL.
    """
    name = self.normalized_name
    version = self.version
    filename = self.filename

    pypi_package = get_pypi_package(name=name, version=version)
    if pypi_package:
        pypi_url = pypi_package.get_url_for_filename(filename)
        if pypi_url:
            return pypi_url

    remote_package = get_remote_package(name=name, version=version)
    if remote_package:
        remote_url = remote_package.get_url_for_filename(filename)
        if remote_url:
            return remote_url

    # No known URL, including when the remote package exists but does not
    # list this file name: the file may not have been published yet, so we
    # craft a URL using our remote base URL rather than returning None.
    return self.build_remote_download_url(filename)
def purl_identifiers(self, skinny=False):
    """
    Return a mapping of the non-empty Package URL fields of this
    distribution, sorted by field name. Include only the type, namespace and
    name if ``skinny`` is True. Otherwise also include the version, subpath
    and qualifiers.
    """
    field_names = ["type", "namespace", "name"]
    if not skinny:
        field_names += ["version", "subpath", "qualifiers"]

    values = {field_name: getattr(self, field_name) for field_name in field_names}
    return {key: values[key] for key in sorted(values) if values[key]}
def identifiers(self, purl_as_fields=True):
    """
    Return a mapping of the non-empty identifiers of this distribution,
    sorted by name: download_url, filename, md5, sha1 and package_url.
    Also include each Package URL field as a separate item if
    ``purl_as_fields`` is True.
    """
    if purl_as_fields:
        found = self.purl_identifiers()
    else:
        found = {"package_url": self.package_url}

    found["download_url"] = self.download_url
    found["filename"] = self.filename
    found["md5"] = self.md5
    found["sha1"] = self.sha1
    found["package_url"] = self.package_url

    return {key: found[key] for key in sorted(found) if found[key]}
def has_key_metadata(self):
    """
    Return True if this distribution has the key metadata required for basic
    attribution: a license expression, a copyright and a path or URL.
    A "public-domain" license expression is enough on its own.
    """
    if self.license_expression == "public-domain":
        # copyright not needed
        return True
    # return a proper boolean as documented, and not the last evaluated
    # operand which would be one of the strings
    return bool(self.license_expression and self.copyright and self.path_or_url)
def to_about(self):
    """
    Return a mapping of ABOUT data built from the fields of this
    distribution, then updated with its ``extra_data``. The mapping is sorted
    by key and items with an empty value are dropped. A "notice_file" is
    included only if there is a notice text.
    """
    collected = {
        "about_resource": self.filename,
        "checksum_md5": self.md5,
        "checksum_sha1": self.sha1,
        "copyright": self.copyright,
        "description": self.description,
        "download_url": self.download_url,
        "homepage_url": self.homepage_url,
        "license_expression": self.license_expression,
        "name": self.name,
        "namespace": self.namespace,
        "notes": self.notes,
        "notice_file": self.notice_filename if self.notice_text else "",
        "package_url": self.package_url,
        "primary_language": self.primary_language,
        "qualifiers": self.qualifiers,
        "size": self.size,
        "subpath": self.subpath,
        "type": self.type,
        "version": self.version,
    }
    # extra data may override any of the fields above
    collected.update(self.extra_data)
    return {key: collected[key] for key in sorted(collected) if collected[key]}
def to_dict(self):
    """
    Return a mapping of the fields of this distribution, without the fields
    that have an empty value.
    """
    data = attr.asdict(self)
    return {name: value for name, value in data.items() if value}
def save_about_and_notice_files(self, dest_dir=THIRDPARTY_DIR):
    """
    Save the .ABOUT file of this distribution in the ``dest_dir`` directory.
    Also save a .NOTICE file if there is a non-blank notice text. A file is
    written only if it does not exist yet or if its content changed.
    """

    def save_if_modified(location, content):
        # Write content at location unless the file already has this exact
        # content. Return True if the file was written.
        if os.path.exists(location):
            with open(location) as existing:
                unchanged = existing.read() == content
            if unchanged:
                return False

        if TRACE:
            print(f"Saving ABOUT (and NOTICE) files for: {self}")
        with open(location, "w") as output:
            output.write(content)
        return True

    about_location = os.path.join(dest_dir, self.about_filename)
    save_if_modified(location=about_location, content=saneyaml.dump(self.to_about()))

    notice_text = self.notice_text and self.notice_text.strip()
    if not notice_text:
        return

    notice_location = os.path.join(dest_dir, self.notice_filename)
    save_if_modified(location=notice_location, content=notice_text)
def load_about_data(self, about_filename_or_data=None, dest_dir=THIRDPARTY_DIR):
    """
    Update this distribution with ABOUT data from ``about_filename_or_data``,
    which is either the name of a .ABOUT file in the ``dest_dir`` directory
    or a mapping of ABOUT data. Use the default ABOUT file name of this
    distribution if ``about_filename_or_data`` is empty or None.

    Load the notice text from the notice file in ``dest_dir`` if the data has
    a "notice_file" and no "notice_text".

    Return False if the ABOUT file does not exist or has no data. Otherwise
    return what the update() method returns.

    NOTE: a provided mapping is modified in place.
    """
    source = about_filename_or_data or self.about_filename

    if isinstance(source, str):
        # that's an about_filename
        about_path = os.path.join(dest_dir, source)
        if not os.path.exists(about_path):
            return False
        with open(about_path) as about_file:
            about_data = saneyaml.load(about_file.read())
        if not about_data:
            return False
    else:
        about_data = source

    # rename the ABOUT checksum keys to the names of the distribution fields
    checksum_keys = (
        ("checksum_md5", "md5"),
        ("checksum_sha1", "sha1"),
        ("checksum_sha256", "sha256"),
    )
    for about_key, field_name in checksum_keys:
        checksum = about_data.pop(about_key, None)
        if checksum:
            about_data[field_name] = checksum

    about_data.pop("about_resource", None)

    notice_text = about_data.pop("notice_text", None)
    notice_file = about_data.pop("notice_file", None)
    if notice_text:
        about_data["notice_text"] = notice_text
    elif notice_file:
        notice_location = os.path.join(dest_dir, notice_file)
        if os.path.exists(notice_location):
            with open(notice_location) as notice:
                about_data["notice_text"] = notice.read()

    return self.update(about_data, keep_extra=True)
def load_remote_about_data(self):
    """
    Fetch the "remote" ABOUT file of this distribution, and its NOTICE file
    if any, and update self with this data.

    Return False if the ABOUT file cannot be fetched or has no data.
    Otherwise return what load_about_data() returns.
    """
    try:
        about_text = fetch_content_from_path_or_url_through_cache(self.about_download_url)
    except RemoteNotFetchedException:
        return False

    if not about_text:
        return False

    about_data = saneyaml.load(about_text)
    if not about_data:
        # some text that contains no data (such as only blanks or comments)
        # would otherwise fail below when popping from a non-mapping
        return False

    notice_file = about_data.pop("notice_file", None)
    if notice_file:
        try:
            notice_text = fetch_content_from_path_or_url_through_cache(self.notice_download_url)
            if notice_text:
                about_data["notice_text"] = notice_text
        except RemoteNotFetchedException:
            # best effort: the ABOUT data is still loaded without a notice
            print(f"Failed to fetch NOTICE file: {self.notice_download_url}")

    return self.load_about_data(about_data)
def get_checksums(self, dest_dir=THIRDPARTY_DIR):
    """
    Return a mapping of md5, sha1 and sha256 checksums computed for the file
    of this distribution in the ``dest_dir`` directory. Return an empty
    mapping if this file does not exist.
    """
    location = os.path.join(dest_dir, self.filename)
    if not os.path.exists(location):
        return {}
    return multi_checksums(location, checksum_names=("md5", "sha1", "sha256"))
def set_checksums(self, dest_dir=THIRDPARTY_DIR):
    """
    Update this distribution with the checksums computed for its file in the
    ``dest_dir`` directory, overwriting any existing checksum value.
    """
    computed = self.get_checksums(dest_dir)
    self.update(computed, overwrite=True)
def validate_checksums(self, dest_dir=THIRDPARTY_DIR):
    """
    Return True if every md5, sha1 and sha256 checksum that has a value in
    this distribution matches the checksum computed for its file in the
    ``dest_dir`` directory. A checksum without a value, or that could not be
    computed, is not compared.
    """
    computed = self.get_checksums(dest_dir)
    for algorithm in ("md5", "sha1", "sha256"):
        expected = getattr(self, algorithm)
        actual = computed.get(algorithm)
        if not expected or not actual:
            # nothing to compare for this algorithm
            continue
        if expected != actual:
            return False
    return True
def get_pip_hash(self):
    """
    Return a pip hash option string as used in requirements for this dist.
    Raise an AssertionError if this dist has no SHA256 checksum.
    """
    if not self.sha256:
        # raised explicitly, as an assert statement is skipped entirely when
        # Python runs with the -O option
        raise AssertionError(f"Missing SHA256 for dist {self}")
    return f"--hash=sha256:{self.sha256}"
def get_license_keys(self):
    """
    Return a list of the unique license keys found in the license expression
    of this distribution. Return a list with the single "unknown" key if this
    expression cannot be parsed.
    """
    try:
        return LICENSING.license_keys(self.license_expression, unique=True, simple=True)
    except license_expression.ExpressionParseError:
        return ["unknown"]
def fetch_license_files(self, dest_dir=THIRDPARTY_DIR):
    """
    Fetch the license files of this distribution that are missing in the
    ``dest_dir`` directory: one "<key>.LICENSE" file for each key of the
    license expression plus any license file listed in the extra data.

    Try to fetch each file from our remote repo first and from the licensedb
    second. Return a list of error messages, one for each license file that
    could not be fetched. This list is empty if there was no error.
    """
    paths_or_urls = get_remote_repo().links
    errors = []
    extra_lic_names = [lic.get("file") for lic in self.extra_data.get("licenses", {})]
    extra_lic_names += [self.extra_data.get("license_file")]
    extra_lic_names = [ln for ln in extra_lic_names if ln]
    lic_names = [f"{key}.LICENSE" for key in self.get_license_keys()]
    for filename in lic_names + extra_lic_names:
        floc = os.path.join(dest_dir, filename)
        if os.path.exists(floc):
            continue

        # Any failure of a source means trying the next one. This catches
        # Exception and not everything, such that a KeyboardInterrupt or a
        # SystemExit still stops the processing.
        try:
            # try remotely first
            lic_url = get_link_for_filename(filename=filename, paths_or_urls=paths_or_urls)

            fetch_and_save_path_or_url(
                filename=filename,
                dest_dir=dest_dir,
                path_or_url=lic_url,
                as_text=True,
            )
            if TRACE:
                print(f"Fetched license from remote: {lic_url}")

        except Exception:
            try:
                # try licensedb second
                lic_url = f"{LICENSEDB_API_URL}/{filename}"
                fetch_and_save_path_or_url(
                    filename=filename,
                    dest_dir=dest_dir,
                    path_or_url=lic_url,
                    as_text=True,
                )
                if TRACE:
                    print(f"Fetched license from licensedb: {lic_url}")

            except Exception:
                msg = f'No text for license {filename} in expression "{self.license_expression}" from {self}'
                print(msg)
                errors.append(msg)

    return errors
def extract_pkginfo(self, dest_dir=THIRDPARTY_DIR):
    """
    Return the text of the first file with a name ending with PKG-INFO or
    METADATA found in the archive of this distribution in the ``dest_dir``
    directory. Return None if there is no such file.
    """
    # a wheel is a zip archive; for any other file name the format is left
    # as None, to be determined by shutil from the file name
    archive_format = "zip" if self.filename.endswith(".whl") else None
    archive_location = os.path.join(dest_dir, self.filename)
    metadata_names = ("PKG-INFO", "METADATA")

    with tempfile.TemporaryDirectory(prefix="pypi-tmp-extract") as extract_dir:
        shutil.unpack_archive(
            filename=archive_location,
            extract_dir=extract_dir,
            format=archive_format,
        )
        # NOTE: we only care about the first one found in the dist
        # which may not be 100% right
        for path in fileutils.resource_iter(location=extract_dir, with_dirs=False):
            if not path.endswith(metadata_names):
                continue
            with open(path) as metadata_file:
                return metadata_file.read()
def load_pkginfo_data(self, dest_dir=THIRDPARTY_DIR):
"""
Update self with data loaded from the PKG-INFO file found in the
archive of this Distribution in `dest_dir`.
"""
pkginfo_text = self.extract_pkginfo(dest_dir=dest_dir)
if not pkginfo_text:
print(f"!!!!PKG-INFO not found in {self.filename}")
return
raw_data = email.message_from_string(pkginfo_text)
classifiers = raw_data.get_all("Classifier") or []
declared_license = [raw_data["License"]] + [
c for c in classifiers if c.startswith("License")
]
license_expression = compute_normalized_license_expression(declared_license)
other_classifiers = [c for c in classifiers if not c.startswith("License")]
holder = raw_data["Author"]
holder_contact = raw_data["Author-email"]
copyright_statement = f"Copyright (c) {holder} <{holder_contact}>"
pkginfo_data = dict(