# 15 分を超える動画をダウンロードする

---

## Guide page
このサンプルを実行する場合は、必ず以下の**ガイドページ**を確認してください。
- https://users.soracom.io/ja-jp/guides/soracom-cloud-camera-services/api-examples-download-videos-longer-than-limits/

## License

This sample does not explicitly install any additional libraries, so it follows the repository's default license.

- **MIT License**
  - https://github.com/soracom-labs/sora-cam-api-examples/blob/main/LICENSE

In [None]:
# @title ステップ 1: サンプルコードの実行に必要なライブラリや定数を設定する

# --------------- Definition ---------------

from urllib.parse import urljoin, urlparse, quote, unquote
from datetime import datetime, timedelta, timezone
from typing import Final, Union
from google.colab import files

import ipywidgets as widgets
import requests
import shutil
import glob
import json
import sys
import os


# API Execution Result Class
class APIResult():
    """Result of one API call: the decoded response body plus call metadata.

    Attributes:
        response: Decoded response body as passed in by the caller, or None.
        additions: Metadata passed in by the caller. ``check_success`` reads
            its ``'status_code'`` entry; the request helpers in this file also
            store ``'headers'`` and, on failure, ``'raw_messages'``.
    """
    response: dict
    additions: dict

    def __init_vars(self):
        self.response = None
        self.additions = None

    def __init__(self, response: dict, additions: dict):
        self.__init_vars()
        self.response = response
        self.additions = additions

    @staticmethod
    def _to_jsonable(value):
        """Fallback for ``json.dumps``: mapping-like values become dicts, others strings.

        The metadata may hold objects the JSON encoder rejects (for example a
        header mapping that is not a ``dict`` subclass), which would otherwise
        make the display helper raise TypeError.
        """
        try:
            return dict(value)
        except (TypeError, ValueError):
            return str(value)

    def get_display_response(self) -> str:
        """Return the response as pretty-printed JSON, or '' when there is none."""
        if self.response is None:
            return ''
        return json.dumps(self.response, indent=4, ensure_ascii=False)

    def get_display_additions(self) -> str:
        """Return the metadata as pretty-printed JSON, or '' when there is none."""
        if self.additions is None:
            return ''
        return json.dumps(self.additions, indent=4, ensure_ascii=False,
                          default=self._to_jsonable)

    def check_success(self) -> bool:
        """Return True only when metadata exists and its status code is 200."""
        if self.additions is None:
            return False
        return self.additions['status_code'] == 200


# Get Contents Execution Result Class
class ContentsResult():
    """Result of a raw content download: the body bytes plus call metadata.

    Attributes:
        content: Downloaded bytes as passed in by the caller, or None.
        additions: Metadata passed in by the caller. ``check_success`` reads
            its ``'status_code'`` entry.
    """
    content: bytes
    additions: dict

    def __init_vars(self):
        self.content = None
        self.additions = None

    def __init__(self, content: bytes, additions: dict):
        self.__init_vars()
        self.content = content
        self.additions = additions

    @staticmethod
    def _to_jsonable(value):
        """Fallback for ``json.dumps``: mapping-like values become dicts, others strings.

        The metadata may hold objects the JSON encoder rejects (for example a
        header mapping that is not a ``dict`` subclass), which would otherwise
        make the display helper raise TypeError.
        """
        try:
            return dict(value)
        except (TypeError, ValueError):
            return str(value)

    def get_display_additions(self) -> str:
        """Return the metadata as pretty-printed JSON, or '' when there is none."""
        if self.additions is None:
            return ''
        return json.dumps(self.additions, indent=4, ensure_ascii=False,
                          default=self._to_jsonable)

    def check_success(self) -> bool:
        """Return True only when metadata exists and its status code is 200."""
        if self.additions is None:
            return False
        return self.additions['status_code'] == 200


# Export Time Management Class
class ExportTimeInfo():
    """Start/end time pair for a video export or playback request.

    Besides holding the two datetimes, the class validates them against the
    current time and the per-request length limit, and can split a long range
    into consecutive chunks that each fit within that limit.

    Attributes:
        time_from: Start of the range, or None when unset.
        time_to: End of the range, or None when unset.
    """
    time_from: datetime
    time_to: datetime

    def init_vars(self):
        """Clear both times; ``valid_vars()`` is False afterwards."""
        self.time_from = None
        self.time_to = None

    def valid_vars(self) -> bool:
        """Return True when both the start and the end time are set."""
        return self.time_from is not None and self.time_to is not None

    def __init__(self, time_from: datetime, time_to: datetime):
        self.init_vars()
        self.time_from = time_from
        self.time_to = time_to

    def __get_range_limit(self) -> int:
        # As of June 2023, there is a limit of 15 minutes per video export.
        # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/getSoraCamDeviceStreamingVideo
        EXPORT_RANGE_LIMIT_SECONDS: Final = 900
        return EXPORT_RANGE_LIMIT_SECONDS

    def get_from_ms(self) -> Union[int, None]:
        """Return the start time as epoch milliseconds, or None when unset."""
        if self.time_from is None:
            return None
        return int(self.time_from.timestamp() * 1000)

    def get_to_ms(self) -> Union[int, None]:
        """Return the end time as epoch milliseconds, or None when unset."""
        if self.time_to is None:
            return None
        return int(self.time_to.timestamp() * 1000)

    def get_export_sec(self) -> Union[float, None]:
        """Return the range length in seconds (a float), or None when a time is unset."""
        if not self.valid_vars():
            return None
        return (self.time_to - self.time_from).total_seconds()

    def check_time_limits(self, range_limit: bool = True) -> bool:
        """Validate the range, printing a message for the first rule it breaks.

        Args:
            range_limit: When True, also reject ranges longer than the
                per-request limit.

        Returns:
            False when a time is unset, either time lies in the future, the
            range is empty or reversed, or (with ``range_limit``) the range
            exceeds the limit. True otherwise.
        """
        if not self.valid_vars():
            return False

        # A POSIX timestamp identifies an instant regardless of time zone, so
        # "now" can be taken in UTC without affecting the comparison.
        now_ts = datetime.now(timezone.utc).timestamp()

        # Future time cannot be specified.
        if now_ts < self.time_from.timestamp() or now_ts < self.time_to.timestamp():
            print()
            print(
                '➕➕➕ ⛔ A future date and time has been specified. Please confirm the date and time again. ➕➕➕')
            return False

        export_sec = self.get_export_sec()

        # If there is no difference between start and end time
        if export_sec <= 0:
            print()
            print(
                '➕➕➕ ⛔ The date and time specified is incorrect. Please check the date and time specified. ➕➕➕')
            return False

        # Factoring in API unit output limits
        if range_limit and export_sec > self.__get_range_limit():
            print()
            print(
                '➕➕➕ ⛔ The specified date and time range exceeds {} seconds. Please specify within the box.➕➕➕'.format(self.__get_range_limit()))
            return False

        return True

    def create_time_list(self) -> list:
        """Split the range into consecutive chunks no longer than the limit.

        Returns:
            A list of ``ExportTimeInfo`` objects covering ``time_from`` to
            ``time_to`` without gaps; the last chunk holds the remainder. The
            list is empty when the range is unset or fails
            ``check_time_limits(False)``.
        """
        time_list = list()

        if not self.valid_vars():
            return time_list

        if not self.check_time_limits(False):
            return time_list

        step = timedelta(seconds=self.__get_range_limit())
        start = self.time_from

        # Emit full-length chunks while more than one chunk's worth remains.
        while start + step < self.time_to:
            time_list.append(ExportTimeInfo(start, start + step))
            start += step

        # Whatever is left fits in one request (this is the whole range when
        # it was within the limit to begin with).
        time_list.append(ExportTimeInfo(start, self.time_to))

        return time_list


# Definition of constants

# Japan Time Zone
# Fixed UTC+9 offset labelled 'JST'. The helpers in this file use it for the
# timestamps they print and for the archive file name.
JST: Final = timezone(timedelta(hours=+9), 'JST')

# Working directory when creating a ZIP file
# NOTE(review): not referenced in the visible part of this file; presumably
# used by a later step - confirm.
ZIP_WORK_DIR: Final = './__zip_work'


# Process start message display
def print_start_msg() -> None:
    """Print a blank line, then the start banner with the current time in JST."""
    print()
    started_at = datetime.now(JST)
    print('# ===== 🚩 Start processing ===== ', started_at)

# Process end message display
def print_end_msg() -> None:
    """Print a blank line, then the end banner with the current time in JST."""
    print()
    finished_at = datetime.now(JST)
    print('# ===== 🍵 End processing ===== ', finished_at)

# Combine URL Strings
def merge_url(base: str, url: str) -> str:
    """Join an endpoint base URL and an API path into a single URL.

    The path is always resolved *below* the base path, e.g.
    ``'https://host/v1'`` + ``'/auth/'`` -> ``'https://host/v1/auth/'``.
    Both arguments are printed for logging.

    Args:
        base: Endpoint URL, with or without a trailing slash.
        url: API path. One leading ``'/'`` is ignored.

    Returns:
        The combined URL.
    """
    print()
    print('# 🧰 Combine URL Strings')
    print('# base = ', base)
    print('# url = ', url)

    # urljoin() replaces the last path segment of a base that lacks a
    # trailing slash, so make sure a non-empty base path ends with '/'.
    base_path = urlparse(base).path
    if base_path and not base_path.endswith('/'):
        base += '/'

    # Drop a single leading '/' so the path is resolved relative to the base
    # rather than from the host root. Only the leading slash is removed:
    # str.replace('/', '', 1) would also eat an inner slash when the path
    # does not start with one.
    relative = url[1:] if url.startswith('/') else url

    return urljoin(base, relative)

# Send HTTP POST request
def send_post_request(url: str, headers: dict, data: dict, log: bool = False) -> APIResult:
    """POST ``data`` as a JSON body to ``url`` and wrap the outcome in an APIResult.

    Args:
        url: URL to call.
        headers: HTTP headers to send.
        data: Payload; serialized with ``json.dumps`` before sending.
        log: When True, also print the headers and the payload. Both may hold
            credentials, so enable this only while debugging.

    Returns:
        An APIResult whose ``response`` is the decoded JSON body (None when
        the body is not valid JSON) and whose ``additions`` holds
        ``status_code``, ``headers`` and, for a non-200 status,
        ``raw_messages``.
    """
    print()
    print('# 📡 Send HTTP POST request')
    print('# url = ', url)

    if log:
        print('# headers = ', headers)
        print('# data = ', data)

    # Send Request
    resp = requests.post(url, headers=headers, data=json.dumps(data))

    # Display request datetime in JST
    # Mon, 06 Feb 2023 02:03:09 GMT
    # strptime() yields a naive datetime even with %Z, and astimezone() would
    # treat a naive value as machine-local time, so tag it as UTC first.
    date_header = resp.headers.get('Date')
    if date_header is not None:
        req_time = datetime.strptime(
            date_header, '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc).astimezone(JST)
        print('# ⏰ Request datetime = ', req_time,
              int(req_time.timestamp() * 1000))

    # Decode the body once; an error page from a proxy may not be JSON.
    try:
        body = resp.json()
        raw_messages = body
    except ValueError:
        body = None
        raw_messages = resp.text

    # Preparation for results return
    additions = dict()
    additions['status_code'] = resp.status_code
    additions['headers'] = resp.headers

    if resp.status_code != 200:
        print()
        print('# ❌ POST request failed')
        print('# Status Code = ', resp.status_code)
        print('# Raw Messages = ', raw_messages)
        # Store the decoded message itself, not the bound resp.json method.
        additions['raw_messages'] = raw_messages

    return APIResult(body, additions)

# Send HTTP GET request
def send_get_request(url: str, headers: dict, params: dict = None, log: bool = False) -> APIResult:
    """Send a GET request to ``url`` and wrap the outcome in an APIResult.

    Args:
        url: URL to call.
        headers: HTTP headers to send.
        params: Optional query parameters; printed whenever they are given.
        log: When True, also print the headers. They may hold credentials,
            so enable this only while debugging.

    Returns:
        An APIResult whose ``response`` is the decoded JSON body (None when
        the body is not valid JSON) and whose ``additions`` holds
        ``status_code``, ``headers`` and, for a non-200 status,
        ``raw_messages``.
    """
    print()
    print('# 📡 Send HTTP GET request')
    print('# url = ', url)

    if log:
        print('# headers = ', headers)
    if params is not None:
        print('# params = ', params)

    # Send Request
    resp = requests.get(url, headers=headers, params=params)

    # Display request datetime in JST
    # Mon, 06 Feb 2023 02:03:09 GMT
    # strptime() yields a naive datetime even with %Z, and astimezone() would
    # treat a naive value as machine-local time, so tag it as UTC first.
    date_header = resp.headers.get('Date')
    if date_header is not None:
        req_time = datetime.strptime(
            date_header, '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc).astimezone(JST)
        print('# ⏰ Request datetime = ', req_time,
              int(req_time.timestamp() * 1000))

    # Decode the body once; an error page from a proxy may not be JSON.
    try:
        body = resp.json()
        raw_messages = body
    except ValueError:
        body = None
        raw_messages = resp.text

    # Preparation for results return
    additions = dict()
    additions['status_code'] = resp.status_code
    additions['headers'] = resp.headers

    if resp.status_code != 200:
        print()
        print('# ❌ GET request failed')
        print('# Status Code = ', resp.status_code)
        print('# Raw Messages = ', raw_messages)
        # Store the decoded message itself, not the bound resp.json method.
        additions['raw_messages'] = raw_messages

    return APIResult(body, additions)

# Retrieving Content via HTTP GET Request
def send_get_content(url: str) -> ContentsResult:
    """Download the raw content at ``url`` and wrap it in a ContentsResult.

    Args:
        url: URL to fetch; no extra headers are sent.

    Returns:
        A ContentsResult whose ``content`` is the response body bytes and
        whose ``additions`` holds ``status_code``, ``headers`` and, for a
        non-200 status, ``raw_messages``.
    """
    print()
    print('# 💾 Retrieve contents')
    print('# url = ', url)

    # Send Request
    resp = requests.get(url)

    # Display request datetime in JST
    # Mon, 06 Feb 2023 02:03:09 GMT
    # strptime() yields a naive datetime even with %Z, and astimezone() would
    # treat a naive value as machine-local time, so tag it as UTC first.
    date_header = resp.headers.get('Date')
    if date_header is not None:
        req_time = datetime.strptime(
            date_header, '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc).astimezone(JST)
        print('# ⏰ Request datetime = ', req_time,
              int(req_time.timestamp() * 1000))

    # Preparation for results return
    additions = dict()
    additions['status_code'] = resp.status_code
    additions['headers'] = resp.headers

    if resp.status_code != 200:
        # The error body of a content download is not necessarily JSON, so
        # fall back to the plain text instead of raising while reporting.
        try:
            raw_messages = resp.json()
        except ValueError:
            raw_messages = resp.text

        print()
        print('# ❌ GET contents failed')
        print('# Status Code = ', resp.status_code)
        print('# Raw Messages = ', raw_messages)
        # Store the message itself, not the bound resp.json method.
        additions['raw_messages'] = raw_messages

    return ContentsResult(resp.content, additions)

# Extract filenames from URL strings
def extract_filename_from_URL(url: str) -> str:
    """Return the last path segment of ``url`` as a percent-encoded file name.

    The query string and fragment are ignored. The segment is decoded and
    re-encoded so that encoded and unencoded input give the same result.

    Args:
        url: URL (or bare path) to take the file name from.

    Returns:
        The encoded file name, or '' when the path has no final segment
        (empty path or a path ending in '/').
    """
    # [-1] instead of [1]: a path without any '/' splits into one element,
    # which would otherwise raise IndexError.
    name = urlparse(url).path.rsplit('/', 1)[-1]

    if not name:
        return ''

    # Decode and re-encode
    return quote(unquote(name))

# Initialize directory
def init_dir(path: str, new: bool = True) -> None:
    """Delete the directory at ``path`` with all its contents, optionally recreating it.

    Args:
        path: Directory to reset. It is created first when missing.
        new: When True (default) an empty directory is left in place;
            when False the directory is gone afterwards.
    """
    print()
    print('# 📦 Initialize directory')
    print('# path = ', path)
    print('# new = ', new)

    # Make sure the directory exists so listing and removal cannot fail.
    os.makedirs(path, exist_ok=True)

    # Collect everything below the directory; used for the log line only.
    doomed = glob.glob(os.path.join(path, '**'), recursive=True)

    print()
    print('# 🗑️ Delete target = ', doomed)

    # Remove the whole tree in one go.
    shutil.rmtree(path)

    if not new:
        return

    # Start over with an empty directory.
    os.makedirs(path, exist_ok=True)

# ZIP the selected directory and download it locally
def download_dir_locally(root_path: str, out_path: str, init: bool = True) -> None:
    """Zip everything under ``root_path`` and hand the archive to the browser download.

    Args:
        root_path: Directory whose contents are archived.
        out_path: Directory the ZIP file is written to.
        init: When True (default), ``out_path`` is reset with ``init_dir``
            before the archive is created.
    """
    print()
    print('# 🪂 Compress the directory and download it.')
    print('# root_path = ', root_path)
    print('# out_path = ', out_path)

    # Log what is about to be archived.
    pattern = os.path.join(root_path, '**')
    print('# files = ', glob.glob(pattern, recursive=True))

    # Reset the output directory on request.
    if init:
        init_dir(out_path)

    # The archive is named after the current JST time; make_archive appends
    # the '.zip' extension itself.
    stamp = datetime.now(JST).strftime('_%Y%m%d_%H%M%S_archive_')
    archive_base = os.path.join(out_path, stamp)
    archive = shutil.make_archive(archive_base, format='zip', root_dir=root_path)
    files.download(archive)

    print()
    print('# 🙏 Please wait while downloading completes.')
    print('# 📚 Download Target = ',  archive)


# API Client Class
# Combine API-related implementations into this class
class APIClient():
    """Client for the SORACOM / SoraCam API endpoints used in this file.

    Every method builds the endpoint URL with ``merge_url`` and delegates to
    ``send_get_request`` / ``send_post_request``, returning their APIResult.

    Attributes:
        endpoint_url: Base URL of the API.
        api_key: API key sent as ``X-Soracom-API-Key``; set by the caller
            after a successful ``authenticate_user``.
        token: API token sent as ``X-Soracom-Token``; set the same way.
    """
    endpoint_url: str
    api_key: str
    token: str

    def __init_vars(self):
        self.endpoint_url = ''
        self.api_key = ''
        self.token = ''

    def __init__(self, endpoint_url: str):
        self.__init_vars()
        self.endpoint_url = endpoint_url

    def __del__(self):
        # Blank out the stored credentials when the client is discarded.
        self.__init_vars()

    def __get_auth_headers(self):
        return {
            'Content-Type': 'application/json',
            'X-Soracom-API-Key': self.api_key,
            'X-Soracom-Token': self.token,
        }

    def __device_url(self, id: str, suffix: str = '') -> str:
        # Shared URL builder for the /sora_cam/devices/{device_id}... endpoints.
        return merge_url(self.endpoint_url, '/sora_cam/devices/' + id + suffix)

    # /auth
    # Authenticate API access and issue API key and API token
    # https://users.soracom.io/ja-jp/tools/api/reference/#/Auth/auth
    def authenticate_user(self, email: str, password: str, opid: str, name: str, mfa_code: str = None, timeout: int = 3600) -> APIResult:
        """Request an API key and token.

        Args:
            email: Root user e-mail address (Root login).
            password: Password (both login types).
            opid: Operator ID (SAM login).
            name: SAM user name (SAM login).
            mfa_code: Optional MFA one-time code.
            timeout: Requested lifetime of the key/token in seconds.

        Returns:
            The APIResult of the POST request.
        """
        url = merge_url(self.endpoint_url, '/auth/')
        headers = {'Content-Type': 'application/json'}

        # authentication uses the following values
        # Root: email, password
        # SAM: opid, name, password
        # If both are valid, SAM is preferred: only one credential set is
        # sent, so the e-mail address is left out in that case.
        data = dict()
        if opid and name and password:
            data['operatorId'] = opid
            data['userName'] = name
            data['password'] = password
        elif email and password:
            data['email'] = email
            data['password'] = password

        # set the API key expiration time (Default 1 hour)
        data['tokenTimeoutSeconds'] = timeout

        # MFA Authentication Code
        if mfa_code:
            data['mfaOTPCode'] = mfa_code

        # Send Request
        return send_post_request(url, headers=headers, data=data)

    # /sora_cam/devices
    # Get the list of sora-cam compatible cameras
    # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/listSoraCamDevices

    def list_sora_cam_devices(self) -> APIResult:
        url = merge_url(self.endpoint_url, '/sora_cam/devices')

        # Send Request
        return send_get_request(url, headers=self.__get_auth_headers())

    # /sora_cam/devices/{device_id}
    # Get information about cameras compatible with sora-cam
    # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/getSoraCamDevice

    def get_sora_cam_device(self, id: str) -> APIResult:
        url = self.__device_url(id)

        # Send Request
        return send_get_request(url, headers=self.__get_auth_headers())

    # /sora_cam/devices/{device_id}/exports/usage
    # Get the number of still images that can be exported from a sora-cam compatible camera and the exportable time of the recorded video
    # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/getSoraCamDeviceExportUsage

    def get_sora_cam_export_usage(self, id: str) -> APIResult:
        url = self.__device_url(id, '/exports/usage')

        # Send Request
        return send_get_request(url, headers=self.__get_auth_headers())

    # /sora_cam/devices/{device_id}/stream
    # Get information about downloading streaming video (real-time video / recorded video)
    # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/getSoraCamDeviceStreamingVideo

    def get_sora_cam_streaming_video(self, id: str, time_from: int = None, time_to: int = None) -> APIResult:
        """Get streaming information; ``from``/``to`` are sent only when both are positive."""
        url = self.__device_url(id, '/stream')

        # Set parameters as needed.
        params = None
        has_from = time_from is not None and time_from > 0
        has_to = time_to is not None and time_to > 0
        if has_from and has_to:
            params = {
                'from': time_from,
                'to': time_to,
            }

        # Send Request
        return send_get_request(url, headers=self.__get_auth_headers(), params=params)

    # /sora_cam/devices/{device_id}/videos/exports
    # Start the process of exporting recorded video stored in Cloud Always-on Recording
    # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/exportSoraCamDeviceRecordedVideo

    def export_sora_cam_recorded_video(self, id: str, time_from: int, time_to: int) -> APIResult:
        url = self.__device_url(id, '/videos/exports')

        data = {
            'from': time_from,
            'to': time_to,
        }

        # Send Request
        return send_post_request(url, headers=self.__get_auth_headers(), data=data)

    # /sora_cam/devices/{device_id}/videos/exports/{export_id}
    # Get the current status of the process of exporting recorded video stored in Cloud Always-on Recording
    # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/getSoraCamDeviceExportedVideo

    def get_sora_cam_exported_video(self, id: str, export_id: str) -> APIResult:
        url = self.__device_url(id, '/videos/exports/' + export_id)

        # Send Request
        return send_get_request(url, headers=self.__get_auth_headers())


# --------------- Execution ---------------
# Process start message display
print_start_msg()

# Print the version of the Python interpreter running this cell.
print()
print('# ℹ️ Python Version = ', sys.version)

# Process end message display
print_end_msg()


In [None]:
# @title ステップ 2: SORACOM API を使うための認証処理をする

# --------------- Definition ---------------
# @markdown # Fill in the required information and run.
# @markdown - - -

# @markdown Set the SORACOM API endpoint.
endpoint_url = 'https://api.soracom.io/v1'  # @param {type:"string"}

# @markdown Select user to login to SORACOM User Console.
login_type = "SAM User"  # @param ["Root User", "SAM User"]

# @markdown Login using multi-factor authentication.
use_mfa = False #@param {type:"boolean"}

# @markdown - - -


# --------------- Execution ---------------
# Process start message display
print_start_msg()

# init API client
# We will continue to use this variable when calling the API in the following sections
# The API key and token stay empty until the login handler stores them.
api_client: APIClient = APIClient(endpoint_url)

# Prepare UI for login
# All input widgets are created up front; which of them are displayed is
# decided further down from login_type and use_mfa.
# Root User email address input field
input_email = widgets.Text(
    value=None,
    placeholder='Enter Email Address',
    disabled=False
)

# SAM User operator ID input field
input_operator_Id = widgets.Text(
    value=None,
    placeholder='Enter Operator ID',
    disabled=False
)

# SAM User user name input field
input_user_name = widgets.Text(
    value=None,
    placeholder='Enter User Name',
    disabled=False
)

# User password input field
input_password = widgets.Password(
    value=None,
    placeholder='Enter Password',
    disabled=False
)

# MFA code input field
input_mfa_code = widgets.Text(
    value=None,
    placeholder='Enter MFA Code',
    disabled=False
)

# Login button
login_button = widgets.Button(
    description='Login',
    disabled=False,
    button_style='info',  # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Login',
    icon='user'  # (FontAwesome names without the `fa-` prefix)
)

# UI display by login type
if login_type == 'Root User':
    print()
    print('➕➕➕ 🔰 Login as the Root User. Fill in the required fields. Then press the Login button. ➕➕➕')
    print()

    display(widgets.Label(value="📧 Email Address: "))
    display(input_email)

elif login_type == 'SAM User':
    print()
    print('➕➕➕ 🔰 Login as the SAM User. Fill in the required fields. Then press the Login button. ➕➕➕')
    print()

    display(widgets.Label(value="🪪 Operator ID: "))
    display(input_operator_Id)
    display(widgets.Label(value="📛 User Name: "))
    display(input_user_name)

# The password field is shown for both login types.
display(widgets.Label(value="🔑 Password: "))
display(input_password)

# The MFA field is shown only when multi-factor login was selected above.
if use_mfa:
    display(widgets.Label(value="🔒 MFA Code: "))
    display(input_mfa_code)

print()
display(login_button)

# Process end message display
# Only the form has been shown at this point; authentication itself runs
# later, when the Login button invokes its click handler.
print_end_msg()


# Function called when a button is pressed
def on_login_button_handler(change):
    """Validate the login form, authenticate, and keep the issued API key and token.

    Reads the module-level login widgets together with ``login_type`` and
    ``use_mfa``. On success the key and token are stored on ``api_client``
    and every input field is cleared. On invalid input or failed
    authentication a message is printed and nothing is stored.

    Args:
        change: Event argument passed by the button; not used.
    """
    operator_Id = input_operator_Id.value
    user_name = input_user_name.value
    password = input_password.value
    email = input_email.value
    mfa_code = None

    # Check input values
    if login_type == 'Root User':
        if not email or not password:
            print()
            print(
                '➕➕➕ ⛔ The login information for the Root User is missing. Please check your input. ➕➕➕')
            return
    elif login_type == 'SAM User':
        if not operator_Id or not password or not user_name:
            print()
            print(
                '➕➕➕ ⛔ The login information for the SAM User is missing. Please check your input. ➕➕➕')
            return

    # Check MFA Authentication Code
    if use_mfa:
        mfa_code = input_mfa_code.value
        if not (mfa_code.isdecimal() and mfa_code.isascii()):
            print()
            print(
                '➕➕➕ ⛔ Please enter only numbers for the MFA verification code. ➕➕➕')
            return

    # Perform Authentication
    api_result = api_client.authenticate_user(
        email, password, operator_Id, user_name, mfa_code)

    if not api_result.check_success():
        print()
        print('➕➕➕ ⛔ API authentication failed. Please check your input. ➕➕➕')
        return

    # Tolerate a missing body or missing keys instead of raising.
    auth_resp = api_result.response or {}
    api_key = auth_resp.get('apiKey')
    token = auth_resp.get('token')

    if api_key and token:
        print()
        print('# 🔑 API access has been authenticated 💯')

        # Keep your API keys
        api_client.api_key = api_key
        api_client.token = token

        # Clear input values to prevent reuse.
        # The widgets' .value is reset; rebinding the widget name itself
        # (input_mfa_code = '') would turn it into a local variable and make
        # the MFA branch above fail with UnboundLocalError.
        input_operator_Id.value = ''
        input_user_name.value = ''
        input_password.value = ''
        input_email.value = ''
        input_mfa_code.value = ''

    # Process end message display
    print_end_msg()


# Handler Setup
# Run on_login_button_handler whenever the Login button is clicked.
login_button.on_click(on_login_button_handler)


In [None]:
# @title ステップ 3: カメラを 1 台選択する

# --------------- Definition ---------------

# Returns an edited list of devices for the dropdown
def create_device_list_for_dropdown(devices: list) -> list:
    """Build ``(label, device)`` option tuples for the camera dropdown.

    Args:
        devices: Device dicts with the keys ``name``, ``connected``,
            ``firmwareVersion``, ``productDisplayName`` and ``deviceId``.
            ``None`` (for example after a failed list request) is treated
            like an empty list.

    Returns:
        One ``(label, device)`` tuple per device, in input order.
    """
    if not devices:
        return list()

    result = list()
    for item in devices:
        # The following order according to the user console display
        # name / connected / firmwareVersion / productDisplayName / deviceId
        status = '🌈 Connected' if item['connected'] else '🌀 Disconnected'
        # Named "label" so that IPython's display() is not shadowed.
        label = '{} / {} / {} / {} / {}'.format(
            item['name'], status, item['firmwareVersion'], item['productDisplayName'], item['deviceId'])
        result.append((label, item))

    return result

# Display remaining time for video export
def display_remaining_export_time(id: str) -> None:
    """Fetch and print the remaining video export time of a camera.

    Prints the usage response, then the remaining seconds formatted as
    hours / minutes / seconds. When the request fails only the response and
    a short notice are printed.

    Args:
        id: Device ID of the camera.
    """
    # Time remaining for video export
    api_result = api_client.get_sora_cam_export_usage(id)

    print('')
    print('# 🆓 Exportable Times = ', api_result.get_display_response())

    if not api_result.check_success():
        # Without a successful response there is no usage data to format.
        print('# ❌ The remaining export time could not be retrieved.')
        return

    remaining = int(api_result.response['video']['remainingSeconds'])
    hours, rest = divmod(remaining, 3600)
    minutes, seconds = divmod(rest, 60)
    print('# 🔎 Time remaining of month = ',
          '{} hours {} minutes {} seconds'.format(hours, minutes, seconds))


# --------------- Execution ---------------
# Process start message display
print_start_msg()

# Get a list of cameras
# device_list stays None when the request fails.
api_result = api_client.list_sora_cam_devices()
device_list = api_result.response if api_result.check_success() else None

print()
print('# 📷 Camera List = ', device_list)

# Camera Dropdown list
# A failed request is shown as an empty dropdown instead of raising.
select_devices = widgets.Dropdown(
    options=create_device_list_for_dropdown(device_list or []),
    disabled=False,
    value=None,
)

print()
print('➕➕➕ 🔰 Select from Camera List ➕➕➕')
print()
display(widgets.Label(value="📷 Camera List: "))
display(select_devices)

# Select the first camera in the list
# The dropdown itself starts without a selection; device_selected is None
# when no camera could be listed.
device_selected: dict = device_list[0] if device_list else None

if device_selected is None:
    print()
    print('➕➕➕ ⛔ No camera could be retrieved. Please check the login and try again. ➕➕➕')

# Process end message display
print_end_msg()

# Function called when a Dropdown is selected
def on_select_devices_handler(change):
    """Load details of the newly selected camera and remember it as ``device_selected``.

    The selection is updated only when the detail request succeeds, so a
    failed request does not replace the current camera with ``None``.

    Args:
        change: Change event of the dropdown; ``change['new']`` is the
            selected device dict.
    """
    global device_selected

    selected_Item = change['new']
    if selected_Item is None:
        # The dropdown has no selection; keep the current camera.
        return

    device_id = selected_Item['deviceId']

    # Get details about the selected camera
    api_result = api_client.get_sora_cam_device(device_id)

    print('')
    print('# 📸 Selected Camera Details = ', api_result.get_display_response())

    if api_result.check_success():
        # Display remaining time for video export
        display_remaining_export_time(device_id)

        # Maintain information about selected devices
        device_selected = api_result.response
    else:
        print('# ❌ Camera details could not be retrieved. The previous selection is kept.')

    # Process end message display
    print_end_msg()


# Handler Setup
# Call on_select_devices_handler whenever the dropdown's value changes.
select_devices.observe(on_select_devices_handler, names='value')


In [None]:
# @title ステップ 4: ストリーミング再生の開始時刻と終了時刻を設定する

# --------------- Definition ---------------

# Define variables to hold values
# Starts with both times unset; the button handlers of this cell fill it in
# or reset it.
stream_time_info: ExportTimeInfo = ExportTimeInfo(None, None)

# --------------- Execution ---------------
# Process start message display
print_start_msg()

# Prepare a list since time picker is not available
hours = [i for i in range(24)]
minutes = [i for i in range(60)]
seconds = [i for i in range(60)]
# Every picker below defaults to the current date and time in JST.
jst_now = datetime.now(JST)

# Start date
stream_start_date = widgets.DatePicker(
    description='Start date:',
    value=jst_now.date(),
    disabled=False
)

# Start time
# NOTE: the "strat" spelling is part of the variable names that the button
# handler reads, so it is kept as is.
stream_strat_hours = widgets.Dropdown(
    options=hours,
    value=jst_now.hour,
    description='Start time:',
    disabled=False,
)

stream_strat_minutes = widgets.Dropdown(
    options=minutes,
    value=jst_now.minute,
    disabled=False,
)

stream_strat_seconds = widgets.Dropdown(
    options=seconds,
    value=jst_now.second,
    disabled=False,
)

# End date
stream_end_date = widgets.DatePicker(
    description='End date:',
    value=jst_now.date(),
    disabled=False
)

# End time
stream_end_hours = widgets.Dropdown(
    options=hours,
    value=jst_now.hour,
    description='End time:',
    disabled=False,
)

stream_end_minutes = widgets.Dropdown(
    options=minutes,
    value=jst_now.minute,
    disabled=False,
)

stream_end_seconds = widgets.Dropdown(
    options=seconds,
    value=jst_now.second,
    disabled=False,
)

# Button that applies the start/end time entered above
stream_time_button = widgets.Button(
    description='Time setting',
    disabled=False,
    button_style='info',  # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Time setting',
    icon='clock'  # (FontAwesome names without the `fa-` prefix)
)

# Button that switches back to real-time playback (no time range)
stream_real_button = widgets.Button(
    description='Real Time',
    disabled=False,
    button_style='success',  # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Real Time',
    icon='rocket-launch'  # (FontAwesome names without the `fa-` prefix)
)

# Display the time setting UI
# One row for the start date/time, one for the end date/time, then the buttons.
print()
print('➕➕➕ 🔰 Set the time for streaming playback.  ➕➕➕')
print()
display(widgets.HBox([stream_start_date, stream_strat_hours, widgets.Label(
    value='：'), stream_strat_minutes, widgets.Label(value='：'), stream_strat_seconds]))
display(widgets.HBox([stream_end_date, stream_end_hours, widgets.Label(
    value='：'), stream_end_minutes, widgets.Label(value='：'), stream_end_seconds]))
print()
display(widgets.HBox(
    [stream_time_button, widgets.Label(value='  '), stream_real_button]))

# Process end message display
print_end_msg()


# Function called when a button is pressed
def on_stream_time_button_handler(change):
    """Read the start/end pickers, validate the range, and store it in ``stream_time_info``.

    The range is stored only when both dates are set and
    ``ExportTimeInfo.check_time_limits`` accepts it; otherwise a message is
    printed and ``stream_time_info`` is left unchanged.

    Args:
        change: Event argument passed by the button; not used.
    """
    start_datetime = _stream_picker_datetime(
        stream_start_date.value, stream_strat_hours.value,
        stream_strat_minutes.value, stream_strat_seconds.value)
    end_datetime = _stream_picker_datetime(
        stream_end_date.value, stream_end_hours.value,
        stream_end_minutes.value, stream_end_seconds.value)

    # A cleared date picker yields no date; report it instead of raising.
    if start_datetime is None or end_datetime is None:
        print()
        print(
            '➕➕➕ ⛔ The date and time specified is incorrect. Please check the date and time specified. ➕➕➕')
        return

    time_info = ExportTimeInfo(start_datetime, end_datetime)
    print()
    print('# start_datetime = ', time_info.time_from)
    print('# end_datetime = ', time_info.time_to)
    print('# export_range = ', time_info.time_to - time_info.time_from)

    # Check the entered time
    if not time_info.check_time_limits():
        return

    # Holds the entered value
    stream_time_info.time_from = time_info.time_from
    stream_time_info.time_to = time_info.time_to

    print()
    print('# 📼 Set the start and end times for streaming playback. 🆗')
    print('# Performing a playback will consume the {} seconds export time.'.format(time_info.get_export_sec()))

    # Display remaining time for video export
    display_remaining_export_time(device_selected['deviceId'])

    # Process end message display
    print_end_msg()


def _stream_picker_datetime(date_value, hour, minute, second):
    """Combine a picked date and hour/minute/second into a JST datetime.

    Returns None when no date is selected.
    """
    if date_value is None:
        return None
    # Built directly with the JST constant instead of formatting and
    # re-parsing an ISO string with a hard-coded "+09:00" offset.
    return datetime(date_value.year, date_value.month, date_value.day,
                    hour, minute, second, tzinfo=JST)


# Function called when a button is pressed
def on_stream_real_button_handler(change):
    """Switch streaming playback back to real time by clearing the stored time range.

    Args:
        change: Event argument passed by the button; not used.
    """
    # It is treated as real time if no time is specified.
    stream_time_info.init_vars()

    notice = (
        '',
        '# 📹 Set the real time for streaming playback. 🆗',
        '# Performing a playback will consume the maximum time for the unit export.',
    )
    for line in notice:
        print(line)

    # Display remaining time for video export
    current_device_id = device_selected['deviceId']
    display_remaining_export_time(current_device_id)

    # Process end message display
    print_end_msg()


# Handler Setup
# "Time setting" button: store the entered start/end time.
stream_time_button.on_click(on_stream_time_button_handler)

# Handler Setup
# "Real Time" button: clear the stored time range.
stream_real_button.on_click(on_stream_real_button_handler)



In [None]:
# @title ステップ 5: 設定した開始時刻と終了時刻でストリーミング再生する

# --------------- Definition ---------------

from google.colab.output import eval_js
from IPython.display import HTML

# Display HTML for streaming video viewing
def display_stream_player(url: str, expiry_time: int) -> None:
    """Render an HTML video player (dash.js) for ``url`` in the cell output.

    The markup defines a JavaScript ``init_video(url, time)`` function and
    calls it once with the given values, which are also shown above the
    player.

    Args:
        url: Stream URL handed to the player.
        expiry_time: Expiry time shown next to the player. It is inserted
            with ``%s``; the caller in this file passes a datetime.
    """
    player_template = '''
        <style>
            video {
                width: 960px;
                height: 540px;
            }
        </style>
        <div id="url"></div>
        <div id="time"></div>
        <div>
            <video id="videoPlayer" controls></video>
        </div>
        <script src="//cdn.dashjs.org/latest/dash.all.debug.js"></script>
        <script>
          function init_video(url, time){
            var player = dashjs.MediaPlayer().create();
            player.initialize(document.querySelector("#videoPlayer"), url, true);
            document.getElementById("url").innerHTML = '<p> 🌏 url = ' + url + '</p>';
            document.getElementById("time").innerHTML = '<p>⏳ expiry_time = <b>' + time + '</b></p>';
          }
          init_video("%s", "%s")
        </script>
  '''

    # Fill in the two placeholders of the init_video(...) call.
    video_html = HTML(player_template % (url, expiry_time))

    print()
    display(video_html)


# Load the video
def laod_stream_video(id: str, time_from: int = None, time_to: int = None, init: bool = True) -> None:
    """Request a streaming URL for a device and show it in the player.

    When the API request fails, or the response contains no playlist
    entry, a message is printed and nothing is displayed.

    Args:
        id: Device ID passed to the streaming API.
        time_from: Start of the period passed to the API; None when not
            specified.
        time_to: End of the period passed to the API; None when not
            specified.
        init: True to render the player HTML, False to update the player
            that is already displayed through its `init_video` function.
    """
    api_result = api_client.get_sora_cam_streaming_video(
        id, time_from, time_to)

    # Without a successful response there is nothing to play; stop here
    # instead of failing on a missing response further down.
    if not api_result.check_success():
        print()
        print('# 🚫 API request to get the streaming video failed.')
        print('# additions = ', api_result.get_display_additions())
        return

    stream_info = api_result.response
    play_list = stream_info.get('playList') if isinstance(
        stream_info, dict) else None

    # A response without any playlist entry cannot be played either
    if not play_list:
        print()
        print('# 🚫 No playable video was returned.')
        print('# 📺 Video Information = ', stream_info)
        return

    video_url = play_list[0]['url']
    # expiryTime is divided by 1000 before being converted to a datetime
    expiry_time = datetime.fromtimestamp(stream_info['expiryTime'] / 1000, JST)

    print()
    print('# 📺 Video Information = ', stream_info)
    print('# video_url = ', video_url)
    print('# expiry_time = ', expiry_time)

    # Display remaining time for video export
    display_remaining_export_time(device_selected['deviceId'])

    if init:
        # Display HTML for streaming video viewing
        display_stream_player(video_url, expiry_time)
    else:
        # Pass the new values to the JavaScript function of the existing player
        eval_js('init_video("{}", "{}")'.format(video_url, expiry_time))


# --------------- Execution ---------------
# Announce the start of this step
print_start_msg()

# Button that reloads the video on demand
stream_reload_button = widgets.Button(
    description='Reload video',
    tooltip='Reload video',
    button_style='info',  # one of 'success', 'info', 'warning', 'danger' or ''
    icon='rotate-right',  # FontAwesome name without the `fa-` prefix
    disabled=False,
)

# Label describing the playback mode; it starts out as real time
stream_type_label = widgets.Label(value='ℹ️ Real-time video is playing.')

# Real time, or a specified time period?
if stream_time_info.valid_vars():
    stream_type_label.value = (
        'ℹ️ The specified time period is played back.（{} - {}）'.format(
            stream_time_info.time_from,
            stream_time_info.time_to,
        )
    )

# First rendering of the video (init=True builds the player HTML)
laod_stream_video(
    device_selected['deviceId'],
    stream_time_info.get_from_ms(),
    stream_time_info.get_to_ms(),
    True,
)
print()
display(widgets.HBox([
    stream_reload_button,
    widgets.Label(value='　'),
    stream_type_label,
]))

# Announce the end of this step
print_end_msg()

# Click callback registered on stream_reload_button
def on_stream_reload_button_handler(change):
    """Reload the streaming video into the player that is already displayed.

    Args:
        change: Argument supplied by `on_click` (unused).
    """
    # Same device and period as the initial display; init=False updates
    # the existing player instead of rendering a new one.
    target_device_id = device_selected['deviceId']
    period_from_ms = stream_time_info.get_from_ms()
    period_to_ms = stream_time_info.get_to_ms()
    laod_stream_video(target_device_id, period_from_ms, period_to_ms, False)

    # Closing message for this step
    print_end_msg()


# Call on_stream_reload_button_handler whenever stream_reload_button is clicked
stream_reload_button.on_click(on_stream_reload_button_handler)


In [None]:
# @title ステップ 6: 動画ダウンロードの開始時刻と終了時刻を設定する

# --------------- Definition ---------------

# Export periods created by on_export_time_button_handler (None until set)
export_time_list: list = None

# --------------- Execution ---------------
# Announce the start of this step
print_start_msg()

# No time picker is available, so the choices are offered as plain lists
hours = list(range(24))
minutes = list(range(60))
seconds = list(range(60))
jst_now = datetime.now(JST)

# Start date
export_start_date = widgets.DatePicker(
    value=jst_now.date(),
    description='Start date:',
    disabled=False,
)

# Start time (hour / minute / second)
export_strat_hours = widgets.Dropdown(
    description='Start time:',
    options=hours,
    value=jst_now.hour,
    disabled=False,
)

export_strat_minutes = widgets.Dropdown(
    options=minutes, value=jst_now.minute, disabled=False)

export_strat_seconds = widgets.Dropdown(
    options=seconds, value=jst_now.second, disabled=False)

# End date
export_end_date = widgets.DatePicker(
    value=jst_now.date(),
    description='End date:',
    disabled=False,
)

# End time (hour / minute / second)
export_end_hours = widgets.Dropdown(
    description='End time:',
    options=hours,
    value=jst_now.hour,
    disabled=False,
)

export_end_minutes = widgets.Dropdown(
    options=minutes, value=jst_now.minute, disabled=False)

export_end_seconds = widgets.Dropdown(
    options=seconds, value=jst_now.second, disabled=False)

# Button that applies the entered period
export_time_button = widgets.Button(
    description='Time setting',
    tooltip='Time setting',
    button_style='info',  # one of 'success', 'info', 'warning', 'danger' or ''
    icon='clock',  # FontAwesome name without the `fa-` prefix
    disabled=False,
)

# Show the time setting UI: one row for the start, one row for the end
print()
print('➕➕➕ 🔰 Set time period for video exports ➕➕➕')
print()

display(widgets.HBox([
    export_start_date,
    export_strat_hours,
    widgets.Label(value='：'),
    export_strat_minutes,
    widgets.Label(value='：'),
    export_strat_seconds,
]))
display(widgets.HBox([
    export_end_date,
    export_end_hours,
    widgets.Label(value='：'),
    export_end_minutes,
    widgets.Label(value='：'),
    export_end_seconds,
]))
print()
display(export_time_button)

# Announce the end of this step
print_end_msg()


# Function called when a button is pressed
def on_export_time_button_handler(change):
    """Validate the entered export period and split it into export periods.

    Reads the start/end date and time widgets and builds an
    `ExportTimeInfo`. When `check_time_limits` passes, the result of
    `create_time_list()` is stored in the global `export_time_list`.

    Args:
        change: Argument supplied by `on_click` (unused).
    """

    def to_datetime(date_value, hour: int, minute: int, second: int) -> datetime:
        # Build an ISO format string with a fixed +09:00 offset,
        # e.g. '2018-12-31T05:00:30.000000+09:00', and parse it.
        return datetime.fromisoformat(
            '{}T{:0=2}:{:0=2}:{:0=2}.000000+09:00'.format(
                date_value, hour, minute, second))

    start_date = export_start_date.value
    end_date = export_end_date.value

    # A DatePicker without a selected date has the value None, which would
    # produce an invalid ISO string and raise ValueError when parsed.
    if start_date is None or end_date is None:
        print()
        print('# 🚫 Please select both the start date and the end date.')
        return

    start_datetime = to_datetime(
        start_date,
        export_strat_hours.value,
        export_strat_minutes.value,
        export_strat_seconds.value)
    end_datetime = to_datetime(
        end_date,
        export_end_hours.value,
        export_end_minutes.value,
        export_end_seconds.value)

    time_info = ExportTimeInfo(start_datetime, end_datetime)
    print()
    print('# start_datetime = ', time_info.time_from)
    print('# end_datetime = ', time_info.time_to)
    print('# export_range = ', time_info.time_to - time_info.time_from)

    # Check the entered times; stop here when the check fails
    if not time_info.check_time_limits(False):
        return

    # Calculate the period to export
    global export_time_list
    export_time_list = time_info.create_time_list()
    print('# list_count = ', len(export_time_list))

    print()
    print('# 📼 Set the time period for exporting videos. 🆗')
    print('# Performing a playback will consume the {} seconds export time.'.format(time_info.get_export_sec()))

    # Display remaining time for video export
    display_remaining_export_time(device_selected['deviceId'])

    # Process end message display
    print_end_msg()


# Call on_export_time_button_handler whenever export_time_button is clicked
export_time_button.on_click(on_export_time_button_handler)


In [None]:
# @title ステップ 7: 設定した開始時刻と終了時刻で動画をダウンロードする

# --------------- Definition ---------------

from pprint import pprint

import time

# Define where to store downloaded files
# Directory the downloaded archives are unpacked into
EXPORT_VIDEOS_DIR: Final = './export_videos'
# Directory the downloaded ZIP files are saved in
EXPORT_ZIP_DIR: Final = './export_zips'


# Download a video within a designated range.
def download_range_video(id: str, time_list: list, video_path: str, zip_path: str) -> tuple[list, list]:
    """Export and download every period in `time_list`, one after another.

    For each entry the function requests an export, waits, and polls the
    export status. When the status becomes 'completed' the ZIP file is
    downloaded and unpacked. A failure at any stage is recorded and the
    next entry is processed.

    Args:
        id: Device ID passed to the export APIs.
        time_list: Entries providing `time_from`, `time_to`,
            `get_export_sec()`, `get_from_ms()` and `get_to_ms()`.
        video_path: Directory the downloaded archives are unpacked into.
        zip_path: Directory the downloaded ZIP files are saved in.

    Returns:
        A `(success_list, failure_list)` tuple. Each element is the list of
        state records collected while processing one entry.
    """

    # Display the progress bar and wait for the specified time
    def wait_for_download(wait_sec: int) -> bool:
        """Show a progress bar while sleeping `wait_sec` seconds; always returns True."""
        print()
        print('# Waiting time estimate = ', wait_sec, '(seconds)')

        progress = widgets.IntProgress(
            value=0,
            min=0,
            max=wait_sec,
            description='Waiting:',
            bar_style='info',  # 'success', 'info', 'warning', 'danger' or ''
            # style={'bar_color': 'maroon'},
            orientation='horizontal'
        )

        progress_text = widgets.Label(value="0 %")
        display(widgets.HBox([progress, progress_text]))

        # Advance the bar once per second until wait_sec seconds have passed
        start = datetime.now(JST)
        for i in range(wait_sec):
            per = ((i + 1) / wait_sec) * 100
            progress_text.value = str(int(per)) + ' %'
            progress.value = i + 1
            time.sleep(1)
        end = datetime.now(JST)

        print('# Actual waiting time = ',
              (end - start).total_seconds(), '(seconds)')

        return True

    # Get and extract the ZIP file from the export URL
    def get_export_video(url: str, video_path: str, zip_path: str) -> Union[dict, None]:
        """Download the ZIP at `url` to `zip_path` and unpack it into `video_path`.

        Returns None on success, or the `additions` of the content request
        when that request failed.
        """
        print()
        print('# 🎥 Download Video')
        print('# url = ', url)
        print('# zip_path = ', zip_path)
        print('# video_path = ', video_path)

        # Request to retrieve content
        contents_result = send_get_content(url)

        # failure
        if not contents_result.check_success():
            return contents_result.additions

        # Save as a ZIP file
        with open(zip_path, "wb") as f:
            f.write(contents_result.content)

        # Unzip the ZIP file
        shutil.unpack_archive(zip_path, video_path)

        return None

    # Define fixed values for waiting time
    # WAIT_TIME_FACTOR: share of the export length used as the first wait
    # WAIT_TIME_LOWER_LIMIT: minimum number of seconds for every later wait
    WAIT_TIME_FACTOR: Final = 0.777
    WAIT_TIME_LOWER_LIMIT: Final = 10

    # List of successes and failures
    success_list = list()
    failure_list = list()

    print()
    print('# 📼 Start downloading the video.')
    print('# Total downloads = ', len(time_list))

    for index, item in enumerate(time_list):
        print()
        print('# ℹ️ Current entry = {}/{}'.format(index+1, len(time_list)))
        print('# From = ', item.time_from)
        print('# To = ', item.time_to)
        print('# Length = ', item.get_export_sec(), '(seconds)')

        # Record execution status
        state_records = list()
        state_records.append(vars(item))

        # Request video export
        requested = None
        api_result = api_client.export_sora_cam_recorded_video(
            id, item.get_from_ms(), item.get_to_ms())

        state_records.append(vars(api_result))

        if not api_result.check_success():
            print()
            print('# 🚫 API request to export video failed.')
            # Errors are logged and the next task is executed
            failure_list.append(state_records)
            print('# 🔝 Processing for loop continues')
            continue

        requested = api_result.response
        print()
        print('# 🏮 Export Requested = ', api_result.get_display_response())

        # Must wait for it to be available for download
        # Generally, it tends to be possible to download after waiting 70%~80% of the specified time (of course, it depends on the server processing status).
        wait_sec = int(item.get_export_sec() * WAIT_TIME_FACTOR)

        # Wait until you can download it.
        # Every exit from this loop is a `break`: success, a failed request,
        # or a failure status. Any other status waits and polls again.
        export_status = None
        while True:
            # Display progress to prevent the system from being detected as stopped after a long period of inactivity.
            if wait_for_download(wait_sec):
                # Check to see if it is available for download
                api_result = api_client.get_sora_cam_exported_video(
                    id, requested['exportId'])

                state_records.append(vars(api_result))

                if not api_result.check_success():
                    print()
                    print('# 🚫 API request to check export status failed.')
                    # Errors are logged and the next task is executed
                    failure_list.append(state_records)
                    print('# 🔝 Processing while loop break')
                    break

                export_status = api_result.response
                print()
                print('# 🔍 Export Status = ', api_result.get_display_response())

                # Download available only if 'status' is 'completed'
                # [ initializing, processing, completed, retrying, failed, limitExceeded, expired ]
                # https://users.soracom.io/ja-jp/tools/api/reference/#/SoraCam/listSoraCamDeviceVideoExports
                if export_status['status'] == 'completed':
                    print()
                    print('# ✅ The video is now available for download.')

                    # Perform the actual download.
                    # The ZIP file name is taken from the export URL
                    zip_url = export_status['url']
                    zip_file_path = os.path.join(
                        zip_path, extract_filename_from_URL(zip_url))

                    video_result = get_export_video(
                        zip_url, video_path, zip_file_path)

                    # A non-None result carries the details of a failed download
                    if video_result != None:
                        print()
                        print('# 🚫 Video download request failed.')
                        # Errors are logged and the next task is executed
                        state_records.append(video_result)
                        failure_list.append(state_records)
                        print('# 🔝 Processing while loop break')
                        break

                    # If you are successful up to the download, record it and move on to the next task.
                    success_list.append(state_records)
                    print()
                    print('# 💮 It all worked. while loop break')
                    break

                # For obvious errors
                if export_status['status'] == 'failed' or export_status['status'] == 'limitExceeded' or export_status['status'] == 'expired':
                    print()
                    print('# 🚫 Export status is an obvious failure.')
                    # Errors are logged and the next task is executed
                    failure_list.append(state_records)
                    print('# 🔝 Processing while loop break')
                    break

                # If it does not download, wait again
                # The next wait is a quarter of the previous one, but never
                # shorter than WAIT_TIME_LOWER_LIMIT seconds
                wait_sec = int(wait_sec * 0.25)
                wait_sec = WAIT_TIME_LOWER_LIMIT if wait_sec <= WAIT_TIME_LOWER_LIMIT else wait_sec

                print()
                print('# 🔁 Wait again = ', datetime.now(JST))

    return success_list, failure_list


# --------------- Execution ---------------
# Announce the start of this step
print_start_msg()

# Prepare the directories that receive the downloaded files
init_dir(EXPORT_VIDEOS_DIR)
init_dir(EXPORT_ZIP_DIR)

# Export and download every period set in the previous step
success_list, failure_list = download_range_video(
    id=device_selected['deviceId'],
    time_list=export_time_list,
    video_path=EXPORT_VIDEOS_DIR,
    zip_path=EXPORT_ZIP_DIR,
)

# Details are printed only when something failed; the successful entries
# are then listed as well so the two can be compared.
if failure_list:
    print()
    print('➕➕➕ ⛔ Some errors have been found during execution. Please check the content. ➕➕➕')
    print()
    print('# failure_count = ', len(failure_list))
    pprint(failure_list)

    if success_list:
        print()
        print('# 💬 Some of them are successful. Please check the contents.')
        print()
        print('# success_count = ', len(success_list))
        pprint(success_list)


# Announce the end of this step
print_end_msg()


In [None]:
# @title ステップ 8: ローカル環境にダウンロードするディレクトリを選択する

# --------------- Execution ---------------
# Announce the start of this step
print_start_msg()

USER_CONTENT_DIR: Final = '/content/'

path = USER_CONTENT_DIR
name = '**'

print()
print('# path = ', path)
print('# name = ', name)

# Glob recursively under `path` and keep only the directories, sorted
file_list = glob.glob(os.path.join(path, name), recursive=True)
dir_list = sorted(filter(os.path.isdir, file_list))

print('# 🗂️ directories = ', dir_list)

# Dropdown offering the directories; nothing is pre-selected in the widget
select_dirs = widgets.Dropdown(
    options=dir_list,
    value=None,
    disabled=False,
)
print()
print('➕➕➕ 🔰 Please select one from the list of directories below. ➕➕➕')
print()

display(widgets.Label(value="🗂️ Directories:"))
display(select_dirs)

# Announce the end of this step
print_end_msg()

# Until the user picks an entry, the first directory of the list is used
dir_selected = dir_list[0]

# Observer called when the value of the directory Dropdown changes
def on_select_dirs_handler(change):
    """Remember the newly selected directory and print the files it contains.

    Args:
        change: Change notification of the Dropdown; `change['new']` is the
            newly selected value.
    """
    global dir_selected

    chosen_dir = change['new']

    print('')
    print('# 🎯 Selected Dir = ', chosen_dir)

    # List the entries directly inside the selected directory
    entries = glob.glob(os.path.join(chosen_dir, '*'))
    print('# 📄 File List = ', entries)

    # Warn when the root of the content folder itself was chosen
    if chosen_dir == USER_CONTENT_DIR:
        print()
        print('# 🧨 The root of the content folder is specified. Compression and download will take some time. Please check before executing.')

    # Keep the selection for the download step
    dir_selected = chosen_dir

    # Closing message for this step
    print_end_msg()


# Call on_select_dirs_handler whenever the `value` of select_dirs changes
select_dirs.observe(on_select_dirs_handler, names='value')



In [None]:
# @title ステップ 9: 選択したディレクトリをローカル環境にダウンロードする

# --------------- Definition ---------------
# @markdown # Fill in the required information and run.
# @markdown - - -

# @markdown For videos, Check the box if you want to retrieve merged files.
concat_mp4_videos = True  # @param {type:"boolean"}

# @markdown - - -

import subprocess


# Merge and download MP4 files
def concat_mp4_files(path: str) -> None:
    """Merge the MP4 files found directly in `path` with ffmpeg and download the result.

    Nothing is done when `path` contains fewer than two `*.mp4` files.
    The merged file is downloaded only when ffmpeg exits with return
    code 0; otherwise the failure is reported and the download is skipped.

    Args:
        path: Directory searched (non-recursively) for `*.mp4` files.
    """
    MP4_WORK_DIR: Final = './_concat_mp4_'

    print()
    print('# path = ', path)

    # List of MP4 files in the directory
    files = glob.glob(os.path.join(path, '*.mp4'))
    print('# MP4 File List = ', files)

    # With zero or one file there is nothing to merge
    if len(files) <= 1:
        print()
        print('# 🔙 No multiple MP4s.')
        return

    # Output file information
    init_dir(MP4_WORK_DIR)
    mp4_name = datetime.now(JST).strftime('_%Y%m%d_%H%M%S_concat_.mp4')
    mp4_file = os.path.join(MP4_WORK_DIR, mp4_name)

    # Actual merging
    # Videos with different FPS may be downloaded
    # Re-encoding and merging with '-vsync' and '-filter_complex' options
    # Quality may be worse than the original due to re-encoding
    # '-vsync' is deprecated, but '-fps_mode' was not available because of the version 'ffmpeg version 4.2.7-0ubuntu0.1'
    # ffmpeg -i 'A1.mp4' -i 'B1.mp4' -filter_complex "concat=n=2:v=1:a=1" -fps_mode vfr output.mp4
    # https://trac.ffmpeg.org/wiki/Concatenate
    # The inputs are passed to ffmpeg in sorted file-name order
    files.sort()
    cmds = ['ffmpeg']

    for f in files:
        cmds += ['-i', f]

    cmds += [
        '-filter_complex', 'concat=n={}:v=1:a=1'.format(len(files)),
        '-vsync', 'vfr',
        mp4_file,
    ]

    print('# ffmpeg commands = ', cmds)

    # merging
    start = datetime.now(JST)
    print()
    print('# Start merging videos = ', start)

    res = subprocess.run(cmds, capture_output=True)

    end = datetime.now(JST)
    print('# Finish merging videos = ', end)
    print('# Actual waiting time = ',
          (end - start).total_seconds(), '(seconds)')

    # Decode leniently so that unexpected bytes in the ffmpeg output cannot
    # abort the step with a UnicodeDecodeError
    print()
    print("# return code: {}".format(res.returncode))
    print("# stdout: {}".format(res.stdout.decode(errors='replace')))
    print("# stderr: {}".format(res.stderr.decode(errors='replace')))

    # A non-zero return code means ffmpeg did not finish the merge, so there
    # is no usable merged file to download
    if res.returncode != 0:
        print()
        print('# 🚫 Failed to merge the videos. The merged file is not downloaded.')
        return

    # Download merged file
    download_dir_locally(MP4_WORK_DIR, ZIP_WORK_DIR, False)


# --------------- Execution ---------------
# Process start message display
print_start_msg()

# Download the whole directory selected in the previous step
download_dir_locally(dir_selected, ZIP_WORK_DIR)

# When the `concat_mp4_videos` form checkbox is ticked, also merge the
# MP4 files of that directory and download the merged file
if concat_mp4_videos:
    concat_mp4_files(dir_selected)

# Process end message display
print_end_msg()
