Skip to content

Commit

Permalink
Add custom HTTP session abilities. (#344)
Browse files Browse the repository at this point in the history
* Use associatedtypes to clean up URLSession conformance

* Add ability to have custom HTTP sessions.

* Added/updated tests

* Fixed linux exclusion
  • Loading branch information
bsneed committed May 14, 2024
1 parent a1af4aa commit c4f8f12
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 57 deletions.
11 changes: 11 additions & 0 deletions Sources/Segment/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class Configuration {
var jsonNonConformingNumberStrategy: JSONSafeEncoder.NonConformingFloatEncodingStrategy = .zero
var storageMode: StorageMode = .disk
var anonymousIdGenerator: AnonymousIdGenerator = SegmentAnonymousId()
var httpSession: (() -> any HTTPSession) = HTTPSessions.urlSession
}

internal var values: Values
Expand Down Expand Up @@ -272,6 +273,16 @@ public extension Configuration {
values.anonymousIdGenerator = generator
return self
}

/// Use a custom HTTP session; Useful for non-apple platforms where Swift networking isn't as mature
/// or has issues to work around.
/// - Parameter httpSession: A class conforming to the HTTPSession protocol
/// - Returns: The current configuration
@discardableResult
func httpSession(_ httpSession: @escaping @autoclosure () -> any HTTPSession) -> Configuration {
values.httpSession = httpSession
return self
}
}

extension Analytics {
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
internal struct UploadTaskInfo {
let url: URL?
let data: Data?
let task: URLSessionDataTask
let task: DataTask
// set/used via an extension in iOSLifecycleMonitor.swift
typealias CleanupClosure = () -> Void
var cleanup: CleanupClosure? = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public enum HTTPClientErrors: Error {
public class HTTPClient {
private static let defaultAPIHost = "api.segment.io/v1"
private static let defaultCDNHost = "cdn-settings.segment.com/v1"
internal var session: URLSession

internal var session: any HTTPSession
private var apiHost: String
private var apiKey: String
private var cdnHost: String
Expand All @@ -35,7 +35,7 @@ public class HTTPClient {
self.apiHost = analytics.configuration.values.apiHost
self.cdnHost = analytics.configuration.values.cdnHost

self.session = Self.configuredSession(for: self.apiKey)
self.session = analytics.configuration.values.httpSession()
}

func segmentURL(for host: String, path: String) -> URL? {
Expand All @@ -52,7 +52,7 @@ public class HTTPClient {
/// - batch: The array of the events, considered a batch of events.
/// - completion: The closure executed when done. Passes if the task should be retried or not if failed.
@discardableResult
func startBatchUpload(writeKey: String, batch: URL, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> URLSessionDataTask? {
func startBatchUpload(writeKey: String, batch: URL, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> (any DataTask)? {
guard let uploadURL = segmentURL(for: apiHost, path: "/b") else {
self.analytics?.reportInternalError(HTTPClientErrors.failedToOpenBatch)
completion(.failure(HTTPClientErrors.failedToOpenBatch))
Expand All @@ -77,7 +77,7 @@ public class HTTPClient {
/// - batch: The array of the events, considered a batch of events.
/// - completion: The closure executed when done. Passes if the task should be retried or not if failed.
@discardableResult
func startBatchUpload(writeKey: String, data: Data, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> URLSessionDataTask? {
func startBatchUpload(writeKey: String, data: Data, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> (any UploadTask)? {
guard let uploadURL = segmentURL(for: apiHost, path: "/b") else {
self.analytics?.reportInternalError(HTTPClientErrors.failedToOpenBatch)
completion(.failure(HTTPClientErrors.failedToOpenBatch))
Expand Down Expand Up @@ -199,11 +199,4 @@ extension HTTPClient {

return request
}

internal static func configuredSession(for writeKey: String) -> URLSession {
let configuration = URLSessionConfiguration.ephemeral
configuration.httpMaximumConnectionsPerHost = 2
let session = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
return session
}
}
11 changes: 11 additions & 0 deletions Sources/Segment/Utilities/Networking/HTTPSession+Apple.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import Foundation

#if os(Linux) || os(Windows)
import FoundationNetworking
#endif

extension URLSessionDataTask: DataTask {}
extension URLSessionUploadTask: UploadTask {}

// Give the built in `URLSession` conformance to HTTPSession so that it can easily be used
extension URLSession: HTTPSession {}
34 changes: 34 additions & 0 deletions Sources/Segment/Utilities/Networking/HTTPSession.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import Foundation

#if os(Linux) || os(Windows)
import FoundationNetworking
#endif

public protocol DataTask {
var state: URLSessionTask.State { get }
func resume()
}

public protocol UploadTask: DataTask {}

// An enumeration of default `HTTPSession` configurations to be used
// This can be extended buy consumer to easily refer back to their configured session.
public enum HTTPSessions {
/// An implementation of `HTTPSession` backed by Apple's `URLSession`.
public static func urlSession() -> any HTTPSession {
let configuration = URLSessionConfiguration.ephemeral
configuration.httpMaximumConnectionsPerHost = 2
let session = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
return session
}
}

public protocol HTTPSession {
associatedtype DataTaskType: DataTask
associatedtype UploadTaskType: UploadTask

func uploadTask(with request: URLRequest, fromFile file: URL, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> UploadTaskType
func uploadTask(with request: URLRequest, from bodyData: Data?, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> UploadTaskType
func dataTask(with request: URLRequest, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> DataTaskType
func finishTasksAndInvalidate()
}
64 changes: 56 additions & 8 deletions Tests/Segment-Tests/HTTPClient_Tests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,76 @@
// Created by Brandon Sneed on 1/21/21.
//

#if !os(Linux)

import XCTest
@testable import Segment

class HTTPClientTests: XCTestCase {

override func setUpWithError() throws {
// Put setup code here. This method is called before the invocation of each test method in the class.
RestrictedHTTPSession.reset()
}

override func tearDownWithError() throws {
// Put teardown code here. This method is called after the invocation of each test method in the class.
}

/*func testExample() throws {
func testCustomHTTPSessionUpload() throws {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(
configuration: Configuration(writeKey: "testCustomSesh")
.flushInterval(9999)
.flushAt(9999)
.httpSession(RestrictedHTTPSession())
)

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

analytics.identify(userId: "brandon", traits: MyTraits(email: "blah@blah.com"))

let flushDone = XCTestExpectation(description: "flush done")
analytics.flush {
flushDone.fulfill()
}

wait(for: [flushDone])

XCTAssertEqual(RestrictedHTTPSession.fileUploads, 1)
}
func testPerformanceExample() throws {
// This is an example of a performance test case.
self.measure {
// Put the code you want to measure the time of here.

func testDefaultHTTPSessionUpload() throws {
// Use a specific writekey to this test so we do not collide with other cached items.
let analytics = Analytics(
configuration: Configuration(writeKey: "testDefaultSesh")
.flushInterval(9999)
.flushAt(9999)
)

// reach in and set it, would be the same as the default ultimately
let segment = analytics.find(pluginType: SegmentDestination.self)
XCTAssertTrue(!(segment?.httpClient?.session is RestrictedHTTPSession))
XCTAssertTrue(segment?.httpClient?.session is URLSession)
segment?.httpClient?.session = RestrictedHTTPSession()

waitUntilStarted(analytics: analytics)

analytics.storage.hardReset(doYouKnowHowToUseThis: true)

analytics.identify(userId: "brandon", traits: MyTraits(email: "blah@blah.com"))

let flushDone = XCTestExpectation(description: "flush done")
analytics.flush {
flushDone.fulfill()
}
}*/


wait(for: [flushDone])

XCTAssertEqual(RestrictedHTTPSession.fileUploads, 1)
}
}

#endif
54 changes: 18 additions & 36 deletions Tests/Segment-Tests/StressTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,34 @@
// Created by Brandon Sneed on 11/4/21.
//

#if !os(Linux) && !os(tvOS) && !os(watchOS)

import XCTest
@testable import Segment

class StressTests: XCTestCase {

override func setUpWithError() throws {
// Put setup code here. This method is called before the invocation of each test method in the class.
RestrictedHTTPSession.reset()
}

override func tearDownWithError() throws {
// Put teardown code here. This method is called after the invocation of each test method in the class.
}

// Linux doesn't know what URLProtocol is and on tvOS/watchOS it somehow works differently and isn't hit.
#if !os(Linux) && !os(tvOS) && !os(watchOS)
func testDirectoryStorageStress2() throws {
// register our network blocker
guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return }

let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2").errorHandler({ error in
XCTFail("Storage Error: \(error)")
}))
let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2")
.errorHandler({ error in
XCTFail("Storage Error: \(error)")
})
.httpSession(RestrictedHTTPSession())
)

analytics.purgeStorage()
analytics.storage.hardReset(doYouKnowHowToUseThis: true)

Expand All @@ -41,20 +47,6 @@ class StressTests: XCTestCase {

waitUntilStarted(analytics: analytics)

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

@Atomic var ready = false
var queues = [DispatchQueue]()
for i in 0..<30 {
Expand Down Expand Up @@ -110,9 +102,12 @@ class StressTests: XCTestCase {
// register our network blocker
guard URLProtocol.registerClass(BlockNetworkCalls.self) else { XCTFail(); return }

let analytics = Analytics(configuration: Configuration(writeKey: "stressTest").errorHandler({ error in
XCTFail("Storage Error: \(error)")
}))
let analytics = Analytics(configuration: Configuration(writeKey: "stressTest2")
.errorHandler({ error in
XCTFail("Storage Error: \(error)")
})
.httpSession(RestrictedHTTPSession())
)
analytics.storage.hardReset(doYouKnowHowToUseThis: true)

DirectoryStore.fileValidator = { url in
Expand All @@ -126,20 +121,6 @@ class StressTests: XCTestCase {

waitUntilStarted(analytics: analytics)

// set the httpclient to use our blocker session
let segment = analytics.find(pluginType: SegmentDestination.self)
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.protocolClasses = [BlockNetworkCalls.self]
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]
let blockSession = URLSession(configuration: configuration, delegate: nil, delegateQueue: nil)
segment?.httpClient?.session = blockSession

let writeQueue1 = DispatchQueue(label: "write queue 1", attributes: .concurrent)
let writeQueue2 = DispatchQueue(label: "write queue 2", attributes: .concurrent)
let writeQueue3 = DispatchQueue(label: "write queue 3", attributes: .concurrent)
Expand Down Expand Up @@ -317,5 +298,6 @@ class StressTests: XCTestCase {
RunLoop.main.run(until: Date.distantPast)
}
}
#endif
}

#endif
55 changes: 55 additions & 0 deletions Tests/Segment-Tests/Support/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,61 @@ extension XCTestCase {

#if !os(Linux)

class RestrictedHTTPSession: HTTPSession {
let sesh: URLSession
static var fileUploads: Int = 0
static var dataUploads: Int = 0
static var dataTasks: Int = 0
static var invalidated: Int = 0

init(blocking: Bool = true, failing: Bool = false) {
let configuration = URLSessionConfiguration.ephemeral
configuration.allowsCellularAccess = true
configuration.timeoutIntervalForResource = 30
configuration.timeoutIntervalForRequest = 60
configuration.httpMaximumConnectionsPerHost = 2
configuration.httpAdditionalHeaders = ["Content-Type": "application/json; charset=utf-8",
"Authorization": "Basic test",
"User-Agent": "analytics-ios/\(Analytics.version())"]

var protos = [URLProtocol.Type]()
if blocking { protos.append(BlockNetworkCalls.self) }
if failing { protos.append(FailedNetworkCalls.self) }
configuration.protocolClasses = protos

sesh = URLSession(configuration: configuration)
}

static func reset() {
fileUploads = 0
dataUploads = 0
dataTasks = 0
invalidated = 0
}

func uploadTask(with request: URLRequest, fromFile file: URL, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> URLSessionUploadTask {
defer { Self.fileUploads += 1 }
return sesh.uploadTask(with: request, fromFile: file, completionHandler: completionHandler)
}

func uploadTask(with request: URLRequest, from bodyData: Data?, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> URLSessionUploadTask {
defer { Self.dataUploads += 1 }
return sesh.uploadTask(with: request, from: bodyData, completionHandler: completionHandler)
}

func dataTask(with request: URLRequest, completionHandler: @escaping @Sendable (Data?, URLResponse?, (any Error)?) -> Void) -> URLSessionDataTask {
defer { Self.dataTasks += 1 }
return sesh.dataTask(with: request, completionHandler: completionHandler)
}

func finishTasksAndInvalidate() {
defer { Self.invalidated += 1 }
sesh.finishTasksAndInvalidate()
}
}



class BlockNetworkCalls: URLProtocol {
var initialURL: URL? = nil
override class func canInit(with request: URLRequest) -> Bool {
Expand Down

0 comments on commit c4f8f12

Please sign in to comment.