Skip to content

Commit

Permalink
Merge #148
Browse files Browse the repository at this point in the history
148: feat: Implement Tweets:Volume streams endpoint r=myConsciousness a=robertodoering

# 1. Description

<!-- Provide a description of what this PR is doing.
If you're modifying existing behavior, describe the existing behavior, how this PR is changing it,
and what motivated the change. If this is a breaking change, specify explicitly which APIs have been
changed. -->

This PR adds support for streamed responses and implements the [Volume streams](https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream) endpoint.

To handle the streamed responses, we can use the [send method](https://pub.dev/documentation/http/latest/http/BaseRequest/send.html) on http requests.
Since they return a [StreamedResponse](https://pub.dev/documentation/http/latest/http/StreamedResponse-class.html) (rather than a [Response](https://pub.dev/documentation/http/latest/http/Response-class.html)), some changed were done to the `TwitterException` to allow also handling exceptions for streamed responses.

The endpoint methods then just return a `Future<Stream<T>>`, which the user can handle like any other stream.

E.g.:
```dart
final response = await tweetsService.volumeStreams();

await for (final tweet in response) {
  print('received tweet: $tweet');
}
```
```dart
final response = await tweetsService.volumeStreams();

response.listen(
  (tweet) => print('received tweet: $tweet'),
  onError: (e) => print('received error: $e'),
  cancelOnError: false,
);
```

## 1.1. Checklist

<!-- Before you create this PR confirm that it meets all requirements listed below by checking the
relevant checkboxes (`[x]`). This will ensure a smooth and quick review process. -->

- [x] The title of my PR starts with a [Conventional Commit] prefix (`fix:`, `feat:`, `docs:` etc).
- [x] I have read the [Contributor Guide] and followed the process outlined for submitting PRs.
- [x] I have updated/added tests for ALL new/updated/fixed functionality.
- [x] I have updated/added relevant documentation in `docs` and added dartdoc comments with `///`.
- [x] I have updated/added relevant examples in `examples`.

## 1.2. Breaking Change

<!-- Does your PR require users to manually update their apps to accommodate your change?

If the PR is a breaking change this should be indicated with suffix "!"  (for example, `feat!:`, `fix!:`). See [Conventional Commit] for details.
-->

- [ ] Yes, this is a breaking change.
- [x] No, this is _not_ a breaking change.

## 1.3. Related Issues

<!-- Provide a list of issues related to this PR from the [issue database].
Indicate which of these issues are resolved or fixed by this PR, i.e. Fixes #xxxx* !-->

Closes #14

<!-- Links -->

[issue database]: https://github.com/twitter-dart/twitter-api-v2/issues
[contributor guide]: https://github.com/twitter-dart/twitter-api-v2/blob/main/CONTRIBUTING.md
[style guide]: https://github.com/twitter-dart/twitter-api-v2/blob/main/STYLEGUIDE.md
[conventional commit]: https://conventionalcommits.org


Co-authored-by: Roberto Doering <rbydoering@gmail.com>
Co-authored-by: Kato Shinya / 加藤 真也 <kato.shinya.dev@gmail.com>
  • Loading branch information
3 people committed May 23, 2022
2 parents 80defc5 + a01fff9 commit 9876daf
Show file tree
Hide file tree
Showing 14 changed files with 777 additions and 26 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
- With the addition of the `includes` field to `TwitterResponse`, the following objects have been added. ([#144](https://github.com/twitter-dart/twitter-api-v2/issues/144))
- `Media`
- `Poll`
- Added **Volume streams** endpoint. ([#14](https://github.com/twitter-dart/twitter-api-v2/issues/14))
- GET /2/tweets/sample/stream

## v2.0.0

Expand Down
8 changes: 7 additions & 1 deletion example/example.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ void main() async {
);

print(response);

final stream = await twitter.tweetsService.connectVolumeStreams();

await for (final tweet in stream.handleError(print)) {
print(tweet);
}
} on v2.TwitterException catch (e) {
print(e.response.headers);
print(e.response.body);
print(e.body);
print(e);
}
}
8 changes: 7 additions & 1 deletion lib/src/client/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ abstract class Client {
Duration timeout = const Duration(seconds: 10),
});

http.Response checkResponse(final http.Response response) {
Future<http.StreamedResponse> send(
http.BaseRequest request, {
Map<String, String> headers = const {},
Duration timeout = const Duration(seconds: 10),
});

T checkResponse<T extends http.BaseResponse>(final T response) {
final statusCode = response.statusCode;
if (200 <= statusCode && statusCode <= 299) {
return response;
Expand Down
27 changes: 27 additions & 0 deletions lib/src/client/client_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ abstract class ClientContext {
Duration timeout = const Duration(seconds: 10),
});

Future<http.StreamedResponse> send(
UserContext userContext,
http.BaseRequest request, {
Duration timeout = const Duration(seconds: 10),
});

/// Returns true if this context has an OAuth 1.0a client, otherwise false.
bool get hasOAuth1Client;
}
Expand Down Expand Up @@ -158,6 +164,27 @@ class _ClientContext implements ClientContext {
);
}

@override
Future<http.StreamedResponse> send(
UserContext userContext,
http.BaseRequest request, {
Duration timeout = const Duration(seconds: 10),
}) async {
if (userContext == UserContext.oauth2OrOAuth1 && hasOAuth1Client) {
//! If an authentication token is set, the OAuth 1.0a method is given
//! priority.
return _oauth1Client!.send(
request,
timeout: timeout,
);
}

return _oauth2Client.send(
request,
timeout: timeout,
);
}

@override
bool get hasOAuth1Client => _oauth1Client != null;
}
13 changes: 13 additions & 0 deletions lib/src/client/oauth1_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,17 @@ class OAuth1Client extends Client {
)
.timeout(timeout),
);

@override
Future<http.StreamedResponse> send(
http.BaseRequest request, {
Map<String, String> headers = const {},
Duration timeout = const Duration(seconds: 10),
}) async {
request.headers.addAll(headers);

return checkResponse(
await oauthClient.send(request).timeout(timeout),
);
}
}
15 changes: 15 additions & 0 deletions lib/src/client/oauth2_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,19 @@ class OAuth2Client extends Client {
)
.timeout(timeout),
);

@override
Future<http.StreamedResponse> send(
http.BaseRequest request, {
Map<String, String> headers = const {},
Duration timeout = const Duration(seconds: 10),
}) async {
request.headers.addAll(
{'Authorization': 'Bearer $_bearerToken', ...headers},
);

return checkResponse(
await request.send().timeout(timeout),
);
}
}
42 changes: 42 additions & 0 deletions lib/src/service/base_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,31 @@ abstract class BaseService implements Service {
return response;
}

Future<Stream<Map<String, dynamic>>> send(
final UserContext userContext,
final String method,
final String unencodedPath, {
Map<String, dynamic> queryParameters = const {},
}) async {
final streamedResponse = await _context.send(
userContext,
http.Request(
method,
Uri.https(
_authority,
unencodedPath,
Map.from(_removeNullParameters(queryParameters) ?? {}).map(
(key, value) => MapEntry(key, value.toString()),
),
),
),
);

return streamedResponse.stream
.transform(utf8.decoder)
.map((event) => _checkStreamedResponse(streamedResponse, event));
}

dynamic _removeNullParameters(final dynamic object) {
if (object is! Map) {
return object;
Expand Down Expand Up @@ -203,6 +228,23 @@ abstract class BaseService implements Service {
return jsonBody;
}

Map<String, dynamic> _checkStreamedResponse(
final http.StreamedResponse response,
final String event,
) {
final body = jsonDecode(event);

if (!body.containsKey('data')) {
throw TwitterException(
'No response data exists for the request.',
response,
event,
);
}

return body;
}

@override
String? formatExpansions(List<Expansion>? expansions) =>
expansions?.toSet().map((value) => value.fieldName).toList().join(',');
Expand Down
77 changes: 77 additions & 0 deletions lib/src/service/tweets/tweets_service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,63 @@ abstract class TweetsService {
String? paginationToken,
List<TweetExpansion>? expansions,
});

/// Streams about 1% of all Tweets in real-time.
///
/// If you have [Academic Research access](https://developer.twitter.com/en/products/twitter-api/academic-research),
/// you can connect up to two [redundant connections](https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/integrate/recovery-and-redundancy-features)
/// to maximize your streaming up-time.
///
/// ## Parameters
///
/// - [backfillMinutes]: By passing this parameter, you can request up to five
/// (5) minutes worth of streaming data that you might
/// have missed during a disconnection to be delivered to
/// you upon reconnection. The backfilled Tweets will
/// automatically flow through the reconnected stream,
/// with older Tweets generally being delivered before
/// any newly matching Tweets. You must include a whole
/// number between 1 and 5 as the value to this
/// parameter.
///
/// This feature will deliver duplicate Tweets, meaning
/// that if you were disconnected for 90 seconds, and you
/// requested two minutes of backfill, you will receive
/// 30 seconds worth of duplicate Tweets. Due to this,
/// you should make sure your system is tolerant of
/// duplicate data.
///
/// This feature is currently only available to those
/// that have been approved for Academic Research access.
/// To learn more about this access level, please visit
/// our section on [Academic Research](https://developer.twitter.com/en/products/twitter-api/academic-research).
///
/// - [expansions]: Expansions enable you to request additional data objects
/// that relate to the originally returned Tweets. Submit a
/// list of desired expansions in a comma-separated list
/// without spaces. The ID that represents the expanded data
/// object will be included directly in the Tweet data object,
/// but the expanded object metadata will be returned within
/// the includes response object, and will also include the
/// ID so that you can match this data object to the original
/// Tweet object.
///
/// ## Endpoint Url
///
/// - https://api.twitter.com/2/tweets/sample/stream
///
/// ## Rate Limits
///
/// - **App rate limit (OAuth 2.0 App Access Token)**:
/// 50 requests per 15-minute window shared among all users of your app
///
/// ## Reference
///
/// - https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream
Future<Stream<TweetData>> connectVolumeStreams({
int? backfillMinutes,
List<TweetExpansion>? expansions,
});
}

class _TweetsService extends BaseService implements TweetsService {
Expand Down Expand Up @@ -1327,4 +1384,24 @@ class _TweetsService extends BaseService implements TweetsService {
dataBuilder: TweetData.fromJson,
metaBuilder: TweetMeta.fromJson,
);

@override
Future<Stream<TweetData>> connectVolumeStreams({
int? backfillMinutes,
List<TweetExpansion>? expansions,
}) async {
final stream = await super.send(
UserContext.oauth2Only,
'GET',
'/2/tweets/sample/stream',
queryParameters: {
'backfill_minutes': backfillMinutes,
'expansions': super.formatExpansions(expansions),
},
);

return stream.map(
(event) => TweetData.fromJson(event['data']),
);
}
}
52 changes: 33 additions & 19 deletions lib/src/twitter_exception.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,44 @@ import 'package:http/http.dart';
/// with the Twitter API.
class TwitterException implements Exception {
/// Returns the new instance of [TwitterException].
TwitterException(this.message, this.response);
TwitterException(this.message, this.response, [String? body])
: body = response is Response ? response.body : body;

/// The error message
final String message;

/// The response from the Twitter API.
final Response response;
final BaseResponse response;

@override
String toString() => '''TwitterException: $message
✅ Status Code:
${response.statusCode}
✅ Request:
${response.request}
/// The body of the response.
///
/// Equal to [Response.body] if [response] is a [Response]. Otherwise set to
/// the event of a [StreamedResponse] that caused this exception (if
/// available).
final String? body;

✅ Headers:
${response.headers}
✅ Body:
${response.body}
Please create an Issue if you have a question or suggestion for this exception.
https://github.com/twitter-dart/twitter-api-v2/issues
''';
@override
String toString() {
final buffer = StringBuffer()
..writeln('TwitterException: $message\n')
..writeln(' ✅ Status Code:')
..writeln(' ${response.statusCode}\n')
..writeln(' ✅ Request:')
..writeln(' ${response.request}\n')
..writeln(' ✅ Headers:')
..writeln(' ${response.headers}\n');

if (body != null) {
buffer
..writeln(' ✅ Body:')
..writeln(' $body\n');
}

buffer
..writeln(' Please create an Issue if you have a question '
'or suggestion for this exception.')
..writeln(' https://github.com/twitter-dart/twitter-api-v2/issues');

return buffer.toString();
}
}
38 changes: 38 additions & 0 deletions test/mocks/client_context_stubs.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// modification, are permitted provided the conditions.

// Dart imports:
import 'dart:convert';
import 'dart:io';

// Package imports:
Expand Down Expand Up @@ -114,3 +115,40 @@ MockClientContext buildPutStub(

return mockClientContext;
}

MockClientContext buildSendStub(
final UserContext userContext,
final String resourcePath, [
final Map<String, String>? queryParameters,
]) {
final mockClientContext = MockClientContext();

Stream<List<int>> responseStream() async* {
final converter = JsonUtf8Encoder();
final responsesString = await File(resourcePath).readAsString();
final responses = jsonDecode(responsesString) as List<dynamic>;

for (final response in responses) {
yield converter.convert(response);
}
}

when(mockClientContext.send(
userContext,
any,
timeout: anyNamed('timeout'),
)).thenAnswer(
(_) async {
return http.StreamedResponse(
responseStream(),
200,
headers: {'content-type': 'application/json; charset=utf-8'},
);
},
);

//! Override if you want to test with OAuth 1.0a.
when(mockClientContext.hasOAuth1Client).thenReturn(false);

return mockClientContext;
}
Loading

0 comments on commit 9876daf

Please sign in to comment.