In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# Sample rows laid out as (employee_name, department, salary).
# Note that ("James", "Sales", 3000) appears twice, and several salaries
# repeat, which is what makes the ranking functions below interesting.
simpleData = (
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
)

# Only column names are supplied, so the column types come from the data.
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=simpleData, schema=columns)

df.printSchema()
df.show()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [3]:
from pyspark.sql.window import Window
import pyspark.sql.functions as f

In [4]:
# Window per department, rows ordered by ascending salary.
windowSpec = Window.partitionBy('department').orderBy("salary")

# NOTE(review): the new column is labelled "row_number" but is filled by
# dense_rank(), so tied salaries share a rank (see the repeated 1s and 2s
# for Sales). Consider renaming the column to "dense_rank".
(
    df
    .withColumn("row_number", f.dense_rank().over(windowSpec))
    .show(truncate=False)
)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |1         |
|Robert       |Sales     |4100  |2         |
|Saif         |Sales     |4100  |2         |
|Michael      |Sales     |4600  |3         |
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
+-------------+----------+------+----------+



In [5]:
# Add the percent_rank of each row over whatever windowSpec is currently
# bound in the kernel (this cell does not define its own window).
(
    df
    .withColumn("percent_rank", f.percent_rank().over(windowSpec))
    .show()
)

+-------------+----------+------+------------+
|employee_name|department|salary|percent_rank|
+-------------+----------+------+------------+
|        James|     Sales|  3000|         0.0|
|        James|     Sales|  3000|         0.0|
|       Robert|     Sales|  4100|         0.5|
|         Saif|     Sales|  4100|         0.5|
|      Michael|     Sales|  4600|         1.0|
|        Maria|   Finance|  3000|         0.0|
|        Scott|   Finance|  3300|         0.5|
|          Jen|   Finance|  3900|         1.0|
|        Kumar| Marketing|  2000|         0.0|
|         Jeff| Marketing|  3000|         1.0|
+-------------+----------+------+------------+



The ntile(n) window function splits the ordered rows of a window partition into n buckets of (nearly) equal size and returns each row's bucket number, from 1 to n.

In [6]:
# No partitionBy here: the window is the whole DataFrame ordered by salary.
# This rebinds windowSpec, so any cell run afterwards sees the unpartitioned
# spec until windowSpec is assigned again.
windowSpec = Window.orderBy("salary")

# ntile(5) places every row into one of five buckets, numbered 1 to 5.
(
    df
    .withColumn("ntile", f.ntile(5).over(windowSpec))
    .show()
)

+-------------+----------+------+-----+
|employee_name|department|salary|ntile|
+-------------+----------+------+-----+
|        Kumar| Marketing|  2000|    1|
|        James|     Sales|  3000|    1|
|        Maria|   Finance|  3000|    2|
|        James|     Sales|  3000|    2|
|         Jeff| Marketing|  3000|    3|
|        Scott|   Finance|  3300|    3|
|          Jen|   Finance|  3900|    4|
|       Robert|     Sales|  4100|    4|
|         Saif|     Sales|  4100|    5|
|      Michael|     Sales|  4600|    5|
+-------------+----------+------+-----+



In [9]:
# Number of distinct departments; left as the cell's last expression so the
# notebook displays the resulting integer.
(
    df
    .select('department')
    .distinct()
    .count()
)

3

In [14]:
# Show df with employee_name replaced by 'Vivek' on Marketing rows; every
# other row keeps its existing name. df itself is not reassigned.
(
    df
    .withColumn(
        'employee_name',
        f.when(f.col('department') == 'Marketing', 'Vivek')
         .otherwise(f.col('employee_name')),
    )
    .show()
)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|        Vivek| Marketing|  3000|
|        Vivek| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [15]:
# Keep only rows whose salary is strictly greater than 3000 and display them.
(
    df
    .filter(f.col('salary') > 3000)
    .show()
)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Saif|     Sales|  4100|
+-------------+----------+------+



### Window Analytic functions

In [7]:
# Re-bind windowSpec to the per-department window (ascending salary); an
# earlier cell had replaced it with an unpartitioned spec.
windowSpec = Window.partitionBy('department').orderBy("salary")

# cume_dist: cumulative distribution of each salary within its department.
(
    df
    .withColumn("cume_dist", f.cume_dist().over(windowSpec))
    .show()
)

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
+-------------+----------+------+------------------+



# Exploring MapType, chain, and dictionaries

In [18]:
from pyspark.sql.types import StringType, MapType

In [None]:
# MapType requires both a key type and a value type; calling MapType() with
# no arguments raises TypeError. String keys and string values are assumed
# here because StringType is the only other type imported — TODO confirm the
# intended value type.
MapType(StringType(), StringType())