## User-Defined Functions (UDF) in Pyspark

In [7]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Get (or start) the Spark session used by the cells below.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# Three sample rows of (Seqno, Name); both values are given as Python strings.
columns = ["Seqno", "Name"]
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
]

df = spark.createDataFrame(data=data, schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



## Activity: Write a function in Python that takes a string and capitalizes the first letter of each word
- Your function name can be convertCase

In [2]:
# Example call with the expected result shown below; convertCase itself is
# defined in a later cell, so run that cell first.
s1 = 'john jones'
convertCase(s1)

'John Jones'

In [9]:
def convertCase(string):
    """Capitalize the first letter of every space-separated word.

    Args:
        string: Text to convert, or None (e.g. a null column value when
            this function is used as a Spark UDF).

    Returns:
        The text with the first character of each word upper-cased and the
        rest of each word left as is, or None when ``string`` is None.
    """
    if string is None:
        return None
    # Joining (instead of appending "word + ' '") avoids the trailing space,
    # so 'john jones' becomes 'John Jones' rather than 'John Jones '.
    # word[:1] is safe for the empty pieces produced by consecutive spaces.
    return " ".join(word[:1].upper() + word[1:] for word in string.split(" "))

In [11]:
# Wrap the plain Python function as a Spark UDF that returns strings.
convertUDF = udf(convertCase, StringType())

# Keep Seqno and show the converted Name under the alias Name_Upper.
(
    df.select(col("Seqno"), convertUDF(col("Name")).alias("Name_Upper"))
      .show(truncate=False)
)

+-----+-------------+
|Seqno|Name_Upper   |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



## withColumn, withColumnRenamed and lit

In [12]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerType

spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# One tuple per person, in the order given by `columns` below.
# Missing name parts are empty strings; the last row has a salary of -1.
data = [
    ('James',   '',     'Smith',    '1991-04-01', 'M', 3000),
    ('Michael', 'Rose', '',         '2000-05-19', 'M', 4000),
    ('Robert',  '',     'Williams', '1978-09-05', 'M', 4000),
    ('Maria',   'Anne', 'Jones',    '1967-12-01', 'F', 4000),
    ('Jen',     'Mary', 'Brown',    '1980-02-17', 'F', -1),
]

columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+



In [13]:
# Replace the salary column (inferred as long) with an integer-typed version.
df2 = df.withColumn(
    "salary",
    col("salary").cast(IntegerType()),
)
df2.printSchema()
df2.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+



In [14]:
# Note: despite its name, CopiedColumn holds the salary multiplied by -1.
df3 = df.withColumn(
    "CopiedColumn",
    col("salary") * -1,
)
df3.printSchema()
df3.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- CopiedColumn: long (nullable = true)

+---------+----------+--------+----------+------+------+------------+
|firstname|middlename|lastname|       dob|gender|salary|CopiedColumn|
+---------+----------+--------+----------+------+------+------------+
|    James|          |   Smith|1991-04-01|     M|  3000|       -3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|       -4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|       -4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|       -4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|           1|
+---------+----------+--------+----------+------+------+------------+



In [15]:
df4 = df.withColumn("Country", lit("USA"))
df4.show()

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    USA|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    USA|
|   Robert|          |Williams|1978-09-05|     M|  4000|    USA|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    USA|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    USA|
+---------+----------+--------+----------+------+------+-------+



In [17]:
# Rebinds df: from this cell on its column is called "sex". DataFrames built
# earlier from df (e.g. df3) still carry the original "gender" column.
df = df.withColumnRenamed(existing="gender", new="sex")
df.show(truncate=False)

+---------+----------+--------+----------+---+------+
|firstname|middlename|lastname|dob       |sex|salary|
+---------+----------+--------+----------+---+------+
|James    |          |Smith   |1991-04-01|M  |3000  |
|Michael  |Rose      |        |2000-05-19|M  |4000  |
|Robert   |          |Williams|1978-09-05|M  |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F  |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F  |-1    |
+---------+----------+--------+----------+---+------+



In [18]:
df3.drop("CopiedColumn").show(truncate=False)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04-01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+



## Activity: Read titanic dataset

- Create a new column named Gender: it is 0 when Sex is female and 1 when Sex is male (use a UDF for the conversion)

In [19]:
df_titanic= spark.read.csv('titanic.csv',header=True, inferSchema = True)

In [23]:
df_titanic.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
only showing top 2 rows



In [26]:
from pyspark.sql.types import IntegerType

# Encode Sex as an integer: male -> 1, female -> 0.
# Any other value (including a missing Sex) maps to null instead of being
# silently counted as female, which is what `1 if s == 'male' else 0` did.
SEX_TO_GENDER = {'male': 1, 'female': 0}
convertUDF = udf(lambda s: SEX_TO_GENDER.get(s), IntegerType())

df_titanic = df_titanic.withColumn('Gender', convertUDF(col("Sex")))
df_titanic.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|Gender|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|     1|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|     0|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+------+
only showing top 2 rows



## Broadcast in PySpark

- We use a broadcast variable to send a read-only copy of some data to every node
- Broadcast variables are typically used as lookup tables: each executor keeps a local copy, so lookups involve no network I/O overhead
- In the example below, if the `states` variable lived only on the master node, every worker node that wants to translate a state abbreviation into its full name would have to fetch it over the network, generating a lot of overhead

In [1]:
import pyspark
from pyspark.sql import SparkSession


spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# Lookup table: state abbreviation -> full state name.
states = {
    "NY": "New York",
    "CA": "California",
    "FL": "Florida",
}

# Sample people; the last field is a state abbreviation that is a key of `states`.
data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL"),
]

columns = ["firstname", "lastname", "country", "state"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+



In [2]:
broadcastStates = spark.sparkContext.broadcast(states)
broadcastStates

<pyspark.broadcast.Broadcast at 0x7f1a6c1ea9d0>

In [3]:
broadcastStates.value['CA']

'California'

In [11]:
broadcastStates.value

{'NY': 'New York', 'CA': 'California', 'FL': 'Florida'}

In [4]:
def state_convert(code):
    """Return the full state name for the abbreviation ``code``.

    The lookup goes through the broadcast copy of the ``states`` dict and
    raises KeyError when ``code`` is not one of its keys.
    """
    lookup = broadcastStates.value
    return lookup[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3])))

## What is the type of result variable?

In [9]:
result.take(2)

[('James', 'Smith', 'USA', 'California'),
 ('Michael', 'Rose', 'USA', 'New York')]

In [10]:
result = result.toDF(columns)
result.show(truncate=False)

+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+



## Optional: Accumulators

- Accumulators are a built-in feature of Spark that allow multiple workers to write to a shared variable

- This shared variable (usually) lives on the master/driver node; executors can update it, and their updates propagate back to the driver program

- For example, we may want to count the values in an RDD (which is distributed among nodes) that are greater than 3

- https://sparkour.urizone.net/recipes/aggregating-accumulators/

In [3]:
import os 
# Spark needs a JVM. Fall back to this machine-specific location (presumably a
# local OpenJDK 8 install - adjust for your machine) only when JAVA_HOME is not
# already set, so an existing environment is not overwritten.
os.environ.setdefault("JAVA_HOME", '/usr/local/opt/openjdk@8')

In [4]:
from pyspark.sql import SparkSession

# Get the existing Spark session, or create one with this app name and option.
spark = (
    SparkSession.builder
    .appName("Python Spark regression example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)



### For each value in the RDD that is greater than 3, add 1 to the accumulator, a global variable that lives on the master node

In [15]:
accum = spark.sparkContext.accumulator(0)

sample_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

def f(x):
    """Add 1 to the ``accum`` accumulator when ``x`` is greater than 3."""
    if not (x > 3):
        return
    accum.add(1)

sample_rdd.foreach(f)
accum.value

5

In [16]:
accum

Accumulator<id=9, value=5>

### Sum all RDD elements that are greater than 3 and store the total in a global variable on the master node

In [6]:
accum = spark.sparkContext.accumulator(0)

sample_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8])

def f(x):
    """Add ``x`` itself to the ``accum`` accumulator when ``x`` is greater than 3."""
    if not (x > 3):
        return
    accum.add(x)

sample_rdd.foreach(f)
accum.value

30

In [7]:
4 + 5 + 6 + 7 + 8

30

In [21]:
# If we use accumulator methods (such as .add), we do not need to declare it (accum) as a global variable in our function
# Otherwise (e.g. when using +=), we must declare it as a global variable in the body of the function
accum = spark.sparkContext.accumulator(0)

sample_rdd = spark.sparkContext.parallelize([1, 2, 3, 4])

def f(x):
    """Add ``x`` to the ``accum`` accumulator using ``+=``."""
    # ``+=`` assigns to the name ``accum``; without the ``global`` declaration
    # Python would treat ``accum`` as a local variable of this function.
    global accum
    accum += x

sample_rdd.foreach(f)
accum.value

10

## Another Accumulator Example

- String addition (concatenation) of all RDD elements, saved in a global variable on the master node

In [14]:
from pyspark import AccumulatorParam

class StringAccumulator(AccumulatorParam):
    """Accumulator parameter that combines string values by concatenation."""

    def zero(self, s):
        """Return the identity element for concatenation: the empty string.

        ``s`` is the accumulator's current value. Returning ``s`` itself
        would seed every task-side copy with that text and add it again on
        merge whenever the accumulator starts out, or is reused, non-empty.
        """
        return ""

    def addInPlace(self, s1, s2):
        """Return ``s1`` followed by ``s2``."""
        return s1 + s2

accumulator = spark.sparkContext.accumulator("", StringAccumulator())

def add(x):
    """Append ``x`` to the ``accumulator`` accumulator using ``+=``."""
    # ``+=`` assigns to the name ``accumulator``, so it has to be declared
    # global; otherwise Python would treat it as a local variable here.
    global accumulator
    accumulator += x

spark.sparkContext.parallelize(["a", "b", "c"]).foreach(add)
accumulator.value

'abc'

## Summary:

- Broadcast variables and accumulators both fall under the shared-variable category in Apache Spark
- The goal of both is to boost the overall execution performance of an Apache Spark job in a cluster environment
- Broadcast variables efficiently distribute data to tasks executing on different cluster nodes, while accumulators aggregate data from the nodes back to the driver program