In [1]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
            .appName('SparkByExamples.com') \
            .getOrCreate()

# Prepare Data
simpleData = (("Java",4000,5), \
    ("Python", 4600,10),  \
    ("Scala", 4100,15),   \
    ("Scala", 4500,15),   \
    ("PHP", 3000,20),  \
  )
columns= ["CourseName", "fee", "discount"]

# Create DataFrame
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

24/12/19 08:30:54 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/12/19 08:30:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/19 08:30:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/19 08:30:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/19 08:30:56 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)



                                                                                

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+



In [2]:
from pyspark.sql.functions import upper
def to_upper_str_columns(df):
    return df.withColumn("CourseName",upper(df.CourseName))

# Custom transformation 2
def reduce_price(df,reduceBy):
    return df.withColumn("new_fee",df.fee - reduceBy)

# Custom transformation 3
def apply_discount(df):
    return df.withColumn("discounted_fee",  \
             df.new_fee - (df.new_fee * df.discount) / 100)

In [4]:
def select_columns(df):
    return df.select("CourseName","discounted_fee")

# Chain transformations
df2 = df.transform(to_upper_str_columns) \
        .transform(reduce_price,1000) \
        .transform(apply_discount) \
        .transform(select_columns).show()

+----------+--------------+
|CourseName|discounted_fee|
+----------+--------------+
|      JAVA|        2850.0|
|    PYTHON|        3240.0|
|     SCALA|        2635.0|
|     SCALA|        2975.0|
|       PHP|        1600.0|
+----------+--------------+



In [None]:
# pyspark.sql.functions.transform(col, f)

In [5]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data=data,schema=["Name","Languages1","Languages2"])
df.printSchema()
df.show()

# using transform() function
from pyspark.sql.functions import upper
from pyspark.sql.functions import transform
df.select(transform("Languages1", lambda x: upper(x)).alias("languages1")) \
  .show()

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Languages2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+------------------+---------------+
|            Name|        Languages1|     Languages2|
+----------------+------------------+---------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|
+----------------+------------------+---------------+

+------------------+
|        languages1|
+------------------+
|[JAVA, SCALA, C++]|
|[SPARK, JAVA, C++]|
|      [CSHARP, VB]|
+------------------+



In [6]:
data = [
 ("James,,Smith",["Java","Scala","C++"],["Spark","Java"]),
 ("Michael,Rose,",["Spark","Java","C++"],["Spark","Java"]),
 ("Robert,,Williams",["CSharp","VB"],["Spark","Python"])
]
df = spark.createDataFrame(data=data,schema=["Name","Languages1","Languages2"])
df.printSchema()
df.show()

# using transform() function
from pyspark.sql.functions import upper
from pyspark.sql.functions import transform
df.select(transform("Languages1", lambda x: upper(x)).alias("languages1")) \
  .show()

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Languages2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+------------------+---------------+
|            Name|        Languages1|     Languages2|
+----------------+------------------+---------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|
+----------------+------------------+---------------+

+------------------+
|        languages1|
+------------------+
|[JAVA, SCALA, C++]|
|[SPARK, JAVA, C++]|
|      [CSHARP, VB]|
+------------------+

