In [4]:
!hadoop fs -ls /tmp

Found 3 items
drwxrwxrwt   - hdfs            hadoop          0 2025-08-28 07:01 /tmp/hadoop-yarn
drwx-wx-wx   - hive            hadoop          0 2025-08-28 07:02 /tmp/hive
-rw-r--r--   2 nileshnandan_ts hadoop        110 2025-08-28 07:11 /tmp/input.txt


In [5]:
spark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("RDD") \
.master("yarn") \
.getOrCreate()

25/08/28 09:04:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
hdfs_path = "/tmp/input.txt"
rdd_file = spark.sparkContext.textFile(hdfs_path)

In [4]:
rdd_1 = rdd_file.flatMap(lambda x:x.split(" "))
rdd_2 = rdd_1.map(lambda w:(w,1))
rdd_3 = rdd_2.reduceByKey(lambda a,b:a+b)

In [5]:
rdd_3.collect()

                                                                                

[('Nilesh,', 3),
 ('jin,', 3),
 ('Vegeta', 1),
 ('Goku,', 4),
 ('Vegeta,', 3),
 ('Nandan,', 2)]

In [7]:
rdd_3.getNumPartitions()

2

In [8]:
spark.conf.get('spark.sql.files.maxPartitionBytes')

'134217728b'

In [11]:
spark.sparkContext.defaultMinPartitions

2

In [13]:
spark.stop()

In [14]:
data = [
    "customer_id, name, city, state, country, registration_date, is_active",
    "0, Customer_0, Pune, West Bengal, India, 10/10/2023, TRUE",
    "1, Customer_1, Bangalore, Gujarat, India, 10/19/2023, FALSE",
    "2, Customer_2, Bangalore, Karnataka, India, 2/10/2023, TRUE",
    "3, Customer_3, Bangalore, Telangana, India, 3/24/2023, TRUE",
    "4, Customer_4, Hyderabad, Telangana, India, 6/4/2023, FALSE",
    "5, Customer_5, Hyderabad, West Bengal, India, 7/26/2023, TRUE"
]

In [15]:
spark = SparkSession.builder \
.appName("Customers") \
.master("yarn") \
.getOrCreate()

25/08/28 09:47:03 INFO SparkEnv: Registering MapOutputTracker
25/08/28 09:47:03 INFO SparkEnv: Registering BlockManagerMaster
25/08/28 09:47:03 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/08/28 09:47:03 INFO SparkEnv: Registering OutputCommitCoordinator


In [16]:
rdd = spark.sparkContext.parallelize(data)

In [17]:
rdd.collect()

                                                                                

['customer_id, name, city, state, country, registration_date, is_active',
 '0, Customer_0, Pune, West Bengal, India, 10/10/2023, TRUE',
 '1, Customer_1, Bangalore, Gujarat, India, 10/19/2023, FALSE',
 '2, Customer_2, Bangalore, Karnataka, India, 2/10/2023, TRUE',
 '3, Customer_3, Bangalore, Telangana, India, 3/24/2023, TRUE',
 '4, Customer_4, Hyderabad, Telangana, India, 6/4/2023, FALSE',
 '5, Customer_5, Hyderabad, West Bengal, India, 7/26/2023, TRUE']

In [18]:
header = rdd.first()

                                                                                

In [19]:
rdd_data = rdd.filter(lambda x:x!=header)

In [20]:
rdd_data.collect()

                                                                                

['0, Customer_0, Pune, West Bengal, India, 10/10/2023, TRUE',
 '1, Customer_1, Bangalore, Gujarat, India, 10/19/2023, FALSE',
 '2, Customer_2, Bangalore, Karnataka, India, 2/10/2023, TRUE',
 '3, Customer_3, Bangalore, Telangana, India, 3/24/2023, TRUE',
 '4, Customer_4, Hyderabad, Telangana, India, 6/4/2023, FALSE',
 '5, Customer_5, Hyderabad, West Bengal, India, 7/26/2023, TRUE']

In [37]:
def parse_row(r):
    r1 = r.split(",")
    return (
        int(r1[0]),
        r1[1],
        r1[2],
        r1[3],
        r1[4],
        r1[5],
        r1[6]==' TRUE'
    )

In [38]:
parsed_rdd = rdd_data.map(parse_row)

In [39]:
parsed_rdd.collect()

[(0, ' Customer_0', ' Pune', ' West Bengal', ' India', ' 10/10/2023', True),
 (1, ' Customer_1', ' Bangalore', ' Gujarat', ' India', ' 10/19/2023', False),
 (2, ' Customer_2', ' Bangalore', ' Karnataka', ' India', ' 2/10/2023', True),
 (3, ' Customer_3', ' Bangalore', ' Telangana', ' India', ' 3/24/2023', True),
 (4, ' Customer_4', ' Hyderabad', ' Telangana', ' India', ' 6/4/2023', False),
 (5,
  ' Customer_5',
  ' Hyderabad',
  ' West Bengal',
  ' India',
  ' 7/26/2023',
  True)]

In [41]:
city = parsed_rdd.map(lambda x:(x[1], x[2]))

In [43]:
city.first()

(' Customer_0', ' Pune')

In [44]:
active_customers = parsed_rdd.filter(lambda x:x[6]==True)

In [47]:
active_customers.first()

(0, ' Customer_0', ' Pune', ' West Bengal', ' India', ' 10/10/2023', True)

In [48]:
distinct_cities = parsed_rdd.map(lambda x:x[2]).distinct()

In [49]:
distinct_cities.collect()

                                                                                

[' Pune', ' Bangalore', ' Hyderabad']

In [50]:
distinct_cities.take(2)

[' Pune', ' Bangalore']

In [51]:
customers_per_city = parsed_rdd.map(lambda x:(x[2],1)).reduceByKey(lambda a,b:a+b)

In [54]:
customers_per_city.collect() #countByVlaue returns a dictionary not a rdd 

[(' Pune', 1), (' Bangalore', 3), (' Hyderabad', 2)]

In [55]:
state_active = parsed_rdd.map(lambda x:x[3]) \
                         .filter(lambda x:x[6]) \
                         .distinct()

In [56]:
state_active.collect()

[' West Bengal', ' Karnataka', ' Telangana', ' Gujarat']

In [None]:
spark.stop()