# Spark training from sparkbyexamples.com

## PySpark Join Types

### Setup environment

In [77]:
import os
import sys
import pyspark

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import pyspark
import pyspark.pandas as ps
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType
from pyspark.sql.functions import col, struct, when, lit, expr, sum, avg, max, min, mean, count, udf, upper, transform
from pyspark.sql import Row

spark = SparkSession.builder.master('local[1]') \
    .appName('SparkByExamples.com') \
    .getOrCreate()



### Create base data frames

In [2]:
emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

root
 |-- dept_name: string (nullable = true)
 |-- dept_id: long (nullable = true)

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+



                                                                                

### Inner join

In [3]:
empDF.join(deptDF,empDF.emp_dept_id == deptDF.dept_id, 'inner').show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



### Full outer join

In [4]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'outer').show(truncate=False)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'full').show(truncate=False)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'fullouter').show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+-

### Left outer join

In [5]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'left').show(truncate=False)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'leftouter').show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|n

### Right outer join

In [6]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'right').show(truncate=False)
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'rightouter').show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|n

 ### Left semi join

In [7]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'leftsemi').show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+



### Left anti join

In [8]:
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, 'leftanti').show(truncate=False)

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+



### Self join

In [11]:
empDF.alias('emp1').join(empDF.alias('emp2'), \
        col('emp1.superior_emp_id') == col('emp2.emp_id'), 'inner') \
        .select(col('emp1.emp_id'), col('emp1.name'), \
               col('emp2.emp_id').alias('superior_emp_id'), \
               col('emp2.name').alias('superior_emp_name')) \
        .show(truncate=False)

+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+



### Using sql expression

In [12]:
empDF.createOrReplaceTempView('EMP')
deptDF.createOrReplaceTempView('DEPT')

joinDF = spark.sql('select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id') \
    .show(truncate=False)

joinDF2 = spark.sql('select * from EMP e inner join DEPT d on e.emp_dept_id == d.dept_id') \
    .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+-

## PySpark union and union all

### Create data frame

In [15]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000) \
  ]

columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

simpleData2 = [("James","Sales","NY",90000,34,10000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns2= ["employee_name","department","state","salary","age","bonus"]

df2 = spark.createDataFrame(data = simpleData2, schema = columns2)

df2.printSchema()
df2.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
+-------------+----------+-----+------+---+-----+

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----

### Merge two dataframes with union

In [18]:
unionDF = df.union(df2)
unionDF.show(truncate=False)
print('Count of merged dataframes: ' + str(unionDF.count()))

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

Count of merged dataframes: 9


### Merge two dataframes using union all
Union All has been deprecated since PySpark 2.0.0.  It is recommended that you use union instead.

In [19]:
unionAllDF = df.unionAll(df2)
unionAllDF.show(truncate=False)
print('Count of merged dataframes: ' + str(unionAllDF.count()))

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+

Count of merged dataframes: 9


### Merge without duplicates

In [20]:
disDF = df.union(df2).distinct()
disDF.show(truncate=False)
print('Count of merged dataframes: ' + str(disDF.count()))

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
+-------------+----------+-----+------+---+-----+

Count of merged dataframes: 7


## PySpark unionByName() 
### Build base data frames

In [21]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# Create DataFrame df1 with columns name, and id
data = [("James",34), ("Michael",56), \
        ("Robert",30), ("Maria",24) ]

df1 = spark.createDataFrame(data = data, schema=["name","id"])
df1.printSchema()

# Create DataFrame df2 with columns name and id
data2=[(34,"James"),(45,"Maria"), \
       (45,"Jen"),(34,"Jeff")]

df2 = spark.createDataFrame(data = data2, schema = ["id","name"])
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = true)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



### unionByName() example

In [22]:
df3 = df1.unionByName(df2)
df3.printSchema()
df3.show(truncate=False)
print('Count of rows in new dataframe: ' + str(df3.count()))

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = true)

+-------+---+
|name   |id |
+-------+---+
|James  |34 |
|Michael|56 |
|Robert |30 |
|Maria  |24 |
|James  |34 |
|Maria  |45 |
|Jen    |45 |
|Jeff   |34 |
+-------+---+

Count of rows in new dataframe: 8


### unionByName with different columns

In [23]:
# Create DataFrames with different column names
df1 = spark.createDataFrame([[5, 2, 6]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[6, 7, 3]], ["col1", "col2", "col3"])

# Using allowMissingColumns
df3 = df1.unionByName(df2, allowMissingColumns=True)
df3.printSchema
df3.show()

+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
|   5|   2|   6|null|
|null|   6|   7|   3|
+----+----+----+----+



## PySpark User Defined Function
### Build base dataframes

In [24]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



### Create user defined function

In [34]:
def convertCase(str):
    resStr = ''
    arr = str.split(' ')
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + ' '
        
    return resStr

### Convert python function to UDF

In [35]:
convertUDF = udf(lambda z: convertCase(z), StringType())

### Use UDF with dataframe

In [36]:
print(convertCase('here'))

df.select(col('Seqno'), \
    convertUDF(col('Name')).alias('Name') ) \
    .show(truncate=False)

Here 
+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



### Using UDF with PySpark dataframe withColumn()

In [38]:
def upperCase(str):
    return str.upper()

upperCaseUDF = udf(lambda z: upperCase(z), StringType())

df.withColumn('Cureated Name', upperCaseUDF(col('Name'))).show(truncate=False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+



### Registering PySpark UDF and use it on SQL

In [41]:
spark.udf.register('convertUDF', convertCase, StringType())
df.createOrReplaceTempView('NAME_TABLE')
spark.sql('select Seqno, convertUDF(Name) as Name from NAME_TABLE') \
    .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



23/04/26 20:57:53 WARN SimpleFunctionRegistry: The function convertudf replaced a previously registered function.


### Creating UDF using annotation

In [42]:
@udf(returnType=StringType())
def upperCase(str):
    return str.upper()

df.withColumn('Cureated Name', upperCase(col('Name'))).show(truncate=False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+



### Handling null values

In [43]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")

spark.sql("select convertUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+



23/04/26 21:02:54 ERROR Executor: Exception in task 0.0 in stage 122.0 (TID 92)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/k4/wb623tjn7rscpqqh2tbnc62r0000gn/T/ipykernel_1450/1659673845.py", line 3, in convertCase
AttributeError: 'NoneType' object has no attribute 'split'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Itera

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/k4/wb623tjn7rscpqqh2tbnc62r0000gn/T/ipykernel_1450/1659673845.py", line 3, in convertCase
AttributeError: 'NoneType' object has no attribute 'split'


In [44]:
spark.udf.register("_nullsafeUDF", lambda str: convertCase(str) if not str is None else "" , StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%John%'") \
     .show(truncate=False)   

+------------------+
|_nullsafeUDF(Name)|
+------------------+
|John Jones        |
|Tracey Smith      |
|Amy Sanders       |
|                  |
+------------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+



## PySpark transform() function

### Create base data

In [5]:
simpleData = (('Java',4000,5), \
             ('Python',4600,10), \
             ('Scala',4500,15), \
              ('Scala',4100,15), \
             ('PHP',3000,20), \
             )

columns = ['CourseName','fee','discount']

#Create data frame
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)


root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4500|15      |
|Scala     |4100|15      |
|PHP       |3000|20      |
+----------+----+--------+



### Create custom functions/transformation

In [64]:
#Create first custom transformation
def to_upper_str_columns(df):
    return df.withColumn('CourseName',upper(df.CourseName))

#Create second custom transformation
def reduce_price(reduceBy):
    def inner(df):
        return df.withColumn('new_fee', df.fee - reduceBy)
    return inner

#Create third custom transformation
def apply_discount(df):
    return df.withColumn('discounted_fee', \
                        df.new_fee - (df.new_fee * df.discount) / 100)

#Create fourth transformation
def select_columns(df):
    return df.select('CourseName','discounted_fee')


### PySpark apply dataframe transformations

In [65]:
#df2 = df.transform(to_upper_str_columns) \
        #.transform(reduce_price,1000) \
#.transform(apply_discount)
#df2 = df.transform(reduce_price,1000)
df2 = df.transform(to_upper_str_columns) \
    .transform(reduce_price(1000)) \
    .transform(apply_discount) 
df2.printSchema()
df2.show(truncate=False)

df2 = df.transform(to_upper_str_columns) \
    .transform(reduce_price(1000)) \
    .transform(apply_discount) \
    .transform(select_columns)
df2.printSchema()
df2.show(truncate=False)




root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)
 |-- new_fee: long (nullable = true)
 |-- discounted_fee: double (nullable = true)

+----------+----+--------+-------+--------------+
|CourseName|fee |discount|new_fee|discounted_fee|
+----------+----+--------+-------+--------------+
|JAVA      |4000|5       |3000   |2850.0        |
|PYTHON    |4600|10      |3600   |3240.0        |
|SCALA     |4500|15      |3500   |2975.0        |
|SCALA     |4100|15      |3100   |2635.0        |
|PHP       |3000|20      |2000   |1600.0        |
+----------+----+--------+-------+--------------+

root
 |-- CourseName: string (nullable = true)
 |-- discounted_fee: double (nullable = true)

+----------+--------------+
|CourseName|discounted_fee|
+----------+--------------+
|JAVA      |2850.0        |
|PYTHON    |3240.0        |
|SCALA     |2975.0        |
|SCALA     |2635.0        |
|PHP       |1600.0        |
+----------+--------------+



## PySpark sql.functions.transform

### Create base data

In [66]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data=data,schema=["Name","Languages1","Languages2"])
df.printSchema()
df.show()

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Languages2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+------------------+---------------+
|            Name|        Languages1|     Languages2|
+----------------+------------------+---------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|
+----------------+------------------+---------------+



### Use transform() function

In [67]:
df.select(transform('Languages1', lambda x: upper(x)).alias('languages1')).show(truncate=False)

+------------------+
|languages1        |
+------------------+
|[JAVA, SCALA, C++]|
|[SPARK, JAVA, C++]|
|[CSHARP, VB]      |
+------------------+



## PySpark appl function to column

Create base data

In [68]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



### apply function using withColumn()

In [69]:
df.withColumn('Upper_Name', upper(df.Name)).show(truncate=False)

+-----+------------+------------+
|Seqno|Name        |Upper_Name  |
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



### apply function using select

In [72]:
df.select('Seqno','Name',upper(df.Name).alias('Upper_Name')).show()

+-----+------------+------------+
|Seqno|        Name|  Upper_Name|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



In [73]:
df.createOrReplaceTempView('TAB')
spark.sql('select Seqno, Name, upper(Name) as Upper_Name from TAB').show()

+-----+------------+------------+
|Seqno|        Name|  Upper_Name|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+



### Create custom function

In [74]:
def upperCase(str):
    return str.upper()

### Register UDF

In [75]:
upperCaseUDF = udf(lambda x:upperCase(x), StringType())

### Apply custom UDF to column

In [76]:
# Custom UDF using withColumn
df.withColumn('Cureated Name', upperCaseUDF(col('Name'))).show(truncate=False)

# Custom UDF with select()
df.select(col('Seqno'), \
         upperCaseUDF(col('Name')).alias('Name')).show(truncate=False)

# Custom UDF with sql
spark.udf.register('upperCaseUDF', upperCaseUDF)
df.createOrReplaceTempView('TAB')
spark.sql('select Seqno, Name, upperCaseUDF(Name) as Upper_Name from TAB').show(truncate=False)

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |JOHN JONES  |
|2    |TRACEY SMITH|
|3    |AMY SANDERS |
+-----+------------+

+-----+------------+------------+
|Seqno|Name        |Upper_Name  |
+-----+------------+------------+
|1    |john jones  |JOHN JONES  |
|2    |tracey smith|TRACEY SMITH|
|3    |amy sanders |AMY SANDERS |
+-----+------------+------------+



### PySpark Pandas apply()

In [78]:
technologies = ({
    'Fee' :[20000,25000,30000,22000,np.NaN],
    'Discount':[1000,2500,1500,1200,3000]
               })

# Create data frame
psdf = ps.DataFrame(technologies)
print(psdf)

def add(data):
    return data[0] + data[1]

addDF = psdf.apply(add, axis=1)
print(addDF)

  fields = [


       Fee  Discount
0  20000.0      1000
1  25000.0      2500
2  30000.0      1500
3  22000.0      1200
4      NaN      3000


  fields = [


0    21000.0
1    27500.0
2    31500.0
3    23200.0
4        NaN
dtype: float64


## PySpark map() transformations

Create base data

In [79]:
data = ["Project","Gutenberg’s","Alice’s","Adventures",
"in","Wonderland","Project","Gutenberg’s","Adventures",
"in","Wonderland","Project","Gutenberg’s"]

rdd=spark.sparkContext.parallelize(data)

### PySpark map() example with RDD

In [80]:
rdd2 = rdd.map(lambda x: (x,1))
for element in rdd2.collect():
    print(element)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


### PySpark map() example with data frame

In [81]:
data = [('James','Smith','M',30),
       ('Anna','Rose','F',41),
       ('Robert','Williams','M',62),
       ]

columns = ['firstname','lastname','gender','salary']

df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [84]:
# Refering to columns by index
rdd2 = df.rdd.map(lambda x: (x[1] + ', '+ x[0],x[2],x[3]*2))
df2=rdd2.toDF(['name','gender','new_salary'])
df2.show()

+----------------+------+----------+
|            name|gender|new_salary|
+----------------+------+----------+
|    Smith, James|     M|        60|
|      Rose, Anna|     F|        82|
|Williams, Robert|     M|       124|
+----------------+------+----------+



In [85]:
# Refering to column names
rdd2=df.rdd.map(lambda x: (x['lastname'] + ', ' + x['firstname'], x['gender'], x['salary'] * 2))
df3 = rdd2.toDF(['name','gender','salary'])
df3.show()

+----------------+------+------+
|            name|gender|salary|
+----------------+------+------+
|    Smith, James|     M|    60|
|      Rose, Anna|     F|    82|
|Williams, Robert|     M|   124|
+----------------+------+------+



In [86]:
# Also refering to column names
rdd2 = df.rdd.map(lambda x: (x.lastname+', '+x.firstname, x.gender, x.salary*2))
df4 = rdd2.toDF(['name','gender','salary'])
df4.show()

+----------------+------+------+
|            name|gender|salary|
+----------------+------+------+
|    Smith, James|     M|    60|
|      Rose, Anna|     F|    82|
|Williams, Robert|     M|   124|
+----------------+------+------+



In [87]:
# By calling function
def func1(x):
    firstName=x.firstname
    lastName=x.lastname
    name=lastName + ', ' + firstName
    gender=x.gender.lower()
    salary=x.salary * 2
    
    return(name, gender, salary)

rdd2 = df.rdd.map(lambda x: func1(x))
df5 = rdd2.toDF(['name','gender','salary'])
df5.show()

+----------------+------+------+
|            name|gender|salary|
+----------------+------+------+
|    Smith, James|     m|    60|
|      Rose, Anna|     f|    82|
|Williams, Robert|     m|   124|
+----------------+------+------+



## PySpark flatMap() transformation

### Create base data

In [88]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s


In [90]:
rdd2=rdd.flatMap(lambda x: x.split(' '))
for element in rdd2.collect():
    print(element)

Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s
