# Configuration of Spark Session to Spark Cluster

In [1]:
from pyspark.sql import SparkSession
from operator import add
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.119:7077") \
        .appName("tim-wywiol-A3")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .config("spark.cores.max", 4)\
        .getOrCreate()
# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/25 11:27:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/25 11:27:11 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


# Loading Data in SPARK RDD from HDFS

In [2]:
data_frame = spark_session.read\
    .option("header", "true")\
    .csv('hdfs://192.168.2.119:9000/parking-citations.csv')\
    .cache()

                                                                                

# B1

In [3]:
data_frame.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

                                                                                

In [14]:
#schema
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [4]:
data_frame.count()

                                                                                

13077724

In [5]:
data_frame.rdd.getNumPartitions()

16

In [3]:
data4 = data_frame.drop('VIN','Latitude', 'Longitude')

In [25]:
data4.orderBy('Fine amount', ascending=False).show()



+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+-------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+-------------------+-----------------+----------------------+
|   4359446453|2019-07-31T00:00:...|     612.0|    null|       null|            IL|             null|PTRB|        CM|   BK|     12800 HAMLIN ST|00315|  53.0|       80.69.4|           PK OVERSIZ|  

                                                                                

In [21]:
data4.filter(data4['Fine amount'] == '1100').show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+--------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|  Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+--------------------+-----------------+----------------------+
|   4510322972|2020-02-05T00:00:...|      1127|    null|       null|            CA|           202010|HOND|        PA|   GY|  2055 FIGUEROA ST N|00600|    56|     22511.57B|   DP- RO NOT PRESENT

                                                                                

In [4]:
from pyspark.sql.types import DecimalType
data_5= data4.withColumn("Fine amount",data4["Fine amount"].cast(DecimalType()))

In [5]:
data_5.orderBy('Fine amount', ascending=False).show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+--------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|  Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+--------------------+-----------------+----------------------+
|   4510804966|2020-02-07T00:00:...|       847|    null|       null|            CA|           202007|AUDI|        PA|   BK|  7257 SUNSET BLVD W|00461|    54|     22511.57B|   DP- RO NOT PRESENT

In [6]:
maxfine = data_5.agg({"Fine amount": "max"}).collect()[0]

                                                                                

In [7]:
print(maxfine)

Row(max(Fine amount)=Decimal('1100'))


In [8]:
numberOfMaxFine = data_5.filter(data_5['Fine amount'] == 1100).count()

                                                                                

In [9]:
print("\033[34m",numberOfMaxFine,"\033[0m", 'tickets have the max fine of 1100 money units')

[34m 626 [0m tickets have the max fine of 1100 money units


## B7

In [14]:
data_5.groupBy('Make').count().show()



22/02/25 11:32:57 ERROR TaskSetManager: Task 2 in stage 10.0 failed 4 times; aborting job


Py4JJavaError: An error occurred while calling o159.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 10.0 failed 4 times, most recent failure: Lost task 2.3 in stage 10.0 (TID 138) (192.168.2.119 executor 1): com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:188)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:177)
	at org.apache.spark.storage.BlockManager.$anonfun$dropFromMemory$3(BlockManager.scala:1848)
	at org.apache.spark.storage.BlockManager.$anonfun$dropFromMemory$3$adapted(BlockManager.scala:1843)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:70)
	at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1843)
	at org.apache.spark.storage.memory.MemoryStore.dropBlock$1(MemoryStore.scala:490)
	at org.apache.spark.storage.memory.MemoryStore.$anonfun$evictBlocksToFreeSpace$4(MemoryStore.scala:516)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:507)
	at org.apache.spark.memory.StorageMemoryPool.freeSpaceToShrinkPool(StorageMemoryPool.scala:130)
	at org.apache.spark.memory.UnifiedMemoryManager.maybeGrowExecutionPool$1(UnifiedMemoryManager.scala:123)
	at org.apache.spark.memory.UnifiedMemoryManager.$anonfun$acquireExecutionMemory$1(UnifiedMemoryManager.scala:148)
	at org.apache.spark.memory.ExecutionMemoryPool.acquireMemory(ExecutionMemoryPool.scala:119)
	at org.apache.spark.memory.UnifiedMemoryManager.acquireExecutionMemory(UnifiedMemoryManager.scala:148)
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:147)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:297)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116)
	at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.acquirePage(RowBasedKeyValueBatch.java:127)
	at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.<init>(RowBasedKeyValueBatch.java:107)
	at org.apache.spark.sql.catalyst.expressions.VariableLengthRowBasedKeyValueBatch.<init>(VariableLengthRowBasedKeyValueBatch.java:186)
	at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.allocate(RowBasedKeyValueBatch.java:92)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_FastHashMap_0.<init>(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
	at org.apache.spark.storage.CountingWritableChannel.write(DiskStore.scala:339)
	at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
	at java.nio.channels.Channels.writeFully(Channels.java:101)
	at java.nio.channels.Channels.access$000(Channels.java:61)
	at java.nio.channels.Channels$1.write(Channels.java:174)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
	... 45 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:188)
	at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:177)
	at org.apache.spark.storage.BlockManager.$anonfun$dropFromMemory$3(BlockManager.scala:1848)
	at org.apache.spark.storage.BlockManager.$anonfun$dropFromMemory$3$adapted(BlockManager.scala:1843)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:70)
	at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1843)
	at org.apache.spark.storage.memory.MemoryStore.dropBlock$1(MemoryStore.scala:490)
	at org.apache.spark.storage.memory.MemoryStore.$anonfun$evictBlocksToFreeSpace$4(MemoryStore.scala:516)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:507)
	at org.apache.spark.memory.StorageMemoryPool.freeSpaceToShrinkPool(StorageMemoryPool.scala:130)
	at org.apache.spark.memory.UnifiedMemoryManager.maybeGrowExecutionPool$1(UnifiedMemoryManager.scala:123)
	at org.apache.spark.memory.UnifiedMemoryManager.$anonfun$acquireExecutionMemory$1(UnifiedMemoryManager.scala:148)
	at org.apache.spark.memory.ExecutionMemoryPool.acquireMemory(ExecutionMemoryPool.scala:119)
	at org.apache.spark.memory.UnifiedMemoryManager.acquireExecutionMemory(UnifiedMemoryManager.scala:148)
	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:147)
	at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:297)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116)
	at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.acquirePage(RowBasedKeyValueBatch.java:127)
	at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.<init>(RowBasedKeyValueBatch.java:107)
	at org.apache.spark.sql.catalyst.expressions.VariableLengthRowBasedKeyValueBatch.<init>(VariableLengthRowBasedKeyValueBatch.java:186)
	at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.allocate(RowBasedKeyValueBatch.java:92)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_FastHashMap_0.<init>(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: No space left on device
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
	at org.apache.spark.storage.CountingWritableChannel.write(DiskStore.scala:339)
	at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
	at java.nio.channels.Channels.writeFully(Channels.java:101)
	at java.nio.channels.Channels.access$000(Channels.java:61)
	at java.nio.channels.Channels$1.write(Channels.java:174)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
	... 45 more


In [57]:
### release the cores for another application!
spark_session.stop()