# UDF
User Defined Functions

In [13]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



create a python function

In [14]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

Convert a python function to pyspark UDF
- passing the function to PySpark SQL udf()
- udf() function returns pyspark.sql.functions UserDefinedFunction class object

Using UDF with select()
- use convertUDF() on a DataFrame column as a regular build-in function

In [15]:
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
.show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



# Other UDF explanation

In [7]:
import pyspark
from pyspark import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
from pyspark.sql.functions import udf
from pyspark.sql import Row

conf = pyspark.SparkConf() 
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SQLContext(sc)

schema = StructType([
StructField("sales", FloatType(),True),    
StructField("employee", StringType(),True),
StructField("ID", IntegerType(),True)
])
data = [[ 10.2, "Fred",123], [18.5, "Yeong", 234]]
df = spark.createDataFrame(data, schema=schema)

In [8]:
df.show()

+-----+--------+---+
|sales|employee| ID|
+-----+--------+---+
| 10.2|    Fred|123|
| 18.5|   Yeong|234|
+-----+--------+---+



### 1. withColumn()

In [9]:
# colsInt 라는 함수를 생성하고 
colsInt = udf(lambda z : toInt(z), IntegerType())

# 등록한다.
# 1st 인수 : 이 함수를 우리가 refer할 때 사용할 이름
# 2nd 인수 : 등록할 함수
spark.udf.register("colsInt", colsInt)

# 등록된 함수는 toInt() 라는 다른 함수를 호출한다. - 얘는 등록할 필요 없다.
def toInt(s):
    if isinstance(s, str) == True: # isinstance(얘가, 이타입이면true)
        st = [str(ord(i)) for i in s]  # ord() : Unicode character가 나타내는 integer 리턴
        return (int(''.join(st)))
    else:
        return Null
    
# 이제 colsInt 함수를 호출한다.
# 1st 인수 : 우리가 생성하고자 하는 새로운 컬럼 이름
# 2nd 인수 : 함수에 plug in 하고자 하는 데이터프레임 컬럼
# df['employee'] 는 column object라는 걸 기억하자. (single employee가 아니다!) 
df2 = df.withColumn('semployee', colsInt('employee'))

# 따라서 우리는 해당 컬럼의 모든 row 를 looping해야한다. --> colsInt = udf(lambda z : toInt(z), Integer)

22/04/07 06:18:38 WARN SimpleFunctionRegistry: The function colsint replaced a previously registered function.


In [10]:
df2.show()

+-----+--------+---+----------+
|sales|employee| ID| semployee|
+-----+--------+---+----------+
| 10.2|    Fred|123|1394624364|
| 18.5|   Yeong|234|2014554583|
+-----+--------+---+----------+



In [17]:
# 이건 역으로  int -> string 으로 바꾸는 연습
colsStr = udf(lambda x : toString(x), StringType())
spark.udf.register("colsStr", colsStr)
def toString(i):
    if isinstance(i, int) == True:
        return str(i)
    else:
        return Null
df3 = df.withColumn('sid', colsStr('ID'))
df3.show()
df3.printSchema() # sid type string인거 확인

22/04/07 06:34:30 WARN SimpleFunctionRegistry: The function colsstr replaced a previously registered function.


+-----+--------+---+---+
|sales|employee| ID|sid|
+-----+--------+---+---+
| 10.2|    Fred|123|123|
| 18.5|   Yeong|234|234|
+-----+--------+---+---+

root
 |-- sales: float (nullable = true)
 |-- employee: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- sid: string (nullable = true)



### 2. Using SQL

In [18]:
# sql 구문을 먹이기 위해서 dataframe을 table로 등록한다.
# df : 데이터프레임 , dftab : 임시 테이블
spark.registerDataFrameAsTable(df, "dftab")

# employee 컬럼에 colsInt 함수 먹이기
df4 = spark.sql("select sales, employee, ID, colsInt(employee) as iemployee from dftab")
df4.show()

+-----+--------+---+----------+
|sales|employee| ID| iemployee|
+-----+--------+---+----------+
| 10.2|    Fred|123|1394624364|
| 18.5|   Yeong|234|2014554583|
+-----+--------+---+----------+



### 3. RDD Map

In [20]:
# "name"으로 employee element를 연속해서 참조한 후,
# 해당 필드의 각 letter를 integer로 바꾸고 concatenate
def toIntEmployee(rdd):
    s = rdd["employee"]
    if isinstance(s, str):
        st = [str(ord(i)) for i in s]
        e = int(''.join(st))
    else:
        e = s
    return Row(rdd["sales"], rdd["employee"], rdd["ID"], e)

# DataFrame은 map() 함수가 없기 때문에, map()을 쓰려면 dff.rdd 를 써서 datafrmae을 RDD로 바꿔줘야한다.
# 얘는 toIntEmployee 함수에 row object를 리턴한다.
# RDD 는 immutable 하기 때문에, 새로운 row를 만들어줘야한다.
rdd = df.rdd.map(toIntEmployee)

In [21]:
for x in rdd.collect():
    print(x)

<Row(10.199999809265137, 'Fred', 123, 70114101100)>
<Row(18.5, 'Yeong', 234, 89101111110103)>


In [16]:
def upperCase(str):
    return str.upper()

In [17]:
upperCaseUDF = udf(lambda z:upperCase(z),StringType())    

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))) \
.show(truncate=False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+



In [18]:
""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
     
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
          "where Name is not null and convertUDF(Name) like '%John%'") \
     .show(truncate=False) 

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



In [19]:
""" null check """

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

In [20]:
df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



In [21]:
spark.udf.register("_nullsafeUDF", lambda str: convertCase(str) if not str is None else "" , StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+



In [22]:
spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%John%'") \
     .show(truncate=False) 

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



In [1]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

In [3]:
from pyspark.sql.types import IntegerType
@pandas_udf(IntegerType()) # spark에 udf 등록
def slen(s: pd.Series) -> pd.Series:
    return s.str.len()

In [None]:
from pyspark.sql.functions import PandasUDFType
@pandas_udf(IntegerType(), PandasUDFType.SCALAR)
def slen(s):
    return s.str.len()

In [None]:
@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()
df.select(func("long_col", "string_col", "struct_col")).printSchema()