pyspark.SparkContext

Main entry point for Spark functionality.

pyspark.RDD

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

pyspark.streaming.StreamingContext

Main entry point for Spark Streaming functionality.

pyspark.streaming.DStream

A Discretized Stream (DStream), the basic abstraction in Spark Streaming.

pyspark.sql.SparkSession

Main entry point for DataFrame and SQL functionality.

pyspark.sql.DataFrame

A distributed collection of data grouped into named columns.

PySpark is the Python API for Spark.

Public classes:

        SparkContext:

            Main entry point for Spark functionality.

        RDD:

            A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

        Broadcast:

            A broadcast variable that gets reused across tasks.

        Accumulator:

            An “add-only” shared variable that tasks can only add values to.

        SparkConf:

            For configuring Spark.

        SparkFiles:

            Access files shipped with jobs.


StorageLevel:

    Finer-grained cache persistence levels.

TaskContext:

    Information about the currently running task, available on the workers. This API is experimental.

RDDBarrier:

    Wraps an RDD under a barrier stage for barrier execution.

BarrierTaskContext:

    A TaskContext that provides extra info and tooling for barrier execution.



In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = r"/home/pyspark/spark-3.0.0-preview2-bin-hadoop2.7"

import findspark
findspark.init(r'/home/pyspark/spark-3.0.0-preview2-bin-hadoop2.7')

import pyspark
from pyspark.sql import SparkSession

In [14]:
from pyspark import SparkFiles
sc = pyspark.SparkContext()
sc.addFile("blogtexts.txt")

def func():

   with open(SparkFiles.get("blogtexts.txt")) as testFile:

       yield testFile.readline()

func()     



<generator object func at 0x7f7a1c213750>

In [15]:
next

<function next>

In [16]:
func

<function __main__.func()>

In [17]:
x = func()

In [18]:
next(x)

'Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you imagine how many drives / CDs / Blue-ray DVDs would be required to store them? It is difficult to imagine this scale of data generation even as a data science professional. While this pace of data generation is very exciting,  it has created entirely new set of challenges and has forced us to find new ways to handle Big Huge data effectively.\n'

In [19]:
next(x)

StopIteration: 

In [20]:
next(x)

StopIteration: 

In [21]:
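def gen_func():

   with open(SparkFiles.get("blogtexts.txt")) as genFile:

       # readline() returns a str, so this loop yields one character
       # of the first line per next() call, not one line of the file
       for i in genFile.readline():
           yield i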

y = gen_func() 

In [22]:
next(y)

'T'

In [23]:
next(y)

'h'

In [24]:
next(y)

'i'

In [25]:
next(y)

'n'
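The three behaviours seen in cells 14 to 25 can be reproduced without Spark. The sketch below is only an illustration: `SAMPLE` and the three generator names are hypothetical, and `io.StringIO` stands in for the file opened through `SparkFiles.get`.

```python
import io

# Hypothetical two-line text standing in for blogtexts.txt.
SAMPLE = "first line\nsecond line\n"

def first_line_only(handle):
    # A single yield: the generator is exhausted after one next() call,
    # so every later next() raises StopIteration.
    yield handle.readline()

def chars_of_first_line(handle):
    # Iterating over a str walks its characters, as in gen_func above.
    for ch in handle.readline():
        yield ch

def all_lines(handle):
    # Iterating over the file object itself yields one line per step.
    for line in handle:
        yield line

one_shot = list(first_line_only(io.StringIO(SAMPLE)))
chars = list(chars_of_first_line(io.StringIO(SAMPLE)))
lines = list(all_lines(io.StringIO(SAMPLE)))

print(one_shot)
print(chars[:5])
print(lines)
```

If the goal is to stream the whole file, the third form is probably the one wanted.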

In [26]:
sc.applicationId

'local-1604684039568'

In [27]:
xp = sc.parallelize([0, 2, 3, 4, 6], 5)


In [28]:
xp

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:247

In [29]:
xp.glom().collect()

[[0], [2], [3], [4], [6]]

In [30]:
type(xp)

pyspark.rdd.RDD

In [33]:
sc.range(5)

PythonRDD[4] at RDD at PythonRDD.scala:53

In [34]:
sc.parallelize(range(10))

PythonRDD[6] at RDD at PythonRDD.scala:53

In [35]:
import threading

from time import sleep

result = "Not Set"

lock = threading.Lock()

def map_func(x):

    sleep(100)

    raise Exception("Task should have been cancelled")

def start_job(x):

    global result

    try:

        sc.setJobGroup("job_to_cancel", "some description")

        result = sc.parallelize(range(x)).map(map_func).collect()

    except Exception as e:

        result = "Cancelled"

    lock.release()

def stop_job():

    sleep(5)

    sc.cancelJobGroup("job_to_cancel")

suppress = lock.acquire()

suppress = threading.Thread(target=start_job, args=(10,)).start()

suppress = threading.Thread(target=stop_job).start()

suppress = lock.acquire()

print(result)

Internally threads on PVM and JVM are not synced, and JVM thread can be reused for multiple threads on PVM, which fails to isolate local properties for each thread on PVM. 
To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. 
To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread.


Cancelled
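The lock in cell 35 is used purely as a signal that the job thread has finished. A minimal Spark-free sketch of the same handshake follows, swapping in `threading.Event` for both the lock and the call to `sc.cancelJobGroup`. All names in it are hypothetical, and the 5-second timeout merely stands in for the long-running job.

```python
import threading

cancel_requested = threading.Event()  # stands in for sc.cancelJobGroup(...)
finished = threading.Event()          # replaces the lock used as a signal
outcome = {"result": "Not Set"}

def start_job():
    # Simulated long job: wait up to 5 s, but return early once cancelled.
    if cancel_requested.wait(timeout=5):
        outcome["result"] = "Cancelled"
    else:
        outcome["result"] = "Completed"
    finished.set()

def stop_job():
    # Ask the simulated job to stop.
    cancel_requested.set()

threading.Thread(target=start_job).start()
threading.Thread(target=stop_job).start()

finished.wait()  # block until start_job has recorded its outcome
print(outcome["result"])
```

An `Event` can be set by any thread, which makes the intent clearer than acquiring a lock in one thread and releasing it in another.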


In [37]:
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))

combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)




(10, 4)

In [38]:
sc.parallelize([1,1]).aggregate((0, 0), seqOp, combOp)


(2, 2)

In [39]:
sc.parallelize([10,1]).aggregate((1, 0), seqOp, combOp)


(13, 2)

In [40]:
sc.parallelize([1,2,3,4]).aggregate((1, 1), seqOp, combOp)


(12, 6)

 aggregate(zeroValue, seqOp, combOp)

    Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral “zero value.”

    Each function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

    The first function (seqOp) can return a different result type, U, than the element type T of this RDD. Thus, we need one operation for merging a T into a U (seqOp) and one operation for merging two Us (combOp).
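The results in cells 39 and 40 follow from the zero value being used once per partition and once more when the per-partition results are merged. The sketch below is a pure-Python model of that behaviour, not PySpark's implementation. `simulate_aggregate` is a hypothetical helper, and the single-partition layout is an assumption inferred from the outputs above.

```python
from functools import reduce

def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    # Fold each partition, starting from the zero value ...
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # ... then merge the partial results, starting from the zero value again.
    return reduce(comb_op, partials, zero_value)

seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

# Neutral zero value: the plain (sum, count).
neutral = simulate_aggregate([[1, 2, 3, 4]], (0, 0), seq_op, comb_op)
# Non-neutral zero value, one partition (assumed layout for cell 40).
one_partition = simulate_aggregate([[1, 2, 3, 4]], (1, 1), seq_op, comb_op)
# Same data and zero value, split across two partitions.
two_partitions = simulate_aggregate([[1, 2], [3, 4]], (1, 1), seq_op, comb_op)

print(neutral, one_partition, two_partitions)
```

With a non-neutral zero value the answer depends on the number of partitions, which is why (0, 0) is the safe choice for a sum-and-count.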

In [49]:
from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])


In [50]:
rdd

ParallelCollectionRDD[29] at readRDDFromFile at PythonRDD.scala:247

In [51]:
sorted(rdd.reduceByKeyLocally(add).items())

[('a', 2), ('b', 1)]
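reduceByKeyLocally returns an ordinary dict to the driver rather than an RDD. A rough pure-Python model of the idea: reduce each partition into a dict, then merge the dicts with the same function. `simulate_reduce_by_key_locally` and the two-partition layout are hypothetical, chosen only for illustration.

```python
from operator import add

def simulate_reduce_by_key_locally(partitions, func):
    def merge_into(target, pairs):
        # Combine values that share a key using func.
        for key, value in pairs:
            target[key] = func(target[key], value) if key in target else value
        return target

    # Step 1: one dict per partition.
    partials = [merge_into({}, part) for part in partitions]
    # Step 2: merge the per-partition dicts into a single result.
    merged = {}
    for partial in partials:
        merge_into(merged, partial.items())
    return merged

# Assumed layout: the three pairs split over two partitions.
parts = [[("a", 1)], [("b", 1), ("a", 1)]]
totals = simulate_reduce_by_key_locally(parts, add)
print(sorted(totals.items()))
```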

In [52]:
sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()

[('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
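The ids above look irregular because zipWithUniqueId gives the items of the k-th of n partitions the ids k, n+k, 2*n+k, and so on. The sketch below reproduces the output in plain Python. `simulate_zip_with_unique_id` is a hypothetical helper, and the partition layout is an assumption inferred from the result above.

```python
def simulate_zip_with_unique_id(partitions):
    n = len(partitions)
    # The i-th item of partition k receives the id i * n + k.
    return [
        (item, i * n + k)
        for k, part in enumerate(partitions)
        for i, item in enumerate(part)
    ]

# Assumed layout of ["a", "b", "c", "d", "e"] across 3 partitions.
layout = [["a"], ["b", "c"], ["d", "e"]]
ids = simulate_zip_with_unique_id(layout)
print(ids)
```

The ids are unique but not consecutive: here 3 is never assigned because the first partition has only one item.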

In [55]:
from pyspark.context import SparkContext



b = sc.broadcast([1, 2, 3, 4, 5])

b.value


[1, 2, 3, 4, 5]

In [56]:


sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()


b.unpersist()
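Cell 56 shows no output because its last expression is b.unpersist(). The flatMap call replaces each input element with every element of b.value and flattens the result. This can be modelled in plain Python as below, where `broadcast_value` is a hypothetical stand-in for b.value.

```python
from itertools import chain

broadcast_value = [1, 2, 3, 4, 5]  # stands in for b.value
elements = [0, 0]                  # the RDD contents in cell 56

# flatMap: apply the function to every element, then flatten one level.
flattened = list(chain.from_iterable(broadcast_value for _ in elements))
print(flattened)
```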

In [58]:
from pyspark import SparkConf, SparkContext

from pyspark import BasicProfiler

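# Note: a custom profiler is only used when it is passed as profiler_cls to a
# SparkContext whose conf sets spark.python.profile to "true". The sc created
# earlier in this notebook was not set up that way, so show() is never called.
class MyCustomProfiler(BasicProfiler):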

    def show(self, id):

        print("My custom profiles for RDD:%s" % id)


sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)


sc.parallelize(range(1000)).count()


sc.stop()