<a href="https://colab.research.google.com/github/shivashankarkammari/PySpark/blob/main/4_DataFrame_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Reuse an already-running session if there is one; otherwise start a new
# session whose application name is "DF2".
spark = (
    SparkSession.builder
    .appName("DF2")
    .getOrCreate()
)

# transform()

DataFrame.transform(func: Callable[[…], DataFrame], *args: Any, **kwargs: Any) → pyspark.sql.dataframe.DataFrame

The following are the parameters:

func - Custom function to call.

*args - Arguments to pass to func.

`**kwargs` - Keyword arguments to pass to func.

In [None]:
# Sample course records as (CourseName, fee, discount) tuples. The enclosing
# parentheses already continue the literal across lines, so no backslashes
# are needed.
simpleData = (
    ("Java", 4000, 5),
    ("Python", 4600, 10),
    ("Scala", 4100, 15),
    ("Scala", 4500, 15),
    ("PHP", 3000, 20),
)
# Column names, in the same order as the fields of each tuple.
columns = ["CourseName", "fee", "discount"]

# Build the course DataFrame from the records and column names (only names
# are supplied, so Spark infers the column types), then print its schema and
# its rows without truncating cell values.
df = spark.createDataFrame(schema=columns, data=simpleData)
df.printSchema()
df.show(truncate=False)

root
 |-- CourseName: string (nullable = true)
 |-- fee: long (nullable = true)
 |-- discount: long (nullable = true)

+----------+----+--------+
|CourseName|fee |discount|
+----------+----+--------+
|Java      |4000|5       |
|Python    |4600|10      |
|Scala     |4100|15      |
|Scala     |4500|15      |
|PHP       |3000|20      |
+----------+----+--------+



In [None]:
#custom transformation
from pyspark.sql.functions import *

def to_upper(df):
    """Return ``df`` with its ``CourseName`` column replaced by an upper-cased copy."""
    upper_name = upper(df.CourseName)
    return df.withColumn("CourseName", upper_name)



# Run the custom function through DataFrame.transform and display the result.
df = df.transform(to_upper)
df.show()

+----------+----+--------+
|CourseName| fee|discount|
+----------+----+--------+
|      JAVA|4000|       5|
|    PYTHON|4600|      10|
|     SCALA|4100|      15|
|     SCALA|4500|      15|
|       PHP|3000|      20|
+----------+----+--------+



In [None]:
#reducing the fee with custom transformation

def reduce_fee(df, reduce_amount):
    """Return ``df`` with a ``new_fee`` column equal to ``fee - reduce_amount``."""
    reduced = df.fee - reduce_amount
    return df.withColumn("new_fee", reduced)

# Extra positional arguments given to transform() are forwarded to the
# function, so 1000 is passed as reduce_amount.
df = df.transform(reduce_fee,1000)
df.show()


#applying the discount with custom transformations

def discount(df):
    """Return ``df`` with a ``Final_fee`` column: ``new_fee`` less ``discount`` percent of it."""
    rebate = df.new_fee * (df.discount / 100)
    return df.withColumn("Final_fee", df.new_fee - rebate)

# Add the Final_fee column via the custom transformation and display the result.
df = df.transform(discount)
df.show()

+----------+----+--------+-------+
|CourseName| fee|discount|new_fee|
+----------+----+--------+-------+
|      JAVA|4000|       5|   3000|
|    PYTHON|4600|      10|   3600|
|     SCALA|4100|      15|   3100|
|     SCALA|4500|      15|   3500|
|       PHP|3000|      20|   2000|
+----------+----+--------+-------+

+----------+----+--------+-------+---------+
|CourseName| fee|discount|new_fee|Final_fee|
+----------+----+--------+-------+---------+
|      JAVA|4000|       5|   3000|   2850.0|
|    PYTHON|4600|      10|   3600|   3240.0|
|     SCALA|4100|      15|   3100|   2635.0|
|     SCALA|4500|      15|   3500|   2975.0|
|       PHP|3000|      20|   2000|   1600.0|
+----------+----+--------+-------+---------+



The PySpark sql.functions.transform() is used to apply the transformation on a column of type Array. This function applies the specified transformation on every element of the array and returns an object of ArrayType.

In [None]:
# Each record: a comma-separated name string followed by two lists of
# language names.
data = [
    ("James,,Smith", ["Java", "Scala", "C++"], ["Spark", "Java"]),
    ("Michael,Rose,", ["Spark", "Java", "C++"], ["Spark", "Java"]),
    ("Robert,,Williams", ["CSharp", "VB"], ["Spark", "Python"]),
]
# Name the three columns explicitly; the two list fields become array columns
# (see the printed schema).
df = spark.createDataFrame(
    schema=["Name", "Languages1", "Languages2"],
    data=data,
)
df.printSchema()
df.show()

root
 |-- Name: string (nullable = true)
 |-- Languages1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Languages2: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------------+------------------+---------------+
|            Name|        Languages1|     Languages2|
+----------------+------------------+---------------+
|    James,,Smith|[Java, Scala, C++]|  [Spark, Java]|
|   Michael,Rose,|[Spark, Java, C++]|  [Spark, Java]|
|Robert,,Williams|      [CSharp, VB]|[Spark, Python]|
+----------------+------------------+---------------+



In [None]:
# Upper-case every element of the Languages2 array with the transform()
# column function and show the result under the alias "Lang2".
df.select(
    transform("languages2", lambda lang: upper(lang)).alias("Lang2")
).show()

+---------------+
|          Lang2|
+---------------+
|  [SPARK, JAVA]|
|  [SPARK, JAVA]|
|[SPARK, PYTHON]|
+---------------+



apply()

In [None]:
# Column names for the (Seqno, Name) string pairs below.
columns = ["Seqno", "Name"]
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
]

# Build the DataFrame and display it without truncating cell values.
df = spark.createDataFrame(schema=columns, data=data)
df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [None]:
import pyspark.pandas as ps
import numpy as np

# Input columns for the pandas-on-Spark example; the last Fee is a missing
# value (NaN). Use np.nan: the np.NaN alias was removed in NumPy 2.0 and
# raises AttributeError there.
technologies = {
    'Fee': [20000, 25000, 30000, 22000, np.nan],
    'Discount': [1000, 2500, 1500, 1200, 3000],
}
# Create a pandas-on-Spark DataFrame from the dict and print it
psdf = ps.DataFrame(technologies)
print(psdf)

def add(data):
    """Return the sum of the first two values of a row.

    When used with ``apply(add, axis=1)`` the row is expected to arrive as a
    Series indexed by column label (here 'Fee' and 'Discount'). Integer keys
    on such a Series (``data[0]``) are deprecated positional lookups in
    pandas 2.1+ and become label lookups afterwards, so the values are read
    by position through ``.iloc`` instead.

    Args:
        data: A row-like object; either a Series (anything exposing ``.iloc``)
            or a plain indexable sequence such as a list or tuple.

    Returns:
        ``data``'s first value plus its second value (NaN if either is NaN).
    """
    # Fall back to the object itself so plain sequences keep working.
    values = getattr(data, "iloc", data)
    return values[0] + values[1]

# Call add() once per row (axis=1) and print the resulting values
addDF = psdf.apply(add,axis=1)
print(addDF)



       Fee  Discount
0  20000.0      1000
1  25000.0      2500
2  30000.0      1500
3  22000.0      1200
4      NaN      3000




0    21000.0
1    27500.0
2    31500.0
3    23200.0
4        NaN
dtype: float64


# fillna()

DF.fillna(value,subset=[col_name])

df.na.fill(value,subset=[col_name])

# pivot(), unpivot()


In [None]:
# Sales records as (Product, Amount, Country) tuples; the Orange/USA record
# appears twice on purpose in this sample.
data = [
    ("Banana", 1000, "USA"),
    ("Carrots", 1500, "USA"),
    ("Beans", 1600, "USA"),
    ("Orange", 2000, "USA"),
    ("Orange", 2000, "USA"),
    ("Banana", 400, "China"),
    ("Carrots", 1200, "China"),
    ("Beans", 1500, "China"),
    ("Orange", 4000, "China"),
    ("Banana", 2000, "Canada"),
    ("Carrots", 2000, "Canada"),
    ("Beans", 2000, "Mexico"),
]

# Column names, in the same order as the fields of each tuple.
columns = ["Product", "Amount", "Country"]
# Build the sales DataFrame and display its schema and rows.
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

# Pivot: one row per Product and one column per distinct Country, each cell
# holding the summed Amount (null where a product has no rows for a country).
pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.printSchema()
pivotDF.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



root
 |-- Product: string (nullable = true)
 |-- Canada: long (nullable = true)
 |-- China: long (nullable = true)
 |-- Mexico: long (nullable = true)
 |-- USA: long (nullable = true)

+-------+------+-----+------+----+
|Product|Canada|China|Mexico|USA |
+-------+------+-----+------+----+
|Orange |NULL  |4000 |NULL  |4000|
|Beans  |NULL  |1500 |2000  |1600|
|Banana |2000  |400  |NULL  |1000|
|Carrots|2000  |1200 |NULL  |1500|
+-------+------+-----+------+----+



In [None]:
from pyspark.sql.functions import *

# MapType()

functions for maptype are

1.explode()

2.map_keys()--> gives keys

3.map_values()-->gives values

In [None]:
# Import every type used below explicitly instead of relying on names that
# happen to be in scope from other imports.
from pyspark.sql.types import IntegerType, MapType, StringType, StructType

# Two rows: an integer id and a dict of name parts.
data = [
    (1, {"fname": "shiva", "lname": "shankar"}),
    (2, {"fname": "Hari", "lname": "prasad"}),
]

# Explicit schema: a nullable integer `id` and a string -> string map `name`
# whose values may be null (third MapType argument).
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("name", MapType(StringType(), StringType(), True))
)

df = spark.createDataFrame(data=data, schema=schema)

df.show(truncate=False)

+---+----------------------------------+
|id |name                              |
+---+----------------------------------+
|1  |{fname -> shiva, lname -> shankar}|
|2  |{fname -> Hari, lname -> prasad}  |
+---+----------------------------------+



In [None]:
# explode() on a map column yields one row per entry, split into `key` and
# `value` columns.
df.select(
    df.id,
    explode(df.name),
).show()

+---+-----+-------+
| id|  key|  value|
+---+-----+-------+
|  1|fname|  shiva|
|  1|lname|shankar|
|  2|fname|   Hari|
|  2|lname| prasad|
+---+-----+-------+



In [None]:
# map_keys() returns the keys of each row's map as an array column.
df.select(
    df.id,
    map_keys(df.name),
).show()

+---+--------------+
| id|map_keys(name)|
+---+--------------+
|  1|[fname, lname]|
|  2|[fname, lname]|
+---+--------------+



# PySpark Aggregate Functions

PySpark SQL aggregate functions are grouped as “agg_funcs” in PySpark. Below is a list of the functions defined under this group, each with a short description.

approx_count_distinct()-->returns the approximate number of distinct values in a column (an estimate of what countDistinct() returns).

avg()-->gives avg

collect_list(col) -->collect_list() function returns all values from an input column with duplicates.

collect_set()-->function returns all values from an input column with duplicate values eliminated.

countDistinct()-->function returns the number of distinct elements in a columns

count()-->function returns number of elements in a column.

grouping()-->Indicates whether a given input column is aggregated or not: returns 1 for aggregated or 0 for not aggregated in the result. It is only valid together with cube(), rollup() or grouping sets; if you try grouping() directly on the salary column you will get an error.

first()-->function returns the first element in a column when ignoreNulls is set to true, it returns the first non-null element.

last()-->function returns the last element in a column. when ignoreNulls is set to true, it returns the last non-null element.

kurtosis()-->function returns the kurtosis of the values in a group.

max()-->function returns the maximum value in a column.

min()-->function returns the minimum value in a column.

mean()-->function returns the average of the values in a column. Alias for Avg

skewness()-->function returns the skewness of the values in a group.

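stddev()-->alias for stddev_samp()

stddev_samp()-->function returns the sample standard deviation of the values in a column.

stddev_pop()-->function returns the population standard deviation of the values in a column.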

sum()-->function Returns the sum of all values in a column.

sumDistinct()-->function returns the sum of all distinct values in a column.

variance()-->alias for var_samp()

var_samp() function returns the unbiased variance of the values in a column.

var_pop() function returns the population variance of the values in a column.

In [None]:
# Employee records as (employee_name, department, salary) tuples; the
# ("James", "Sales", 3000) record is present twice.
simpleData = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
]
# Column names, in the same order as the fields of each tuple.
schema = ["employee_name", "department", "salary"]
# Build the employee DataFrame, then print its schema and its rows without
# truncating cell values.
df = spark.createDataFrame(schema=schema, data=simpleData)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:
# approx_count_distinct: approximate number of distinct salary values

df.select(
    approx_count_distinct("salary").alias("unique_salary")
).show()

+-------------+
|unique_salary|
+-------------+
|            6|
+-------------+



In [None]:
# avg: mean of the salary column over all rows

df.select(
    avg("salary").alias("avg_salary")
).show()

+----------+
|avg_salary|
+----------+
|    3400.0|
+----------+



# Window Functions:

**row_number(): Column**	Returns a sequential number starting from 1 within a window partition

**rank(): Column**	Returns the rank of rows within a window partition, with gaps.

**percent_rank(): Column**	Returns the percentile rank of rows within a window partition.

**dense_rank(): Column**	Returns the rank of rows within a window partition without any gaps. Where as Rank() returns rank with gaps.

**ntile(n: Int): Column**	Returns the ntile id in a window partition.

**cume_dist(): Column**	Returns the cumulative distribution of values within a window partition

**lag(e: Column, offset: Int): Column**
**lag(columnName: String, offset: Int): Column**
**lag(columnName: String, offset: Int, defaultValue: Any): Column**
returns the value that is `offset` rows before the current row, and `null` if there are fewer than `offset` rows before the current row.

**lead(e: Column, offset: Int): Column**
**lead(columnName: String, offset: Int): Column**
**lead(columnName: String, offset: Int, defaultValue: Any): Column**
returns the value that is `offset` rows after the current row, and `null` if there are fewer than `offset` rows after the current row.



In [None]:
#row_number
#row_number() window function is used to give the sequential row number starting from 1 to the result of each window partition.



# Employee records as (employee_name, department, salary) tuples. The
# enclosing parentheses already continue the literal across lines, so no
# backslashes are needed.
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

# Column names, in the same order as the fields of each tuple.
columns = ["employee_name", "department", "salary"]
# Build the employee DataFrame, then print its schema and its rows without
# truncating cell values.
df = spark.createDataFrame(schema=columns, data=simpleData)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [None]:
from pyspark.sql.window import Window

In [None]:
# Window: rows are grouped per department and ordered by salary within each
# group.
windowspec = (
    Window.partitionBy("department")
    .orderBy("salary")
)

# Number the rows 1, 2, 3, ... inside each department window.
df.withColumn(
    "row_number",
    row_number().over(windowspec),
).show()

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
+-------------+----------+------+----------+



In [None]:

# import pyspark
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# simpleData = (("James", "Sales", 3000), \
#     ("Michael", "Sales", 4600),  \
#     ("Robert", "Sales", 4100),   \
#     ("Maria", "Finance", 3000),  \
#     ("James", "Sales", 3000),    \
#     ("Scott", "Finance", 3300),  \
#     ("Jen", "Finance", 3900),    \
#     ("Jeff", "Marketing", 3000), \
#     ("Kumar", "Marketing", 2000),\
#     ("Saif", "Sales", 4100) \
#   )

# columns= ["employee_name", "department", "salary"]

# df = spark.createDataFrame(data = simpleData, schema = columns)

# df.printSchema()
# df.show(truncate=False)

# from pyspark.sql.window import Window
# from pyspark.sql.functions import row_number
# windowSpec  = Window.partitionBy("department").orderBy("salary")

# df.withColumn("row_number",row_number().over(windowSpec)) \
#     .show(truncate=False)

# from pyspark.sql.functions import rank
# df.withColumn("rank",rank().over(windowSpec)) \
#     .show()

# from pyspark.sql.functions import dense_rank
# df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
#     .show()

# from pyspark.sql.functions import percent_rank
# df.withColumn("percent_rank",percent_rank().over(windowSpec)) \
#     .show()

# from pyspark.sql.functions import ntile
# df.withColumn("ntile",ntile(2).over(windowSpec)) \
#     .show()

# from pyspark.sql.functions import cume_dist
# df.withColumn("cume_dist",cume_dist().over(windowSpec)) \
#    .show()

# from pyspark.sql.functions import lag
# df.withColumn("lag",lag("salary",2).over(windowSpec)) \
#       .show()

# from pyspark.sql.functions import lead
# df.withColumn("lead",lead("salary",2).over(windowSpec)) \
#     .show()

# windowSpecAgg  = Window.partitionBy("department")
# from pyspark.sql.functions import col,avg,sum,min,max,row_number
# df.withColumn("row",row_number().over(windowSpec)) \
#   .withColumn("avg", avg(col("salary")).over(windowSpecAgg)) \
#   .withColumn("sum", sum(col("salary")).over(windowSpecAgg)) \
#   .withColumn("min", min(col("salary")).over(windowSpecAgg)) \
#   .withColumn("max", max(col("salary")).over(windowSpecAgg)) \
#   .where(col("row")==1).select("department","avg","sum","min","max") \
#   .show()

## Date functions:

current_date()-->Returns the current date as a date column.

to_date()-->Converts the column into `DateType` by casting rules to `DateType`.

to_date(column, fmt)	Converts the column into a `DateType` with a specified format.

add_months(Column, numMonths)-->Returns the date that is `numMonths` after `startDate`.

date_add(column, days)-->Returns the date that is `days` days after `start`.
date_sub(column, days)-->Returns the date that is `days` days before `start`.

year(column)	Extracts the year as an integer from a given date/timestamp/string

quarter(column)	Extracts the quarter as an integer from a given date/timestamp/string.

month(column)	Extracts the month as an integer from a given date/timestamp/string

dayofweek(column)	Extracts the day of the week as an integer from a given date/timestamp/string. Ranges from 1 for a Sunday through to 7 for a Saturday.


# Timestamp functions



current_timestamp ()-->Returns the current timestamp as a timestamp column

hour(column)-->Extracts the hours as an integer from a given date/timestamp/string.

minute(column)-->Extracts the minutes as an integer from a given date/timestamp/string.

second(column)-->Extracts the seconds as an integer from a given date/timestamp/string.

to_timestamp(column)-->Converts to a timestamp by casting rules to `TimestampType`.

to_timestamp(column, fmt)-->Converts time string with the given pattern to timestamp.