Skip to content

Commit

Permalink
Merge pull request #79 from scramjetorg/feat/slack-addon
Browse files Browse the repository at this point in the history
Slack addon, three different lead states
  • Loading branch information
gzukowski committed Jul 6, 2023
2 parents 284c877 + 1b8ac25 commit 63170fd
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 155 deletions.
3 changes: 2 additions & 1 deletion python/Auth0/config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{

"auth0_query_url" : "https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1&fields=created_at%2Cnickname%2Cemail%2Cupdated_at&q=user_metadata.newsletter%3D%22true%22",
"auth0_verified_url" : "https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1&fields=created_at%2Cnickname%2Cemail%2Cupdated_at&q=user_metadata.newsletter%3D%22true%22",
"auth0_users_url" :"https://your_tenant.auth0.com/api/v2/users?per_page=5&sort=created_at%3A-1",
"api_url" : "<your api url>",
"request_data" : {"client_id":"<your client id>","client_secret":"<your client secret>","audience":"<your audience>","grant_type":"client_credentials"}

Expand Down
136 changes: 78 additions & 58 deletions python/Auth0/main.py
Original file line number Diff line number Diff line change
@@ -1,71 +1,91 @@
import asyncio
from scramjet.streams import Stream
import requests
import requests_async
import json
from shiftarray import ShiftArray

class ShiftArray:
def __init__(self):
self.array = []
provides = {"provides": "pipe", "contentType": "text/plain"}

def append(self, value):
if len(self.array) == 5:
self.array.pop(-1)
self.array.append(value)
WAIT_TIME_ON_USER = 5
WAIT_TIME_ERROR = 3

def contains(self, value):
return value in self.array
class Auth0:
def __init__(self, verified_url, users_url, api_url, request_data, stream):
self.verified_url = verified_url
self.users_url = users_url
self.api_url = api_url
self.request_data = request_data
self.token_header = {
"content-type": "application/json",
}
self.stream = stream

def get_values(self):
return self.array
def lookahead(self, iterable):
it = iter(iterable)
last = next(it)
for val in it:
yield last, True
last = val
yield last, False

async def get_auth(self):
last_verified = ""
last_user = ""
buffer_verified = ShiftArray()
buffer_users = ShiftArray()
response = requests.post(
self.api_url, headers=self.token_header, data=self.request_data
)
token = json.loads(response.text)["access_token"]
while True:
headers = {"authorization": f"Bearer {token}"}
verified = await requests_async.get(self.verified_url, headers=headers)
users = await requests_async.get(self.users_url, headers=headers)
if verified.status_code != 200:
await asyncio.sleep(WAIT_TIME_ERROR)
response = requests.post(
self.api_url, headers=self.token_header, data=self.request_data
)
token = json.loads(response.text)["access_token"]
continue
verified = verified.json()
users = users.json()
if verified[-1]["email"] != last_verified:
for result, has_more in self.lookahead(verified):
if not buffer_verified.contains(result["email"]):
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-newsletter"
)
buffer_verified.append(result["email"])
if not has_more:
last_verified = result["email"]

def lookahead(iterable):
it = iter(iterable)
last = next(it)
for val in it:
yield last, True
last = val
yield last, False
if users[-1]["email"] != last_user:
for result, has_more in self.lookahead(users):
if not buffer_users.contains(result["email"]):
self.stream.write(
result["email"] + " " + result["nickname"] + " auth0-user"
)
buffer_users.append(result["email"])
if not has_more:
last_user = result["email"]
await asyncio.sleep(WAIT_TIME_ON_USER)

provides = {
'provides': 'pipe',
'contentType': 'text/plain'
}

async def get_auth(stream):
token_header = {'content-type': 'application/json',}
last = ""
buffer = ShiftArray()
response = requests.post(run.api_url, headers=token_header, data=run.data)
token = json.loads(response.text)['access_token']

while True:
headers = {'authorization' : f"Bearer {token}"}
users = requests.get(run.query, headers=headers).json()

if "error" in str(users):
response = requests.post(run.api_url, headers=token_header, data=run.data)
token = json.loads(response.text)['access_token']
continue

if users[-1]['email'] != last:
for result, has_more in lookahead(users):
if not buffer.contains(result['email']):
stream.write(result['email'] +" "+result['nickname']+" auth0-user")
buffer.append(result['email'])
if not has_more:
last = result['email']
await asyncio.sleep(5)

async def run(context, input):
config = context.config
stream = Stream()
try:
run.query = config['auth0_query_url']
run.api_url = config['api_url']
run.data = json.dumps(config['request_data'])
asyncio.gather(get_auth(stream), return_exceptions=True)
except Exception as error:
raise Exception(f"Config not loaded: {error}")
return
return stream.map(lambda x : x + '\n')
config = context.config
stream = Stream()
try:
run.verified_url = config["auth0_verified_url"]
run.users_url = config["auth0_users_url"]
run.api_url = config["api_url"]
run.data = json.dumps(config["request_data"])
except Exception as error:
raise Exception(f"Config not loaded: {error}")

asyncio.gather(
Auth0(run.verified_url,run.users_url ,run.api_url, run.data, stream).get_auth(),
return_exceptions=True,
)
return stream.map(lambda x: x + "\n")
1 change: 1 addition & 0 deletions python/Auth0/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
scramjet-framework-py
requests
requests_async
18 changes: 18 additions & 0 deletions python/Auth0/shiftarray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class ShiftArray:
def __init__(self, length=5):
self.array = []
self.length = length

def append(self, value):
if len(self.array) == self.length:
self.array.pop(-1)
self.array.append(value)

def contains(self, value):
return value in self.array

def get_values(self):
return self.array

def __str__(self):
return str(self.array)
8 changes: 5 additions & 3 deletions python/ChimpInsert/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
{
"mailchimp_api" : "<your_api_key>",
"audience_id" : "<your_audience>",
"mailchimp_server" : "<your_server>"
"slack_api_url" : "",
"slack_channel_id" : "",
"mailchimp_api" : "",
"audience_id" : "",
"mailchimp_server" : ""
}
138 changes: 80 additions & 58 deletions python/ChimpInsert/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,92 @@
from mailchimp_marketing import Client
from mailchimp_marketing.api_client import ApiClientError


requires = {
'requires': 'pipe',
'contentType': 'text/plain'
}

mailchimp = Client()

def get_offset(audience_id):
response = mailchimp.lists.get_list(audience_id)['stats']
users_num = response['member_count'] + response['unsubscribe_count']
if users_num < 5:
return 0
else:
return users_num-5
WAIT_TIME_ERROR = 3

def get_info(info, given):
for member in info['members']:
if given == member["email_address"]:
return member['id']
class ChimpInsert:
def __init__(self, audience_id, config, slack_channel_id, slack_api_url, logger):
self.mailchimp = Client()
self.audience_id = audience_id
self.mailchimp.set_config(config)
self.slack_channel_id = slack_channel_id
self.slack_api_url = slack_api_url
self.token_header = {'content-type': 'application/json',}
self.logger = logger

def get_offset(self, audience_id):
response = self.mailchimp.lists.get_list(audience_id)['stats']
users_num = response['member_count'] + response['unsubscribe_count']
if users_num < 5:
return 0
else:
return users_num-5

def get_info(self, info, given):
for member in info['members']:
if given == member["email_address"]:
return member['id']
return -1

def insert_info(self, info):
try:
email, fname, lname = info.split(" ")
except:
self.logger.info("Bad data received at topic")

async def insert_info(info):
try:
email, fname, lname = info.split(" ")
except:
print("No data received. start")
response = mailchimp.ping.get()
try:
member_info = {
"email_address": email,
"status":"unsubscribed",
"merge_fields" : {
"FNAME":fname,
"LNAME":lname.replace("\n", "")
}}
try:
response = mailchimp.lists.add_list_member(run.audience_id, member_info)
print(f"{email} Auth0 user successfully added")
except ApiClientError as error:
error = json.loads(error.text)
if error["title"] == "Member Exists":
try:
response = mailchimp.lists.get_list_members_info(run.audience_id, offset=get_offset(run.audience_id))
user_id = get_info(response, email)
if "stripe" in lname:
response = mailchimp.lists.update_list_member_tags(run.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]})
print(f"{email} Stripe user successfully synchronized")
except ApiClientError as err:
print("Error")
except:
print("No data received.")

try:
member_info = {
"email_address": email,
"status":"unsubscribed",
"merge_fields" : {
"FNAME":fname,
"LNAME":lname.replace("\n", "")
}}
if "auth0-user" in lname:
try:
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
self.logger.info("Auth0 user successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user successfully added"}))
except ApiClientError as error:
return
elif "auth0-newsletter" in lname:
try:
member_info['status'] = "subscribed"
response = self.mailchimp.lists.add_list_member(self.audience_id, member_info)
self.logger.info(f"{email} Auth0 user with newsletter successfully added")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Auth0 user with newsletter successfully added"}))
except ApiClientError as error:
error = json.loads(error.text)
if error["title"] == "Member Exists":
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.update_list_member(self.audience_id, user_id, {"status" : "subscribed"})
self.logger.info(f"{email} Auth0 user with newsletter successfully added")
elif "stripe" in lname:
response = self.mailchimp.lists.get_list_members_info(self.audience_id, offset=self.get_offset(self.audience_id))
user_id = self.get_info(response, email)
if user_id == -1:
return
response = self.mailchimp.lists.update_list_member_tags(self.audience_id, user_id, {"tags" : [{"name": "Stripe", "status": "active"}]})
self.logger.info(f"{email} Stripe user successfully synchronized")
slack_message_resp = requests.post(self.slack_api_url, headers=self.token_header, data=str({"text":f"{email} Stripe user successfully synchronized"}))
except:
self.logger.info("No data received")

async def run(context, input):
try:
run.audience_id = context.config['audience_id']
mailchimp.set_config({
"api_key": context.config['mailchimp_api'],
"server": context.config['mailchimp_server'],
})
except Exception as error:
raise Exception(f"Config not loaded: {error}")
return
return input.each(insert_info)



try:
slack_channel_id = context.config['slack_channel_id']
slack_api_url = context.config['slack_api_url']
audience_id = context.config['audience_id']
config = {"api_key": context.config['mailchimp_api'], "server": context.config['mailchimp_server']}
except Exception as error:
raise Exception(f"Config not loaded: {error}")

inserter = ChimpInsert(audience_id, config, slack_channel_id, slack_api_url, context.logger)
return input.each(inserter.insert_info)
Loading

0 comments on commit 63170fd

Please sign in to comment.