/
goto_http_redirect_server.py
executable file
·2066 lines (1788 loc) · 77.2 KB
/
goto_http_redirect_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# -*- pyversion: >=3.6 -*-
#
# black --line-length 100
#
# This source code was created in-part to learn about various Python 3 features
# and useful modules: typing, mypy, pytest, other stuff, while aiming to be
# "Pythontic". This makes for some verbose if descriptive code.
import argparse
from collections import defaultdict, OrderedDict
import copy
import csv
import datetime
import enum
import getpass
import html
import http
from http import server
import json
import logging
import os
import pathlib
import pprint
import re
import signal
import socket
import socketserver
import sys
import threading
import time
import typing
from typing import cast, DefaultDict, List, NamedTuple, NewType, Optional, Tuple, Union
from urllib import parse
import uuid
# Canonical module information; setup.py reads these values.
# __version__ is also sent to clients in the "Redirect-Server-Version"
# response header (see RedirectHandler.Header_Server_Version).
__version__ = "1.2.0"
__author__ = "jtmoon79"
__url_github__ = "https://github.com/jtmoon79/goto_http_redirect_server"
__url_azure__ = "https://dev.azure.com/jtmmoon/goto_http_redirect_server"
__url_circleci__ = "https://circleci.com/gh/jtmoon79/goto_http_redirect_server"
__url_pypi__ = "https://pypi.org/project/goto-http-redirect-server/"
__url_issues__ = "https://github.com/jtmoon79/goto_http_redirect_server/issues"
# First line of __doc__ is used in setup.py. Should match README.md and title at
# github.com project site and Azure project site.
# The backslashes inside the literal are line continuations, so the text
# below is one single line followed by one newline.
__doc__ = """\
The "Go To" HTTP Redirect Server for sharing dynamic shortcut URLs on your \
network.
"""
#
# globals and constants initialization needed for default values
#
# name of the user running this process; default `user` of a Redirect Entry
USER_DEFAULT = getpass.getuser()
# process start time, seconds since the epoch
TIME_START = time.time()
# process start time as a naive local datetime truncated to whole seconds;
# default `date` of a Redirect Entry and the fallback of `dts_to_datetime`
DATETIME_START = datetime.datetime.fromtimestamp(TIME_START).replace(microsecond=0)
# parseable datetime string formats for a Redirect Entry.
# `dts_to_datetime` tries these in the order listed; the first format that
# parses wins.
DATETIME_STRPTIMES = (
r"%Y%m%dT%H%M%S",
r"%Y-%m-%dT%H:%M:%S",
r"%Y-%m-%d %H:%M:%S",
r"%Y-%m-%d_%H:%M:%S",
r"%Y/%m/%d %H:%M",
r"%Y-%m-%d %H:%M",
r"%Y/%m/%d_%H:%M",
r"%Y-%m-%d_%H:%M",
r"%Y-%m-%d",
r"%Y/%m/%d",
)
#
# Types
# FYI: "Re" means "Redirect Entry"
#
# XXX: `from parse import ParseResult` raises ModuleNotFoundError
#      (`parse` is the name bound by `from urllib import parse`, it is not a
#      top-level module), so alias the class instead.
ParseResult = parse.ParseResult
# Redirect Entry types
# Redirect From URL Path as input from the Administrator (not modified)
Re_From = NewType("Re_From", str)
# Redirect To URL Location
Re_To = NewType("Re_To", str)
# User that created the Redirect (records-keeping thing, does not affect behavior)
Re_User = NewType("Re_User", str)
# Datetime Redirect was created (records-keeping thing, does not affect behavior)
Re_Date = NewType("Re_Date", datetime.datetime)
# Plain alias (the very same object as Re_From), not a distinct NewType.
Re_EntryKey = Re_From # XXX: this might be too confusing?
# path, parameters, query in a str as given by the BaseHTTPRequestHandler, i.e. the
# incoming user request
Ppq = NewType("Ppq", str)
def Re_From_to_Re_EntryKey(from_: Re_From) -> Re_EntryKey:
    """
    Convert a Re_From to a Re_EntryKey.

    The value is passed through `Re_EntryKey(...)` unchanged; no
    normalization of the string is done here.

    XXX: not necessary anymore since class re-design

    :param from_: Redirect "From" path as given by the Administrator
    :return: the same string typed as a Re_EntryKey
    """
    return Re_EntryKey(from_)
def to_ParseResult(value: Union[str, Ppq, Re_From, Re_To, Re_EntryKey]) -> ParseResult:
    """
    Parse any of the module's URL-ish string types into a ParseResult.

    Helpful wrapper: the value is coerced with `str()` and handed to
    `urllib.parse.urlparse`.

    XXX: this is somewhat overdone since class re-design

    :param value: a URL, or URL path/params/query, as some str type
    :return: result of `parse.urlparse(str(value))`
    """
    return parse.urlparse(str(value))
@enum.unique
class Re_EntryType(enum.IntEnum):
    """
    Redirect Entry type, a.k.a. Required Request Modifier.

    Each member stands for the delimiter suffix that a Redirect Entry "From"
    path ends with. The member value -> suffix string table is the class
    attribute `Map`; the reverse table is `MapRev`. Both are created by
    `__init__` when the first member is instantiated.
    """

    # These must be in an order for `getEntryType_From` to succeed: that
    # function keeps the *last* member whose suffix matches, and e.g. a path
    # ending with ';?' also ends with '?' and with ''.
    _ = 0  # /foo              ''    must start from 0
    _P = 1  # /foo;param        ';'
    _Q = 2  # /foo?query        '?'
    _PQ = 3  # /foo;param?query  ';?'
    # XXX: Disable Path Required Request Modifier
    # P = 4    # /foo/path              '/'
    # PP = 5   # /foo/path;param        '/;'
    # PQ = 6   # /foo/path?query        '/?'
    # PPQ = 7  # /foo/path;param?query  '/;?'

    def __init__(self, *_):
        """
        Create the class-wide `Map` and `MapRev` dicts once.

        XXX: Python 3.7 introduced _ignore_ but this must support Python 3.5
             So this is a hacky way to create these class-wide dict once.
        XXX: *_ is required but not used. Without *_, super().__init__()
             raises TypeError
                 __init__() takes 1 positional argument but 2 were given
        """
        cls = self.__class__
        if not hasattr(cls, "Map"):
            # create once, set once to class-wide attribute
            # XXX: using enums, e.g. cls._P, will raise AttributeError
            cls.Map = {
                0: "",
                1: ";",
                2: "?",
                3: ";?",
                # XXX: Disable Path Required Request Modifier
                # 4: '/',
                # 5: '/;',
                # 6: '/?',
                # 7: '/;?',
            }
        if not hasattr(cls, "MapRev"):
            cls.MapRev = {v: k for k, v in cls.Map.items()}
        # XXX: Disable Path Required Request Modifier
        # if not hasattr(cls, 'Paths'):
        #     cls.Paths = (4, 5, 6, 7)
        super(cls, self).__init__()

    def getStr_EntryType(self) -> str:
        """Return the suffix string that this EntryType requires appended."""
        return self.Map[self]  # type: ignore

    @classmethod
    def getEntryType_From(cls, from_: Re_From) -> enum.IntEnum:
        """
        Return the Re_EntryType required by the "From" path `from_`.

        Every member is tested in declaration order and the last one whose
        suffix `from_` ends with is returned. The empty suffix of `_` always
        matches, so `_` is the result when nothing else matches.
        """
        required = cls._
        for typ in cls:
            if from_.endswith(cls.Map[typ]):  # type: ignore
                required = typ
        return required

    @classmethod
    def getEntryKeys(cls, from_: Re_From) -> List[Re_EntryKey]:
        """
        Return list of all possible Re_EntryKeys for `from_`.

        The first item is `from_` itself. It is followed by `from_` with the
        suffix of every member appended, in declaration order, skipping the
        member that `from_` already is.
        e.g. input '/a' returns ['/a', '/a;', '/a?', '/a;?']
        """
        own_type = cls.getEntryType_From(from_)
        keys = [Re_From_to_Re_EntryKey(from_)]
        keys.extend(
            Re_From_to_Re_EntryKey(Re_From(from_ + typ.getStr_EntryType()))
            for typ in cls
            if own_type != typ
        )
        return keys

    @classmethod
    def getEntryTypes_fallback(cls, typ) -> Tuple[enum.IntEnum, ...]:
        """
        Return tuple of Re_EntryTypes in order of required fallbacks.

        :raises ValueError: if `typ` equals none of the members
        """
        if typ == cls._:  # '/a'
            return cls._P, cls._Q, cls._PQ
        # '/a;p'  '/a;p?q'  '/a?q'  all fall back to plain '/a'
        if typ in (cls._P, cls._PQ, cls._Q):
            return (cls._,)
        # XXX: Disable Path Required Request Modifier
        #      (P, PP, PPQ, PQ would each also fall back to `cls._`)
        raise ValueError("unmatched type value %s" % typ)

    @classmethod
    def getEntryType_ParseResult(cls, _: str, pr: ParseResult) -> enum.IntEnum:
        """
        Given ParseResult `pr`, return the appropriate Re_EntryType.

        Only the truthiness of `pr.params` and `pr.query` is inspected. The
        first parameter is not used.

        TODO: urlparse does not distinguish empty parts and non-existent parts
              using empty string and None.
              e.g. parse.urlparse('/path?') is parse.urlparse('/path')
              The ParseResult.query is '' in both cases but it should be None
              in the second case.
              This function should attempt to distinguish such.
        """
        # XXX: Disable Path Required Request Modifier
        #      (a path with more than one '/' would have mapped to
        #      PPQ, PP, PQ or P here)
        if pr.params and pr.query:
            return cls._PQ
        if pr.params:
            return cls._P
        if pr.query:
            return cls._Q
        return cls._
# XXX: The entire `class Re_Entry` has a more concise declaration in
# Python >=3.7. The following tedium is required for Python 3.5 support.
# XXX: type annotations for NamedTuple were introduced in Python 3.6 (and cannot
# be used here).
# Field order matters: `Re_Entry.__new__` addresses `from_`, `to`, `from_pr`,
# `to_pr` and `etype` by their positional index in this list.
__Re_EntryBase = NamedTuple(
"__Re_EntryBase",
[
("from_", Re_From),
("to", Re_To),
("user", Re_User),
("date", datetime.datetime),
("from_pr", ParseResult), # ParseResult of from_
("to_pr", ParseResult), # ParseResult of to
("etype", Re_EntryType),
],
)
# XXX: setting default values for NamedTuple in Python <3.7 is also tedious.
# Copied from https://stackoverflow.com/a/18348004/471376
# One default per field, in field order. The `None` defaults are placeholders
# that `Re_Entry.__new__` either fills in or rejects with ValueError.
__Re_EntryBase.__new__.__defaults__ = ( # type: ignore
None, # from_
None, # to
USER_DEFAULT, # user
DATETIME_START, # date
None, # from_pr
None, # to_pr
None, # etype
)
class Re_Entry(__Re_EntryBase):
    """
    Redirect Entry

    represents a --from-to CLI argument or one line from a redirects file
    """

    def __new__(cls, *args, **kwargs):
        """
        Initialize `from_pr`, `to_pr` and `etype` based on `from_` and `to`
        when the caller did not pass them.

        XXX: A tedious way to initialize default arguments that are based on
             other arguments. Does not check for all possible combinations of
             passed initializer arguments.
             Added to ensure correctness and to simplify pytest code.

             Attributes of NamedTuple can not be modified after
             `super().__new__(…)`. And there is no typing.NamedList built-in
             which would allow such.
             Overriding via `@property def from_pr(self):` does not allow
             indexing among other subtle behavior differences.
             So settle on this somewhat ugly but workable solution.

        :raises ValueError: if any of `from_`, `to`, `from_pr`, `to_pr`,
                            `etype` is still None after construction
        """
        # positional indexes of the fields within the NamedTuple
        from_i, to_i, from_pr_i, to_pr_i, etype_i = 0, 1, 4, 5, 6
        nargs = len(args)
        # sentinel so that an explicitly passed None is told apart from
        # "not passed at all"
        absent = object()

        def passed(name, index):
            """value passed for field `name` (keyword first), else `absent`"""
            if name in kwargs:
                return kwargs[name]
            if index < nargs:
                return args[index]
            return absent

        from_val = None
        # set `from_pr` if not passed
        if nargs <= from_pr_i and "from_pr" not in kwargs:
            val = passed("from_", from_i)
            if val is not absent:
                kwargs["from_pr"] = parse.urlparse(val)
                from_val = val
        # set `to_pr` if not passed
        if nargs <= to_pr_i and "to_pr" not in kwargs:
            val = passed("to", to_i)
            if val is not absent:
                kwargs["to_pr"] = parse.urlparse(val)
        # set `etype` if not passed
        if nargs <= etype_i and "etype" not in kwargs:
            if not from_val:
                val = passed("from_", from_i)
                if val is not absent:
                    from_val = val
            if from_val is not None:
                kwargs["etype"] = Re_EntryType.getEntryType_From(from_val)

        instance = super().__new__(cls, *args, **kwargs)

        # self-check
        if instance.from_ is None:
            raise ValueError("Failed to set from_")
        if instance.to is None:
            raise ValueError("Failed to set *to*")
        if instance.from_pr is None:
            raise ValueError("Failed to set from_pr")
        if instance.to_pr is None:
            raise ValueError("Failed to set to_pr")
        if instance.etype is None:
            raise ValueError("Failed to set etype")
        return instance
# XXX: mypy does not like the following but it seems perfectly fine to me
# Re_Entry_Dict = NewType("Re_Entry_Dict", typing.OrderedDict[Re_EntryKey, Re_Entry])
Re_Entry_Dict = NewType("Re_Entry_Dict", OrderedDict)


def Re_Entry_Dict_new(data: Optional[typing.Sequence] = None) -> Re_Entry_Dict:
    """
    Type annotated Re_Entry_Dict creation.

    :param data: optional sequence of (key, value) pairs; a falsy value
                 (None, empty sequence) yields an empty dict
    :return: a new OrderedDict typed as Re_Entry_Dict
    """
    # `OrderedDict(())` is the empty dict, so one construction path covers
    # both the "no data" and the "with data" case
    return Re_Entry_Dict(OrderedDict(data if data else ()))
# type of the string that separates fields (see FIELD_DELIMITER_DEFAULT)
Re_Field_Delimiter = NewType("Re_Field_Delimiter", str)
#
# other helpful types and type aliases
# XXX: some get very pedantic because they are for learning's sake.
#
Path_List = List[pathlib.Path]
# list of (from, to) string pairs
FromTo_List = List[Tuple[str, str]]
Redirect_Counter = DefaultDict[str, int]
Redirect_Code_Value = NewType("Redirect_Code_Value", int)
# "X_None" aliases mean "X or None"
str_None = Optional[str]
Path_None = Optional[pathlib.Path]
Iter_str = typing.Iterable[str]
htmls = NewType("htmls", str) # HTML String
# either an HTML String or a plain str
htmls_str = Union[htmls, str]
#
# further globals and constants initialization
#
# also the name of the module-level logger `log`
PROGRAM_NAME = "goto_http_redirect_server"
# presumably the default listen address and port of the server — the code
# that consumes these two values is not within this part of the file
LISTEN_IP = "0.0.0.0"
LISTEN_PORT = 80
# sent to clients in the "Redirect-Server-Host" response header
HOSTNAME = socket.gethostname()
# default CSS for various <html>
# The base rules are a dark theme; the `prefers-color-scheme: light` media
# query overrides the body colors with black on white.
CSS = htmls(
"""\
body {
background-color: #2F4F4F; /* DarkSlateGray; */
color: #FAEBD7; /* AntiqueWhite; */
font-family: monospace;
}
@media screen and (prefers-color-scheme: light) {
body {
background-color: white;
color: black;
}
}
table td {
border-collapse: collapse;
border: 1px dashed;
padding: 1px;
}
.ar {
text-align: right;
}
tbody tr:nth-child(odd) {
background-color: #778899; /* LightSlateGray; */
}
tbody tr:nth-child(even) {
background-color: #708090; /* SlateGray; */
}
"""
)
# The following Javascript code has been copied from www.kryogenix.org.
# https://www.kryogenix.org/code/browser/sorttable/sorttable.js
# (http://archive.ph/GdD37)
# The Javascript code copied from www.kryogenix.org is licensed under
# "The MIT Licence, for code from kryogenix.org". A copy of that license is
# available at
# https://kryogenix.org/code/browser/licence.html
# (http://archive.ph/lvhUC)
# project file LICENSE-www.kryogenix.org
# Code in this project, goto_http_redirect_server, is licensed under the
# terms of MIT License version outlined in file "LICENSE". The Javascript
# code copied from www.kryogenix.org is not subject to that license.
# The Javascript code copied from www.kryogenix.org is subject to the
# "The MIT Licence, for code from kryogenix.org" license.
# The Javascript code copied from www.kryogenix.org has been minified.
# -- START CODE COPIED FROM www.kryogenix.org UNDER MIT LICENSE --
JAVASCRIPT_SORTABLE_JS = r"""
/*
SortTable
version 2
7th April 2007
Stuart Langridge, http://www.kryogenix.org/code/browser/sorttable/
Instructions:
Download this file
Add <script src="sorttable.js"></script> to your HTML
Add class="sortable" to any table you'd like to make sortable
Click on the headers to sort
Thanks to many, many people for contributions and suggestions.
Licenced as X11: http://www.kryogenix.org/code/browser/licence.html
This basically means: do what you want with it.
*/
var stIsIE=!1;if(sorttable={init:function(){arguments.callee.done||(arguments.callee.done=!0,_timer&&clearInterval(_timer),document.createElement&&document.getElementsByTagName&&(sorttable.DATE_RE=/^(\d\d?)[\/\.-](\d\d?)[\/\.-]((\d\d)?\d\d)$/,forEach(document.getElementsByTagName("table"),function(t){-1!=t.className.search(/\bsortable\b/)&&sorttable.makeSortable(t)})))},makeSortable:function(t){if(0==t.getElementsByTagName("thead").length&&(the=document.createElement("thead"),the.appendChild(t.rows[0]),t.insertBefore(the,t.firstChild)),null==t.tHead&&(t.tHead=t.getElementsByTagName("thead")[0]),1==t.tHead.rows.length){sortbottomrows=[];for(var e=0;e<t.rows.length;e++)-1!=t.rows[e].className.search(/\bsortbottom\b/)&&(sortbottomrows[sortbottomrows.length]=t.rows[e]);if(sortbottomrows){null==t.tFoot&&(tfo=document.createElement("tfoot"),t.appendChild(tfo));for(e=0;e<sortbottomrows.length;e++)tfo.appendChild(sortbottomrows[e]);delete sortbottomrows}headrow=t.tHead.rows[0].cells;for(e=0;e<headrow.length;e++)headrow[e].className.match(/\bsorttable_nosort\b/)||(mtch=headrow[e].className.match(/\bsorttable_([a-z0-9]+)\b/),mtch&&(override=mtch[1]),mtch&&"function"==typeof sorttable["sort_"+override]?headrow[e].sorttable_sortfunction=sorttable["sort_"+override]:headrow[e].sorttable_sortfunction=sorttable.guessType(t,e),headrow[e].sorttable_columnindex=e,headrow[e].sorttable_tbody=t.tBodies[0],dean_addEvent(headrow[e],"click",sorttable.innerSortFunction=function(t){if(-1!=this.className.search(/\bsorttable_sorted\b/))return sorttable.reverse(this.sorttable_tbody),this.className=this.className.replace("sorttable_sorted","sorttable_sorted_reverse"),this.removeChild(document.getElementById("sorttable_sortfwdind")),sortrevind=document.createElement("span"),sortrevind.id="sorttable_sortrevind",sortrevind.innerHTML=stIsIE?' 
<font face="webdings">5</font>':" ▴",void this.appendChild(sortrevind);if(-1!=this.className.search(/\bsorttable_sorted_reverse\b/))return sorttable.reverse(this.sorttable_tbody),this.className=this.className.replace("sorttable_sorted_reverse","sorttable_sorted"),this.removeChild(document.getElementById("sorttable_sortrevind")),sortfwdind=document.createElement("span"),sortfwdind.id="sorttable_sortfwdind",sortfwdind.innerHTML=stIsIE?' <font face="webdings">6</font>':" ▾",void this.appendChild(sortfwdind);theadrow=this.parentNode,forEach(theadrow.childNodes,function(t){1==t.nodeType&&(t.className=t.className.replace("sorttable_sorted_reverse",""),t.className=t.className.replace("sorttable_sorted",""))}),sortfwdind=document.getElementById("sorttable_sortfwdind"),sortfwdind&&sortfwdind.parentNode.removeChild(sortfwdind),sortrevind=document.getElementById("sorttable_sortrevind"),sortrevind&&sortrevind.parentNode.removeChild(sortrevind),this.className+=" sorttable_sorted",sortfwdind=document.createElement("span"),sortfwdind.id="sorttable_sortfwdind",sortfwdind.innerHTML=stIsIE?' 
<font face="webdings">6</font>':" ▾",this.appendChild(sortfwdind),row_array=[],col=this.sorttable_columnindex,rows=this.sorttable_tbody.rows;for(var e=0;e<rows.length;e++)row_array[row_array.length]=[sorttable.getInnerText(rows[e].cells[col]),rows[e]];row_array.sort(this.sorttable_sortfunction),tb=this.sorttable_tbody;for(e=0;e<row_array.length;e++)tb.appendChild(row_array[e][1]);delete row_array}))}},guessType:function(t,e){sortfn=sorttable.sort_alpha;for(var r=0;r<t.tBodies[0].rows.length;r++)if(text=sorttable.getInnerText(t.tBodies[0].rows[r].cells[e]),""!=text){if(text.match(/^-?[£$¤]?[\d,.]+%?$/))return sorttable.sort_numeric;if(possdate=text.match(sorttable.DATE_RE),possdate){if(first=parseInt(possdate[1]),second=parseInt(possdate[2]),first>12)return sorttable.sort_ddmm;if(second>12)return sorttable.sort_mmdd;sortfn=sorttable.sort_ddmm}}return sortfn},getInnerText:function(t){if(!t)return"";if(hasInputs="function"==typeof t.getElementsByTagName&&t.getElementsByTagName("input").length,null!=t.getAttribute("sorttable_customkey"))return t.getAttribute("sorttable_customkey");if(void 0!==t.textContent&&!hasInputs)return t.textContent.replace(/^\s+|\s+$/g,"");if(void 0!==t.innerText&&!hasInputs)return t.innerText.replace(/^\s+|\s+$/g,"");if(void 0!==t.text&&!hasInputs)return t.text.replace(/^\s+|\s+$/g,"");switch(t.nodeType){case 3:if("input"==t.nodeName.toLowerCase())return t.value.replace(/^\s+|\s+$/g,"");case 4:return t.nodeValue.replace(/^\s+|\s+$/g,"");case 1:case 11:for(var e="",r=0;r<t.childNodes.length;r++)e+=sorttable.getInnerText(t.childNodes[r]);return e.replace(/^\s+|\s+$/g,"");default:return""}},reverse:function(t){newrows=[];for(var e=0;e<t.rows.length;e++)newrows[newrows.length]=t.rows[e];for(e=newrows.length-1;e>=0;e--)t.appendChild(newrows[e]);delete newrows},sort_numeric:function(t,e){return 
aa=parseFloat(t[0].replace(/[^0-9.-]/g,"")),isNaN(aa)&&(aa=0),bb=parseFloat(e[0].replace(/[^0-9.-]/g,"")),isNaN(bb)&&(bb=0),aa-bb},sort_alpha:function(t,e){return t[0]==e[0]?0:t[0]<e[0]?-1:1},sort_ddmm:function(t,e){return mtch=t[0].match(sorttable.DATE_RE),y=mtch[3],m=mtch[2],d=mtch[1],1==m.length&&(m="0"+m),1==d.length&&(d="0"+d),dt1=y+m+d,mtch=e[0].match(sorttable.DATE_RE),y=mtch[3],m=mtch[2],d=mtch[1],1==m.length&&(m="0"+m),1==d.length&&(d="0"+d),dt2=y+m+d,dt1==dt2?0:dt1<dt2?-1:1},sort_mmdd:function(t,e){return mtch=t[0].match(sorttable.DATE_RE),y=mtch[3],d=mtch[2],m=mtch[1],1==m.length&&(m="0"+m),1==d.length&&(d="0"+d),dt1=y+m+d,mtch=e[0].match(sorttable.DATE_RE),y=mtch[3],d=mtch[2],m=mtch[1],1==m.length&&(m="0"+m),1==d.length&&(d="0"+d),dt2=y+m+d,dt1==dt2?0:dt1<dt2?-1:1},shaker_sort:function(t,e){for(var r=0,o=t.length-1,n=!0;n;){n=!1;for(var s=r;s<o;++s)if(e(t[s],t[s+1])>0){var a=t[s];t[s]=t[s+1],t[s+1]=a,n=!0}if(o--,!n)break;for(s=o;s>r;--s)if(e(t[s],t[s-1])<0){a=t[s];t[s]=t[s-1],t[s-1]=a,n=!0}r++}}},document.addEventListener&&document.addEventListener("DOMContentLoaded",sorttable.init,!1),/WebKit/i.test(navigator.userAgent))var _timer=setInterval(function(){/loaded|complete/.test(document.readyState)&&sorttable.init()},10);function dean_addEvent(t,e,r){if(t.addEventListener)t.addEventListener(e,r,!1);else{r.$$guid||(r.$$guid=dean_addEvent.guid++),t.events||(t.events={});var o=t.events[e];o||(o=t.events[e]={},t["on"+e]&&(o[0]=t["on"+e])),o[r.$$guid]=r,t["on"+e]=handleEvent}}function removeEvent(t,e,r){t.removeEventListener?t.removeEventListener(e,r,!1):t.events&&t.events[e]&&delete t.events[e][r.$$guid]}function handleEvent(t){var e=!0;t=t||fixEvent(((this.ownerDocument||this.document||this).parentWindow||window).event);var r=this.events[t.type];for(var o in r)this.$$handleEvent=r[o],!1===this.$$handleEvent(t)&&(e=!1);return e}function fixEvent(t){return 
t.preventDefault=fixEvent.preventDefault,t.stopPropagation=fixEvent.stopPropagation,t}window.onload=sorttable.init,dean_addEvent.guid=1,fixEvent.preventDefault=function(){this.returnValue=!1},fixEvent.stopPropagation=function(){this.cancelBubble=!0},Array.forEach||(Array.forEach=function(t,e,r){for(var o=0;o<t.length;o++)e.call(r,t[o],o,t)}),Function.prototype.forEach=function(t,e,r){for(var o in t)void 0===this.prototype[o]&&e.call(r,t[o],o,t)},String.forEach=function(t,e,r){Array.forEach(t.split(""),function(o,n){e.call(r,o,n,t)})};var forEach=function(t,e,r){if(t){var o=Object;if(t instanceof Function)o=Function;else{if(t.forEach instanceof Function)return void t.forEach(e,r);"string"==typeof t?o=String:"number"==typeof t.length&&(o=Array)}o.forEach(t,e,r)}};
"""
# -- END CODE COPIED FROM www.kryogenix.org UNDER MIT LICENSE --
#
# RedirectServer class things
#
# SOCKET_LISTEN_BACKLOG is eventually passed to socket.listen
SOCKET_LISTEN_BACKLOG = 31 # type: int
STATUS_PAGE_PATH_DEFAULT = "/status" # type: str
PATH_FAVICON = "/favicon.ico" # type: str
# presumably "From" paths that may not be used for a redirect — the check
# itself is not within this part of the file
REDIRECT_PATHS_NOT_ALLOWED = (PATH_FAVICON,) # type: Tuple[str, ...]
# HTTP Status Code used for redirects (among several possible redirect codes)
# TEMPORARY_REDIRECT is HTTP status 307
REDIRECT_CODE_DEFAULT = http.HTTPStatus.TEMPORARY_REDIRECT # type: http.HTTPStatus
REDIRECT_CODE = REDIRECT_CODE_DEFAULT # type: http.HTTPStatus
# urlparse-related things
# RE_URI_KEYWORDS matches the placeholders `${path}`, `${params}`, `${query}`
# and `${fragment}` that a Redirect Entry "To" URL may contain.
# URI_KEYWORDS_REPL lists the same four keywords as plain strings.
RE_URI_KEYWORDS = re.compile(r"\${(path|params|query|fragment)}")
URI_KEYWORDS_REPL = ("path", "params", "query", "fragment") # type: Iter_str
# signals
# names of the reload signal per platform, as attribute names of module `signal`
SIGNAL_RELOAD_UNIX = "SIGUSR1"  # type: str
SIGNAL_RELOAD_WINDOWS = "SIGBREAK"  # type: str
# signal to cause --redirects file reload
# Look the signal up by the names above so the names and the chosen signal
# can not drift apart.
try:
    # Unix (not defined on Windows)
    SIGNAL_RELOAD = getattr(signal, SIGNAL_RELOAD_UNIX)
except AttributeError:
    # Windows (not defined on some Unix)
    # an AttributeError raised here propagates: the platform has neither signal
    SIGNAL_RELOAD = getattr(signal, SIGNAL_RELOAD_WINDOWS)
# redirect file things
FIELD_DELIMITER_DEFAULT = Re_Field_Delimiter("\t") # type: Re_Field_Delimiter
FIELD_DELIMITER_DEFAULT_NAME = "tab" # type: str
# printable form of the default delimiter: the two characters backslash, 't'
FIELD_DELIMITER_DEFAULT_ESCAPED = FIELD_DELIMITER_DEFAULT.encode("unicode_escape").decode(
"utf-8"
) # type: str
# lines starting with '#' are comment lines and should be ignored. Also, Excel silently surrounds
# any line with double-quotes if the line has odd characters or multiple spaces.
REDIRECT_FILE_COMMENT = "#" # type: str
# the comment marker, optionally preceded by a space, a double-quote, or a
# double-quote and a space
REDIRECT_FILE_COMMENTS = (
REDIRECT_FILE_COMMENT,
" %s" % REDIRECT_FILE_COMMENT,
'"%s' % REDIRECT_FILE_COMMENT,
'" %s' % REDIRECT_FILE_COMMENT,
)
# logging module initializations (call logging_init to complete)
LOGGING_FORMAT_DATETIME = "%Y-%m-%d %H:%M:%S" # type: str
LOGGING_FORMAT = "%(asctime)s %(name)s %(levelname)s: %(message)s" # type: str
# importers can override 'log'
# (`logging_init` rebinds this name as well)
log = logging.getLogger(PROGRAM_NAME) # type: logging.Logger
# write-once copy of sys.argv
sys_args = [] # type: List[str]
#
# "volatile" global instances
#
# global list of --from-to passed redirects
Redirect_FromTo_List = [] # type: FromTo_List
# global list of --redirects files
Redirect_Files_List = [] # type: Path_List
# NOTE(review): the names suggest a pending-reload flag, the time of the last
# reload, and a per-path count of served redirects; the code that
# reads and writes them is not within this part of the file
reload_do = False # type: bool
reload_datetime = None # type: Optional[datetime.datetime]
redirect_counter = defaultdict(int) # type: DefaultDict[str, int]
STATUS_PATH = None # type: str_None
RELOAD_PATH = None # type: str_None
NOTE_ADMIN = htmls("") # type: htmls
#
# functions, classes, code
#
class StrDelay:
    """
    Delayed evaluation of object.__str__.

    Intended for logging messages that may not need to execute a passed function
    because the logging level may not be set.
    e.g.
        logging.debug('%s', complex_function(foo))
    The call to complex_function(foo) may not be necessary because logging.level
    might be logging.INFO. So skip the call to complex_function(foo) if it is
    not necessary, e.g.
        logging.debug('%s', StrDelay(complex_function, foo))

    The wrapped function is called anew on every `str()` of the instance; the
    result is not cached.

    XXX: There are probably more succinct implementations. Good enough.
    """

    def __init__(self, func, *args, **kwargs):
        """
        :param func: callable to run later; a falsy value means "no function"
        :param args: positional arguments for `func`
        :param kwargs: keyword arguments for `func`
        """
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __str__(self) -> str:
        """Call the wrapped function now and return `str()` of its result."""
        if not self._func:
            # nothing to call
            return ""
        return str(self._func(*self._args, **self._kwargs))
def html_escape(s_: htmls_str) -> htmls:
    """
    Transform a Python string into equivalent HTML-displayed string.

    1. HTML-escape the special characters (`html.escape`)
    2. put a `<br />` in front of every newline
    3. turn every pair of spaces into `&nbsp;` plus a space, so runs of
       spaces survive HTML whitespace collapsing

    :param s_: text to display
    :return: HTML markup that displays as `s_`
    """
    escaped = html.escape(s_).replace("\n", "<br />\n")
    # Only *pairs* of spaces are replaced: replacing every single space would
    # also hit the space inside the `<br />` inserted just above.
    return htmls(escaped.replace("  ", "&nbsp; "))
def html_a(href: str, text: str_None = None) -> htmls:
    """
    Create HTML <a> from href URL.

    :param href: URL for the `href` attribute; inserted as-is
    :param text: link text, HTML-escaped via `html_escape`; defaults to `href`
    :return: the `<a>` element markup
    """
    label = href if text is None else text
    # NOTE(review): `href` is not escaped, so a double-quote within it ends
    #               the attribute early. Left unchanged because callers may
    #               already pass an escaped value — confirm before escaping.
    return htmls('<a href="' + href + '">' + html_escape(label) + "</a>")
def datetime_now() -> datetime.datetime:
    """
    Wrap datetime.now so pytests can override it.
    Also, microseconds are annoying to print so set to 0.

    :return: current local time (naive datetime) truncated to whole seconds
    """
    now = datetime.datetime.now()
    return now.replace(microsecond=0)
def logging_init(debug: bool, filename: Path_None) -> None:
    """
    Initialize logging module to my preferences.

    Configures the root logger via `logging.basicConfig` (level DEBUG, module
    formats) and rebinds the module-level `log` to the program's logger.

    :param debug: True sets `log` to level DEBUG, else level INFO
    :param filename: log file path passed to `basicConfig` as an absolute
                     path; None passes `filename=None`
    """
    # only `log` is assigned here; LOGGING_FORMAT is merely read so it needs
    # no `global` declaration
    global log
    filename_ = str(filename.absolute()) if filename else None
    logging.basicConfig(
        filename=filename_,
        level=logging.DEBUG,
        format=LOGGING_FORMAT,
        datefmt=LOGGING_FORMAT_DATETIME,
    )
    log = logging.getLogger(PROGRAM_NAME)
    log.setLevel(logging.DEBUG if debug else logging.INFO)
def print_debug(message: str, end: str = "\n", file=sys.stderr) -> None:
    """
    Helper for printing (preferably to stderr) and checking logging.DEBUG.
    Sometimes a full logging message is too much.

    :param message: text to print
    :param end: passed on to `print`
    :param file: stream to print to; the default is the `sys.stderr` object
                 as it was when this function was defined
    """
    # This reads the logger's own level, not its effective level, so a
    # logger left at NOTSET (0) passes the check too.
    if log.level > logging.DEBUG:
        return
    print(message, end=end, file=file)
    if hasattr(file, "flush"):
        file.flush()
def dts_to_datetime(dts: str) -> datetime.datetime:
    """
    Parse datetime string among formats. Fallback to DATETIME_START

    :param dts: datetime string
    :return: result of the first format in DATETIME_STRPTIMES that parses
             `dts`; DATETIME_START if none does
    """
    for dts_patt in DATETIME_STRPTIMES:
        try:
            return datetime.datetime.strptime(dts, dts_patt)
        except ValueError:
            # this format does not match, try the next
            continue
    return DATETIME_START
class RedirectHandler(server.SimpleHTTPRequestHandler):
"""
XXX: This class is passed to RedirectServer which creates instances of
RedirectHandler. But RedirectHandler instances need to access values
that may change and there is not way to have RedirectServer pass some
tuple of values to new instances. So RedirectHandler instances hold
references to class-wide values. Those are set in the
redirect_handler_factory by call to set_c
"""
# override BaseHTTPRequestHandler.protocol_version to enable HTTP/1.1
# behavior (because HTTP/1.0 is so old)
# https://github.com/python/cpython/blob/5c02a39a0b31a330e06b4d6f44835afb205dc7cc/Lib/http/server.py#L613-L615
protocol_version = "HTTP/1.1"
Header_Server_Host = ("Redirect-Server-Host", HOSTNAME)
Header_Server_Version = ("Redirect-Server-Version", __version__)
# see https://tools.ietf.org/html/rfc2616#page-124
Header_ContentType_html = ("Content-Type", "text/html; charset=utf-8")
# see https://tools.ietf.org/html/rfc2616#section-14.10
Header_Connection_close = ("Connection", "close")
__count = 0
redirects = None # type: Re_Entry_Dict
status_code = None # type: http.HTTPStatus
status_path = None # type: str
reload_path = None # type: str_None
status_path_pr = None # type: ParseResult
reload_path_pr = None # type: ParseResult
note_admin = None # type: htmls
@classmethod
def set_c(
    cls,
    redirects: Re_Entry_Dict,
    status_code: http.HTTPStatus,
    status_path: str,
    reload_path: str_None,
    note_admin: htmls,
):
    """
    Set class-wide attributes to new values.

    Besides storing the five passed values this also stores the parsed
    forms `status_path_pr` and `reload_path_pr`.

    :param redirects: the Redirect Entries
    :param status_code: HTTP status code
    :param status_path: URL path of the status page
    :param reload_path: URL path of the reload page, may be None
    :param note_admin: HTML note from the Administrator
    """
    cls.redirects = redirects
    cls.status_code = status_code
    cls.status_path = status_path
    cls.reload_path = reload_path
    cls.note_admin = note_admin
    cls.status_path_pr = parse.urlparse(status_path)
    # `str()` so that a None `reload_path` still parses (as the text "None")
    cls.reload_path_pr = parse.urlparse(str(reload_path))
def __init__(self, *args, **kwargs):
    """
    Count this instance, run the base-class initializer, then debug-log the
    running instance count and the `id()` of this instance.

    All arguments are passed on to the base class unchanged.
    """
    RedirectHandler.__count += 1
    super().__init__(*args, **kwargs)
    log.debug("RedirectHandler.__init__ %d (@0x%08X)", RedirectHandler.__count, id(self))
def log_message(self, format_, *args, **kwargs):
    """
    override the RedirectHandler.log_message so RedirectHandler
    instances use the module-level logging.Logger instance `log`

    The message is prefixed with "<client address>:<client port> ".

    :param format_: %-style format string
    :param args: arguments for `format_`
    :param kwargs: optional `loglevel`: the level to log at, honored only if
                   it is of the same type as `log.level`; otherwise the
                   message is logged at level DEBUG

    Never raises: an Exception during logging is reported on stderr instead.
    """
    try:
        prepend = "%s:%s " % (self.client_address[0], self.client_address[1])
        loglevel = kwargs.get("loglevel")
        if isinstance(loglevel, type(log.level)):
            log.log(loglevel, prepend + format_, *args)
            return
        log.debug(prepend + format_, *args)
    except Exception as ex:
        # deliberate best-effort: a logging failure must not break the request
        print("Error during log_message\n%s" % str(ex), file=sys.stderr)
def _write_html_doc(self, html_doc: htmls) -> None:
    """
    Write out the HTML document and required headers.
    This calls end_headers!

    The response status line must already have been sent by the caller;
    this method sends only headers and body.

    :param html_doc: the complete HTML document
    """
    # From https://tools.ietf.org/html/rfc2616#section-14.13
    #    The Content-Length entity-header field indicates the size of
    #    the entity-body, in decimal number of OCTETs
    # So encode first and count the bytes, not the characters.
    # XXX: does this follow *all* Message Length rules?
    #      https://tools.ietf.org/html/rfc2616#section-4.4
    html_docb = bytes(html_doc, encoding="utf-8", errors="xmlcharrefreplace")
    # headers in the order they are sent
    headers = (
        self.Header_Server_Host,
        self.Header_Server_Version,
        ("Content-Length", str(len(html_docb))),
        self.Header_ContentType_html,
        self.Header_Connection_close,
    )
    for keyword, value in headers:
        self.send_header(keyword, value)
    self.end_headers()
    self.wfile.write(html_docb)
@staticmethod
def combine_parseresult(pr1: ParseResult, pr2: ParseResult) -> Re_To:
    """
    Combine the parts of two ParseResult into one redirect URL.

    A ParseResult example is
        parse.urlparse('http://host.com/path1;parmA=a,parmB=b?a=A&b=%20B&cc=CCC#FRAG')
    returns
        ParseResult(scheme='http', netloc='host.com', path='/path1',
                    params='parmA=a,parmB=b', query='a=A&b=%20B&cc=CCC',
                    fragment='FRAG')

    pr1 is assumed to be a Re_To supplied at startup-time or reload-time
    pr2 is assumed to be an incoming user request

    From pr1 use .scheme, .netloc, .path
    Prefer .fragment from pr2, then pr1
    Combine .params, .query

    The RedirectEntry 'To' (pr1) can use string.Template-like syntax,
    `${keyword}`, where keyword is one of URI_KEYWORDS_REPL. Each keyword
    is replaced with the same-named URI part of pr2. A pr2 part that was
    consumed by a replacement is not also appended to the result.

    For example, given RedirectEntry supplied at start-time `pr1`

        /b  http://bug-tracker.megacorp.local/search/bug.cgi?id=${query}  bob  2019-01-01 11:30:00

    A user incoming GET request for URL `pr2`

        'http://goto/b?123

    processed by `combine_parseresult` would become URL

        'http://bug-tracker.megacorp.local/search/bug.cgi?id=123'

    XXX: this function is called for every request. It should be implemented
         more efficiently.

    XXX: This function works fine for most cases, but can get wonky with
         complicated pr1, pr2, and multiple repeating replacements. A pr2
         part consumed while substituting one pr1 part is no longer
         available for a later pr1 part; there the keyword is replaced by
         its own bare name.

    :param pr1: configured redirect target, may contain `${keyword}`
    :param pr2: incoming user request
    :return: the combined URL as a Re_To
    """
    # work from a mutable dict copy of pr2; a key is popped once its value
    # was used for a replacement, which tracks what remains to be appended
    pr2d = pr2._asdict()

    def ssub(val: str) -> str:
        """safe subst. val, if successful replacement then pop pr2d[key]"""
        # shortcut empty string case
        if not val:
            return val
        # shortcut when no Template syntax present
        if not RE_URI_KEYWORDS.search(val):
            return val
        # there are replacements to do
        consumed = []
        for key in URI_KEYWORDS_REPL:
            repl = pr2d[key] if key in pr2d else key
            val_old = val
            # Literal replacement on purpose: `repl` comes from the incoming
            # request. Passing it to `re.sub` as the replacement would
            # interpret backslash escapes and group references, which
            # raises `re.error` or corrupts the URL.
            val = val.replace("${%s}" % key, repl)
            if val != val_old:
                consumed.append(key)
        # pop only after the loop so every keyword in `val` saw the same pr2d
        for key in consumed:
            pr2d.pop(key, None)
        return val

    # starting with a copy of pr1 with safe_substitutes
    pr = {k_: ssub(v_) for k_, v_ in pr1._asdict().items()}

    # selectively combine the URI parts of pr2 that were not consumed
    if pr2d.get("fragment"):
        pr["fragment"] = pr2d["fragment"]
    if pr2d.get("params"):
        # XXX: how are URI Object Parameters combined?
        #      see https://tools.ietf.org/html/rfc1808.html#section-2.1
        # Reuse the already-substituted pr["params"]. Substituting
        # pr1.params a second time would see the popped keys as missing.
        if pr["params"]:
            pr["params"] = pr["params"] + ";" + pr2d["params"]
        else:
            pr["params"] = pr2d["params"]
    if pr2d.get("query"):
        # same reasoning as for params
        if pr["query"]:
            pr["query"] = pr["query"] + "&" + pr2d["query"]
        else:
            pr["query"] = pr2d["query"]

    url = parse.urlunparse(ParseResult(**pr))
    return Re_To(url)
@staticmethod
def query_match(pr1: ParseResult, pr2: ParseResult) -> bool:
"""
:param pr1: was supplied at startup-time or reload-time
:param pr2: is a ppq incoming user request: path + parameters + query
"""
# TODO: how should this interact with path required modifier?
return pr1.path == pr2.path
# manual caching
# use key `hash(ppq)` to avoid storing secrets within the `ppq` URL string

# when False, `_ppq_cache_save` stores nothing and `_ppq_cache_check` always
# reports a miss
ppq_cache_enabled = True # type: bool
# maps `hash(ppq)` to the tuple `(entry, to)` stored by `_ppq_cache_save`
_ppq_cache = OrderedDict() # type: OrderedDict[int, Tuple[Re_Entry, Re_To]]
# once the cache holds this many entries, `_ppq_cache_save` evicts one entry
# before inserting
_ppq_cache_max = 50 # type: int
# `id()` of the redirects dict the cache was filled from; 0 means "not set".
# `_ppq_cache_check` uses it as a sanity check against a stale cache.
_ppq_cache_redirects_hash = 0 # type: int
@staticmethod
def ppq_cache_clear() -> None:
    """
    Empty the class-wide ppq cache and reset `_ppq_cache_redirects_hash`
    to 0, i.e. to "not set".
    """
    handler = RedirectHandler
    handler._ppq_cache_redirects_hash = 0
    handler._ppq_cache.clear()
@staticmethod
def _ppq_cache_save(ppq: Ppq, to: Re_To, entry: Re_Entry) -> None:
    """
    Store a resolved redirect in the class-wide `_ppq_cache`.

    Does nothing when `ppq_cache_enabled` is false. The cache key is
    `hash(ppq)`, never the `ppq` string itself, and the stored value is
    the tuple `(entry, to)`.

    When a new key would exceed `_ppq_cache_max` entries, the oldest
    inserted entries are evicted first (first-in first-out).

    :param ppq: incoming user request; only its hash is kept
    :param to: value cached alongside `entry` (presumably the combined
               redirect URL; confirm against the caller)
    :param entry: the redirect entry that matched `ppq`
    """
    if not RedirectHandler.ppq_cache_enabled:
        return
    cache = RedirectHandler._ppq_cache
    # XXX: SECURITY RISK: Python hash is not cryptographically secure
    ppqh = hash(ppq)
    # Overwriting an existing key does not grow the cache, so only make
    # room when the key is new.
    if ppqh not in cache:
        # `OrderedDict.popitem()` defaults to `last=True`, which removes the
        # most recently inserted entry. `last=False` removes the oldest.
        # The `cache and` guard avoids KeyError on an empty cache when
        # `_ppq_cache_max` is zero or negative.
        # XXX: this is a primitive cache eviction algorithm. A "least used
        #      cache entry" would be better. Good enough.
        while cache and len(cache) >= RedirectHandler._ppq_cache_max:
            cache.popitem(last=False)
    # cache the entry
    cache[ppqh] = (entry, to)
@staticmethod
def _ppq_cache_check(
    ppq: Ppq, redirects: Re_Entry_Dict
) -> Union[Tuple[Re_Entry, Re_To], Tuple[None, None]]:
    """
    Look up an incoming request in the class-wide `_ppq_cache`.

    A miss is reported as `(None, None)`. That is the result when caching
    is disabled, when the key `hash(ppq)` is not cached, and when the cache
    turns out to belong to a different `redirects` object.

    :param ppq: incoming user request
    :param redirects: the redirects dict currently in use; its `id()` is
                      compared against the one remembered by the cache
    :return: the cached `(entry, to)` tuple or `(None, None)`
    """
    miss = (None, None)
    handler = RedirectHandler
    if not handler.ppq_cache_enabled:
        return miss
    # the `_ppq_cache_redirects_hash` is a sanity check
    current_id = id(redirects)
    remembered_id = handler._ppq_cache_redirects_hash
    if remembered_id == 0:
        # first check since the cache was cleared; remember the owner
        handler._ppq_cache_redirects_hash = current_id
        log.debug("set _ppq_cache_redirects_hash 0x%016X", current_id)
    elif remembered_id != current_id:
        # cache was filled from another redirects object; discard it
        log.error("_ppq_cache_redirects was not cleared")
        handler.ppq_cache_clear()
        return miss
    # XXX: SECURITY RISK: Python hash is not cryptographically secure
    return handler._ppq_cache.get(hash(ppq), miss)
@staticmethod
def query_match_finder(
    ppq: Ppq, ppqpr: ParseResult, redirects: Re_Entry_Dict
) -> Optional[Re_Entry]:
    """
    Select the redirect entry that best suits an incoming request.

    An incoming query can have multiple matches within redirects. The
    entry whose type equals the type of the request wins. Otherwise the
    first loaded entry of a fallback type is returned, trying the types in
    the order given by `Re_EntryType.getEntryTypes_fallback`.

    For example, given incoming ppq '/foo?a=1' and redirects
        {
          '/foo':  …
          '/foo?': …
          '/foo;': …
        }
    This could match keys '/foo' and '/foo?' (not '/foo;'). This will
    return the entry for required request match of '/foo?'.

    :param ppq: incoming user request
    :param ppqpr: same incoming user request as ParseResult
    :param redirects: loaded redirect entries
    :return: the selected entry, or None when nothing is suitable
    """
    wanted_type = Re_EntryType.getEntryType_ParseResult(ppq, ppqpr)
    # (entry type, key) for each loaded entry that shares the request path
    # but is not of the wanted type, in the order the keys were tried
    candidates = []
    # try every possible key based on path;
    # e.g.
    #    '/foo', '/foo;', '/foo;?', '/foo?'
    for key in Re_EntryType.getEntryKeys(typing.cast(Re_From, ppqpr.path)):
        # XXX: Disable Path Required Request Modifier
        if key in redirects:
            found = redirects[key]
            if found.etype == wanted_type:
                # exact type match, no need to look any further
                return found
            candidates.append((found.etype, key))
    # nothing is a possible match
    if not candidates:
        return None
    # search for inexact but appropriate type match
    for fallback_type in Re_EntryType.getEntryTypes_fallback(wanted_type):
        for candidate_type, candidate_key in candidates:
            if fallback_type == candidate_type:
                return redirects[candidate_key]
    # XXX: no fallback was found yet there were fallback keys? concerning.
    log.error("Expected to find fallback type for type %s, none found", wanted_type)
    return None
def do_GET_status(self, note_admin: htmls, reload_datetime_: datetime.datetime) -> None:
"""dump status information about this server instance"""
http_sc = http.HTTPStatus.OK # HTTP Status Code
self.log_message(
"status requested, returning %s (%s)",
int(http_sc),
http_sc.phrase,
loglevel=logging.INFO,
)
self.send_response(http_sc)
he = html_escape # abbreviate
# create the html body
esc_title = he("%s status" % PROGRAM_NAME)
start_datetime = datetime.datetime.fromtimestamp(TIME_START).replace(microsecond=0)
uptime = time.time() - TIME_START
esc_overall = "Program {}".format(html_a(__url_github__, PROGRAM_NAME))
esc_overall += he(" version {}.\n".format(__version__))
esc_overall += he(
"Process ID %s listening on %s:%s on host %s\n"
"Process start datetime %s (up time %s)\n"
"Successful Redirect Status Code is %s (%s)"
% (
os.getpid(),
self.server.server_address[0],
self.server.server_address[1],
HOSTNAME,
start_datetime,
datetime.timedelta(seconds=uptime),
int(self.status_code),
self.status_code.phrase,
)
)
def obj_to_html(obj, sort_keys=False) -> htmls:
"""Convert an object to html"""
return he(
json.dumps(obj, indent=2, ensure_ascii=False, sort_keys=sort_keys, default=str)
)
def redirects_to_html_table(rd: Re_Entry_Dict, reload_datetime__) -> htmls:
"""Convert Re_Entry_Dict into linkable html table"""
esc_reload_datetime = he(cast(datetime.datetime, reload_datetime__).isoformat())
s_ = """\
<table class="sortable">
<caption>Currently Loaded Redirects (last reload {esc_reload_datetime})</caption>
<thead>
<tr>
<th scope="col">From</th><th scope="col">To</th><th scope="col" class="ar">Entry User</th><th scope="col">Entry datetime</th>
</tr>
</thead>
<tbody>
""".format(
esc_reload_datetime=esc_reload_datetime
)
for key in rd.keys():
val = rd[key]
s_ += """\
<tr>
<td>{from_}</td><td>{to_}</td><td class="ar">{user}</td><td>{date}</td>
</tr>
""".format(
from_=html_a(val.from_),
to_=he(val.to),
user=he(val.user),
date=he(str(val.date)),
)
s_ += """\
</tbody>
</table>"""
return htmls(s_)