In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import time

In [2]:
def time_analysis(session):
    start = time.time()
    rdd = session.read.csv("hdfs://192.168.2.43:9000/user/root/data/*/merge.csv", header = True)
    rdd = rdd.withColumn("WND", rdd.WND.cast("double"))
    rdd.agg(F.avg(rdd.WND))
    end = time.time()
    print(f"{end-start:.2f} seconds")

**Test 1 - A single Apache Worker using a single core**

In [3]:
spark_session_1 = SparkSession.builder\
        .master("spark://192.168.2.43:7077") \
        .appName("experiment_2_1")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.instances", 1)\
        .config("spark.cores.max", 1)\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/21 18:56:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
time_analysis(spark_session_1)

                                                                                

19.05 seconds


In [None]:
spark_session_1.stop()

**Test 2 - A single Apache Worker using all cores**

In [3]:
spark_session_2 = SparkSession.builder\
        .master("spark://192.168.2.43:7077") \
        .appName("experiment_2_2")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.instances", 1)\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/21 18:57:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
time_analysis(spark_session_2)

                                                                                

17.82 seconds


In [None]:
spark_session_2.stop()

**Test 3 - Two Apache Workers using a single core**

In [3]:
spark_session_3 = SparkSession.builder\
        .master("spark://192.168.2.43:7077") \
        .appName("experiment_2_3")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.instances", 2)\
        .config("spark.cores.max", 2)\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/21 18:58:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
time_analysis(spark_session_3)

                                                                                

17.11 seconds


In [None]:
spark_session_3.stop()

**Test 4 - Two Apache Workers using a all cores**

In [3]:
spark_session_4 = SparkSession.builder\
        .master("spark://192.168.2.43:7077") \
        .appName("experiment_2_4")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.instances", 2)\
        .config("spark.cores.max", 4)\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/21 19:01:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
time_analysis(spark_session_4)

                                                                                

16.00 seconds


In [None]:
spark_session_4.stop()