-
Notifications
You must be signed in to change notification settings - Fork 1
/
bot.py
335 lines (300 loc) · 12.9 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import os
from enum import Enum
import requests
from requests_toolbelt import MultipartEncoder
from pymessenger import utils
DEFAULT_API_VERSION = 2.6
class NotificationType(Enum):
regular = "REGULAR"
silent_push = "SILENT_PUSH"
no_push = "NO_PUSH"
class Bot:
def __init__(self, access_token, **kwargs):
"""
@required:
access_token
@optional:
api_version
app_secret
"""
self.api_version = kwargs.get('api_version') or DEFAULT_API_VERSION
self.app_secret = kwargs.get('app_secret')
self.graph_url = 'https://graph.facebook.com/v{0}'.format(self.api_version)
self.access_token = access_token
@property
def auth_args(self):
if not hasattr(self, '_auth_args'):
auth = {
'access_token': self.access_token
}
if self.app_secret is not None:
appsecret_proof = utils.generate_appsecret_proof(self.access_token, self.app_secret)
auth['appsecret_proof'] = appsecret_proof
self._auth_args = auth
return self._auth_args
def send_recipient(self, recipient_id, payload, notification_type=NotificationType.regular):
payload['recipient'] = {
'id': recipient_id
}
payload['notification_type'] = notification_type.value
return self.send_raw(payload)
def send_message(self, recipient_id, message, notification_type=NotificationType.regular):
return self.send_recipient(recipient_id, {
'message': message
}, notification_type)
def send_attachment(self, recipient_id, attachment_type, attachment_path,
notification_type=NotificationType.regular):
"""Send an attachment to the specified recipient using local path.
Input:
recipient_id: recipient id to send to
attachment_type: type of attachment (image, video, audio, file)
attachment_path: Path of attachment
Output:
Response from API as <dict>
"""
payload = {
'recipient': {
{
'id': recipient_id
}
},
'notification_type': notification_type,
'message': {
{
'attachment': {
'type': attachment_type,
'payload': {}
}
}
},
'filedata': (os.path.basename(attachment_path), open(attachment_path, 'rb'))
}
multipart_data = MultipartEncoder(payload)
multipart_header = {
'Content-Type': multipart_data.content_type
}
return requests.post(self.graph_url, data=multipart_data,
params=self.auth_args, headers=multipart_header).json()
def send_attachment_url(self, recipient_id, attachment_type, attachment_url,
notification_type=NotificationType.regular):
"""Send an attachment to the specified recipient using URL.
Input:
recipient_id: recipient id to send to
attachment_type: type of attachment (image, video, audio, file)
attachment_url: URL of attachment
Output:
Response from API as <dict>
"""
return self.send_message(recipient_id, {
'attachment': {
'type': attachment_type,
'payload': {
'url': attachment_url
}
}
}, notification_type)
def send_text_message(self, recipient_id, message, notification_type=NotificationType.regular):
"""Send text messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/text-message
Input:
recipient_id: recipient id to send to
message: message to send
Output:
Response from API as <dict>
"""
return self.send_message(recipient_id, {
'text': message
}, notification_type)
def send_generic_message(self, recipient_id, elements, notification_type=NotificationType.regular):
"""Send generic messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/generic-template
Input:
recipient_id: recipient id to send to
elements: generic message elements to send
Output:
Response from API as <dict>
"""
return self.send_message(recipient_id, {
"attachment": {
"type": "template",
"payload": {
"template_type": "generic",
"elements": elements
}
}
}, notification_type)
def send_list_message(self, recipient_id, elements,button, notification_type=NotificationType.regular):
"""Send generic messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/generic-template
Input:
recipient_id: recipient id to send to
elements: generic message elements to send
Output:
Response from API as <dict>
"""
return self.send_message(recipient_id, {
"attachment": {
"type": "template",
"payload": {
"template_type": "list",
"top_element_style": "large",
"elements": elements,
"buttons":button
}
}
}, notification_type)
def send_button_message(self, recipient_id, text, buttons, notification_type=NotificationType.regular):
"""Send text messages to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/button-template
Input:
recipient_id: recipient id to send to
text: text of message to send
buttons: buttons to send
Output:
Response from API as <dict>
"""
return self.send_message(recipient_id, {
"attachment": {
"type": "template",
"payload": {
"template_type": "button",
"text": text,
"buttons": buttons
}
}
}, notification_type)
def send_action(self, recipient_id, action, notification_type=NotificationType.regular):
"""Send typing indicators or send read receipts to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/sender-actions
Input:
recipient_id: recipient id to send to
action: action type (mark_seen, typing_on, typing_off)
Output:
Response from API as <dict>
"""
return self.send_recipient(recipient_id, {
'sender_action': action
}, notification_type)
def send_image(self, recipient_id, image_path, notification_type=NotificationType.regular):
"""Send an image to the specified recipient.
Image must be PNG or JPEG or GIF (more might be supported).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment
Input:
recipient_id: recipient id to send to
image_path: path to image to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment(recipient_id, "image", image_path, notification_type)
def send_image_url(self, recipient_id, image_url, notification_type=NotificationType.regular):
"""Send an image to specified recipient using URL.
Image must be PNG or JPEG or GIF (more might be supported).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment
Input:
recipient_id: recipient id to send to
image_url: url of image to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment_url(recipient_id, "image", image_url, notification_type)
def send_audio(self, recipient_id, audio_path, notification_type=NotificationType.regular):
"""Send audio to the specified recipient.
Audio must be MP3 or WAV
https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment
Input:
recipient_id: recipient id to send to
audio_path: path to audio to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment(recipient_id, "image", audio_path, notification_type)
def send_audio_url(self, recipient_id, audio_url, notification_type=NotificationType.regular):
"""Send audio to specified recipient using URL.
Audio must be MP3 or WAV
https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment
Input:
recipient_id: recipient id to send to
audio_url: url of audio to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment_url(recipient_id, "audio", audio_url, notification_type)
def send_video(self, recipient_id, video_path, notification_type=NotificationType.regular):
"""Send video to the specified recipient.
Video should be MP4 or MOV, but supports more (https://www.facebook.com/help/218673814818907).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/video-attachment
Input:
recipient_id: recipient id to send to
video_path: path to video to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment(recipient_id, "video", video_path, notification_type)
def send_video_url(self, recipient_id, video_url, notification_type=NotificationType.regular):
"""Send video to specified recipient using URL.
Video should be MP4 or MOV, but supports more (https://www.facebook.com/help/218673814818907).
https://developers.facebook.com/docs/messenger-platform/send-api-reference/video-attachment
Input:
recipient_id: recipient id to send to
video_url: url of video to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment_url(recipient_id, "video", video_url, notification_type)
def send_file(self, recipient_id, file_path, notification_type=NotificationType.regular):
"""Send file to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment
Input:
recipient_id: recipient id to send to
file_path: path to file to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment(recipient_id, "file", file_path, notification_type)
def send_file_url(self, recipient_id, file_url, notification_type=NotificationType.regular):
"""Send file to the specified recipient.
https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment
Input:
recipient_id: recipient id to send to
file_url: url of file to be sent
Output:
Response from API as <dict>
"""
return self.send_attachment_url(recipient_id, "file", file_url, notification_type)
def send_airline_itinerary(self, recipient_id, payload, notification_type=NotificationType.regular):
return self.send_message(recipient_id, {
"attachment": {
"type": "template",
"payload": payload
}
}, notification_type)
def get_user_info(self, recipient_id, fields=None):
"""Getting information about the user
https://developers.facebook.com/docs/messenger-platform/user-profile
Input:
recipient_id: recipient id to send to
Output:
Response from API as <dict>
"""
params = {}
if fields is not None and isinstance(fields, (list, tuple)):
params['fields'] = ",".join(fields)
params.update(self.auth_args)
request_endpoint = '{0}/{1}'.format(self.graph_url, recipient_id)
response = requests.get(request_endpoint, params=params)
if response.status_code == 200:
return response.json()
return None
def send_raw(self, payload):
request_endpoint = '{0}/me/messages'.format(self.graph_url)
response = requests.post(
request_endpoint,
params=self.auth_args,
json=payload
)
result = response.json()
from pprint import pprint
return result
def _send_payload(self, payload):
""" Deprecated, use send_raw instead """
return self.send_raw(payload)