In [1]:
import os
import time

from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline

# Tasks to start tcpdump and stop named tcpdump task
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture

# Upload to file.io - public anonymous temporary file storage
from netunicorn.library.tasks.upload.webdav import UploadToWebDav

# Tasks for watching the corresponding video platform
# from netunicorn.library.tasks.video_watchers.youtube_watcher import WatchYouTubeVideo
# from netunicorn.library.tasks.video_watchers.vimeo_watcher import WatchVimeoVideo
from netunicorn.library.tasks.video_watchers.twitch_watcher import WatchTwitchStream

# from background_traffic import StartBackgroundTraffic, StopNamedBackgroundTraffic

In [2]:
import signal
import subprocess
import time

from netunicorn.base import (
    Architecture,
    Failure,
    Node,
    Result,
    Success,
    Task,
    TaskDispatcher,
)
from netunicorn.library.tasks.tasks_utils import subprocess_run


class StartBackgroundTraffic(TaskDispatcher):
    def __init__(
            self, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)

        self.linux_implementation = StartBackgroundTrafficLinuxImplementation(
            *args, **kwargs
        )

    def dispatch(self, node: Node) -> Task:
        if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_implementation

        raise NotImplementedError(
            f"StartBackgroundTraffic is not implemented for {node.architecture}"
        )


class StartBackgroundTrafficLinuxImplementation(Task):
    requirements = ["pip install speedtest-cli"]

    def __init__(
            self, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)

    def run(self) -> Result:
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)

        proc = subprocess.Popen(
            ["sh", "-c", "while true; do speedtest-cli --simple --secure; echo asdf; done"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        time.sleep(2)
        if (exit_code := proc.poll()) is None:  # not finished yet
            return Success(proc.pid)

        text = ""
        if proc.stdout:
            text += proc.stdout.read().decode("utf-8") + "\n"
        if proc.stderr:
            text += proc.stderr.read().decode("utf-8")
        return Failure(f"sh terminated with return code {exit_code}" + text)


class StopNamedBackgroundTraffic(TaskDispatcher):
    def __init__(self, start_background_traffic_task_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_background_traffic_task_name = start_background_traffic_task_name
        self.linux_implementation = StopNamedBackgroundTrafficImplementation(
            background_traffic_task_name=self.start_background_traffic_task_name,
            *args,
            **kwargs,
        )

    def dispatch(self, node: Node) -> Task:
        if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_implementation

        raise NotImplementedError(
            f"StopNamedBackgroundTraffic is not implemented for {node.architecture}"
        )


class StopNamedBackgroundTrafficImplementation(Task):
    requirements = [
        "sudo apt-get update",
        "sudo apt-get install -y procps",
    ]

    def __init__(self, background_traffic_task_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.background_traffic_task_name = background_traffic_task_name

    def run(self):
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
        pid = self.previous_steps.get(
            self.background_traffic_task_name, [Failure("Named StartBackgroundTraffic not found")]
        )[-1]
        if isinstance(pid, Failure):
            return pid

        pid = pid.unwrap()
        return subprocess_run(["kill", str(pid)])

In [3]:
# pipeline = (
#     Pipeline()
#     .then(StartCapture(filepath="youtube_capture.pcap", name="youtube_capture"))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=dQw4w9WgXcQ", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=r5JYHXtt_rw", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=pxEV1A5mTYM", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=Ct6BUPvE2sM", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=KjtYZpqvt50", 10))
#     .then(StopNamedCapture(start_capture_task_name="youtube_capture"))
#     .then(UploadToWebDav(
#         filepaths={"youtube_capture.pcap"},
#         endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/youtube_capture",
#         username="uploader",
#         password="uploader"
#     ))
#     .then(StartCapture(filepath="vimeo_capture.pcap", name="vimeo_capture"))
#     .then(WatchTwitchStream("https://vimeo.com/375468729?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/347119375?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/297124334?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/476306167?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/515893651?autoplay=1", 10))
#     .then(StopNamedCapture(start_capture_task_name="vimeo_capture"))
#     .then(UploadToWebDav(
#         filepaths={"vimeo_capture.pcap"},
#         endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/vimeo_capture",
#         username="uploader",
#         password="uploader"
#     ))
#     .then(StartCapture(filepath="twitch_capture.pcap", name="twitch_capture"))
#     .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10))
#     .then(StopNamedCapture(start_capture_task_name="twitch_capture"))
#     .then(UploadToWebDav(
#         filepaths={"twitch_capture.pcap"},
#         endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/twitch_capture",
#         username="uploader",
#         password="uploader"
#     ))
# 
#     .then(StartBackgroundTraffic(name="background_traffic"))
#     .then(StartCapture(filepath="youtube_capture_speedtest.pcap", name="youtube_capture_speedtest"))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=dQw4w9WgXcQ", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=r5JYHXtt_rw", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=pxEV1A5mTYM", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=Ct6BUPvE2sM", 10))
#     .then(WatchTwitchStream("https://www.youtube.com/watch?v=KjtYZpqvt50", 10))
#     .then(StopNamedCapture(start_capture_task_name="youtube_capture_speedtest"))
#     .then(UploadToWebDav(
#         filepaths={"youtube_capture_speedtest.pcap"},
#         endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/youtube_capture_speedtest",
#         username="uploader",
#         password="uploader"
#     ))
#     .then(StartCapture(filepath="vimeo_capture_speedtest.pcap", name="vimeo_capture_speedtest"))
#     .then(WatchTwitchStream("https://vimeo.com/375468729?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/347119375?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/297124334?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/476306167?autoplay=1", 10))
#     .then(WatchTwitchStream("https://vimeo.com/515893651?autoplay=1", 10))
#     .then(StopNamedCapture(start_capture_task_name="vimeo_capture_speedtest"))
#     .then(UploadToWebDav(
#         filepaths={"vimeo_capture_speedtest.pcap"},
#         endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/vimeo_capture_speedtest",
#         username="uploader",
#         password="uploader"
#     ))
#     .then(StartCapture(filepath="twitch_capture_speedtest.pcap", name="twitch_capture_speedtest"))
#     .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/2298318732", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/2316652767", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/1867242354", 10))
#     .then(WatchTwitchStream("https://twitch.tv/video/2320975412", 10))
#     .then(StopNamedCapture(start_capture_task_name="twitch_capture_speedtest"))
#     .then(UploadToWebDav(
#         filepaths={"twitch_capture_speedtest.pcap"},
#         endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/twitch_capture_speedtest",
#         username="uploader",
#         password="uploader"
#     ))
#     .then(StopNamedBackgroundTraffic(start_background_traffic_task_name="background_traffic"))
# )
pipeline = (
    Pipeline()
    .then(StartCapture(filepath="youtube_capture_proxy.pcap", name="youtube_capture_proxy"))
    .then(WatchTwitchStream("https://www.youtube.com/watch?v=dQw4w9WgXcQ", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://www.youtube.com/watch?v=r5JYHXtt_rw", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://www.youtube.com/watch?v=pxEV1A5mTYM", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://www.youtube.com/watch?v=Ct6BUPvE2sM", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://www.youtube.com/watch?v=KjtYZpqvt50", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(StopNamedCapture(start_capture_task_name="youtube_capture_proxy"))
    .then(UploadToWebDav(
        filepaths={"youtube_capture_proxy.pcap"},
        endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/youtube_capture_proxy",
        username="uploader",
        password="uploader"
    ))
    .then(StartCapture(filepath="vimeo_capture_proxy.pcap", name="vimeo_capture_proxy"))
    .then(WatchTwitchStream("https://vimeo.com/375468729?autoplay=1", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://vimeo.com/347119375?autoplay=1", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://vimeo.com/297124334?autoplay=1", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://vimeo.com/476306167?autoplay=1", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://vimeo.com/515893651?autoplay=1", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(StopNamedCapture(start_capture_task_name="vimeo_capture_proxy"))
    .then(UploadToWebDav(
        filepaths={"vimeo_capture_proxy.pcap"},
        endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/vimeo_capture_proxy",
        username="uploader",
        password="uploader"
    ))
    .then(StartCapture(filepath="twitch_capture_proxy.pcap", name="twitch_capture_proxy"))
    .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(WatchTwitchStream("https://twitch.tv/video/2322690366", 10, webdriver_arguments=["--proxy-server=http://4.tcp.us-cal-1.ngrok.io:16686,direct://"]))
    .then(StopNamedCapture(start_capture_task_name="twitch_capture_proxy"))
    .then(UploadToWebDav(
        filepaths={"twitch_capture_proxy.pcap"},
        endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n4/A/twitch_capture_proxy",
        username="uploader",
        password="uploader"
    ))
)

In [4]:
netunicorn_login = 'cs190n4'
netunicorn_password = 'zvL89JkW'

NETUNICORN_ENDPOINT = os.environ.get('NETUNICORN_ENDPOINT', 'https://pinot.cs.ucsb.edu/netunicorn')
NETUNICORN_LOGIN = os.environ.get('NETUNICORN_LOGIN', netunicorn_login)
NETUNICORN_PASSWORD = os.environ.get('NETUNICORN_PASSWORD', netunicorn_password)

client = RemoteClient(endpoint=NETUNICORN_ENDPOINT, login=NETUNICORN_LOGIN, password=NETUNICORN_PASSWORD)

In [5]:
nodes = client.get_nodes()
working_nodes = nodes.filter(lambda node: node.name.startswith("aws")).take(10)
working_nodes

[aws-fargate-A-cs190n4-1,
 aws-fargate-B-cs190n4-2,
 aws-fargate-ARM64-cs190n4-3,
 aws-fargate-A-cs190n4-4,
 aws-fargate-B-cs190n4-5,
 aws-fargate-ARM64-cs190n4-6,
 aws-fargate-A-cs190n4-7,
 aws-fargate-B-cs190n4-8,
 aws-fargate-ARM64-cs190n4-9,
 aws-fargate-A-cs190n4-10]

In [6]:
experiment = Experiment().map(pipeline, working_nodes)
experiment

 - Deployment: Node=aws-fargate-A-cs190n4-1, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-B-cs190n4-2, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-ARM64-cs190n4-3, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-A-cs190n4-4, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-B-cs190n4-5, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-ARM64-cs190n4-6, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-A-cs190n4-7, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-B-cs190n4-8, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-ARM64-cs190n4-9, executor_id=, prepared=False, error=None
 - Deployment: Node=aws-fargate-A-cs190n4-10, executor_id=, prepared=False, error=None

In [7]:
for deployment in experiment:
    deployment.environment_definition.commands = list(dict.fromkeys(deployment.environment_definition.commands))

In [8]:
from netunicorn.base import DockerImage
for deployment in experiment:
    # you can explore the image on the DockerHub
    deployment.environment_definition = DockerImage(image='satyandraguthula/netunicorn_images')

In [9]:
experiment_label = "team-4-experiment-A"

In [10]:
try:
    client.delete_experiment(experiment_label)
except RemoteClientException:
    pass

client.prepare_experiment(experiment, experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status == ExperimentStatus.READY:
        break
    time.sleep(20)

ExperimentStatus.PREPARING
ExperimentStatus.READY


In [13]:
for deployment in client.get_experiment_status(experiment_label).experiment:
    print(f"Prepared: {deployment.prepared}, error: {deployment.error}")

Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None
Prepared: True, error: None


In [14]:
client.start_execution(experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status != ExperimentStatus.RUNNING:
        break
    time.sleep(20)

ExperimentStatus.RUNNING
ExperimentStatus.FINISHED


In [16]:
from returns.pipeline import is_successful
from returns.result import Result

for report in info.execution_result:
    print(f"Node name: {report.node.name}")
    print(f"Error: {report.error}")

    result, log = report.result  # report stores results of execution and corresponding log
    
    # result is a returns.result.Result object, could be Success of Failure
    # or None is error occured during execution
    print(f"Result is: {type(result)}")
    if isinstance(result, Result):
        data = result.unwrap() if is_successful(result) else result.failure()
        for key, value in data.items():
            print(f"{key}: {value}")

    # we also can explore logs
    for line in log:
        print(line.strip())
    print()

Node name: aws-fargate-A-cs190n4-1
Error: None
Result is: <class 'returns.result.Failure'>
youtube_capture_proxy: [<Success: 7>]
c05358b2-50fa-45bf-b904-6c106be39359: [<Success: Video probably finished by timeout: 10 seconds>]
11d2b11a-b95f-4eba-9539-c89d20a2a2fa: [<Failure: Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/netunicorn/executor/utils.py", line 32, in decorator
    result = function(*args, **kwargs)
  File "/Users/tiger/PycharmProjects/ucsb-cs190n-f24-group-4/.venv/lib/python3.10/site-packages/netunicorn/library/tasks/video_watchers/twitch_watcher.py", line 103, in run
  File "/Users/tiger/PycharmProjects/ucsb-cs190n-f24-group-4/.venv/lib/python3.10/site-packages/netunicorn/library/tasks/video_watchers/twitch_watcher.py", line 39, in watch
  File "/usr/local/lib/python3.10/site-packages/selenium/webdriver/remote/webdriver.py", line 393, in get
    self.execute(Command.GET, {"url": url})
  File "/usr/local/lib/python3.10/site-packages/sele

TypeError: cannot unpack non-iterable NoneType object

In [15]:
directory = "/mnt/md0/cs190n/cs190n4/A"

for capture in os.listdir(directory):
    for execution in os.listdir(str(os.path.join(directory, capture))):
        if capture.ends("_proxy"):
            %mkdir -p ./data/A/{capture}
            for file in os.listdir(str(os.path.join(directory, capture, execution))):
                if file.endswith(".pcap"):
                    print(os.path.join(directory, capture, execution, file))
                    !docker run -v {os.path.join(directory, capture, execution)}:/tmp/input -v .:/tmp/output mielverkerken/cicflowmeter /tmp/input/{file} /tmp/output > /dev/null 2>&1
                    !mv ./{file}_Flow.csv ./data/A/{capture}/{execution}-{file}.csv