In this example, we will do the interesting stuff: we will watch videos from different video platforms and capture the corresponding traffic.

Watching videos is a popular workload in Quality of Experience (QoE) research, variable bitrate (VBR) experiments, traffic-based video classification, etc. Let's use the netunicorn platform to collect a toy dataset for these tasks.

As usual, we need to import all the required classes and tasks. This time we are again using our beautiful netunicorn-library and its pre-implemented tasks for watching YouTube, Vimeo, and Twitch videos. Please visit the corresponding GitHub repository (https://github.com/netunicorn/netunicorn-library) if you're interested in the task details or want to contribute your own tasks and pipelines.

_Important:_ for this example, we assume that you have already explored the basic example and are familiar with netunicorn principles.

In [2]:
import os
import time

from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline

# Tasks to start tcpdump and to stop a named tcpdump capture
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture

# Upload to file.io - public anonymous temporary file storage
from netunicorn.library.tasks.upload.fileio import UploadToFileIO

# Tasks for watching the corresponding video platform
from netunicorn.library.tasks.video_watchers.youtube_watcher import WatchYouTubeVideo
from netunicorn.library.tasks.video_watchers.vimeo_watcher import WatchVimeoVideo
from netunicorn.library.tasks.video_watchers.twitch_watcher import WatchTwitchStream

As the next step, let's assemble our pipeline. We start the traffic capture, watch three absolutely random videos (or streams) from YouTube, Vimeo, and Twitch respectively for 10 seconds each, then stop the capture and upload the resulting PCAP file to cloud storage. As before, we use `file.io` for the upload.

In [3]:
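pipeline = (
    Pipeline()
    # start tcpdump in the background, writing to /tmp/capture.pcap
    .then(StartCapture(filepath="/tmp/capture.pcap", name="capture"))
    # watch each video/stream for 10 seconds
    .then(WatchYouTubeVideo("https://www.youtube.com/watch?v=dQw4w9WgXcQ", 10))
    .then(WatchVimeoVideo("https://vimeo.com/375468729", 10))
    .then(WatchTwitchStream("https://www.twitch.tv/shroud", 10))
    # stop the capture started by the task named "capture"
    .then(StopNamedCapture(start_capture_task_name="capture"))
    # upload the PCAP to file.io; the link expires after one day
    .then(UploadToFileIO(filepath="/tmp/capture.pcap", expires="1d"))
)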

The rest of the code does not differ from any other experiment definition. Let's use the credentials for our infrastructure. Yours will probably be different; if your organization doesn't have a netunicorn instance, you can deploy your own locally for testing purposes (see https://netunicorn.cs.ucsb.edu/examples for details).

In [4]:
# load credentials from a local .env file, if there is one
if '.env' in os.listdir():
    from dotenv import load_dotenv
    load_dotenv(".env")

NETUNICORN_ENDPOINT = os.environ.get('NETUNICORN_ENDPOINT', 'https://pinot.cs.ucsb.edu/netunicorn')
NETUNICORN_LOGIN = os.environ.get('NETUNICORN_LOGIN', 'team_tz')
NETUNICORN_PASSWORD = os.environ.get('NETUNICORN_PASSWORD', 'throughput445')

client = RemoteClient(endpoint=NETUNICORN_ENDPOINT, login=NETUNICORN_LOGIN, password=NETUNICORN_PASSWORD)
client.healthcheck()

True

Let's get some nodes for execution. As usual, for the demonstration in this notebook we take our Raspberry Pi nodes; if your infrastructure is different, feel free to modify the next cell.

In [5]:
nodes = client.get_nodes()

# on our infrastructure, use three Raspberry Pi nodes; elsewhere, take any single node
if NETUNICORN_ENDPOINT == 'https://pinot.cs.ucsb.edu/netunicorn':
    working_nodes = nodes.filter(lambda node: node.name.startswith("raspi")).take(3)
else:
    working_nodes = nodes.take(1)

working_nodes

[raspi-e4:5f:01:9c:ca:3a, raspi-e4:5f:01:56:d9:a3, raspi-e4:5f:01:a7:b1:c1]

In [8]:
# Creating the experiment
experiment = Experiment().map(pipeline, working_nodes)
experiment

 - Deployment: Node=raspi-e4:5f:01:56:d9:a2, executor_id=, prepared=False, error=None

When pipelines are assigned to nodes (thereby creating Deployments), each deployment has an `environment_definition` with a list of preparation commands, collected from the pipeline's tasks, to execute during Docker image compilation. We can explore these commands:

In [9]:
for line in experiment[0].environment_definition.commands:
    print(line)

sudo apt-get update
sudo apt-get install -y tcpdump
apt install -y python3-pip wget xvfb procps chromium chromium-driver
pip3 install selenium webdriver-manager
apt install -y python3-pip wget xvfb procps chromium chromium-driver
pip3 install selenium webdriver-manager
apt install -y python3-pip wget xvfb procps chromium chromium-driver
pip3 install selenium webdriver-manager
sudo apt-get update
sudo apt-get install -y tcpdump
sudo apt-get install -y procps
sudo apt-get install -y curl


It seems we can optimize these a bit: all three watcher tasks use the same setup commands (at least for now), and the `tcpdump` installation also appears twice. Remember that tasks in a pipeline do not know about each other, so they cannot resolve these duplicates automatically.
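Because the preparation commands are plain strings, exact duplicates can be dropped while keeping the original order. Below is a minimal sketch, assuming that running each repeated command only once is safe (true for the `apt-get`/`pip3` installs listed above); `deduplicate_commands` is a hypothetical helper, not a netunicorn API.

```python
def deduplicate_commands(commands: list[str]) -> list[str]:
    """Drop exact duplicate commands, keeping the first occurrence of each.

    dict.fromkeys preserves insertion order (Python 3.7+), so the remaining
    commands run in the same relative order as before.
    """
    return list(dict.fromkeys(commands))


# The commands printed by the previous cell
commands = [
    "sudo apt-get update",
    "sudo apt-get install -y tcpdump",
    "apt install -y python3-pip wget xvfb procps chromium chromium-driver",
    "pip3 install selenium webdriver-manager",
    "apt install -y python3-pip wget xvfb procps chromium chromium-driver",
    "pip3 install selenium webdriver-manager",
    "apt install -y python3-pip wget xvfb procps chromium chromium-driver",
    "pip3 install selenium webdriver-manager",
    "sudo apt-get update",
    "sudo apt-get install -y tcpdump",
    "sudo apt-get install -y procps",
    "sudo apt-get install -y curl",
]

for line in deduplicate_commands(commands):
    print(line)

# Possible usage on the experiment, assuming `commands` is a plain writable list:
# for deployment in experiment:
#     env = deployment.environment_definition
#     env.commands = deduplicate_commands(env.commands)
```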

Instead of fixing the command list, let's ask the experiment to use a predefined Docker image. This image was built from the base `python:3.10.9-slim` image by installing all the programs from the current environment. You can build and optimize your own image and use it on the netunicorn platform as well, as long as it is published to a public Docker registry (e.g., Docker Hub).

Note that any image used with the platform needs to have `netunicorn-executor` installed and `python3 -m netunicorn.executor` as its entrypoint.
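The exact recipe behind `netunicorn/chromium:0.3.0` isn't shown here, but the requirements above can be illustrated with a sketch that turns a list of preparation commands into a Dockerfile on top of `python:3.10.9-slim`. It strips `sudo` (Docker builds run as root, and slim images don't ship it), drops exact duplicates, installs `netunicorn-executor`, and sets the required entrypoint. `render_dockerfile` is a hypothetical helper; building and publishing the image (e.g. with `docker build` and `docker push`) is up to you.

```python
def render_dockerfile(commands: list[str], base_image: str = "python:3.10.9-slim") -> str:
    """Render a Dockerfile that bakes preparation commands into an image."""
    # Docker builds run as root and slim base images do not include sudo
    stripped = [c[len("sudo "):] if c.startswith("sudo ") else c for c in commands]
    # drop exact duplicates, keeping the first occurrence of each command
    unique = list(dict.fromkeys(stripped))

    lines = [f"FROM {base_image}"]
    lines += [f"RUN {command}" for command in unique]
    # netunicorn requirements: the executor is installed and is the entrypoint
    lines.append("RUN pip3 install netunicorn-executor")
    lines.append('ENTRYPOINT ["python3", "-m", "netunicorn.executor"]')
    return "\n".join(lines) + "\n"


# A subset of the preparation commands printed earlier
example_commands = [
    "sudo apt-get update",
    "sudo apt-get install -y tcpdump",
    "apt install -y python3-pip wget xvfb procps chromium chromium-driver",
    "pip3 install selenium webdriver-manager",
    "sudo apt-get update",
    "sudo apt-get install -y curl",
]

print(render_dockerfile(example_commands))
```

A hand-tuned image would usually merge the `apt` commands into a single `RUN` layer to keep it smaller; one `RUN` per command keeps the sketch easy to read.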

In [10]:
from netunicorn.base import DockerImage
for deployment in experiment:
    # you can explore the image on Docker Hub
    deployment.environment_definition = DockerImage(image='netunicorn/chromium:0.3.0')

Ok, now let's define the experiment name and, as usual, prepare the experiment.

In [11]:
experiment_label = "video_watchers_example"

In [12]:
try:
    client.delete_experiment(experiment_label)
except RemoteClientException:
    pass

client.prepare_experiment(experiment, experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status == ExperimentStatus.READY:
        break
    time.sleep(20)

ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.READY


Verifying that everything is correct:

In [13]:
for deployment in client.get_experiment_status(experiment_label).experiment:
    print(f"Prepared: {deployment.prepared}, error: {deployment.error}")

Prepared: True, error: None
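With many nodes, printing every deployment gets noisy. A stricter variant of the check above fails fast and lists only the problematic nodes. This is a sketch that duck-types on the `prepared`, `error`, and `node.name` attributes seen in the outputs of this notebook; `assert_all_prepared` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for real deployments.

```python
from types import SimpleNamespace  # only used for the stand-in objects below


def assert_all_prepared(deployments) -> None:
    """Raise RuntimeError listing every deployment that is not prepared."""
    failures = [
        f"{d.node.name}: {d.error}"
        for d in deployments
        if not d.prepared or d.error is not None
    ]
    if failures:
        raise RuntimeError("deployments not prepared:\n" + "\n".join(failures))


# Hypothetical stand-ins that mimic the printed deployment fields
ok = SimpleNamespace(node=SimpleNamespace(name="node-a"), prepared=True, error=None)
bad = SimpleNamespace(node=SimpleNamespace(name="node-b"), prepared=False, error="hypothetical error")

assert_all_prepared([ok])  # passes silently

# In this notebook:
# assert_all_prepared(client.get_experiment_status(experiment_label).experiment)
```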


And starting the execution:

In [14]:
client.start_execution(experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status != ExperimentStatus.RUNNING:
        break
    time.sleep(20)

ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.FINISHED
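Both polling loops in this notebook wait forever if the expected status never arrives. A small generic helper with a deadline avoids that. This is a minimal sketch: `wait_until` is a hypothetical helper, and the one-hour default timeout is an arbitrary assumption; pick one that fits your experiment.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_until(
    poll: Callable[[], T],
    done: Callable[[T], bool],
    interval: float = 20.0,
    timeout: float = 3600.0,
    on_poll: Callable[[T], None] = print,
) -> T:
    """Call `poll` every `interval` seconds until `done(value)` is true.

    Returns the last polled value, or raises TimeoutError once `timeout`
    seconds have passed without the condition being met.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = poll()
        on_poll(value)  # e.g. print the current status
        if done(value):
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)


# Usage in this notebook (requires `client` and `experiment_label` from above):
# info = wait_until(
#     poll=lambda: client.get_experiment_status(experiment_label),
#     done=lambda info: info.status != ExperimentStatus.RUNNING,
#     on_poll=lambda info: print(info.status),
# )
```

`time.monotonic()` is used for the deadline because, unlike wall-clock time, it cannot jump backwards when the system clock is adjusted.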


Moment of truth... /nervous drum sounds/...

In [15]:
from returns.pipeline import is_successful

for report in info.execution_result:
    print(f"Node name: {report.node.name}")
    print(f"Error: {report.error}")

    result, log = report.result  # report stores results of execution and corresponding log
    
    # result is a returns.result.Result object: either Success or Failure
    print(f"Result is: {type(result)}")
    data = result.unwrap() if is_successful(result) else result.failure()
    for key, value in data.items():
        print(f"{key}: {value}")

    # we also can explore logs
    for line in log:
        print(line.strip())
    print()

Node name: raspi-e4:5f:01:56:d9:a2
Error: None
Result is: <class 'returns.result.Success'>
container_value: defaultdict(<class 'list'>, {'capture': [<Success: {'container_value': 11}>], 'c8d042c1-6a44-4f77-9c38-63042fff08f4': [<Success: {'container_value': 'Video finished by timeout: 10 seconds'}>], '6ee855d6-d827-4e1b-8ed5-7e87b8d160ba': [<Success: {'container_value': 'Video finished by timeout: 10 seconds'}>], '8407e24a-b46d-4412-9cad-7d33a0bf50fa': [<Success: {'container_value': 'Video probably finished by timeout: 10 seconds'}>], '189f2c36-c00f-434a-8178-c67724a74b6b': [<Success: {'container_value': b''}>], '1aa5fbc8-9eb7-488a-97d2-edfe56018b90': [<Success: {'container_value': '{"success":true,"status":200,"id":"e26eece0-7cfb-11ee-a149-2fef27717548","key":"egG8jteY1PFl","path":"/","nodeType":"file","name":"capture.pcap","title":null,"description":null,"size":44990882,"link":"https://file.io/egG8jteY1PFl","private":false,"expires":"2023-11-07T23:26:17.248Z","downloads":0,"maxDownloa

Well, what could go wrong? :) ~(everything...)~  
We successfully executed the experiment, watched 10 seconds of video from each of YouTube, Vimeo, and Twitch, and recorded the network traffic for further analysis. Well, much of it is most likely ad traffic from these platforms (we intentionally did not use ad blockers in this example), but we did it anyway! :D  
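As a possible first step of that further analysis, here is a minimal, stdlib-only sketch that summarizes a downloaded `capture.pcap` (packet count, captured and on-the-wire bytes, duration). It assumes the capture uses the classic libpcap format that `tcpdump -w` writes by default and does not handle pcapng; `summarize_pcap` and `PcapSummary` are hypothetical names, and for real analysis a dedicated library such as scapy or dpkt is the better choice.

```python
import struct
from dataclasses import dataclass

# Classic libpcap magic numbers as they appear in the first 4 bytes of a file
_MAGICS = {
    b"\xd4\xc3\xb2\xa1": ("<", 1e6),  # little-endian, microsecond timestamps
    b"\xa1\xb2\xc3\xd4": (">", 1e6),  # big-endian, microsecond timestamps
    b"\x4d\x3c\xb2\xa1": ("<", 1e9),  # little-endian, nanosecond timestamps
    b"\xa1\xb2\x3c\x4d": (">", 1e9),  # big-endian, nanosecond timestamps
}
GLOBAL_HEADER_LEN = 24


@dataclass
class PcapSummary:
    packets: int
    captured_bytes: int  # bytes stored in the file (limited by snaplen)
    original_bytes: int  # packet lengths as seen on the wire
    duration_s: float    # time between the first and the last packet


def summarize_pcap(data: bytes) -> PcapSummary:
    if data[:4] not in _MAGICS:
        raise ValueError("not a classic pcap file (pcapng is not supported)")
    endian, ticks_per_second = _MAGICS[data[:4]]
    record = struct.Struct(endian + "IIII")  # ts_sec, ts_frac, incl_len, orig_len
    offset = GLOBAL_HEADER_LEN
    packets = captured = original = 0
    first_ts = last_ts = 0.0
    while offset + record.size <= len(data):
        ts_sec, ts_frac, incl_len, orig_len = record.unpack_from(data, offset)
        offset += record.size
        if offset + incl_len > len(data):
            break  # truncated final record, e.g. capture stopped mid-write
        offset += incl_len  # skip over the packet bytes themselves
        ts = ts_sec + ts_frac / ticks_per_second
        if packets == 0:
            first_ts = ts
        last_ts = ts
        packets += 1
        captured += incl_len
        original += orig_len
    return PcapSummary(packets, captured, original, last_ts - first_ts)


# Usage, after downloading the capture locally:
# with open("capture.pcap", "rb") as f:
#     print(summarize_pcap(f.read()))
```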

We encourage everyone to implement new tasks or modify existing ones as needed and to publish them to [netunicorn-library](https://github.com/netunicorn/netunicorn-library). As usual, our documentation and other examples are available at https://netunicorn.cs.ucsb.edu.