Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor 2fa #105

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 82 additions & 66 deletions pyaarlo/tfa.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import imaplib
import re
import time
import ssl

import requests

Expand Down Expand Up @@ -38,7 +37,8 @@ def __init__(self, arlo):
self._arlo = arlo
self._imap = None
self._old_ids = None
self._new_ids = None
self._MAIL_COUNT_TO_READ = 10
self._continue_checking_mail_body = True

def start(self):
self._arlo.debug("2fa-imap: starting")
Expand All @@ -48,110 +48,126 @@ def start(self):
self.stop()

try:
# allow default ciphers to be specified
if self._arlo.cfg.default_ciphers:
ctx = ssl.create_default_context()
ctx.set_ciphers('DEFAULT')
self._arlo.debug(f"imap is using DEFAULT ciphers")
else:
ctx = None

self._imap = imaplib.IMAP4_SSL(self._arlo.cfg.tfa_host, port=self._arlo.cfg.tfa_port, ssl_context=ctx)
self._imap = imaplib.IMAP4_SSL(
self._arlo.cfg.tfa_host, port=self._arlo.cfg.tfa_port
)
res, status = self._imap.login(
self._arlo.cfg.tfa_username, self._arlo.cfg.tfa_password
)
if res.lower() != "ok":
self._arlo.debug("imap login failed")
return False
res, status = self._imap.select(mailbox='INBOX', readonly=True)
res, status = self._imap.select()
if res.lower() != "ok":
self._arlo.debug("imap select failed")
return False
res, self._old_ids = self._imap.search(
None, "FROM", "do_not_reply@arlo.com"
)
if res.lower() != "ok":
self._arlo.debug("imap search failed")
return False
except Exception as e:
self._arlo.error(f"imap connection failed{str(e)}")
return False

self._new_ids = self._old_ids
self._arlo.debug("old-ids={}".format(self._old_ids))
if res.lower() == "ok":
return True

return False

def get(self):
self._arlo.debug("2fa-imap: checking")
self._old_ids = None

# give tfa_total_timeout seconds for email to arrive
# Give tfa_total_timeout seconds for email to arrive
start = time.time()
while True:

# wait a short while, stop after a total timeout
# ok to do on first run gives email time to arrive
# Wait a short while, stop after a total timeout
# Ok to do on first run, gives the email time to arrive
time.sleep(self._arlo.cfg.tfa_timeout)
if time.time() > (start + self._arlo.cfg.tfa_total_timeout):
return None

try:
# grab new email ids
# Get all mail ids
self._imap.check()
res, self._new_ids = self._imap.search(
None, "FROM", "do_not_reply@arlo.com"
)
self._arlo.debug("2fa-imap: new-ids={}".format(self._new_ids))
if self._new_ids == self._old_ids:
self._arlo.debug("2fa-imap: no change in emails")
res, all_mail_ids = self._imap.search(None, "ALL")

# Only keep the last self._MAIL_COUNT_TO_READ
recent_ids = all_mail_ids[0].split()
recent_ids.reverse()
del recent_ids[self._MAIL_COUNT_TO_READ:]
self._arlo.debug("2fa-imap: recent_ids={}".format(recent_ids))

# If we are looking at the same list of mails then wait for new ones
if self._old_ids == recent_ids and not self._continue_checking_mail_body:
continue
# After this iteration do not check the mails if they are from Arlo
# No need to check because the mails are the same as last time we checked
self._continue_checking_mail_body = False

# Use list comprehension to find the last received Arlo mail and check if it contains a code
# If we find the code in one of the mails then save it, otherwise save None
mail_id = next((x for x in recent_ids if self.is_arlo_mail(x) and self.get_code(x) is not None), None)

# If we found the code then return it and delete the mail
# We should not use unnecessary space in the inbox
if mail_id is not None:
code = self.get_code(mail_id)
self._arlo.debug("2fa-imap: code={}".format(code))
self.delete_mail(mail_id)
return code

# Set the self._old_ids to the mail ids we just searched
self._arlo.debug("2fa-imap: Setting old value")
self._old_ids = recent_ids

# new message...
old_ids = self._old_ids[0].split()
for msg_id in self._new_ids[0].split():

# seen it?
if msg_id in old_ids:
continue

# New message. Look at all the parts and try to grab the code, if we catch an exception
# just move onto the next part.
self._arlo.debug("2fa-imap: new-msg={}".format(msg_id))
res, msg = self._imap.fetch(msg_id, "(BODY[TEXT])")
if isinstance(msg[0][1], bytes):
for part in email.message_from_bytes(msg[0][1]).walk():
try:
for line in part.get_payload(decode=True).splitlines():
# match code in email, this might need some work if the email changes
code = re.match(r"^\W*(\d{6})\W*$", line.decode())
if code is not None:
self._arlo.debug(
"2fa-imap: code={}".format(code.group(1))
)
return code.group(1)
except:
self._arlo.debug("trying next part")

# update old so we don't keep trying new
self._old_ids = self._new_ids

# problem parsing the message, force a fail
# Problem parsing the message, force a fail
except Exception as e:
self._arlo.error(f"imap message read failed{str(e)}")
return None

return None

# Fetch the mail and look at the sender, if Arlo the return True, otherwise False
def is_arlo_mail(self, mail_id):
self._arlo.debug("2fa-imap: Checking if mail is Arlo mail, mail id: " + format(mail_id))
res, msg = self._imap.fetch(mail_id, "(RFC822)")
if isinstance(msg[0][1], bytes):
Copy link

@Jopand Jopand Nov 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason, I have to use msg[1][1] in line 131 and 132 with my yahoo mail. Mentioning it in #96. No idea why it works differently if you've also tested with a yahoo mail

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a printout of msg[0][1] and msg[0][1]..

msg[0][1]: 50

... nothing else...

msg[1][1]:
b'Received: from 127.0.0.1\r\n by atlas-production.v2-mail-prod1-gq1.omega.yahoo.com pod-id atlas--production-gq1-6db95f6ddc-xz57w.gq1.yahoo.com with HTTP..... + a lot more. LOOOOOONG string containing the entire mail

message = email.message_from_bytes(msg[0][1])
self._arlo.debug("2fa-imap: From: {}".format(message.get("From")))
sender = message.get("From")
if sender.find("Arlo") != -1:
self._arlo.debug("2fa-imap: Arlo mail found")
return True
else:
# If we did not read the mail successfully then try again
self._continue_checking_mail_body = True
return False

# This is only called if we have found a mail we need to check for a valid code in
# Fetch the mail and loop through each line in the body to look for a line with a valid code
# Return the code if found, otherwise None
def get_code(self, mail_id):
res, msg = self._imap.fetch(mail_id, "(BODY[])")
if isinstance(msg[0][1], bytes):
for part in email.message_from_bytes(msg[0][1]).walk():
if part.get_content_type() == "text/html":
for line in part.get_payload(decode=True).splitlines():
# Match code in email, this might need some work if the email changes
code = re.match(r"^\W*(\d{6})\W*$", line.decode())
if code is not None:
return code.group(1)
return None

# This is only called if we have extracted the code we need
# Delete the mail with the code in
def delete_mail(self, mail_id):
self._imap.store(mail_id, '+FLAGS', '\\Deleted')
self._arlo.debug("2fa-imap: Deleted mail with code")

def stop(self):
self._arlo.debug("2fa-imap: stopping")

self._imap.close()
self._imap.logout()
self._imap = None
if self._imap is not None:
self._imap.close()
self._imap.logout()
self._imap = None
self._old_ids = None
self._new_ids = None


class Arlo2FARestAPI:
Expand Down