<a href="https://colab.research.google.com/github/niteshsoni30/PySpark-RDD/blob/main/Spark_Rdd_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Build the session used by every later cell; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Spark Rdd")
    .getOrCreate()
)

In [None]:
# Sample CSV input: one header line followed by six customer rows.
# The text is kept exactly as authored (including spellings such as `counry`)
# because the outputs recorded further down are derived from it.
customer_data = """\
customer_id,name,city,state,counry,registration_date,is_active
0,Customer_0,Bangalore,karnataka,INdia,2023-11-11,True
1,Customer_1,Heydrabad,karnataka,INdia,2023-11-11,True
2,Customer_2,Ahemdabad,gujarat,INdia,2023-11-11,True
3,Customer_3,Bangalore,karnataka,INdia,2023-11-11,False
4,Customer_4,Bangalore,karnataka,INdia,2023-11-11,False
5,Customer_5,Delhi,Dekhi,INdia,2023-11-11,False
""".splitlines()

In [None]:
# Turn the Python list into an RDD; each element is one raw CSV line
# (the header line is still included at this point).
data_rdd = spark.sparkContext.parallelize(customer_data)

In [None]:
data_rdd.getNumPartitions()

2

In [None]:
#RDD- Resilient Distributed Dataset

In [None]:
#FIRST  returns first element of rdd

In [None]:
# first() returns the first element of the RDD; here that is the CSV header line.
header = data_rdd.first()

In [None]:
header

'customer_id,name,city,state,counry,registration_date,is_active'

In [None]:
# filter transformation: drop the header by keeping every line that differs from it.
# NOTE: this rebinds `data_rdd`. Re-running the `header = data_rdd.first()` cell
# after this one would capture a data row instead of the header.

data_rdd = data_rdd.filter(lambda line: line != header)

In [None]:
data_rdd.collect()

['0,Customer_0,Bangalore,karnataka,INdia,2023-11-11,True',
 '1,Customer_1,Heydrabad,karnataka,INdia,2023-11-11,True',
 '2,Customer_2,Ahemdabad,gujarat,INdia,2023-11-11,True',
 '3,Customer_3,Bangalore,karnataka,INdia,2023-11-11,False',
 '4,Customer_4,Bangalore,karnataka,INdia,2023-11-11,False',
 '5,Customer_5,Delhi,Dekhi,INdia,2023-11-11,False']

In [None]:
def parse_row(row):
  """Split one comma-separated customer line into a 7-field tuple.

  Args:
    row: A CSV line with at least seven comma-separated fields, in the order
      customer_id, name, city, state, country, registration_date, is_active.

  Returns:
    A tuple of the first six fields as strings, followed by a bool for the
    seventh field. The bool is True only when that field, ignoring case and
    surrounding whitespace, spells "true".

  Raises:
    IndexError: If the line has fewer than seven fields.
  """
  fields = row.split(",")
  # Normalise the flag before comparing: an exact match against 'True' would
  # silently turn "true", " True" or "True\r" (CRLF input) into False.
  is_active = fields[6].strip().lower() == "true"
  return (
      fields[0],
      fields[1],
      fields[2],
      fields[3],
      fields[4],
      fields[5],
      is_active,
  )

In [None]:
# map operation

# map() applies a function to each element of an RDD; here every raw CSV line
# is converted into a 7-field tuple by parse_row.

parsed_rdd = data_rdd.map(parse_row)

In [None]:
parsed_rdd.collect()

[('0', 'Customer_0', 'Bangalore', 'karnataka', 'INdia', '2023-11-11', True),
 ('1', 'Customer_1', 'Heydrabad', 'karnataka', 'INdia', '2023-11-11', True),
 ('2', 'Customer_2', 'Ahemdabad', 'gujarat', 'INdia', '2023-11-11', True),
 ('3', 'Customer_3', 'Bangalore', 'karnataka', 'INdia', '2023-11-11', False),
 ('4', 'Customer_4', 'Bangalore', 'karnataka', 'INdia', '2023-11-11', False),
 ('5', 'Customer_5', 'Delhi', 'Dekhi', 'INdia', '2023-11-11', False)]

In [None]:
#Advanced RDD Operations

In [None]:
# Project every record down to a (name, city) pair: tuple positions 1 and 2.
name_city_rdd = parsed_rdd.map(lambda rec: (rec[1], rec[2]))

In [None]:
name_city_rdd.collect()

[('Customer_0', 'Bangalore'),
 ('Customer_1', 'Heydrabad'),
 ('Customer_2', 'Ahemdabad'),
 ('Customer_3', 'Bangalore'),
 ('Customer_4', 'Bangalore'),
 ('Customer_5', 'Delhi')]

In [None]:
#keep only the active customers (is_active is True)


In [None]:
# Position 6 already holds a bool (set by parse_row), so it is used directly
# as the predicate instead of being compared with `== True`.
active_customers = parsed_rdd.filter(lambda row: row[6])

In [None]:
active_customers.collect()

[('0', 'Customer_0', 'Bangalore', 'karnataka', 'INdia', '2023-11-11', True),
 ('1', 'Customer_1', 'Heydrabad', 'karnataka', 'INdia', '2023-11-11', True),
 ('2', 'Customer_2', 'Ahemdabad', 'gujarat', 'INdia', '2023-11-11', True)]

In [None]:
# distinct transformation: take the city (tuple position 2) of every record,
# then drop duplicate values.

cities_rdd = (
    parsed_rdd
    .map(lambda rec: rec[2])
    .distinct()
)

In [None]:
cities_rdd.collect()

['Ahemdabad', 'Delhi', 'Bangalore', 'Heydrabad']

In [None]:
# take(n) returns the first n elements of the RDD as a Python list
cities_rdd.take(2)

['Ahemdabad', 'Delhi']

In [None]:
# reduceByKey transformation: emit a (city, 1) pair for every record, then add
# up the ones that share a city to get the number of customers per city.

customer_per_city = (
    parsed_rdd
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda left, right: left + right)
)

In [None]:
customer_per_city.collect()

[('Ahemdabad', 1), ('Delhi', 1), ('Bangalore', 3), ('Heydrabad', 1)]

In [None]:
# countByValue: count how many times each city occurs; the result is a
# dict-like object, not an RDD.

city_values = parsed_rdd.map(lambda rec: rec[2])
count_per_city = city_values.countByValue()
del city_values  # keep the notebook namespace identical to before

In [None]:
#Important: reduceByKey is a transformation (it returns a new RDD), while countByValue is an action (it returns a dict-like result)

In [None]:
count_per_city

defaultdict(int, {'Bangalore': 3, 'Heydrabad': 1, 'Ahemdabad': 1, 'Delhi': 1})

In [None]:
# Chaining several operations: keep the active records, take their city,
# and remove duplicate cities.
active_cities = (
    parsed_rdd
    .filter(lambda rec: rec[6])
    .map(lambda rec: rec[2])
    .distinct()
)

In [None]:
active_cities.collect()

['Ahemdabad', 'Bangalore', 'Heydrabad']

In [None]:
# count active customers by state -- step 1: keep only the active records.
# Position 6 is already a bool, so it serves as the predicate directly
# (no `== True` comparison needed).

active_customer = parsed_rdd.filter(lambda row: row[6])

In [None]:
active_customer.collect()

[('0', 'Customer_0', 'Bangalore', 'karnataka', 'INdia', '2023-11-11', True),
 ('1', 'Customer_1', 'Heydrabad', 'karnataka', 'INdia', '2023-11-11', True),
 ('2', 'Customer_2', 'Ahemdabad', 'gujarat', 'INdia', '2023-11-11', True)]

In [None]:
# Number of active customers per state:
#   1. keep active records (position 6 is a bool, used directly as the predicate)
#   2. emit a (state, 1) pair for each of them (state is position 3)
#   3. sum the ones per state
active_state_customer = (
    parsed_rdd
    .filter(lambda row: row[6])
    .map(lambda row: (row[3], 1))
    .reduceByKey(lambda x, y: x + y)
)

In [None]:
active_state_customer.collect()

[('karnataka', 2), ('gujarat', 1)]

In [109]:
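# reduceByKey is a good fit here: it combines the values for each key inside every partition before the shuffle,
# so less data is moved and less disk read/write is needed than with groupByKey.
# When running on YARN, the resource usage can be inspected in the YARN UI.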

In [110]:
#increasing and decreasing the number of partitions in PySpark
# repartition or coalesce

In [111]:
parsed_rdd.getNumPartitions()

2

In [117]:
part_increase = parsed_rdd.repartition(4) # repartition() returns a new RDD, so the result must be assigned to a variable to use the new partition count

In [118]:
part_increase.getNumPartitions()

4

In [1]:
#coalesce is preferred for decreasing the number of partitions because it avoids a full shuffle, which makes it faster than repartition

In [2]:
# Shut down the Spark session created at the top of the notebook.
# NOTE(review): the NameError recorded below, together with the execution counts
# restarting at In [1], suggests this cell was last run in a fresh kernel where
# `spark` had not been created. Run it in the same kernel session as the cells above.
spark.stop()

NameError: name 'spark' is not defined