SparkSession, introduced in Spark 2.0, is the entry point to PySpark functionality for programmatically creating RDDs and DataFrames. In the pyspark shell, a SparkSession object named spark is available by default; in an application, you create one with SparkSession.builder.

In [1]:
import pyspark
from pyspark.sql import SparkSession
# instantiate a spark Session
spark = SparkSession.builder.master("local[*]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

Cluster manager types:
1. Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
2. Apache Mesos – a cluster manager that can also run Hadoop MapReduce and PySpark applications.
3. Hadoop YARN – the resource manager in Hadoop 2. This is the most commonly used cluster manager.
4. Kubernetes – an open-source system for automating deployment, scaling, and management of containerized applications.

Accumulators are shared variables that are initialized once and are write-only for tasks: only tasks running on workers may update them, and those updates are propagated automatically to the driver program. Only the driver program can read an accumulator, through its value property.

In [2]:
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
accuSum=spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum+=x
rdd.foreach(countFun)
print(accuSum.value)

15


repartition() increases or decreases the number of partitions of an RDD or DataFrame, whereas coalesce() only decreases the number of partitions and does so more efficiently.

In [13]:
# In RDD, you can create parallelism at the time of the creation of an RDD using parallelize(), textFile() and wholeTextFiles().
rdd1 = spark.sparkContext.parallelize(list(range(0,20)),6)
print("parallelize " + str(rdd1.getNumPartitions()))
rdd1.saveAsTextFile("partition2")

parallelize 6


In [14]:
rdd2 = rdd1.repartition(4)
print("Repartition size : "+str(rdd2.getNumPartitions()))
rdd2.saveAsTextFile("re-partition2")

rdd3 = rdd1.coalesce(4)
print("Coalesce size : "+str(rdd3.getNumPartitions()))
rdd3.saveAsTextFile("coalesce2")

# --> Key difference: coalesce() avoids a full shuffle when reducing the partition count, so it moves less data across partitions than repartition().

Repartition size : 4
Coalesce size : 4
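
The difference can be pictured with a plain-Python model that treats each partition as a list. This is only an illustration under simplifying assumptions, not Spark's actual algorithm: the helper names `coalesce_model` and `repartition_model` and the six sample partitions are hypothetical, and real Spark decides which partitions to merge and where shuffled records go in its own way.

```python
def coalesce_model(partitions, n):
    """Merge whole partitions into n groups; no partition is split apart."""
    if n >= len(partitions):
        # Without a shuffle, the partition count cannot grow.
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Map the i-th input partition onto one of the n output partitions.
        merged[i * n // len(partitions)].extend(part)
    return merged


def repartition_model(partitions, n):
    """Redistribute every record individually, as a full shuffle would."""
    shuffled = [[] for _ in range(n)]
    records = [x for part in partitions for x in part]
    for i, x in enumerate(records):
        shuffled[i % n].append(x)  # round-robin placement
    return shuffled


# Six hypothetical partitions holding the numbers 0..19.
parts = [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9],
         [10, 11, 12], [13, 14, 15], [16, 17, 18, 19]]

coalesced = coalesce_model(parts, 4)
repartitioned = repartition_model(parts, 4)

print([len(p) for p in coalesced])
print([len(p) for p in repartitioned])
```

In this model, coalescing keeps records from the same input partition together, while repartitioning moves records one by one and produces evenly sized partitions at the cost of touching every record.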


Broadcast variables are read-only shared variables that are cached on every node in the cluster so that tasks can access them. Instead of sending this data along with every task, PySpark distributes broadcast variables to the workers using efficient broadcast algorithms to reduce communication costs.

In [21]:
broadcastVar = spark.sparkContext.broadcast({0:'0',1:'1'})
broadcastVar.value

{0: '0', 1: '1'}

RDD (Resilient Distributed Dataset) is the fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD you cannot change it. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster. <br>
PySpark RDD Benefits: 
1. In-Memory Processing
2. Immutability
3. Fault Tolerance
4. Lazy Evaluation
5. Partitioning

RDD transformations – Transformations are lazy operations: instead of updating an RDD, they return a new RDD, and they do not execute until you call an action on the RDD.
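
Lazy evaluation can be illustrated without Spark, since Python's built-in `map` is also lazy. The sketch below is only an analogy, not how Spark schedules work: the `calls` list and the `double` function are hypothetical names used to observe when the function actually runs.

```python
calls = []  # records every time double() actually runs


def double(x):
    calls.append(x)
    return x * 2


data = [1, 2, 3]

# "Transformation": builds a lazy pipeline; double() has not run yet.
lazy_result = map(double, data)
calls_before_action = len(calls)

# "Action": consuming the pipeline forces the computation.
result = list(lazy_result)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

Nothing is computed when the pipeline is defined; the work happens only when the result is requested, which is the same behavior an RDD shows between a transformation and an action.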


In [26]:
rdd = spark.sparkContext.textFile("test.txt")

The flatMap() transformation applies a function to each record and flattens the results into a new RDD. In the example below, it splits each record on spaces and flattens the result, so each record of the resulting RDD is a single word.

In [32]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))

The map() transformation applies an operation to every record, such as adding or updating a column; its output always has the same number of records as its input. <br>In our word count example, we add a value of 1 to each word. The result is a pair RDD of key-value tuples, with the word (a string) as the key and 1 (an integer) as the value.

In [33]:
rdd3 = rdd2.map(lambda x: (x,1))

reduceByKey() merges the values for each key using the specified function. <br> In our example, it sums the values for each word, so the resulting RDD contains the unique words and their counts.

In [37]:
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)
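
The three steps so far (flatMap, map, reduceByKey) can be mimicked in plain Python to see what each one produces. This is a minimal sketch of the semantics only, run on a single machine: the two sample lines stand in for test.txt, and `reduce_by_key` is a hypothetical helper, not a PySpark function.

```python
lines = ["the cat sat", "the dog sat"]  # hypothetical stand-in for test.txt

# flatMap: split each line, then flatten into one list of words.
words = [w for line in lines for w in line.split(" ")]

# map: pair each word with 1; the record count stays the same.
pairs = [(w, 1) for w in words]


def reduce_by_key(pairs, func):
    """Combine the values that share a key using func(a, b)."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


# reduceByKey: sum the 1s for each distinct word.
counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)
```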

The sortByKey() transformation sorts RDD elements by key.<br> In our example, we first use map to convert each (word, count) pair to (count, word), then apply sortByKey so that the sort is on the integer count.

In [42]:
rdd5 = rdd4.map(lambda x: (x[1],x[0])).sortByKey()

The filter() transformation keeps only the records that satisfy a predicate. <br> In our example, we keep the words that contain the letter “a”.

In [58]:
rdd6 = rdd3.filter(lambda x : 'a' in x[0])

RDD actions – operations that trigger computation and return RDD values to a driver program.

count() – Returns the number of records in an RDD

In [60]:
print("Count : "+str(rdd6.count()))

Count : 144


first() – Returns the first record.

In [62]:
firstRec = rdd5.first()
print("First Record : "+str(firstRec[0]) + ","+ firstRec[1])

First Record : 9,Project


max() – Returns the maximum record.

In [64]:
datMax = rdd5.max()
print("Max Record : "+str(datMax[0]) + ","+ datMax[1])

Max Record : 27,with


reduce() – Reduces the records to a single value; we can use it to count or sum.

In [75]:
# Action - reduce
totalWordCount = rdd5.reduce(lambda a,b: (a[0]+b[0],a[1]+b[1]))
print("dataReduce Record : "+str(totalWordCount[0]))

dataReduce Record : 522
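
Spark's reduce() expects a commutative and associative function, because it is applied within each partition first and then across the partial results. The sketch below imitates that with `functools.reduce` on hypothetical (count, word) records split into two hypothetical partitions; it sums only the counts and is an illustration, not Spark's implementation.

```python
from functools import reduce

# Hypothetical (count, word) records, shaped like those in rdd5.
records = [(2, "the"), (1, "cat"), (2, "sat"), (1, "dog")]


def add(a, b):
    return a + b


# Reduce everything in one pass.
total = reduce(add, (count for count, _ in records))

# Reduce each partition separately, then combine the partial results.
partitions = [records[:2], records[2:]]
partials = [reduce(add, (count for count, _ in part)) for part in partitions]
total_from_partials = reduce(add, partials)

print(total, partials, total_from_partials)
```

Both routes give the same total because addition is associative and commutative; a function without those properties could give different answers depending on how the data is partitioned.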


take() – Returns the first n records, where n is the number passed as an argument.

In [76]:
data3 = rdd6.take(3)

In [77]:
data3

[('Wonderland', 1), ('Carroll', 1), ('anyone', 1)]

A pair RDD holds key-value pairs and is the most commonly used RDD type. Other RDD types include:
1. ShuffledRDD 
2. DoubleRDD 
3. SequenceFileRDD  
4. HadoopRDD 
5. ParallelCollectionRDD 


**Shuffling** is a mechanism PySpark uses to redistribute data across different executors and even across machines. A shuffle is triggered when we perform certain transformations on RDDs, such as groupByKey(), reduceByKey(), and join().
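
The reason these operations need a shuffle is that all records with the same key must end up in the same partition before they can be combined. A minimal plain-Python sketch of that idea follows. It assumes hash partitioning with `zlib.crc32` purely for illustration (Spark uses its own hash function), and `partition_for` and `shuffle_model` are hypothetical helper names.

```python
import zlib


def partition_for(key, num_partitions):
    # crc32 is used here only because it is stable across runs.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle_model(partitions, num_partitions):
    """Send every (key, value) record to the partition chosen by its key."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            out[partition_for(key, num_partitions)].append((key, value))
    return out


# Before the shuffle, the key "a" is spread over all three partitions.
before = [[("a", 1), ("b", 1)], [("a", 1), ("c", 1)], [("b", 1), ("a", 1)]]
after = shuffle_model(before, 2)

for index, part in enumerate(after):
    print(index, part)
```

After the shuffle, each key lives in exactly one partition, so a per-key operation can run on each partition independently.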

**DataFrame** is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

In [2]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [5]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [4]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



PySpark SQL is one of the most used PySpark modules and is used for processing structured, columnar data. Once you have created a DataFrame, you can query it using SQL syntax.

In [6]:
df.createOrReplaceTempView("PERSON_DATA")
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()


root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [7]:
groupDF = spark.sql("SELECT gender, count(*) from PERSON_DATA group by gender")
groupDF.show()

+------+--------+
|gender|count(1)|
+------+--------+
|     M|       3|
|     F|       2|
+------+--------+



In [11]:
columns = ["language","users_count"]
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
rdd = spark.sparkContext.parallelize(data)

In [15]:
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()
dfFromRDD1.show()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [40]:
#create a dataframe with schema
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)


root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [18]:
# Defining the same schema using DDL is much simpler:
schema_ddl = " firstname STRING, middlename STRING, lastname STRING, id STRING, gender STRING, salary INT "

In [19]:
df = spark.createDataFrame(data=data2,schema=schema_ddl)
df.printSchema()
df.show(truncate=False)


root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



In [21]:
from pyspark.sql import Row

row=Row("James",40)
print(row[0] +","+str(row[1]))


James,40


In [22]:
row=Row(name="Alice", age=11)
print(row.name) 

Alice


In [55]:
data = [("{0:'0',1:'1'}",),("{2:'2',3:'3'}",)]

In [56]:
df_k = spark.createDataFrame(data=data)


In [57]:
df_k.show()

+-------------+
|           _1|
+-------------+
|{0:'0',1:'1'}|
|{2:'2',3:'3'}|
+-------------+



In [61]:
import json
def parse_tweets(raw_records):
    for pdf in raw_records:
        d=json.loads(str(pdf.data))
    # yield pdf.data.apply(lambda record: record.split("\t"))
    # yield pdf.data.str.split(pat="\t", expand=True)
        print(list(d.values()))

In [62]:
parse_tweets(df_k)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [85]:
for row in df_k.collect():
    j=json.loads(row._1)
    print(j)

JSONDecodeError: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)

In [82]:
df_k.collect()

[Row(_1="{0:'0',1:'1'}"), Row(_1="{2:'2',3:'3'}")]
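
The two errors above have different causes. The first call passes the DataFrame itself rather than its collected rows, and it reads a `data` attribute although the only column is named `_1`. The second fails because the strings use single quotes and integer keys, which JSON does not allow. The strings do look like Python dict literals, so one possible approach is `ast.literal_eval` from the standard library. The sketch below works on plain strings copied from the output above as a stand-in for `row._1`; it assumes every value is a well-formed Python literal.

```python
import ast

# The same strings that df_k holds in its _1 column.
raw_values = ["{0:'0',1:'1'}", "{2:'2',3:'3'}"]

# literal_eval safely evaluates Python literals (dicts, lists, numbers,
# strings) without executing arbitrary code.
parsed = [ast.literal_eval(s) for s in raw_values]

for d in parsed:
    print(list(d.values()))
```

With the DataFrame, the same call would be applied to `row._1` for each row returned by `df_k.collect()`.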