Skip to content

Commit

Permalink
refactor(instagram): ♻️ refactor instagram api method.
Browse files Browse the repository at this point in the history
  • Loading branch information
MerleLiuKun committed Oct 16, 2019
1 parent caa4e0d commit 26b75ac
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 413 deletions.
292 changes: 123 additions & 169 deletions pyfacebook/api/instagram.py
Expand Up @@ -40,220 +40,174 @@ def __init__(self, app_id=None,
if self.instagram_business_id is None:
raise PyFacebookError({"message": "Must provide your instagram business id"})

def get_user_info(self, username=None, return_json=False):
def discovery_user(self,
username,
include_media=False,
return_json=False):
"""
Obtain Instagram given user's basic info,
If not provide username, will return the business id's info.
Obtain the Instagram Business user info. If user not belong to business.
This will return not found.
Args:
username (str, optional)
The username for you want to retrieve data.
username (str)
The username for you want to retrieve data. Business discovery only for username.
include_media (bool, optional)
If you want get recent media by this. Provide this with True
return_json (bool, optional)
If True JSON data will be returned, instead of pyfacebook.InstagramUser.
Returns:
user's public data.
IG business user public info.
"""
if username is None:
params = ','.join(constant.INSTAGRAM_USER_FIELD)
else:
params = 'business_discovery.username({username}){{{fields}}}'.format(
username=username,
fields=','.join(constant.INSTAGRAM_USER_FIELD)
)

args = {
'fields': params
}
metric = constant.INSTAGRAM_USER_FIELD
if include_media:
metric = metric.union({'media{{{}}}'.format(','.join(constant.INSTAGRAM_MEDIA_PUBLIC_FIELD))})
fields = 'business_discovery.username({username}){{{metric}}}'.format(
username=username,
metric=','.join(metric)
)
resp = self._request(
method='GET',
path='{0}/{1}'.format(self.version, self.instagram_business_id),
args=args
args={
'fields': fields
}
)
data = self._parse_response(resp.content.decode('utf-8'))
if return_json:
return data
return data['business_discovery']
else:
return InstagramUser.new_from_json_dict(data['business_discovery'])

def get_media_info(self, media_id=None, return_json=False):
def discovery_user_medias(self,
username,
since_time=None,
until_time=None,
count=10,
limit=10,
return_json=False):
"""
Obtain given media's info. Must the media is the token's owner's.
Others can not get data by this method.
Obtain given user's media by business discovery method.
Args:
media_id (str)
The id for you want to retrieve media.
username (str)
the username which business account you want to retrieve data.
since_time (str, optional)
The medias retrieve begin time, format is %Y-%m-%d.
If not provide, will not limit by this.
until_time (str, optional)
The media retrieve until time, format is %Y-%m-%d.
If not provide, will not limit by this.
count (int, optional)
The count is you want to retrieve medias. Default is 10.
For now This may be not more than 10K.
limit (int, optional)
The count each request get the result count. Default is 10.
return_json (bool, optional):
If True origin data by facebook will be returned, or will return pyfacebook.InstagramMedia.
If True origin data by facebook will be returned, or will return pyfacebook.InstagramMedia list
Returns:
media basic data.
media data list.
"""
if media_id is None:
raise PyFacebookError({'message': "Must specify the media id"})
try:
if since_time is not None:
since_time = datetime.datetime.strptime(since_time, '%Y-%m-%d')
if until_time is not None:
until_time = datetime.datetime.strptime(until_time, '%Y-%m-%d')
except (ValueError, TypeError):
raise PyFacebookError({'message': 'since_time or until_time must format as %Y-%m-%d'})

args = {
'fields': ','.join(constant.INSTAGRAM_MEDIA_PUBLIC_FIELD + constant.INSTAGRAM_MEDIA_OWNER_FIELD)
'path': '{0}/{1}'.format(self.version, self.instagram_business_id),
'username': username,
'limit': min(limit, count)
}

resp = self._request(
method='GET',
path='{0}/{1}'.format(self.version, media_id),
args=args
)
medias = []
next_cursor = None

data = self._parse_response(resp.content.decode('utf-8'))
if return_json:
return data
else:
return InstagramMedia.new_from_json_dict(data)
while True:
next_cursor, previous_cursor, data = self.paged_by_cursor(
args=args,
next_cursor=next_cursor,
business_discovery=True
)
data = data.get('data', [])
# check if the media meet the request.
for item in data:
timestamp = datetime.datetime.strptime(item['timestamp'][:-5], '%Y-%m-%dT%H:%M:%S')
begin_flag = True if since_time is None else since_time < timestamp
end_flag = True if until_time is None else until_time > timestamp

if all([begin_flag, end_flag]):
if return_json:
medias.append(item)
else:
medias.append(InstagramMedia.new_from_json_dict(item))
if not begin_flag:
next_cursor = None
break
if next_cursor is None:
break
if len(medias) >= count:
break
return medias

def get_media_paged(self,
args,
since_time,
until_time,
def paged_by_cursor(self,
target=None,
resource=None,
args=None,
next_cursor=None,
owner=False):
business_discovery=False):
"""
Fetch paging data from api.
Paging response data by cursor.
If paged business response data. Parameter args need contain basic params like username and limit.
If paged owner data just like normally request. Provide target,resource,query_args.
Args:
target (str, optional)
The page id for which you want to get resource data.
resource (str, optional)
The resource string for data. Like media and so on.
args (dict)
args contain params for get data. Whether the owner the handle is different.
since_time (str, optional)
media begin time, format is %Y-%m-%d
until_time (str, optional)
media end time, format is %Y-%m-%d
For owner data, this is query params for request.
For business discovery, this contain basic fields for username and so on.
next_cursor (str, optional)
The paging next page url. for begin it is None.
owner (bool)
This flag is show if pointed user is instagram business account.
The paging cursor str. It will return from the graph api.
business_discovery (bool, optional)
If use business discovery, this should be True.
Returns:
next_cursor (str), previous_cursor (str), list of pyfacebook.InstagramMedia instances.
The origin data return from the graph api.
"""

if owner:
path = '{0}/{1}/media'.format(self.version, self.instagram_business_id)
if next_cursor:
args.update({
'after': next_cursor,
})
else:
path = '{0}/{1}'.format(self.version, self.instagram_business_id)
if next_cursor:
username = args['username']
limit = args['limit']
p = 'business_discovery.username({username}){{media.after({after}).limit({limit}){{{fields}}}}}'.format(
username=username,
limit=limit,
if business_discovery:
if next_cursor is not None:
fields = 'business_discovery.username({username}){{media.after({after}).limit({limit}){{{fields}}}}}'.format(
username=args['username'],
limit=args['limit'],
after=next_cursor,
fields=','.join(constant.INSTAGRAM_MEDIA_PUBLIC_FIELD)
)
else:
p = args['fields']
args = {'fields': p}
fields = 'business_discovery.username({username}){{media.limit({limit}){{{fields}}}}}'.format(
username=args['username'],
limit=args['limit'],
fields=','.join(constant.INSTAGRAM_MEDIA_PUBLIC_FIELD)
)
path = args['path']
args = {'fields': fields}
else:
path = '{0}/{1}/{2}'.format(self.version, target, resource)
if next_cursor is not None:
args['after'] = next_cursor

resp = self._request(
method='GET',
path=path,
args=args
)

next_cursor, previous_cursor = None, None
data = self._parse_response(resp.content.decode('utf-8'))
if not owner:
# Note: business discover only support for media.
if business_discovery:
data = data['business_discovery']['media']

if 'paging' in data:
cursors = data['paging'].get('cursors', {})
next_cursor = cursors.get('after')
previous_cursor = cursors.get('before')

result = []
try:
if since_time is not None:
since_time = datetime.datetime.strptime(since_time, '%Y-%m-%d')
if until_time is not None:
until_time = datetime.datetime.strptime(until_time, '%Y-%m-%d')
except ValueError:
since_time, until_time = None, None

for item in data['data']:
timestamp = datetime.datetime.strptime(item['timestamp'][:-5], '%Y-%m-%dT%H:%M:%S')
begin = True if since_time is None else since_time < timestamp
end = True if until_time is None else until_time > timestamp

if all([begin, end]):
result.append(item)
# result.append(InstagramMedia.new_from_json_dict(item))
if not begin:
next_cursor = None
break

return next_cursor, previous_cursor, result

def get_medias(self,
username=None,
since_time=None,
until_time=None,
count=10,
limit=5,
return_json=False):
"""
Obtain given user's media.
If username not provide, will return the instagram business account's media.
Args:
username (str)
the user you want to retrieve data. If not provide. use the business account.
since_time (str, optional)
The medias retrieve begin time.
until_time ()
The media retrieve until time.
If neither since_time or until_time, it will by now time.
count (int, optional)
The count is you want to retrieve medias.
limit (int, optional)
The count each request get the result count. default is 5.
return_json (bool, optional):
If True origin data by facebook will be returned, or will return pyfacebook.InstagramMedia list
Returns:
media data list.
"""
if username is None:
owner = True
args = {
'fields': ','.join(constant.INSTAGRAM_MEDIA_OWNER_FIELD + constant.INSTAGRAM_MEDIA_PUBLIC_FIELD),
'limit': limit,
}
else:
# notice:
# this args is to provide origin data to paged methods.
owner = False
args = {
'limit': limit,
'username': username,
'fields': 'business_discovery.username({username}){{media.limit({limit}){{{fields}}}}}'.format(
username=username,
limit=limit,
fields=','.join(constant.INSTAGRAM_MEDIA_PUBLIC_FIELD)
)
}

result = []
next_cursor = None

while True:
next_cursor, previous_cursor, medias = self.get_media_paged(
args=args, since_time=since_time, until_time=until_time,
next_cursor=next_cursor, owner=owner
)
if return_json:
result += medias
else:
result += [InstagramMedia.new_from_json_dict(item) for item in medias]
if next_cursor is None:
break
if len(result) >= count:
result = result[:count]
break
return result
return next_cursor, previous_cursor, data
10 changes: 10 additions & 0 deletions pyfacebook/models.py
Expand Up @@ -509,6 +509,7 @@ def __init__(self, **kwargs):
'ig_id': None,
'followers_count': None,
'follows_count': None,
'media': None,
'media_count': None,
'name': None,
'profile_picture_url': None,
Expand All @@ -522,6 +523,15 @@ def __repr__(self):
uid=self.id, username=self.username
)

@classmethod
def new_from_json_dict(cls, data, **kwargs):
media = data.get('media')
if media is not None:
media = [InstagramMedia.new_from_json_dict(item) for item in media.get('data', [])]
return super(cls, cls).new_from_json_dict(
data=data, media=media
)


class InstagramChildren(BaseModel):
"""
Expand Down

0 comments on commit 26b75ac

Please sign in to comment.