Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop RedisKit dependency to use Vapor/Redis directly #19

Merged
merged 2 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ let package = Package(
dependencies: [
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"),
.package(url: "https://github.com/vapor/queues.git", from: "1.0.0"),
.package(url: "https://github.com/vapor/redis-kit.git", from: "1.0.0-beta.5"),
.package(url: "https://github.com/vapor/redis.git", from: "4.0.0-beta.6.1"),
],
targets: [
.target(
name: "QueuesRedisDriver",
dependencies: [
.product(name: "Vapor", package: "vapor"),
.product(name: "Queues", package: "queues"),
.product(name: "RedisKit", package: "redis-kit"),
.product(name: "Redis", package: "redis"),
]
),
.testTarget(
Expand Down
2 changes: 1 addition & 1 deletion Sources/QueuesRedisDriver/Exports.swift
Original file line number Diff line number Diff line change
@@ -1 +1 @@
@_exported import RedisKit
@_exported import Redis
75 changes: 62 additions & 13 deletions Sources/QueuesRedisDriver/JobsRedisDriver.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Queues
import RedisKit
import Redis
import NIO
import Foundation
import Vapor
Expand Down Expand Up @@ -27,7 +27,7 @@ extension Application.Queues.Provider {
/// - Throws: An error describing an invalid URL
/// - Returns: The new provider
public static func redis(url: URL) throws -> Self {
guard let configuration = RedisConfiguration(url: url) else {
guard let configuration = try? RedisConfiguration(url: url) else {
throw InvalidRedisURL(url: url.absoluteString)
}
return .redis(configuration)
Expand All @@ -45,7 +45,7 @@ extension Application.Queues.Provider {

/// A `QueuesDriver` for Redis
public struct RedisQueuesDriver {
let pool: EventLoopGroupConnectionPool<RedisConnectionSource>
let pool: RedisConnectionPool

/// Creates the RedisQueuesDriver
/// - Parameters:
Expand All @@ -54,16 +54,21 @@ public struct RedisQueuesDriver {
public init(configuration: RedisConfiguration, on eventLoopGroup: EventLoopGroup) {
let logger = Logger(label: "codes.vapor.redis")
self.pool = .init(
source: .init(configuration: configuration, logger: logger),
maxConnectionsPerEventLoop: 1,
logger: logger,
on: eventLoopGroup
serverConnectionAddresses: configuration.serverAddresses,
loop: eventLoopGroup.next(),
maximumConnectionCount: configuration.pool.maximumConnectionCount,
minimumConnectionCount: configuration.pool.minimumConnectionCount,
connectionPassword: configuration.password,
connectionLogger: logger,
poolLogger: logger,
connectionBackoffFactor: configuration.pool.connectionBackoffFactor,
initialConnectionBackoffDelay: configuration.pool.initialConnectionBackoffDelay
)
}

/// Shuts down the driver
public func shutdown() {
self.pool.shutdown()
self.pool.close()
}
}

Expand All @@ -74,14 +79,14 @@ extension RedisQueuesDriver: QueuesDriver {
/// - Returns: The created `Queue`
public func makeQueue(with context: QueueContext) -> Queue {
_QueuesRedisQueue(
client: pool.pool(for: context.eventLoop).client(),
client: self.pool,
context: context
)
}
}

struct _QueuesRedisQueue {
let client: RedisClient
struct _QueuesRedisQueue<Client: RedisClient> {
let client: Client
let context: QueueContext
}

Expand All @@ -94,8 +99,34 @@ extension _QueuesRedisQueue: RedisClient {
self.client.send(command: command, with: arguments)
}

func setLogging(to logger: Logger) {
self.client.setLogging(to: logger)
func logging(to logger: Logger) -> RedisClient {
return self.client.logging(to: logger)
}

func subscribe(
to channels: [RedisChannelName],
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
) -> EventLoopFuture<Void> {
return self.client.subscribe(to: channels, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}

func psubscribe(
to patterns: [String],
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
) -> EventLoopFuture<Void> {
return self.client.psubscribe(to: patterns, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
}

func unsubscribe(from channels: [RedisChannelName]) -> EventLoopFuture<Void> {
return self.client.unsubscribe(from: channels)
}

func punsubscribe(from patterns: [String]) -> EventLoopFuture<Void> {
return self.client.punsubscribe(from: patterns)
}
}

Expand Down Expand Up @@ -152,3 +183,21 @@ struct DecoderUnwrapper: Decodable {
let decoder: Decoder
init(from decoder: Decoder) { self.decoder = decoder }
}

extension RedisClient {
func get<D>(_ key: RedisKey, asJSON type: D.Type) -> EventLoopFuture<D?> where D: Decodable {
return get(key, as: Data.self).flatMapThrowing { data in
return try data.flatMap { data in
return try JSONDecoder().decode(D.self, from: data)
}
}
}

func set<E>(_ key: RedisKey, toJSON entity: E) -> EventLoopFuture<Void> where E: Encodable {
do {
return try set(key, to: JSONEncoder().encode(entity))
} catch {
return eventLoop.makeFailedFuture(error)
}
}
}
2 changes: 1 addition & 1 deletion Tests/QueuesRedisDriverTests/JobsRedisDriverTests.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import QueuesRedisDriver
@testable import QueuesRedisDriver
import Queues
import XCTVapor

Expand Down