# Using map and filter methods with lambda functions on RDD

In [1]:
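# Create (or reuse) a SparkSession; getOrCreate() returns the existing
# session if one is already running, e.g. in the pyspark shell.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("MySparkApp") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()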


## Lambda function

Evaluating a lambda expression in a cell shows that it is indeed a function object.

In [2]:
lambda x: x + 1

<function __main__.<lambda>(x)>

The following assigns the lambda function to a name, so it can be called like any other function.

In [3]:
func = lambda r: r + 1

func(3)

4
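
As a small aside, the sketch below (plain Python, no Spark needed) suggests that a lambda and an equivalent `def` behave the same way when called, and that the main visible difference is the function's `__name__`. The names `add_one_lambda` and `add_one_def` are made up for illustration.

```python
# A lambda bound to a name and an equivalent def (hypothetical names).
add_one_lambda = lambda r: r + 1

def add_one_def(r):
    return r + 1

# Both are ordinary function objects and return the same result.
print(add_one_lambda(3), add_one_def(3))

# A lambda has no name of its own; Python labels it "<lambda>".
print(add_one_lambda.__name__, add_one_def.__name__)
```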

## Toy Data

The toy data is taken from https://sparkbyexamples.com/

In [4]:
data = [('James','Smith','M',30),
  ('Anna','Rose','F',41),
  ('Robert','Williams','M',62), 
]

columns = ["firstname","lastname","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



## RDD: resilient distributed dataset

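An RDD has no schema or column names of its own. The RDD obtained from `df.rdd` holds `Row` objects, whose fields can be read by position (as below) or by field name (as in the map examples).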

In [5]:
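rdd = df.rdd
# collect() brings every row back to the driver; fine for toy data
dataColl = rdd.collect()

for row in dataColl:
    # Access the fields of each Row by position
    print(row[0] + ", " + str(row[1]))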

James, Smith
Anna, Rose
Robert, Williams
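
The rows returned by `collect()` are `Row` objects, which are tuples whose fields can also be read by name. The sketch below uses `collections.namedtuple` as a stand-in for `pyspark.sql.Row` so that it runs without a Spark session. The `Person` type is an assumption for illustration and is not part of the notebook.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row (assumption: used only so this runs without Spark).
Person = namedtuple("Person", ["firstname", "lastname", "gender", "salary"])

rows = [
    Person("James", "Smith", "M", 30),
    Person("Anna", "Rose", "F", 41),
    Person("Robert", "Williams", "M", 62),
]

for row in rows:
    by_position = row[0] + ", " + row[1]           # positional access
    by_name = row.firstname + ", " + row.lastname  # access by field name
    print(by_position, "|", by_name)
```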


## Map operation

The map operation applies a function to each element (here, each row) of the RDD and returns a new RDD. To print the contents nicely, convert the resulting RDD back into a DataFrame with `toDF`.

The following example uses an anonymous lambda function.

In [6]:
rdd_rename_1 = df.rdd.map(lambda x: 
    (x.firstname+", "+x.lastname,x.gender,x.salary*2)
    )
rdd_rename_1.toDF(["name", "gender", "salary"]).show()

+----------------+------+------+
|            name|gender|salary|
+----------------+------+------+
|    James, Smith|     M|    60|
|      Anna, Rose|     F|    82|
|Robert, Williams|     M|   124|
+----------------+------+------+



The following example uses a named lambda function. The call to `map` is easier to read, at the cost of leaving another object lingering in the namespace.

In [7]:
rename_lambda_func = lambda x: (x.firstname+", "+x.lastname,x.gender,x.salary*2)

rdd_rename_2 = df.rdd.map(rename_lambda_func)
rdd_rename_2.toDF(["name", "gender", "salary"]).show()

+----------------+------+------+
|            name|gender|salary|
+----------------+------+------+
|    James, Smith|     M|    60|
|      Anna, Rose|     F|    82|
|Robert, Williams|     M|   124|
+----------------+------+------+



The following regular function, defined with `def`, is equivalent.

In [8]:
def rename_def_func(x):
    return (x.firstname+", "+x.lastname,x.gender,x.salary*2)

rdd_rename_3 = df.rdd.map(rename_def_func)
rdd_rename_3.toDF(["name", "gender", "salary"]).show()

+----------------+------+------+
|            name|gender|salary|
+----------------+------+------+
|    James, Smith|     M|    60|
|      Anna, Rose|     F|    82|
|Robert, Williams|     M|   124|
+----------------+------+------+
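
To see that the three variants are interchangeable, the sketch below applies each of them with Python's built-in `map` to local stand-in rows. This is only an analogy to `RDD.map`: the built-in runs in a single process, and the `Person` namedtuple is an assumed stand-in for Spark's `Row`.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row (assumption: used only so this runs without Spark).
Person = namedtuple("Person", ["firstname", "lastname", "gender", "salary"])

rows = [
    Person("James", "Smith", "M", 30),
    Person("Anna", "Rose", "F", 41),
    Person("Robert", "Williams", "M", 62),
]

# Named lambda and def versions, as in the cells above.
rename_lambda_func = lambda x: (x.firstname + ", " + x.lastname, x.gender, x.salary * 2)

def rename_def_func(x):
    return (x.firstname + ", " + x.lastname, x.gender, x.salary * 2)

# Apply the anonymous, named-lambda, and def variants to the same rows.
result_anonymous = list(
    map(lambda x: (x.firstname + ", " + x.lastname, x.gender, x.salary * 2), rows)
)
result_named = list(map(rename_lambda_func, rows))
result_def = list(map(rename_def_func, rows))

# All three produce the same list of tuples.
print(result_anonymous == result_named == result_def)
print(result_def[0])
```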



## Filtering operation

Rows of an RDD can be filtered with a lambda function as well: `filter` keeps only the rows for which the function returns `True`.

In [9]:
rdd_filter = df.rdd.filter(lambda x: "James" in x.firstname)
# The filtered rows still have all four fields, so reuse the original column names
rdd_filter.toDF(columns).show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
+---------+--------+------+------+
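
One detail worth noting is that `"James" in x.firstname` is a substring test, so it would also match a first name such as "Jameson". The sketch below contrasts it with an equality test, using Python's built-in `filter` on local stand-in rows. The extra "Jameson" row and the `Person` namedtuple are assumptions added for illustration and do not come from the toy data.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row (assumption: used only so this runs without Spark).
Person = namedtuple("Person", ["firstname", "lastname", "gender", "salary"])

rows = [
    Person("James", "Smith", "M", 30),
    Person("Jameson", "Lee", "M", 55),  # hypothetical row, not in the toy data
    Person("Anna", "Rose", "F", 41),
]

# Substring test: keeps every row whose first name contains "James".
substring_match = list(filter(lambda x: "James" in x.firstname, rows))

# Equality test: keeps only rows whose first name is exactly "James".
exact_match = list(filter(lambda x: x.firstname == "James", rows))

print([p.firstname for p in substring_match])
print([p.firstname for p in exact_match])
```

Since Spark's `Row` also supports attribute access, the same two predicates should work unchanged when passed to `df.rdd.filter(...)`.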

