Skip to content
This repository has been archived by the owner on Jun 30, 2023. It is now read-only.

Streaming Error error response #97

Merged
merged 2 commits into from
Mar 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,32 @@ public class TwitterAPISessionDelegatedStreamTask: TwitterAPISessionStreamTask,
func append(chunk: Data) {
taskQueue.async { [weak self] in
guard let self = self else { return }

guard let httpResponse = self.httpResponse else {
self.notify(result: .failure(.responseFailed(reason: .invalidResponse(error: nil))), rateLimit: nil)
return
}

let rateLimit = TwitterRateLimit(header: httpResponse.allHeaderFields)

guard httpResponse.statusCode < 300 else {

let error = TwitterAPIErrorResponse(data: chunk)
self.notify(
result: .failure(
.responseFailed(
reason: .unacceptableStatusCode(
statusCode: httpResponse.statusCode,
error: error,
rateLimit: rateLimit
)
)), rateLimit: rateLimit)

return
}

for data in chunk.split(separator: chunkSeparator) {
self.notify(result: .success(data))
self.notify(result: .success(data), rateLimit: rateLimit)
}
}
}
Expand All @@ -51,18 +75,18 @@ public class TwitterAPISessionDelegatedStreamTask: TwitterAPISessionStreamTask,
if let error = error {
taskQueue.async { [weak self] in
guard let self = self else { return }
self.notify(result: .failure(.responseFailed(reason: .invalidResponse(error: error))))
self.notify(result: .failure(.responseFailed(reason: .invalidResponse(error: error))), rateLimit: nil)
}
}
}

private func notify(result: Result<Data, TwitterAPIKitError>) {
private func notify(result: Result<Data, TwitterAPIKitError>, rateLimit: TwitterRateLimit?) {
let response = TwitterAPIResponse(
request: currentRequest,
response: httpResponse,
data: result.success,
result: result,
rateLimit: nil
rateLimit: rateLimit
)

dataBlocks.forEach { (queue, block) in
Expand Down
8 changes: 6 additions & 2 deletions Tests/TwitterAPIKitTests/Extensions/ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ import XCTest
taskIdentifier: 1,
currentRequest: nil,
originalRequest: nil,
httpResponse: nil
httpResponse: .init(
url: URL(string: "http://example.com")!, statusCode: 200, httpVersion: "1.1", headerFields: [:]
)
)

let task = TwitterAPISessionDelegatedStreamTask(task: mockTask)
Expand Down Expand Up @@ -206,7 +208,9 @@ import XCTest
taskIdentifier: 1,
currentRequest: nil,
originalRequest: nil,
httpResponse: nil
httpResponse: .init(
url: URL(string: "http://example.com")!, statusCode: 200, httpVersion: "1.1", headerFields: [:]
)
)

let task = TwitterAPISessionDelegatedStreamTask(task: mockTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,22 @@ class TwitterAPISessionDelegatedStreamTaskTests: XCTestCase {
taskIdentifier: 1,
currentRequest: nil,
originalRequest: nil,
httpResponse: nil
httpResponse: HTTPURLResponse(
url: URL(string: "https://example.com")!,
statusCode: 200,
httpVersion: "1.1",
headerFields: [
"x-rate-limit-limit": "15",
"x-rate-limit-remaining": "1",
"x-rate-limit-reset": "1647099944",
]
)
)

let task = TwitterAPISessionDelegatedStreamTask(task: mockTask)

DispatchQueue.main.async {

task.append(chunk: Data("aaaa\r\nbbbb".utf8))
task.append(chunk: Data("🥓🥓\r\nあ".utf8))
task.complete(error: nil)
Expand All @@ -35,6 +45,11 @@ class TwitterAPISessionDelegatedStreamTaskTests: XCTestCase {

XCTAssertTrue(Thread.isMainThread)

XCTAssertNotNil(response.rateLimit)
XCTAssertEqual(response.rateLimit?.limit, 15)
XCTAssertEqual(response.rateLimit?.remaining, 1)
XCTAssertEqual(response.rateLimit?.reset, 1_647_099_944)

switch count {
case 0:
XCTAssertEqual(response.success.map { String(data: $0, encoding: .utf8) }, "aaaa")
Expand All @@ -58,4 +73,109 @@ class TwitterAPISessionDelegatedStreamTaskTests: XCTestCase {
wait(for: [exp], timeout: 100)
XCTAssertEqual(count, 4)
}

func testInvalidStatusCode() throws {
let mockTask = MockTwitterAPISessionTask(
taskIdentifier: 2,
currentRequest: nil,
originalRequest: nil,
httpResponse: HTTPURLResponse(
url: URL(string: "https://example.com")!,
statusCode: 400,
httpVersion: "1.1",
headerFields: [
"x-rate-limit-limit": "15",
"x-rate-limit-remaining": "1",
"x-rate-limit-reset": "1647099944",
]
)
)

let task = TwitterAPISessionDelegatedStreamTask(task: mockTask)

DispatchQueue.main.async {

task.append(
chunk: Data(
"{\"detail\":\"Authenticating with OAuth 1.0a User Context is forbidden for this endpoint. Supported authentication types are [OAuth 2.0 Application-Only].\",\"title\":\"Unsupported Authentication\",\"status\":403,\"type\":\"https://api.twitter.com/2/problems/unsupported-authentication\"}"
.utf8))
task.complete(error: nil)
}

let exp = expectation(description: "")
exp.expectedFulfillmentCount = 2

var count = 0
task.streamResponse { response in

XCTAssertTrue(Thread.isMainThread)

XCTAssertNotNil(response.rateLimit)
XCTAssertEqual(response.rateLimit?.limit, 15)
XCTAssertEqual(response.rateLimit?.remaining, 1)
XCTAssertEqual(response.rateLimit?.reset, 1_647_099_944)

switch count {
case 0:
XCTAssertTrue(response.error!.isResponseFailed)
default:
XCTFail()
}

count += 1
exp.fulfill()
}.streamResponse(queue: .global(qos: .default)) { response in
XCTAssertFalse(Thread.isMainThread)
XCTAssertTrue(response.error!.isResponseFailed)
exp.fulfill()
}

wait(for: [exp], timeout: 100)
XCTAssertEqual(count, 1)
}

func testError() throws {

let mockTask = MockTwitterAPISessionTask(
taskIdentifier: 1,
currentRequest: nil,
originalRequest: nil,
httpResponse: nil
)

let task = TwitterAPISessionDelegatedStreamTask(task: mockTask)

DispatchQueue.main.async {
task.complete(error: URLError(.notConnectedToInternet))
}

let exp = expectation(description: "")
exp.expectedFulfillmentCount = 2

var count = 0
task.streamResponse { response in

XCTAssertTrue(Thread.isMainThread)

XCTAssertNil(response.rateLimit)

switch count {
case 0:
XCTAssertTrue(response.isError)

default:
XCTFail()
}

count += 1
exp.fulfill()
}.streamResponse(queue: .global(qos: .default)) { response in
XCTAssertFalse(Thread.isMainThread)
XCTAssertTrue(response.isError)
exp.fulfill()
}

wait(for: [exp], timeout: 100)
XCTAssertEqual(count, 1)
}
}