In [1]:
import pandas as pd
import numpy as np
import json
import math
import re

### Overview
- Driver Program : Users Main function
- Executors : Runs the job
- Resilient Distributed Dataset : RDD, collection of elelments partition across the nodes of cluster that can be operated on parallel.
- Variables : Broadcast Variables(all nodes), Accumulators (added)


### Linking with Spark
- pyspark 4.0.0

In [2]:
from pyspark import SparkContext, SparkConf

### Initialising Spark

In [3]:
#conf=SparkConf().setAppName("Spark_RDD").setMaster("local")
#sc=SparkContext(conf=conf)
conf=SparkConf().setAppName("Spark_RDD").setMaster("local")
sc=SparkContext(conf=conf)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/14 22:28:12 WARN Utils: Your hostname, Nitishs-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.5 instead (on interface en0)
25/07/14 22:28:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/14 22:28:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Resilient Distributed Datasets (RDDs)
- Fault Tolerant collection of elements
- Parallel Operations
- Parallelizing, Referencing an External Storage system that support Hadoop Input format.

In [4]:
data=list(range(1,11))
distributeddata=sc.parallelize(data,2)

In [5]:
res=distributeddata.map(lambda x: (x,"a"*x))
res.collect()

                                                                                

[(1, 'a'),
 (2, 'aa'),
 (3, 'aaa'),
 (4, 'aaaa'),
 (5, 'aaaaa'),
 (6, 'aaaaaa'),
 (7, 'aaaaaaa'),
 (8, 'aaaaaaaa'),
 (9, 'aaaaaaaaa'),
 (10, 'aaaaaaaaaa')]

In [6]:
res.saveAsSequenceFile("/Users/nitish/Desktop/Code for Data/Py Spark/Apache Spark/Data/")

In [7]:
sorted(sc.sequenceFile("/Users/nitish/Desktop/Code for Data/Py Spark/Apache Spark/Data/").collect())

[(1, 'a'),
 (2, 'aa'),
 (3, 'aaa'),
 (4, 'aaaa'),
 (5, 'aaaaa'),
 (6, 'aaaaaa'),
 (7, 'aaaaaaa'),
 (8, 'aaaaaaaa'),
 (9, 'aaaaaaaaa'),
 (10, 'aaaaaaaaaa')]

In [8]:
res.first()

(1, 'a')

In [9]:
res.take(4)

[(1, 'a'), (2, 'aa'), (3, 'aaa'), (4, 'aaaa')]

### RDD Operations
- Transformations : Create a new dataset from existing one . Map
- Actions : Returns a value form a driver program after running computation on a dataset . Reduce
- Lazy Evaluation : Only computed when actions requires a result
- Persist : Cache for faster access

In [10]:
res.take(2)

[(1, 'a'), (2, 'aa')]

In [11]:
r2=res.map(lambda x: (x[0],x[1],len(x[1])))

In [12]:
r2.collect()

[(1, 'a', 1),
 (2, 'aa', 2),
 (3, 'aaa', 3),
 (4, 'aaaa', 4),
 (5, 'aaaaa', 5),
 (6, 'aaaaaa', 6),
 (7, 'aaaaaaa', 7),
 (8, 'aaaaaaaa', 8),
 (9, 'aaaaaaaaa', 9),
 (10, 'aaaaaaaaaa', 10)]

In [13]:
totallength=r2.reduce(lambda a,b:a+b)

In [14]:
type(totallength)

tuple

In [15]:
r2.persist()

PythonRDD[11] at collect at /var/folders/kv/96360ndn5zxd9bhcmb69nzlm0000gn/T/ipykernel_5452/4136562622.py:1

### Passing Function to Spark
- Lambda Expression
- Local def
- Top Level functions

In [16]:
def do(self,rdd):
    field=self.field
    return rdd.map(lambda x: field +x)

In [17]:
### Understanding Closure : Scope and Lifecucle of a Variables and methods when executing code across a cluster
new_rdd=sc.parallelize(data)
new_rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [18]:
counter=0
def inc_counter(x):
    global counter
    counter += x
new_rdd.foreach(inc_counter)
counter

0

In [19]:
### Accumulator : Mechanism for self updating variable when execution is split up across worker nodes

In [20]:
lines=sc.parallelize(list(range(1,11)))
pairs=lines.map(lambda x: (x,1))
counts=pairs.reduceByKey(lambda a,b:a+b)


In [21]:
counts.take(2)

[(1, 1), (2, 1)]

### Transformations
- map(func)
- filter(func)
- flatMap(func)
- mapPartitions(func)
- mapPartitionsWithIndex(func)
- union(other Dataset)
- intersection (other Dataset)
- distinct([numPartitions])
- groupByKey([numPartitions])
- reduceByKey(func,[numPartitions])
- aggregateByKey(zeroValue)
- sortBykey
- join
- cogroup
- cartesian
- pipe
- coalesce(numPartitions)
- repartitionAndSortWithinPartition(partitioner)

#### Map

In [22]:
rdd=sc.parallelize([1,2,3,4,5])
squared=rdd.map(lambda x: x*x)
print(rdd.collect())
print(squared.collect())

words=sc.parallelize(["Hi Nitish","Hi Manu"])
upper=words.map(lambda x: x.upper())
prefix=words.map(lambda x: f"Hello, {x}")
print(words.collect())
print(upper.collect())
print(prefix.collect())

pairs=sc.parallelize([("Hi Nitish",1),("Hi Manu",2)])
keys=pairs.map(lambda x: x[0])
value=pairs.map(lambda x:x[1])
transform= pairs.map(lambda x: (x[0],x[1]*10))
print(keys.collect())
print(value.collect())
print(transform.collect())

print("\nJson_Parse:")
json_string=sc.parallelize(['{"name":"Nitish","age":20}','{"name":"Manu","age":19}'])
parsed=json_string.map(lambda x:json.loads(x))
name=parsed.map(lambda x:x['name'])
age=parsed.map(lambda x:x['age'])
print(f"{name.collect()},{age.collect()}")

print("\nMultiple field Extraction")
data=sc.parallelize(["Nitish,26,Engineer","Manu,25,Doctor","Hary,25,Engineer"])
structured=data.map(lambda x:
                    {"name":x.split(',')[0].title(),
                    "age":int(x.split(',')[1]),
                    "Profession":x.split(',')[2]})
print(structured.collect())

print("\nMathematical Functions:")
data=list(range(1,5))
numbers=sc.parallelize(data)
sqrt_value=numbers.map(lambda x: math.sqrt(x))
tuple_value=numbers.map(lambda x: (x,x*2,x**2))
print(sqrt_value.collect())
print(tuple_value.collect())

print("\nConditional Transformation")
numbers=sc.parallelize(list(range(1,10)))
decile=numbers.map(lambda x:
                    "small" if x<4 else
                    "medium" if x<8 else "large" )
print(decile.collect())


print("\nCustom Function")
def process_record(data):
    parts=data.split(",")
    return {
        "Name":parts[0].upper(),
        "Age":int(parts[1]),
        "Designation": parts[1]
    }

raw_data=sc.parallelize({
    "Nitish, 20, Engineer","Manu, 19, Doctor","Ragu , 20,Engineer"
})
process_data=raw_data.map(process_record)
print(process_data.collect())


print("\n map(1:1) vs flatMap(1:many)")
words=sc.parallelize(["hello world","spark rdd correct"])
val1=words.map(lambda x: x.split())
val2=words.flatMap(lambda x: x.split())
print(f'{val1.count()},{val2.count()}')
print(f'{val1.collect()},{val2.collect()}')


[1, 2, 3, 4, 5]
[1, 4, 9, 16, 25]
['Hi Nitish', 'Hi Manu']
['HI NITISH', 'HI MANU']
['Hello, Hi Nitish', 'Hello, Hi Manu']
['Hi Nitish', 'Hi Manu']
[1, 2]
[('Hi Nitish', 10), ('Hi Manu', 20)]

Json_Parse:
['Nitish', 'Manu'],[20, 19]

Multiple field Extraction
[{'name': 'Nitish', 'age': 26, 'Profession': 'Engineer'}, {'name': 'Manu', 'age': 25, 'Profession': 'Doctor'}, {'name': 'Hary', 'age': 25, 'Profession': 'Engineer'}]

Mathematical Functions:
[1.0, 1.4142135623730951, 1.7320508075688772, 2.0]
[(1, 2, 1), (2, 4, 4), (3, 6, 9), (4, 8, 16)]

Conditional Transformation
['small', 'small', 'small', 'medium', 'medium', 'medium', 'medium', 'large', 'large']

Custom Function
[{'Name': 'NITISH', 'Age': 20, 'Designation': ' 20'}, {'Name': 'MANU', 'Age': 19, 'Designation': ' 19'}, {'Name': 'RAGU ', 'Age': 20, 'Designation': ' 20'}]

 map(1:1) vs flatMap(1:many)
2,5
[['hello', 'world'], ['spark', 'rdd', 'correct']],['hello', 'world', 'spark', 'rdd', 'correct']


#### Filter

In [23]:
numbers=sc.parallelize([1,2,3,4,5,6,7,8,9,10])
evens=numbers.filter(lambda x: x%2==0)
greater_than_5=numbers.filter(lambda x :x>5)
print(evens.collect())
print(greater_than_5.collect())

print("\nFilter Strings")
strings=sc.parallelize(["apple","banana","litchi","kiwi","strawberry"])
len_5=strings.filter(lambda x: len(x)>5)
print(len_5.collect())

print("\nRange and Boundary Filtering")
scores=sc.parallelize([60,69,74,99,85,84,92])
passing_scores=scores.filter(lambda x: x>=75)
excellent=scores.filter(lambda x: 85 <= x <= 100)
print(passing_scores.collect())
print(excellent.collect())

print("\n Text and Pattern filtering ")
sentence=sc.parallelize(["The quick brown fox","Jumps over the lazy dog","Python is awsome","Spark Processes big Data"])
contains_the=sentence.filter(lambda x: "the" in x.lower())
email_pattern=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9._]+[a-zA-Z]{2,}$'
emails=sc.parallelize(["nitishtripurawork@gmail.com","testemail123@gmail.in","mywork@gmail.in","staf@edu.in","123@ae@#.incom"])
valid_emails=emails.filter(lambda x: re.match(email_pattern,x))
print(contains_the.collect())
print(valid_emails.collect())


print("\n Key value Filtering")
key_value_pairs=sc.parallelize([("apple",1),("banana",2),("cherry",3),("date",2),("muskmelon",3)])
starts_with_c=key_value_pairs.filter(lambda x: x[0].startswith('c'))
val_greater_than_2=key_value_pairs.filter(lambda x:x[1]>2)
print(starts_with_c.collect())
print(val_greater_than_2.collect())

print("\n Complex filtering with multiple conditions")
students=sc.parallelize([("Alice", 85, "Math"),("Bob",65,"CSE"),("Mark",95,"Math"),("Methew",86,"Math")])
math_high_score=students.filter(lambda x: x[1]>90 and x[2].lower()=="math")
M_or_CSE=students.filter(lambda x: x[0][0]=="M" or x[2]=="CSE")
print(math_high_score.collect())
print(M_or_CSE.collect())

print("\n Custom Business Logic")
def is_premium(data):
    return (data["purchases"]>5 and data["total_spent"]>1000 and data["membership_years"]>=2)
cus=sc.parallelize([{"name":"John","purchases":8,"total_spent":1500,"membership_years":3},
                    {"name":"Jane","purchases":3,"total_spent":800,"membership_years":1},
                    {"name":"Bob","purchases":12,"total_spent":2000,"membership_years":4}])
premium_customers=cus.filter(is_premium)
print(premium_customers.collect())

print("\n Combining Filter with map")
numbers=sc.parallelize(range(1,100))
result=numbers.filter(lambda x:x%2==0).map(lambda x:x*x).filter(lambda x:x>100).collect()
print(result)

[2, 4, 6, 8, 10]
[6, 7, 8, 9, 10]

Filter Strings
['banana', 'litchi', 'strawberry']

Range and Boundary Filtering
[99, 85, 84, 92]
[99, 85, 92]

 Text and Pattern filtering 
['The quick brown fox', 'Jumps over the lazy dog']
['nitishtripurawork@gmail.com', 'testemail123@gmail.in', 'mywork@gmail.in', 'staf@edu.in']

 Key value Filtering
[('cherry', 3)]
[('cherry', 3), ('muskmelon', 3)]

 Complex filtering with multiple conditions
[('Mark', 95, 'Math')]
[('Bob', 65, 'CSE'), ('Mark', 95, 'Math'), ('Methew', 86, 'Math')]

 Custom Business Logic
[{'name': 'John', 'purchases': 8, 'total_spent': 1500, 'membership_years': 3}, {'name': 'Bob', 'purchases': 12, 'total_spent': 2000, 'membership_years': 4}]

 Combining Filter with map
[144, 196, 256, 324, 400, 484, 576, 676, 784, 900, 1024, 1156, 1296, 1444, 1600, 1764, 1936, 2116, 2304, 2500, 2704, 2916, 3136, 3364, 3600, 3844, 4096, 4356, 4624, 4900, 5184, 5476, 5776, 6084, 6400, 6724, 7056, 7396, 7744, 8100, 8464, 8836, 9216, 9604]


### Actions
- reduce(func)
- collect()
- count()
- first()
- take(n)
- takeOrdered(n,[ordering])
- saveAsTextFile(path)
- savsAsSequenceFile(path)
- countByKey()
- foreach(func)

In [24]:
sc.stop()