<a href="https://colab.research.google.com/github/srikanthgr/pyspark-python/blob/main/pyspark_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install JDK
## Install Spark
## Set Environment variables
## Create a Spark Session


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.6.tgz
!tar -xvf spark-2.4.3-bin-hadoop2.6.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.6"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
df = spark.createDataFrame([{"Google":"Colab", "Spark":"Scala"},{"Google":"Dataproc", "Spark":"Python"}])
df.show()



+--------+------+
|  Google| Spark|
+--------+------+
|   Colab| Scala|
|Dataproc|Python|
+--------+------+



In [3]:
!wget https://raw.githubusercontent.com/srikanthgr/pyspark-python/main/retailstore.csv

--2021-03-04 09:59:46--  https://raw.githubusercontent.com/srikanthgr/pyspark-python/main/retailstore.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 279 [text/plain]
Saving to: ‘retailstore.csv’


2021-03-04 09:59:46 (15.0 MB/s) - ‘retailstore.csv’ saved [279/279]



In [4]:
!ls

retailstore.csv  [0m[01;34mspark-2.4.3-bin-hadoop2.6[0m/
[01;34msample_data[0m/     spark-2.4.3-bin-hadoop2.6.tgz


In [5]:
from pyspark import SparkContext
sc = spark.sparkContext
sampleRDD = sc.parallelize([10,20,30,40,50,60])
type(sampleRDD)
sampleRDD.collect()

[10, 20, 30, 40, 50, 60]

In [6]:
customerData = sc.textFile("retailstore.csv")
type(customerData)


pyspark.rdd.RDD

In [7]:
customerData.collect()

['Age,Salary,Gender,Country,Purchased',
 '18,20000,Male,Germany,N',
 '19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '21,,Male,England,N',
 '22,50000,Male,France,Y',
 '23,35000,Female,England,N',
 '24,,Male,Germany,N',
 '25,32000,Female,France,Y',
 ',35000,Male,Germany,N',
 '27,37000,Female,France,N']

In [8]:
customerData.count()
customerData.first()
customerData.take(3)
for line in customerData.collect():
  print(line)


Age,Salary,Gender,Country,Purchased
18,20000,Male,Germany,N
19,22000,Female,France,N
20,24000,Female,England,N
21,,Male,England,N
22,50000,Male,France,Y
23,35000,Female,England,N
24,,Male,Germany,N
25,32000,Female,France,Y
,35000,Male,Germany,N
27,37000,Female,France,N


In [11]:
customerData2 = customerData.map(lambda x: x.replace("Male", "M"))
customerData2.collect()

['Age,Salary,Gender,Country,Purchased',
 '18,20000,M,Germany,N',
 '19,22000,Female,France,N',
 '20,24000,Female,England,N',
 '21,,M,England,N',
 '22,50000,M,France,Y',
 '23,35000,Female,England,N',
 '24,,M,Germany,N',
 '25,32000,Female,France,Y',
 ',35000,M,Germany,N',
 '27,37000,Female,France,N']

In [14]:
femaleCustomers = customerData.filter(lambda x: "Female" in x)
femaleCustomers.collect()
femaleCustomers.count()

5

In [20]:
words = femaleCustomers.flatMap(lambda line: line.split(","))
words.count()
words.collect()

['19',
 '22000',
 'Female',
 'France',
 'N',
 '20',
 '24000',
 'Female',
 'England',
 'N',
 '23',
 '35000',
 'Female',
 'England',
 'N',
 '25',
 '32000',
 'Female',
 'France',
 'Y',
 '27',
 '37000',
 'Female',
 'France',
 'N']

In [21]:
rdd1 = sc.parallelize(["a","b","c","d","e"])
rdd2 = sc.parallelize(["c","e","k","l"])


In [22]:
for unions in rdd1.union(rdd2).distinct().collect():
  print(unions)

b
c
l
a
e
d
k


In [23]:
for unions in rdd1.intersection(rdd2).distinct().collect():
  print(unions)

c
e


In [24]:
def transformRDD(customer):
  words = customer.split(",")
  if words[2] == "Male" : 
    words[0] = "0"
  else:
    words[2] = "1"
  
  if words[4] == "N":
    words[4] = "0"
  else:
    words[4]= "1"
  words[3] = words[3].upper()
  return ",".join(words)

In [26]:
transformedCustData = customerData.map(transformRDD)
transformedCustData.collect()

['Age,Salary,1,COUNTRY,1',
 '0,20000,Male,GERMANY,0',
 '19,22000,1,FRANCE,0',
 '20,24000,1,ENGLAND,0',
 '0,,Male,ENGLAND,0',
 '0,50000,Male,FRANCE,1',
 '23,35000,1,ENGLAND,0',
 '0,,Male,GERMANY,0',
 '25,32000,1,FRANCE,1',
 '0,35000,Male,GERMANY,0',
 '27,37000,1,FRANCE,0']

In [27]:
sampleRDD = sc.parallelize([10,20,30,40])
sampleRDD.reduce(lambda a, b: a + b)

100