raulnegrean/dsps-benchmark

Benchmarking Distributed Stream Processing Systems [DSPS]

This project benchmarks two DSPSs using two pipeline types to compare how the systems behave in various scenarios. The following DSPSs were selected:

  • Apache Flink
  • Apache Storm

Furthermore, two data pipelines were designed, each with an implementation for each DSPS:

  1. Taxi Pipeline: uses the New York City (NYC) Taxi & Limousine Commission (TLC) data sets to analyse traffic congestion per NYC borough by computing the average speed over batches of 1000 yellow cab taxi rides.
  2. Wikipedia Pipeline: computes the top 10 reference backlinks per batch of 1000 randomly selected Wikipedia pages.
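
To make the taxi pipeline's batch computation concrete, here is a minimal Python sketch. It is not the Flink or Storm implementation from this repository. The ride tuple layout (borough, distance in miles, duration in hours) is hypothetical, and "average speed" is taken to mean the mean of per-ride speeds, which is only one possible definition.

```python
from collections import defaultdict
from itertools import islice

BATCH_SIZE = 1000  # batch size stated in the pipeline description


def batched(items, size=BATCH_SIZE):
    """Yield consecutive lists of at most `size` items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def average_speed_per_borough(rides):
    """Return {borough: mean speed in mph} for one batch of rides.

    Each ride is a hypothetical (borough, distance_miles, duration_hours) tuple.
    """
    totals = defaultdict(lambda: [0.0, 0])  # borough -> [sum of speeds, ride count]
    for borough, distance_miles, duration_hours in rides:
        if duration_hours <= 0:
            continue  # skip rides whose duration cannot yield a speed
        totals[borough][0] += distance_miles / duration_hours
        totals[borough][1] += 1
    return {borough: total / count for borough, (total, count) in totals.items()}
```

A stream of rides could then be processed with `for batch in batched(rides): average_speed_per_borough(batch)`.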

Requirements

Python virtual environment

For better isolation, a Python virtual environment is strongly recommended. For more details on virtual environments, see the Python documentation.

  1. Create a new virtual environment:
python3 -m venv .venv
  2. Activate the virtual environment:
source .venv/bin/activate
  3. To deactivate the virtual environment, run the following command:
deactivate

Python dependencies

Once your virtual environment is active, install the project dependencies using the following command:

pip install -r requirements.txt

Warning

Installing the Python dependencies is required to run the Ansible playbooks as well as the various Python scripts (e.g. generating reports and charts, downloading the NYC TLC parquet data sets, etc.).

Provisioning Cluster

Use the provided Ansible playbooks to provision your cluster.

NTP Synchronization

Warning

The NTP server & client provisioning task works under the assumption that workers are using systemd-timesyncd as an NTP client.

After provisioning the cluster, the manager node runs an NTP server. To check the server status run the following command on the manager node:

sudo systemctl status ntp

This command should output:

● ntp.service - Network Time Service
     Loaded: loaded (/lib/systemd/system/ntp.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2023-12-17 00:27:50 UTC; 29min ago
       Docs: man:ntpd(8)
    Process: 239119 ExecStart=/usr/lib/ntp/ntp-systemd-wrapper (code=exited, status=0/SUCCESS)
   Main PID: 239138 (ntpd)
      Tasks: 2 (limit: 4432)
     Memory: 1.6M
     CGroup: /system.slice/ntp.service
             └─239138 /usr/sbin/ntpd -p /var/run/ntpd.pid -g -u 112:119

Dec 17 00:27:56 ubuntu-1 ntpd[239138]: Soliciting pool server 216.232.132.95
Dec 17 00:27:57 ubuntu-1 ntpd[239138]: Soliciting pool server 185.125.190.58
Dec 17 00:27:57 ubuntu-1 ntpd[239138]: Soliciting pool server 2607:f748:203:1e::123
Dec 17 00:27:58 ubuntu-1 ntpd[239138]: Soliciting pool server 185.125.190.57
Dec 17 00:27:59 ubuntu-1 ntpd[239138]: Soliciting pool server 2620:2d:4000:1::41
Dec 17 00:37:49 ubuntu-1 ntpd[239138]: 91.189.91.157 local addr 192.168.0.211 -> <null>
Dec 17 00:37:50 ubuntu-1 ntpd[239138]: 185.125.190.56 local addr 192.168.0.211 -> <null>
Dec 17 00:38:03 ubuntu-1 ntpd[239138]: 142.4.192.253 local addr 192.168.0.211 -> <null>
Dec 17 00:39:05 ubuntu-1 ntpd[239138]: 192.99.168.180 local addr 192.168.0.211 -> <null>
Dec 17 00:39:13 ubuntu-1 ntpd[239138]: 216.232.132.95 local addr 192.168.0.211 -> <null>

Next, use the ntpq CLI tool to query the NTP server and monitor NTP operations (see man ntpq for details).

E.g. to see the list of clients run:

ntpq -c mrulist

This command should output:

Ctrl-C will stop MRU retrieval and display partial results.
Retrieved 20 unique MRU entries and 0 updates.
lstint avgint rstr r m v  count rport remote address
==============================================================================
    16     25  390 . 3 4      7 37372 ubuntu-4
    16     25  390 . 3 4      7 56918 ubuntu-2
    16     25  390 . 3 4      7 32942 ubuntu-3
    49     16  380 . 4 4      9   123 time.cloudflare.com
    50     16  380 . 4 4      9   123 nms.switch.ca
    52     15  380 . 4 4      9   123 ntp.nyy.ca
    53     15  380 . 4 4      9   123 ntp2.torix.ca
    53     15  380 . 4 4      9   123 time.nrc.ca
    54     16  380 . 4 4      9   123 backoffice-1.incentre.net
    55     15  380 . 4 4      9   123 speedtest.switch.ca
    55     15  380 . 4 4      9   123 vps-a532d6d8.vps.ovh.ca
    55     17  380 . 4 4      8   123 ca.funfile.org
    56     45  380 . 4 4      3   123 185.125.190.57 (prod-ntp-4.ntp4.ps5.canonical.com)
    56     15  380 . 4 4      9   123 s216-232-132-102.bc.hsia.telus.net
    56     15  380 . 4 4      9   123 71-17-253-178.regn.static.sasknet.sk.ca
    56     15  380 . 4 4      9   123 drax.kayaks.hungrycats.org
    56     15  380 . 4 4      9   123 64.ip-54-39-23.net
    58     15  380 . 4 4      9   123 time.cloudflare.com
    59     15  380 . 4 4      9   123 muug.ca
    60     32  380 . 4 4      4   123 alphyn.canonical.com

To see the server's NTP status, run:

ntptime

This command should output:

ntp_gettime() returns code 0 (OK)
  time e928dbde.1da1db88  Sun, Dec 17 2023  2:30:54.115, (.115751227),
  maximum error 289040 us, estimated error 254 us, TAI offset 37
ntp_adjtime() returns code 0 (OK)
  modes 0x0 (),
  offset 246.770 us, frequency -16.635 ppm, interval 1 s,
  maximum error 289040 us, estimated error 254 us,
  status 0x2001 (PLL,NANO),
  time constant 7, precision 0.001 us, tolerance 500 ppm,

To check the status of the NTP client on a worker, run the following command on the worker node:

sudo systemctl status systemd-timesyncd.service

This command should output:

● systemd-timesyncd.service - Network Time Synchronization
     Loaded: loaded (/lib/systemd/system/systemd-timesyncd.service; enabled; vendor preset: enabled)
     Active: active (running) since Sun 2023-12-17 01:42:17 UTC; 50min ago
       Docs: man:systemd-timesyncd.service(8)
   Main PID: 266825 (systemd-timesyn)
     Status: "Initial synchronization to time server 192.168.0.211:123 (192.168.0.211)."
      Tasks: 2 (limit: 4432)
     Memory: 1.5M
     CGroup: /system.slice/systemd-timesyncd.service
             └─266825 /lib/systemd/systemd-timesyncd

Dec 17 01:42:17 ubuntu-2 systemd[1]: Starting Network Time Synchronization...
Dec 17 01:42:17 ubuntu-2 systemd[1]: Started Network Time Synchronization.
Dec 17 01:42:33 ubuntu-2 systemd-timesyncd[266825]: Initial synchronization to time server 192.168.0.211:123 (192.168.0.211).

To check the client's NTP sync status, run the following command:

timedatectl timesync-status

This command should output:

       Server: 192.168.0.211 (192.168.0.211)
Poll interval: 32s (min: 16s; max 32s)      
         Leap: normal                       
      Version: 4                            
      Stratum: 2                            
    Reference: CE6C0084                     
    Precision: 1us (-23)                    
Root distance: 196.380ms (max: 5s)          
       Offset: +4.258ms                     
        Delay: 5.176ms                      
       Jitter: 13.837ms                     
 Packet count: 4   
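
Since benchmark timings depend on the workers' clocks agreeing with the manager, it can be handy to check the Offset field programmatically. The following Python sketch parses text shaped like the sample output above. It is not part of this repository, the 10 ms threshold is an arbitrary illustrative value, and it only handles single-unit offsets (us, ms, s) as they appear in the sample.

```python
import re

# Conversion factors from the unit suffixes seen in the sample output to milliseconds.
_UNITS_TO_MS = {"us": 0.001, "ms": 1.0, "s": 1000.0}

_OFFSET_RE = re.compile(
    r"^\s*Offset:\s*([+-]?\d+(?:\.\d+)?)(us|ms|s)\s*$", re.MULTILINE
)


def parse_offset_ms(status_text):
    """Return the Offset field of `timedatectl timesync-status` output in ms."""
    match = _OFFSET_RE.search(status_text)
    if match is None:
        raise ValueError("no Offset field found in status text")
    value, unit = match.groups()
    return float(value) * _UNITS_TO_MS[unit]


def is_in_sync(status_text, max_abs_offset_ms=10.0):
    """Check the offset against a hypothetical threshold (10 ms by default)."""
    return abs(parse_offset_ms(status_text)) <= max_abs_offset_ms
```

The status text can be captured on a worker with `subprocess.run(["timedatectl", "timesync-status"], capture_output=True, text=True).stdout`.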

To consult the client config, run:

timedatectl show-timesync --all

This should output:

LinkNTPServers=
SystemNTPServers=192.168.0.211
FallbackNTPServers=ntp.ubuntu.com
ServerName=192.168.0.211
ServerAddress=192.168.0.211
RootDistanceMaxUSec=5s
PollIntervalMinUSec=16s
PollIntervalMaxUSec=32s
PollIntervalUSec=32s
NTPMessage={ Leap=0, Version=4, Mode=4, Stratum=3, Precision=-23, RootDelay=12.924ms, RootDispersion=8.956ms, Reference=36271740, OriginateTimestamp=Sun 2023-12-17 02:34:26 UTC, ReceiveTimestamp=Sun 2023-12-17 02:34:26 UTC, TransmitTimestamp=Sun 2023-12-17 02:34:26 UTC, DestinationTimestamp=Sun 2023-12-17 02:34:26 UTC, Ignored=no PacketCount=97, Jitter=3.226ms }

Pipeline Execution

See build & execution details here:

Reporting Scripts

The results folder contains the execution reports of each pipeline run as well as the chart for each tracked metric.

A Python virtual environment is strongly recommended when installing the script dependencies listed in the requirements.txt file. See details here.

Execution reports

To generate an execution report after an Apache Flink or Apache Storm pipeline run, launch the following script:

scripts/generate-report.py pipeline_run_type source_logs_url sink_logs_url

Where

  • pipeline_run_type is one of the following:
    1. tf: taxi pipeline flink execution
    2. ts: taxi pipeline storm execution
    3. tss: taxi pipeline storm-streaming execution
    4. wf: wikipedia pipeline flink execution
    5. ws: wikipedia pipeline storm execution
    6. wss: wikipedia pipeline storm-streaming execution
  • source_logs_url is the URL of the logs on the worker node that executed the source (or spout in Storm lingo)
  • sink_logs_url is the URL of the logs on the worker node that executed the sink (or bolt in Storm lingo)
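
The run type codes listed above combine a pipeline and an execution engine. As an illustration only (this is not taken from generate-report.py, and the names `RUN_TYPES` and `describe_run_type` are hypothetical), they could be decoded in Python like this:

```python
# Mapping of the documented run type codes to (pipeline, engine) pairs.
RUN_TYPES = {
    "tf": ("taxi", "flink"),
    "ts": ("taxi", "storm"),
    "tss": ("taxi", "storm-streaming"),
    "wf": ("wikipedia", "flink"),
    "ws": ("wikipedia", "storm"),
    "wss": ("wikipedia", "storm-streaming"),
}


def describe_run_type(code):
    """Return the (pipeline, engine) pair for a run type code."""
    try:
        return RUN_TYPES[code]
    except KeyError:
        valid = ", ".join(sorted(RUN_TYPES))
        raise ValueError(f"unknown run type {code!r}; expected one of: {valid}") from None
```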

E.g.

scripts/generate-report.py \
    ts \
    'http://192.168.0.214:8000/api/v1/log?file=taxi-topology-1-1661017722%2F6700%2Fworker.log' \
    'http://192.168.0.212:8000/api/v1/log?file=taxi-topology-1-1661017722%2F6700%2Fworker.log&length=1000000'

The script will then download the execution logs, extract the execution details, and generate the report in your current working directory.
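
The Storm log URLs in the example above carry the log file path as a percent-encoded query parameter (`/` becomes `%2F`). A small Python sketch for assembling such a URL follows. The function name and its parameters are hypothetical, and the host, port, and path layout are simply copied from the example rather than from any Storm documentation.

```python
from urllib.parse import urlencode


def storm_log_url(host, topology_id, worker_port, length=None, api_port=8000):
    """Build a worker log URL shaped like the ones in the example above."""
    params = {"file": f"{topology_id}/{worker_port}/worker.log"}
    if length is not None:
        params["length"] = length  # optional size parameter, as in the sink URL
    # urlencode percent-encodes the slashes in the file path.
    return f"http://{host}:{api_port}/api/v1/log?{urlencode(params)}"
```

For example, `storm_log_url("192.168.0.212", "taxi-topology-1-1661017722", 6700, length=1000000)` reproduces the sink URL used above.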

To get the Flink logs, open the Flink UI, click on the source or sink operator in the pipeline view, and navigate to the task manager logs. Right-click on the page and select the Inspect option to open your browser's developer tools, then select the network tab. Back in the UI, deselect and reselect the logs tab; the request that Flink sends to fetch the logs should appear in the network tab. Use this endpoint as an input to the script.

E.g.

scripts/generate-report.py \
    tf \
    'http://192.168.0.211:8081/taskmanagers/192.168.0.213:38153-7f9519/log' \
    'http://192.168.0.211:8081/taskmanagers/192.168.0.213:38153-7f9519/log'
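
As an alternative to reading the request from the browser, the log URL can be assembled from a task manager ID. The Python sketch below assumes that the JSON returned by the Flink REST endpoint `/taskmanagers` contains a `taskmanagers` list whose entries have an `id` field; check this against your Flink version. The function names are hypothetical, and the sketch takes the already parsed JSON as input rather than performing the HTTP request.

```python
def flink_taskmanager_log_url(jobmanager, taskmanager_id):
    """Build a log URL shaped like the ones in the example above.

    `jobmanager` is a "host:port" string, e.g. "192.168.0.211:8081".
    """
    return f"http://{jobmanager}/taskmanagers/{taskmanager_id}/log"


def log_urls_from_overview(jobmanager, overview):
    """Return one log URL per task manager listed in a parsed overview.

    `overview` is assumed to look like {"taskmanagers": [{"id": "..."}, ...]}.
    """
    return [
        flink_taskmanager_log_url(jobmanager, entry["id"])
        for entry in overview.get("taskmanagers", [])
    ]
```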

Charts

To generate the SVG images for the tracked metrics, launch the following script:

scripts/generate-charts.py chart_type reports_dir

Where

  • chart_type is one of the following:
    1. ct: generate controlled throughput charts
    2. mc: generate maximum throughput charts
  • reports_dir is the path to the directory containing the relevant metric or throughput execution reports

E.g.

scripts/generate-charts.py metrics ./results/executions/controlled-throughput

The script will then read the execution reports, extract the execution data points by pipeline run type (e.g. tf, ts, tss, etc.) and throughput value, and generate the charts in your current working directory.
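
The grouping step described above can be pictured with the following Python sketch. It does not reflect the actual report format or the internals of generate-charts.py: the (run_type, throughput, value) tuples are a hypothetical stand-in for whatever the reports contain, and averaging repeated measurements is an assumption.

```python
from collections import defaultdict
from statistics import mean


def group_data_points(points):
    """Group data points by run type, then by throughput value.

    `points` is an iterable of hypothetical (run_type, throughput, value)
    tuples. Values sharing a run type and throughput are averaged, and the
    throughputs of each run type are returned in ascending order.
    """
    grouped = defaultdict(lambda: defaultdict(list))
    for run_type, throughput, value in points:
        grouped[run_type][throughput].append(value)
    return {
        run_type: {tp: mean(values) for tp, values in sorted(by_tp.items())}
        for run_type, by_tp in grouped.items()
    }
```

Each entry of the result then corresponds to one series on a chart, with throughput on one axis and the averaged metric on the other.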

Meetings

See meeting notes here.
