# Spark - RDD Operations

In [3]:
from pyspark.sql import SparkSession

In [4]:
# getOrCreate() reuses an already-running session when one exists; in that case only
# runtime SQL configurations take effect (see the warning below), so appName may be ignored.
spark = (
    SparkSession.builder
    .appName("RDD_Operations")
    .getOrCreate()
)

25/01/30 08:17:08 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
# Raw CSV lines for a small sample dataset: element 0 is the header, the rest are data rows.
# NOTE(review): city/state pairs are not geographically consistent (e.g. Hyderabad/Delhi),
# presumably randomly generated - keep this in mind for the "by state" homework below.
customer_data = ["customer_id,name,city,state,country,registration_date,is_active"] + [
    "0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True",
    "1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True",
    "2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True",
    "3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False",
    "4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False",
    "5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False",
]


In [10]:
# Distribute the in-memory list of CSV lines (header line included) as an RDD of strings.
data_rdd = spark.sparkContext.parallelize(customer_data)

In [7]:
# Number of partitions parallelize() split the list into (no explicit numSlices was given).
data_rdd.getNumPartitions()

2

In [8]:
# RDD - Resilient Distributed Dataset

# first() - returns the first element of the RDD (an action)

In [11]:
# first() is an action; here it returns the CSV header line, which is removed from the data below.
header = data_rdd.first()

                                                                                

In [51]:
# Display the header string returned by first().
header

'customer_id,name,city,state,country,registration_date,is_active'

# filter() - a transformation that keeps only the elements for which the given function returns True

In [14]:
# Keep every line except the header. Re-running this cell is harmless: once the header
# is gone, filtering again removes nothing.
data_rdd = data_rdd.filter(lambda line: line != header)

In [15]:
# Shows only the RDD's repr: filter() is a lazy transformation, so nothing has been computed yet.
data_rdd

PythonRDD[3] at RDD at PythonRDD.scala:53

# map() - a transformation that applies a function to each element of an RDD and returns a new RDD

In [16]:
# collect() is an action that returns every element to the driver. Fine for six rows,
# but prefer take(n) when running this on a large (e.g. 500 MB) input.
data_rdd.collect()

['0,Customer_0,Bangalore,Karnataka,India,2023-11-11,True',
 '1,Customer_1,Hyderabad,Delhi,India,2023-08-26,True',
 '2,Customer_2,Ahmedabad,West Bengal,India,2023-06-23,True',
 '3,Customer_3,Bangalore,Tamil Nadu,India,2023-03-24,False',
 '4,Customer_4,Bangalore,Gujarat,India,2023-06-06,False',
 '5,Customer_5,Delhi,Maharashtra,India,2023-04-19,False']

In [22]:
# Sanity check for the boolean column: compare the 7th field (is_active) of the
# first data row with the literal string "True".
# NOTE(review): the first data row shown above ends with ",True", so this should evaluate
# to True; the saved output (False) looks stale, presumably from an earlier run - re-run to confirm.
first_row = data_rdd.first()
first_row.split(',')[6] == "True"

False

In [23]:
def parse_row(row):
    """Parse one comma-separated customer line into a typed tuple.

    Args:
        row: A data line in the format
            ``customer_id,name,city,state,country,registration_date,is_active``
            (the header line must already have been filtered out). Fields
            beyond the seventh are ignored.

    Returns:
        A 7-tuple ``(customer_id: int, name, city, state, country,
        registration_date, is_active: bool)``; all other fields stay strings.
        ``is_active`` is True only when the field reads ``True``, ignoring
        surrounding whitespace such as a trailing carriage return.

    Raises:
        ValueError: if the line has fewer than 7 fields, or ``customer_id``
            is not an integer.
    """
    fields = row.split(',')
    if len(fields) < 7:
        # Fail with a readable message instead of an IndexError deep inside a Spark task.
        raise ValueError(f"expected 7 comma-separated fields, got {len(fields)}: {row!r}")
    customer_id, name, city, state, country, registration_date, is_active = fields[:7]
    return (
        int(customer_id),
        name,
        city,
        state,
        country,
        registration_date,
        # strip() so stray whitespace / CRLF line endings don't silently turn True into False.
        is_active.strip() == 'True',
    )

In [24]:
# Lazily apply parse_row to every data line; nothing runs until an action (e.g. collect) is called.
parsed_rdd = data_rdd.map(parse_row)

In [26]:
# Action: bring the parsed tuples back to the driver for display.
parsed_rdd.collect()

[(0, 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', True),
 (1, 'Customer_1', 'Hyderabad', 'Delhi', 'India', '2023-08-26', True),
 (2, 'Customer_2', 'Ahmedabad', 'West Bengal', 'India', '2023-06-23', True),
 (3, 'Customer_3', 'Bangalore', 'Tamil Nadu', 'India', '2023-03-24', False),
 (4, 'Customer_4', 'Bangalore', 'Gujarat', 'India', '2023-06-06', False),
 (5, 'Customer_5', 'Delhi', 'Maharashtra', 'India', '2023-04-19', False)]

# Advanced RDD Operations

In [None]:
# Extract fields with map() - customer name and city

In [29]:
# Project each parsed tuple onto (name, city): slicing a tuple with [1:3] yields a new
# tuple holding fields 1 and 2.
name_city_rdd = parsed_rdd.map(lambda row: row[1:3])
name_city_rdd.first()

('Customer_0', 'Bangalore')

In [None]:
# Keep only active customers

In [30]:
# row[6] is the parsed is_active bool, so it can serve directly as the predicate
# (PEP 8: don't compare booleans to True with ==).
active_customers = parsed_rdd.filter(lambda row: row[6])

In [31]:
# Action: materialize the active customers on the driver.
active_customers.collect()

[(0, 'Customer_0', 'Bangalore', 'Karnataka', 'India', '2023-11-11', True),
 (1, 'Customer_1', 'Hyderabad', 'Delhi', 'India', '2023-08-26', True),
 (2, 'Customer_2', 'Ahmedabad', 'West Bengal', 'India', '2023-06-23', True)]

In [None]:
# distinct() - Transformation that removes duplicate elements

In [32]:
# City column (index 2) of every customer, with duplicates removed.
cities_rdd = (
    parsed_rdd
    .map(lambda record: record[2])
    .distinct()
)

In [34]:
# distinct() shuffles the data, so the order of the returned cities is not guaranteed.
cities_rdd.collect()

['Bangalore', 'Hyderabad', 'Ahmedabad', 'Delhi']

In [39]:
# take(n) - action that returns the first n elements to the driver as a list.
# cities_rdd comes from distinct(), so which city appears "first" is not guaranteed.

cities_rdd.take(1)

['Bangalore']

In [None]:
# reduceByKey() - Transformation
# combines the values for each key using an associative and commutative reduce function.

In [42]:

# Chain the steps: emit (city, 1) for every customer, then sum the 1s per city.
customers_per_city = (
    parsed_rdd
    .map(lambda record: (record[2], 1))
    .reduceByKey(lambda left, right: left + right)
)

In [45]:
# reduceByKey is a transformation, so this shows only the RDD's repr, not the counts.
customers_per_city

PythonRDD[40] at RDD at PythonRDD.scala:53

In [41]:
# Action: return the (city, count) pairs to the driver.
customers_per_city.collect()

[('Bangalore', 3), ('Hyderabad', 1), ('Ahmedabad', 1), ('Delhi', 1)]

In [None]:
# countByValue() - Action that returns a dict mapping each distinct value to its count

In [43]:
# countByValue() is an action: it counts each distinct city and returns the result to the driver.
cust_per_city = (
    parsed_rdd
    .map(lambda record: record[2])
    .countByValue()
)

In [53]:
# Already a plain (default)dict on the driver - no collect() needed.
cust_per_city

defaultdict(int, {'Bangalore': 3, 'Hyderabad': 1, 'Ahmedabad': 1, 'Delhi': 1})

# Important - reduceByKey() is a transformation (lazy, returns a new RDD), while countByValue() is an action (runs a job and returns a dict to the driver)

# Combining Multiple Operations

In [47]:
# Cities that have at least one active customer:
# keep active rows -> project the city -> de-duplicate.
active_cities = (
    parsed_rdd
    .filter(lambda record: record[6])   # record[6] is the parsed is_active bool
    .map(lambda record: record[2])      # record[2] is the city
    .distinct()
)

active_cities.collect()

['Bangalore', 'Hyderabad', 'Ahmedabad']

# Homework

# Count active customers by state

In [None]:
# TODO: run this code on a ~500 MB file as well.

In [48]:
# saveAsTextFile writes a *directory* of part-* files (not a single CSV file) and fails if
# the path already exists, which breaks Restart & Run All. Remove any previous output first.
# NOTE: _jvm/_jsc are PySpark internals; the path is resolved against Spark's configured
# default filesystem, so this works for local paths and HDFS alike.
ACTIVE_CITIES_PATH = "/tmp/active_cities.csv"
_hadoop_path = spark.sparkContext._jvm.org.apache.hadoop.fs.Path(ACTIVE_CITIES_PATH)
_hadoop_fs = _hadoop_path.getFileSystem(spark.sparkContext._jsc.hadoopConfiguration())
if _hadoop_fs.exists(_hadoop_path):
    _hadoop_fs.delete(_hadoop_path, True)  # recursive=True: the old output is a directory
active_cities.saveAsTextFile(ACTIVE_CITIES_PATH)

                                                                                

In [50]:
# Stop the Spark session; no cell that uses `spark` or the RDDs above can run after this.
spark.stop()