# Using user defined functions in Spark

- You've seen some of the power behind Spark's built-in string functions when it comes to manipulating DataFrames. However, once you reach a certain point, it becomes difficult to process the data in a without creating a rat's nest of function calls. Here's one place where you can use User Defined Functions to manipulate our DataFrames.

- For this exercise, we'll use our `voter_df` DataFrame, but you're going to replace the `first_name` column with the first and middle names.

- The `pyspark.sql.functions` library is available under the alias `F`. The classes from `pyspark.sql.types` are already imported.

## Instructions

- Edit the `getFirstAndMiddle()` function to return a space separated string of names, except the last entry in the names list.
- Define the function as a user-defined function. It should return a string type.
- Create a new column on `voter_df` called `first_and_middle_name` using your UDF.
- Show the Data Frame.

In [2]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [3]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [4]:
!pwd

/home/talentum/spark-jupyter/3_UserDefinedFunctions


In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [58]:
# Load the CSV file
voter_df = spark.read.format('csv').options(Header=True).load('file:///home/talentum/spark-jupyter/3_UserDefinedFunctions/Dataset/DallasCouncilVoters.csv.gz')

# Add a new column called splits separated on whitespace
voter_df = voter_df.withColumn('splits', F.split(voter_df.VOTER_NAME, '\s+'))

# Create a new column called first_name based on the first item in splits
voter_df = voter_df.withColumn('first_name', voter_df.splits.getItem(0))

# Get the last entry of the splits list and create a column called last_name
voter_df = voter_df.withColumn('last_name', voter_df.splits.getItem(F.size('splits') - 1))
voter_df=voter_df.filter(voter_df.VOTER_NAME.isNotNull())



def getFirstAndMiddle(names):
  # Return a space separated string of names 
  return ' '.join(names[])

# Define the method as a UDF
udfFirstAndMiddle = F.udf(getFirstAndMiddle, StringType())

# Create a new column using your UDF
voter_df = voter_df.withColumn('first_and_middle_name', udfFirstAndMiddle(voter_df.splits))

# Show the DataFrame
b=voter_df.show()

+----------+-------------+-------------------+--------------------+----------+---------+---------------------+
|      DATE|        TITLE|         VOTER_NAME|              splits|first_name|last_name|first_and_middle_name|
+----------+-------------+-------------------+--------------------+----------+---------+---------------------+
|02/08/2017|Councilmember|  Jennifer S. Gates|[Jennifer, S., Ga...|  Jennifer|    Gates|    Jennifer S. Gates|
|02/08/2017|Councilmember| Philip T. Kingston|[Philip, T., King...|    Philip| Kingston|   Philip T. Kingston|
|02/08/2017|        Mayor|Michael S. Rawlings|[Michael, S., Raw...|   Michael| Rawlings|  Michael S. Rawlings|
|02/08/2017|Councilmember|       Adam Medrano|     [Adam, Medrano]|      Adam|  Medrano|         Adam Medrano|
|02/08/2017|Councilmember|       Casey Thomas|     [Casey, Thomas]|     Casey|   Thomas|         Casey Thomas|
|02/08/2017|Councilmember|Carolyn King Arnold|[Carolyn, King, A...|   Carolyn|   Arnold|  Carolyn King Arnold|
|

In [13]:
print(voter_df)

DataFrame[DATE: string, TITLE: string, VOTER_NAME: string, splits: array<string>, first_name: string, last_name: string, first_and_middle_name: string]


In [19]:
voter_df.select('first_name','splits').take(5)

[Row(first_name='Jennifer', splits=['Jennifer', 'S.', 'Gates']),
 Row(first_name='Philip', splits=['Philip', 'T.', 'Kingston']),
 Row(first_name='Michael', splits=['Michael', 'S.', 'Rawlings']),
 Row(first_name='Adam', splits=['Adam', 'Medrano']),
 Row(first_name='Casey', splits=['Casey', 'Thomas'])]

In [34]:
list1=['first_name','middle_name','last_name']
"  ".join(list1[0:2])

'first_name  middle_name'

In [55]:
voter_df=voter_df.filter(voter_df.VOTER_NAME.isNull())
voter_df.show()

AttributeError: 'int' object has no attribute 'show'

In [59]:
voter_df.describe().show()

+-------+--------------------+--------------------+--------------------+----------+----------+---------------------+
|summary|                DATE|               TITLE|          VOTER_NAME|first_name| last_name|first_and_middle_name|
+-------+--------------------+--------------------+--------------------+----------+----------+---------------------+
|  count|               44122|               44122|               44122|     44122|     44122|                44122|
|   mean|                null|                null|                null|      null|      null|                 null|
| stddev|                null|                null|                null|      null|      null|                 null|
|    min|          01/03/2018|   authorize   an...|   the   final  2...|          |011018__42|  the final 2018 A...|
|    max|MADELEINE JOHNSON...|       Mayor Pro Tem|   Tiffinni A. Young|  Tiffinni|     rooms|    Tiffinni A. Young|
+-------+--------------------+--------------------+-------------

In [64]:
v=voter_df.filter(voter_df["VOTER_NAME"].isNull())
v.show()

+----+-----+----------+------+----------+---------+---------------------+
|DATE|TITLE|VOTER_NAME|splits|first_name|last_name|first_and_middle_name|
+----+-----+----------+------+----------+---------+---------------------+
+----+-----+----------+------+----------+---------+---------------------+



In [88]:
import numpy as np
data=[
    ('James','CA',np.NaN), ('Julia','',None),
    ('Ram',None,200.0),('Ramya','NULL',np.NAN)

]
df=spark.createDataFrame(data,['name','state','number'])
df.show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|Julia|     |  null|
|  Ram| null| 200.0|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [67]:
df.filter(df.state.isNull()).show()
df.filter(df.number.isNull()).show()

+----+-----+------+
|name|state|number|
+----+-----+------+
| Ram| null| 200.0|
+----+-----+------+

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|Julia|     |  null|
+-----+-----+------+



In [75]:
df.filter(df.number.isin([np.NAN,None])).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [78]:
df.filter(df.state.contains('Null') | df.state.contains('null') | df.state.contains('NULL')).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|Ramya| NULL|   NaN|
+-----+-----+------+



In [79]:
df.filter(df.number.contains('Null') | df.number.contains('null') | df.number.contains('NULL')).show()

+----+-----+------+
|name|state|number|
+----+-----+------+
+----+-----+------+



In [89]:
df.filter(df.state.contains('')).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|Julia|     |  null|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [90]:
df.filter(df.number.contains('')).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|  Ram| null| 200.0|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [94]:
df.filter(F.isnan(df.number)).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [95]:
df.filter(F.isnull(df.number)).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|Julia|     |  null|
+-----+-----+------+



In [102]:
df.filter(df.state.isNull()).count()

1

In [100]:
df.filter(df.number.isNull()).count()

1

In [103]:
df.select(['name','number']).show()

+-----+------+
| name|number|
+-----+------+
|James|   NaN|
|Julia|  null|
|  Ram| 200.0|
|Ramya|   NaN|
+-----+------+



In [None]:
df.select(['name','number']).show()

In [111]:
df.selectExpr('count(number)').show()

+-------------+
|count(number)|
+-------------+
|            3|
+-------------+



In [113]:
df.describe().show()

+-------+-----+-----+------+
|summary| name|state|number|
+-------+-----+-----+------+
|  count|    4|    3|     3|
|   mean| null| null|   NaN|
| stddev| null| null|   NaN|
|    min|James|     | 200.0|
|    max|Ramya| NULL|   NaN|
+-------+-----+-----+------+



In [149]:
df.selectExpr(['name','concat(name,state)' 'kk','number - abs(number)' 'll']).describe().show()

+-------+-----+---------+---+
|summary| name|       kk| ll|
+-------+-----+---------+---+
|  count|    4|        3|  3|
|   mean| null|     null|NaN|
| stddev| null|     null|NaN|
|    min|James|  JamesCA|0.0|
|    max|Ramya|RamyaNULL|NaN|
+-------+-----+---------+---+



In [153]:
df.describe().count()

5

In [210]:
df.selectExpr(['count(*) - count(distinct(name))' 'null_in_name'
               ,'count(*) - count(distinct(state))' 'null_in_state'
               ,'count(*)-count(nullif(number,"NaN"))' 'null_in_number']).show()

+------------+-------------+--------------+
|null_in_name|null_in_state|null_in_number|
+------------+-------------+--------------+
|           0|            1|             3|
+------------+-------------+--------------+



In [204]:
df.selectExpr(['nullif(number,"NaN") ' 'll']).show()

+-----+
|   ll|
+-----+
| null|
| null|
|200.0|
| null|
+-----+



In [202]:
df.selectExpr(['isnull(nullif(number,"np.NaN")) ' 'll']).show()

+-----+
|   ll|
+-----+
|false|
| true|
|false|
|false|
+-----+



In [187]:
df.agg(*[F.count(c).alias(c) for c in df.columns]).show()

+----+-----+------+
|name|state|number|
+----+-----+------+
|   4|    3|     3|
+----+-----+------+



In [163]:
df.show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|Julia|     |  null|
|  Ram| null| 200.0|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [177]:
df.select(['name' 'n']).show()

AnalysisException: "cannot resolve '`namen`' given input columns: [name, state, number];;\n'Project ['namen]\n+- LogicalRDD [name#3156, state#3157, number#3158], false\n"

In [182]:
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+-----+------+
|name|state|number|
+----+-----+------+
|   0|    1|     3|
+----+-----+------+



In [217]:
df.filter(F.isnan(df.state)).show()
data_count=[
    (df.filter(df.name.isNull() | F.isnan(df.name) | (df.name == '')).count(),
     df.filter(df.state.isNull() | F.isnan(df.state) | (df.state == '')).count(),
     df.filter(df.number.isNull() | F.isnan(df.number) | (df.number == '')).count()
    )
]

df.filter(data_count).show()

+----+-----+------+
|name|state|number|
+----+-----+------+
+----+-----+------+



TypeError: condition should be string or Column

In [218]:
df.filter(F.isnan(df.state)).show()
data_count=[
    (df.filter(df.name.isNull() | F.isnan(df.name)).count(),
     df.filter(df.state.isNull() | F.isnan(df.state)).count(),
     df.filter(df.number.isNull() | F.isnan(df.number)).count()
    )
]


+----+-----+------+
|name|state|number|
+----+-----+------+
+----+-----+------+



In [220]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- number: double (nullable = true)



In [231]:
df.filter(df.number.contains('NaN')).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [232]:
df.filter(df.number.contains('')).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|  Ram| null| 200.0|
|Ramya| NULL|   NaN|
+-----+-----+------+



In [233]:
df.filter(df.number.contains('200')).show()

+----+-----+------+
|name|state|number|
+----+-----+------+
| Ram| null| 200.0|
+----+-----+------+



In [231]:
df.filter(df.number.contains('NaN')).show()

+-----+-----+------+
| name|state|number|
+-----+-----+------+
|James|   CA|   NaN|
|Ramya| NULL|   NaN|
+-----+-----+------+

