In [None]:
# @ Yousef ALKHA

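# PySpark Evaluation

Times common DataFrame operations in PySpark with `%%time`, as one part of a PySpark vs pandas vs Polars comparison.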

In [None]:
import os
import pyspark
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql import Window
from pyspark.sql.types import DoubleType

In [None]:
# Log the Spark build next to the timings so results can be compared across versions.
print("Spark Version:", pyspark.__version__, sep=" ")

In [None]:
# Single-machine session that uses every local core (`local[*]`).
# NOTE(review): with a `local[*]` master all work runs inside the driver JVM, so the
# `spark.executor.*` entries (and `spark.driver.cores`, which applies to cluster
# deploy mode) are not used here; they only matter if the master is changed to a
# cluster manager. `spark.driver.memory` can only take effect when this cell is the
# one that launches the JVM; if a session already exists, `getOrCreate()` returns it.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAll(
        [
            ("spark.ui.port", 4040),
            ("spark.memory.fraction", "0.9"),
            ("spark.memory.storageFraction", "0.4"),
            ("spark.driver.memory", "15g"),
            ("spark.executor.memory", "15g"),
            ("spark.executor.cores", 5),
            ("spark.driver.cores", 5),
            ("spark.executor.instances", 3),
        ]
    )
    .setAppName("PYSPARK_VS_PANDAS_VS_POLARS")
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# Suppress INFO-level logging so it does not drown the timing output.
sc.setLogLevel("WARN")

# Data Operations

## Loading

In [None]:
# Input data is resolved relative to the parent of the current working directory:
# `<parent>/data/internet_sessions`.
# `os.path.dirname` replaces unpacking `os.path.split(...)` into `_`, which would
# shadow IPython's last-output variable for the rest of the session.
main_dir = os.path.dirname(os.getcwd())
data_dir = os.path.join(main_dir, "data")
internet_session_data_path = os.path.join(data_dir, "internet_sessions")

In [None]:
%%time
# Re-read the CSV directory 5 times; the %%time figure is the total for all runs.
# Only `header` is set, so every column is read as a string (CSV `inferSchema`
# defaults to false); later cells rely on Spark's implicit casts.
for load_run in range(5):
    csv_reader = spark.read.format("csv").option("header", True)
    spark_df = csv_reader.load(internet_session_data_path)
    # Print the first 5 rows without truncating long column values.
    spark_df.show(n=5, truncate=False)

In [None]:
# Mark the loaded frame for caching; `count()` is the action that fills the cache
# (and reports the number of rows) before the timed operations below.
print(spark_df.cache().count())

## Column Selection

In [None]:
%%time
# Project four of the session columns and preview the first 5 rows.
spark_df.select(
    ["username", "private_ip", "terminatecause", "statustype"]
).show(5)

## Row Selection

In [None]:
%%time
# Select the row at 0-based position ROW_INDEX.
# `monotonically_increasing_id()` is unique but NOT consecutive: the partition id is
# packed into the upper bits. So `row_id == N` only finds the N-th row when the first
# partition alone holds more than N rows; otherwise it returns nothing.
# `zipWithIndex()` assigns consecutive 0-based positions, ordered by partition index
# and then by position within each partition.
ROW_INDEX = 51235
indexed_rows = spark_df.rdd.zipWithIndex()
selected_rows = indexed_rows.filter(lambda row_and_index: row_and_index[1] == ROW_INDEX).keys()
spark.createDataFrame(selected_rows, schema=spark_df.schema).show(5)

## Filtering

In [None]:
%%time
# Keep sessions whose port range lies within the given bounds.
# NOTE(review): the CSV is loaded without `inferSchema`, so these columns are strings
# and the comparison relies on Spark's implicit casting; consider an explicit cast.
spark_df.filter(
    (func.col("port_begin") > 27224) & (func.col("port_end") < 60363)
).show(5)

## Grouping & Aggregating

In [None]:
%%time
# Total download and upload per user; each aggregate keeps its source column name.
# NOTE(review): both columns are strings at this point (no schema inference), so
# `sum` relies on Spark's implicit numeric cast.
spark_df.groupBy("username").agg(
    *(func.sum(func.col(metric)).alias(metric) for metric in ("download", "upload"))
).show(5)

## Joining

In [None]:
# Username -> id lookup table. `broadcast` marks it as small enough for a broadcast
# join, so the join below can avoid shuffling `spark_df`.
ids_df = func.broadcast(
    spark.read.format("csv")
    .option("header", True)
    .load(os.path.join(data_dir, "ids", "username_id.csv"))
)

In [None]:
%%time
# Inner join (Spark's default join type) on the shared `username` column.
spark_df.join(ids_df, "username").show(5)

## Pivoting

In [None]:
%%time
# Count sessions per (statustype, terminatecause) pair, with one column per cause.
# Nulls are replaced by "Empty" first, so missing values form their own category.
# No explicit pivot values are given, so Spark first computes the distinct
# `terminatecause` values itself, which costs an extra pass.
pivot_counts = (
    spark_df.fillna({"statustype": "Empty", "terminatecause": "Empty"})
    .groupBy("statustype")
    .pivot("terminatecause")
    .agg(func.count("username"))
)
pivot_counts.show(5)

## Sorting

In [None]:
%%time
# `sort` is an alias of `orderBy`; ordering is ascending by default.
spark_df.sort("username").show(5)

## Applying External Functions

In [None]:
def bytes_to_gb(value):
    """Convert a byte count to gigabytes, using binary units (1 GB = 1024**3 bytes).

    ``None`` (a SQL null when called through the UDF) is passed through unchanged.
    Any other value is true-divided, so integer byte counts yield a ``float``.
    """
    bytes_per_gb = 1024 ** 3
    return None if value is None else value / bytes_per_gb

In [None]:
%%time
# Wrap the plain-Python converter as a Spark UDF. Python UDFs are evaluated in Python
# worker processes, and that overhead is what this section measures.
bytes_to_gb_udf = func.udf(bytes_to_gb, DoubleType())
# `download` is read from the CSV as a string. Cast it to 64-bit `long`: an `int`
# cast overflows for byte counts above 2**31 - 1 (~2 GiB), yielding null (or an
# error under ANSI mode).
# The result goes into a separate frame so `spark_df` stays unchanged for the
# cells below, and re-running this cell is idempotent.
download_long_df = spark_df.withColumn("download", func.col("download").cast("long"))
download_long_df.withColumn("download", bytes_to_gb_udf(download_long_df["download"])).show(5)

## String Operations

In [None]:
%%time
# `rlike` performs a regex search, not a full match: keep usernames containing
# "10" or "20" anywhere. `where` is an alias of `filter`.
spark_df.where(func.col("username").rlike("10|20")).show(5)

## Datetime Operations

In [None]:
%%time
# Parse compact `yyyyMMddHHmmss` stamps in `start_time` into timestamps.
# The explicit string cast keeps this working if the column is ever loaded as a
# number. With the current string schema it is a no-op.
spark_df.withColumn(
    "start_time",
    func.to_timestamp(func.col("start_time").cast("string"), "yyyyMMddHHmmss"),
).show(5)

## Writing

In [None]:
%%time
# `coalesce(14)` narrows the frame to at most 14 partitions without a shuffle, so the
# write produces at most 14 part files. Any existing output directory is replaced.
(
    spark_df.coalesce(14)
    .write.mode("overwrite")
    .format("csv")
    .option("header", True)
    .save(os.path.join(data_dir, "output", "pyspark_data"))
)

# Done