* SocketServer体系及编程接口
  - Socket套接字 Socket是一种通用的网络编程接口 和网络层次没有一一对应关系
  
  - 进程间通信 **序列化与反序列化**代价比较大
  - 进程内部 效率会较高(但单个线程效率不高) 不用序列化与反序列化
  
* 服务器端编程步骤
  - 创建Socket对象
  - 绑定IP Address 和 Port  bind()
    - IP：路由 解决主机间通信
    - 端口：和进程绑定(确定唯一进程)
  - 开始监听  listen()
  - 获取用于传送数据的Socket对象
    - socket.accept() -> (socket object, address info)  

* Server端开发
  - socket对象 --> bind((IP, Port)) --> listen --> accept --> close
                                                    |--> recv or send --> close  


newsock1和 newsock2 顺序执行 会被阻塞
下面的 accept方法 和recv方法 是阻塞的 主线程经常被阻塞而不能工作 怎么办？
解决：多线程 IO密集型 GIL不明显

In [None]:
import socket


# 1. 监听socket
server = socket.socket()  # 监听socket 默认TCP IPv4
# 2. 占用 bind (IP, Port)
server.bind(('127.0.0.1', 60000))  # 绑定只能一次
# 3. 监听
server.listen()  # listen完立即返回
print(server)  # 占文件描述符fd

# 测试客户端 Unix: sokit
newsock1, raddr1 = server.accept()  # accept默认阻塞 等队列中的连接
print(newsock1) # 该socket用来通信 知道对端地址
print(raddr1)
# 发送缓存区 会满就会阻塞 一般不会满
data = newsock1.recv(1024)  # TCP 我们知道对端是谁 阻塞，等数据到来
print(type(data), data)
msg = "Your meg = {}".format(data.decode()).encode()  # UTF-8 C/S两端一定要协商字符串编码规则
newsock1.send(msg)  # 有连接协议 不用sendto
print('=' * 30)

newsock2, raddr2 = server.accept()
print(newsock2)
data = newsock2.recv(1024)
newsock2.send(data + b'~@#')

newsock1.close()
newsock2.close()
server.close()


In [None]:
* 群聊工具实现

In [None]:
# v1.0

import socket
import logging


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现群聊工具 TCP Server
# Server OSError start stop  OOD 面向对象设计
class ChatServer:
    def __init__(self, ip='localhost', port=9999):  # TCP UDP
        self.addr = ip, port
        self.sock = socket.socket()

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()

        newsock, raddr = self.sock.accept()  # 从队列中取一个连接来
        logging.info(newsock)
        while True:  # Echo Server
            data = newsock.recv(1024)
            msg = "Your msg = {}".format(data.decode()).encode()
            newsock.send(msg)

    def stop(self):
        self.sock.close()


cs = ChatServer()  # 监听 IP Port
cs.start()

In [None]:
Main-Thread -> accept-thread -> thread-1 recv send
                             -> thread-2 recv send
                             -> thread-N recv send

In [None]:
# v2.0

import socket
import logging
import threading
import datetime


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现群聊工具 TCP Server
# Server OSError start stop  OOD 面向对象设计
class ChatServer:
    def __init__(self, ip='localhost', port=9999):  # TCP UDP
        self.addr = ip, port
        self.sock = socket.socket()

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self.accept, name='Accept').start()  # 启动1个

    def accept(self):
        while True:  # 循环取到来的连接 放到不同的线程中处理
            newsock, raddr = self.sock.accept()  # 从队列中取一个连接来
            logging.info(newsock)
            threading.Thread(target=self.recv, args=(newsock, raddr)).start()  # non-daemon

    # 每有一个客户端 就甩出一个线程 里面有newsock和对应的客户端通信
    def recv(self, sock, raddr):  # 把这段会阻塞的代码 扔到单独的线程中执行
        while True:  # Echo Server
            data = sock.recv(1024)
            msg = "[{:%F %T}] {}:{} {}".format(
                datetime.datetime.now(),
                *raddr,
                data.decode('gbk')
            ).encode('gbk')
            sock.send(msg)

    def stop(self):
        self.sock.close()


def main():
    cs = ChatServer()  # 监听 IP Port
    cs.start()
    while True:
        cmd = input('>>>')
        if cmd == 'quit':
            cs.stop()
            break
        print(threading.active_count(), *threading.enumerate(), sep='\n')


if __name__ == '__main__':
    main()

In [None]:
# v3.0 把while True -> 使用threading.Event() -- is_set() set()
# Echo Server 回音

import socket
import logging
import threading
import datetime


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现群聊工具 TCP Server
# Server OSError start stop  OOD 面向对象设计
class ChatServer:
    def __init__(self, ip='localhost', port=9999):  # TCP UDP
        self.addr = ip, port
        self.sock = socket.socket()
        self.event = threading.Event()

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self.accept, name='Accept').start()  # 启动1个

    def accept(self):
        while not self.event.is_set():  # 循环取到来的连接 放到不同的线程中处理
            newsock, raddr = self.sock.accept()  # 从队列中取一个连接来
            logging.info(newsock)
            threading.Thread(target=self.recv, args=(newsock, raddr)).start()  # non-daemon

    # 每有一个客户端 就甩出一个线程 里面有newsock和对应的客户端通信
    def recv(self, sock, raddr):  # 把这段会阻塞的代码 扔到单独的线程中执行
        while not self.event.is_set():  # Echo Server
            data = sock.recv(1024)
            msg = "[{:%F %T}] {}:{} {}".format(
                datetime.datetime.now(),
                *raddr,
                data.decode('gbk')
            ).encode('gbk')
            sock.send(msg)

    def stop(self):
        self.event.set()  # 结束的时候 event 置为True
        self.sock.close()


def main():
    cs = ChatServer()  # 监听 IP Port
    cs.start()
    while True:
        cmd = input('>>>')
        if cmd == 'quit':
            cs.stop()
            break
        print(threading.active_count(), *threading.enumerate(), sep='\n')


if __name__ == '__main__':
    main()


In [None]:
# v4.0 
# 实现群发
# todo: 关闭时候的异常捕获 aeecpt线程 和 recv工作线程


import socket
import logging
import threading
import datetime


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现群聊工具 TCP Server
# Server OSError start stop  OOD 面向对象设计
class ChatServer:
    def __init__(self, ip='localhost', port=9999):  # TCP UDP
        self.addr = ip, port
        self.sock = socket.socket()
        self.event = threading.Event()
        self.clients = []

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self.accept, name='Accept').start()  # 启动1个

    def accept(self):
        while not self.event.is_set():  # 循环取到来的连接 放到不同的线程中处理
            newsock, raddr = self.sock.accept()  # 从队列中取一个连接来
            self.clients.append(newsock)
            logging.info(newsock)
            threading.Thread(target=self.recv, args=(newsock, raddr)).start()  # non-daemon

    # 每有一个客户端 就甩出一个线程 里面有newsock和对应的客户端通信
    def recv(self, sock, raddr):  # 把这段会阻塞的代码 扔到单独的线程中执行
        while not self.event.is_set():  # Echo Server
            data = sock.recv(1024)
            msg = "[{:%F %T}] {}:{} {}".format(
                datetime.datetime.now(),
                *raddr,
                data.decode('gbk')
            ).encode('gbk')
            for c in self.clients:
                c.send(msg)

    def stop(self):
        self.event.set()  # 结束的时候 event 置为True
        for c in self.clients:
            c.close()
        self.sock.close()


def main():
    cs = ChatServer()  # 监听 IP Port
    cs.start()
    while True:
        cmd = input('>>>')
        if cmd == 'quit':
            cs.stop()
            break
        print(threading.active_count(), *threading.enumerate(), sep='\n')
        print(cs.clients)


if __name__ == '__main__':
    main()

**遍历过程中 改变hash表Size [set dict]都有这个问题**

* 遍历过程中 改变hash表Size 会抛出RuntimeError
RuntimeError: dictionary changed size during iteration

In [None]:
# v5.0 
# 注意几处需要加锁的地方 保证线程安全

import socket
import logging
import threading
import datetime


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现群聊工具 TCP Server
# Server OSError start stop  OOD 面向对象设计
class ChatServer:
    def __init__(self, ip='localhost', port=9999):  # TCP UDP
        self.addr = ip, port
        self.sock = socket.socket()
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.clients = {}
        # [] 当sock对象数量增大 使用列表效率低
        # set() 使用集合 不方便观察

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self.accept, name='Accept').start()  # 启动1个

    def accept(self):
        while not self.event.is_set():  # 循环取到来的连接 放到不同的线程中处理
            newsock, raddr = self.sock.accept()  # 从队列中取一个连接来
            with self.lock:
                self.clients[raddr] = newsock
            logging.info(newsock)
            threading.Thread(target=self.recv, args=(newsock, raddr)).start()  # non-daemon

    # 每有一个客户端 就甩出一个线程 里面有newsock和对应的客户端通信
    def recv(self, sock, raddr):  # 把这段会阻塞的代码 扔到单独的线程中执行
        while not self.event.is_set():  # Echo Server
            data = sock.recv(1024)  # 抛异常
            print(data, '+++++')  # 客户端主动断开 会发送一个空bytes
            if not data or data == b'quit':
                logging.info('{} bye bye'.format(raddr))
                sock.close()
                # 清理工作
                with self.lock:
                    self.clients.pop(raddr)
                break  # 跳出当前循环 recv终止
            msg = "[{:%F %T}] {}:{} {}".format(
                datetime.datetime.now(),
                *raddr,
                data.decode('gbk')
            ).encode('gbk')
            # 一个线程遍历之后 CPU时间片用完 切到另一个线程需要退出(line:43) 或者增加(line:30)
            # 操作 self.clients 导致线程不安全
            with self.lock:  # 线程安全 必须要等 
                for c in self.clients.values():
                    c.send(msg)

    def stop(self):
        self.event.set()  # 结束的时候 event 置为True
        with self.lock:
            for c in self.clients.values():
                c.close()
        self.sock.close()


def main():
    cs = ChatServer()  # 监听 IP Port
    cs.start()
    while True:
        cmd = input('>>>')
        if cmd == 'quit':
            cs.stop()
            break
        print(threading.active_count(), *threading.enumerate(), sep='\n')
        print(cs.clients.keys())


if __name__ == '__main__':
    main()

In [None]:
* 同步 --> 优化: 同步变异步 --> 使用队列：kafka、Queue

In [None]:
# v6.0 
# 处理 客户端异常退出(崩溃) 产生的异常
# 清理self.clients 保存的无用的newsock对象

import socket
import logging
import threading
import datetime


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现群聊工具 TCP Server
# Server OSError start stop  OOD 面向对象设计
class ChatServer:
    def __init__(self, ip='localhost', port=9999):  # TCP UDP
        self.addr = ip, port
        self.sock = socket.socket()
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.clients = {}
        # [] 当sock对象数量增大 使用列表效率低
        # set() 使用集合 不方便观察

    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self.accept, name='Accept').start()  # 启动1个

    def accept(self):
        while not self.event.is_set():  # 循环取到来的连接 放到不同的线程中处理
            newsock, raddr = self.sock.accept()  # 从队列中取一个连接来
            with self.lock:
                self.clients[raddr] = newsock
            logging.info(newsock)
            threading.Thread(target=self.recv, args=(newsock, raddr)).start()  # non-daemon

    # 每有一个客户端 就甩出一个线程 里面有newsock和对应的客户端通信
    def recv(self, sock, raddr):  # 把这段会阻塞的代码 扔到单独的线程中执行
        while not self.event.is_set():  # Echo Server
            try:
                data = sock.recv(1024)  # 抛异常
            except Exception as e:  # SOError
                logging.error(e)  # 连接不可用
                data = b''
            print(data, '+++++')  # 客户端主动断开 会发送一个空bytes
            if not data or data == b'quit':
                logging.info('{} bye bye'.format(raddr))
                sock.close()
                # 清理工作
                with self.lock:
                    self.clients.pop(raddr)
                break  # 跳出当前循环 recv终止
            msg = "[{:%F %T}] {}:{} {}".format(
                datetime.datetime.now(),
                *raddr,
                data.decode('gbk')
            ).encode('gbk')
            # 一个线程遍历之后 CPU时间片用完 切到另一个线程需要退出(line:43) 或者增加(line:30)
            # 操作 self.clients 导致线程不安全
            with self.lock:
                for c in self.clients.values():
                    c.send(msg)

    def stop(self):
        self.event.set()  # 结束的时候 event 置为True
        with self.lock:
            for c in self.clients.values():
                c.close()
        self.sock.close()


def main():
    cs = ChatServer()  # 监听 IP Port
    cs.start()
    while True:
        cmd = input('>>>')
        if cmd == 'quit':
            cs.stop()
            break
        print(threading.active_count(), *threading.enumerate(), sep='\n')
        print(cs.clients.keys())


if __name__ == '__main__':
    main()



In [None]:
其他线程 recv阻塞的时候 直接关闭 会报错
解决： 1. 异常捕获
      2. 直接使用daemon线程 socket.close()之后 其他recv daemon线程不用管 
         随着一起关闭(把recv和aeecpt线程设置为daemon线程)

* MakeFile
对比socket 
  - 将recv方法 对映成 文件读操作(read|readline)
  - 将send方法 对应成 文件写操作(write)

In [None]:

import socket


server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen()

newsock, client = server.accept()
f = newsock.makefile(mode='rw', encoding='utf-8')  # mode和文件操作稍微不一样 注意区分
print(newsock)
print(f)
data = f.readline()  # str
print(type(data), data)
f.write(data + '###')
f.flush()


# data = newsock.recv(1024)  # bytes
# newsock.send(data)

newsock.close()
server.close()

In [None]:
* 客户端实现
ChatClient

In [None]:
# 群聊 客户端版本
import logging
import socket
import threading


# FORMAT = '%(asctime)s %(threadNmae)s %(thread)d %(message)s'
FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


class ChatClient:
    def __init__(self, rip='localhost', rport=9999):
        self.__raddr = rip, rport
        self.sock = socket.socket()
        self.event = threading.Event()
        # self.encoding = 'utf-8'  # 与服务端约定编码

    def start(self):
        # 捕获异常 断线重连；连接失败重连
        self.sock.connect(self.__raddr)

        # 模拟client向服务器汇报
        self.send('say hello')
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            data = self.sock.recv(1024)
            logging.info(data)

    def send(self, msg:str):
        self.sock.send(msg.encode())

    def stop(self):
        self.sock.close()


def main():
    cc = ChatClient()
    cc.start()
    while True:
        cmd = input('>>>').strip()
        if cmd == 'quit':
            cc.stop()
            break
        cc.send(cmd)
        print(threading.enumerate())


if __name__ == '__main__':
    main()

* SocketServer

上面socket写的Server 出了消息处理部分 其余大部分都是固定套路

socket编程过于底层 编程虽然有套路 但是想要写出**健壮**的代码还是比较困难的 所以很多语言都对socket底层API进行封装
Python的封装就是sockerserver模块 它是网络服务编程框架 便于企业级快速开发

* 类的继承关系

BaseServer  （基类 约定应该实现的方法）
    |
TCPServer  ->  UnixStreamServer
    |
DDPServer  ->  UnixDatagramServer

SockerServer简化了网络服务器的编写
它有4个同步类：
    - TCPServer
    - UDPServer
    - UnixStreamServer
    - UnixDatagramServer
2个Mixin类：ForkingMixIn(多进程支持) 和 ThreadingMixIn(多线程支持)类 用来支持异步
由此得到：
    - class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    - class ForkingTCPServer(ForkingMixIn, TCPServer): pass
    - class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
    - class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

fork是创建多进程 thread是创建多线程
fork需要操作系统支持 Windows不支持

In [None]:
* socketserver

In [None]:
import socketserver
import logging
import threading


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现一个EchoServer

# 作业：使用socketserver重写群聊软件

# 给你个基类 定义了处理流程
class MyHandler(socketserver.BaseRequestHandler):
    def handle(self) -> None:  # recv方法
        print(self.request)  # socket对象
        print(self.client_address)  # 客户端地址：Port
        print(self.server)  # TCPServer
        print(id(self.server))
        print('=' * 30)

        while True:  # 单线程 永远阻塞在处理第一个连接
            # for i in range(3):  # 单线程 阻塞在handle 三次握手完成 把下一个连接放在队列中等待
            data = self.request.recv(1024)
            msg = "### {} ###".format(data.decode()).encode()
            self.request.send(msg)
            print('-' * 30)
            logging.info(threading.enumerate())


# class BaseServer:
#     def __init__(self, server_address, RequestHandlerClass):
#         """Constructor.  May be extended, do not override."""
#         self.server_address = server_address
#         self.RequestHandlerClass = RequestHandlerClass
#         self.__is_shut_down = threading.Event()
#         self.__shutdown_request = False
#
#     def finish_request(self, request, client_address):
#         """Finish one request by instantiating RequestHandlerClass."""
#         self.RequestHandlerClass(request, client_address, self)  # 可能有若干实例

# 请求是什么？ 客户端发来的数据
# 一个请求时什么？

# socketserver socket编程框架
# server = socketserver.TCPServer(('127.0.0.1', 9999), MyHandler)
server = socketserver.ThreadingTCPServer(('127.0.0.1', 9999), MyHandler)
print(1, id(server))
# server.handle_request()  # 处理客户端请求 一次连接(一次请求)处理完
server.serve_forever()  # IO多路复用
server.server_close()  # close & clean-up

In [None]:
* 实现一个EchoServer
* 使用socketserver重写群聊软件

In [None]:
import socketserver
import logging
import threading


FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(level=20, format=FORMAT, datefmt='%F %T')


# 实现一个EchoServer

# 作业：使用socketserver重写群聊软件

# 给你个基类 定义了处理流程
class EchoHandler(socketserver.BaseRequestHandler):
    def setup(self) -> None:
        super().setup()
        self.event = threading.Event()

    def finish(self) -> None:
        super().finish()
        self.event.set()

    def handle(self) -> None:  # recv方法
        super().handle()
        print('=' * 30)
        while True:  # 单线程 永远阻塞在处理第一个连接
            data = self.request.recv(1024)
            if not data or data == b'quit':
                break
            msg = "### {} ###".format(data.decode()).encode()
            self.request.send(msg)
            print('-' * 30)
            logging.info(threading.enumerate())


# 请求是什么？ 客户端发来的数据
# 一个请求时什么？

# socketserver socket编程框架
# server = socketserver.TCPServer(('127.0.0.1', 9999), MyHandler)
server = socketserver.ThreadingTCPServer(('127.0.0.1', 9999), EchoHandler)
# server.serve_forever()  # IO多路复用
threading.Thread(target=server.serve_forever, name='serve').start()

while True:
    cmd = input('>>>')
    if cmd == 'quit':
        server.server_close()
        break
    print(threading.enumerate())
# server.server_close()  # close & clean-up
print('=' * 30)