diff --git a/txtwitter/tests/fake_twitter.py b/txtwitter/tests/fake_twitter.py index 866ec80..0d8a103 100644 --- a/txtwitter/tests/fake_twitter.py +++ b/txtwitter/tests/fake_twitter.py @@ -16,13 +16,65 @@ USER_MENTION_RE = re.compile(r'@[a-zA-Z0-9_]+') +def now(): + return datetime.utcnow() + + +def mention_from_match(twitter_data, match): + user = twitter_data.get_user_by_screen_name(match.group(0)[1:]) + + if user is None: + return None + + return { + 'id_str': user.id_str, + 'id': int(user.id_str), + 'indices': list(match.span(0)), + 'screen_name': user.screen_name, + 'name': user.name, + } + + +def extract_user_mentions(twitter_data, text): + mentions = [] + for match in USER_MENTION_RE.finditer(text): + mention = mention_from_match(twitter_data, match) + if mention is not None: + mentions.append(mention) + return mentions + + +def extract_entities(twitter_data, text): + return { + 'user_mentions': extract_user_mentions(twitter_data, text), + # TODO: More entities + } + + +class FakeStream(object): + def __init__(self): + self.resp = FakeResponse(None) + self._message_types = {} + + def add_message_type(self, message_type, predicate): + self._message_types[message_type] = predicate + + def accepts(self, message_type, data): + predicate = self._message_types.get(message_type) + return predicate is not None and predicate(data) + + def deliver(self, data): + self.resp.deliver_data(json.dumps(data)) + self.resp.deliver_data('\r\n') + + class FakeTweet(object): def __init__(self, id_str, text, user_id_str, reply_to=None, **kw): self.id_str = id_str self.text = text self.user_id_str = user_id_str self.reply_to = reply_to - self.created_at = kw.pop('created_at', datetime.utcnow()) + self.created_at = kw.pop('created_at', now()) self.kw = kw def __cmp__(self, other): @@ -57,39 +109,11 @@ def details(id_str, screen_name): match = USER_MENTION_RE.match(self.text) if match is not None: - mention = self._mention_from_match(twitter_data, match) + mention = mention_from_match(twitter_data, match) return details(mention['id_str'], mention['screen_name']) return {} - def _get_entities(self, twitter_data): - return { - 'user_mentions': self._get_user_mentions(twitter_data), - # TODO: More entities - } - - def _mention_from_match(self, twitter_data, match): - user = twitter_data.get_user_by_screen_name(match.group(0)[1:]) - - if user is None: - return None - - return { - 'id_str': user.id_str, - 'id': int(user.id_str), - 'indices': list(match.span(0)), - 'screen_name': user.screen_name, - 'name': user.name, - } - - def _get_user_mentions(self, twitter_data): - mentions = [] - for match in USER_MENTION_RE.finditer(self.text): - mention = self._mention_from_match(twitter_data, match) - if mention is not None: - mentions.append(mention) - return mentions - def to_dict(self, twitter_data, trim_user=None, include_my_retweet=None, include_entities=None, contributor_details=None): if trim_user is None: @@ -129,7 +153,7 @@ def to_dict(self, twitter_data, trim_user=None, include_my_retweet=None, tweet_dict['user'] = user.to_dict(twitter_data) tweet_dict.update(self._get_reply_to_status_details(twitter_data)) tweet_dict.update(self._get_reply_to_user_details(twitter_data)) - tweet_dict['entities'] = self._get_entities(twitter_data) + tweet_dict['entities'] = extract_entities(twitter_data, self.text) # Provided keyword args can override any of the above tweet_dict.update(self.kw) @@ -140,12 +164,69 @@ def to_dict(self, twitter_data, trim_user=None, include_my_retweet=None, return tweet_dict +class FakeDM(object): + def __init__(self, id_str, text, sender_id_str, recipient_id_str, **kw): + self.id_str = id_str + self.text = text + self.sender_id_str = sender_id_str + self.recipient_id_str = recipient_id_str + self.created_at = kw.pop('created_at', now()) + self.kw = kw + + def __cmp__(self, other): + return cmp(int(self.id_str), int(other.id_str)) + + def _get_sender_details(self, twitter_data): + sender = twitter_data.get_user(self.sender_id_str) + return { + 'sender': sender.to_dict(twitter_data), + 'sender_id': int(self.sender_id_str), + 'sender_id_str': self.sender_id_str, + 'sender_screen_name': sender.screen_name + } + + def _get_recipient_details(self, twitter_data): + recipient = twitter_data.get_user(self.recipient_id_str) + return { + 'recipient': recipient.to_dict(twitter_data), + 'recipient_id': int(self.recipient_id_str), + 'recipient_id_str': self.recipient_id_str, + 'recipient_screen_name': recipient.screen_name + } + + def to_dict(self, twitter_data, skip_status=None, + include_entities=None, **kw): + if include_entities is None: + include_entities = True + if skip_status is not None: + raise NotImplementedError("skip_status param") + + dm_dict = { + 'id': int(self.id_str), + 'id_str': self.id_str, + 'created_at': str(self.created_at), + 'text': self.text, + } + + dm_dict.update(self._get_sender_details(twitter_data)) + dm_dict.update(self._get_recipient_details(twitter_data)) + dm_dict['entities'] = extract_entities(twitter_data, self.text) + + # Provided keyword args can override any of the above + dm_dict.update(self.kw) + + if not include_entities: + dm_dict.pop('entities') + + return dm_dict + + class FakeUser(object): def __init__(self, id_str, screen_name, name, **kw): self.id_str = id_str self.screen_name = screen_name self.name = name - self.created_at = kw.pop('created_at', datetime.utcnow()) + self.created_at = kw.pop('created_at', now()) self.kw = kw def to_dict(self, twitter_data): @@ -169,8 +250,10 @@ def to_dict(self, twitter_data): class FakeTwitterData(object): def __init__(self): self.users = {} + self.dms = {} self.tweets = {} - self.tweet_streams = {} + self.streams = {} + self._next_dm_id = 1000 self._next_tweet_id = 1000 self._next_user_id = 1000 @@ -178,31 +261,58 @@ def __init__(self): def next_tweet_id(self): return str(self._next_tweet_id) + @property + def next_dm_id(self): + return str(self._next_dm_id) + @property def next_user_id(self): return str(self._next_user_id) - def add_tweet_stream(self, resp, predicate): - self.tweet_streams[resp] = predicate + def broadcast_tweet(self, tweet): + for stream in self.streams.itervalues(): + if stream.accepts('tweet', tweet): + stream.deliver(tweet.to_dict(self)) + + def broadcast_dm(self, dm): + for stream in self.streams.itervalues(): + if stream.accepts('dm', dm): + stream.deliver(dm.to_dict(self)) + + def add_stream(self): + stream = FakeStream() + + def finished_callback(r): + self.remove_stream(stream.resp) + + stream.resp.finished_callback = finished_callback + self.streams[stream.resp] = stream + return stream - def remove_tweet_stream(self, resp): - self.tweet_streams.pop(resp, None) + def remove_stream(self, resp): + self.streams.pop(resp, None) def get_tweet(self, id_str): return self.tweets.get(id_str) + def get_dm(self, id_str): + return self.dms.get(id_str) + def get_user(self, id_str): return self.users.get(id_str) def add_tweet(self, *args, **kw): tweet = FakeTweet(*args, **kw) self.tweets[tweet.id_str] = tweet - for resp, predicate in self.tweet_streams.iteritems(): - if predicate(tweet): - resp.deliver_data(json.dumps(tweet.to_dict(self))) - resp.deliver_data('\r\n') + self.broadcast_tweet(tweet) return tweet + def add_dm(self, *args, **kw): + dm = FakeDM(*args, **kw) + self.dms[dm.id_str] = dm + self.broadcast_dm(dm) + return dm + def add_user(self, *args, **kw): user = FakeUser(*args, **kw) self.users[user.id_str] = user @@ -211,6 +321,9 @@ def add_user(self, *args, **kw): def del_tweet(self, id_str): self.tweets.pop(id_str) + def del_dm(self, id_str): + self.dms.pop(id_str) + def del_user(self, id_str): self.users.pop(id_str) @@ -220,6 +333,14 @@ def new_tweet(self, text, user_id_str, *args, **kw): self._next_tweet_id += 10 return tweet + def new_dm(self, text, sender_id_str, recipient_id_str, *args, **kw): + dm = self.add_dm( + self.next_dm_id, text, sender_id_str, recipient_id_str, + *args, **kw) + + self._next_dm_id += 10 + return dm + def new_user(self, screen_name, name, *args, **kw): user = self.add_user(self.next_user_id, screen_name, name, *args, **kw) self._next_user_id += 10 @@ -462,16 +583,6 @@ def statuses_retweet(self, id, trim_user=None): # Streaming - def _make_tweet_stream(self, predicate): - resp = FakeResponse(None) - - def finished_callback(r): - self._twitter_data.remove_tweet_stream(resp) - - self._twitter_data.add_tweet_stream(resp, predicate) - resp.finished_callback = finished_callback - return resp - @fake_api('statuses/filter.json', 'stream') def stream_filter(self, follow=None, track=None, locations=None, stall_warnings=None): @@ -489,7 +600,9 @@ def stream_filter_predicate(tweet): return True return False - return self._make_tweet_stream(stream_filter_predicate) + stream = self._twitter_data.add_stream() + stream.add_message_type('tweet', stream_filter_predicate) + return stream.resp # TODO: Implement stream_sample() # TODO: Implement stream_firehose() @@ -505,7 +618,7 @@ def userstream_user(self, stringify_friend_ids, stall_warnings=None, if with_ != 'user': raise NotImplementedError("with != followings") - def userstream_predicate(tweet): + def userstream_tweet_predicate(tweet): if tweet.user_id_str == self._user_id_str: return True if mention_re.search(tweet.text): @@ -514,18 +627,117 @@ def userstream_predicate(tweet): pass return False - resp = self._make_tweet_stream(userstream_predicate) + def userstream_dm_predicate(dm): + if dm.recipient_id_str == self._user_id_str: + return True + if dm.sender_id_str == self._user_id_str: + return True + return False + + stream = self._twitter_data.add_stream() + stream.add_message_type('tweet', userstream_tweet_predicate) + stream.add_message_type('dm', userstream_dm_predicate) + # TODO: Proper friends. - resp.deliver_data(json.dumps({'friends_str': []}) + '\r\n') - return resp + stream.deliver({'friends_str': []}) + + return stream.resp # Direct Messages + def _clamp_dms(self, dms, since_id=None, max_id=None, count=None): + if since_id is not None: + since_id = int(since_id) + dms = [dm for dm in dms if int(dm.id_str) > since_id] + if max_id is not None: + max_id = int(max_id) + dms = [dm for dm in dms if int(dm.id_str) <= max_id] + + dms = sorted(dms, reverse=True) + return dms if count is None else dms[:count] + + @fake_api('direct_messages.json') + def direct_messages(self, since_id=None, max_id=None, count=None, + include_entities=None, skip_status=None): + dms = self._twitter_data.dms.values() + dms = [dm for dm in dms if dm.recipient_id_str == self._user_id_str] + + count = 20 if count is None else min(count, 20) + dms = self._clamp_dms(dms, since_id, max_id, count) + + return self._twitter_data.to_dicts( + *dms, include_entities=include_entities, skip_status=skip_status) + + @fake_api('direct_messages/sent.json') + def direct_messages_sent(self, since_id=None, max_id=None, count=None, + include_entities=None, page=None): + dms = self._twitter_data.dms.values() + dms = [dm for dm in dms if dm.sender_id_str == self._user_id_str] + + count = 200 if count is None else min(count, 200) + dms = self._clamp_dms(dms, since_id, max_id, count) + + if page is None: + page = 1 + + page_end = page * 20 + dms = dms[page_end - 20:page_end] + + return self._twitter_data.to_dicts( + *dms, include_entities=include_entities) + + @fake_api('direct_messages/show.json') + def direct_messages_show(self, id): + dm = self._twitter_data.dms.get(id) + + if dm is None: + self._404() + + if (dm.recipient_id_str != self._user_id_str and + dm.sender_id_str != self._user_id_str): + raise TwitterAPIError(403, "Forbidden", json.dumps({ + "errors": [{ + "message": "There was an error sending your message: .", + "code": 151 + }]})) + + return self._twitter_data.to_dicts(dm) + + @fake_api('direct_messages/destroy.json') + def direct_messages_destroy(self, id, include_entities=None): + dm = self._twitter_data.dms.get(id) + + if dm is None: + self._404() + + if (dm.recipient_id_str != self._user_id_str and + dm.sender_id_str != self._user_id_str): + raise TwitterAPIError(403, "Forbidden", json.dumps({ + "errors": [{ + "message": "There was an error sending your message: .", + "code": 151 + }]})) + + del self._twitter_data.dms[dm.id_str] + return dm.to_dict( + self._twitter_data, include_entities=include_entities) + + @fake_api('direct_messages/new.json') + def direct_messages_new(self, text, user_id=None, screen_name=None): + if user_id is None and screen_name is None: + raise TwitterAPIError(400, "Bad Request", json.dumps({ + "errors": [{ + "message": ( + "Recipient (user, screen name, or id) " + "parameter is missing."), + "code": 38 + }]})) + + if user_id is None: + user = self._twitter_data.get_user_by_screen_name(screen_name) + user_id = user.id_str - # TODO: Implement direct_messages() - # TODO: Implement direct_messages_sent() - # TODO: Implement direct_messages_show() - # TODO: Implement direct_messages_destroy() - # TODO: Implement direct_messages_new() + dm = self._twitter_data.new_dm(text, self._user_id_str, user_id) + return dm.to_dict(self._twitter_data) # Friends & Followers diff --git a/txtwitter/tests/test_fake_twitter.py b/txtwitter/tests/test_fake_twitter.py index e983052..6a6417c 100644 --- a/txtwitter/tests/test_fake_twitter.py +++ b/txtwitter/tests/test_fake_twitter.py @@ -1,80 +1,54 @@ import json +from datetime import datetime from urllib import urlencode from twisted.protocols.basic import LineOnlyReceiver from twisted.trial.unittest import TestCase +from txtwitter.error import TwitterAPIError +from txtwitter.tests import fake_twitter + def from_fake_twitter(name): @property def prop(self): - from txtwitter.tests import fake_twitter return getattr(fake_twitter, name) return prop -class TestFakeTweet(TestCase): - _FakeTwitterData = from_fake_twitter('FakeTwitterData') - _FakeTweet = from_fake_twitter('FakeTweet') +class TestFakeStream(TestCase): + _FakeStream = from_fake_twitter('FakeStream') - def test__get_user_mentions_none(self): - twitter = self._FakeTwitterData() - tweet = self._FakeTweet('1', 'hello', '1') - self.assertEqual([], tweet._get_user_mentions(twitter)) + def test_accepts(self): + stream = self._FakeStream() + stream.add_message_type('foo', lambda data: data['bar'] == 'baz') + self.assertTrue(stream.accepts('foo', {'bar': 'baz'})) + self.assertFalse(stream.accepts('foo', {'bar': 'qux'})) + self.assertFalse(stream.accepts('corge', {'grault': 'garply'})) - def test__get_user_mentions_not_user(self): - twitter = self._FakeTwitterData() - tweet = self._FakeTweet('1', 'hello @notuser', '1') - self.assertEqual([], tweet._get_user_mentions(twitter)) + def _process_stream_response(self, resp, delegate): + protocol = FakeTwitterStreamProtocol(delegate) + resp.deliverBody(protocol) - def test__get_user_mentions_one_user(self): - twitter = self._FakeTwitterData() - twitter.add_user('1', 'fakeuser', 'Fake User') - tweet = self._FakeTweet('1', 'hello @fakeuser', '1') - self.assertEqual(tweet._get_user_mentions(twitter), [{ - 'id_str': '1', - 'id': 1, - 'indices': [6, 15], - 'screen_name': 'fakeuser', - 'name': 'Fake User', - }]) + def test_deliver(self): + stream = self._FakeStream() - def test__get_user_mentions_two_users(self): - twitter = self._FakeTwitterData() - twitter.add_user('1', 'fakeuser', 'Fake User') - twitter.add_user('2', 'fakeuser2', 'Fake User') - tweet = self._FakeTweet('1', 'hello @fakeuser @fakeuser2', '1') - self.assertEqual(tweet._get_user_mentions(twitter), [{ - 'id_str': '1', - 'id': 1, - 'indices': [6, 15], - 'screen_name': 'fakeuser', - 'name': 'Fake User', - }, { - 'id_str': '2', - 'id': 2, - 'indices': [16, 26], - 'screen_name': 'fakeuser2', - 'name': 'Fake User', - }]) - - def test__get_user_mentions_one_user_twice(self): - twitter = self._FakeTwitterData() - twitter.add_user('1', 'fakeuser', 'Fake User') - tweet = self._FakeTweet('1', 'hello @fakeuser @fakeuser', '1') - self.assertEqual(tweet._get_user_mentions(twitter), [{ - 'id_str': '1', - 'id': 1, - 'indices': [6, 15], - 'screen_name': 'fakeuser', - 'name': 'Fake User', - }, { - 'id_str': '1', - 'id': 1, - 'indices': [16, 25], - 'screen_name': 'fakeuser', - 'name': 'Fake User', - }]) + messages = [] + protocol = FakeTwitterStreamProtocol(messages.append) + stream.resp.deliverBody(protocol) + + stream.deliver({'foo': 'bar'}) + stream.deliver({'baz': 'qux'}) + + self.assertEqual(messages, [ + {'foo': 'bar'}, + {'baz': 'qux'} + ]) + + +class TestFakeTweet(TestCase): + _FakeTwitterData = from_fake_twitter('FakeTwitterData') + _FakeTweet = from_fake_twitter('FakeTweet') def test__get_reply_to_status_details_nonreply(self): twitter = self._FakeTwitterData() @@ -129,6 +103,103 @@ def test__get_reply_to_user_details_reply(self): }) +class TestFakeDM(TestCase): + _FakeTwitterData = from_fake_twitter('FakeTwitterData') + _FakeDM = from_fake_twitter('FakeDM') + _now = datetime(2014, 3, 11, 10, 48, 22, 687699) + + def setUp(self): + self.patch(fake_twitter, 'now', lambda: self._now) + + def test__get_sender_details(self): + twitter = self._FakeTwitterData() + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User 2') + dm = twitter.add_dm('1', 'hello', '1', '2') + + self.assertEqual(dm._get_sender_details(twitter), { + 'sender': { + 'created_at': '2014-03-11 10:48:22.687699', + 'id': 1, + 'id_str': '1', + 'name': 'Fake User', + 'screen_name': 'fakeuser' + }, + 'sender_id': 1, + 'sender_id_str': '1', + 'sender_screen_name': 'fakeuser' + }) + + def test__get_recipient_details(self): + twitter = self._FakeTwitterData() + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User 2') + dm = twitter.add_dm('1', 'hello', '1', '2') + + self.assertEqual(dm._get_recipient_details(twitter), { + 'recipient': { + 'created_at': '2014-03-11 10:48:22.687699', + 'id': 2, + 'id_str': '2', + 'name': 'Fake User 2', + 'screen_name': 'fakeuser2' + }, + 'recipient_id': 2, + 'recipient_id_str': '2', + 'recipient_screen_name': 'fakeuser2' + }) + + def test_to_dict(self): + twitter = self._FakeTwitterData() + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User 2') + dm = twitter.add_dm('1', 'hello @fakeuser2', '1', '2') + + self.assertEqual(dm.to_dict(twitter), { + 'created_at': '2014-03-11 10:48:22.687699', + 'id': 1, + 'id_str': '1', + 'text': 'hello @fakeuser2', + 'sender': { + 'created_at': '2014-03-11 10:48:22.687699', + 'id': 1, + 'id_str': '1', + 'name': 'Fake User', + 'screen_name': 'fakeuser' + }, + 'sender_id': 1, + 'sender_id_str': '1', + 'sender_screen_name': 'fakeuser', + 'recipient': { + 'created_at': '2014-03-11 10:48:22.687699', + 'id': 2, + 'id_str': '2', + 'name': 'Fake User 2', + 'screen_name': 'fakeuser2' + }, + 'recipient_id': 2, + 'recipient_id_str': '2', + 'recipient_screen_name': 'fakeuser2', + 'entities': { + 'user_mentions': [{ + 'id': 2, + 'id_str': '2', + 'indices': [6, 16], + 'name': 'Fake User 2', + 'screen_name': 'fakeuser2' + }] + }, + }) + + def test_to_dict_not_include_entities(self): + twitter = self._FakeTwitterData() + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User 2') + dm = twitter.add_dm('1', 'hello @fakeuser2', '1', '2') + dm_dict = dm.to_dict(twitter, include_entities=False) + self.assertTrue('entities' not in dm_dict) + + class TestFakeTwitter(TestCase): _FakeTwitter = from_fake_twitter('FakeTwitter') _FakeTwitterClient = from_fake_twitter('FakeTwitterClient') @@ -332,7 +403,7 @@ def test_stream_filter_track(self): self.assertEqual(messages, twitter.to_dicts(tweet1, tweet2)) resp.finished() - self.assertEqual(twitter.tweet_streams, {}) + self.assertEqual(twitter.streams, {}) # TODO: More tests for fake stream_filter() @@ -344,7 +415,7 @@ def test_dispatch_userstream_user(self): uri = self._build_uri(TWITTER_USERSTREAM_URL, 'user.json') self.assert_method_uri('userstream_user', uri) - def test_userstream_user_with_user(self): + def test_userstream_user_with_user_friends(self): twitter = self._FakeTwitterData() twitter.add_user('1', 'fakeuser', 'Fake User') twitter.add_user('2', 'fakeuser2', 'Fake User') @@ -354,18 +425,65 @@ def test_userstream_user_with_user(self): resp = api.userstream_user(stringify_friend_ids='true', with_='user') self._process_stream_response(resp, messages.append) self.assertEqual(messages, [{'friends_str': []}]) + + resp.finished() + self.assertEqual(twitter.streams, {}) + + def test_userstream_user_with_user_tweets(self): + twitter = self._FakeTwitterData() + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + api = self._FakeTwitterAPI(twitter, '1') + messages = [] + resp = api.userstream_user(stringify_friend_ids='true', with_='user') + self._process_stream_response(resp, messages.append) messages.pop(0) tweet1 = twitter.new_tweet('hello', '1') twitter.new_tweet('hello', '2') self.assertEqual(messages, twitter.to_dicts(tweet1)) + resp.finished() + self.assertEqual(twitter.streams, {}) + + def test_userstream_user_with_user_mentions(self): + twitter = self._FakeTwitterData() + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + api = self._FakeTwitterAPI(twitter, '1') + messages = [] + resp = api.userstream_user(stringify_friend_ids='true', with_='user') + self._process_stream_response(resp, messages.append) + messages.pop(0) + twitter.new_tweet('@fakeuser2', '2') tweet2 = twitter.new_tweet('@fakeuser', '2') - self.assertEqual(messages, twitter.to_dicts(tweet1, tweet2)) + self.assertEqual(messages, twitter.to_dicts(tweet2)) resp.finished() - self.assertEqual(twitter.tweet_streams, {}) + self.assertEqual(twitter.streams, {}) + + def test_userstream_user_with_user_dms(self): + twitter = self._FakeTwitterData() + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + twitter.add_user('3', 'fakeuser3', 'Fake User') + + api = self._FakeTwitterAPI(twitter, '1') + messages = [] + resp = api.userstream_user(stringify_friend_ids='true', with_='user') + self._process_stream_response(resp, messages.append) + messages.pop(0) + + dm1 = twitter.new_dm('hello', '1', '2') + dm2 = twitter.new_dm('hello', '2', '1') + twitter.new_dm('hello', '2', '3') + self.assertEqual(messages, twitter.to_dicts(dm1, dm2)) + + resp.finished() + self.assertEqual(twitter.streams, {}) # TODO: Replies @@ -373,11 +491,271 @@ def test_userstream_user_with_user(self): # Direct Messages - # TODO: Tests for fake direct_messages() - # TODO: Tests for fake direct_messages_sent() - # TODO: Tests for fake direct_messages_show() - # TODO: Tests for fake direct_messages_destroy() - # TODO: Tests for fake direct_messages_new() + def test_direct_messages(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + twitter.add_user('3', 'fakeuser3', 'Fake User') + + dm1 = twitter.new_dm('hello', '2', '1') + dm2 = twitter.new_dm('hello', '3', '1') + twitter.new_dm('hello', '1', '2') + twitter.new_dm('hello', '1', '3') + + self.assertEqual(api.direct_messages(), twitter.to_dicts(dm2, dm1)) + + def test_direct_messages_limiting(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + dms = [twitter.new_dm('hello', '2', '1') for i in range(21)] + + self.assertEqual( + api.direct_messages(), + twitter.to_dicts(*dms[::-1][:20])) + + def test_direct_messages_since_id(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + dm1 = twitter.new_dm('hello', '2', '1') + dm2 = twitter.new_dm('goodbye', '2', '1') + + self.assertEqual( + api.direct_messages(since_id=dm1.id_str), + twitter.to_dicts(dm2)) + + def test_direct_messages_max_id(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + dm1 = twitter.new_dm('hello', '2', '1') + dm2 = twitter.new_dm('goodbye', '2', '1') + twitter.new_dm('hello again', '2', '1') + + self.assertEqual( + api.direct_messages(max_id=dm2.id_str), + twitter.to_dicts(dm2, dm1)) + + def test_direct_messages_count(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + twitter.new_dm('hello', '2', '1') + dm2 = twitter.new_dm('goodbye', '2', '1') + dm3 = twitter.new_dm('hello again', '2', '1') + + self.assertEqual( + api.direct_messages(count=2), + twitter.to_dicts(dm3, dm2)) + + def test_direct_messages_include_entities(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + twitter.new_dm('hello', '2', '1') + twitter.new_dm('goodbye', '2', '1') + + dms = api.direct_messages(include_entities=False) + self.assertTrue(all('entities' not in dm for dm in dms)) + + def test_direct_messages_sent(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + twitter.add_user('3', 'fakeuser3', 'Fake User') + + dm1 = twitter.new_dm('hello', '1', '2') + dm2 = twitter.new_dm('hello', '1', '3') + twitter.new_dm('hello', '2', '1') + twitter.new_dm('hello', '3', '1') + + self.assertEqual( + api.direct_messages_sent(), + twitter.to_dicts(dm2, dm1)) + + def test_direct_messages_sent_since_id(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + dm1 = twitter.new_dm('hello', '1', '2') + dm2 = twitter.new_dm('goodbye', '1', '2') + + self.assertEqual( + api.direct_messages_sent(since_id=dm1.id_str), + twitter.to_dicts(dm2)) + + def test_direct_messages_sent_max_id(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + dm1 = twitter.new_dm('hello', '1', '2') + dm2 = twitter.new_dm('goodbye', '1', '2') + twitter.new_dm('hello again', '1', '2') + + self.assertEqual( + api.direct_messages_sent(max_id=dm2.id_str), + twitter.to_dicts(dm2, dm1)) + + def test_direct_messages_sent_count(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + twitter.new_dm('hello', '1', '2') + dm2 = twitter.new_dm('goodbye', '1', '2') + dm3 = twitter.new_dm('hello again', '1', '2') + + self.assertEqual( + api.direct_messages_sent(count=2), + twitter.to_dicts(dm3, dm2)) + + def test_direct_messages_sent_include_entities(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + twitter.new_dm('hello', '1', '2') + twitter.new_dm('goodbye', '1', '2') + + dms = api.direct_messages_sent(include_entities=False) + self.assertTrue(all('entities' not in dm for dm in dms)) + + def test_direct_messages_sent_page(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + dms = [twitter.new_dm('hello', '1', '2') for _ in range(80)] + + self.assertEqual( + api.direct_messages_sent(page=2), + twitter.to_dicts(*dms[::-1][20:40])) + + def test_direct_messages_show(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + dm = twitter.new_dm('hello', '1', '2') + + found_dms = api.direct_messages_show(dm.id_str) + self.assertEqual(twitter.to_dicts(dm), found_dms) + + def test_direct_messages_show_not_found(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + self.assertRaises(TwitterAPIError, api.direct_messages_show, '1') + + def test_direct_messages_show_forbidden(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + twitter.add_user('3', 'fakeuser3', 'Fake User 3') + dm = twitter.new_dm('hello', '2', '3') + + self.assertRaises(TwitterAPIError, api.direct_messages_show, dm.id_str) + + def test_direct_messages_destroy(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + dm = twitter.new_dm('hello', '1', '2') + + found_dm = api.direct_messages_destroy(dm.id_str) + self.assertEqual(dm.to_dict(twitter), found_dm) + self.assertTrue(dm.id_str not in twitter.dms) + + def test_direct_messages_destroy_not_found(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + self.assertRaises(TwitterAPIError, api.direct_messages_destroy, '1') + + def test_direct_messages_destroy_forbidden(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + twitter.add_user('3', 'fakeuser3', 'Fake User 3') + dm = twitter.new_dm('hello', '2', '3') + + self.assertRaises( + TwitterAPIError, api.direct_messages_destroy, dm.id_str) + + def test_direct_messages_destroy_not_include_entities(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + dm = twitter.new_dm('hello', '1', '2') + + found_dm = api.direct_messages_destroy( + dm.id_str, include_entities=False) + self.assertTrue('entities' not in found_dm) + + def test_direct_messages_new_by_user_id(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + sent_dm = api.direct_messages_new('hello', user_id='2') + actual_dm = twitter.get_dm(sent_dm['id_str']) + self.assertEqual(sent_dm, actual_dm.to_dict(twitter)) + + def test_direct_messages_new_by_screen_name(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + + twitter.add_user('1', 'fakeuser', 'Fake User') + twitter.add_user('2', 'fakeuser2', 'Fake User') + + sent_dm = api.direct_messages_new('hello', screen_name='fakeuser2') + actual_dm = twitter.get_dm(sent_dm['id_str']) + self.assertEqual(sent_dm, actual_dm.to_dict(twitter)) + + def test_direct_messages_new_no_user_id_or_screen_name(self): + twitter = self._FakeTwitterData() + api = self._FakeTwitterAPI(twitter, '1') + self.assertRaises(TwitterAPIError, api.direct_messages_new, 'hello') # Friends & Followers