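# Spark Cluster Test
Smoke tests for parallel processing on a standalone Spark cluster with multiple workers: local file reads, DataFrame and RDD jobs, S3 access, and sparkMeasure metrics.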

In [1]:
from pyspark.sql import SparkSession
import time

In [2]:
# Connect to the standalone cluster master; getOrCreate() reuses an active session if one exists.
spark = (
    SparkSession.builder
    .appName("NotebookTest")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/03 21:56:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Confirm which Spark version and which master URL this session is bound to.
print(f"Spark {spark.version}\nMaster: {spark.sparkContext.master}")

Spark 3.5.7
Master: spark://spark-master:7077


In [4]:
import os

# Sample text file read by the next cell. Keep the literal in one place so the
# existence check and the Spark read can never drift apart.
path = '/opt/spark/work-dir/data/sample.txt'

# NOTE: os.path.exists runs only in the notebook (driver) process. On a cluster, a
# plain local path must also exist at the same location on every worker.
print(os.path.exists(path))

True


In [5]:
# Load the file as a one-column DataFrame (column "value"), one row per text line.
text_df = spark.read.text(path)
line_count = text_df.count()
print(f"Lines: {line_count}")

# Preview the first 5 lines without truncating long ones.
text_df.show(5, truncate=False)

Lines: 30
+---------------------------------------------------------------------------+
|value                                                                      |
+---------------------------------------------------------------------------+
|Sahar works with distributed data processing systems every day.            |
|Kshitij manages the Spark cluster configuration and deployment.            |
|Rabina develops machine learning pipelines using PySpark.                  |
|                                                                           |
|Apache Spark is a unified analytics engine for large-scale data processing.|
+---------------------------------------------------------------------------+
only showing top 5 rows



In [6]:
# Tiny in-memory dataset: one (name, age, role) tuple per person.
data = [
    ("Sahar", 24, "Data Engineer"),
    ("Kshitij", 27, "DevOps Engineer"),
    ("Rabina", 29, "ML Engineer"),
    ("Manu", 26, "Backend Developer"),
    ("Ola", 29, "Frontend Developer"),
]

# Column names map positionally onto the tuple fields; column types are inferred.
test_df = spark.createDataFrame(data, schema=["name", "age", "role"])
test_df.show()

+-------+---+------------------+
|   name|age|              role|
+-------+---+------------------+
|  Sahar| 24|     Data Engineer|
|Kshitij| 27|   DevOps Engineer|
| Rabina| 29|       ML Engineer|
|   Manu| 26| Backend Developer|
|    Ola| 29|Frontend Developer|
+-------+---+------------------+



                                                                                

In [8]:
# Aggregate: number of rows for each distinct role.
role_counts = test_df.groupBy("role").count()
role_counts.show()

+------------------+-----+
|              role|count|
+------------------+-----+
|   DevOps Engineer|    1|
| Backend Developer|    1|
|Frontend Developer|    1|
|     Data Engineer|    1|
|       ML Engineer|    1|
+------------------+-----+



In [9]:
# Distributed filter over ids 0..999_999; only the final count returns to the driver.
large_range = spark.range(0, 1_000_000)
result = large_range.where(large_range["id"] % 2 == 0).count()
print(f"Even numbers: {result}")

Even numbers: 500000


In [10]:
# Square 0..99 on the executors and reduce the total there as well. Only the final
# scalar (not every squared element, as collect() would) is shipped back to the driver.
rdd = spark.sparkContext.parallelize(range(100))
sum_of_squares = rdd.map(lambda x: x * x).sum()

# Cross-check the cluster result against a local computation (expected 328350).
assert sum_of_squares == sum(x * x for x in range(100))
print(f"Sum of squares (0-99): {sum_of_squares}")

Sum of squares (0-99): 328350


In [11]:
# Stop this session and its SparkContext so cluster resources are released before
# the next section builds a differently configured session (spark_aws).
spark.stop()

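### Check AWS S3 connection
A new session is created here because the previous one was stopped above. It reads Parquet data from S3 through the S3A connector.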

In [7]:
# Session for reading from S3: the S3A filesystem serves s3a:// URIs, and credentials
# come from the AWS SDK's default credentials provider chain.
# NOTE(review): if a session is already active, getOrCreate() returns it and settings
# applied at SparkContext startup may not take effect — run after the previous stop().
spark_aws = (
    SparkSession.builder
    .appName("S3ParquetReader")
    .master("spark://spark-master:7077")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .getOrCreate()
)

In [8]:
# Parquet dataset (FRACTAL training split) read through the s3a:// scheme set up on spark_aws.
fractal_train_path = "s3a://ubs-datasets/FRACTAL/data/train/"
aws_remote_df = spark_aws.read.parquet(fractal_train_path)

25/11/04 08:53:15 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

In [9]:
# First 5 rows as a list of Row objects (take(n) is what head(n) delegates to).
aws_remote_df.take(5)

                                                                                

[Row(xyz=[745049.947, 6332500.528, 1091.963], Intensity=628, ReturnNumber=1, NumberOfReturns=2, ScanDirectionFlag=0, EdgeOfFlightLine=0, Classification=5, Synthetic=0, KeyPoint=0, Withheld=0, Overlap=0, ScanAngleRank=21.0, UserData=6, PointSourceId=602, GpsTime=307259829.0763777, ScanChannel=2, Red=14848, Green=17920, Blue=16640, Infrared=33536, wkb=bytearray(b'\x01\xe9\x03\x00\x00\x1b/\xdd\xe4\xb3\xbc&A\x83\xc0\xca!\x15(XA1\x08\xac\x1c\xda\x0f\x91@')),
 Row(xyz=[745049.988, 6332500.674, 1090.763], Intensity=821, ReturnNumber=1, NumberOfReturns=2, ScanDirectionFlag=0, EdgeOfFlightLine=0, Classification=5, Synthetic=0, KeyPoint=0, Withheld=0, Overlap=0, ScanAngleRank=21.0, UserData=6, PointSourceId=602, GpsTime=307259829.0763785, ScanChannel=2, Red=16384, Green=20736, Blue=18944, Infrared=34048, wkb=bytearray(b'\x01\xe9\x03\x00\x00\xd1"\xdb\xf9\xb3\xbc&A\xe5\xd0"+\x15(XAd;\xdfO\r\x0b\x91@')),
 Row(xyz=[745049.998, 6332502.006, 1093.5], Intensity=1141, ReturnNumber=1, NumberOfReturns=2, 

In [10]:
# Column names, in schema order.
aws_remote_df.schema.names

['xyz',
 'Intensity',
 'ReturnNumber',
 'NumberOfReturns',
 'ScanDirectionFlag',
 'EdgeOfFlightLine',
 'Classification',
 'Synthetic',
 'KeyPoint',
 'Withheld',
 'Overlap',
 'ScanAngleRank',
 'UserData',
 'PointSourceId',
 'GpsTime',
 'ScanChannel',
 'Red',
 'Green',
 'Blue',
 'Infrared',
 'wkb']

### Test sparkMeasure stage metrics

In [None]:
from sparkmeasure import StageMetrics
from pyspark.sql import SparkSession

# spark_aws (created above) is never stopped, and getOrCreate() would silently return
# that session, ignoring the "SparkMeasureTest" app name. Stop any active session first
# so this cell really gets a fresh, correctly named one.
_active_session = SparkSession.getActiveSession()
if _active_session is not None:
    _active_session.stop()

spark = (
    SparkSession.builder
    .appName("SparkMeasureTest")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

# NOTE(review): StageMetrics needs the spark-measure JAR on the driver classpath
# (e.g. via spark.jars.packages); confirm the cluster image provides it.
stagemetrics = StageMetrics(spark)

try:
    # Collect stage metrics only around the workload being measured.
    stagemetrics.begin()

    df = spark.range(1000000)
    df.selectExpr("id", "id % 10 as key").groupBy("key").count().show()

    stagemetrics.end()
    stagemetrics.print_report()
finally:
    # Always release cluster resources, even if the workload or the report fails.
    spark.stop()