In [0]:
import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Create SparkSession
# spark.jars points at the PostgreSQL JDBC driver jar used by the read below.
spark = (
    SparkSession.builder
    .appName("Postgres connection")
    .config("spark.jars", "/usr/local/postgresql-42.2.5.jar")
    .getOrCreate()
)

# Set connection parameters
# Connection details are taken from the environment when available so that
# real credentials never have to be committed with the notebook; the
# fallbacks are the placeholder values this notebook originally shipped with.
url = os.environ.get(
    "POSTGRES_JDBC_URL", "jdbc:postgresql://13.xxx.xx.xx:5432/postgres"
)
table = "actor"
properties = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "xxxxxxxxx"),
    "driver": "org.postgresql.Driver",
}
# Read table into DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)  # read table actor

In [0]:
# Preview the first five rows of the actor table.
df.show(n=5)

+--------+----------+------------+-------------------+
|actor_id|first_name|   last_name|        last_update|
+--------+----------+------------+-------------------+
|       1|  PENELOPE|     GUINESS|2006-02-15 09:34:33|
|       2|      NICK|    WAHLBERG|2006-02-15 09:34:33|
|       3|        ED|       CHASE|2006-02-15 09:34:33|
|       4|  JENNIFER|       DAVIS|2006-02-15 09:34:33|
|       5|    JOHNNY|LOLLOBRIGIDA|2006-02-15 09:34:33|
+--------+----------+------------+-------------------+
only showing top 5 rows



In [0]:
# PySpark SQL expr() Function Examples
from pyspark.sql.functions import expr

In [0]:
# Build a Full_Name column by joining first_name and last_name with the
# SQL-style || concatenation operator inside expr().
df.withColumn(
    "Full_Name",
    expr(" first_name ||' '|| last_name"),
).show(5)


+--------+----------+------------+-------------------+-------------------+
|actor_id|first_name|   last_name|        last_update|          Full_Name|
+--------+----------+------------+-------------------+-------------------+
|       1|  PENELOPE|     GUINESS|2006-02-15 09:34:33|   PENELOPE GUINESS|
|       2|      NICK|    WAHLBERG|2006-02-15 09:34:33|      NICK WAHLBERG|
|       3|        ED|       CHASE|2006-02-15 09:34:33|           ED CHASE|
|       4|  JENNIFER|       DAVIS|2006-02-15 09:34:33|     JENNIFER DAVIS|
|       5|    JOHNNY|LOLLOBRIGIDA|2006-02-15 09:34:33|JOHNNY LOLLOBRIGIDA|
+--------+----------+------------+-------------------+-------------------+
only showing top 5 rows



In [0]:
# Print the column names, types and nullability of the actor DataFrame.
df.printSchema()

root
 |-- actor_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- last_update: timestamp (nullable = true)



In [0]:
# Using SQL CASE WHEN with expr()
# Branches are checked top-down and the first match wins, so the highest
# threshold is listed first. actor_id is an integer column (see the
# printSchema output), so it is compared against integer literals rather
# than relying on implicit string-to-number casts. The former
# "actor_id > 15" branch could never match (anything above 15 is already
# caught by "> 10"), so it has been dropped; ELSE still yields 'unknown'.
df3 = df.withColumn(
    "actor_type",
    expr(
        "CASE WHEN actor_id > 10 THEN 'Type_C' "
        "WHEN actor_id > 5 THEN 'Type_B' "
        "WHEN actor_id >= 1 THEN 'Type_A' "
        "ELSE 'unknown' END"
    ),
)
df3.show(15)

+--------+----------+------------+-------------------+----------+
|actor_id|first_name|   last_name|        last_update|actor_type|
+--------+----------+------------+-------------------+----------+
|       1|  PENELOPE|     GUINESS|2006-02-15 09:34:33|    Type_A|
|       2|      NICK|    WAHLBERG|2006-02-15 09:34:33|    Type_A|
|       3|        ED|       CHASE|2006-02-15 09:34:33|    Type_A|
|       4|  JENNIFER|       DAVIS|2006-02-15 09:34:33|    Type_A|
|       5|    JOHNNY|LOLLOBRIGIDA|2006-02-15 09:34:33|    Type_A|
|       6|     BETTE|   NICHOLSON|2006-02-15 09:34:33|    Type_B|
|       7|     GRACE|      MOSTEL|2006-02-15 09:34:33|    Type_B|
|       8|   MATTHEW|   JOHANSSON|2006-02-15 09:34:33|    Type_B|
|       9|       JOE|       SWANK|2006-02-15 09:34:33|    Type_B|
|      10| CHRISTIAN|       GABLE|2006-02-15 09:34:33|    Type_B|
|      11|      ZERO|        CAGE|2006-02-15 09:34:33|    Type_C|
|      12|      KARL|       BERRY|2006-02-15 09:34:33|    Type_C|
|      13|

In [0]:
# Character length of first_name computed through a SQL expression.
# The query only needs first_name, so it selects from df by column name
# instead of mixing a df column reference into a df3 select; this also keeps
# the cell runnable after df3 is rebound to a different frame further down.
# length() is the long-standing function name; len() is an alias that only
# newer Spark releases provide.
# The alias spelling is kept as-is so the column matches the recorded output.
df.select(
    "first_name",
    expr("length(first_name)").alias("lenght_fname"),
).show(5)

+----------+------------+
|first_name|lenght_fname|
+----------+------------+
|  PENELOPE|           8|
|      NICK|           4|
|        ED|           2|
|  JENNIFER|           8|
|    JOHNNY|           6|
+----------+------------+
only showing top 5 rows



In [0]:
# Using an existing column value inside an expression:
# add actor_id months to last_update and name the result with .alias().
df.select(
    "last_update",
    "actor_id",
    expr("add_months(last_update, actor_id)").alias("inc_date"),
).show(10)

+-------------------+--------+----------+
|        last_update|actor_id|  inc_date|
+-------------------+--------+----------+
|2006-02-15 09:34:33|       1|2006-03-15|
|2006-02-15 09:34:33|       2|2006-04-15|
|2006-02-15 09:34:33|       3|2006-05-15|
|2006-02-15 09:34:33|       4|2006-06-15|
|2006-02-15 09:34:33|       5|2006-07-15|
|2006-02-15 09:34:33|       6|2006-08-15|
|2006-02-15 09:34:33|       7|2006-09-15|
|2006-02-15 09:34:33|       8|2006-10-15|
|2006-02-15 09:34:33|       9|2006-11-15|
|2006-02-15 09:34:33|      10|2006-12-15|
+-------------------+--------+----------+
only showing top 10 rows



In [0]:
# Giving a column alias along with expr():
# the alias is supplied inside the SQL text with 'as' rather than .alias().
df.select(
    "last_update",
    "actor_id",
    expr("add_months(last_update, actor_id) as incl_date"),
).show(5)

+-------------------+--------+----------+
|        last_update|actor_id| incl_date|
+-------------------+--------+----------+
|2006-02-15 09:34:33|       1|2006-03-15|
|2006-02-15 09:34:33|       2|2006-04-15|
|2006-02-15 09:34:33|       3|2006-05-15|
|2006-02-15 09:34:33|       4|2006-06-15|
|2006-02-15 09:34:33|       5|2006-07-15|
+-------------------+--------+----------+
only showing top 5 rows



In [0]:
# Cast function with expr()
# Converts the integer actor_id into a string column named str_actor_id,
# then prints the resulting schema and the first five rows.
df3 = df.select(
    "actor_id",
    expr("cast(actor_id as string) as str_actor_id"),
)
df3.printSchema()
df3.show(5)

root
 |-- actor_id: integer (nullable = true)
 |-- str_actor_id: string (nullable = true)

+--------+------------+
|actor_id|str_actor_id|
+--------+------------+
|       1|           1|
|       2|           2|
|       3|           3|
|       4|           4|
|       5|           5|
+--------+------------+
only showing top 5 rows



In [0]:
# Arithmetic operations
# show() only prints and returns None, so the DataFrame is bound to df4
# first and displayed in a separate statement; df4 stays usable afterwards.
df4 = df.select(df.actor_id, expr("actor_id + 10 as new_actor_id"))
df4.show(5)

+--------+------------+
|actor_id|new_actor_id|
+--------+------------+
|       1|          11|
|       2|          12|
|       3|          13|
|       4|          14|
|       5|          15|
+--------+------------+
only showing top 5 rows



In [0]:
# Using Filter with expr()
from pyspark.sql.functions import expr
# show() only prints and returns None, so the filtered DataFrame is bound to
# df5 first and displayed in a separate statement.
df5 = df.filter(expr("actor_id > 10"))
df5.show(5)

+--------+----------+---------+-------------------+
|actor_id|first_name|last_name|        last_update|
+--------+----------+---------+-------------------+
|      11|      ZERO|     CAGE|2006-02-15 09:34:33|
|      12|      KARL|    BERRY|2006-02-15 09:34:33|
|      13|       UMA|     WOOD|2006-02-15 09:34:33|
|      14|    VIVIEN|   BERGEN|2006-02-15 09:34:33|
|      15|      CUBA|  OLIVIER|2006-02-15 09:34:33|
+--------+----------+---------+-------------------+
only showing top 5 rows

