# Term Project: Checkpoint #2

**Group 8:** Palvi Sabherwal, Hannah Shakouri, Emily Thai

## EXPERIMENT

*Provide a high-level explanation of the experiment(s) that you want to run through netUnicorn/netReplica. What type of data do you need to collect?*

Our project's goal is to predict the download times of video chunks on video streaming platforms in order to minimize the interruptions caused by fluctuations in network performance. The model's input will be historical QoS metrics, including throughput, latency, and packet loss. To build the training set, we will use netUnicorn/netReplica to collect time-series QoS data from video streaming sessions.
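
As a rough illustration of what "time-series QoS data" can mean for throughput, the sketch below bins packets into one-second windows. The `(timestamp_seconds, length_bytes)` input format, the function name `throughput_per_second`, and the sample values are all assumptions made for this example; they are not taken from our captures.

```python
from collections import defaultdict


def throughput_per_second(packets):
    """Sum packet sizes into 1-second bins and return {bin_index: bits_per_second}.

    `packets` is an iterable of (timestamp_seconds, length_bytes) tuples.
    Bins are counted from the earliest timestamp; empty bins are omitted.
    """
    packets = list(packets)
    if not packets:
        return {}
    start = min(ts for ts, _ in packets)
    bytes_per_bin = defaultdict(int)
    for ts, length in packets:
        bytes_per_bin[int(ts - start)] += length
    # Convert bytes per one-second bin to bits per second
    return {b: n * 8 for b, n in sorted(bytes_per_bin.items())}


# Hypothetical packets: (seconds since capture start, bytes on the wire)
sample = [(0.0, 1500), (0.5, 1500), (1.2, 500), (3.4, 1000)]
series = throughput_per_second(sample)
print(series)
```

Latency and packet loss would need additional information (for example, TCP acknowledgements or retransmissions), which this sketch does not attempt to model.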

## DATA COLLECTION PIPELINES
*Provide (pseudo)code for the pipeline(s) that you will run for your data collection.*

### Import Statements
*For each task in your pipeline, provide a reference to the implementation of this task that you will use for your data collection.*

These import statements reference the task implementations used in our pipelines.

In [24]:
import os
import time
import pandas as pd

from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture
from netunicorn.library.tasks.upload.fileio import UploadToFileIO
from netunicorn.library.tasks.upload.webdav import UploadToWebDav
from netunicorn.library.tasks.basic import SleepTask
from netunicorn.library.tasks.measurements.ookla_speedtest import SpeedTest
from netunicorn.library.tasks.video_watchers.youtube_watcher import WatchYouTubeVideo
from netunicorn.library.tasks.video_watchers.vimeo_watcher import WatchVimeoVideo
from netunicorn.library.tasks.video_watchers.twitch_watcher import WatchTwitchStream

### Set Up netUnicorn
We choose the node that will run our experiment and connect using our group's netUnicorn API credentials.

In [25]:
NETUNICORN_ENDPOINT = os.environ.get('NETUNICORN_ENDPOINT', 'https://pinot.cs.ucsb.edu/netunicorn')
NETUNICORN_LOGIN = os.environ.get('NETUNICORN_LOGIN', 'cs190n8')       
NETUNICORN_PASSWORD = os.environ.get('NETUNICORN_PASSWORD', 'kfazTdrx')
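
Because the defaults above embed the login and password directly in the notebook, one alternative is to require that they come from the environment. The helper below is a minimal sketch; `require_env` is a hypothetical name introduced here and is not part of netUnicorn.

```python
import os


def require_env(name):
    """Return the value of environment variable `name`, or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Set the {name} environment variable before running this notebook")
    return value


# Intended usage (left commented out so the sketch runs without credentials):
# NETUNICORN_LOGIN = require_env("NETUNICORN_LOGIN")
# NETUNICORN_PASSWORD = require_env("NETUNICORN_PASSWORD")
```

With this approach the notebook can be shared without exposing the group's password, at the cost of one extra setup step per user.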

In [20]:
client = RemoteClient(endpoint=NETUNICORN_ENDPOINT, login=NETUNICORN_LOGIN, password=NETUNICORN_PASSWORD)
print("Health Check: {}".format(client.healthcheck()))
nodes = client.get_nodes()
print(nodes)

Health Check: True
[[snl-server-5], <Uncountable node pool with next node template: [aws-fargate-A-cs190n8-, aws-fargate-B-cs190n8-, aws-fargate-ARM64-cs190n8-]>]


In [26]:
working_node = 'snl-server-5'

### Collecting Network Data for Video Streaming
In our data collection pipelines, we record packet captures while streaming videos from YouTube, Vimeo, and Twitch.

In [27]:
from netunicorn.executor import get_local_executor
executor = get_local_executor(pipeline)

INFO:executor_local:Parsed configuration: Gateway located on fake
INFO:executor_local:Current directory: /home/psabherwal/cs190n-fall-2024/project


In [28]:
pipeline = Pipeline()

# Set early_stopping to False to keep the pipeline running when a task fails
# pipeline.early_stopping = False

# Generate Data for YouTube
pipeline.then(StartCapture(filepath="/tmp/youtube_1min.pcap", name="1min_captureYT"))
for _ in range(5):
    pipeline.then(WatchYouTubeVideo("https://www.youtube.com/watch?v=0g1Q4fBDp2U&pp=ygUMMSBtaW4gdmlkZW9z", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="1min_captureYT"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/youtube_2min.pcap", name="2min_captureYT"))
for _ in range(5):
    pipeline.then(WatchYouTubeVideo("https://www.youtube.com/watch?v=0CmtDk-joT4&ab_channel=Koi", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="2min_captureYT"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/youtube_5min.pcap", name="5min_captureYT"))
for _ in range(5):
    pipeline.then(WatchYouTubeVideo("https://www.youtube.com/watch?v=6-2ra25RVRs&pp=ygUMNSBtaW4gdmlkZW9z", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="5min_captureYT"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/youtube_10min.pcap", name="10min_captureYT"))
for _ in range(5):
    pipeline.then(WatchYouTubeVideo("https://www.youtube.com/watch?v=vCkhJeom7zU&pp=ygUNMTAgbWluIHZpZGVvcw%3D%3D", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="10min_captureYT"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/youtube_15min.pcap", name="15min_captureYT"))
for _ in range(5):
    pipeline.then(WatchYouTubeVideo("https://www.youtube.com/watch?v=co47u19cbds&pp=ygUNMTUgbWluIHZpZGVvcw%3D%3D", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="15min_captureYT"))

# pipeline.then(UploadToWebDav(filepaths={"/tmp/youtube_1min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/youtube_capture", username="test", password="test"))
# pipeline.then(UploadToWebDav(filepaths={"/tmp/youtube_2min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/youtube_capture", username="test", password="test"))
# pipeline.then(UploadToWebDav(filepaths={"/tmp/youtube_5min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/youtube_capture", username="test", password="test"))
# pipeline.then(UploadToWebDav(filepaths={"/tmp/youtube_10min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/youtube_capture", username="test", password="test"))
# pipeline.then(UploadToWebDav(filepaths={"/tmp/youtube_15min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/youtube_capture", username="test", password="test"))

Pipeline(3efd2398-1da5-4705-925e-1a42205c17ea): {'root': [<netunicorn.library.tasks.capture.tcpdump.StartCapture object at 0x7fcf8a24f9a0>], 1: [<netunicorn.library.tasks.video_watchers.youtube_watcher.WatchYouTubeVideo object at 0x7fcf8a24f730>], 2: [<netunicorn.library.tasks.video_watchers.youtube_watcher.WatchYouTubeVideo object at 0x7fcf8a24f640>], 3: [<netunicorn.library.tasks.video_watchers.youtube_watcher.WatchYouTubeVideo object at 0x7fcf8a24f610>], 4: [<netunicorn.library.tasks.video_watchers.youtube_watcher.WatchYouTubeVideo object at 0x7fcf8a24f5b0>], 5: [<netunicorn.library.tasks.video_watchers.youtube_watcher.WatchYouTubeVideo object at 0x7fcf8a24f550>], 6: [<netunicorn.library.tasks.capture.tcpdump.StopNamedCapture object at 0x7fcf8a24f670>], 7: [<netunicorn.library.tasks.basic.SleepTask with name 5fff0e6f-adde-4d33-ac22-62a71b7efe30>], 8: [<netunicorn.library.tasks.capture.tcpdump.StartCapture object at 0x7fcf8a24f700>], 9: [<netunicorn.library.tasks.video_watchers.youtu
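
The cell above repeats the same start capture, watch, stop capture pattern for every video. The sketch below shows one way to generate that sequence from a list of videos. It deliberately produces plain `(step_kind, kwargs)` tuples rather than netUnicorn tasks so that it runs without the library; `build_capture_steps`, the step names, and the example URLs are hypothetical.

```python
def build_capture_steps(platform, videos, repeats=5, watch_seconds=10, pause_seconds=2):
    """Return an ordered list of (step_kind, kwargs) tuples for one platform.

    `videos` is a list of (label, url) pairs. Each video gets its own capture
    file, is watched `repeats` times, and is separated from the next video by
    a short pause.
    """
    steps = []
    for index, (label, url) in enumerate(videos):
        if index > 0:
            steps.append(("sleep", {"seconds": pause_seconds}))
        capture_name = f"{label}_capture_{platform}"
        steps.append(("start_capture", {
            "filepath": f"/tmp/{platform}_{label}.pcap",
            "name": capture_name,
        }))
        for _ in range(repeats):
            steps.append(("watch", {"url": url, "seconds": watch_seconds}))
        steps.append(("stop_capture", {"name": capture_name}))
    return steps


# Placeholder URLs for illustration only
demo_videos = [("1min", "https://example.com/video-a"), ("2min", "https://example.com/video-b")]
demo_steps = build_capture_steps("youtube", demo_videos)
for kind, kwargs in demo_steps[:3]:
    print(kind, kwargs)
```

Each tuple could then be translated into the matching `pipeline.then(...)` call, for example mapping `"start_capture"` to `StartCapture(filepath=..., name=...)` and `"stop_capture"` to `StopNamedCapture(start_capture_task_name=...)`.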

In [29]:
executor()

ERROR:executor_local:asyncio.run() cannot be called from a running event loop
Traceback (most recent call last):
  File "/srv/netunicorn/netunicorn-executor/src/netunicorn/executor/executor.py", line 134, in __call__
    asyncio.run(self.execute())
  File "/usr/lib/python3.10/asyncio/runners.py", line 33, in run
    raise RuntimeError(
RuntimeError: asyncio.run() cannot be called from a running event loop
CRITICAL:executor_local:Failed to execute the graph. Shutting down.
INFO:executor_local:Skipping reporting results due to execution graph setting.
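
The `RuntimeError` above occurs because Jupyter already runs an asyncio event loop in the kernel's main thread, and `asyncio.run()` refuses to start a second loop in the same thread. A common workaround is to call the blocking entry point from a separate thread, which has no running loop. The sketch below demonstrates the idea with a stand-in function; `call_in_thread` and `blocking_entry_point` are hypothetical names, and whether the netUnicorn local executor behaves correctly when invoked this way is an assumption that has not been verified here.

```python
import asyncio
import threading


def call_in_thread(fn, *args, **kwargs):
    """Run `fn` in a fresh thread so any asyncio.run() inside it gets its own event loop."""
    outcome = {}

    def worker():
        try:
            outcome["value"] = fn(*args, **kwargs)
        except BaseException as exc:  # re-raised in the calling thread below
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")


def blocking_entry_point():
    """Stand-in for a library call that uses asyncio.run() internally."""
    async def work():
        await asyncio.sleep(0)
        return "done"
    return asyncio.run(work())


async def notebook_like_cell():
    # We are inside a running loop here, as in a Jupyter cell;
    # the call succeeds because it executes in another thread.
    return call_in_thread(blocking_entry_point)


print(asyncio.run(notebook_like_cell()))
```

In the notebook, the equivalent call would be `call_in_thread(executor)` in place of `executor()`.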


In [30]:
pipeline = Pipeline()
# Set early_stopping to False to keep the pipeline running when a task fails
# pipeline.early_stopping = False

# Generate Data for Vimeo
pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/vimeo_1min.pcap", name="1min_captureVM"))
for _ in range(5):
    pipeline.then(WatchVimeoVideo("https://vimeo.com/820625227?autoplay=1", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="1min_captureVM"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/vimeo_2min.pcap", name="2min_captureVM"))
for _ in range(5):
    pipeline.then(WatchVimeoVideo("https://vimeo.com/867196026", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="2min_captureVM"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/vimeo_5min.pcap", name="5min_captureVM"))
for _ in range(5):
    pipeline.then(WatchVimeoVideo("https://vimeo.com/391703912", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="5min_captureVM"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/vimeo_10min.pcap", name="10min_captureVM"))
for _ in range(5):
    pipeline.then(WatchVimeoVideo("https://vimeo.com/675873896", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="10min_captureVM"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/vimeo_15min.pcap", name="15min_captureVM"))
for _ in range(5):
    pipeline.then(WatchVimeoVideo("https://vimeo.com/1031379349", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="15min_captureVM"))

#pipeline.then(UploadToWebDav(filepaths={"/tmp/vimeo_1min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/vimeo_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/vimeo_2min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/vimeo_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/vimeo_5min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/vimeo_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/vimeo_10min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/vimeo_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/vimeo_15min.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/vimeo_capture", username="test", password="test"))

Pipeline(2e55f1be-6e0a-4520-bd8e-6129df6bb187): {'root': [<netunicorn.library.tasks.basic.SleepTask with name 729d6002-4479-4e3c-be01-a8111592331c>], 1: [<netunicorn.library.tasks.capture.tcpdump.StartCapture object at 0x7fcf8a20add0>], 2: [<netunicorn.library.tasks.video_watchers.vimeo_watcher.WatchVimeoVideo object at 0x7fcf8a20a4a0>], 3: [<netunicorn.library.tasks.video_watchers.vimeo_watcher.WatchVimeoVideo object at 0x7fcf8a20a560>], 4: [<netunicorn.library.tasks.video_watchers.vimeo_watcher.WatchVimeoVideo object at 0x7fcf8a209840>], 5: [<netunicorn.library.tasks.video_watchers.vimeo_watcher.WatchVimeoVideo object at 0x7fcf8a209bd0>], 6: [<netunicorn.library.tasks.video_watchers.vimeo_watcher.WatchVimeoVideo object at 0x7fcf8a208d60>], 7: [<netunicorn.library.tasks.capture.tcpdump.StopNamedCapture object at 0x7fcf8a20ada0>], 8: [<netunicorn.library.tasks.basic.SleepTask with name f0f45126-efc2-4d20-a5cd-7904ebfe23b2>], 9: [<netunicorn.library.tasks.capture.tcpdump.StartCapture ob

In [31]:
pipeline = Pipeline()
# Set early_stopping to False to keep the pipeline running when a task fails
# pipeline.early_stopping = False

# Generate Data for Twitch
pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/twitch_capture1.pcap", name="1_captureTW"))
for _ in range(5):
    pipeline.then(WatchTwitchStream("https://www.twitch.tv/emilycc/clip/BoringGloriousGoblinOMGScoots-HzN323BdbgWMC8z2", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="1_captureTW"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/twitch_capture2.pcap", name="2_captureTW"))
for _ in range(5):
    pipeline.then(WatchTwitchStream("https://www.twitch.tv/chess24/clip/EvilHedonisticLadiesDBstyle-Qvx2Iv5rD18TBkMc", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="2_captureTW"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/twitch_capture3.pcap", name="3_captureTW"))
for _ in range(5):
    pipeline.then(WatchTwitchStream("https://www.twitch.tv/dinabelenkaya/clip/BlueArtisticLarkDendiFace-6GsaEv3Rt8vpRoTL", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="3_captureTW"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/twitch_capture4.pcap", name="4_captureTW"))
for _ in range(5):
    pipeline.then(WatchTwitchStream("https://www.twitch.tv/chess24/clip/PlayfulEndearingOpossumPeoplesChamp-d1sejwuAMspNlEz7", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="4_captureTW"))

pipeline.then(SleepTask(2))

pipeline.then(StartCapture(filepath="/tmp/twitch_capture5.pcap", name="5_captureTW"))
for _ in range(5):
    # NOTE: the URL for the fifth Twitch clip is empty here and must be filled in before running
    pipeline.then(WatchTwitchStream("", 10))
pipeline.then(StopNamedCapture(start_capture_task_name="5_captureTW"))

#pipeline.then(UploadToWebDav(filepaths={"/tmp/twitch_capture1.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/twitch_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/twitch_capture2.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/twitch_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/twitch_capture3.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/twitch_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/twitch_capture4.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/twitch_capture", username="test", password="test"))
#pipeline.then(UploadToWebDav(filepaths={"/tmp/twitch_capture5.pcap"}, endpoint="http://snl-server-5.cs.ucsb.edu/cs190n/cs190n8/twitch_capture", username="test", password="test"))

Pipeline(cdec3207-a8a9-4c7e-b296-ab5d5a3e8039): {'root': [<netunicorn.library.tasks.basic.SleepTask with name 39b551b4-a0a3-4e33-ac94-f9f4474fda21>], 1: [<netunicorn.library.tasks.capture.tcpdump.StartCapture object at 0x7fcf8a20ab00>], 2: [<netunicorn.library.tasks.video_watchers.twitch_watcher.WatchTwitchStream object at 0x7fcf8a2099c0>], 3: [<netunicorn.library.tasks.video_watchers.twitch_watcher.WatchTwitchStream object at 0x7fcf8a20a590>], 4: [<netunicorn.library.tasks.video_watchers.twitch_watcher.WatchTwitchStream object at 0x7fcf8a20a170>], 5: [<netunicorn.library.tasks.video_watchers.twitch_watcher.WatchTwitchStream object at 0x7fcf8a24d930>], 6: [<netunicorn.library.tasks.video_watchers.twitch_watcher.WatchTwitchStream object at 0x7fcf8a24d900>], 7: [<netunicorn.library.tasks.capture.tcpdump.StopNamedCapture object at 0x7fcf8a20bfd0>], 8: [<netunicorn.library.tasks.basic.SleepTask with name f8d02915-20bf-4887-9405-c1ebf1a80d93>], 9: [<netunicorn.library.tasks.capture.tcpdump.

In [7]:
client = RemoteClient(endpoint=NETUNICORN_ENDPOINT, login=NETUNICORN_LOGIN, password=NETUNICORN_PASSWORD)
print("Health Check: {}".format(client.healthcheck()))
nodes = client.get_nodes()
print(nodes)

Health Check: True
[[snl-server-5], <Uncountable node pool with next node template: [aws-fargate-A-cs190n8-, aws-fargate-B-cs190n8-, aws-fargate-ARM64-cs190n8-]>]


In [32]:
working_nodes = nodes.filter(lambda node: node.name.startswith(working_node)).take(1)

# Creating the experiment
experiment = Experiment().map(pipeline, working_nodes)
print(experiment)

 - Deployment: Node=snl-server-5, executor_id=, prepared=False, error=None


### Preparing the Experiment
We will use a predefined DockerImage.

In [46]:
from netunicorn.base import DockerImage
for deployment in experiment:
    # you can explore the image on the DockerHub
    deployment.environment_definition = DockerImage(image='speeeday/chromium-speedtest:0.3.1')

In [47]:
experiment_label = "datacoll"

Now we can prepare the experiment, check for any errors and execute.

In [48]:
try:
    client.delete_experiment(experiment_label)
except RemoteClientException:
    pass

client.prepare_experiment(experiment, experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status == ExperimentStatus.READY:
        break
    time.sleep(20)

ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.READY
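
The polling loop above never exits if the experiment does not reach `READY`. A bounded version is sketched below; `wait_for_status` is a hypothetical helper, and the status strings in the demo are placeholders rather than real `ExperimentStatus` values.

```python
import time


def wait_for_status(get_status, is_done, timeout_s=600.0, interval_s=20.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll `get_status()` until `is_done(status)` is true or `timeout_s` elapses."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        if is_done(status):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_s} s")
        sleep(interval_s)


# Demo with a fake status source instead of a real netUnicorn client
fake_statuses = iter(["PREPARING", "PREPARING", "READY"])
final_status = wait_for_status(lambda: next(fake_statuses),
                               lambda status: status == "READY",
                               interval_s=0)
print(final_status)
```

In the notebook, `get_status` could be `lambda: client.get_experiment_status(experiment_label).status` and `is_done` a comparison against `ExperimentStatus.READY`.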


In [49]:
for deployment in client.get_experiment_status(experiment_label).experiment:
    print(f"Prepared: {deployment.prepared}, error: {deployment.error}")

Prepared: True, error: None


In [50]:
client.start_execution(experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status != ExperimentStatus.RUNNING:
        break
    time.sleep(20)

ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.FINISHED


In [51]:
from returns.pipeline import is_successful

for report in info.execution_result:
    print(f"Node name: {report.node.name}")
    print(f"Error: {report.error}")

    result, log = report.result  # report stores results of execution and corresponding log
    
    # result is a returns.result.Result object; it can be Success or Failure
    print(f"Result is: {type(result)}")
    data = result.unwrap() if is_successful(result) else result.failure()
    for key, value in data.items():
        print(f"{key}: {value}")

    # we also can explore logs
    for line in log:
        print(line.strip())
    print()

Node name: snl-server-5
Error: Executor timed out


TypeError: cannot unpack non-iterable NoneType object
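
The `TypeError` above indicates that `report.result` was `None` when the executor timed out, so it cannot be unpacked into `(result, log)`. The sketch below guards against that case. `summarize_report` is a hypothetical helper, and the demo uses stand-in objects rather than real netUnicorn reports.

```python
from types import SimpleNamespace


def summarize_report(report):
    """Collect the fields of one execution report without assuming a result exists."""
    summary = {"node": report.node.name, "error": report.error, "result": None, "log": []}
    if report.result is None:
        # Nothing was returned (for example, the executor timed out)
        return summary
    result, log = report.result
    summary["result"] = result
    summary["log"] = [line.strip() for line in log]
    return summary


# Stand-in report objects that mimic the attributes used in the notebook
timed_out = SimpleNamespace(node=SimpleNamespace(name="snl-server-5"),
                            error="Executor timed out", result=None)
finished = SimpleNamespace(node=SimpleNamespace(name="snl-server-5"),
                           error=None, result=("ok", ["line one\n", "line two\n"]))

for report in (timed_out, finished):
    print(summarize_report(report))
```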

## DATA COLLECTION UPDATE
*Provide an update on the status of your data collection. Have you been able to successfully collect and label a packet trace collected through netunicorn for your project? If yes, how many traces have you collected? Do you plan to scale up your data collection any further?*

So far, we have collected data for five videos from each platform: YouTube, Vimeo, and Twitch. For YouTube and Vimeo, the videos have different durations (1, 2, 5, 10, and 15 minutes). For Twitch, each clip is around 1 minute long. In total, we have collected 15 packet traces (.pcap files). Below are the labeled packet traces we have collected so far. We are considering expanding our data collection by 1-2 videos per platform.

In [16]:
df_youtube1 = pd.read_csv("/mnt/md0/cs190n/cs190n8/youtube_1min_capture_ISCX.csv")
df_youtube2 = pd.read_csv("/mnt/md0/cs190n/cs190n8/youtube_2min_capture_ISCX.csv")
df_youtube5 = pd.read_csv("/mnt/md0/cs190n/cs190n8/youtube_5min_capture_ISCX.csv")
df_youtube10 = pd.read_csv("/mnt/md0/cs190n/cs190n8/youtube_10min_capture_ISCX.csv")
df_youtube15 = pd.read_csv("/mnt/md0/cs190n/cs190n8/youtube_15min_capture_ISCX.csv")
#df_youtube = pd.concat([df_youtube1, df_youtube2, df_youtube5, df_youtube10, df_youtube15], axis=0, ignore_index=True)

df_vimeo1 = pd.read_csv("/mnt/md0/cs190n/cs190n8/vimeo_1min_capture_ISCX.csv")
df_vimeo2 = pd.read_csv("/mnt/md0/cs190n/cs190n8/vimeo_2min_capture_ISCX.csv")
df_vimeo5 = pd.read_csv("/mnt/md0/cs190n/cs190n8/vimeo_5min_capture_ISCX.csv")
df_vimeo10 = pd.read_csv("/mnt/md0/cs190n/cs190n8/vimeo_10min_capture_ISCX.csv")
df_vimeo15 = pd.read_csv("/mnt/md0/cs190n/cs190n8/vimeo_15min_capture_ISCX.csv")
#df_vimeo = pd.concat([df_vimeo1, df_vimeo2, df_vimeo5, df_vimeo10, df_vimeo15], axis=0, ignore_index=True)

df_twitch1 = pd.read_csv("/mnt/md0/cs190n/cs190n8/twitch_capture1_ISCX.csv")
df_twitch2 = pd.read_csv("/mnt/md0/cs190n/cs190n8/twitch_capture2_ISCX.csv")
df_twitch3 = pd.read_csv("/mnt/md0/cs190n/cs190n8/twitch_capture3_ISCX.csv")
df_twitch4 = pd.read_csv("/mnt/md0/cs190n/cs190n8/twitch_capture4_ISCX.csv")
df_twitch5 = pd.read_csv("/mnt/md0/cs190n/cs190n8/twitch_capture5_ISCX.csv")
#df_twitch = pd.concat([df_twitch1, df_twitch2, df_twitch3, df_twitch4, df_twitch5], axis=0, ignore_index=True)

#print(df_youtube.columns)
#print(df_vimeo.columns)
#print(df_twitch.columns)

FileNotFoundError: [Errno 2] No such file or directory: '/mnt/md0/cs190n-test/youtube_video_capture_ISCX.csv'
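
One way to combine the per-trace CSV files into a single labeled table is to load them in a loop, tag each row with its platform, and stack the rows. The sketch below assumes the CSV files do not already contain a `Label` column; `load_labeled_traces`, the `source_file` column, and the demo feature names are hypothetical, and the demo writes small temporary files instead of reading our real traces.

```python
import os
import tempfile

import pandas as pd


def load_labeled_traces(paths_by_label):
    """Read every CSV, add a Label column, and stack all rows into one DataFrame."""
    frames = []
    for label, paths in paths_by_label.items():
        for path in paths:
            frame = pd.read_csv(path)
            frame["Label"] = label
            frame["source_file"] = os.path.basename(path)
            frames.append(frame)
    # axis=0 stacks rows; axis=1 would place the traces side by side as extra columns
    return pd.concat(frames, axis=0, ignore_index=True)


# Demo with temporary files and made-up feature columns
with tempfile.TemporaryDirectory() as tmp:
    demo_paths = {"YouTube": [], "Vimeo": []}
    demo_rows = (("YouTube", [[1.0, 2.0], [3.0, 4.0]]), ("Vimeo", [[5.0, 6.0]]))
    for label, rows in demo_rows:
        path = os.path.join(tmp, f"{label.lower()}_demo.csv")
        pd.DataFrame(rows, columns=["feature_a", "feature_b"]).to_csv(path, index=False)
        demo_paths[label].append(path)
    df_demo = load_labeled_traces(demo_paths)

print(df_demo["Label"].value_counts())
```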

In [None]:
import pandas as pd
import numpy as np
import pickle
import argparse

In [None]:
VIDEO_DURATION = 180180
PKT_BYTES = 1500
MILLION = 1000000
PAST_CHUNKS = 8
FUTURE_CHUNKS = 5

In [None]:
def prepare_raw_data(video_sent_path, video_acked_path, time_start=None, time_end=None):
    """
    Load data from files and calculate chunk transmission times.
    """
    video_sent_df = pd.read_csv(video_sent_path)
    video_acked_df = pd.read_csv(video_acked_path)

    # Rename "time (ns GMT)" to "time" for convenience
    video_sent_df.rename(columns={'time (ns GMT)': 'time'}, inplace=True)
    video_acked_df.rename(columns={'time (ns GMT)': 'time'}, inplace=True)

    # Convert nanosecond timestamps to datetime
    video_sent_df['time'] = pd.to_datetime(video_sent_df['time'], unit='ns')
    video_acked_df['time'] = pd.to_datetime(video_acked_df['time'], unit='ns')

    # Filter by time range
    if time_start:
        time_start = pd.to_datetime(time_start)
        video_sent_df = video_sent_df[video_sent_df['time'] >= time_start]
        video_acked_df = video_acked_df[video_acked_df['time'] >= time_start]
    if time_end:
        time_end = pd.to_datetime(time_end)
        video_sent_df = video_sent_df[video_sent_df['time'] <= time_end]
        video_acked_df = video_acked_df[video_acked_df['time'] <= time_end]

    # Process the data
    return calculate_trans_times(video_sent_df, video_acked_df)

In [None]:
def calculate_trans_times(video_sent_df, video_acked_df):
    """
    Calculate transmission times from video_sent and video_acked datasets using session_id.
    """
    d = {}
    last_video_ts = {}

    for _, row in video_sent_df.iterrows():
        session = row['session_id']  # Use only session_id to track sessions
        if session not in d:
            d[session] = {}
            last_video_ts[session] = None

        video_ts = int(row['video_ts'])
        if last_video_ts[session] is not None:
            if video_ts != last_video_ts[session] + VIDEO_DURATION:
                continue

        last_video_ts[session] = video_ts
        d[session][video_ts] = {
            'sent_ts': pd.Timestamp(row['time']),
            'size': float(row['size']) / PKT_BYTES,
            'delivery_rate': float(row['delivery_rate']) / PKT_BYTES,
            'cwnd': float(row['cwnd']),
            'in_flight': float(row['in_flight']),
            'min_rtt': float(row['min_rtt']) / MILLION,
            'rtt': float(row['rtt']) / MILLION,
        }

    for _, row in video_acked_df.iterrows():
        session = row['session_id']  # Use only session_id
        if session not in d:
            continue

        video_ts = int(row['video_ts'])
        if video_ts not in d[session]:
            continue

        dsv = d[session][video_ts]
        sent_ts = dsv['sent_ts']
        acked_ts = pd.Timestamp(row['time'])
        dsv['acked_ts'] = acked_ts
        dsv['trans_time'] = (acked_ts - sent_ts).total_seconds()

    return d

In [None]:
prepare_raw_data("/mnt/md0/cs190n/video_sent.csv", "/mnt/md0/cs190n/video_acked.csv")

In [None]:
def append_past_chunks(ds, next_ts, row):
    i = 1
    past_chunks = []
    while i <= PAST_CHUNKS:
        ts = next_ts - i * VIDEO_DURATION
        if ts in ds and 'trans_time' in ds[ts]:
            past_chunks = [ds[ts]['delivery_rate'],
                           ds[ts]['cwnd'], ds[ts]['in_flight'],
                           ds[ts]['min_rtt'], ds[ts]['rtt'],
                           ds[ts]['size'], ds[ts]['trans_time']] + past_chunks
        else:
            nts = ts + VIDEO_DURATION  # padding with the nearest ts
            padding = [ds[nts]['delivery_rate'],
                       ds[nts]['cwnd'], ds[nts]['in_flight'],
                       ds[nts]['min_rtt'], ds[nts]['rtt']]
            if nts == next_ts:
                padding += [0, 0]  # next_ts is the first chunk to send
            else:
                padding += [ds[nts]['size'], ds[nts]['trans_time']]
            break
        i += 1
    if i != PAST_CHUNKS + 1:  # break in the middle; padding must exist
        while i <= PAST_CHUNKS:
            past_chunks = padding + past_chunks
            i += 1
    row += past_chunks

In [None]:
def prepare_input_output(d):
    ret = [{'in': [], 'out': []} for _ in range(FUTURE_CHUNKS)]

    for session in d:
        ds = d[session]

        for next_ts in ds:
            if 'trans_time' not in ds[next_ts]:
                continue

            row = []

            # Append past chunks
            append_past_chunks(ds, next_ts, row)

            # Append the TCP info of the next chunk
            row += [ds[next_ts]['delivery_rate'],
                    ds[next_ts]['cwnd'], ds[next_ts]['in_flight'],
                    ds[next_ts]['min_rtt'], ds[next_ts]['rtt']]

            # Generate FUTURE_CHUNKS rows
            for i in range(FUTURE_CHUNKS):
                row_i = row.copy()

                ts = next_ts + i * VIDEO_DURATION
                if ts in ds and 'trans_time' in ds[ts]:
                    row_i += [ds[ts]['size']]

                    ret[i]['in'].append(row_i)
                    ret[i]['out'].append(ds[ts]['trans_time'])

    return ret
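
To make the layout of each input row explicit, the short calculation below counts the values that `append_past_chunks` and `prepare_input_output` append. The list names are introduced here only for illustration.

```python
PAST_CHUNKS = 8

# Values stored for each past chunk by append_past_chunks
per_past_chunk = ["delivery_rate", "cwnd", "in_flight", "min_rtt", "rtt", "size", "trans_time"]
# TCP info of the next chunk appended in prepare_input_output
next_chunk_tcp = ["delivery_rate", "cwnd", "in_flight", "min_rtt", "rtt"]
# Size of the chunk whose transmission time is being predicted
target_chunk = ["size"]

input_dim = PAST_CHUNKS * len(per_past_chunk) + len(next_chunk_tcp) + len(target_chunk)
print(input_dim)  # 8 * 7 + 5 + 1 = 62
```

Each row in `ret[i]['in']` therefore has 62 values, and the matching entry in `ret[i]['out']` is the transmission time of the chunk `i` steps ahead.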

In [None]:
def save_processed_data(output_file, processed_data):
    """
    Save processed data to a file.
    """
    with open(output_file, 'wb') as f:
        pickle.dump(processed_data, f)
    print(f"Processed data saved to {output_file}")

In [None]:
if __name__ == '__main__':
    DEFAULT_VIDEO_SENT_PATH = '/mnt/md0/cs190n/video_sent.csv'
    DEFAULT_VIDEO_ACKED_PATH = '/mnt/md0/cs190n/video_acked.csv'
    DEFAULT_OUTPUT_FILE = '/home/satyandra/output.pkl'
    
    #Latest datasets can be found at https://puffer.stanford.edu/results/
    
    parser = argparse.ArgumentParser(description="Process video streaming datasets.")
    parser.add_argument('--video_sent_path', type=str, help='Path to the video_sent dataset CSV file.')
    parser.add_argument('--video_acked_path', type=str, help='Path to the video_acked dataset CSV file.')
    parser.add_argument('--output_file', type=str, help='Path to save the processed data.')
    parser.add_argument('--time_start', type=str, default=None, help='Start time for filtering data (RFC3339 format).')
    parser.add_argument('--time_end', type=str, default=None, help='End time for filtering data (RFC3339 format).')
    #args = parser.parse_args()
    #processed_data = prepare_input_output(prepare_raw_data(args.video_sent_path, args.video_acked_path,
    #    time_start=args.time_start, time_end=args.time_end))
    # save_processed_data(args.output_file, processed_data)
    processed_data = prepare_input_output(prepare_raw_data(DEFAULT_VIDEO_SENT_PATH, DEFAULT_VIDEO_ACKED_PATH,
        time_start=None, time_end=None))
    save_processed_data(DEFAULT_OUTPUT_FILE, processed_data)

## Packet Trace Features
*Provide a list of features that you will extract from the packet traces for your model.*

Below are the features we will extract from our packet traces for our model.

## Training the Model
*Please provide a high-level explanation of the model type that you plan to train / evaluate. Provide a link to the python sci-kit implementation that you plan to use.*

We plan to use scikit-learn's **RandomForestClassifier** to train and evaluate a model on the data we collect. The **RandomForestClassifier** suits our project because it handles high-dimensional data, such as packet trace features, and reports feature importances, which help us understand how much each feature contributes to classifying network traffic.

https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html
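
As a small illustration of the feature-importance point, the sketch below fits a forest and ranks features by `feature_importances_`. The data is synthetic (generated with `make_classification`), and the feature names are placeholders, not the features from our packet traces.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# Synthetic stand-in data: 6 features, 3 classes
feature_names = [f"feature_{i}" for i in range(6)]
X_demo, y_demo = make_classification(n_samples=200, n_features=6, n_informative=3,
                                     n_redundant=0, n_classes=3,
                                     n_clusters_per_class=1, random_state=0)

forest = RandomForestClassifier(n_estimators=50, random_state=0)
forest.fit(X_demo, y_demo)

# Importances are non-negative and sum to 1 across features
ranked = sorted(zip(feature_names, forest.feature_importances_),
                key=lambda pair: pair[1], reverse=True)
for name, score in ranked:
    print(f"{name}: {score:.3f}")
```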

In [4]:
%pip install scikit-learn

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [1]:
# required imports
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.tree import plot_tree

In [None]:
# separate the DataFrame into features and labels
target_variable = 'Label'
train_features = list(set(df.columns) - {target_variable})
x_train = df[train_features]
y_train = df[target_variable]

In [None]:
# and start training a classifier
clf = RandomForestClassifier()
clf.fit(x_train, y_train)

In [None]:
# NOTE: this evaluates on the training data, so the report is optimistic
y_pred = clf.predict(x_train)
print(metrics.classification_report(y_train, y_pred))
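
Since the report above is computed on the same rows used for training, a held-out split gives a more realistic estimate. The sketch below uses `train_test_split` on synthetic data; the dataset, the 80/20 split, and the variable names are assumptions for illustration only.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the labeled trace features (6 features, 3 classes)
X_all, y_all = make_classification(n_samples=300, n_features=6, n_informative=4,
                                   n_redundant=0, n_classes=3, random_state=0)

# Hold out 20% of the rows, keeping class proportions similar in both parts
X_fit, X_held, y_fit, y_held = train_test_split(
    X_all, y_all, test_size=0.2, stratify=y_all, random_state=42)

held_out_clf = RandomForestClassifier(n_estimators=100, random_state=42)
held_out_clf.fit(X_fit, y_fit)

train_acc = accuracy_score(y_fit, held_out_clf.predict(X_fit))
test_acc = accuracy_score(y_held, held_out_clf.predict(X_held))
print(f"train accuracy: {train_acc:.3f}, held-out accuracy: {test_acc:.3f}")
```

A large gap between the two numbers would suggest overfitting, which is easy to miss when only the training set is scored.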

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
import pandas as pd

# Assuming `df` is your pandas DataFrame with 6 feature columns and a label column
# Separate features and labels
X = df.drop(columns=["Label"]).values  # Replace 'label' with the name of your label column
y = df["Label"].values

# Encode labels (Vimeo, Twitch, Others) to numerical values
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(y)

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Scale the features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Convert data to torch tensors
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)

# Define the neural network classifier
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(X_train.shape[1], 64)  # Input layer (one input per feature column)
        self.fc2 = nn.Linear(64, 32)  # Hidden layer
        self.fc3 = nn.Linear(32, 3)  # Output layer (3 classes)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Instantiate the model, define loss and optimizer
model = Net()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 200
for epoch in range(num_epochs):
    # Forward pass
    outputs = model(X_train)
    loss = criterion(outputs, y_train)

    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch+1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# Evaluation
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    _, predicted = torch.max(test_outputs, 1)
    accuracy = (predicted == y_test).sum().item() / len(y_test)
    print(f'Accuracy on test set: {accuracy * 100:.2f}%')