-
Notifications
You must be signed in to change notification settings - Fork 2
/
xhttp.py
125 lines (93 loc) · 2.33 KB
/
xhttp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import socket
import datetime
import time
def http2_test(sock: socket.socket):
data = [
"HTTP/1.1 200 OK",
"Content-Type: text/plain",
"Transfer-Encoding: chunked",
"",
"",
]
data = "\r\n".join(data)
resp = data.encode()
print("resp: ", data)
sock.sendall(resp)
sock.close()
return
def chunked_test(sock: socket.socket):
data = [
"HTTP/1.1 200 OK",
"Content-Type: text/plain",
"Transfer-Encoding: chunked",
"",
"",
]
xdata = [
r"7",
r"Mozilla",
r"11",
r"Developer Network",
r"0",
r"",
r"",
# r"7\r\n",
# r"Mozilla\r\n",
# r"11\r\n",
# r"Developer Network\r\n",
# r"0\r\n",
# r"\r\n",
]
data = "\r\n".join(data)
data += "\r\n".join(xdata)
resp = data.encode()
print("resp: ", data)
sock.sendall(resp)
sock.close()
return
def chunk_http_server(sock: socket.socket):
_resp = [
"HTTP/1.1 200 OK",
"Content-Type: text/plain",
"X-Accel-Buffering: no",
"Transfer-Encoding: chunked",
"",
"",
]
resp = "\r\n".join(_resp).encode()
print(f"response header sent: {resp}")
sock.sendall(resp)
cnt = 0
while True:
cnt += 1
tm = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
_data = f"line {cnt} + {tm}"
xlen = len(_data)
data = f'{xlen:x}\r\n{_data}\r\n'.encode()
print(f"chunk send: {data}")
sock.sendall(data)
if cnt >= 5:
break
time.sleep(1)
data = '0\r\n\r\n\r\n'.encode()
print(f"chunk send: {data}")
sock.sendall(data)
sock.close()
print("http serve done")
def serve_http(sock: socket.socket):
recv = sock.recv(102400)
print("====== request data received ======")
print(recv.decode())
print("====== request data received print end ======")
# return chunked_test(sock)
http2_test(sock)
def serve():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 7556))
sock.listen()
while True:
conn, _addr = sock.accept()
serve_http(conn)
if __name__ == '__main__':
serve()