Skip to content

Commit

Permalink
wip review
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoughjr committed Aug 29, 2023
1 parent efb85a4 commit 044755f
Showing 1 changed file with 12 additions and 86 deletions.
98 changes: 12 additions & 86 deletions Sources/WebSocketKit/PMCE.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,22 +276,7 @@ public final class PMCE: Sendable {

/// The channel whose allocator to use for the compression ByteBuffers and box event loops.
public let channel:NIO.Channel?

/// Enables/disables logging.
public var logging:Bool {
get {
_logging.withLockedValue { v in
v
}
}
set {
_logging.withLockedValue { v in
v = newValue
}
}
}



/// Represents the strategy of pmce used with the server.
public let serverConfig: PMCEConfig

Expand All @@ -317,17 +302,10 @@ public final class PMCE: Sendable {
self.channel = channel
self.extendedSocketType = peerType

self._enabled = NIOLockedValueBox(true)
self._logging = NIOLockedValueBox(true)



switch extendedSocketType {
case .server:
logger.trace("server")

let winSize = PMCE.sizeFor(bits: serverConfig.deflateConfig.agreedParams.maxWindowBits ?? 15)
logger.trace("window size: \(winSize)")

let zscConf = ZlibConfiguration(windowSize: winSize,
compressionLevel: serverConfig.deflateConfig.zlibConfig.compressionLevel,
Expand All @@ -342,14 +320,11 @@ public final class PMCE: Sendable {
eventLoop: channel.eventLoop)
self.decompressorBox = NIOLoopBoundBox(CompressionAlgorithm.deflate(configuration: zsdConf).decompressor,
eventLoop: channel.eventLoop)
logger.trace("compressor \(zscConf)")
logger.trace("decompressor \(zsdConf)")


case .client:
logger.trace("client")

let winSize = PMCE.sizeFor(bits: clientConfig.deflateConfig.agreedParams.maxWindowBits ?? 15)
logger.trace("window size: \(winSize)")

let zccConf = ZlibConfiguration(windowSize: winSize,
compressionLevel: clientConfig.deflateConfig.zlibConfig.compressionLevel,
Expand All @@ -367,15 +342,13 @@ public final class PMCE: Sendable {
self.decompressorBox = NIOLoopBoundBox( CompressionAlgorithm.deflate(configuration: zcdConf).decompressor,
eventLoop: channel.eventLoop)

logger.trace("compressor \(zccConf)")
logger.trace("decompressor \(zcdConf)")

}
startStreams()
}

/// Starts compress-nio streams for DEFLATE support.
/// - returns: Void
public func startStreams() {

func startStreams() {
do {
try compressorBox.value?.startStream()
}
Expand All @@ -390,9 +363,7 @@ public final class PMCE: Sendable {
}
}

/// Stops compress-nio streams for DEFLATE support.
/// - returns: Void
public func stopStreams() {
func stopStreams() {
do {
logger.debug("PMCE: stopping compressor stream...")
try compressorBox.value?.finishStream()
Expand Down Expand Up @@ -422,41 +393,18 @@ public final class PMCE: Sendable {
}
let notakeover = !shouldTakeOverContext()

let startSize = buffer.readableBytes

if logging {
logger.trace("compressing \(startSize) bytes for \(opCode)")
}
do {
var mutBuffer = buffer

if !notakeover {
mutBuffer = unpad(buffer:buffer)
}

let startTime = Date()

let compressed =
try mutBuffer.compressStream(with: compressorBox.value!,
flush: .sync,
allocator: channel.allocator)
if logging {
let endTime = Date()
let endSize = compressed.readableBytes

logger.trace("compressed \(startSize) to \(endSize) bytes @ \(startSize / endSize) ratio from")
switch extendedSocketType {
case .server:
logger.trace(" \(serverConfig.deflateConfig.zlibConfig)")
case .client:
logger.trace(" \(clientConfig.deflateConfig.zlibConfig)")
}

logger.trace("in \(startTime.distance(to: endTime))")
}

let compressed = try mutBuffer.compressStream(with: compressorBox.value!,
flush: .sync,
allocator: channel.allocator)

if notakeover {
logger.trace("resetting compressor stream")
try compressorBox.value?.resetStream()
}else {
}
Expand Down Expand Up @@ -485,15 +433,8 @@ public final class PMCE: Sendable {
}
let takeover = shouldTakeOverContext()

let startTime = Date()



var data = frame.data
let startSize = data.readableBytes
if logging {
logger.trace("PMCE: decompressing \(startSize) bytes for \(frame.opcode)")
}
logger.trace("PMCE: config: \(serverConfig) \(clientConfig)")

if takeover {
data = pad(buffer:data)
Expand All @@ -503,23 +444,10 @@ public final class PMCE: Sendable {
try data.decompressStream(with: self.decompressorBox.value!,
maxSize: .max,
allocator: channel.allocator)
if logging {
let endTime = Date()

let endSize = decompressed.readableBytes

logger.trace("deompressed \(startSize) to \(endSize) bytes @ \(endSize/startSize) ratio from")
logger.trace(" \(serverConfig.deflateConfig.zlibConfig) \(clientConfig.deflateConfig.zlibConfig)")


logger.trace("in \(startTime.distance(to: endTime))")
}


if !takeover {
if logging { logger.trace("PMCE: resetting decompressoer stream.") }
try decompressorBox.value?.resetStream()
}else {
if logging { logger.trace("PMCE: not restting decompressor stream.")}
}

let newFrame = WebSocketFrame(fin: frame.fin,
Expand Down Expand Up @@ -554,8 +482,6 @@ public final class PMCE: Sendable {

///
private let logger = Logger(label: "PMCE")
private let _logging:NIOLockedValueBox<Bool>
private let _enabled:NIOLockedValueBox<Bool>

// Converts windowBits to size of window.
private static func sizeFor(bits:UInt8) -> Int32 {
Expand Down

0 comments on commit 044755f

Please sign in to comment.