In [1]:
import time
import pyspark
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevel

In [2]:
class Power:
    """Raise numbers to a fixed exponent.

    Constructing an instance blocks for two seconds (presumably to stand in
    for an expensive setup step), so creating one per element is slow.
    """

    def __init__(self, p):
        """Store the exponent ``p`` and then sleep for two seconds."""
        self.p = p
        time.sleep(2)

    def applyPower(self, x):
        """Return ``x`` raised to the stored exponent."""
        return pow(x, self.p)
        
# map
def power_map(num):
    """Return ``num`` raised to the fifth power.

    A new ``Power`` instance is built on every call, so each invocation also
    pays that constructor's cost.
    """
    return Power(5).applyPower(num)

In [10]:
# Build the driver configuration and actually hand it to the context; when
# `conf` is not passed, the "spark.driver.host" setting is never applied.
conf = pyspark.SparkConf().set("spark.driver.host", "localhost")
# NOTE(review): getOrCreate returns an already-running context as-is, so the
# conf only takes effect when this call creates the context — confirm.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel('OFF')
# Read the file (requesting 5 partitions) and parse every line as an integer.
numbers = sc.textFile("../Data/numbers.txt", 5).map(int)

In [11]:
# Show the active SparkContext as the cell's output.
sc

In [12]:
# Peek at the first five parsed integers to confirm the file loaded.
numbers.take(5)

[4, 0, 3, 6, 8]

## Without Persisting or Caching

In [35]:
# No storage level is set on the RDD here, so every collect() re-runs the
# whole map; both timings are expected to be about the same.
powered_num = numbers.map(power_map)
for label in ("first ", "second "):
    start = time.time()
    powered_num.collect()
    print(label, time.time() - start)

                                                                                

first  42.320249795913696




second  42.17726492881775


                                                                                

## With Persisting/Caching

In [36]:
# Mark the RDD for caching, then time the first collect(): this one still has
# to evaluate the map (triggers re-evaluation).
powered_num.cache()
start = time.time()
powered_num.collect()
elapsed = time.time() - start
print("first ", elapsed)



first  42.20078897476196


                                                                                

In [37]:
# Second collect() after cache(): the map is not re-evaluated this time.
start = time.time()
powered_num.collect()
elapsed = time.time() - start
print("second ", elapsed)

second  0.0867300033569336


## What happens if we try to change the storage level of an already-cached RDD?

In [30]:
# Expected to fail: cache() was already called on this RDD, and a storage
# level cannot be changed once one has been assigned (see the error below).
powered_num.persist(StorageLevel.MEMORY_AND_DISK)

Py4JJavaError: An error occurred while calling o148.persist.
: java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
	at org.apache.spark.rdd.RDD.persist(RDD.scala:176)
	at org.apache.spark.rdd.RDD.persist(RDD.scala:200)
	at org.apache.spark.api.java.JavaRDD.persist(JavaRDD.scala:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


## How to solve it? Unpersist the RDD before assigning a new storage level

In [38]:
# Remove the RDD's current storage level so a different one can be assigned.
powered_num.unpersist()

PythonRDD[14] at collect at /var/folders/hz/5yfy3bjj1ts_fk0bmfk7xj080000gn/T/ipykernel_5262/3487759784.py:3

## Reading/writing from disk

In [39]:
# Assign the new storage level: keep the computed partitions on disk only.
powered_num.persist(StorageLevel.DISK_ONLY)
# The first collect() triggers re-evaluation of the map; the second one does
# not trigger re-evaluation.
for label in ("first ", "second "):
    start = time.time()
    powered_num.collect()
    print(label, time.time() - start)



first  42.263171911239624
second  0.06051301956176758


                                                                                

In [40]:
# Shut down the SparkContext now that the demonstration is finished.
sc.stop()