<a href="https://colab.research.google.com/github/upendrak/pyspark_fundamentals/blob/main/PySpark_RDD_Basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



# Initializing Spark using SparkContext
SparkContext is the entry point to any Spark functionality. When we run a Spark application, a driver program starts; it contains the main function, and this is where the SparkContext is initialized. The driver program then runs the operations inside executors on the worker nodes.

## SparkContext Example

In [2]:
from pyspark import SparkContext

# "local" runs Spark locally with a single worker thread; "Demo" is the application name
sc = SparkContext("local", "Demo")

## sc.version
Check the version of Spark that this application is running on.

In [3]:
sc.version

'3.2.0'

## sc.pythonVer
Check the Python version (major.minor) that PySpark is using.

In [4]:
sc.pythonVer

'3.7'
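`sc.pythonVer` is a plain `"major.minor"` string. PySpark appears to derive it from the driver interpreter's `sys.version_info`; the minimal sketch below rebuilds the same string with the standard library alone, which can help confirm that the notebook kernel and PySpark agree on the Python version. It is an illustration, not PySpark's own code.

```python
import sys

# Build a "major.minor" string such as '3.7' from the running interpreter,
# the same format that sc.pythonVer reports.
python_ver = "%d.%d" % sys.version_info[:2]
print(python_ver)
```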

# Creating an RDD in PySpark
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.

There are two ways to create an RDD in PySpark:
1.   Parallelizing an existing collection in your driver program
2.   Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat
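With the first method, Spark splits the collection into partitions that can be processed in parallel, and `parallelize()` accepts an optional `numSlices` argument that sets how many. The plain-Python sketch below models that split as contiguous, nearly equal slices. The helper `split_into_slices` is hypothetical and only illustrates the idea; it is not Spark's implementation, and no Spark installation is needed to run it.

```python
def split_into_slices(data, num_slices):
    """Hypothetical helper: cut `data` into `num_slices` contiguous chunks
    whose sizes differ by at most one element."""
    data = list(data)
    n = len(data)
    return [
        data[(i * n) // num_slices : ((i + 1) * n) // num_slices]
        for i in range(num_slices)
    ]

partitions = split_into_slices(range(10), 3)
print(partitions)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

# Reading the partitions back in order recovers the original sequence,
# which is what an action such as collect() hands back to the driver.
flattened = [x for part in partitions for x in part]
print(flattened == list(range(10)))  # True
```

In a live session, `sc.parallelize(range(10), 3).glom().collect()` lists the elements held by each partition, so the real layout can be compared against this simplified model.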



### Creating RDD using `parallelize()`

In [5]:
words = ["scala",
         "java",
         "hadoop",
         "spark",
         "pandas",
         "spark vs hadoop",
         "pyspark",
         "pyspark and spark",
         "sparklyr",
         "koalas"]

# Distribute the local Python list as an RDD
words_rdd = sc.parallelize(words)

## type()
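Check the type of the source collection and of the RDD created from it: `words` is an ordinary Python list, while `words_rdd` is a PySpark RDD.

In [6]:
print(type(words))
print(type(words_rdd))

<class 'list'>
<class 'pyspark.rdd.RDD'>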

# Applying Operations on RDD
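There are two kinds of operations on an RDD:
1.   Transformations create a new RDD from an existing one. Examples: `map`, `filter`, `groupBy`.
2.   Actions perform a computation and return the result to the driver program. Examples: `collect`, `take`, `first`.

Transformations are lazy: Spark only records them, and nothing is computed until an action is called.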
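Spark transformations are lazy: they only describe a computation, and nothing runs until an action asks for a result. Python 3's built-in `map()` and `filter()` are lazy in a similar way, so the sketch below uses them as a local analogy with no Spark involved. A counter shows that the mapping function is not called until `list()`, playing the role of `collect()`, consumes the pipeline. The names `tag` and `sample_words` are illustrative only.

```python
calls = 0

def tag(word):
    # Count every invocation so we can see when work actually happens.
    global calls
    calls += 1
    return (word, 1)

sample_words = ["scala", "java", "spark", "pyspark"]

# "Transformations": build the pipeline; nothing is evaluated yet.
pipeline = map(tag, filter(lambda w: "spark" in w, sample_words))
print(calls)  # 0

# "Action": consuming the iterator forces the computation.
result = list(pipeline)
print(result)  # [('spark', 1), ('pyspark', 1)]
print(calls)   # 2
```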

## Common Actions
Below are some of the common actions used in Spark. Refer to the [RDD API docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD) for more actions.

### collect()
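Returns all the elements of the RDD to the driver as a Python list. Because everything is copied to the driver, use it only on RDDs small enough to fit in the driver's memory.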

In [7]:
words_rdd.collect()

['scala',
 'java',
 'hadoop',
 'spark',
 'pandas',
 'spark vs hadoop',
 'pyspark',
 'pyspark and spark',
 'sparklyr',
 'koalas']

### first()
Returns the first element in an RDD

In [8]:
words_rdd.first()

'scala'

### take(n)
Returns the first `n` elements of an RDD as a list.

In [9]:
words_rdd.take(3)

['scala', 'java', 'hadoop']
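`first()` is equivalent to `take(1)` followed by picking out the single element, and `take(n)` reads partitions in order until it has `n` elements, so it can often stop without touching the whole dataset. The plain-Python sketch below models that behaviour over a hypothetical list of partitions; `take_from_partitions` is an illustrative helper, not part of the PySpark API.

```python
def take_from_partitions(partitions, n):
    """Hypothetical helper: return the first n elements, scanning partitions
    in order and stopping as soon as enough elements have been gathered.
    Also returns how many partitions were scanned."""
    taken = []
    scanned = 0
    for part in partitions:
        if len(taken) >= n:
            break
        scanned += 1
        taken.extend(part[: n - len(taken)])
    return taken, scanned

# An assumed three-way split of some of the words used above.
partitions = [["scala", "java", "hadoop"], ["spark", "pandas"], ["pyspark", "koalas"]]

first_three, scanned = take_from_partitions(partitions, 3)
print(first_three, scanned)  # ['scala', 'java', 'hadoop'] 1

# first() behaves like take(1) followed by picking out the element.
first_list, _ = take_from_partitions(partitions, 1)
print(first_list[0])  # scala
```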

### count()
Counts the number of elements in an RDD

In [10]:
words_rdd.count()

10

### reduce()
Combines all the elements of an RDD into a single result of the same type, using a binary function that should be commutative and associative.

In [11]:
nums = sc.parallelize([1,2,4,5])
nums.reduce(lambda x, y: x + y)

12
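Spark reduces each partition separately and then combines the partial results, which is why the function passed to `reduce()` should be commutative and associative. The plain-Python sketch below mimics that two-step evaluation with `functools.reduce` over an assumed split of `[1, 2, 4, 5]` into two partitions: addition gives `12` either way, while subtraction (not associative) gives a different answer than a simple left-to-right reduction. `partitioned_reduce` is a hypothetical helper, not Spark's code.

```python
from functools import reduce
from operator import add, sub

def partitioned_reduce(partitions, f):
    """Hypothetical model of a distributed reduce: reduce each non-empty
    partition, then reduce the per-partition results."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

local_nums = [1, 2, 4, 5]
partitions = [[1, 2], [4, 5]]  # an assumed two-way split

print(reduce(add, local_nums), partitioned_reduce(partitions, add))  # 12 12
print(reduce(sub, local_nums), partitioned_reduce(partitions, sub))  # -10 0
```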

## Common Transformations
Below are some of the common transformations used in Spark. Refer to the [RDD API docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD) for more transformations.

### map(f)
Returns a new RDD by applying a function to each element of the RDD. Here each word becomes a `(word, 1)` pair.

In [12]:
words_map = words_rdd.map(lambda x: (x,1))
for w in words_map.collect(): print(w)

('scala', 1)
('java', 1)
('hadoop', 1)
('spark', 1)
('pandas', 1)
('spark vs hadoop', 1)
('pyspark', 1)
('pyspark and spark', 1)
('sparklyr', 1)
('koalas', 1)
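Pairs of the form `(word, 1)` are the usual input to a key-based aggregation such as PySpark's `reduceByKey()`, which merges the values for each key with a binary function. The sketch below reproduces that merge locally with a dictionary; it is a plain-Python illustration of the idea rather than how Spark implements it, and both `reduce_by_key` and the word sample are hypothetical.

```python
def reduce_by_key(pairs, f):
    """Plain-Python stand-in for reduceByKey: merge values that share a key."""
    merged = {}
    for key, value in pairs:
        merged[key] = f(merged[key], value) if key in merged else value
    return merged

sample = ["spark", "hadoop", "spark", "pyspark", "spark"]

# Same shape as words_rdd.map(lambda x: (x, 1))
pairs = [(w, 1) for w in sample]

counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)  # {'spark': 3, 'hadoop': 1, 'pyspark': 1}
```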


### flatMap(f)
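Returns a new RDD by first applying a function to each element and then flattening the results. Here each sentence is split into words, and all the words end up in a single RDD.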

In [13]:
test = ["Pyspark is awesome", "Mindful feast", "world hunger"]
test_rdd = sc.parallelize(test)

# Split each sentence into words and flatten the results into a single RDD
test_words = test_rdd.flatMap(lambda s: s.split(' '))
test_words.collect()

['Pyspark', 'is', 'awesome', 'Mindful', 'feast', 'world', 'hunger']
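The difference between `map()` and `flatMap()` is easiest to see side by side. The plain-Python sketch below applies the same split to the sentences from the cell above: a list comprehension stands in for `map()` and keeps one list per sentence, while `itertools.chain.from_iterable` stands in for `flatMap()` and flattens those lists into one sequence of words. In PySpark, `test_rdd.map(lambda s: s.split(' ')).collect()` would return the nested form.

```python
from itertools import chain

sentences = ["Pyspark is awesome", "Mindful feast", "world hunger"]

# map(): exactly one output element (here, a list) per input element.
mapped = [s.split(" ") for s in sentences]
print(mapped)
# [['Pyspark', 'is', 'awesome'], ['Mindful', 'feast'], ['world', 'hunger']]

# flatMap(): the per-element results are flattened into one sequence.
flat = list(chain.from_iterable(s.split(" ") for s in sentences))
print(flat)
# ['Pyspark', 'is', 'awesome', 'Mindful', 'feast', 'world', 'hunger']
```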

### filter(f)
Returns a new RDD containing only the elements that satisfy the predicate function. In the following example, we keep only the strings that contain "spark".

In [14]:
res = words_rdd.filter(lambda x: "spark" in x)
for w in res.collect(): print(w)

spark
spark vs hadoop
pyspark
pyspark and spark
sparklyr


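# Creating RDD from external datasets using `textFile()`
`sc.textFile()` reads a text file from a local filesystem (available on all nodes), HDFS, or any other Hadoop-supported file system URI, and returns it as an RDD of strings with one element per line. The example below reads a file named `pyspark_documentation.txt` from the current working directory.
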
In [15]:
file = sc.textFile("pyspark_documentation.txt")
type(file)

pyspark.rdd.RDD

In [16]:
# Split the lines into words and print them all
words = file.flatMap(lambda x: x.split())
for word in words.collect(): print(word)

PySpark
is
an
interface
for
Apache
Spark
in
Python.
It
not
only
allows
you
to
write
Spark
applications
using
Python
APIs,
but
also
provides
the
PySpark
shell
for
interactively
analyzing
your
data
in
a
distributed
environment.
PySpark
supports
most
of
Spark's
features
such
as
Spark
SQL,
DataFrame,
Streaming,
MLlib
(Machine
Learning)
and
Spark
Core


In [17]:
# Count the total number of words in the file
words.count()

54

In [18]:
# Count the words that contain the substring "Spark" (case-sensitive), so "PySpark" and "Spark's" are counted too
res = words.filter(lambda x: 'Spark' in x)
res.count()

8
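The test `'Spark' in x` is a case-sensitive substring match, so the count of 8 also includes tokens such as `PySpark` and `Spark's`. The plain-Python sketch below contrasts it with an exact-word count on the same tokens, rebuilt here from the output of the cells above (joining them with spaces is an assumption; the file's original line breaks do not affect `split()`). In PySpark, the exact-word version would be `words.filter(lambda x: x == 'Spark').count()`.

```python
# Tokens from the notebook's output above, rejoined with spaces (assumption:
# the original line breaks are irrelevant because split() ignores them).
text = (
    "PySpark is an interface for Apache Spark in Python. It not only allows you "
    "to write Spark applications using Python APIs, but also provides the PySpark "
    "shell for interactively analyzing your data in a distributed environment. "
    "PySpark supports most of Spark's features such as Spark SQL, DataFrame, "
    "Streaming, MLlib (Machine Learning) and Spark Core"
)
tokens = text.split()
print(len(tokens))  # 54

# Case-sensitive substring match, as in words.filter(lambda x: 'Spark' in x)
substring_hits = [t for t in tokens if "Spark" in t]
print(len(substring_hits))  # 8

# Exact-word match, as in words.filter(lambda x: x == 'Spark')
exact_hits = [t for t in tokens if t == "Spark"]
print(len(exact_hits))  # 4
```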

In [19]:
# Calculate the total number of characters in the file (newline characters are not counted)
line_lengths = file.map(lambda x: len(x))
line_lengths.reduce(lambda x, y: x + y)

345
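Summing `len()` over the elements returned by `textFile()` counts only the characters within each line: the line terminators are stripped when the file is split into lines, so they are not part of the total. The plain-Python sketch below shows the difference on a small hypothetical two-line string, using `str.splitlines()` as a stand-in for the way `textFile()` breaks a file into lines.

```python
# Hypothetical file contents: two lines, each ending in a newline.
contents = "PySpark is an interface\nfor Apache Spark\n"

# Like textFile(), keep one element per line without the trailing newline.
lines = contents.splitlines()
print(lines)  # ['PySpark is an interface', 'for Apache Spark']

chars_without_newlines = sum(len(line) for line in lines)
print(chars_without_newlines)  # 39
print(len(contents))           # 41, the two '\n' characters included
```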