In [1]:
import delta
import pyspark
from delta import configure_spark_with_delta_pip

In [2]:
# Base SparkSession builder for this notebook. The two `config` calls below
# register Delta Lake's SQL extension class and Delta's catalog implementation
# for `spark_catalog`. `config` returns the same builder, so rebinding is safe.
builder = pyspark.sql.SparkSession.builder.appName("MyApp")
builder = builder.config(
    "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

In [3]:
# configure_spark_with_delta_pip adds the Delta Lake package (io.delta#delta-core)
# to the builder so Spark resolves it at startup; see the Ivy log in the output.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/matthew.powers/opt/miniconda3/envs/pyspark-330-delta-210/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/matthew.powers/.ivy2/cache
The jars for the packages stored in: /Users/matthew.powers/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-07890284-7c57-4d1b-9052-784fa5c2d13d;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 321ms :: artifacts dl 13ms
	:: modules in use:
	io.delta#delta-core_2.12;2.1.0 from central in [default]
	io.delta#delta-storage;2.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number|

23/01/13 10:02:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Imports

In [4]:
from pyspark.sql import functions as F

In [5]:
import quinn

In [6]:
# One-column (`id`) DataFrame holding 0 through 4; range(n) starts at 0.
df = spark.range(5)

In [7]:
# Display the small `id` DataFrame created above.
df.show()

[Stage 0:>                                                        (0 + 10) / 10]

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



                                                                                

In [8]:
# Neither "age" nor "cool" is a column of df, so this validation passes silently.
quinn.validate_absence_of_columns(df, ["age", "cool"])

In [9]:
from quinn.dataframe_validator import DataFrameProhibitedColumnError

# "id" is df's only column, so this validation is expected to fail. Catching the
# error keeps Restart & Run All working (an uncaught exception stops the run at
# this cell) while still showing the message the validator produces.
try:
    quinn.validate_absence_of_columns(df, ["id"])
except DataFrameProhibitedColumnError as err:
    print(f"{type(err).__name__}: {err}")

DataFrameProhibitedColumnError: The ['id'] columns are not allowed to be included in the DataFrame with the following columns ['id']

In [10]:
# The validator can also be imported directly rather than called via `quinn.`.
from quinn import validate_absence_of_columns

validate_absence_of_columns(df, ["age", "cool"])

## Column functions

In [12]:
# People with ages spanning all three life stages. Passing the column names as
# the schema names the columns and lets Spark infer their types.
df = spark.createDataFrame(
    [("karen", 56), ("jodie", 16), ("jason", 3)],
    ["first_name", "age"],
)

In [13]:
# Inspect the people DataFrame.
df.show()

                                                                                

+----------+---+
|first_name|age|
+----------+---+
|     karen| 56|
|     jodie| 16|
|     jason|  3|
+----------+---+



In [14]:
from pyspark.sql import functions as F

In [21]:
def life_stage(col):
    """Classify an age Column as "child", "teenager", or "adult".

    Args:
        col: a PySpark Column holding ages.

    Returns:
        A Column expression: "child" for ages < 13, "teenager" for 13-19
        (``between`` is inclusive), "adult" for ages > 19, and null when
        ``col`` is null (no branch matches and there is no ``otherwise``).
    """
    return (
        F.when(col < 13, "child")
        # Teenage years run thirteen through nineteen. An upper bound of 18
        # labelled 19-year-olds "adult" and disagreed with the later
        # redefinition of life_stage in this notebook.
        .when(col.between(13, 19), "teenager")
        .when(col > 19, "adult")
    )

In [24]:
# Use the column function directly inside withColumn.
df.withColumn("life_stage", life_stage(F.col("age"))).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|     karen| 56|     adult|
|     jodie| 16|  teenager|
|     jason|  3|     child|
+----------+---+----------+



## Schema-Dependent Custom DataFrame Transformation

In [27]:
def with_life_stage(df):
    """Append a ``life_stage`` column computed from the ``age`` column.

    Schema-dependent: ``df`` must contain a column named exactly ``age``.
    """
    age = F.col("age")
    return df.withColumn("life_stage", life_stage(age))

In [28]:
# DataFrame.transform calls with_life_stage(df) and returns the result, so
# custom transformations can be chained like built-in methods.
df.transform(with_life_stage).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|     karen| 56|     adult|
|     jodie| 16|  teenager|
|     jason|  3|     child|
+----------+---+----------+



## Schema-Independent Custom DataFrame Transformation

In [35]:
def with_life_stage2(df, col_name):
    """Append a ``life_stage`` column computed from the column ``col_name``.

    Schema-independent: the caller chooses which column holds the ages, so the
    transformation is not tied to a column named ``age``.
    """
    source = F.col(col_name)
    return df.withColumn("life_stage", life_stage(source))

In [38]:
# Extra positional arguments to transform are passed to with_life_stage2 after df.
df.transform(with_life_stage2, "age").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|     karen| 56|     adult|
|     jodie| 16|  teenager|
|     jason|  3|     child|
+----------+---+----------+



In [39]:
# Keyword arguments are forwarded too, which makes the call self-describing.
df.transform(with_life_stage2, col_name="age").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|     karen| 56|     adult|
|     jodie| 16|  teenager|
|     jason|  3|     child|
+----------+---+----------+



## Null best practices

In [50]:
def with_full_name(df):
    """Add ``full_name``: ``first_name`` and ``last_name`` joined by a space.

    ``concat_ws`` skips null inputs. A missing last name therefore yields just
    the first name, and an all-null row yields an empty string. The resulting
    column is reported as non-nullable.
    """
    name_parts = [F.col("first_name"), F.col("last_name")]
    return df.withColumn("full_name", F.concat_ws(" ", *name_parts))

In [47]:
# Names with progressively more nulls, to show how concat_ws treats them.
df = spark.createDataFrame(
    [("Marilyn", "Monroe"), ("Abraham", None), (None, None)],
    ["first_name", "last_name"],
)

In [48]:
# Inspect the names DataFrame, nulls included.
df.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|   Marilyn|   Monroe|
|   Abraham|     null|
|      null|     null|
+----------+---------+



In [51]:
# Note the all-null row produces an empty string, not null.
df.transform(with_full_name).show()

+----------+---------+--------------+
|first_name|last_name|     full_name|
+----------+---------+--------------+
|   Marilyn|   Monroe|Marilyn Monroe|
|   Abraham|     null|       Abraham|
|      null|     null|              |
+----------+---------+--------------+



In [52]:
# full_name is reported as non-nullable, unlike the inferred input columns.
df.transform(with_full_name).printSchema()

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- full_name: string (nullable = false)



## Testing a column function

In [None]:
def life_stage(col):
    """Map an age Column to "child" (< 13), "teenager" (13-19), or "adult" (> 19).

    ``between`` includes both bounds. A null age matches no branch and, with no
    ``otherwise``, produces null.
    """
    is_child = col < 13
    is_teenager = col.between(13, 19)
    is_adult = col > 19
    return (
        F.when(is_child, "child")
        .when(is_teenager, "teenager")
        .when(is_adult, "adult")
    )

In [54]:
# Test fixture: each row carries the life stage we expect life_stage to produce,
# including an all-null row to check null propagation.
test_rows = [
    ("karen", 56, "adult"),
    ("jodie", 16, "teenager"),
    ("jason", 3, "child"),
    (None, None, None),
]
df = spark.createDataFrame(test_rows, ["first_name", "age", "expected"])
del test_rows

In [55]:
# Put the function's output next to the hand-written `expected` column.
res = df.withColumn("actual", life_stage(F.col("age")))

In [57]:
import chispa

In [59]:
# chispa raises if `expected` and `actual` differ in any row; no output means the
# column function matches the expectations, including the null row.
chispa.assert_column_equality(res, "expected", "actual")

## Testing a custom DataFrame transformation

In [61]:
def with_full_name(df):
    """Add ``full_name``: ``first_name`` and ``last_name`` joined by a space.

    Same logic as the earlier definition in this notebook, repeated so the
    testing section reads on its own. ``concat_ws`` skips null inputs, so an
    all-null row yields an empty string.
    """
    name_parts = [F.col("first_name"), F.col("last_name")]
    return df.withColumn("full_name", F.concat_ws(" ", *name_parts))

In [62]:
# Input for the transformation test: same name rows as the null demo above.
input_df = spark.createDataFrame(
    [("Marilyn", "Monroe"), ("Abraham", None), (None, None)],
    ["first_name", "last_name"],
)

In [65]:
# Expected result of with_full_name(input_df); the all-null row maps to "".
expected_df = spark.createDataFrame(
    [
        ("Marilyn", "Monroe", "Marilyn Monroe"),
        ("Abraham", None, "Abraham"),
        (None, None, ""),
    ],
    ["first_name", "last_name", "full_name"],
)

In [66]:
# ignore_nullable=True: expected_df's inferred full_name column is nullable,
# whereas the concat_ws output column is non-nullable (see printSchema above).
chispa.assert_df_equality(
    expected_df, input_df.transform(with_full_name), ignore_nullable=True
)

## PySpark UDFs

In [67]:
# Country data with no nulls; the naive UDF below works on it.
countries1 = spark.createDataFrame([("Brasil", 1), ("Mexico", 2)], ["country", "id"])

In [68]:
# Inspect the null-free countries DataFrame.
countries1.show()

+-------+---+
|country| id|
+-------+---+
| Brasil|  1|
| Mexico|  2|
+-------+---+



In [70]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


@udf(returnType=StringType())
def bad_funify(s):
    """Append " is fun!" to ``s``. Deliberately naive example.

    There is no null check. A null input arrives as ``None``, and
    ``None + " is fun!"`` raises TypeError, which fails the Spark task
    (demonstrated below with ``countries2``).
    """
    return s + " is fun!"

In [71]:
# Works here only because countries1 contains no null countries.
countries1.withColumn("fun_country", bad_funify("country")).show()

                                                                                

+-------+---+--------------+
|country| id|   fun_country|
+-------+---+--------------+
| Brasil|  1|Brasil is fun!|
| Mexico|  2|Mexico is fun!|
+-------+---+--------------+



In [72]:
# The null country in the second row is what breaks bad_funify below.
countries2 = spark.createDataFrame([("Thailand", 3), (None, 4)], ["country", "id"])

In [73]:
# Inspect countries2, including its null country.
countries2.show()

+--------+---+
| country| id|
+--------+---+
|Thailand|  3|
|    null|  4|
+--------+---+



In [74]:
from pyspark.sql.utils import PythonException

# bad_funify fails on the null country, and Spark surfaces the worker's
# TypeError as a PythonException. Catching it keeps Restart & Run All working
# (an uncaught exception stops the run at this cell) while still showing why
# the UDF failed.
try:
    countries2.withColumn("fun_country", bad_funify("country")).show()
except PythonException as err:
    print(f"{type(err).__name__}: {err}")

23/01/18 04:47:26 ERROR Executor: Exception in task 4.0 in stage 44.0 (TID 189)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/19/_52w4zps3xjc6plz_f63j8sh0000gp/T/ipykernel_96325/680247305.py", line 7, in bad_funify
TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNex

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/19/_52w4zps3xjc6plz_f63j8sh0000gp/T/ipykernel_96325/680247305.py", line 7, in bad_funify
TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'


In [75]:
@udf(returnType=StringType())
def good_funify(s):
    """Append " is fun!" to ``s``, passing nulls through unchanged.

    A null input reaches the UDF as ``None``. Returning ``None`` yields a null
    output value instead of raising like ``bad_funify``.
    """
    # Identity check against the None singleton (PEP 8). `==` goes through
    # __eq__, which is the wrong tool for a null test.
    return None if s is None else s + " is fun!"

In [76]:
# The null country now maps to null instead of failing the job.
countries2.withColumn("fun_country", good_funify("country")).show()

+--------+---+----------------+
| country| id|     fun_country|
+--------+---+----------------+
|Thailand|  3|Thailand is fun!|
|    null|  4|            null|
+--------+---+----------------+



## Best to avoid a UDF completely

In [81]:
def best_funify(col):
    """Append " is fun!" to a string Column using only built-in Spark functions.

    ``F.concat`` yields null whenever an input is null, so null countries stay
    null without the explicit check a Python UDF needs.
    """
    suffix = F.lit(" is fun!")
    return F.concat(col, suffix)

In [82]:
# Same result as good_funify, computed by native Spark functions instead of a Python UDF.
countries2.withColumn("fun_country", best_funify(countries2.country)).show()

+--------+---+----------------+
| country| id|     fun_country|
+--------+---+----------------+
|Thailand|  3|Thailand is fun!|
|    null|  4|            null|
+--------+---+----------------+

