From 8f0dc20e176eadc63628f0832f6f6d8c26caf6e4 Mon Sep 17 00:00:00 2001 From: gzukowski Date: Thu, 18 Jan 2024 22:18:45 +0000 Subject: [PATCH] Changed the way of getting the refresh token --- python/Auth0/main.py | 98 ++++++++++++++++++++++------------ python/ChimpInsert/main.py | 104 ++++++++++++++++++++++--------------- python/Stripe/main.py | 27 ++++++---- 3 files changed, 146 insertions(+), 83 deletions(-) diff --git a/python/Auth0/main.py b/python/Auth0/main.py index 96f5727..abbfc59 100644 --- a/python/Auth0/main.py +++ b/python/Auth0/main.py @@ -1,25 +1,30 @@ -import asyncio -from scramjet.streams import Stream import requests -import requests_async import json +import enum +import asyncio from shiftarray import ShiftArray +from scramjet.streams import Stream provides = {"provides": "pipe", "contentType": "text/plain"} WAIT_TIME_ON_USER = 5 WAIT_TIME_ERROR = 3 +TOKEN_HEADER = {"content-type": "application/json"} + +class TokenRefreshResult(enum.Enum): + SUCCESS = 200 + FAILURE = 0 + + class Auth0: - def __init__(self, verified_url, users_url, api_url, request_data, stream): + def __init__(self, verified_url, users_url, api_url, request_data, stream, logger): self.verified_url = verified_url self.users_url = users_url self.api_url = api_url - self.request_data = request_data - self.token_header = { - "content-type": "application/json", - } + self.token_payload = request_data self.stream = stream + self.logger = logger def lookahead(self, iterable): it = iter(iterable) @@ -29,63 +34,92 @@ def lookahead(self, iterable): last = val yield last, False + + async def refresh_token(self): + response = requests.post( + self.api_url, headers=TOKEN_HEADER, data=self.token_payload + ) + if response.status_code == TokenRefreshResult.SUCCESS.value: + self.logger.info("Auth0: Token succesfully refreshed!") + return TokenRefreshResult.SUCCESS.value, json.loads(response.text)["access_token"] + + self.logger.error("Auth0: Refreshing the token has failed!") + return TokenRefreshResult.FAILURE.value, None + + async def get_auth(self): - last_verified = "" - last_user = "" + + code, token = await self.refresh_token() + + if code != 200: + raise Exception(f"Auth0: Refreshing the token has failed!") + + self.logger.info(f"Auth0: Succesfully initiated the token - code: {code}, token: {token}") + + last_verified = str() + last_user = str() buffer_verified = ShiftArray() buffer_users = ShiftArray() - response = requests.post( - self.api_url, headers=self.token_header, data=self.request_data - ) - token = json.loads(response.text)["access_token"] + while True: headers = {"authorization": f"Bearer {token}"} - verified = await requests_async.get(self.verified_url, headers=headers) - users = await requests_async.get(self.users_url, headers=headers) - if verified.status_code != 200: + + try: + verified = requests.get(self.verified_url, headers=headers) + users = requests.get(self.users_url, headers=headers) + except Exception as error: + raise Exception(f"Auth0: Failed to make a request: {error}") + + + if users.status_code != 200: await asyncio.sleep(WAIT_TIME_ERROR) - response = requests.post( - self.api_url, headers=self.token_header, data=self.request_data - ) - token = json.loads(response.text)["access_token"] + self.logger.info("Auth0: Token's refresh was forced.") + + code, token = await self.refresh_token() continue + verified = verified.json() users = users.json() - if verified[-1]["email"] != last_verified: - for result, has_more in self.lookahead(verified): - if not buffer_verified.contains(result["email"]): - self.stream.write( - result["email"] + " " + result["nickname"] + " auth0-newsletter" - ) - buffer_verified.append(result["email"]) - if not has_more: - last_verified = result["email"] + if users[-1]["email"] != last_user: for result, has_more in self.lookahead(users): if not buffer_users.contains(result["email"]): + self.logger.info(f"Auth0: New user registered in auth0: {result['email']}") self.stream.write( result["email"] + " " + result["nickname"] + " auth0-user" ) buffer_users.append(result["email"]) if not has_more: last_user = result["email"] + + if verified[-1]["email"] != last_verified: + for result, has_more in self.lookahead(verified): + if not buffer_verified.contains(result["email"]): + self.logger.info(f"Auth0: User allowed the newsletter: {result['email']}") + self.stream.write( + result["email"] + " " + result["nickname"] + " auth0-newsletter" + ) + buffer_verified.append(result["email"]) + if not has_more: + last_verified = result["email"] await asyncio.sleep(WAIT_TIME_ON_USER) async def run(context, input): config = context.config stream = Stream() + try: run.verified_url = config["auth0_verified_url"] run.users_url = config["auth0_users_url"] run.api_url = config["api_url"] run.data = json.dumps(config["request_data"]) except Exception as error: - raise Exception(f"Config not loaded: {error}") + raise Exception(f"Auth0: Config not loaded: {error}") asyncio.gather( - Auth0(run.verified_url,run.users_url ,run.api_url, run.data, stream).get_auth(), + Auth0(run.verified_url, run.users_url , run.api_url, run.data, stream, context.logger).get_auth(), return_exceptions=True, ) return stream.map(lambda x: x + "\n") diff --git a/python/ChimpInsert/main.py b/python/ChimpInsert/main.py index e526ada..8e1587b 100644 --- a/python/ChimpInsert/main.py +++ b/python/ChimpInsert/main.py @@ -1,6 +1,7 @@ import requests import asyncio import json +import enum from scramjet.streams import Stream import mailchimp_marketing from mailchimp_marketing import Client @@ -13,6 +14,12 @@ WAIT_TIME_ERROR = 3 +class TopicInfo(enum.Enum): + STRIPE = "stripe" + AUTH0 = "auth0-user" + A0_NEWSLETTER = "auth0-newsletter" + + class ChimpInsert: def __init__(self, audience_id, config, slack_channel_id, slack_api_url, logger): self.mailchimp = Client() @@ -36,53 +43,68 @@ def get_info(self, info, given): if given == member["email_address"]: return member['id'] return -1 - - def insert_info(self, info): - try: - email, fname, lname = info.split(" ") - except: - self.logger.info("Bad data received at topic") - - try: - member_info = { + + def prepare_member_info(self, email, fname, lname): + member_info = { "email_address": email, "status":"unsubscribed", "merge_fields" : { "FNAME":fname, "LNAME":lname.replace("\n", "") }} - if "auth0-user" in lname: - try: - response = self.mailchimp.lists.add_list_member(self.audience_id, member_info) - self.logger.info("Auth0 user successfully added") - slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user successfully added"})) - except ApiClientError as error: - return - elif "auth0-newsletter" in lname: - try: - member_info['status'] = "subscribed" - response = self.mailchimp.lists.add_list_member(self.audience_id, member_info) - self.logger.info(f"{email} Auth0 user with newsletter successfully added") - slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user with newsletter successfully added"})) - except ApiClientError as error: - error = json.loads(error.text) - if error["title"] == "Member Exists": - response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id)) - user_id = self.get_info(response, email) - if user_id == -1: - return - response = self.mailchimp.update_list_member(self.audience_id, user_id, {"status" : "subscribed"}) - self.logger.info(f"{email} Auth0 user with newsletter successfully added") - elif "stripe" in lname: - response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id)) - user_id = self.get_info(response, email) - if user_id == -1: - return - response = self.mailchimp.lists.update_list_member_tags(self.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]}) - self.logger.info(f"{email} Stripe user successfully synchronized") - slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Stripe user successfully synchronized"})) + return member_info + + def insert_info(self, info): + + try: + email, fname, lname = info.split(" ") + except ValueError: + self.logger.info("ChimpInsert: Bad data received at topic") + return + + member_info = {} + + try: + member_info = self.prepare_member_info(email, fname, lname) except: - self.logger.info("No data received") + self.logger.info("ChimpInsrt: No data received") + return + + + if TopicInfo.AUTH0.value in lname: + try: + response = self.mailchimp.lists.add_list_member(self.audience_id, member_info) + self.logger.info("ChimpInsert: Auth0 user successfully added") + slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user successfully added"})) + except ApiClientError as error: + return + + + elif TopicInfo.A0_NEWSLETTER.value in lname: + try: + member_info['status'] = "subscribed" + response = self.mailchimp.lists.add_list_member(self.audience_id, member_info) + self.logger.info(f"ChimpInsert: {email} Auth0 user with newsletter successfully added") + slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user with newsletter successfully added"})) + except ApiClientError as error: + error = json.loads(error.text) + if error["title"] == "Member Exists": + response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id)) + user_id = self.get_info(response, email) + if user_id == -1: + return + response = self.mailchimp.update_list_member(self.audience_id, user_id, {"status" : "subscribed"}) + self.logger.info(f"ChimpInsert: {email} Auth0 user with newsletter successfully added") + + elif TopicInfo.AUTH0.stripe in lname: + response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id)) + user_id = self.get_info(response, email) + if user_id == -1: + return + response = self.mailchimp.lists.update_list_member_tags(self.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]}) + self.logger.info(f"ChimpInsert: {email} Stripe user successfully synchronized") + slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Stripe user successfully synchronized"})) + async def run(context, input): try: @@ -91,7 +113,7 @@ async def run(context, input): audience_id = context.config['audience_id'] config = {"api_key": context.config['mailchimp_api'], "server": context.config['mailchimp_server']} except Exception as error: - raise Exception(f"Config not loaded: {error}") + raise Exception(f"ChimpInsert: Config not loaded: {error}") inserter = ChimpInsert(audience_id, config, slack_channel_id, slack_api_url, context.logger) return input.each(inserter.insert_info) \ No newline at end of file diff --git a/python/Stripe/main.py b/python/Stripe/main.py index 6f2ac31..25d86f0 100644 --- a/python/Stripe/main.py +++ b/python/Stripe/main.py @@ -3,15 +3,18 @@ import asyncio from scramjet.streams import Stream +EVENT_REFRESH_DELAY = 3 + provides = { 'provides': 'pipe', 'contentType': 'text/plain' } class Stripe: - def __init__(self,stream,stripe_api): + def __init__(self, stream, stripe_api, logger): self.stream = stream self.stripe_api = stripe_api + self.logger = logger def get_mail(self, user): return user['data']['object']['email'] @@ -23,25 +26,29 @@ def get_fullname(self, user): return name async def get_event(self): - await asyncio.sleep(3) compared = stripe.Event.list(type="customer.created")['data'][-1] - self.stream.write(self.get_mail(compared)+ " " +self.get_fullname(compared)) + self.logger.info(f"New user in stripe {self.get_mail(compared)}") + self.stream.write(self.get_mail(compared)+ " " + self.get_fullname(compared)) + compared = compared['id'] + while True: - test = stripe.Event.list(type="customer.created", ending_before=compared, limit=3)['data'] + events = stripe.Event.list(type="customer.created", ending_before=compared, limit=3)['data'] + + for i in range(len(events)): + self.logger.info(f"New user in stripe {self.get_mail(events[i])}") + self.stream.write(self.get_mail(events[i]) + " " + self.get_fullname(events[i])) + if len(events) != 0: + compared = events[0]['id'] - for i in range(len(test)): - self.stream.write(self.get_mail(test[i]) + " " +self.get_fullname(test[i])) - if len(test) != 0: - compared = test[0]['id'] - await asyncio.sleep(3) + await asyncio.sleep(EVENT_REFRESH_DELAY) async def run(context, input): stream = Stream() try: stripe.api_key = context.config['stripe_api'] - stripeReader = Stripe(stream, stripe.api_key) + stripeReader = Stripe(stream, stripe.api_key, context.logger) except Exception as error: raise Exception(f"Config not loaded: {error}") return