
Prepare the environment for PySpark

In [None]:
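# Install Java 8, which Spark 2.4 runs on
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download and unpack a prebuilt Spark 2.4.3 distribution
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.6.tgz
!tar -xf spark-2.4.3-bin-hadoop2.6.tgz
# findspark makes the unpacked Spark's Python package importable
!pip install -q findspark
import os
# Point Spark at the Java installation and at the unpacked Spark directory
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.6"
import findspark
findspark.init()
# Start a local SparkSession that uses all available cores
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()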

In [6]:
df = spark.createDataFrame([{"Google": "Colab","Spark":"Scala"},{"Google": "Runbook","Spark":"PySpark"}])
df.show()



+-------+-------+
| Google|  Spark|
+-------+-------+
|  Colab|  Scala|
|Runbook|PySpark|
+-------+-------+



Copy a data file to your local Colab environment

In [21]:
!wget https://raw.githubusercontent.com/futurexskill/bigdata/master/retailstore.csv

--2020-10-23 18:10:14--  https://raw.githubusercontent.com/futurexskill/bigdata/master/retailstore.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 279 [text/plain]
Saving to: ‘retailstore.csv.1’


2020-10-23 18:10:15 (13.6 MB/s) - ‘retailstore.csv.1’ saved [279/279]



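The retailstore.csv file is now in the local Colab environment. Because the download cell was run twice in this session, wget kept the existing file and saved the second download as retailstore.csv.1, which is why both names appear below.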

In [27]:
!ls -h

retailstore.csv    sample_data		      spark-2.4.3-bin-hadoop2.6.tgz
retailstore.csv.1  spark-2.4.3-bin-hadoop2.6


# Resilient Distributed Datasets (RDD)

Import SparkContext

In [28]:
from pyspark import SparkContext

# Get the SparkContext from the SparkSession

In [29]:
sc = spark.sparkContext

# Create an RDD from a Python list

In [13]:
sampleRDD = sc.parallelize([10,20,30,40,50,60])

In [14]:
type(sampleRDD)

pyspark.rdd.RDD

In [15]:
sampleRDD.collect()

[10, 20, 30, 40, 50, 60]

# Read the CSV file into an RDD

In [33]:
customerData = sc.textFile("retailstore.csv")

In [34]:
type(customerData)

pyspark.rdd.RDD

# Perform RDD Operations

## Print all records

In [35]:
customerData.collect()

['Age,Salary,Gender,Country,Purchased',
 '18,20000,Male,Germany,N',
 '19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '21,,Male,England,N',
 '22,50000,Male,France,Y',
 '23,35000,Female,England,N',
 '24,,Male,Germany,N',
 '25,32000,Female,France,Y',
 ',35000,Male,Germany,N',
 '27,37000,Female,France,N']
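`sc.textFile` does not parse the CSV: each RDD element is one raw line, and the missing values above (for example `'21,,Male,England,N'`) show up as empty strings between commas. As a minimal sketch, a data line can be turned into typed fields as below. The helper name `parse_customer` and the choice of `None` for a missing value are assumptions for illustration, not part of the original notebook; because it is plain Python, the same function could be passed to `customerData.map(...)` once the header line has been removed.

```python
def parse_customer(line):
    """Split one CSV data line into (age, salary, gender, country, purchased).

    Hypothetical helper: empty Age/Salary fields are returned as None
    (an assumed convention for missing values).
    """
    age, salary, gender, country, purchased = line.split(",")
    return (
        int(age) if age else None,
        int(salary) if salary else None,
        gender,
        country,
        purchased,
    )


print(parse_customer("21,,Male,England,N"))     # (21, None, 'Male', 'England', 'N')
print(parse_customer(",35000,Male,Germany,N"))  # (None, 35000, 'Male', 'Germany', 'N')
```

The header line would make `int("Age")` fail, so the helper is only meant for data lines.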

## Print Count

In [36]:
customerData.count()

11

## Print the first row

In [37]:
customerData.first()

'Age,Salary,Gender,Country,Purchased'
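`first()` returns the header line, and `count()` above reported 11 because the header is counted as a record. A common PySpark pattern is `header = customerData.first()` followed by `customerData.filter(lambda line: line != header)`. The sketch below mirrors that logic on a plain Python list (the list `lines` is a hypothetical stand-in for the RDD's first few elements), so the predicate can be checked without Spark.

```python
# A few lines copied from retailstore.csv, standing in for the RDD's elements
lines = [
    "Age,Salary,Gender,Country,Purchased",
    "18,20000,Male,Germany,N",
    "19,22000,Female,France,N",
]

header = lines[0]  # the analogue of customerData.first()

# The same predicate you would pass to customerData.filter(...)
data_lines = [line for line in lines if line != header]

print(len(lines), len(data_lines))  # 3 2
```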

## Fetch the first 3 rows

In [38]:
customerData.take(3)

['Age,Salary,Gender,Country,Purchased',
 '18,20000,Male,Germany,N',
 '19,22000,Female,France,N']

## Print each row

In [39]:
for line in customerData.collect():
  print(line)

Age,Salary,Gender,Country,Purchased
18,20000,Male,Germany,N
19,22000,Female,France,N
20,24000,Female,England,N
21,,Male,England,N
22,50000,Male,France,Y
23,35000,Female,England,N
24,,Male,Germany,N
25,32000,Female,France,Y
,35000,Male,Germany,N
27,37000,Female,France,N


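# Spark Operations

1) Transformation (produces a new RDD from an existing RDD, e.g. map, filter, flatMap). Transformations are lazy: Spark only records them and computes nothing until an action is called.

2) Action (triggers the computation and returns the final result of the RDD computation to the driver, e.g. collect, count, take)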
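Laziness is easy to see with an analogy: Python 3's built-in `map` is also lazy and only calls the function when the result is consumed. The sketch below is plain Python, not Spark; the function `to_upper` and the counter `calls` are illustrative names. It counts how often the function runs before and after the "action".

```python
calls = 0


def to_upper(line):
    """Upper-case a line and count how many times this function is called."""
    global calls
    calls += 1
    return line.upper()


lines = ["18,20000,Male,Germany,N", "19,22000,Female,France,N"]

pending = map(to_upper, lines)  # like a transformation: nothing runs yet
print(calls)                    # 0
result = list(pending)          # like an action: the function now runs per element
print(calls)                    # 2
```

One difference worth keeping in mind: an RDD is recomputed from its lineage for every action unless it is cached, whereas a Python iterator can be consumed only once.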


# Transformation - Map

* map is the most basic transformation. It applies a function, either a named function or an inline lambda expression, to every element of one RDD and produces a new RDD.

* newRDD = oldRDD.map(function)
* It acts on one element at a time, so the resulting RDD has the same number of elements as the original RDD.
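Because the function given to map is ordinary Python, it can be tried locally first. This sketch applies the same lambda as the "Replace Male with M" example to a few sample lines, using Python's built-in `map` as a stand-in for `RDD.map`. It yields one output per input, and `'Female'` is left alone because `str.replace` is case-sensitive and `'Female'` contains `'male'`, not `'Male'`.

```python
sample = [
    "18,20000,Male,Germany,N",
    "19,22000,Female,France,N",
    "21,,Male,England,N",
]

# Same lambda as the notebook cell; built-in map stands in for RDD.map
replaced = list(map(lambda x: x.replace("Male", "M"), sample))

print(len(replaced) == len(sample))  # True: one output per input
print(replaced)
# ['18,20000,M,Germany,N', '19,22000,Female,France,N', '21,,M,England,N']
```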

# Map

Replace "Male" with "M"

# Python lambda function

* A lambda function is an anonymous function
* A lambda function can take any number of arguments, but can only have one expression
* lambda arguments: expression
* e.g. x = lambda a, b: a + b
* Without lambda
      def calculateSum(a, b):
          return a + b
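The two forms are interchangeable wherever Spark expects a function: a named function is easier to reuse and test, while a lambda is handy for a one-off expression. A minimal check using the names from the bullets above:

```python
# Anonymous form: a single expression, bound to a name here only for comparison
x = lambda a, b: a + b


# Equivalent named function
def calculateSum(a, b):
    return a + b


print(x(2, 3), calculateSum(2, 3))  # 5 5
```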

In [41]:
customerData2 = customerData.map(lambda x: x.replace("Male", "M"))

In [42]:
customerData2.collect()

['Age,Salary,Gender,Country,Purchased',
 '18,20000,M,Germany,N',
 '19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '21,,M,England,N',
 '22,50000,M,France,Y',
 '23,35000,Female,England,N',
 '24,,M,Germany,N',
 '25,32000,Female,France,Y',
 ',35000,M,Germany,N',
 '27,37000,Female,France,N']

# Transformation - Filter

* It keeps only the elements of an RDD for which the function returns True.

* The function is used as a condition. It can be a simple inline lambda expression or a more complex function.

* newRDD = oldRDD.filter(function)
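The filter example below uses a substring test (`"Female" in x`), which works for this file but looks at the whole line. A slightly stricter alternative compares only the Gender column (index 2, taken from the header `Age,Salary,Gender,Country,Purchased`). It is sketched here with Python's built-in `filter` standing in for `RDD.filter`; the name `is_female` is illustrative, and the same lambda could be passed to `customerData.filter`.

```python
lines = [
    "Age,Salary,Gender,Country,Purchased",
    "18,20000,Male,Germany,N",
    "19,22000,Female,France,N",
    "25,32000,Female,France,Y",
]

# Keep a line only if its third field (Gender) is exactly "Female"
is_female = lambda line: line.split(",")[2] == "Female"

females = list(filter(is_female, lines))
print(females)  # ['19,22000,Female,France,N', '25,32000,Female,France,Y']
```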

# Filter

## Display only female customers

In [43]:
femaleCustomers=customerData.filter(lambda x: "Female" in x)

In [44]:
femaleCustomers.collect()

['19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '23,35000,Female,England,N',
 '25,32000,Female,France,Y',
 '27,37000,Female,France,N']

In [45]:
femaleCustomers.count()

5

# Transformation - flatMap

* flatMap works like map, but the function returns an iterable (such as a list) for each element and the results are flattened, so the new RDD can have more elements than the original RDD.

* It is used to break up elements in the original RDD and create a new RDD.

* newRDD = oldRDD.flatMap(function)
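The difference from map is easiest to see side by side. In this plain-Python sketch, list comprehensions stand in for `RDD.map` and `RDD.flatMap`. Splitting two lines with map gives two lists, while the flatMap-style version flattens them into ten individual fields; the same arithmetic (five rows of five fields) is why `words.count()` below returns 25.

```python
lines = ["19,22000,Female,France,N", "20,24000,Female,England,N"]

# map: exactly one output (here, a list of fields) per input line
mapped = [line.split(",") for line in lines]

# flatMap: each returned list is flattened into individual elements
flat = [field for line in lines for field in line.split(",")]

print(len(mapped), len(flat))  # 2 10
print(flat[:5])                # ['19', '22000', 'Female', 'France', 'N']
```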

# flatMap

### Create a new RDD by splitting each row on the comma delimiter

In [46]:
words = femaleCustomers.flatMap(lambda line: line.split(","))

In [47]:
words.count()

25

In [48]:
words.collect()

['19',
 '22000',
 'Female',
 'France',
 'N',
 '20',
 '24000',
 'Female',
 'England',
 'N',
 '23',
 '35000',
 'Female',
 'England',
 'N',
 '25',
 '32000',
 'Female',
 'France',
 'Y',
 '27',
 '37000',
 'Female',
 'France',
 'N']

# Transformation - Set

* Set operations are performed on two RDDs.

* You can perform unions or intersections.



1.   unionRDD = firstRDD.union(secondRDD)
2.   intersectionRDD = firstRDD.intersection(secondRDD)
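One detail matters for the cells that follow: `RDD.union` keeps duplicates (like concatenating two lists), so `.distinct()` is needed to get a set-style union, while `RDD.intersection` already returns each common element once. The sketch below uses Python lists and sets as an analogy, with the same data as `rdd1` and `rdd2`; note that Spark does not guarantee the order of elements in either result.

```python
first = ["a", "b", "c", "d", "e"]
second = ["c", "e", "k", "l"]

union_all = first + second               # like rdd1.union(rdd2): duplicates kept
union_distinct = set(union_all)          # like rdd1.union(rdd2).distinct()
intersection = set(first) & set(second)  # like rdd1.intersection(rdd2)

print(len(union_all), len(union_distinct))  # 9 7
print(sorted(intersection))                 # ['c', 'e']
```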




### Set - Union & Intersection

In [65]:
rdd1 = sc.parallelize(["a","b","c","d","e"])
rdd2 = sc.parallelize(["c","e","k","l"])

#### Perform Union Operation

In [66]:
for item in rdd1.union(rdd2).distinct().collect():
    print(item)

b
c
l
a
e
d
k


#### Perform Intersect Operation

In [67]:
for item in rdd1.intersection(rdd2).distinct().collect():
  print(item)

c
e


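#### Combining transformations in one function

In practice, several per-record changes are usually combined into one Python function and applied with a single map. Spark already pipelines consecutive narrow transformations such as map and filter within one stage, so the main saving is that each line is split and re-joined once instead of once per step, and the logic lives in one place that is easy to read and test.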

In [94]:
def transformRDD(customer):
  words = customer.split(",")
  # Convert Male to 0 and any other gender value to 1
  if words[2] == 'Male':
    words[2] = "0"
  else:
    words[2] = "1"
  # Convert N to 0 and Y to 1 for the Purchased value
  if words[4] == "N":
    words[4] = "0"
  else:
    words[4] = "1"
  # Convert Country to upper case
  words[3] = words[3].upper()
  return ",".join(words)
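Since `transformRDD` is plain Python, it can be checked on a single line before it is mapped over the whole RDD. The sketch below is an alternative, table-driven version. The name `encode_customer` and the two dictionaries are illustrative, not from the original. It encodes data lines the same way `transformRDD` does: `Male` becomes 0 and any other gender 1, `N` becomes 0 and anything else 1, and the country is upper-cased.

```python
# Hypothetical lookup tables; any value not listed is encoded as "1", as in transformRDD
GENDER_CODES = {"Male": "0"}
PURCHASED_CODES = {"N": "0"}


def encode_customer(line):
    """Encode one CSV data line the same way transformRDD does."""
    # Unpacking requires exactly five fields and raises ValueError otherwise
    age, salary, gender, country, purchased = line.split(",")
    return ",".join([
        age,
        salary,
        GENDER_CODES.get(gender, "1"),
        country.upper(),
        PURCHASED_CODES.get(purchased, "1"),
    ])


print(encode_customer("22,50000,Male,France,Y"))  # 22,50000,0,FRANCE,1
print(encode_customer(",35000,Male,Germany,N"))   # ,35000,0,GERMANY,0
```

Adding a new category then means adding a dictionary entry instead of another `if` branch. The strict five-field unpacking is a design choice: a malformed line fails loudly instead of silently encoding the wrong column.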


#### Apply transformation using map

In [96]:
header = customerData.first()
# Drop the header line first; otherwise transformRDD would also encode it
finaldata = customerData.filter(lambda line: line != header).map(transformRDD)

In [97]:
finaldata.collect()

['18,20000,0,GERMANY,0',
 '19,22000,1,FRANCE,0',
 '20,24000,1,ENGLAND,0',
 '21,,0,ENGLAND,0',
 '22,50000,0,FRANCE,1',
 '23,35000,1,ENGLAND,0',
 '24,,0,GERMANY,0',
 '25,32000,1,FRANCE,1',
 ',35000,0,GERMANY,0',
 '27,37000,1,FRANCE,0']
