In [16]:
import scapy.all as sp
from scapy.all import rdpcap, TCP, IP
from decimal import Decimal, getcontext
import numpy as np
import pandas as pd
import glob
import os
import dnslib

In [2]:
!pip install scapy
!pip install dnslib



# COMMENTED OUT

## Iterating through a pcap trace

In [4]:
pcap_file = "RequetDataSet/A/PCAP_FILES/baseline_Jan17_exp_30.pcap"
# Load the pcap into memory
trace = sp.rdpcap(pcap_file)
# Iterate over the first ten packets
for packet in trace[:1]:
  # Print the IP HEADER
  if packet.haslayer(sp.IP):
    packet.show()
  if sp.TCP in packet:
    print(str(packet[sp.TCP])[:4*packet[sp.TCP].dataofs])

# Try print the TCP Header Only

###[ Ethernet ]### 
  dst       = b8:8d:12:10:aa:8e
  src       = 14:91:82:29:3b:57
  type      = IPv4
###[ IP ]### 
     version   = 4
     ihl       = 5
     tos       = 0x0
     len       = 60
     id        = 0
     flags     = DF
     frag      = 0
     ttl       = 64
     proto     = tcp
     chksum    = 0xb6ac
     src       = 192.168.1.1
     dst       = 192.168.1.190
     \options   \
###[ TCP ]### 
        sport     = microsoft_ds
        dport     = 52202
        seq       = 1667711449
        ack       = 3063488696
        dataofs   = 10
        reserved  = 0
        flags     = SAE
        window    = 14480
        chksum    = 0xba18
        urgptr    = 0
        options   = [('MSS', 1460), ('SAckOK', b''), ('Timestamp', (6670394, 1021018435)), ('NOP', None), ('WScale', b'')]

TCP 192.168.1.1:microsoft_ds > 192.168.1


In [6]:
with sp.PcapReader(pcap_file) as trace:
    count = 0
    for packet in trace:
        count += 1
    print(f"Total packets read: {count}")

Total packets read: 111795


In [18]:
with sp.PcapReader(pcap_file) as trace:
    for packet in trace:
        if packet.haslayer(sp.UDP) and packet[sp.UDP].sport == 53:
            print("DNS response packet found")
            break

    print("No DNS response packet")

DNS response packet found
No DNS response packet


## BASIC packets analysis

In [112]:
def feature_extract(pcap_file):

    def decimal_mean(values):
        """Calculate the mean of a list of Decimal values."""
        total = sum(values)
        count = Decimal(len(values))
        return total / count
    
    def decimal_std(values):
        """Calculate the mean of a list of Decimal values."""
        total = sum(values)
        count = Decimal(len(values))
        
        mean = total/ count
        var = sum((x - mean) ** 2 for x in values) / len(values)
        std = var.sqrt()
        return std

    # Load the pcap file
    packets = rdpcap(pcap_file)

    # Filter TCP packets related to video streaming
    tcp_packets = [pkt for pkt in packets if TCP in pkt and IP in pkt]

    # Define lists to store data for analysis
    chunk_sizes = []
    inter_arrival_times = []
    transfer_times = []
    retransmissions = set()

    total_bits = 0
    start_time = float('inf')
    end_time = float('-inf')

    # Tracking variables
    last_ack = 0
    last_seq = {}
    last_time = tcp_packets[0].time
    chunk_start_time = None

    # Analyzing packets
    for i, packet in enumerate(tcp_packets):
        if TCP in packet:
            total_bits += len(packet[
                            TCP].payload) * 8  # Convert bytes to bits
            start_time = min(start_time, packet.time)
            end_time = max(end_time, packet.time)
        
        seq = packet[TCP].seq
        ack = packet[TCP].ack
        length = len(packet[TCP].payload)
        time = packet.time

        # Track retransmissions
        if seq in last_seq and time - last_seq[seq] < 1:
            retransmissions.add(seq)

        last_seq[seq] = time

        # Track chunk sizes and transfer times
        if ack > last_ack:
            if chunk_start_time is not None:
                transfer_time = time - chunk_start_time
                transfer_times.append(transfer_time)
            
            chunk_size = ack - last_ack
            chunk_sizes.append(chunk_size)
            
            inter_arrival_time = time - last_time
            inter_arrival_times.append(inter_arrival_time)
            
            chunk_start_time = time
            last_time = time

        last_ack = ack

    # Calculate desired metrics
    total_bytes = sum(chunk_sizes)
    max_bytes_per_chunk = np.max(chunk_sizes) if chunk_sizes else 0
    min_bytes_per_chunk = np.min(chunk_sizes) if chunk_sizes else 0
    std_bytes_per_chunk = np.std(chunk_sizes),
    average_bytes_per_chunk = np.mean(chunk_sizes) if chunk_sizes else 0
    average_transfer_time = decimal_mean(transfer_times) if transfer_times else 0
    std_transfer_time = decimal_std(transfer_times) if transfer_times else 0
    average_time_between_chunks = decimal_mean(inter_arrival_times[1:]) if len(inter_arrival_times) > 1 else 0  # Skip first
    std_time_between_chunks = decimal_std(inter_arrival_times[1:]) if len(inter_arrival_times) > 1 else 0
    retransmitted_chunk_numbers = len(retransmissions)

    total_duration = end_time - start_time

    # Calculate the bitrate in bits per second
    if total_duration > 0:
        bitrate = total_bits / total_duration
    else:
        bitrate = 0

    return [pcap_file.name.split('/')[-1], total_bytes, 
            average_bytes_per_chunk, max_bytes_per_chunk, min_bytes_per_chunk, std_bytes_per_chunk, 
            average_transfer_time, std_transfer_time, 
            average_time_between_chunks, std_time_between_chunks, 
            retransmitted_chunk_numbers, bitrate]

    # Print the results
    # print("Total Bytes:", total_bytes)
    # print("Average Bytes per Chunk:", average_bytes_per_chunk)
    # print("Average Chunk Transfer Time:", average_transfer_time)
    # print("Average Time Elapsed Between Chunks:", average_time_between_chunks)
    # print("Retransmitted Chunk Numbers:", retransmitted_chunk_numbers)
    # print(f"Estimated Video Bitrate: {bitrate} bits per second")

# Further handling to calculate GET request response time
# This requires identifying HTTP GET requests and their corresponding responses, which would involve deeper packet inspection and protocol-specific parsing.

In [113]:
df = pd.DataFrame(columns=['name', 'total_bytes', 
                           'average_bytes_per_chunk', 'max_bytes_per_chunk', 'min_bytes_per_chunk', 'std_bytes_per_chunk', 
                           'average_transfer_time', 'std_transfer_time',
                           'average_time_between_chunks', 'std_time_between_chunks', 
                           'retransmitted_chunk_numbers', 'bitrate'])
        

[Stage 17:>                 (0 + 1) / 1][Stage 18:>                 (0 + 1) / 8]

In [5]:
!pip install tqdm



In [None]:
# from tqdm import tqdm
# import glob
# import concurrent.futures


# def process_file(filename):
#     with open(filename, 'rb') as f:
#         return feature_extract(f)

# # 文件路径列表
# files = list(glob.glob("PCAP_FILES/baseline_*.pcap"))

# # 使用 ThreadPoolExecutor 来并行处理文件
# with concurrent.futures.ThreadPoolExecutor(max_workers=24) as executor:
#     # 将进度条与 executor.map 结合使用
#     results = list(tqdm(executor.map(process_file, files), total=len(files), desc="Processing PCAP files"))

# # 结果存储
# row_list1 = results

Processing PCAP files:   0%|                                                      | 0/348 [04:40<?, ?it/s]


# SPARK 

In [114]:
from pyspark import SparkContext
import sys
import time
from datetime import datetime
import csv
import math
import os
import json
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from sklearn import preprocessing, model_selection, metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

def initialize_spark_context(APP_NAME="RegressivePrediction: XGBModel"):
    spark_conf = (
        SparkConf()
        .setAppName(APP_NAME)
        .setAll(
            [
                ("spark.dynamicAllocation.enabled", "true"),
                ("spark.dynamicAllocation.maxExecutors", "10"),
                ("spark.executor.memory", "3g"),
                ("spark.executor.cores", "2"),
                ("spark.executor.memoryOverhead", "3000"),
                ("spark.driver.memory", "4g"),
                ("spark.driver.maxResultSize", "2g"),
                ("spark.python.worker.memory", "2g"),
                ("spark.sql.shuffle.partitions", "20"),
                ("spark.sql.sources.partitionOverWriteMode", "dynamic"),
                ("spark.network.timeout", "600s"),
                ("spark.executor.heartbeatInterval", "120s"),
            ]
        )
    )
    sc = SparkContext(conf=spark_conf)
    sc.setLogLevel("ERROR")
    return sc


In [115]:
sc = initialize_spark_context("pcap_feature_extract")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pcap_feature_extract, master=local[*]) created by __init__ at /tmp/ipykernel_28/1655754873.py:39 

In [None]:
from scapy.all import rdpcap, TCP, IP
import numpy as np
from decimal import Decimal

In [636]:
def feature_extract(pcap_file):

    def decimal_mean(values):
        """Calculate the mean of a list of Decimal values."""
        total = sum(values)
        count = Decimal(len(values))
        return total / count
    
    def decimal_std(values):
        """Calculate the mean of a list of Decimal values."""
        total = sum(values)
        count = Decimal(len(values))
        
        mean = total/ count
        var = sum((x - mean) ** 2 for x in values) / len(values)
        std = var.sqrt()
        return std
    
    filename = pcap_file.split('/')[-1]

    # Load the pcap file
    try: 
        packets = rdpcap(pcap_file)

        # Filter TCP packets related to video streaming
        tcp_packets = [pkt for pkt in packets if TCP in pkt and IP in pkt]

        # Define lists to store data for analysis
        chunk_sizes = []
        inter_arrival_times = []
        transfer_times = []
        retransmissions = set()

        total_bits = 0
        start_time = float('inf')
        end_time = float('-inf')

        # Tracking variables
        last_ack = 0
        last_seq = {}
        last_time = tcp_packets[0].time
        chunk_start_time = None

        result = []
        
        # Analyzing packets
        for i, packet in enumerate(tcp_packets):
            if TCP in packet:
                total_bits += len(packet[
                                TCP].payload) * 8  # Convert bytes to bits
                start_time = min(start_time, packet.time)
                end_time = max(end_time, packet.time)
            
            seq = packet[TCP].seq
            ack = packet[TCP].ack
            length = len(packet[TCP].payload)
            time = packet.time
            old_time = time
            
            # Track retransmissions
            if seq in last_seq and time - last_seq[seq] < 1:
                retransmissions.add(seq)

            last_seq[seq] = time

            # Track chunk sizes and transfer times
            if ack > last_ack:
                if chunk_start_time is not None:
                    transfer_time = time - chunk_start_time
                    transfer_times.append(transfer_time)
                
                chunk_size = ack - last_ack
                chunk_sizes.append(chunk_size)
                
                inter_arrival_time = time - last_time
                inter_arrival_times.append(inter_arrival_time)
                
                chunk_start_time = time
                last_time = time

            last_ack = ack
            
            if time - old_time > 1:
                old_time = time
            

                # Calculate desired metrics
                total_bytes  = sum(chunk_sizes)
                max_bytes_per_chunk = np.max(chunk_sizes) if chunk_sizes else 0
                min_bytes_per_chunk = np.min(chunk_sizes) if chunk_sizes else 0
                std_bytes_per_chunk = np.std(chunk_sizes) if chunk_sizes else 0
                average_bytes_per_chunk = np.mean(chunk_sizes) if chunk_sizes else 0
                average_transfer_time = decimal_mean(transfer_times) if transfer_times else 0
                std_transfer_time = decimal_std(transfer_times) if transfer_times else 0
                average_time_between_chunks = decimal_mean(inter_arrival_times[1:]) if len(inter_arrival_times) > 1 else 0  # Skip first
                std_time_between_chunks = decimal_std(inter_arrival_times[1:]) if len(inter_arrival_times) > 1 else 0
                retransmitted_chunk_numbers = len(retransmissions)

                total_duration = end_time - start_time

                # Calculate the bitrate in bits per second
                if total_duration > 0:
                    bitrate = total_bits / total_duration
                else:
                    bitrate = 0
                
                
                if chunk_sizes:
                    last_10_chunk_sizes = chunk_sizes[-10:]  # Get the last 10 or fewer chunk sizes
                    avg_last_10 = np.mean(last_10_chunk_sizes)
                    max_last_10 = np.max(last_10_chunk_sizes)
                    min_last_10 = np.min(last_10_chunk_sizes)
                    std_last_10 = np.std(last_10_chunk_sizes)
                    last_100_chunk_sizes = chunk_sizes[-100:]  # Get the last 100 or fewer chunk sizes
                    avg_last_100 = np.mean(last_100_chunk_sizes)
                    max_last_100 = np.max(last_100_chunk_sizes)
                    min_last_100 = np.min(last_100_chunk_sizes)
                    std_last_100 = np.std(last_100_chunk_sizes)
                    first_10_chunk_sizes = chunk_sizes[:10]  # Get the first 100 or fewer chunk sizes
                    avg_first_10 = np.mean(first_10_chunk_sizes)
                    max_first_10 = np.max(first_10_chunk_sizes)
                    min_first_10 = np.min(first_10_chunk_sizes)
                    std_first_10 = np.std(first_10_chunk_sizes)
                    first_100_chunk_sizes = chunk_sizes[:100]  # Get the first 10 or fewer chunk sizes
                    avg_first_100 = np.mean(first_100_chunk_sizes)
                    max_first_100 = np.max(first_100_chunk_sizes)
                    min_first_100 = np.min(first_100_chunk_sizes)
                    std_first_100 = np.std(first_100_chunk_sizes)
                    
                perc_50_all, perc_75_all, perc_85_all, perc_90_all = np.percentile(chunk_sizes, [50, 75, 85, 90])
                perc_50_l10, perc_75_l10, perc_85_l10, perc_90_l10 = np.percentile(last_10_chunk_sizes, [50, 75, 85, 90])
                perc_50_l100, perc_75_l100, perc_85_l100, perc_90_l100 = np.percentile(last_100_chunk_sizes, [50, 75, 85, 90])
                perc_50_f10, perc_75_f10, perc_85_f10, perc_90_f10 = np.percentile(first_10_chunk_sizes, [50, 75, 85, 90])
                perc_50_f100, perc_75_f100, perc_85_f100, perc_90_f100 = np.percentile(first_100_chunk_sizes, [50, 75, 85, 90])
                    

                result.append( [filename, total_bytes, 
                        average_bytes_per_chunk, max_bytes_per_chunk, min_bytes_per_chunk, std_bytes_per_chunk, 
                        average_transfer_time, std_transfer_time, 
                        average_time_between_chunks, std_time_between_chunks, 
                        retransmitted_chunk_numbers, bitrate,
                        
                        perc_50_all, perc_75_all, perc_85_all, perc_90_all, 
                        
                        avg_last_10, max_last_10, min_last_10, std_last_10, 
                        perc_50_l10, perc_75_l10, perc_85_l10, perc_90_l10,
                        
                        avg_last_100, max_last_100, min_last_100, std_last_100, 
                        perc_50_l100, perc_75_l100, perc_85_l100, perc_90_l100,
                        
                        avg_first_10, max_first_10, min_first_10, std_first_10,
                        perc_50_f10, perc_75_f10, perc_85_f10, perc_90_f10,
                        
                        avg_first_100, max_first_100, min_first_100, std_first_100,
                        perc_50_f100, perc_75_f100, perc_85_f100, perc_90_f100 ])

        return result
    
    except Exception as e:
        # print(f'Error handling file {filename}: {e}')
        return [filename] + [0]*19

In [61]:
a1, a2, a3, a4 = np.percentile([1,2,3,4,5,6,7,8,9,10], [50,75,85,90])
aaa = [1,2,3,4,5,6,7,8,9,10]

aaa[-20:]

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [637]:
# 使用glob找到所有pcap文件并创建一个RDD
# files = glob.glob("PCAP_FILES/baseline_*.pcap")
files = '1646709513-h2-servers.c5s3wgsts.hdumb.pharos-exp-a-720-dash-http2.pcap'
pcap_files_rdd = sc.parallelize(files)

# 使用map将feature_extract应用到所有文件
results_rdd = pcap_files_rdd.map(feature_extract)

In [638]:
files

'1646709513-h2-servers.c5s3wgsts.hdumb.pharos-exp-a-720-dash-http2.pcap'

In [639]:
results_rdd = results_rdd.map(lambda x: [
    x[0],  # File - String
    x[1],  # TotalBytes
    x[2],  # AvgBytesPerChunk - Convert numpy.float64 to float
    x[3],  # MaxBytesPerChunk
    x[4],  # MinBytesPerChunk
    float(x[5]),  # StdBytesPerChunk
    float(x[6]),  # AvgTransferTime
    float(x[7]),  # StdTransferTime
    float(x[8]),  # AvgTimeBetweenChunks
    float(x[9]),  # StdTimeBetweenChunks
    x[10],  # RetransmittedChunks
    float(x[11]),  # Bitrate
    x[12], 
    x[13],
    x[14], 
    x[15], 
    x[16],
    x[17], 
    x[18],  
    x[19],  
    x[20],
    x[21],
    x[22],
    x[23],
    x[24],
    x[25],
    x[26],
    x[27],
    x[28],
    x[29],
    x[30],
    x[31],
    x[32],
    x[33],
    x[34],
    x[35],
    x[36],
    x[37],
    x[38],
    x[39],
    x[40],
    x[41],
    x[42],
    x[43],
    x[44],
    x[45],
    x[46],
    x[47],
])

In [640]:
results_rdd.cache()

PythonRDD[163] at RDD at PythonRDD.scala:53

In [644]:
# results_rdd.take(1)
results_rdd.collect()

24/04/22 10:12:34 ERROR Executor: Exception in task 6.0 in stage 39.0 (TID 161)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_28/516252179.py", line 22, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.a

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 39.0 failed 1 times, most recent failure: Lost task 6.0 in stage 39.0 (TID 161) (964601d7582c executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_28/516252179.py", line 22, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_28/516252179.py", line 22, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [643]:
# 存储为文本文件，每个元素为一行
results_rdd.map(lambda x: ','.join(map(str, x))).saveAsTextFile("PCAP_textFile_test1")

24/04/22 10:12:05 ERROR Executor: Exception in task 1.0 in stage 38.0 (TID 148)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_28/516252179.py", line 22, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.a

Py4JJavaError: An error occurred while calling o2388.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1578)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1578)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1564)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:551)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 1 times, most recent failure: Lost task 0.0 in stage 38.0 (TID 147) (964601d7582c executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_28/516252179.py", line 22, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 50 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_28/516252179.py", line 22, in <lambda>
IndexError: list index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [623]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

# 创建SparkSession对象
spark = SparkSession.builder.appName("Read Text Data").getOrCreate()

# 读取文本文件
text_df = spark.read.text("PCAP_textFile_test/part-*")

# 解析数据列
# 假设你有11个数据列，分别是字符串，整数，浮点数等类型
parsed_df = text_df.select(
    split(col("value"), ",").getItem(0).alias("File").cast("string"),
    split(col("value"), ",").getItem(1).alias("TotalBytes").cast("long"),
    split(col("value"), ",").getItem(2).alias("AvgBytesPerChunk").cast("float"),
    split(col("value"), ",").getItem(3).alias("MaxBytesPerChunk").cast("float"),
    split(col("value"), ",").getItem(4).alias("MinBytesPerChunk").cast("int"),
    split(col("value"), ",").getItem(5).alias("StdBytesPerChunk").cast("int"),
    split(col("value"), ",").getItem(6).alias("AvgTransferTime").cast("float"),
    split(col("value"), ",").getItem(7).alias("StdTransferTime").cast("float"),
    split(col("value"), ",").getItem(8).alias("AvgTimeBetweenChunks").cast("float"),
    split(col("value"), ",").getItem(9).alias("StdTimeBetweenChunks").cast("float"),
    split(col("value"), ",").getItem(10).alias("RetransmittedChunks").cast("int"),
    split(col("value"), ",").getItem(11).alias("Bitrate").cast("float"),
    split(col("value"), ",").getItem(12).alias("50PercChunkSizeforAll").cast("float"),
    split(col("value"), ",").getItem(13).alias("75PercChunkSizeforAll").cast("float"),
    split(col("value"), ",").getItem(14).alias("85PercChunkSizeforAll").cast("float"),
    split(col("value"), ",").getItem(15).alias("90PercChunkSizeforAll").cast("float"), 
    split(col("value"), ",").getItem(16).alias("AvgChunksizeforlast10").cast("float"), 
    split(col("value"), ",").getItem(17).alias("MaxChunksizeforlast10").cast("float"),
    split(col("value"), ",").getItem(18).alias("MinChunksizeforlast10").cast("float"),
    split(col("value"), ",").getItem(19).alias("StdChunksizeforlast10").cast("float"),
    split(col("value"), ",").getItem(20).alias("50PercChunkSizeforlast10").cast("float"),
    split(col("value"), ",").getItem(21).alias("75PercChunkSizeforlast10").cast("float"),
    split(col("value"), ",").getItem(22).alias("85PercChunkSizeforlast10").cast("float"),
    split(col("value"), ",").getItem(23).alias("90PercChunkSizeforlast10").cast("float"),
    split(col("value"), ",").getItem(24).alias("AvgChunksizeforlast100").cast("float"), 
    split(col("value"), ",").getItem(25).alias("MaxChunksizeforlast100").cast("float"),
    split(col("value"), ",").getItem(26).alias("MinChunksizeforlast100").cast("float"),
    split(col("value"), ",").getItem(27).alias("StdChunksizeforlast100").cast("float"),
    split(col("value"), ",").getItem(28).alias("50PercChunkSizeforlast100").cast("float"),
    split(col("value"), ",").getItem(29).alias("75PercChunkSizeforlast100").cast("float"),
    split(col("value"), ",").getItem(30).alias("85PercChunkSizeforlast100").cast("float"),
    split(col("value"), ",").getItem(31).alias("90PercChunkSizeforlast100").cast("float"),
    split(col("value"), ",").getItem(32).alias("AvgChunksizeforfirst10").cast("float"), 
    split(col("value"), ",").getItem(33).alias("MaxChunksizeforfirst10").cast("float"),
    split(col("value"), ",").getItem(34).alias("MinChunksizeforfirst10").cast("float"),
    split(col("value"), ",").getItem(35).alias("StdChunksizeforfirst10").cast("float"),
    split(col("value"), ",").getItem(36).alias("50PercChunkSizeforfirst10").cast("float"),
    split(col("value"), ",").getItem(37).alias("75PercChunkSizeforfirst10").cast("float"),
    split(col("value"), ",").getItem(38).alias("85PercChunkSizeforfirst10").cast("float"),
    split(col("value"), ",").getItem(39).alias("90PercChunkSizeforfirst10").cast("float"),
    split(col("value"), ",").getItem(40).alias("AvgChunksizeforfirst100").cast("float"), 
    split(col("value"), ",").getItem(41).alias("MaxChunksizeforfirst100").cast("float"),
    split(col("value"), ",").getItem(42).alias("MinChunksizeforfirst100").cast("float"),
    split(col("value"), ",").getItem(43).alias("StdChunksizeforfirst100").cast("float"),
    split(col("value"), ",").getItem(44).alias("50PercChunkSizeforfirst100").cast("float"),
    split(col("value"), ",").getItem(45).alias("75PercChunkSizeforfirst100").cast("float"),
    split(col("value"), ",").getItem(46).alias("85PercChunkSizeforfirst100").cast("float"),
    split(col("value"), ",").getItem(47).alias("90PercChunkSizeforfirst100").cast("float"),
)

# 显示解析后的DataFrame的结构和前几行数据以验证
parsed_df.printSchema()
parsed_df.show(5)

# 可选：保存DataFrame为Parquet或其他格式进行更高效的后续处理
# parsed_df.write.parquet("/path/to/output/parquetFile")

root
 |-- File: string (nullable = true)
 |-- TotalBytes: long (nullable = true)
 |-- AvgBytesPerChunk: float (nullable = true)
 |-- MaxBytesPerChunk: float (nullable = true)
 |-- MinBytesPerChunk: integer (nullable = true)
 |-- StdBytesPerChunk: integer (nullable = true)
 |-- AvgTransferTime: float (nullable = true)
 |-- StdTransferTime: float (nullable = true)
 |-- AvgTimeBetweenChunks: float (nullable = true)
 |-- StdTimeBetweenChunks: float (nullable = true)
 |-- RetransmittedChunks: integer (nullable = true)
 |-- Bitrate: float (nullable = true)
 |-- 50PercChunkSizeforAll: float (nullable = true)
 |-- 75PercChunkSizeforAll: float (nullable = true)
 |-- 85PercChunkSizeforAll: float (nullable = true)
 |-- 90PercChunkSizeforAll: float (nullable = true)
 |-- AvgChunksizeforlast10: float (nullable = true)
 |-- MaxChunksizeforlast10: float (nullable = true)
 |-- MinChunksizeforlast10: float (nullable = true)
 |-- StdChunksizeforlast10: float (nullable = true)
 |-- 50PercChunkSizeforlast

In [30]:
!pip install pandas pyarrow

Collecting pyarrow
  Downloading pyarrow-16.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (40.9 MB)
[K     |████████████████████████████████| 40.9 MB 6.4 MB/s eta 0:00:01
Installing collected packages: pyarrow
Successfully installed pyarrow-16.0.0


In [624]:
df = parsed_df.toPandas()

In [625]:
df['File'] = df['File'].str.split('/').apply(lambda x: x[-1])

In [626]:
df['File'] = df['File'].str.split('.').apply(lambda x: x[0]).str.split('_').apply(lambda x: [x[1], x[3]])

IndexError: list index out of range

In [627]:
df

Unnamed: 0,File,TotalBytes,AvgBytesPerChunk,MaxBytesPerChunk,MinBytesPerChunk,StdBytesPerChunk,AvgTransferTime,StdTransferTime,AvgTimeBetweenChunks,StdTimeBetweenChunks,...,85PercChunkSizeforfirst10,90PercChunkSizeforfirst10,AvgChunksizeforfirst100,MaxChunksizeforfirst100,MinChunksizeforfirst100,StdChunksizeforfirst100,50PercChunkSizeforfirst100,75PercChunkSizeforfirst100,85PercChunkSizeforfirst100,90PercChunkSizeforfirst100
0,1646706373-c0.caubryvid.hdumb.pharos-exp-a-108...,60063569938243,1009489000.0,3632635000.0,12,1102731619,0.0034,0.065089,0.0034,0.065089,...,2344391000.0,2473216000.0,575507968.0,3632635000.0,31.0,1031856000.0,2896.0,12308.0,2344187000.0,2344317000.0
1,1646706557-h2-servers.c5s3wgsts.hdumb.pharos-e...,5977254163182,314741400.0,2577952000.0,21,420428466,0.010658,0.165065,0.010658,0.165065,...,914255800.0,1080626000.0,391436256.0,2577952000.0,35.0,497489100.0,2736.0,914117248.0,914189400.0,914222400.0
2,1646709513-h2-servers.c5s3wgsts.hdumb.pharos-e...,918067395896,32819770.0,1352488000.0,21,48223321,0.007241,0.125652,0.007241,0.125652,...,144116200.0,264954000.0,97011280.0,1352488000.0,44.0,144561200.0,143781248.0,143974144.0,144060700.0,144086000.0


In [628]:
df.to_csv('test_new.csv', index=False)

In [172]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

# 创建SparkSession对象
spark = SparkSession.builder.appName("Read Text Data").getOrCreate()

# 读取文本文件
text_df = spark.read.text("/path/to/output/textFile/part-*")

# 解析数据列
# 假设你有11个数据列，分别是字符串，整数，浮点数等类型
parsed_df = text_df.select(
    split(col("value"), ",").getItem(0).alias("File").cast("tuple"),
    split(col("value"), ",").getItem(1).alias("TotalBytes").cast("int"),
    split(col("value"), ",").getItem(2).alias("AvgBytesPerChunk").cast("float"),
    split(col("value"), ",").getItem(3).alias("StdBytesPerChunk").cast("float"),
    split(col("value"), ",").getItem(4).alias("MaxBytesPerChunk").cast("int"),
    split(col("value"), ",").getItem(5).alias("MinBytesPerChunk").cast("int"),
    split(col("value"), ",").getItem(6).alias("AvgTransferTime").cast("float"),
    split(col("value"), ",").getItem(7).alias("StdTransferTime").cast("float"),
    split(col("value"), ",").getItem(8).alias("AvgTimeBetweenChunks").cast("float"),
    split(col("value"), ",").getItem(9).alias("StdTimeBetweenChunks").cast("float"),
    split(col("value"), ",").getItem(10).alias("RetransmittedChunks").cast("int"),
    split(col("value"), ",").getItem(11).alias("Bitrate").cast("float")
)

# 显示解析后的DataFrame的结构和前几行数据以验证
parsed_df.printSchema()
parsed_df.show(5)

# 可选：保存DataFrame为Parquet或其他格式进行更高效的后续处理
parsed_df.write.parquet("/path/to/output/parquetFile")


AnalysisException: Path does not exist: file:/path/to/output/textFile/part-*

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# 初始化SparkSession
spark = SparkSession.builder.appName("PCAP Analysis").getOrCreate()

# 定义DataFrame的schema
schema = StructType([
    StructField("File", StringType(), True),
    StructField("TotalBytes", IntegerType(), True),
    StructField("AvgBytesPerChunk", DoubleType(), True),
    StructField("MaxBytesPerChunk", IntegerType(), True),
    StructField("MinBytesPerChunk", IntegerType(), True),
    StructField("StdBytesPerChunk", DoubleType(), True),
    StructField("AvgTransferTime", DoubleType(), True),
    StructField("StdTransferTime", DoubleType(), True),
    StructField("AvgTimeBetweenChunks", DoubleType(), True),
    StructField("StdTimeBetweenChunks", DoubleType(), True),
    StructField("RetransmittedChunks", IntegerType(), True),
    StructField("Bitrate", DoubleType(), True)
])

# 创建DataFrame
df = spark.createDataFrame(results_rdd, schema)

In [16]:
df.printSchema()

root
 |-- File: string (nullable = true)
 |-- TotalBytes: integer (nullable = true)
 |-- AvgBytesPerChunk: double (nullable = true)
 |-- MaxBytesPerChunk: integer (nullable = true)
 |-- MinBytesPerChunk: integer (nullable = true)
 |-- StdBytesPerChunk: double (nullable = true)
 |-- AvgTransferTime: double (nullable = true)
 |-- StdTransferTime: double (nullable = true)
 |-- AvgTimeBetweenChunks: double (nullable = true)
 |-- StdTimeBetweenChunks: double (nullable = true)
 |-- RetransmittedChunks: integer (nullable = true)
 |-- Bitrate: double (nullable = true)



In [17]:
df.show()

24/04/22 02:26:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)/ 1]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_3662/879752465.py", line 52, in feature_extract
  File "<__array_function__ internals>", line 200, in mean
  File "/usr/local/lib/python3.8/dist-packages/numpy/core/fromnumeric.py", line 3464, in mean
    return _methods._mean(a, axis=axis, dtype=dtype,
  File "/usr/local/lib/python3.8/dist-packages/numpy/core/_methods.py", line 194, in _mean
    ret = ret

Py4JJavaError: An error occurred while calling o160.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (b8b27a73c601 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_3662/879752465.py", line 52, in feature_extract
  File "<__array_function__ internals>", line 200, in mean
  File "/usr/local/lib/python3.8/dist-packages/numpy/core/fromnumeric.py", line 3464, in mean
    return _methods._mean(a, axis=axis, dtype=dtype,
  File "/usr/local/lib/python3.8/dist-packages/numpy/core/_methods.py", line 194, in _mean
    ret = ret / rcount
  File "/usr/local/lib/python3.8/dist-packages/scapy/utils.py", line 131, in __truediv__
    return EDecimal(Decimal.__truediv__(self, Decimal(other)))
TypeError: conversion from numpy.int64 to Decimal is not supported

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_3662/879752465.py", line 52, in feature_extract
  File "<__array_function__ internals>", line 200, in mean
  File "/usr/local/lib/python3.8/dist-packages/numpy/core/fromnumeric.py", line 3464, in mean
    return _methods._mean(a, axis=axis, dtype=dtype,
  File "/usr/local/lib/python3.8/dist-packages/numpy/core/_methods.py", line 194, in _mean
    ret = ret / rcount
  File "/usr/local/lib/python3.8/dist-packages/scapy/utils.py", line 131, in __truediv__
    return EDecimal(Decimal.__truediv__(self, Decimal(other)))
TypeError: conversion from numpy.int64 to Decimal is not supported

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:349)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


## ORIGINAL 

In [None]:
row_list1

[['./RequetDataSet/A/PCAP_FILES/baseline_Jan17_exp_30.pcap',
  31421151522919,
  858665633.397617,
  4287286942,
  1,
  (660636362.0453285,),
  Decimal('0.01610678421512898994315697420'),
  Decimal('0.3274723228400772005875647423'),
  Decimal('0.01610678421512898994315697420'),
  Decimal('0.3274723228400772005875647423'),
  378,
  Decimal('6110.413269788803358477474430')]]

## Identifying the service type using DNS traffic

In [None]:
## DNS queries can be used to identify to which service a remote IP belongs to.

In [22]:
import scapy.all as sp
import dnslib

# YT for example
YT_DOMAINS = ["youtube"]

# READ THE PCAP AND INSERT INTO THIS ARRAY THE IP ADDRESS THAT BELONG TO YT
youtube_ips = []

pcap_file = "data/1659055888-c0.ipsecptp.tiered.pharos-ipsecptp-a-video.720.dash.http2.pcap"

with sp.PcapReader(pcap_file) as trace:
  for packet in trace:
    # DNS Packet
    if packet.haslayer(sp.UDP) and packet[sp.UDP].sport == 53:
      # Get DNS data
      raw = sp.raw(packet[sp.UDP].payload)
      # Process the DNS query
      dns = dnslib.DNSRecord.parse(raw)
      # Iterate over answers
      for a in dns.rr:
        # Check if it's a domain of interest
        question = str(a.rname)
        if any(s in question for s in youtube):
          # Check if it's an answer
          if a.rtype == 1 or a.rtype == 28:
            print("Query {} is a Youtube query. Appending IP {} to Youtube IPs".format(question, a.rdata))
            youtube_ips.append(str(a.rdata))

print("Youtube IPs: {}".format(youtube_ips))

Youtube IPs: []


In [7]:
import scapy.all as sp
import dnslib

all_domains_ips = {}

pcap_file = "data/1659055888-c0.ipsecptp.tiered.pharos-ipsecptp-a-video.720.dash.http2.pcap"

with sp.PcapReader(pcap_file) as trace:
    for packet in trace:
        if packet.haslayer(sp.UDP) and packet[sp.UDP].sport == 53:
            raw = sp.raw(packet[sp.UDP].payload)
            dns = dnslib.DNSRecord.parse(raw)
            for a in dns.rr:
                domain_name = str(a.rname)
                if a.rtype == 1 or a.rtype == 28:
                    if domain_name not in all_domains_ips:
                        all_domains_ips[domain_name] = []
                    all_domains_ips[domain_name].append(str(a.rdata))

for domain, ips in all_domains_ips.items():
    print(f"domain name：{domain}, IP addr: {ips}")

In [9]:
all_domains_ips

{}

## Collecting network counters

In [14]:
network_counters = {}

# You can use this dictionary to collect counters
def counters():
  return {"in_pkts": 0, "out_pkts": 0, "in_bytes": 0, "out_bytes": 0}

# header is called "length"
pcap_file = "data/1659055888-c0.ipsecptp.tiered.pharos-ipsecptp-a-video.720.dash.http2.pcap"

with sp.PcapReader(pcap_file) as trace:
    for packet in trace:
        if packet.haslayer(sp.IP):
            src_ip = packet[sp.IP].src
            dst_ip = packet[sp.IP].dst
            pkt_length = packet[sp.IP].len

            if src_ip not in network_counters:
                network_counters[src_ip] = counters()
                
            network_counters[src_ip]["out_pkts"] += 1
            network_counters[src_ip]["out_bytes"] += pkt_length

            if dst_ip not in network_counters:
                network_counters[dst_ip] = counters()

            network_counters[dst_ip]["in_pkts"] += 1
            network_counters[dst_ip]["in_bytes"] += pkt_length

for ip in network_counters:
  print("IP {} generated the following amout of traffic {}".format(ip, network_counters[ip]))

IP 10.0.1.1 generated the following amout of traffic {'in_pkts': 338, 'out_pkts': 672, 'in_bytes': 94040, 'out_bytes': 182176}
IP 10.0.13.1 generated the following amout of traffic {'in_pkts': 672, 'out_pkts': 338, 'in_bytes': 182176, 'out_bytes': 94040}
IP 10.0.3.1 generated the following amout of traffic {'in_pkts': 73991, 'out_pkts': 37991, 'in_bytes': 110695512, 'out_bytes': 2076816}
IP 10.0.15.1 generated the following amout of traffic {'in_pkts': 37991, 'out_pkts': 73991, 'in_bytes': 2076816, 'out_bytes': 110695512}


## Infer segment downloads

In [16]:
## init seg. tracking
def segment():
    return {"pkts": 0, "bytes": 0}

# Read pcap and records completed video segments for each IP
completed_video_segments = {}
ongoing_video_segment = {}

with sp.PcapReader(pcap_file) as trace:
    for packet in trace:
        # Process only TCP packets that carry a payload
        if packet.haslayer(sp.TCP) and packet.haslayer(sp.IP):
            ip_len = packet[sp.IP].len
            ip_header_len = 4 * packet[sp.IP].ihl
            tcp_header_len = 4 * packet[sp.TCP].dataofs
            payload_size = ip_len - ip_header_len - tcp_header_len

            if payload_size > 0:
                src_ip = packet[sp.IP].src
                dst_ip = packet[sp.IP].dst

                # Assume packets to a Youtube IP are video segment downloads
                # This is a simplified check and would normally require more robust identification logic
                if dst_ip in youtube_ips:  
                    if dst_ip not in ongoing_video_segment:
                        ongoing_video_segment[dst_ip] = segment()
                    
                    ongoing_video_segment[dst_ip]["pkts"] += 1
                    ongoing_video_segment[dst_ip]["bytes"] += payload_size

                    # Example condition to mark a segment as completed (e.g., based on packet counts or bytes)
                    if ongoing_video_segment[dst_ip]["pkts"] >= 100:  # This threshold is arbitrary
                        if dst_ip not in completed_video_segments:
                            completed_video_segments[dst_ip] = []
                        completed_video_segments[dst_ip].append(ongoing_video_segment[dst_ip])
                        ongoing_video_segment[dst_ip] = segment()  # Reset for a new segment

# After capturing all segments, print the number of segments found for each Youtube IP
for ip in completed_video_segments.keys():
    print(f"IP {ip} downloaded {len(completed_video_segments[ip])} segments")

In [19]:
completed_video_segments, youtube_ips

({}, [])