Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slack addon, three different lead states #79

Merged
merged 8 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/Auth0/config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{

"auth0_query_url" : "https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1&fields=created_at%2Cnickname%2Cemail%2Cupdated_at&q=user_metadata.newsletter%3D%22true%22",
"auth0_verified_url" : "https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1&fields=created_at%2Cnickname%2Cemail%2Cupdated_at&q=user_metadata.newsletter%3D%22true%22",
"auth0_users_url" :"https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1",
"api_url" : "<your api url>",
"request_data" : {"client_id":"<your client id>","client_secret":"<your client secret>","audience":"<your audience>","grant_type":"client_credentials"}

Expand Down
130 changes: 73 additions & 57 deletions python/Auth0/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,86 @@
from scramjet.streams import Stream
import requests
import json
from shiftarray import ShiftArray

class ShiftArray:
def __init__(self):
self.array = []
provides = {"provides": "pipe", "contentType": "text/plain"}

def append(self, value):
if len(self.array) == 5:
self.array.pop(-1)
self.array.append(value)

def contains(self, value):
return value in self.array
class Auth0:
def __init__(self, verified_url, users_url, api_url, request_data, stream):
self.verified_url = verified_url
self.users_url = users_url
self.api_url = api_url
self.request_data = request_data
self.token_header = {
"content-type": "application/json",
}
self.stream = stream

def get_values(self):
return self.array
def lookahead(self, iterable):
it = iter(iterable)
last = next(it)
for val in it:
yield last, True
last = val
yield last, False

async def get_auth(self):
last_verified = ""
last_user = ""
buffer_verified = ShiftArray()
buffer_users = ShiftArray()
response = requests.post(
self.api_url, headers=self.token_header, data=self.request_data
)
token = json.loads(response.text)["access_token"]
while True:
headers = {"authorization": f"Bearer {token}"}
verified = requests.get(self.verified_url, headers=headers).json()
users = requests.get(self.users_url, headers=headers).json()
if "error" in str(verified):
response = requests.post(
self.api_url, headers=self.token_header, data=self.request_data
)
token = json.loads(response.text)["access_token"]
continue
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a asyncio.sleep(WAIT_TIME_ERROR) here - hold back on error (also define a constant near the top of the file).


def lookahead(iterable):
it = iter(iterable)
last = next(it)
for val in it:
yield last, True
last = val
yield last, False
if verified[-1]["email"] != last_verified:
for result, has_more in self.lookahead(verified):
if not buffer_verified.contains(result["email"]):
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-newsletter"
)
buffer_verified.append(result["email"])
if not has_more:
last_verified = result["email"]

provides = {
'provides': 'pipe',
'contentType': 'text/plain'
}
if users[-1]["email"] != last_user:
for result, has_more in self.lookahead(users):
if not buffer_users.contains(result["email"]):
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-user"
)
buffer_users.append(result["email"])
if not has_more:
last_user = result["email"]
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to keep the update time instead of email - users may change and we could just push the changed users as well, but email will do for now.

await asyncio.sleep(5)
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await asyncio.sleep(5)
await asyncio.sleep(WAIT_TIME_ON_USER)


async def get_auth(stream):
token_header = {'content-type': 'application/json',}
last = ""
buffer = ShiftArray()
response = requests.post(run.api_url, headers=token_header, data=run.data)
token = json.loads(response.text)['access_token']

while True:
headers = {'authorization' : f"Bearer {token}"}
users = requests.get(run.query, headers=headers).json()

if "error" in str(users):
response = requests.post(run.api_url, headers=token_header, data=run.data)
token = json.loads(response.text)['access_token']
continue

if users[-1]['email'] != last:
for result, has_more in lookahead(users):
if not buffer.contains(result['email']):
stream.write(result['email'] +" "+result['nickname']+" auth0-user")
buffer.append(result['email'])
if not has_more:
last = result['email']
await asyncio.sleep(5)

async def run(context, input):
config = context.config
stream = Stream()
try:
run.query = config['auth0_query_url']
run.api_url = config['api_url']
run.data = json.dumps(config['request_data'])
asyncio.gather(get_auth(stream), return_exceptions=True)
except Exception as error:
raise Exception(f"Config not loaded: {error}")
return
return stream.map(lambda x : x + '\n')
config = context.config
stream = Stream()
try:
run.verified_url = config["auth0_verified_url"]
run.users_url = config["auth0_users_url"]
run.api_url = config["api_url"]
run.data = json.dumps(config["request_data"])

asyncio.gather(
Auth0(run.verified_url,run.users_url ,run.api_url, run.data, stream).get_auth(),
return_exceptions=True,
)
except Exception as error:
raise Exception(f"Config not loaded: {error}")
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return statement is not needed after raising an Exception here (same for chimp and Stripe). Its message "Config not loaded:" suggests that there was a problem with the config, but a different, unrelated exception might be raised by get_auth() method.

return stream.map(lambda x: x + "\n")
8 changes: 5 additions & 3 deletions python/ChimpInsert/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{
"mailchimp_api" : "<your_api_key>",
"audience_id" : "<your_audience>",
"mailchimp_server" : "<your_server>"
"slack_api_url" : "",
"slack_channel_id" : "",
"mailchimp_api" : "",
"audience_id" : "",
"mailchimp_server" : ""
}
137 changes: 78 additions & 59 deletions python/ChimpInsert/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,89 @@
from mailchimp_marketing import Client
from mailchimp_marketing.api_client import ApiClientError


requires = {
'requires': 'pipe',
'contentType': 'text/plain'
}

mailchimp = Client()

def get_offset(audience_id):
response = mailchimp.lists.get_list(audience_id)['stats']
users_num = response['member_count'] + response['unsubscribe_count']
if users_num < 5:
return 0
else:
return users_num-5

def get_info(info, given):
for member in info['members']:
if given == member["email_address"]:
return member['id']
class ChimpInsert:
def __init__(self, audience_id, config, slack_channel_id,slack_api_url):
self.mailchimp = Client()
self.audience_id = audience_id
self.mailchimp.set_config(config)
self.slack_channel_id = slack_channel_id
self.slack_api_url = slack_api_url
self.token_header = {'content-type': 'application/json',}

def get_offset(self, audience_id):
response = self.mailchimp.lists.get_list(audience_id)['stats']
users_num = response['member_count'] + response['unsubscribe_count']
if users_num < 5:
return 0
else:
return users_num-5

def get_info(self, info, given):
for member in info['members']:
if given == member["email_address"]:
return member['id']
return -1

def insert_info(self, info):
try:
email, fname, lname = info.split(" ")
except:
print("No data received. start")

async def insert_info(info):
try:
email, fname, lname = info.split(" ")
except:
print("No data received. start")
response = mailchimp.ping.get()
try:
member_info = {
"email_address": email,
"status":"unsubscribed",
"merge_fields" : {
"FNAME":fname,
"LNAME":lname.replace("\n", "")
}}
try:
response = mailchimp.lists.add_list_member(run.audience_id, member_info)
print(f"{email} Auth0 user successfully added")
except ApiClientError as error:
error = json.loads(error.text)
if error["title"] == "Member Exists":
try:
response = mailchimp.lists.get_list_members_info(run.audience_id, offset=get_offset(run.audience_id))
user_id = get_info(response, email)
if "stripe" in lname:
response = mailchimp.lists.update_list_member_tags(run.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]})
print(f"{email} Stripe user successfully synchronized")
except ApiClientError as err:
print("Error")
except:
print("No data received.")

try:
member_info = {
"email_address": email,
"status":"unsubscribed",
"merge_fields" : {
"FNAME":fname,
"LNAME":lname.replace("\n", "")
}}
if "auth0-user" in lname:
try:
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
print(f"{email} Auth0 user successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user successfully added"}))
except ApiClientError as error:
return
elif "auth0-newsletter" in lname:
try:
member_info['status'] = "subscribed"
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
print(f"{email} Auth0 user with newsletter successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user with newsletter successfully added"}))
except ApiClientError as error:
error = json.loads(error.text)
if error["title"] == "Member Exists":
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.update_list_member(self.audience_id, user_id, {"status" : "subscribed"})
print(f"{email} Auth0 user with newsletter successfully added")
elif "stripe" in lname:
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.lists.update_list_member_tags(self.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]})
print(f"{email} Stripe user successfully synchronized")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Stripe user successfully synchronized"}))
except:
print("No data received.")

async def run(context, input):
try:
run.audience_id = context.config['audience_id']
mailchimp.set_config({
"api_key": context.config['mailchimp_api'],
"server": context.config['mailchimp_server'],
})
except Exception as error:
raise Exception(f"Config not loaded: {error}")
return
return input.each(insert_info)



try:
slack_channel_id = context.config['slack_channel_id']
slack_api_url = context.config['slack_api_url']
audience_id = context.config['audience_id']
config = {"api_key": context.config['mailchimp_api'], "server": context.config['mailchimp_server']}
inserter = ChimpInsert(audience_id, config, slack_channel_id, slack_api_url)
except Exception as error:
raise Exception(f"Config not loaded: {error}")
return
return input.each(inserter.insert_info)