In [9]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql import functions as F

spark = SparkSession. \
    builder. \
    appName("Data Sources"). \
    master("local"). \
    config("spark.jars", "../jars/postgresql-42.2.19.jar"). \
    config("spark.sql.legacy.timeParserPolicy", "LEGACY"). \
    getOrCreate()

In [10]:
simpleData = [("James", "Sales", 3000), ("John", "ServiceDesk", 4600), ("Michael", "Sales", 4600), ("Robert", "Sales", 4100),
                 ("Maria", "Finance", 3000), ("James", "Sales", 3000), ("Scott", "Finance", 3300), ("Jen", "Finance", 3900),
                 ("Jeff", "Marketing", 3000), ("Kumar", "Marketing", 2000), ("Saif", "Sales", 4100)]

employeeDF = spark.createDataFrame(simpleData).toDF("employee_name", "department", "salary")

employeeDF.show()


+-------------+-----------+------+
|employee_name| department|salary|
+-------------+-----------+------+
|        James|      Sales|  3000|
|         John|ServiceDesk|  4600|
|      Michael|      Sales|  4600|
|       Robert|      Sales|  4100|
|        Maria|    Finance|  3000|
|        James|      Sales|  3000|
|        Scott|    Finance|  3300|
|          Jen|    Finance|  3900|
|         Jeff|  Marketing|  3000|
|        Kumar|  Marketing|  2000|
|         Saif|      Sales|  4100|
+-------------+-----------+------+



# Window functions

In [14]:
# Window functions in Spark SQL

employeeDF.createOrReplaceTempView("employee")

# How to find second salary in employee
result_sql_df = spark.sql("""select distinct salary from (
                                select 
                                    employee_name, 
                                    department, 
                                    salary, 
                                    row_number() OVER (ORDER BY salary DESC) as row_num, 
                                    rank() OVER (ORDER BY salary DESC) as rank,
                                    dense_rank() OVER (ORDER BY salary DESC) as dense_rank
                              from employee) where dense_rank = 2""")

result_sql_df.explain()
result_sql_df.show()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[salary#137L], functions=[])
   +- HashAggregate(keys=[salary#137L], functions=[])
      +- Project [salary#137L]
         +- Filter (dense_rank#199 = 2)
            +- Window [dense_rank(salary#137L) windowspecdefinition(salary#137L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS dense_rank#199], [salary#137L DESC NULLS LAST]
               +- Sort [salary#137L DESC NULLS LAST], false, 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#134]
                     +- Project [_3#131L AS salary#137L]
                        +- Scan ExistingRDD[_1#129,_2#130,_3#131L]


+------+
|salary|
+------+
|  4100|
+------+



In [16]:
# Window functions in Spark DSL

# windowSpec = Window.partitionBy("department").orderBy(F.col("salary").desc())

windowSpec = Window.orderBy(F.col("salary").desc())
result_with_rank_df = employeeDF.\
    withColumn("rank", F.rank().over(windowSpec)).\
    withColumn("row_number", F.row_number().over(windowSpec)).\
    withColumn("dense_rank", F.dense_rank().over(windowSpec))
result_with_rank_df.explain()
result_with_rank_df.show()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [rank(salary#137L) windowspecdefinition(salary#137L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#292, row_number() windowspecdefinition(salary#137L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number#304, dense_rank(salary#137L) windowspecdefinition(salary#137L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS dense_rank#312], [salary#137L DESC NULLS LAST]
   +- Sort [salary#137L DESC NULLS LAST], false, 0
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#328]
         +- Project [_1#129 AS employee_name#135, _2#130 AS department#136, _3#131L AS salary#137L]
            +- Scan ExistingRDD[_1#129,_2#130,_3#131L]


+-------------+-----------+------+----+----------+----------+
|employee_name| department|salary|rank|row_number|dense_rank|
+-------------+-----------+------+----

In [18]:
# Single Partitions Dangerous DON'T WRITE SUCH CODE IN PROD
result_sql_df = spark.sql("""
                            select 
                                employee_name, 
                                department, 
                                salary, 
                                count(*) OVER () as cnt
                            from employee
                            """)

result_sql_df.show()
result_sql_df.explain()

+-------------+-----------+------+---+
|employee_name| department|salary|cnt|
+-------------+-----------+------+---+
|        James|      Sales|  3000| 11|
|         John|ServiceDesk|  4600| 11|
|      Michael|      Sales|  4600| 11|
|       Robert|      Sales|  4100| 11|
|        Maria|    Finance|  3000| 11|
|        James|      Sales|  3000| 11|
|        Scott|    Finance|  3300| 11|
|          Jen|    Finance|  3900| 11|
|         Jeff|  Marketing|  3000| 11|
|        Kumar|  Marketing|  2000| 11|
|         Saif|      Sales|  4100| 11|
+-------------+-----------+------+---+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#356L]
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#462]
      +- Project [_1#129 AS employee_name#135, _2#130 AS department#136, _3#131L AS salary#137L]
         +- Scan ExistingRDD[_1#129,_2#130,_3#131L]




In [29]:
# 
single_part_df_1 = employeeDF.\
    withColumn("rank", F.count().over())


single_part_df_1.show()
single_part_df_1.explain()

TypeError: count() missing 1 required positional argument: 'col'

In [21]:
print("DON'T ADD COUNT")
result_sql_df.show()
result_sql_df.explain()

DON'T ADD COUNT
+-------------+-----------+------+---+
|employee_name| department|salary|cnt|
+-------------+-----------+------+---+
|        James|      Sales|  3000| 11|
|         John|ServiceDesk|  4600| 11|
|      Michael|      Sales|  4600| 11|
|       Robert|      Sales|  4100| 11|
|        Maria|    Finance|  3000| 11|
|        James|      Sales|  3000| 11|
|        Scott|    Finance|  3300| 11|
|          Jen|    Finance|  3900| 11|
|         Jeff|  Marketing|  3000| 11|
|        Kumar|  Marketing|  2000| 11|
|         Saif|      Sales|  4100| 11|
+-------------+-----------+------+---+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#356L]
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#462]
      +- Project [_1#129 AS employee_name#135, _2#130 AS department#136, _3#131L AS salary#137L]
         +- Scan ExistingRDD[_1#129,_2#130,_3#131L

In [30]:
# CORRECT WAY

cnt = employeeDF.count()
result_with_count_df = employeeDF.\
    withColumn("count", F.lit(employeeDF.count()))

result_with_count_df.show()
result_with_count_df.explain()

+-------------+-----------+------+-----+
|employee_name| department|salary|count|
+-------------+-----------+------+-----+
|        James|      Sales|  3000|   11|
|         John|ServiceDesk|  4600|   11|
|      Michael|      Sales|  4600|   11|
|       Robert|      Sales|  4100|   11|
|        Maria|    Finance|  3000|   11|
|        James|      Sales|  3000|   11|
|        Scott|    Finance|  3300|   11|
|          Jen|    Finance|  3900|   11|
|         Jeff|  Marketing|  3000|   11|
|        Kumar|  Marketing|  2000|   11|
|         Saif|      Sales|  4100|   11|
+-------------+-----------+------+-----+

== Physical Plan ==
*(1) Project [_1#129 AS employee_name#135, _2#130 AS department#136, _3#131L AS salary#137L, 11 AS count#468]
+- *(1) Scan ExistingRDD[_1#129,_2#130,_3#131L]




In [31]:
# DON'T ADD ROW NUM becouse Single partitions

single_part_df_2 = employeeDF.\
    withColumn("row_num", F.row_number().over(windowSpec))
single_part_df_2.show()
single_part_df_2.explain()


+-------------+-----------+------+-------+
|employee_name| department|salary|row_num|
+-------------+-----------+------+-------+
|         John|ServiceDesk|  4600|      1|
|      Michael|      Sales|  4600|      2|
|       Robert|      Sales|  4100|      3|
|         Saif|      Sales|  4100|      4|
|          Jen|    Finance|  3900|      5|
|        Scott|    Finance|  3300|      6|
|        James|      Sales|  3000|      7|
|        Maria|    Finance|  3000|      8|
|        James|      Sales|  3000|      9|
|         Jeff|  Marketing|  3000|     10|
|        Kumar|  Marketing|  2000|     11|
+-------------+-----------+------+-------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(salary#137L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_num#491], [salary#137L DESC NULLS LAST]
   +- Sort [salary#137L DESC NULLS LAST], false, 0
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, 

In [32]:
# CORRECT WAY

result_with_uniq_num = employeeDF.\
    withColumn("row_num", F.monotonically_increasing_id())
print("CORRECT WAY")
result_with_uniq_num.show()
result_with_uniq_num.explain()


CORRECT WAY
+-------------+-----------+------+-------+
|employee_name| department|salary|row_num|
+-------------+-----------+------+-------+
|        James|      Sales|  3000|      0|
|         John|ServiceDesk|  4600|      1|
|      Michael|      Sales|  4600|      2|
|       Robert|      Sales|  4100|      3|
|        Maria|    Finance|  3000|      4|
|        James|      Sales|  3000|      5|
|        Scott|    Finance|  3300|      6|
|          Jen|    Finance|  3900|      7|
|         Jeff|  Marketing|  3000|      8|
|        Kumar|  Marketing|  2000|      9|
|         Saif|      Sales|  4100|     10|
+-------------+-----------+------+-------+

== Physical Plan ==
*(1) Project [_1#129 AS employee_name#135, _2#130 AS department#136, _3#131L AS salary#137L, monotonically_increasing_id() AS row_num#513L]
+- *(1) Scan ExistingRDD[_1#129,_2#130,_3#131L]




# UDF, UDAF user_define_functions

In [33]:
# Step-1: Define and register UDF function
lambda_is_world_war_two_year = lambda year: 1939 <= year <= 1945

In [34]:
# 1 way to init UDF
is_world_war_two_year = udf(lambda_is_world_war_two_year)

In [36]:
# 2 way to init UDF
spark.udf.register("isWorldWarTwoYear", lambda_is_world_war_two_year)

<function __main__.<lambda>(year)>

In [37]:
stateNames = spark.read.\
    option("header", "true").\
    option("inferSchema", "true").\
    csv("data/statenames")

stateNames.show()

+-----+------+----+---------+---+
|State|Gender|Year|     Name|Cnt|
+-----+------+----+---------+---+
|   IN|     F|1910|     Mary|619|
|   IN|     F|1910|    Helen|324|
|   IN|     F|1910|     Ruth|238|
|   IN|     F|1910|  Dorothy|215|
|   IN|     F|1910|  Mildred|200|
|   IN|     F|1910| Margaret|196|
|   IN|     F|1910|   Thelma|137|
|   IN|     F|1910|     Edna|113|
|   IN|     F|1910|   Martha|112|
|   IN|     F|1910|    Hazel|108|
|   IN|     F|1910|    Alice|107|
|   IN|     F|1910|Elizabeth|106|
|   IN|     F|1910|  Frances|106|
|   IN|     F|1910|    Marie|103|
|   IN|     F|1910|     Anna|100|
|   IN|     F|1910| Florence| 93|
|   IN|     F|1910|    Edith| 87|
|   IN|     F|1910|   Esther| 86|
|   IN|     F|1910|    Irene| 86|
|   IN|     F|1910|   Evelyn| 82|
+-----+------+----+---------+---+
only showing top 20 rows



In [38]:
stateNames.\
    selectExpr("Year", "isWorldWarTwoYear(Year)").\
    distinct().\
    show(50)

+----+-----------------------+
|Year|isWorldWarTwoYear(Year)|
+----+-----------------------+
|1962|                  false|
|1946|                  false|
|1988|                  false|
|1998|                  false|
|1926|                  false|
|1935|                  false|
|1937|                  false|
|1971|                  false|
|1919|                  false|
|1939|                   true|
|1954|                  false|
|1995|                  false|
|2011|                  false|
|2003|                  false|
|1948|                  false|
|2010|                  false|
|1953|                  false|
|1915|                  false|
|1979|                  false|
|1981|                  false|
|2014|                  false|
|1913|                  false|
|1918|                  false|
|1967|                  false|
|2017|                  false|
|1944|                   true|
|1996|                  false|
|1952|                  false|
|2016|                  false|
|1943|  

In [39]:
stateNames.\
    select(F.col("Year"), is_world_war_two_year(F.col("Year"))).\
    distinct().\
    show(50)

+----+--------------+
|Year|<lambda>(Year)|
+----+--------------+
|1962|         false|
|1946|         false|
|1988|         false|
|1998|         false|
|1926|         false|
|1935|         false|
|1937|         false|
|1971|         false|
|1919|         false|
|1939|          true|
|1954|         false|
|1995|         false|
|2011|         false|
|2003|         false|
|1948|         false|
|2010|         false|
|1953|         false|
|1915|         false|
|1979|         false|
|1981|         false|
|2014|         false|
|1913|         false|
|1918|         false|
|1967|         false|
|2017|         false|
|1944|          true|
|1996|         false|
|1952|         false|
|2016|         false|
|1943|          true|
|1960|         false|
|1920|         false|
|1912|         false|
|1982|         false|
|1991|         false|
|1925|         false|
|1994|         false|
|1922|         false|
|1989|         false|
|1929|         false|
|1970|         false|
|1911|         false|
|2015|    

In [40]:
stateNames.createOrReplaceTempView("stateNames")

spark.sql(
    "SELECT DISTINCT Name, Year FROM stateNames WHERE Year IS NOT NULL AND isWorldWarTwoYear(Year) = true ORDER BY Name DESC").\
    show(50)


+------+----+
|  Name|Year|
+------+----+
|Zulema|1943|
|Zulema|1939|
|Zulema|1945|
|Zulema|1944|
|Zulema|1942|
|Zulema|1941|
|Zulema|1940|
|  Zula|1939|
|  Zula|1944|
|  Zula|1941|
|  Zula|1945|
|  Zula|1940|
|  Zula|1943|
|  Zula|1942|
|  Zora|1940|
|  Zora|1941|
|  Zora|1942|
|  Zora|1939|
|  Zora|1944|
|  Zora|1943|
|  Zora|1945|
|Zonnie|1945|
|  Zona|1944|
|  Zona|1939|
|  Zona|1942|
|  Zona|1945|
|  Zona|1941|
|  Zona|1943|
|  Zona|1940|
|Zollie|1945|
|Zollie|1940|
|Zollie|1944|
|  Zola|1942|
|  Zola|1945|
|  Zola|1941|
|  Zola|1940|
|  Zola|1943|
|  Zola|1944|
|  Zola|1939|
| Zoila|1941|
| Zoila|1939|
| Zoila|1943|
| Zoila|1945|
| Zoila|1942|
|   Zoe|1939|
|   Zoe|1945|
|   Zoe|1942|
|   Zoe|1941|
|   Zoe|1940|
|   Zoe|1943|
+------+----+
only showing top 50 rows

