Skip to content

Commit

Permalink
arlo: update to 0.8.4 beta (#923)
Browse files Browse the repository at this point in the history
* basestation debugging output

* faster 2 way startup

* fix type annotation

* separate thread for logging server + bump scrypted-arlo-go

* update backup auth hosts

* bump 0.8.1 for release

* further optimize 2 way startup latency

* bump 0.8.2 for release

* skip pings on battery doorbell

* bump 0.8.3 for beta

* more docs

* try fix cloudflare 403 with curl-cffi

* bump 0.8.4 for beta
  • Loading branch information
bjia56 committed Jul 3, 2023
1 parent f10cdfb commit 8eb533c
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 50 deletions.
2 changes: 1 addition & 1 deletion plugins/arlo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ If you experience any trouble logging in, clear the username and password boxes,
* It is highly recommended to enable the Rebroadcast plugin to allow multiple downstream plugins to pull the video feed within Scrypted.
* If there is no audio on your camera, switch to the `FFmpeg (TCP)` parser under the `Cloud RTSP` settings.
* Prebuffering should only be enabled if the camera is wired to a persistent power source, such as a wall outlet. Prebuffering will only work if your camera does not have a battery or `Plugged In to External Power` is selected.
* The plugin supports pulling RTSP or DASH streams from Arlo Cloud. It is recommended to use RTSP for the lowest latency streams. DASH is inconsistent in reliability, and may return finicky codecs that require additional FFmpeg output arguments, e.g. `-vcodec h264`.
* The plugin supports pulling RTSP or DASH streams from Arlo Cloud. It is recommended to use RTSP for the lowest latency streams. DASH is inconsistent in reliability, and may return finicky codecs that require additional FFmpeg output arguments, e.g. `-vcodec h264`. *Note that both RTSP and DASH will ultimately pull the same video stream feed from your camera, and they cannot both be used at the same time due to the single stream limitation.*

Note that streaming cameras uses extra Internet bandwidth, since video and audio packets will need to travel from the camera through your network, out to Arlo Cloud, and then back to your network and into Scrypted.

Expand Down
4 changes: 2 additions & 2 deletions plugins/arlo/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/arlo/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/arlo",
"version": "0.8.0",
"version": "0.8.4",
"description": "Arlo Plugin for Scrypted",
"keywords": [
"scrypted",
Expand Down
12 changes: 8 additions & 4 deletions plugins/arlo/src/arlo_plugin/arlo/arlo_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def change_stream_class(s_class):
class Arlo(object):
BASE_URL = 'my.arlo.com'
AUTH_URL = 'ocapi-app.arlo.com'
BACKUP_AUTH_HOSTS = ["NTIuMjEyLjIwNS4xNDU="] # list(scrypted_arlo_go.BACKUP_AUTH_HOSTS())
BACKUP_AUTH_HOSTS = ['NTIuMjEwLjMuMTIx', 'MzQuMjU1LjkyLjIxMg==', 'MzQuMjUxLjE3Ny45MA==', 'NTQuMjQ2LjE3MS4x']
TRANSID_PREFIX = 'web'

random.shuffle(BACKUP_AUTH_HOSTS)
Expand All @@ -101,7 +101,7 @@ def __init__(self, username, password):
self.username = username
self.password = password
self.event_stream = None
self.request = Request()
self.request = None

def to_timestamp(self, dt):
if sys.version[0] == '2':
Expand Down Expand Up @@ -150,6 +150,7 @@ def UseExistingAuth(self, user_id, headers):
self.user_id = user_id
headers['Content-Type'] = 'application/json; charset=UTF-8'
headers['User-Agent'] = USER_AGENTS['arlo']
self.request = Request(mode="cloudscraper")
self.request.session.headers.update(headers)
self.BASE_URL = 'myapi.arlo.com'

Expand All @@ -170,7 +171,7 @@ def LoginMFA(self):
'Host': self.AUTH_URL,
}

self.request = Request(mode="cloudscraper")
self.request = Request()
try:
auth_host = self.AUTH_URL
self.request.options(f'https://{auth_host}/api/auth', headers=headers)
Expand Down Expand Up @@ -246,7 +247,7 @@ def complete_auth(code):
if finish_auth_body.get('data', {}).get('token') is None:
raise Exception("Could not complete 2FA, maybe invalid token? If the error persists, please try reloading the plugin and logging in again.")

self.request = Request()
self.request = Request(mode="cloudscraper")

# Update Authorization code with new code
headers = {
Expand Down Expand Up @@ -311,6 +312,9 @@ async def heartbeat(self, basestations, interval=30):
basestation['deviceType'] not in ['doorbell', 'siren', 'arloq', 'arloqs'] and \
basestation['modelId'].lower() not in ['abc1000', 'abc1000a']:
continue
# avd2001 is the battery doorbell, and we don't want to drain its battery, so disable pings
if basestation['modelId'].lower().startswith('avd2001'):
continue
devices_to_ping[basestation['deviceId']] = basestation

logger.info(f"Will send heartbeat to the following devices: {list(devices_to_ping.keys())}")
Expand Down
25 changes: 23 additions & 2 deletions plugins/arlo/src/arlo_plugin/arlo/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,28 @@
# limitations under the License.
##

from functools import partialmethod
import requests
from requests.exceptions import HTTPError
from requests_toolbelt.adapters import host_header_ssl
import cloudscraper
import time
import uuid

from .logging import logger


try:
from curl_cffi import requests as curl_cffi_requests
HAS_CURL_CFFI = True

# upstream curl_cffi doesn't have OPTIONS support, so this is
# a bit of a hack to add it
class CurlCffiSession(curl_cffi_requests.Session):
options = partialmethod(curl_cffi_requests.Session.request, "OPTIONS")
except:
HAS_CURL_CFFI = False

#from requests_toolbelt.utils import dump
#def print_raw_http(response):
# data = dump.dump_all(response, request_prefix=b'', response_prefix=b'')
Expand All @@ -29,14 +44,20 @@
class Request(object):
"""HTTP helper class"""

def __init__(self, timeout=5, mode="cloudscraper"):
if mode == "cloudscraper":
def __init__(self, timeout=5, mode="curl" if HAS_CURL_CFFI else "cloudscraper"):
if mode == "curl":
logger.debug("HTTP helper using curl_cffi")
self.session = CurlCffiSession(impersonate="chrome110")
elif mode == "cloudscraper":
logger.debug("HTTP helper using cloudscraper")
from .arlo_async import USER_AGENTS
self.session = cloudscraper.CloudScraper(browser={"custom": USER_AGENTS["android"]})
elif mode == "ip":
logger.debug("HTTP helper using requests with HostHeaderSSLAdapter")
self.session = requests.Session()
self.session.mount('https://', host_header_ssl.HostHeaderSSLAdapter())
else:
logger.debug("HTTP helper using requests")
self.session = requests.Session()
self.timeout = timeout

Expand Down
27 changes: 23 additions & 4 deletions plugins/arlo/src/arlo_plugin/basestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import List, TYPE_CHECKING

from scrypted_sdk import ScryptedDeviceBase
from scrypted_sdk.types import Device, DeviceProvider, ScryptedInterface, ScryptedDeviceType
from scrypted_sdk.types import Device, DeviceProvider, Setting, SettingValue, Settings, ScryptedInterface, ScryptedDeviceType

from .base import ArloDeviceBase
from .vss import ArloSirenVirtualSecuritySystem
Expand All @@ -13,7 +13,7 @@
from .provider import ArloProvider


class ArloBasestation(ArloDeviceBase, DeviceProvider):
class ArloBasestation(ArloDeviceBase, DeviceProvider, Settings):
MODELS_WITH_SIRENS = [
"vmb4000",
"vmb4500"
Expand All @@ -29,7 +29,10 @@ def has_siren(self) -> bool:
return any([self.arlo_device["modelId"].lower().startswith(model) for model in ArloBasestation.MODELS_WITH_SIRENS])

def get_applicable_interfaces(self) -> List[str]:
return [ScryptedInterface.DeviceProvider.value]
return [
ScryptedInterface.DeviceProvider.value,
ScryptedInterface.Settings.value,
]

def get_device_type(self) -> str:
return ScryptedDeviceType.DeviceProvider.value
Expand Down Expand Up @@ -68,4 +71,20 @@ def get_or_create_vss(self) -> ArloSirenVirtualSecuritySystem:
vss_id = f'{self.arlo_device["deviceId"]}.vss'
if not self.vss:
self.vss = ArloSirenVirtualSecuritySystem(vss_id, self.arlo_device, self.arlo_basestation, self.provider, self)
return self.vss
return self.vss

async def getSettings(self) -> List[Setting]:
return [
{
"group": "General",
"key": "print_debug",
"title": "Debug Info",
"description": "Prints information about this device to console.",
"type": "button",
}
]

async def putSetting(self, key: str, value: SettingValue) -> None:
if key == "print_debug":
self.logger.info(f"Device Capabilities: {self.arlo_capabilities}")
await self.onDeviceEvent(ScryptedInterface.Settings.value, None)
114 changes: 79 additions & 35 deletions plugins/arlo/src/arlo_plugin/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import socket
import time
import threading
from typing import List, TYPE_CHECKING

import scrypted_arlo_go
Expand Down Expand Up @@ -36,10 +37,10 @@ def __init__(self, camera: ArloCamera) -> None:
self.arlo_basestation = camera.arlo_basestation

async def initialize_push_to_talk(self, media: MediaObject) -> None:
raise Exception("not implemented")
raise NotImplementedError("not implemented")

async def shutdown(self) -> None:
raise Exception("not implemented")
raise NotImplementedError("not implemented")


class ArloCamera(ArloDeviceBase, Settings, Camera, VideoCamera, DeviceProvider, VideoClips, MotionSensor, AudioSensor, Battery, Charger):
Expand Down Expand Up @@ -111,8 +112,9 @@ class ArloCamera(ArloDeviceBase, Settings, Camera, VideoCamera, DeviceProvider,
last_picture_time: datetime = datetime(1970, 1, 1)

# socket logger
logger_server = None
logger_server_port = 0
logger_loop: asyncio.AbstractEventLoop = None
logger_server: asyncio.AbstractServer = None
logger_server_port: int = 0

def __init__(self, nativeId: str, arlo_device: dict, arlo_basestation: dict, provider: ArloProvider) -> None:
super().__init__(nativeId=nativeId, arlo_device=arlo_device, arlo_basestation=arlo_basestation, provider=provider)
Expand All @@ -126,7 +128,11 @@ def __init__(self, nativeId: str, arlo_device: dict, arlo_basestation: dict, pro

def __del__(self) -> None:
super().__del__()
self.logger_server.close()
def logger_exit_callback():
self.logger_server.close()
self.logger_loop.stop()
self.logger_loop.close()
self.logger_loop.call_soon_threadsafe(logger_exit_callback)

async def delayed_init(self) -> None:
await self.create_tcp_logger_server()
Expand All @@ -150,24 +156,40 @@ async def delayed_init(self) -> None:

@async_print_exception_guard
async def create_tcp_logger_server(self) -> None:
async def callback(reader, writer):
try:
while not reader.at_eof():
line = await reader.readline()
if not line:
break
line = str(line, 'utf-8')
line = line.rstrip()
self.logger.info(line)
writer.close()
await writer.wait_closed()
except Exception:
self.logger.exception("Logger server callback raised an exception")
self.logger_loop = asyncio.new_event_loop()

def thread_main():
asyncio.set_event_loop(self.logger_loop)
self.logger_loop.run_forever()

threading.Thread(target=thread_main).start()

# this is a bit convoluted since we need the async functions to run in the
# logger loop thread instead of in the current thread
def setup_callback():
async def callback(reader, writer):
try:
while not reader.at_eof():
line = await reader.readline()
if not line:
break
line = str(line, 'utf-8')
line = line.rstrip()
self.logger.info(line)
writer.close()
await writer.wait_closed()
except Exception:
self.logger.exception("Logger server callback raised an exception")

async def setup():
self.logger_server = await asyncio.start_server(callback, host='localhost', port=0, family=socket.AF_INET, flags=socket.SOCK_STREAM)
self.logger_server_port = self.logger_server.sockets[0].getsockname()[1]
self.logger.info(f"Started logging server at localhost:{self.logger_server_port}")

self.logger_loop.create_task(setup())

self.logger_server = await asyncio.start_server(callback, host='localhost', port=0, family=socket.AF_INET, flags=socket.SOCK_STREAM)
self.logger_server_port = self.logger_server.sockets[0].getsockname()[1]
self.logger_loop.call_soon_threadsafe(setup_callback)

self.logger.info(f"Started logging server at localhost:{self.logger_server_port}")

def start_error_subscription(self) -> None:
def callback(code, message):
Expand Down Expand Up @@ -291,7 +313,7 @@ def eco_mode(self) -> bool:
return False

@property
def snapshot_throttle_interval(self) -> bool:
def snapshot_throttle_interval(self) -> int:
interval = self.storage.getItem("snapshot_throttle_interval")
if interval is None:
interval = 60
Expand Down Expand Up @@ -536,7 +558,7 @@ async def startIntercom(self, media: MediaObject) -> None:
self.intercom_session = ArloCameraWebRTCIntercomSession(self)
await self.intercom_session.initialize_push_to_talk(media)

self.logger.info("Intercom ready")
self.logger.info("Intercom initialized")

@async_print_exception_guard
async def stopIntercom(self) -> None:
Expand Down Expand Up @@ -746,17 +768,31 @@ async def initialize_push_to_talk(self, media: MediaObject) -> None:
session_id, offer_sdp
)

candidates = self.arlo_pc.WaitAndGetICECandidates()
self.logger.debug(f"Gathered {len(candidates)} candidates")
for candidate in candidates:
candidate = scrypted_arlo_go.WebRTCICECandidateInit(
scrypted_arlo_go.WebRTCICECandidate(handle=candidate).ToJSON()
).Candidate
self.logger.debug(f"Sending candidate to Arlo: {candidate}")
self.provider.arlo.NotifyPushToTalkCandidate(
self.arlo_basestation, self.arlo_device,
session_id, candidate,
)
def trickle_candidates():
count = 0
try:
while True:
candidate = self.arlo_pc.GetNextICECandidate()
candidate = scrypted_arlo_go.WebRTCICECandidateInit(
scrypted_arlo_go.WebRTCICECandidate(handle=candidate.handle).ToJSON()
).Candidate
self.logger.debug(f"Sending candidate to Arlo: {candidate}")
self.provider.arlo.NotifyPushToTalkCandidate(
self.arlo_basestation, self.arlo_device,
session_id, candidate,
)
count += 1
except RuntimeError as e:
if str(e) == "no more candidates":
self.logger.debug(f"End of candidates, found {count} candidate(s)")
else:
self.logger.exception("Exception while processing trickle candidates")
except Exception:
self.logger.exception("Exception while processing trickle candidates")

# we can trickle candidates asynchronously so the caller to startIntercom
# knows we are ready to receive packets
threading.Thread(target=trickle_candidates).start()

@async_print_exception_guard
async def shutdown(self) -> None:
Expand Down Expand Up @@ -840,7 +876,15 @@ async def initialize_push_to_talk(self, media: MediaObject) -> None:
self.intercom_ffmpeg_subprocess = HeartbeatChildProcess("FFmpeg", self.camera.logger_server_port, ffmpeg_path, *ffmpeg_args)
self.intercom_ffmpeg_subprocess.start()

self.arlo_sip.Start()
def sip_start():
try:
self.arlo_sip.Start()
except Exception:
self.logger.exception("Exception starting sip call")

# do remaining setup asynchronously so the caller to startIntercom
# can start sending packets
threading.Thread(target=sip_start).start()

@async_print_exception_guard
async def shutdown(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion plugins/arlo/src/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ sseclient==0.0.22
aiohttp==3.8.4
requests==2.28.2
cachetools==5.3.0
scrypted-arlo-go==0.3.0
scrypted-arlo-go==0.4.0
cloudscraper==1.2.71
curl-cffi==0.5.6; platform_machine != 'armv7l'
async-timeout==4.0.2
--extra-index-url=https://www.piwheels.org/simple/
--extra-index-url=https://bjia56.github.io/scrypted-arlo-go/
Expand Down

0 comments on commit 8eb533c

Please sign in to comment.