-
Notifications
You must be signed in to change notification settings - Fork 0
/
WebServer.java
300 lines (257 loc) · 9.73 KB
/
WebServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package com.studying.concurrency.v4;
import com.studying.concurrency.util.Logs;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URLConnection;
/**
* Created by junweizhang on 17/11/22.
* 第四版 缓解监听器线程忙等问题.
* 抽象出五个角色:
* Bootstrap-启动器
* WebServer-Web服务器
* Worker-处理HTTP请求的工作者
* Acceptor-监听器
* Queue-任务队列
*/
public class WebServer {
// ServerSocket
private ServerSocket ss;
// 根目录
private File docRoot;
// 服务是否停止
private boolean isStop = false;
// HTTP监听端口
private int port = 8080;
// 处理HTTP请求线程
private Thread workerThread;
// 监听Socket线程
private Thread acceptorThread;
// 监听到的socket队列
private SimpleQueue<Socket> socketQueue;
public WebServer(int port, File docRoot) throws Exception {
// 1. 服务端启动8080端口,并一直监听;
this.port = port;
this.ss = new ServerSocket(port, 10);
this.docRoot = docRoot;
this.socketQueue = new SimpleQueue<>(3);
start(this);
}
/**
* 必需先启动工作线程,再启动监听线程.
*/
private void start(WebServer server) {
// 启动工作线程,工作线程,可以作为守护线程
workerThread = new Thread(new Worker());
workerThread.setName("worker-process-thread");
workerThread.setDaemon(true);
workerThread.start();
Logs.SERVER.info("start worker thread : {} ...", workerThread.getName());
// 启动监听线程,监听线程,不作为守护线程,保证JVM不退出.
acceptorThread = new Thread(new Acceptor());
acceptorThread.setName("http-acceptor-" + port + "-thread");
acceptorThread.start();
Logs.SERVER.info("start acceptor thread : {} ...", acceptorThread.getName());
}
public void serve() {
Logs.SERVER.info("Http Server ready to receive requests...");
while (!isStop) {
try {
Socket socket = listen();
process(socket);
} catch (Exception e) {
Logs.SERVER.error("serve error", e);
isStop = true;
// System.exit(1);
}
}
}
/**
* 2. 监听到有客户端(比如浏览器)要请求http://localhost:8080/,那么建议连接,TCP三次握手;
*/
private Socket listen() throws IOException {
return ss.accept();
}
/**
* 由监听线程往队列中放入socket,以备工作线程从中取值进行处理.
*/
private void assign(Socket socket) throws Exception {
socketQueue.put(socket);
}
/**
* 工作线程从队列中取出socket.
*/
private Socket await() throws Exception {
return socketQueue.take();
}
/**
* 3. 处理接收到的Socket,解析输入字节流,并返回结果.
*/
private void process(Socket socket) throws Exception {
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
/**
* 3. 建立连接后,读取此次连接客户端传来的内容(其实就是解析网络字节流并按HTTP协议去解析);
* GET /dir1/dir2/file.html HTTP/1.1
*/
String requestLine = reader.readLine();
Logs.SERVER.info("requestLine is : {}", requestLine);
if (requestLine == null || requestLine.length() < 1) {
Logs.SERVER.error("could not read request");
return;
}
String[] tokens = requestLine.split(" ");
String method = tokens[0];
String fileName = tokens[1];
File requestedFile = docRoot;
String[] paths = fileName.split("/");
for (String path : paths) {
requestedFile = new File(requestedFile, path);
}
if (requestedFile.exists() && requestedFile.isDirectory()) {
requestedFile = new File(requestedFile, "index.html");
}
BufferedOutputStream bos = new BufferedOutputStream(os);
// 4. 解析到请求路径(比如此处是根路径),那么去根路径下找资源(比如此处是index.html文件);
if (requestedFile.exists()) {
Logs.SERVER.info("return 200 ok");
long length = requestedFile.length();
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(requestedFile));
String contentType = URLConnection.guessContentTypeFromStream(bis);
byte[] headerBytes = createHeaderBytes("HTTP/1.1 200 OK", length, contentType);
bos.write(headerBytes);
// 5. 找到资源后,再通过网络流将内容输出,当然,还是按照HTTP协议去输出,这样客户端(浏览器)就能正常渲染、显示网页内容;
byte[] buf = new byte[2000];
int blockLen;
while ((blockLen = bis.read(buf)) != -1) {
bos.write(buf, 0, blockLen);
}
bis.close();
} else {
Logs.SERVER.info("return 404 not found");
byte[] headerBytes = createHeaderBytes("HTTP/1.0 404 Not Found", -1, null);
bos.write(headerBytes);
}
bos.flush();
socket.close();
}
/**
* 生成HTTP Response头.
*
* @param content
* @param length
* @param contentType
* @return
*/
private byte[] createHeaderBytes(String content, long length, String contentType) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(baos));
bw.write(content + "\r\n");
if (length > 0) {
bw.write("Content-Length: " + length + "\r\n");
}
if (contentType != null) {
bw.write("Content-Type: " + contentType + "\r\n");
}
bw.write("\r\n");
bw.flush();
byte[] data = baos.toByteArray();
bw.close();
return data;
}
/**
* 接收器,监听HTTP端口,接收Socket.
*/
public class Acceptor implements Runnable {
@Override
public void run() {
try {
while (!isStop) {
Logs.SERVER.info("acceptor begin listen socket ...");
Socket s = listen();
Logs.SERVER.info("acceptor a new socket : {}", s);
assign(s);
}
} catch (Exception e) {
Logs.SERVER.error("Acceptor process error", e);
}
}
}
/**
* 处理HTTP请求的工作者.
*/
public class Worker implements Runnable {
@Override
public void run() {
try {
while (!isStop) {
Socket s = await();
if (s != null) {
Logs.SERVER.info("worker begin process socket : {}", s);
process(s);
}
}
} catch (Exception e) {
Logs.SERVER.error("Worker process error", e);
}
}
}
/**
* 一个简单的阻塞队列(先进先出),线程安全,不支持扩容,用数组实现.
*/
public class SimpleQueue<E> {
// 元素数据
private Object[] items;
// 队列容量
private int capacity;
// 队列头索引
private int putIndex;
// 队列尾索引
private int takeIndex;
// 队列当前元素个数
private int size;
public SimpleQueue(int cap) {
this.capacity = cap;
this.items = new Object[cap];
this.size = 0;
this.putIndex = 0;
this.takeIndex = 0;
}
public synchronized void put(E e) throws InterruptedException {
// 监听器线程往队列中放入socket,如果当前队列满了则监听器等待
if (isFull()) {
Logs.SERVER.info("{} wait put queue : {}", Thread.currentThread().getName(), e);
wait();
}
// 若队列没满,则监听器线程往队列中放入socket;并且如果先前已经有工作线程在等待取数据,通知工作线程来取
items[putIndex] = e;
putIndex = (putIndex + 1) % capacity;
size++;
Logs.SERVER.info("queue isFull {}, isEmpty {}, capacity {}, size {}, takeIndex {}, putIndex {}", isFull(), isEmpty(), capacity, size, takeIndex, putIndex);
notify();
}
public synchronized E take() throws InterruptedException {
// 工作线程来取socket,如果当前队列为空,则工作线程进行等待
if (isEmpty()) {
Logs.SERVER.info("{} wait get socket", Thread.currentThread().getName());
wait();
}
// 队列不为空,工作线程从队列中取出socket;并且如果先前有监听器线程在等待往队列中放数据,通知监听器线程放
E e = (E) items[takeIndex];
// 将已经取走的引用置为空,让GC可以回收
items[takeIndex] = null;
takeIndex = (takeIndex + 1) % capacity;
size--;
Logs.SERVER.info("queue isFull {}, isEmpty {}, capacity {}, size {}, takeIndex {}, putIndex {}", isFull(), isEmpty(), capacity, size, takeIndex, putIndex);
notify();
return e;
}
public synchronized boolean isFull() {
return capacity == size;
}
public synchronized boolean isEmpty() {
return size == 0;
}
}
}