In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by path.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


SPARK setup in Colab

### install java

In [None]:
# Install a headless OpenJDK 11 (Spark runs on the JVM); output is silenced via -qq and the redirect to /dev/null.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

### Download spark from repository

In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz

### Unzip the downloaded file

In [None]:
# Extract the Spark tarball into the current working directory.
!tar xf spark-3.5.7-bin-hadoop3.tgz

### Install findspark

In [None]:
!pip install -q findspark

### Set environment variables for Java and Spark

In [None]:
import os

# Point the process at the JDK installed via apt and at the extracted Spark distribution.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.5.7-bin-hadoop3",
    }
)

### Importing findspark

In [None]:
# findspark.init() makes the pyspark package importable in this kernel;
# presumably it locates the installation through SPARK_HOME -- confirm against the findspark docs.
import findspark
findspark.init()

## Spark: Creation of RDDs

###Importing the SparkContext and SparkSession

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a session named "RDD", then take its SparkContext,
# which is the entry point for the RDD API used below.
spark = (
    SparkSession.builder
    .appName("RDD")
    .getOrCreate()
)
sc = spark.sparkContext

## Resilient Distributed Dataset (RDD)

### 1. Parallelizing a collection

In [None]:
# Distribute an in-memory Python list as an RDD, then bring it back to the driver.
a = list(range(1, 11))
rdd = sc.parallelize(a)
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
# Inspect the type of the object returned by sc.parallelize.
type(rdd)

In [None]:
# For comparison: the source collection itself is a plain Python list.
type(a)

list

In [None]:
# A range is lazy: printing it shows the range object, list() materialises its values.
nums = range(5)
print(nums)
list(nums)

range(0, 5)


[0, 1, 2, 3, 4]

In [None]:
# nums is a range object, not a list.
type(nums)

range

In [None]:
# Request 10 partitions explicitly and inspect how the elements are laid out.
rdd = sc.parallelize(nums, numSlices=10)
print(f"Default parallelism:{sc.defaultParallelism}")
print(f"Number of Partition:{rdd.getNumPartitions()}")
print(f" Partitioner:{rdd.partitioner}")
# glom() turns each partition into a list, so collect() shows one list per partition.
print(f" Partitions structure:{rdd.glom().collect()}")


Default parallelism:2
Number of Partition:10
 Partitioner:None
 Partitions structure:[[], [0], [], [1], [], [2], [], [3], [], [4]]


In [None]:
# Redistribute the same data into 2 partitions and show the new layout.
rdd1 = rdd.repartition(2)
print(f"Number of Partition:{rdd1.getNumPartitions()}")
print(f" Partitions structure:{rdd1.glom().collect()}")

Number of Partition:2
 Partitions structure:[[1, 2, 4], [0, 3]]


In [None]:
# Sum all elements of the RDD by pairwise addition.
rdd.reduce(lambda total, value: total + value)

10

In [None]:
# Inspect the type of the current `rdd` binding.
type(rdd)

In [None]:
# parallelize also accepts a tuple; here a tuple of strings becomes an RDD.
rdd1 = sc.parallelize(
    ('apple', 'banana', 'mango', "orange", 'plum', 'cherry')
)
rdd1.collect()

['apple', 'banana', 'mango', 'orange', 'plum', 'cherry']

In [None]:
# Build an RDD whose elements are tuples, starting from a list of tuples.
s = [
    ('cat', 'dog'),
    ('elephant', 'zebra'),
]
rdd1 = sc.parallelize(s)
rdd1.collect()

[('cat', 'dog'), ('elephant', 'zebra')]

### 2. Referencing the Dataset

In [None]:
# Create an RDD from a text file on the mounted Drive; collect() returns its lines as strings.
txt = sc.textFile("/content/drive/MyDrive/spark.txt")
txt.collect()

['Checking 1 2 3 ']

In [None]:
# Read a CSV file into a Spark DataFrame (spark.read.csv returns a DataFrame, not an RDD).
# header=True takes column names from the first line; inferSchema=True asks Spark to infer column types.
csvFile = spark.read.csv("/content/drive/MyDrive/Advertising.csv",header=True,inferSchema=True)
csvFile.show()

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0|  4.8|
| 10|199.8|  2.6|     21.2| 10.6|
| 11| 66.1|  5.8|     24.2|  8.6|
| 12|214.7| 24.0|      4.0| 17.4|
| 13| 23.8| 35.1|     65.9|  9.2|
| 14| 97.5|  7.6|      7.2|  9.7|
| 15|204.1| 32.9|     46.0| 19.0|
| 16|195.4| 47.7|     52.9| 22.4|
| 17| 67.8| 36.6|    114.0| 12.5|
| 18|281.4| 39.6|     55.8| 24.4|
| 19| 69.2| 20.5|     18.3| 11.3|
| 20|147.3| 23.9|     19.1| 14.6|
+---+-----+-----+---------+-----+
only showing top 20 rows



In [None]:
# map transformation: replace every word with its first character.
# NOTE: despite its name, wordPair holds single letters, not (key, value) pairs.
words = sc.parallelize(("the","quick","brown","fox","jumps","over","the","lazy","dog"))
wordPair = words.map(lambda w:w[0])
wordPair.collect()

['t', 'q', 'b', 'f', 'j', 'o', 't', 'l', 'd']

In [None]:
# Type of the RDD created directly by sc.parallelize.
type(words)


In [None]:
# Type of the RDD produced by the map transformation, for comparison with type(words).
type(wordPair)

In [None]:
# Square every element. The lambda parameter gets its own name so it does not
# shadow the RDD variable `a`.
a = sc.parallelize([2, 3, 4, 5])
square = a.map(lambda n: n * n)

In [None]:
# collect() triggers the computation; print one squared value per line.
for squared in square.collect():
  print(squared)

4
9
16
25


In [None]:
#Example : map transformation
import math
v_RDD = sc.parallelize([1,2,4,8,16])
# Transform each value into its base-2 logarithm. math.log2 is used instead of
# math.log(x, 2) because the two-argument form can be slightly inexact, even for
# exact powers of two.
r = v_RDD.map(lambda x: math.log2(x))
r.collect()

[0.0, 1.0, 2.0, 3.0, 4.0]

In [None]:
# keys (year) and values (temperature)


In [None]:
# Word count, step 1: split the sentence on whitespace into a plain Python list.
# NOTE: this rebinds `words`, which referred to an RDD in an earlier cell.
words = "to be or not to be".split()
print(words)

['to', 'be', 'or', 'not', 'to', 'be']


In [None]:
# Word count, step 2: distribute the token list as an RDD and inspect its type.
wordRdd = sc.parallelize(words)
type(wordRdd)

In [None]:
# Word count, step 3: pair every word with an initial count of 1.
wordTuplesRdd = wordRdd.map(lambda word: (word, 1))
wordTuplesRdd.collect()


[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

In [None]:
# Inspect the type of the (word, 1) pair RDD.
type(wordTuplesRdd)

In [None]:
# Word count, step 4: add up the counts of identical words.
# Displaying the variable shows only the RDD handle, not the counts.
wordCountsRdd = wordTuplesRdd.reduceByKey(lambda left, right: left + right)
wordCountsRdd

PythonRDD[64] at RDD at PythonRDD.scala:53

In [None]:
# Word count, step 5: collect() returns the (word, count) pairs to the driver as a Python list.
wordCounts= wordCountsRdd.collect()
wordCounts

[('to', 2), ('be', 2), ('or', 1), ('not', 1)]