In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session that runs locally, using all available cores.
# NOTE(review): with a local[*] master the executors presumably run inside the
# driver JVM, so "spark.executor.memory" may have no effect here — confirm, and
# consider "spark.driver.memory" if the intent is to size the JVM.
spark = (
    SparkSession.builder
    .appName("customer_data")
    .master("local[*]")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

In [2]:
# In-memory CSV sample: one header line followed by six customer records.
# Splitting the block on newlines yields a list with one raw CSV line per element.
customer_data = """
customer_id,name,city,state,country,registration_date,is_active
0,Customer_0,Pune,Maharashtra,India,2023-06-29,False
1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True
2,Customer_2,Hyderabad,Gujarat,India,2023-10-27,True
3,Customer_3,Bangalore,Karnataka,India,2023-10-17,False
4,Customer_4,Ahmedabad,Karnataka,India,2023-03-14,False
5,Customer_5,Hyderabad,Karnataka,India,2023-07-28,False
""".strip().splitlines()

In [3]:
# Turn the raw CSV lines (header included) into an RDD of strings.
data_rdd = spark.sparkContext.parallelize(customer_data)

In [4]:
# parallelize() was called without numSlices, so this shows the partition
# count Spark chose on its own.
data_rdd.getNumPartitions()

12

In [5]:
# Read the header from the source list instead of data_rdd.first().
# data_rdd is later rebound to the header-free RDD, so re-running this cell
# afterwards with first() would capture a data row, and the header filter
# would then drop that record. Indexing the list is idempotent and needs no
# Spark job.
header = customer_data[0]

In [6]:
# Display the captured header to confirm it is the column-name line.
header

'customer_id,name,city,state,country,registration_date,is_active'

In [7]:
# Drop the header line so only data records remain. Every row equal to the
# header string is removed, and data_rdd is rebound to the filtered RDD.
data_rdd = data_rdd.filter(lambda row: row != header)

In [8]:
# filter() is a lazy transformation, so displaying the RDD shows only its
# handle, not the rows; an action such as collect() is needed to see data.
data_rdd

PythonRDD[3] at RDD at PythonRDD.scala:53

In [9]:
# Materialize the filtered rows on the driver to verify the header is gone
# (fine here because the sample is tiny).
data_rdd.collect()

['0,Customer_0,Pune,Maharashtra,India,2023-06-29,False',
 '1,Customer_1,Bangalore,Tamil Nadu,India,2023-12-07,True',
 '2,Customer_2,Hyderabad,Gujarat,India,2023-10-27,True',
 '3,Customer_3,Bangalore,Karnataka,India,2023-10-17,False',
 '4,Customer_4,Ahmedabad,Karnataka,India,2023-03-14,False',
 '5,Customer_5,Hyderabad,Karnataka,India,2023-07-28,False']

In [10]:
# Sanity-check the boolean conversion on the first data row: field index 6
# (is_active) is still text at this point, so it is compared with "True".
first_row = data_rdd.first()
print(first_row.split(",")[6] == "True")

False


In [11]:
def parse_row(row):
    """Convert one raw CSV line into a typed customer tuple.

    Args:
        row: A comma-separated line with exactly seven fields, in the order
            customer_id, name, city, state, country, registration_date,
            is_active.

    Returns:
        A 7-tuple ``(customer_id, name, city, state, country,
        registration_date, is_active)`` in which ``customer_id`` is an
        ``int``, ``is_active`` is a ``bool`` and the other fields are ``str``.

    Raises:
        ValueError: If the line does not contain exactly seven fields, or the
            customer id cannot be converted to an integer.
    """
    # Strip every field so stray spaces or a trailing newline cannot leak into
    # the values (e.g. "True\n" must still be recognised as active).
    fields = [value.strip() for value in row.split(",")]

    # Fail loudly on misaligned rows instead of silently shifting columns
    # (too many fields) or raising a bare IndexError (too few).
    if len(fields) != 7:
        raise ValueError(
            f"expected 7 comma-separated fields, got {len(fields)}: {row!r}"
        )

    customer_id, name, city, state, country, registration_date, is_active = fields
    return (
        int(customer_id),
        name,
        city,
        state,
        country,
        registration_date,
        # Case-insensitive so "true"/"TRUE" are not silently treated as False.
        is_active.lower() == "true",
    )

In [12]:
# Apply parse_row to every raw line, producing an RDD of typed tuples.
parsed_rdd = data_rdd.map(parse_row)

In [13]:
# Bring the parsed tuples to the driver to check the converted types
# (int id, bool is_active).
parsed_rdd.collect()

[(0, 'Customer_0', 'Pune', 'Maharashtra', 'India', '2023-06-29', False),
 (1, 'Customer_1', 'Bangalore', 'Tamil Nadu', 'India', '2023-12-07', True),
 (2, 'Customer_2', 'Hyderabad', 'Gujarat', 'India', '2023-10-27', True),
 (3, 'Customer_3', 'Bangalore', 'Karnataka', 'India', '2023-10-17', False),
 (4, 'Customer_4', 'Ahmedabad', 'Karnataka', 'India', '2023-03-14', False),
 (5, 'Customer_5', 'Hyderabad', 'Karnataka', 'India', '2023-07-28', False)]

# Advanced RDD Operations

Extract customer name and city with map()

In [14]:
# Project each parsed record down to a (name, city) pair:
# index 1 is the customer name, index 2 the city.
name_city_rdd = parsed_rdd.map(
    lambda customer: (customer[1], customer[2])
)
name_city_rdd.collect()

[('Customer_0', 'Pune'),
 ('Customer_1', 'Bangalore'),
 ('Customer_2', 'Hyderabad'),
 ('Customer_3', 'Bangalore'),
 ('Customer_4', 'Ahmedabad'),
 ('Customer_5', 'Hyderabad')]

In [15]:
# Keep only active customers. Index 6 holds the boolean is_active flag produced
# by parse_row, so it is used directly as the predicate rather than being
# compared against True.
active_customer = parsed_rdd.filter(lambda row: row[6])
active_customer.collect()

[(1, 'Customer_1', 'Bangalore', 'Tamil Nadu', 'India', '2023-12-07', True),
 (2, 'Customer_2', 'Hyderabad', 'Gujarat', 'India', '2023-10-27', True)]

In [16]:
# Pull out the city (index 2) of every customer and de-duplicate the values.
cities_rdd = (
    parsed_rdd
    .map(lambda customer: customer[2])
    .distinct()
)
cities_rdd.collect()

['Pune', 'Hyderabad', 'Bangalore', 'Ahmedabad']

In [17]:
# take(n) is an action that returns the first n elements as a Python list.
cities_rdd.take(2)

['Pune', 'Hyderabad']

Reduce by key (Transformation)

In [18]:
# Count customers per city: emit a (city, 1) pair for each record, then sum
# the ones that share a key.
customers_per_city = (
    parsed_rdd
    .map(lambda customer: (customer[2], 1))
    .reduceByKey(lambda total, count: total + count)
)
customers_per_city.collect()

[('Pune', 1), ('Hyderabad', 2), ('Bangalore', 2), ('Ahmedabad', 1)]

Count by value (Action)

In [19]:
# Same per-city count, but via the countByValue() action, which hands the
# tallies back to the driver as a dict-like object instead of an RDD.
city_values = parsed_rdd.map(lambda customer: customer[2])
cust_per_city = city_values.countByValue()
cust_per_city

defaultdict(int, {'Pune': 1, 'Bangalore': 2, 'Hyderabad': 2, 'Ahmedabad': 1})

# More Operations

In [20]:
# Cities that have at least one active customer.
# Index 6 is the boolean is_active flag, so it serves as the predicate directly
# (no comparison against True); index 2 is the city.
active_cities = (
    parsed_rdd
    .filter(lambda row: row[6])
    .map(lambda row: row[2])
    .distinct()
)
active_cities.collect()

['Hyderabad', 'Bangalore']

In [21]:
# Count active customers per state.
# Index 6 is the boolean is_active flag, so it serves as the predicate directly
# (no comparison against True); index 3 is the state used as the grouping key.
active_cust_state = (
    parsed_rdd
    .filter(lambda row: row[6])
    .map(lambda row: (row[3], 1))
    .reduceByKey(lambda x, y: x + y)
)
active_cust_state.collect()

[('Gujarat', 1), ('Tamil Nadu', 1)]