-
Notifications
You must be signed in to change notification settings - Fork 17
/
MPWURLStreamingStream.m
134 lines (100 loc) · 3.23 KB
/
MPWURLStreamingStream.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//
// MPWURLStreamingStream.m
// MPWFoundation
//
// Created by Marcel Weiher on 9/14/17.
//
//
#import "MPWURLStreamingStream.h"
#import <MPWByteStream.h>
#import "MPWURLCall.h"
#import "MPWURLReference.h"
#import "MPWRESTOperation.h"
/**
 File-private NSURLSession delegate for MPWURLStreamingStream.

 Implements the data-task callback (-URLSession:dataTask:didReceiveData:) and
 the task-completion callback (-URLSession:task:didCompleteWithError:), so the
 class declares NSURLSessionDataDelegate, which inherits from
 NSURLSessionTaskDelegate and NSURLSessionDelegate.
 */
@interface MPWURLStreamingFetchHelper : MPWFilter <NSURLSessionDataDelegate>

/// Collection of in-flight requests, assigned by the owning stream's initializer;
/// it is emptied when a task completes.
@property (nonatomic, strong) NSMutableOrderedSet *inflight;

@end
@implementation MPWURLStreamingStream

/**
 Creates a streaming stream whose NSURLSession delivers its delegate callbacks
 to a private MPWURLStreamingFetchHelper.

 @param newBaseURL  Base URL handed to the superclass initializer.
 @param aTarget     Target for both this stream and the helper.
 @param targetQueue Queue used as the session's delegate queue; may be nil.
 @return The initialized stream, or nil if the superclass initializer fails.
 */
-(instancetype)initWithBaseURL:(NSURL*)newBaseURL target:aTarget queue:(NSOperationQueue*)targetQueue
{
    MPWURLStreamingFetchHelper *helper = [MPWURLStreamingFetchHelper streamWithTarget:aTarget];
    // NOTE(review): -config is sent before super's initializer has run because the
    // session has to exist first; presumably it does not depend on initialized
    // instance state -- confirm against the superclass.
    NSURLSession *session=[NSURLSession sessionWithConfiguration:[self config]
                                                        delegate:helper
                                                   delegateQueue:targetQueue];
    self=[super initWithBaseURL:newBaseURL target:aTarget session:session];
    if ( self ) {
        // Only touch our own state once super has successfully initialized self.
        self.streamingDelegate=helper;
        // The helper clears this collection when a task completes, so it has to
        // be the very same object the stream uses.
        helper.inflight = self.inflight;
    }
    return self;
}

/**
 Convenience initializer that passes nil as the delegate queue.
 */
-(instancetype)initWithBaseURL:(NSURL*)newBaseURL target:aTarget
{
    return [self initWithBaseURL:newBaseURL target:aTarget queue:nil];
}

/**
 Returns a data task for the given request.

 @param request A call that must have isStreaming set (checked with NSParameterAssert).
 @return A data task obtained from [self downloader] for the resolved request;
         this method does not resume it.
 */
- (NSURLSessionTask*)taskForExecutingRequest:(MPWURLCall*)request
{
    NSParameterAssert( [request isStreaming]);
    return [[self downloader] dataTaskWithRequest: [self resolvedRequest:request]];
}

/**
 Builds a streaming GET call for theURL and executes it.

 @param theURL URL to fetch.
 */
-(void)streamingGet:(NSURL *)theURL
{
    MPWURLReference *ref=[MPWURLReference referenceWithURL:theURL];
    MPWRESTOperation<MPWURLReference*>* op=[MPWRESTOperation operationWithReference:ref verb:MPWRESTVerbGET];
    MPWURLCall *request=[[[MPWURLCall alloc] initWithRESTOperation:op] autorelease];
    request.isStreaming=YES;
    [self executeRequest:request];
}

/**
 Sets the target on this stream and on the helper, keeping the two in step.
 */
-(void)setTarget:(id)newVar
{
    [super setTarget:newVar];
    [self.streamingDelegate setTarget:newVar];
}

// Manual reference counting: give up our reference to the helper.
-(void)dealloc
{
    [_streamingDelegate release];
    [super dealloc];
}

@end
@implementation MPWURLStreamingFetchHelper

#pragma mark - NSURLSession callbacks

/// Called for each chunk of data the task receives; hands the chunk to the
/// FORWARD macro (presumably passing it on to the target -- see the macro definition).
- (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask
    didReceiveData:(NSData *)data
{
    FORWARD(data);
}

/// Called when a task finishes: empties the in-flight collection, reports the
/// error if there is one, then closes the target.
- (void)URLSession:(NSURLSession *)session task:(NSURLSessionTask *)task
didCompleteWithError:(nullable NSError *)error
{
    // A streaming fetch stream can only have one request in progress, and the
    // originating call object is not available here, so everything in flight
    // is dropped rather than a single entry.
    @synchronized (self) {
        [self.inflight removeAllObjects];
    }

    if ( error != nil ) {
        [self reportError:error];
    }

    MPWFilter *downstream = (MPWFilter*)self.target;
    [downstream close];
}

#pragma mark - Memory management

// Manual reference counting: release the in-flight collection.
-(void)dealloc
{
    [_inflight release];
    [super dealloc];
}

@end
#import "DebugMacros.h"
@implementation MPWURLStreamingStream(testing)

/// Streams the bundled "ResourceTest" resource through a byte stream into a
/// mutable string and checks the collected text.
+(void)testCanHandleDataStreamingResponse
{
    NSBundle *bundle=[NSBundle bundleForClass:self];
    NSURL *resourceURL=[bundle URLForResource:@"ResourceTest" withExtension:nil];

    NSMutableString *testTarget=[NSMutableString string];
    MPWWriteStream *byteStream=[MPWByteStream streamWithTarget:testTarget];
    MPWURLStreamingStream *streamer=[self streamWithTarget:byteStream];

    [streamer streamingGet:resourceURL];
    // Wait up to half a second for the fetch before checking the result.
    [streamer awaitResultForSeconds:0.5];

    IDEXPECT( testTarget, @"This is a simple resource",@"should have written");
}

/// Names of the test methods in this category.
+testSelectors
{
    return @[ @"testCanHandleDataStreamingResponse" ];
}

@end