diff --git a/python/Auth0/config.json b/python/Auth0/config.json index 1ff3671..490bf32 100644 --- a/python/Auth0/config.json +++ b/python/Auth0/config.json @@ -1,6 +1,7 @@ { - "auth0_query_url" : "https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1&fields=created_at%2Cnickname%2Cemail%2Cupdated_at&q=user_metadata.newsletter%3D%22true%22", + "auth0_verified_url" : "https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1&fields=created_at%2Cnickname%2Cemail%2Cupdated_at&q=user_metadata.newsletter%3D%22true%22", + "auth0_users_url" :"https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1", "api_url" : "", "request_data" : {"client_id":"","client_secret":"","audience":"","grant_type":"client_credentials"} diff --git a/python/Auth0/main.py b/python/Auth0/main.py index a0366e2..96f5727 100644 --- a/python/Auth0/main.py +++ b/python/Auth0/main.py @@ -1,71 +1,91 @@ import asyncio from scramjet.streams import Stream import requests +import requests_async import json +from shiftarray import ShiftArray -class ShiftArray: - def __init__(self): - self.array = [] +provides = {"provides": "pipe", "contentType": "text/plain"} - def append(self, value): - if len(self.array) == 5: - self.array.pop(-1) - self.array.append(value) +WAIT_TIME_ON_USER = 5 +WAIT_TIME_ERROR = 3 - def contains(self, value): - return value in self.array +class Auth0: + def __init__(self, verified_url, users_url, api_url, request_data, stream): + self.verified_url = verified_url + self.users_url = users_url + self.api_url = api_url + self.request_data = request_data + self.token_header = { + "content-type": "application/json", + } + self.stream = stream - def get_values(self): - return self.array + def lookahead(self, iterable): + it = iter(iterable) + last = next(it) + for val in it: + yield last, True + last = val + yield last, False + async def get_auth(self): + last_verified = "" + last_user = "" + buffer_verified = ShiftArray() + buffer_users = ShiftArray() + response = requests.post( + self.api_url, headers=self.token_header, data=self.request_data + ) + token = json.loads(response.text)["access_token"] + while True: + headers = {"authorization": f"Bearer {token}"} + verified = await requests_async.get(self.verified_url, headers=headers) + users = await requests_async.get(self.users_url, headers=headers) + if verified.status_code != 200: + await asyncio.sleep(WAIT_TIME_ERROR) + response = requests.post( + self.api_url, headers=self.token_header, data=self.request_data + ) + token = json.loads(response.text)["access_token"] + continue + verified = verified.json() + users = users.json() + if verified[-1]["email"] != last_verified: + for result, has_more in self.lookahead(verified): + if not buffer_verified.contains(result["email"]): + self.stream.write( + result["email"] + " " + result["nickname"] + " auth0-newsletter" + ) + buffer_verified.append(result["email"]) + if not has_more: + last_verified = result["email"] -def lookahead(iterable): - it = iter(iterable) - last = next(it) - for val in it: - yield last, True - last = val - yield last, False + if users[-1]["email"] != last_user: + for result, has_more in self.lookahead(users): + if not buffer_users.contains(result["email"]): + self.stream.write( + result["email"] + " " + result["nickname"] + " auth0-user" + ) + buffer_users.append(result["email"]) + if not has_more: + last_user = result["email"] + await asyncio.sleep(WAIT_TIME_ON_USER) -provides = { - 'provides': 'pipe', - 'contentType': 'text/plain' -} - -async def get_auth(stream): - token_header = {'content-type': 'application/json',} - last = "" - buffer = ShiftArray() - response = requests.post(run.api_url, headers=token_header, data=run.data) - token = json.loads(response.text)['access_token'] - - while True: - headers = {'authorization' : f"Bearer {token}"} - users = requests.get(run.query, headers=headers).json() - - if "error" in str(users): - response = requests.post(run.api_url, headers=token_header, data=run.data) - token = json.loads(response.text)['access_token'] - continue - - if users[-1]['email'] != last: - for result, has_more in lookahead(users): - if not buffer.contains(result['email']): - stream.write(result['email'] +" "+result['nickname']+" auth0-user") - buffer.append(result['email']) - if not has_more: - last = result['email'] - await asyncio.sleep(5) async def run(context, input): - config = context.config - stream = Stream() - try: - run.query = config['auth0_query_url'] - run.api_url = config['api_url'] - run.data = json.dumps(config['request_data']) - asyncio.gather(get_auth(stream), return_exceptions=True) - except Exception as error: - raise Exception(f"Config not loaded: {error}") - return - return stream.map(lambda x : x + '\n') \ No newline at end of file + config = context.config + stream = Stream() + try: + run.verified_url = config["auth0_verified_url"] + run.users_url = config["auth0_users_url"] + run.api_url = config["api_url"] + run.data = json.dumps(config["request_data"]) + except Exception as error: + raise Exception(f"Config not loaded: {error}") + + asyncio.gather( + Auth0(run.verified_url,run.users_url ,run.api_url, run.data, stream).get_auth(), + return_exceptions=True, + ) + return stream.map(lambda x: x + "\n") diff --git a/python/Auth0/requirements.txt b/python/Auth0/requirements.txt index bf5717a..cbd32b4 100644 --- a/python/Auth0/requirements.txt +++ b/python/Auth0/requirements.txt @@ -1,2 +1,3 @@ scramjet-framework-py requests +requests_async diff --git a/python/Auth0/shiftarray.py b/python/Auth0/shiftarray.py new file mode 100644 index 0000000..68be37e --- /dev/null +++ b/python/Auth0/shiftarray.py @@ -0,0 +1,18 @@ +class ShiftArray: + def __init__(self, length=5): + self.array = [] + self.length = length + + def append(self, value): + if len(self.array) == self.length: + self.array.pop(-1) + self.array.append(value) + + def contains(self, value): + return value in self.array + + def get_values(self): + return self.array + + def __str__(self): + return str(self.array) \ No newline at end of file diff --git a/python/ChimpInsert/config.json b/python/ChimpInsert/config.json index 67c1930..fecd2ff 100644 --- a/python/ChimpInsert/config.json +++ b/python/ChimpInsert/config.json @@ -1,5 +1,7 @@ { - "mailchimp_api" : "", - "audience_id" : "", - "mailchimp_server" : "" + "slack_api_url" : "", + "slack_channel_id" : "", + "mailchimp_api" : "", + "audience_id" : "", + "mailchimp_server" : "" } \ No newline at end of file diff --git a/python/ChimpInsert/main.py b/python/ChimpInsert/main.py index 5fa583b..e526ada 100644 --- a/python/ChimpInsert/main.py +++ b/python/ChimpInsert/main.py @@ -6,70 +6,92 @@ from mailchimp_marketing import Client from mailchimp_marketing.api_client import ApiClientError - requires = { 'requires': 'pipe', 'contentType': 'text/plain' } -mailchimp = Client() - -def get_offset(audience_id): - response = mailchimp.lists.get_list(audience_id)['stats'] - users_num = response['member_count'] + response['unsubscribe_count'] - if users_num < 5: - return 0 - else: - return users_num-5 +WAIT_TIME_ERROR = 3 -def get_info(info, given): - for member in info['members']: - if given == member["email_address"]: - return member['id'] +class ChimpInsert: + def __init__(self, audience_id, config, slack_channel_id, slack_api_url, logger): + self.mailchimp = Client() + self.audience_id = audience_id + self.mailchimp.set_config(config) + self.slack_channel_id = slack_channel_id + self.slack_api_url = slack_api_url + self.token_header = {'content-type': 'application/json',} + self.logger = logger + + def get_offset(self, audience_id): + response = self.mailchimp.lists.get_list(audience_id)['stats'] + users_num = response['member_count'] + response['unsubscribe_count'] + if users_num < 5: + return 0 + else: + return users_num-5 + + def get_info(self, info, given): + for member in info['members']: + if given == member["email_address"]: + return member['id'] + return -1 + + def insert_info(self, info): + try: + email, fname, lname = info.split(" ") + except: + self.logger.info("Bad data received at topic") -async def insert_info(info): - try: - email, fname, lname = info.split(" ") - except: - print("No data received. start") - response = mailchimp.ping.get() - try: - member_info = { - "email_address": email, - "status":"unsubscribed", - "merge_fields" : { - "FNAME":fname, - "LNAME":lname.replace("\n", "") - }} - try: - response = mailchimp.lists.add_list_member(run.audience_id, member_info) - print(f"{email} Auth0 user successfully added") - except ApiClientError as error: - error = json.loads(error.text) - if error["title"] == "Member Exists": - try: - response = mailchimp.lists.get_list_members_info(run.audience_id, offset=get_offset(run.audience_id)) - user_id = get_info(response, email) - if "stripe" in lname: - response = mailchimp.lists.update_list_member_tags(run.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]}) - print(f"{email} Stripe user successfully synchronized") - except ApiClientError as err: - print("Error") - except: - print("No data received.") - + try: + member_info = { + "email_address": email, + "status":"unsubscribed", + "merge_fields" : { + "FNAME":fname, + "LNAME":lname.replace("\n", "") + }} + if "auth0-user" in lname: + try: + response = self.mailchimp.lists.add_list_member(self.audience_id, member_info) + self.logger.info("Auth0 user successfully added") + slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user successfully added"})) + except ApiClientError as error: + return + elif "auth0-newsletter" in lname: + try: + member_info['status'] = "subscribed" + response = self.mailchimp.lists.add_list_member(self.audience_id, member_info) + self.logger.info(f"{email} Auth0 user with newsletter successfully added") + slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user with newsletter successfully added"})) + except ApiClientError as error: + error = json.loads(error.text) + if error["title"] == "Member Exists": + response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id)) + user_id = self.get_info(response, email) + if user_id == -1: + return + response = self.mailchimp.update_list_member(self.audience_id, user_id, {"status" : "subscribed"}) + self.logger.info(f"{email} Auth0 user with newsletter successfully added") + elif "stripe" in lname: + response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id)) + user_id = self.get_info(response, email) + if user_id == -1: + return + response = self.mailchimp.lists.update_list_member_tags(self.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]}) + self.logger.info(f"{email} Stripe user successfully synchronized") + slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Stripe user successfully synchronized"})) + except: + self.logger.info("No data received") async def run(context, input): - try: - run.audience_id = context.config['audience_id'] - mailchimp.set_config({ - "api_key": context.config['mailchimp_api'], - "server": context.config['mailchimp_server'], - }) - except Exception as error: - raise Exception(f"Config not loaded: {error}") - return - return input.each(insert_info) - - - + try: + slack_channel_id = context.config['slack_channel_id'] + slack_api_url = context.config['slack_api_url'] + audience_id = context.config['audience_id'] + config = {"api_key": context.config['mailchimp_api'], "server": context.config['mailchimp_server']} + except Exception as error: + raise Exception(f"Config not loaded: {error}") + + inserter = ChimpInsert(audience_id, config, slack_channel_id, slack_api_url, context.logger) + return input.each(inserter.insert_info) \ No newline at end of file diff --git a/python/Stripe/main.py b/python/Stripe/main.py index e3fbc38..6f2ac31 100644 --- a/python/Stripe/main.py +++ b/python/Stripe/main.py @@ -8,42 +8,42 @@ 'contentType': 'text/plain' } +class Stripe: + def __init__(self,stream,stripe_api): + self.stream = stream + self.stripe_api = stripe_api + + def get_mail(self, user): + return user['data']['object']['email'] + + def get_fullname(self, user): + mail = user['data']['object']['email'] + name = mail.split("@") + name = name[0] + " stripe" + return name + + async def get_event(self): + await asyncio.sleep(3) + compared = stripe.Event.list(type="customer.created")['data'][-1] -def get_mail(user): - return user['data']['object']['email'] - -def get_fullname(user): - mail = user['data']['object']['email'] - name = mail.split("@") - name = name[0] + " stripe" - return name + self.stream.write(self.get_mail(compared)+ " " +self.get_fullname(compared)) + compared = compared['id'] + while True: + test = stripe.Event.list(type="customer.created", ending_before=compared, limit=3)['data'] -async def get_event(stream): - await asyncio.sleep(3) - compared = stripe.Event.list(type="customer.created")['data'][-1] - tako = stripe.Event.list(type="customer.created")['data'] - - stream.write(get_mail(compared)+ " " +get_fullname(compared)) - compared = compared['id'] - while True: - test = stripe.Event.list(type="customer.created", ending_before=compared, limit=3)['data'] - - for i in range(len(test)): - stream.write(get_mail(test[i]) + " " +get_fullname(test[i])) - if len(test) != 0: - compared = test[0]['id'] - await asyncio.sleep(3) - + for i in range(len(test)): + self.stream.write(self.get_mail(test[i]) + " " +self.get_fullname(test[i])) + if len(test) != 0: + compared = test[0]['id'] + await asyncio.sleep(3) async def run(context, input): - try: - stripe.api_key = context.config['stripe_api'] - except Exception as error: - raise Exception(f"Config not loaded: {error}") - return - stream = Stream() - asyncio.gather(get_event(stream), return_exceptions=True) - return stream.map(lambda x : x + "\n") - - - + stream = Stream() + try: + stripe.api_key = context.config['stripe_api'] + stripeReader = Stripe(stream, stripe.api_key) + except Exception as error: + raise Exception(f"Config not loaded: {error}") + return + asyncio.gather(stripeReader.get_event(), return_exceptions=True) + return stream.map(lambda x : x + "\n") \ No newline at end of file