Skip to content

Commit

Permalink
Changed the way of getting the refresh token
Browse files Browse the repository at this point in the history
  • Loading branch information
gzukowski committed Jan 18, 2024
1 parent eb5e36c commit 8f0dc20
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 83 deletions.
98 changes: 66 additions & 32 deletions python/Auth0/main.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
import asyncio
from scramjet.streams import Stream
import requests
import requests_async
import json
import enum
import asyncio
from shiftarray import ShiftArray
from scramjet.streams import Stream

provides = {"provides": "pipe", "contentType": "text/plain"}

WAIT_TIME_ON_USER = 5
WAIT_TIME_ERROR = 3

TOKEN_HEADER = {"content-type": "application/json"}

class TokenRefreshResult(enum.Enum):
SUCCESS = 200
FAILURE = 0


class Auth0:
def __init__(self, verified_url, users_url, api_url, request_data, stream):
def __init__(self, verified_url, users_url, api_url, request_data, stream, logger):
self.verified_url = verified_url
self.users_url = users_url
self.api_url = api_url
self.request_data = request_data
self.token_header = {
"content-type": "application/json",
}
self.token_payload = request_data
self.stream = stream
self.logger = logger

def lookahead(self, iterable):
it = iter(iterable)
Expand All @@ -29,63 +34,92 @@ def lookahead(self, iterable):
last = val
yield last, False


async def refresh_token(self):
response = requests.post(
self.api_url, headers=TOKEN_HEADER, data=self.token_payload
)
if response.status_code == TokenRefreshResult.SUCCESS.value:
self.logger.info("Auth0: Token succesfully refreshed!")
return TokenRefreshResult.SUCCESS.value, json.loads(response.text)["access_token"]

self.logger.error("Auth0: Refreshing the token has failed!")
return TokenRefreshResult.FAILURE.value, None


async def get_auth(self):
last_verified = ""
last_user = ""

code, token = await self.refresh_token()

if code != 200:
raise Exception(f"Auth0: Refreshing the token has failed!")

self.logger.info(f"Auth0: Succesfully initiated the token - code: {code}, token: {token}")

last_verified = str()
last_user = str()
buffer_verified = ShiftArray()
buffer_users = ShiftArray()
response = requests.post(
self.api_url, headers=self.token_header, data=self.request_data
)
token = json.loads(response.text)["access_token"]

while True:
headers = {"authorization": f"Bearer {token}"}
verified = await requests_async.get(self.verified_url, headers=headers)
users = await requests_async.get(self.users_url, headers=headers)
if verified.status_code != 200:

try:
verified = requests.get(self.verified_url, headers=headers)
users = requests.get(self.users_url, headers=headers)
except Exception as error:
raise Exception(f"Auth0: Failed to make a request: {error}")


if users.status_code != 200:
await asyncio.sleep(WAIT_TIME_ERROR)
response = requests.post(
self.api_url, headers=self.token_header, data=self.request_data
)
token = json.loads(response.text)["access_token"]
self.logger.info("Auth0: Token's refresh was forced.")

code, token = await self.refresh_token()
continue

verified = verified.json()
users = users.json()
if verified[-1]["email"] != last_verified:
for result, has_more in self.lookahead(verified):
if not buffer_verified.contains(result["email"]):
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-newsletter"
)
buffer_verified.append(result["email"])
if not has_more:
last_verified = result["email"]


if users[-1]["email"] != last_user:
for result, has_more in self.lookahead(users):
if not buffer_users.contains(result["email"]):
self.logger.info(f"Auth0: New user registered in auth0: {result['email']}")
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-user"
)
buffer_users.append(result["email"])
if not has_more:
last_user = result["email"]

if verified[-1]["email"] != last_verified:
for result, has_more in self.lookahead(verified):
if not buffer_verified.contains(result["email"]):
self.logger.info(f"Auth0: User allowed the newsletter: {result['email']}")
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-newsletter"
)
buffer_verified.append(result["email"])
if not has_more:
last_verified = result["email"]
await asyncio.sleep(WAIT_TIME_ON_USER)


async def run(context, input):
config = context.config
stream = Stream()

try:
run.verified_url = config["auth0_verified_url"]
run.users_url = config["auth0_users_url"]
run.api_url = config["api_url"]
run.data = json.dumps(config["request_data"])
except Exception as error:
raise Exception(f"Config not loaded: {error}")
raise Exception(f"Auth0: Config not loaded: {error}")

asyncio.gather(
Auth0(run.verified_url,run.users_url ,run.api_url, run.data, stream).get_auth(),
Auth0(run.verified_url, run.users_url , run.api_url, run.data, stream, context.logger).get_auth(),
return_exceptions=True,
)
return stream.map(lambda x: x + "\n")
104 changes: 63 additions & 41 deletions python/ChimpInsert/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import requests
import asyncio
import json
import enum
from scramjet.streams import Stream
import mailchimp_marketing
from mailchimp_marketing import Client
Expand All @@ -13,6 +14,12 @@

WAIT_TIME_ERROR = 3

class TopicInfo(enum.Enum):
STRIPE = "stripe"
AUTH0 = "auth0-user"
A0_NEWSLETTER = "auth0-newsletter"


class ChimpInsert:
def __init__(self, audience_id, config, slack_channel_id, slack_api_url, logger):
self.mailchimp = Client()
Expand All @@ -36,53 +43,68 @@ def get_info(self, info, given):
if given == member["email_address"]:
return member['id']
return -1

def insert_info(self, info):
try:
email, fname, lname = info.split(" ")
except:
self.logger.info("Bad data received at topic")

try:
member_info = {

def prepare_member_info(self, email, fname, lname):
member_info = {
"email_address": email,
"status":"unsubscribed",
"merge_fields" : {
"FNAME":fname,
"LNAME":lname.replace("\n", "")
}}
if "auth0-user" in lname:
try:
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
self.logger.info("Auth0 user successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user successfully added"}))
except ApiClientError as error:
return
elif "auth0-newsletter" in lname:
try:
member_info['status'] = "subscribed"
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
self.logger.info(f"{email} Auth0 user with newsletter successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user with newsletter successfully added"}))
except ApiClientError as error:
error = json.loads(error.text)
if error["title"] == "Member Exists":
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.update_list_member(self.audience_id, user_id, {"status" : "subscribed"})
self.logger.info(f"{email} Auth0 user with newsletter successfully added")
elif "stripe" in lname:
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.lists.update_list_member_tags(self.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]})
self.logger.info(f"{email} Stripe user successfully synchronized")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Stripe user successfully synchronized"}))
return member_info

def insert_info(self, info):

try:
email, fname, lname = info.split(" ")
except ValueError:
self.logger.info("ChimpInsert: Bad data received at topic")
return

member_info = {}

try:
member_info = self.prepare_member_info(email, fname, lname)
except:
self.logger.info("No data received")
self.logger.info("ChimpInsrt: No data received")
return


if TopicInfo.AUTH0.value in lname:
try:
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
self.logger.info("ChimpInsert: Auth0 user successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user successfully added"}))
except ApiClientError as error:
return


elif TopicInfo.A0_NEWSLETTER.value in lname:
try:
member_info['status'] = "subscribed"
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
self.logger.info(f"ChimpInsert: {email} Auth0 user with newsletter successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user with newsletter successfully added"}))
except ApiClientError as error:
error = json.loads(error.text)
if error["title"] == "Member Exists":
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.update_list_member(self.audience_id, user_id, {"status" : "subscribed"})
self.logger.info(f"ChimpInsert: {email} Auth0 user with newsletter successfully added")

elif TopicInfo.AUTH0.stripe in lname:
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.lists.update_list_member_tags(self.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]})
self.logger.info(f"ChimpInsert: {email} Stripe user successfully synchronized")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Stripe user successfully synchronized"}))


async def run(context, input):
try:
Expand All @@ -91,7 +113,7 @@ async def run(context, input):
audience_id = context.config['audience_id']
config = {"api_key": context.config['mailchimp_api'], "server": context.config['mailchimp_server']}
except Exception as error:
raise Exception(f"Config not loaded: {error}")
raise Exception(f"ChimpInsert: Config not loaded: {error}")

inserter = ChimpInsert(audience_id, config, slack_channel_id, slack_api_url, context.logger)
return input.each(inserter.insert_info)
27 changes: 17 additions & 10 deletions python/Stripe/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
import asyncio
from scramjet.streams import Stream

EVENT_REFRESH_DELAY = 3

provides = {
'provides': 'pipe',
'contentType': 'text/plain'
}

class Stripe:
def __init__(self,stream,stripe_api):
def __init__(self, stream, stripe_api, logger):
self.stream = stream
self.stripe_api = stripe_api
self.logger = logger

def get_mail(self, user):
return user['data']['object']['email']
Expand All @@ -23,25 +26,29 @@ def get_fullname(self, user):
return name

async def get_event(self):
await asyncio.sleep(3)
compared = stripe.Event.list(type="customer.created")['data'][-1]

self.stream.write(self.get_mail(compared)+ " " +self.get_fullname(compared))
self.logger.info(f"New user in stripe {self.get_mail(compared)}")
self.stream.write(self.get_mail(compared)+ " " + self.get_fullname(compared))

compared = compared['id']

while True:
test = stripe.Event.list(type="customer.created", ending_before=compared, limit=3)['data']
events = stripe.Event.list(type="customer.created", ending_before=compared, limit=3)['data']

for i in range(len(events)):
self.logger.info(f"New user in stripe {self.get_mail(events[i])}")
self.stream.write(self.get_mail(events[i]) + " " + self.get_fullname(events[i]))
if len(events) != 0:
compared = events[0]['id']

for i in range(len(test)):
self.stream.write(self.get_mail(test[i]) + " " +self.get_fullname(test[i]))
if len(test) != 0:
compared = test[0]['id']
await asyncio.sleep(3)
await asyncio.sleep(EVENT_REFRESH_DELAY)

async def run(context, input):
stream = Stream()
try:
stripe.api_key = context.config['stripe_api']
stripeReader = Stripe(stream, stripe.api_key)
stripeReader = Stripe(stream, stripe.api_key, context.logger)
except Exception as error:
raise Exception(f"Config not loaded: {error}")
return
Expand Down

0 comments on commit 8f0dc20

Please sign in to comment.