```{figure} ../images/banner.png
---
align: center
name: banner
---
```

# Chapter 17: Window operations

## Chapter Learning Objectives

- Various data operations on DataFrame columns using window functions.

## Chapter Outline

- [1. How to deal with map column?](#1)
    - [1a. How to create a column of map type?](#2)
    - [1b. Pandas UDFs (Vectorized UDFs)](#3)
    - [1c. How to extract the keys from a map column?](#4)
    - [1d. Iterator of Series to Iterator of Series](#5)
    - [1e. Iterator of Multiple Series to Iterator of Series](#6)
    - [1f. Series to Scalar](#7)
    - [1g. Grouped Map](#8)

In [48]:

import pyspark
from pyspark.sql import functions as func
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
from IPython.display import display_html
import pandas as pd 
import numpy as np
def display_side_by_side(*args):
    html_str=''
    for df in args:
        html_str+=df.to_html(index=False)
        html_str+= "\xa0\xa0\xa0"*10
    display_html(html_str.replace('table','table style="display:inline"'),raw=True)
space = "\xa0" * 10

In [2]:
import panel as pn

css = """
div.special_table + table, th, td {
  border: 3px solid orange;
}
"""
pn.extension(raw_css=[css])

<a id='1'></a>

##  Chapter Outline - Gallery

<div class="special_table"></div>

Click on any of the images below | To come back to this image gallery, click "Chapter Outline - Gallery" under Contents in the top right corner
- | - 
[![alt](img/chapter9/1.png)](#2)| [![alt](img/chapter9/2.png)](#3)
[![alt](img/chapter9/3.png)](#4)| [![alt](img/chapter9/4.png)](#5)
[![alt](img/chapter9/5.png)](#6)| [![alt](img/chapter9/6.png)](#7)
[![alt](img/chapter9/7.png)](#8)|

<a id='2'></a>

## 1a. How to create a column of map type?


```{figure} img/chapter9/1a.png
---
align: center
---
```

Let's first understand the syntax.



```{admonition} Syntax
<b>pyspark.sql.functions.map_keys(col)</b>

Returns an unordered array containing the keys of the map.


<b>Parameters</b>:

- col – name of column or expression


```

At the top level there are mainly 3 types of joins:

- INNER
- OUTER
- CROSS

INNER JOIN - fetches data only if it is present in both tables.

OUTER JOINs are of 3 types:

- LEFT OUTER JOIN - fetches all data from the left table, plus matching data from the right table.
- RIGHT OUTER JOIN - fetches all data from the right table, plus matching data from the left table.
- FULL OUTER JOIN - fetches data if present in either of the two tables.

CROSS JOIN, as the name suggests, produces the [n X m] combination that joins every row of one table to every row of the other.
It is similar to the scenario where we simply list the tables to join in the FROM clause of a SELECT statement, separated by commas.
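
The join types listed above can be sketched in plain Python, without Spark, to make the row-level behaviour concrete. This is only an illustration under stated assumptions: the two lists are hypothetical data inferred from the join outputs shown later in this chapter, and the helpers `join` and `cross_join` are made-up names, not PySpark functions.

```python
# Hypothetical data, inferred from the join outputs shown later in this chapter.
orders = [(1001, 1, 100), (1002, 2, 200), (1003, 3, 300), (1004, 1, 200), (1005, 6, 200)]
customers = [(1, "john"), (2, "mike"), (3, "tony"), (4, "kent")]


def join(left, right, how="inner"):
    """Join orders to customers on customer_id; None marks a missing side."""
    out = []
    matched_right = set()
    for order_id, cust_id, amount in left:
        hits = [(cid, name) for cid, name in right if cid == cust_id]
        for cid, name in hits:
            matched_right.add(cid)
            out.append((cust_id, order_id, amount, name))
        # Outer joins that keep the left side emit unmatched left rows too.
        if not hits and how in ("left", "full"):
            out.append((cust_id, order_id, amount, None))
    # Outer joins that keep the right side emit unmatched right rows too.
    if how in ("right", "full"):
        for cid, name in right:
            if cid not in matched_right:
                out.append((cid, None, None, name))
    return out


def cross_join(left, right):
    """Pair every left row with every right row (n x m rows)."""
    return [l + r for l in left for r in right]


for how in ("inner", "left", "right", "full"):
    print(how, len(join(orders, customers, how)))
print("cross", len(cross_join(orders, customers)))
```

The row counts (4, 5, 5, 6 and 20) show how each join type treats unmatched rows; the row order of a real Spark result may differ.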

In [77]:
# Sample employee data: department id, employee id, and salary
df = spark.createDataFrame([
    ("sales",10,6000),("hr",7,3000),("it",5,5000),("sales",2,6000),
    ("hr",3,2000),("hr",4,6000),("it",8,8000),("sales",9,5000),
    ("sales",1,7000),("it",6,6000)],
    ["dept_id","emp_id","salary"])
d1 = df.toPandas()
print(d1.to_string(index=False))  # equivalent Spark display: df.show(truncate=False)

dept_id  emp_id  salary
  sales      10    6000
     hr       7    3000
     it       5    5000
  sales       2    6000
     hr       3    2000
     hr       4    6000
     it       8    8000
  sales       9    5000
  sales       1    7000
     it       6    6000


What are Window Functions?

A window function performs a calculation across a set of table rows that are somehow related to the current row. This is comparable to the type of calculation that can be done with an aggregate function. But unlike regular aggregate functions, use of a window function does not cause rows to become grouped into a single output row — the rows retain their separate identities. Behind the scenes, the window function is able to access more than just the current row of the query result.
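
The difference between an aggregate and a window calculation can be illustrated with a small pure-Python sketch. It reuses the sample rows above and is not how Spark evaluates windows internally; the variable names are illustrative only.

```python
from collections import defaultdict

# Same (dept_id, emp_id, salary) rows as the sample DataFrame above.
rows = [
    ("sales", 10, 6000), ("hr", 7, 3000), ("it", 5, 5000), ("sales", 2, 6000),
    ("hr", 3, 2000), ("hr", 4, 6000), ("it", 8, 8000), ("sales", 9, 5000),
    ("sales", 1, 7000), ("it", 6, 6000),
]

# Aggregate: the rows of each department collapse into one output row.
dept_total = defaultdict(int)
for dept, _emp, salary in rows:
    dept_total[dept] += salary
aggregated = dict(dept_total)

# Window: every input row survives and carries its department total.
windowed = [(dept, emp, salary, dept_total[dept]) for dept, emp, salary in rows]

print(aggregated)    # {'sales': 24000, 'hr': 11000, 'it': 19000}
print(windowed[0])   # ('sales', 10, 6000, 24000)
print(len(windowed)) # 10
```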

In the DataFrame API, we provide utility functions to define a window specification. Taking Python as an example, users can specify partitioning expressions and ordering expressions as follows.

from pyspark.sql.window import Window
windowSpec = \
  Window \
    .partitionBy(...) \
    .orderBy(...)
    
In addition to the ordering and partitioning, users need to define the start boundary of the frame, the end boundary of the frame, and the type of the frame, which are the three components of a frame specification.

There are five types of boundaries: `unboundedPreceding`, `unboundedFollowing`, `currentRow`, `<value> PRECEDING`, and `<value> FOLLOWING`.

`unboundedPreceding` and `unboundedFollowing` represent the first row of the partition and the last row of the partition, respectively.

The other three types of boundaries specify an offset from the position of the current input row, and their specific meanings depend on the type of the frame.

There are two types of frames: ROW frames and RANGE frames.

ROW frame

ROW frames are based on physical offsets from the position of the current input row, which means that `currentRow`, `<value> PRECEDING`, and `<value> FOLLOWING` specify physical offsets.

If `currentRow` is used as a boundary, it represents the current input row.
`<value> PRECEDING` and `<value> FOLLOWING` describe the number of rows that appear before and after the current input row, respectively.

For example, a ROW frame with 1 PRECEDING as the start boundary and 1 FOLLOWING as the end boundary is written `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING` in SQL syntax, or `rowsBetween(-1, 1)` in the DataFrame API.
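
To see what a ROW frame does to a single partition, here is a minimal pure-Python sketch of a framed sum. The helper `rows_between_sum` is a hypothetical name, not a Spark API; the input is the `sales` partition of the sample data, ordered by `emp_id`.

```python
def rows_between_sum(values, start, end):
    """For each row i, sum values[i + start .. i + end], clipped to the partition."""
    out = []
    n = len(values)
    for i in range(n):
        lo = max(0, i + start)       # frame start, not before the first row
        hi = min(n - 1, i + end)     # frame end, not after the last row
        out.append(sum(values[lo:hi + 1]))
    return out


# Salaries of the "sales" department ordered by emp_id (1, 2, 9, 10).
sales_by_emp_id = [7000, 6000, 5000, 6000]

print(rows_between_sum(sales_by_emp_id, -2, 0))  # [7000, 13000, 18000, 17000]
print(rows_between_sum(sales_by_emp_id, 0, 2))   # [18000, 17000, 11000, 6000]
print(rows_between_sum(sales_by_emp_id, -1, 1))  # [13000, 18000, 17000, 11000]
```

The first two calls correspond to `rowsBetween(-2, 0)` and `rowsBetween(0, 2)`; a negative offset counts rows before the current row and a positive offset counts rows after it.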

In [80]:
from pyspark.sql import functions as func
from pyspark.sql import Window
window = Window.partitionBy("dept_id").orderBy("emp_id").rowsBetween(-2,  0)
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary   sum
  sales       1    7000  7000
  sales       2    6000 13000
  sales       9    5000 18000
  sales      10    6000 17000
     hr       3    2000  2000
     hr       4    6000  8000
     hr       7    3000 11000
     it       5    5000  5000
     it       6    6000 11000
     it       8    8000 19000


In [81]:
window = Window.partitionBy("dept_id").orderBy("emp_id").rowsBetween(0, 2)
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary   sum
  sales       1    7000 18000
  sales       2    6000 17000
  sales       9    5000 11000
  sales      10    6000  6000
     hr       3    2000 11000
     hr       4    6000  9000
     hr       7    3000  3000
     it       5    5000 19000
     it       6    6000 14000
     it       8    8000  8000


In [82]:
window = Window.rowsBetween(-2,  0)
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary   sum
  sales      10    6000  6000
     hr       7    3000  9000
     it       5    5000 14000
  sales       2    6000 14000
     hr       3    2000 13000
     hr       4    6000 14000
     it       8    8000 16000
  sales       9    5000 19000
  sales       1    7000 20000
     it       6    6000 18000


In [83]:
window = Window.rowsBetween(0,  1)
df.withColumn("sum", func.sum("salary").over(window)).show()
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))

+-------+------+------+-----+
|dept_id|emp_id|salary|  sum|
+-------+------+------+-----+
|  sales|    10|  6000| 9000|
|     hr|     7|  3000| 8000|
|     it|     5|  5000|11000|
|  sales|     2|  6000| 8000|
|     hr|     3|  2000| 8000|
|     hr|     4|  6000|14000|
|     it|     8|  8000|13000|
|  sales|     9|  5000|12000|
|  sales|     1|  7000|13000|
|     it|     6|  6000| 6000|
+-------+------+------+-----+

dept_id  emp_id  salary   sum
  sales      10    6000  9000
     hr       7    3000  8000
     it       5    5000 11000
  sales       2    6000  8000
     hr       3    2000  8000
     hr       4    6000 14000
     it       8    8000 13000
  sales       9    5000 12000
  sales       1    7000 13000
     it       6    6000  6000


In [85]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("rank", func.dense_rank().over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary  rank
  sales       1    7000     1
  sales      10    6000     2
  sales       2    6000     2
  sales       9    5000     3
     hr       4    6000     1
     hr       7    3000     2
     hr       3    2000     3
     it       8    8000     1
     it       6    6000     2
     it       5    5000     3


In [87]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("rank", func.rank().over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary  rank
  sales       1    7000     1
  sales      10    6000     2
  sales       2    6000     2
  sales       9    5000     4
     hr       4    6000     1
     hr       7    3000     2
     hr       3    2000     3
     it       8    8000     1
     it       6    6000     2
     it       5    5000     3
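
The two outputs above differ only in the last `sales` row (3 versus 4). A pure-Python sketch of the three numbering rules, assuming the values are already sorted within one partition, may make the difference clearer. The helper `ranking` is a hypothetical name, not a Spark function.

```python
def ranking(sorted_values):
    """Return (row_number, rank, dense_rank) for each already-sorted value."""
    result = []
    rank = dense = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = row_number   # rank jumps to the current row position
            dense += 1          # dense_rank only ever moves up by one
            previous = value
        result.append((row_number, rank, dense))
    return result


# "sales" salaries in descending order, as in the cells above.
sales_desc = [7000, 6000, 6000, 5000]
for value, numbers in zip(sales_desc, ranking(sales_desc)):
    print(value, numbers)
# 7000 (1, 1, 1)
# 6000 (2, 2, 2)
# 6000 (3, 2, 2)
# 5000 (4, 4, 3)
```

Ties share a rank in both schemes; `rank` then skips ahead, while `dense_rank` leaves no gaps.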


In [89]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("salary_bucket", func.ntile(4).over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary  salary_bucket
  sales       1    7000              1
  sales      10    6000              2
  sales       2    6000              3
  sales       9    5000              4
     hr       4    6000              1
     hr       7    3000              2
     hr       3    2000              3
     it       8    8000              1
     it       6    6000              2
     it       5    5000              3
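
The bucket numbers above follow from how `ntile` distributes rows: buckets are filled in order, and when the rows do not divide evenly the earlier buckets receive one extra row each. A minimal sketch of that rule, with a hypothetical helper name:

```python
def ntile(n_buckets, n_rows):
    """Bucket number (1-based) for each of n_rows ordered rows."""
    base, extra = divmod(n_rows, n_buckets)
    buckets = []
    for bucket in range(1, n_buckets + 1):
        # The first `extra` buckets get one more row than the rest.
        size = base + (1 if bucket <= extra else 0)
        buckets.extend([bucket] * size)
    return buckets


print(ntile(4, 4))   # [1, 2, 3, 4]  (sales: four rows)
print(ntile(4, 3))   # [1, 2, 3]     (hr and it: three rows, bucket 4 unused)
print(ntile(4, 10))  # [1, 1, 1, 2, 2, 2, 3, 3, 4, 4]
```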


In [90]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("previousrow_salary", func.lag('salary',1).over(window)).toPandas().to_string(index=False))#.show()

dept_id  emp_id  salary  previousrow_salary
  sales       1    7000                 NaN
  sales      10    6000              7000.0
  sales       2    6000              6000.0
  sales       9    5000              6000.0
     hr       4    6000                 NaN
     hr       7    3000              6000.0
     hr       3    2000              3000.0
     it       8    8000                 NaN
     it       6    6000              8000.0
     it       5    5000              6000.0


In [91]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("nextrow_salary", func.lead('salary',1).over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary  nextrow_salary
  sales       1    7000          6000.0
  sales      10    6000          6000.0
  sales       2    6000          5000.0
  sales       9    5000             NaN
     hr       4    6000          3000.0
     hr       7    3000          2000.0
     hr       3    2000             NaN
     it       8    8000          6000.0
     it       6    6000          5000.0
     it       5    5000             NaN
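
`lag` and `lead` simply look a fixed number of rows backwards or forwards inside the ordered partition. The sketch below mirrors that on a plain list; the helper names are hypothetical, and `None` plays the role of Spark's null (shown as NaN in the pandas printouts above).

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` if there is none."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


def lead(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` if there is none."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default
            for i in range(n)]


# "sales" salaries ordered by salary descending, as in the cells above.
sales_desc = [7000, 6000, 6000, 5000]
print(lag(sales_desc))   # [None, 7000, 6000, 6000]
print(lead(sales_desc))  # [6000, 6000, 5000, None]
```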


In [92]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary"))
print(df.withColumn("percentile", func.percent_rank().over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary  percentile
  sales       9    5000    0.000000
  sales      10    6000    0.333333
  sales       2    6000    0.333333
  sales       1    7000    1.000000
     hr       3    2000    0.000000
     hr       7    3000    0.500000
     hr       4    6000    1.000000
     it       5    5000    0.000000
     it       6    6000    0.500000
     it       8    8000    1.000000


In [93]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary"))
print(df.withColumn("row_no", func.row_number().over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary  row_no
  sales       9    5000       1
  sales      10    6000       2
  sales       2    6000       3
  sales       1    7000       4
     hr       3    2000       1
     hr       7    3000       2
     hr       4    6000       3
     it       5    5000       1
     it       6    6000       2
     it       8    8000       3


In [94]:
window = Window.partitionBy("dept_id").orderBy(func.col("salary"))
print(df.withColumn("cume_dist", func.cume_dist().over(window)).toPandas().to_string(index=False))#show()

dept_id  emp_id  salary  cume_dist
  sales       9    5000   0.250000
  sales      10    6000   0.750000
  sales       2    6000   0.750000
  sales       1    7000   1.000000
     hr       3    2000   0.333333
     hr       7    3000   0.666667
     hr       4    6000   1.000000
     it       5    5000   0.333333
     it       6    6000   0.666667
     it       8    8000   1.000000
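
The `percent_rank` and `cume_dist` columns above can be checked by hand. As a sketch (hypothetical helper names, one partition already sorted ascending): `percent_rank` is (rank - 1) / (rows - 1), and `cume_dist` is the share of rows whose value is less than or equal to the current one.

```python
def percent_rank(sorted_values):
    """(rank - 1) / (n - 1), where rank - 1 counts strictly smaller values."""
    n = len(sorted_values)
    if n == 1:
        return [0.0]
    return [sum(v < x for v in sorted_values) / (n - 1) for x in sorted_values]


def cume_dist(sorted_values):
    """Fraction of rows with a value less than or equal to the current value."""
    n = len(sorted_values)
    return [sum(v <= x for v in sorted_values) / n for x in sorted_values]


# "sales" salaries ordered by salary ascending, as in the cells above.
sales_asc = [5000, 6000, 6000, 7000]
print([round(p, 6) for p in percent_rank(sales_asc)])  # [0.0, 0.333333, 0.333333, 1.0]
print(cume_dist(sales_asc))                            # [0.25, 0.75, 0.75, 1.0]
```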


Left Join

A left join returns all values from the left relation and the matched values from the right relation, or appends NULL if there is no match. It is also referred to as a left outer join.
LEFT JOIN and LEFT OUTER JOIN are equivalent.

LEFT JOIN - fetches all records from the left table and only the matching records from the right table.

In [48]:
df_left.join(df_right,on="customer_id",how="left").toPandas()#show()

Unnamed: 0,customer_id,order_id,amount,name
0,6,1005,200,
1,1,1001,100,john
2,1,1004,200,john
3,3,1003,300,tony
4,2,1002,200,mike


In [25]:
df_left.join(df_right,on="customer_id",how="left_outer").show()

+-----------+--------+------+----+
|customer_id|order_id|amount|name|
+-----------+--------+------+----+
|          6|    1005|   200|null|
|          1|    1001|   100|john|
|          1|    1004|   200|john|
|          3|    1003|   300|tony|
|          2|    1002|   200|mike|
+-----------+--------+------+----+



In [26]:
df_left.join(df_right,on="customer_id",how="leftouter").show()

+-----------+--------+------+----+
|customer_id|order_id|amount|name|
+-----------+--------+------+----+
|          6|    1005|   200|null|
|          1|    1001|   100|john|
|          1|    1004|   200|john|
|          3|    1003|   300|tony|
|          2|    1002|   200|mike|
+-----------+--------+------+----+



Right Join

RIGHT JOIN - fetches all records from the right table, even if there are no matching records in the left table.

In [49]:
df_left.join(df_right,on="customer_id",how="right").toPandas()#show()

Unnamed: 0,customer_id,order_id,amount,name
0,1,1001.0,100.0,john
1,1,1004.0,200.0,john
2,3,1003.0,300.0,tony
3,2,1002.0,200.0,mike
4,4,,,kent


In [28]:
df_left.join(df_right,on="customer_id",how="right_outer").show()

+-----------+--------+------+----+
|customer_id|order_id|amount|name|
+-----------+--------+------+----+
|          1|    1001|   100|john|
|          1|    1004|   200|john|
|          3|    1003|   300|tony|
|          2|    1002|   200|mike|
|          4|    null|  null|kent|
+-----------+--------+------+----+



In [29]:
df_left.join(df_right,on="customer_id",how="rightouter").show()

+-----------+--------+------+----+
|customer_id|order_id|amount|name|
+-----------+--------+------+----+
|          1|    1001|   100|john|
|          1|    1004|   200|john|
|          3|    1003|   300|tony|
|          2|    1002|   200|mike|
|          4|    null|  null|kent|
+-----------+--------+------+----+



FULL JOIN - fetches data if present in either of the two tables.

In [50]:
df_left.join(df_right,on="customer_id",how="full").toPandas()#show()

Unnamed: 0,customer_id,order_id,amount,name
0,6,1005.0,200.0,
1,1,1001.0,100.0,john
2,1,1004.0,200.0,john
3,3,1003.0,300.0,tony
4,2,1002.0,200.0,mike
5,4,,,kent


In [31]:
df_left.join(df_right,on="customer_id",how="fullouter").show()

+-----------+--------+------+----+
|customer_id|order_id|amount|name|
+-----------+--------+------+----+
|          6|    1005|   200|null|
|          1|    1001|   100|john|
|          1|    1004|   200|john|
|          3|    1003|   300|tony|
|          2|    1002|   200|mike|
|          4|    null|  null|kent|
+-----------+--------+------+----+



In [32]:
df_left.join(df_right,on="customer_id",how="full_outer").show()

+-----------+--------+------+----+
|customer_id|order_id|amount|name|
+-----------+--------+------+----+
|          6|    1005|   200|null|
|          1|    1001|   100|john|
|          1|    1004|   200|john|
|          3|    1003|   300|tony|
|          2|    1002|   200|mike|
|          4|    null|  null|kent|
+-----------+--------+------+----+



In [47]:
df_left.join(df_right,on="customer_id",how="outer").show()

+-----------+--------+------+----+
|customer_id|order_id|amount|name|
+-----------+--------+------+----+
|          6|    1005|   200|null|
|          1|    1001|   100|john|
|          1|    1004|   200|john|
|          3|    1003|   300|tony|
|          2|    1002|   200|mike|
|          4|    null|  null|kent|
+-----------+--------+------+----+



Cross Join

In [34]:
spark.conf.get("spark.sql.crossJoin.enabled")

'true'

In [None]:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [37]:
df_left.join(df_right,on="customer_id",how="semi").show()
# Accepted spellings: semi, leftsemi, left_semi (semi join); anti, leftanti, left_anti (anti join)

+-----------+--------+------+
|customer_id|order_id|amount|
+-----------+--------+------+
|          1|    1001|   100|
|          1|    1004|   200|
|          3|    1003|   300|
|          2|    1002|   200|
+-----------+--------+------+



In [38]:
df_left.join(df_right,on="customer_id",how="leftsemi").show()

+-----------+--------+------+
|customer_id|order_id|amount|
+-----------+--------+------+
|          1|    1001|   100|
|          1|    1004|   200|
|          3|    1003|   300|
|          2|    1002|   200|
+-----------+--------+------+



In [39]:
df_left.join(df_right,on="customer_id",how="left_semi").show()

+-----------+--------+------+
|customer_id|order_id|amount|
+-----------+--------+------+
|          1|    1001|   100|
|          1|    1004|   200|
|          3|    1003|   300|
|          2|    1002|   200|
+-----------+--------+------+



In [40]:
df_left.join(df_right,on="customer_id",how="anti").show()

+-----------+--------+------+
|customer_id|order_id|amount|
+-----------+--------+------+
|          6|    1005|   200|
+-----------+--------+------+



In [41]:
df_left.join(df_right,on="customer_id",how="leftanti").show()

+-----------+--------+------+
|customer_id|order_id|amount|
+-----------+--------+------+
|          6|    1005|   200|
+-----------+--------+------+



In [42]:
df_left.join(df_right,on="customer_id",how="left_anti").show()

+-----------+--------+------+
|customer_id|order_id|amount|
+-----------+--------+------+
|          6|    1005|   200|
+-----------+--------+------+
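
The semi and anti join outputs above keep only the left table's columns: a semi join keeps left rows that have a match on the right, and an anti join keeps left rows that have none. A pure-Python sketch follows, using hypothetical data inferred from the outputs in this chapter and made-up helper names.

```python
# Hypothetical data, inferred from the join outputs in this chapter.
orders = [(1001, 1, 100), (1002, 2, 200), (1003, 3, 300), (1004, 1, 200), (1005, 6, 200)]
customers = [(1, "john"), (2, "mike"), (3, "tony"), (4, "kent")]

# Only the join keys of the right table matter; its other columns are never returned.
customer_ids = {cid for cid, _name in customers}


def left_semi(left, right_keys):
    """Left rows whose customer_id exists on the right."""
    return [row for row in left if row[1] in right_keys]


def left_anti(left, right_keys):
    """Left rows whose customer_id does not exist on the right."""
    return [row for row in left if row[1] not in right_keys]


print(left_semi(orders, customer_ids))
# [(1001, 1, 100), (1002, 2, 200), (1003, 3, 300), (1004, 1, 200)]
print(left_anti(orders, customer_ids))
# [(1005, 6, 200)]
```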



In [51]:
df_left.crossJoin(df_right).toPandas()#show()

Unnamed: 0,order_id,customer_id,amount,customer_id.1,name
0,1001,1,100,1,john
1,1001,1,100,2,mike
2,1001,1,100,3,tony
3,1001,1,100,4,kent
4,1002,2,200,1,john
5,1002,2,200,2,mike
6,1002,2,200,3,tony
7,1002,2,200,4,kent
8,1003,3,300,1,john
9,1003,3,300,2,mike


- Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. 

- Note that even with Arrow, toPandas() results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data. 

- Not all Spark data types are currently supported and an error can be raised if a column has an unsupported type, see Supported SQL Types. If an error occurs during createDataFrame(), Spark will fall back to create the DataFrame without Arrow.

- Supported SQL Types

  - Currently, all Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType.

<a id='3'></a>

## 1b. Pandas UDFs (Vectorized UDFs)


```{figure} img/chapter9/1b.png
---
align: center
---
```

- Pandas UDFs are user-defined functions that Spark executes using Arrow to transfer data and pandas to perform vectorized operations.

- A Pandas UDF is defined using pandas_udf as a decorator or to wrap the function.

- Before Spark 3.0, Pandas UDFs were defined with PandasUDFType; from Spark 3.0 (with Python 3.6+), they can be defined with Python type hints.

- The type hint should use pandas.Series in all cases, with one exception: pandas.DataFrame should be used as the input or output type hint when the corresponding column is of StructType.

- The following example shows a Pandas UDF that takes a long column, a string column, and a struct column, and outputs a struct column. The function must specify the type hints pandas.Series and pandas.DataFrame as below:



In [19]:
import pandas as pd

from pyspark.sql.functions import pandas_udf

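# Named add_name_length rather than func, so it does not shadow the
# pyspark.sql.functions alias "func" imported at the top of the chapter.
@pandas_udf("city string, col2 long")
def add_name_length(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # s3 holds the struct column as a pandas DataFrame; add col2 = id + len(name)
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "tony", ("seattle",)]],
    "id long, name string, city_struct struct<city:string>")

df.printSchema()
df.show()

df_pandas = df.select(add_name_length("id", "name", "city_struct").alias("pandas_udf"))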
df_pandas.printSchema()
df_pandas.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city_struct: struct (nullable = true)
 |    |-- city: string (nullable = true)

+---+----+-----------+
| id|name|city_struct|
+---+----+-----------+
|  1|tony|  [seattle]|
+---+----+-----------+

root
 |-- pandas_udf: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- col2: long (nullable = true)

+------------+
|  pandas_udf|
+------------+
|[seattle, 5]|
+------------+



<b> Summary:</b>

In [20]:
print("Input                     ",            "Output")
display_side_by_side(df.toPandas(),df_pandas.toPandas())

Input                      Output


id,name,city_struct
1,tony,{'city': 'seattle'}

pandas_udf
"{'city': 'seattle', 'col2': 5}"


<a id='4'></a>

## 1c. How to extract the keys from a map column?


```{figure} img/chapter9/1c.png
---
align: center
---
```

Let's first understand the syntax.



```{admonition} Syntax
<b>pyspark.sql.functions.map_keys(col)</b>

Returns an unordered array containing the keys of the map.


<b>Parameters</b>:

- col – name of column or expression


```

<b>Input:  Spark data frame consisting of a map column </b>

In [65]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

0    1
1    4
2    9
dtype: int64
+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



<b>Output :  Spark data frame consisting of a column of keys </b>

In [23]:
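from pyspark.sql.functions import map_keys
# df2 is not defined in this section; the next line is an assumed definition
# chosen to match the summary output shown below.
df2 = spark.createDataFrame([({"a": 1, "b": None, "c": 3},)], "data map<string,int>")
df_keys = df2.select(map_keys(df2.data).alias("keys"))
df_keys.show()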

+---------+
|     keys|
+---------+
|[a, b, c]|
+---------+



<b> Summary:</b>

In [24]:
print("input                     ",            "output")
display_side_by_side(df2.toPandas(),df_keys.toPandas())

input                      output


data
"{'a': 1, 'b': None, 'c': 3}"

keys
"[a, b, c]"


<a id='5'></a>

## 1d. Iterator of Series to Iterator of Series

An iterator UDF is the same as a scalar pandas UDF except that the Python function:

- Takes an iterator of batches instead of a single input batch as input.
- Returns an iterator of output batches instead of a single output batch.

Other points to note:

- The length of the entire output in the iterator should be the same as the length of the entire input.
- The wrapped pandas UDF takes a single Spark column as an input.
- You should specify the Python type hint as Iterator[pandas.Series] -> Iterator[pandas.Series].
- This pandas UDF is useful when the UDF execution requires initializing some state, for example, loading a machine learning model file to apply inference to every input batch.
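
The benefit of the iterator form, paying the setup cost once rather than once per batch, can be sketched with a plain Python generator. This is an illustration only: plain lists stand in for pandas.Series batches, and `load_model` is a hypothetical stand-in for an expensive initialization step.

```python
from typing import Iterator, List

load_count = 0  # counts how often the expensive setup runs


def load_model() -> int:
    """Hypothetical stand-in for expensive setup such as loading a model file."""
    global load_count
    load_count += 1
    return 100


def predict(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    model = load_model()  # runs once per iterator, not once per batch
    for batch in batches:
        # One output value per input value, so total lengths match.
        yield [x + model + x * model for x in batch]


batches = iter([[1, 2], [3]])
output = list(predict(batches))
print(output)      # [[201, 302], [403]]
print(load_count)  # 1
```

Two batches are processed but the setup runs only once, which is the state-initialization pattern described in the list above.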

The following example shows how to create a pandas UDF with iterator support.

```{figure} img/chapter9/1d.png
---
align: center
---
```

In [78]:
from typing import Iterator

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
df.show()

var_bc = spark.sparkContext.broadcast(100)

def calculate_complex(var1,var2):
    return var1+var2+var1*var2

# Declare the function and create the UDF
@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    var = var_bc.value
    for x in iterator:
        yield calculate_complex(x , var)

df_out = df.select(plus_one("x"))
df_out.show()


+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+

+-----------+
|plus_one(x)|
+-----------+
|        201|
|        302|
|        403|
+-----------+



In [79]:
print("Input                     ",            "Output")
display_side_by_side(df.toPandas(),df_out.toPandas())

Input                      Output


x
1
2
3

plus_one(x)
201
302
403


<a id='6'></a>

## 1e. Iterator of Multiple Series to Iterator of Series

- An Iterator of multiple Series to Iterator of Series UDF has similar characteristics and restrictions as Iterator of Series to Iterator of Series UDF. 
- The specified function takes an iterator of batches and outputs an iterator of batches. 
- It is also useful when the UDF execution requires initializing some state.

The differences are:
- The underlying Python function takes an iterator of a tuple of pandas Series.
- The wrapped pandas UDF takes multiple Spark columns as an input.

```{figure} img/chapter9/1e.png
---
align: center
---
```

Let's first understand the syntax.

```{admonition} Syntax
<b>pyspark.sql.functions.array_sort(col)</b>

sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.

<b>Parameters</b>
- col – name of column or expression
```

<b>Input: Spark data frame with two numeric columns </b>

In [105]:
from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([(1,2,),(3,4,),(5,6)], columns=["weight","height"])
df = spark.createDataFrame(pdf)


var_bc = spark.sparkContext.broadcast(5)
def calculate_complex_mul(var1,var2,var3):
    return var1+var2+var3


# Declare the function and create the UDF
@pandas_udf("long")
def run_ml_model(iterator: Iterator[Tuple[pd.Series,  pd.Series]]) -> Iterator[pd.Series]:
    var = var_bc.value
    for a, b in iterator:
        yield calculate_complex_mul(a,b,var)

df_out_mul = df.select(run_ml_model("weight", "height"))
print("Input                     ",            "Output")
display_side_by_side(df.toPandas(),df_out_mul.toPandas())

Input                      Output


weight,height
1,2
3,4
5,6

"run_ml_model(weight, height)"
8
12
16


<a id='7'></a>

## 1f. Series to Scalar



```{figure} img/chapter9/1f.png
---
align: center
---
```

Let's first understand the syntax.

```{admonition} Syntax
<b>pyspark.sql.functions.map_from_arrays(col1, col2)</b>

Creates a new map from two arrays.

<b>Parameters</b>
- col1 – name of column containing a set of keys. All elements should not be null
- col2 – name of column containing a set of values

```

<b>Input:  Spark data frame  </b>

<b>Output: Mean of the input columns  </b>

In [107]:
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df_sca = df.select(mean_udf(df['v']))
print("Input                     ",            "Output")
display_side_by_side(df.toPandas(),df_sca.toPandas())

Input                      Output


id,v
1,1.0
1,2.0
2,3.0
2,5.0
2,10.0

mean_udf(v)
4.2


In [108]:
df.groupby("id").agg(mean_udf(df['v'])).show()
w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+



<a id='8'></a>

## 1g. Grouped Map


Grouped map operations with Pandas instances are supported by DataFrame.groupby().applyInPandas(), which requires a Python function that takes a pandas.DataFrame and returns another pandas.DataFrame. Each group is passed to the Python function as one pandas.DataFrame.

This API implements the “split-apply-combine” pattern which consists of three steps:

- Split the data into groups by using DataFrame.groupBy.
- Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
- Combine the results into a new PySpark DataFrame.
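
The three steps above can be traced in a few lines of plain Python. This sketch uses lists in place of pandas.DataFrame objects and is not how Spark executes `applyInPandas`; it only mirrors the split, apply and combine stages on the `(id, v)` data used in this section.

```python
from collections import defaultdict

rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]

# Split: collect the values of each group key.
groups = defaultdict(list)
for key, v in rows:
    groups[key].append(v)


# Apply: a function that sees one whole group at a time.
def subtract_mean(values):
    mean = sum(values) / len(values)
    return [v - mean for v in values]


# Combine: stitch the per-group results back into one flat result.
result = [(key, v) for key, values in groups.items() for v in subtract_mean(values)]
print(result)
# [(1, -0.5), (1, 0.5), (2, -3.0), (2, -1.0), (2, 4.0)]
```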

To use groupBy().applyInPandas(), the user needs to define the following:

- A Python function that defines the computation for each group.
- A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.

Note that all data for a group will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups and it is up to the user to ensure that the grouped data will fit into the available memory.

The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.

Setting Arrow Batch Size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow record batches can be adjusted by setting the conf “spark.sql.execution.arrow.maxRecordsPerBatch” to an integer that will determine the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition will be made into 1 or more record batches for processing.
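
As a rough picture of what the batch limit means, the sketch below splits one partition into consecutive batches of at most 10,000 rows. The helper `to_record_batches` is a hypothetical name used for illustration; it is not part of Spark or Arrow, and real record batches are columnar rather than Python lists.

```python
def to_record_batches(partition, max_records_per_batch=10_000):
    """Split one partition (a list of rows) into consecutive batches."""
    return [
        partition[i:i + max_records_per_batch]
        for i in range(0, len(partition), max_records_per_batch)
    ]


partition = list(range(25_000))  # one partition holding 25,000 rows
batches = to_record_batches(partition)
print([len(b) for b in batches])  # [10000, 10000, 5000]
```

Lowering the limit produces more, smaller batches, which is the lever the configuration described above provides when rows are wide.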

```{admonition} Syntax
<b>pyspark.sql.GroupedData.applyInPandas(func, schema)</b>

Maps each group of the current DataFrame using a pandas udf and returns the result as a DataFrame.

The function should take a pandas.DataFrame and return another pandas.DataFrame. For each group, all columns are passed together as a pandas.DataFrame to the user function, and the returned pandas.DataFrames are combined into a DataFrame.

The schema should be a StructType describing the schema of the returned pandas.DataFrame. The column labels of the returned pandas.DataFrame must either match the field names in the defined schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. The length of the returned pandas.DataFrame can be arbitrary.

<b>Parameters</b>
- func – a Python native function that takes a pandas.DataFrame, and outputs a pandas.DataFrame.
- schema – the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.

```

In [110]:
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df_group_pandas  = df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double")
print("Input                     ",            "Output")
display_side_by_side(df.toPandas(),df_group_pandas.toPandas())

Input                      Output


id | v
- | -
1 | 1.0
1 | 2.0
2 | 3.0
2 | 5.0
2 | 10.0

id | v
- | -
1 | -0.5
1 | 0.5
2 | -3.0
2 | -1.0
2 | 4.0


<a id='8'></a>

## 1g. Map

Map operations with pandas instances are supported by DataFrame.mapInPandas(), which maps an iterator of pandas.DataFrames representing the current PySpark DataFrame to another iterator of pandas.DataFrames and returns the result as a PySpark DataFrame.

The function takes and outputs an iterator of pandas.DataFrame.

In contrast to some Pandas UDFs, it can return output of arbitrary length, although internally it works similarly to a Series to Series Pandas UDF.
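The "arbitrary length" point can be illustrated without a Spark session by feeding a hand-built iterator of pandas.DataFrames to such a function. This is a local sketch only: the function `repeat_adults`, the age threshold and the two hand-made batches are assumptions for illustration, and in Spark the batches would be produced by the engine.

```python
import pandas as pd

def repeat_adults(iterator):
    # iterator yields one pandas.DataFrame per batch
    for pdf in iterator:
        adults = pdf[pdf.age >= 18]
        # Emit each surviving row twice: the output length is unrelated
        # to the input length of the batch.
        yield pd.concat([adults, adults], ignore_index=True)

# Two hand-made batches standing in for what Spark would supply.
batches = iter([
    pd.DataFrame({"id": [1, 2], "age": [21, 30]}),
    pd.DataFrame({"id": [3, 4], "age": [12, 45]}),
])
out = pd.concat(list(repeat_adults(batches)), ignore_index=True)
print(out)
```

Four input rows become six output rows here; a function of the same shape could be passed to mapInPandas() together with a matching schema.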

The following example shows how to use mapInPandas():

```{figure} img/chapter9/1g.png
---
align: center
---
```

Let's first understand the syntax.

```{admonition} Syntax
<b>pyspark.sql.DataFrame.mapInPandas(func, schema)</b>

Maps an iterator of batches in the current DataFrame using a Python native function that takes and outputs a pandas DataFrame, and returns the result as a DataFrame.

The function should take an iterator of pandas.DataFrames and return another iterator of pandas.DataFrames. All columns are passed together as an iterator of pandas.DataFrames to the function and the returned iterator of pandas.DataFrames are combined as a DataFrame. Each pandas.DataFrame size can be controlled by spark.sql.execution.arrow.maxRecordsPerBatch.

<b>Parameters</b>
- func – a Python native function that takes an iterator of pandas.DataFrames, and outputs an iterator of pandas.DataFrames.
- schema – the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.



```

In [113]:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df_mapin = df.mapInPandas(filter_func, schema=df.schema)
print("Input                     ",            "Output")
display_side_by_side(df.toPandas(),df_mapin.toPandas())

Input                      Output


id | age
- | -
1 | 21
2 | 30

id | age
- | -
1 | 21


<a id='8'></a>

## 1g. Co-grouped Map

Co-grouped map operations with pandas instances are supported by DataFrame.groupby().cogroup().applyInPandas(), which allows two PySpark DataFrames to be cogrouped by a common key and a Python function to be applied to each cogroup.

It consists of the following steps:

- Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
- Apply a function to each cogroup. The input of the function is two pandas.DataFrame (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
- Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.
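The steps above can be mimicked locally with plain pandas to see what each call of the cogroup function receives. This is a sketch that assumes both inputs fit in memory as pandas.DataFrames; the names `left`, `right`, `left_groups` and `right_groups` are illustrative, and handing an empty frame to a side with no rows for a key is a choice made in this sketch.

```python
import pandas as pd

left = pd.DataFrame({"time": [20000101, 20000101, 20000102, 20000102],
                     "id": [1, 2, 1, 2],
                     "v1": [1.0, 2.0, 3.0, 4.0]})
right = pd.DataFrame({"time": [20000101, 20000101],
                      "id": [1, 2],
                      "v2": ["x", "y"]})

def asof_join(l, r):
    # l and r hold the rows of one key from the left and right side
    return pd.merge_asof(l, r, on="time", by="id")

# Step 1: bring together the groups of each frame that share a key.
left_groups = {key: group for key, group in left.groupby("id")}
right_groups = {key: group for key, group in right.groupby("id")}
keys = sorted(set(left_groups) | set(right_groups))

# Step 2: apply the function to each cogroup; a side with no rows for
# a key is given an empty frame with the same columns in this sketch.
pieces = [
    asof_join(left_groups.get(key, left.iloc[0:0]),
              right_groups.get(key, right.iloc[0:0]))
    for key in keys
]

# Step 3: combine the per-cogroup results into one frame.
combined = pd.concat(pieces, ignore_index=True)
print(combined)
```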

To use groupBy().cogroup().applyInPandas(), the user needs to define the following:

- A Python function that defines the computation for each cogroup.
- A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. See pandas.DataFrame on how to label columns when constructing a pandas.DataFrame.
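The cogroup function may also take the grouping key as a first argument, the optional tuple mentioned in the steps above. The sketch below calls such a function by hand on two small pandas.DataFrames; the function name `tag_with_key` and the extra `group_key` column are hypothetical, and in Spark the output schema would need a matching extra field.

```python
import pandas as pd

def tag_with_key(key, l, r):
    # key is a tuple of grouping values, e.g. (1,) for a single grouping column
    merged = pd.merge_asof(l, r, on="time", by="id")
    # Record which cogroup each row came from (illustrative extra column).
    return merged.assign(group_key=key[0])

l = pd.DataFrame({"time": [20000101, 20000102], "id": [1, 1], "v1": [1.0, 3.0]})
r = pd.DataFrame({"time": [20000101], "id": [1], "v2": ["x"]})

# Called by hand here; Spark would supply the key and both frames per cogroup.
out = tag_with_key((1,), l, r)
print(out)
```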

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.



```{figure} img/chapter9/1g.png
---
align: center
---
```

Let's first understand the syntax.

```{admonition} Syntax
<b>pyspark.sql.PandasCogroupedOps.applyInPandas(func, schema)</b>

Applies a function to each cogroup using pandas and returns the result as a DataFrame.

The function should take two pandas.DataFrames and return another pandas.DataFrame. 

For each side of the cogroup, all columns are passed together as a pandas.DataFrame to the user function, and the returned pandas.DataFrames are combined into a DataFrame.

The schema should be a StructType describing the schema of the returned pandas.DataFrame. 
The column labels of the returned pandas.DataFrame must either match the field names in the defined schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. The length of the returned pandas.DataFrame can be arbitrary.

<b>Parameters</b>

- func – a Python native function that takes two pandas.DataFrames and outputs a pandas.DataFrame, or that takes one tuple (grouping keys) and two pandas.DataFrames and outputs a pandas.DataFrame.
- schema – the return type of the func in PySpark. The value can be either a pyspark.sql.types.DataType object or a DDL-formatted type string.
```

In [123]:
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df_out = df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string")
print("                     Input                                ",            "Output")
display_side_by_side(df1.toPandas(),df2.toPandas(), df_out.toPandas())

                     Input                                 Output


time | id | v1
- | - | -
20000101 | 1 | 1.0
20000101 | 2 | 2.0
20000102 | 1 | 3.0
20000102 | 2 | 4.0

time | id | v2
- | - | -
20000101 | 1 | x
20000101 | 2 | y

time | id | v1 | v2
- | - | - | -
20000101 | 1 | 1.0 | x
20000102 | 1 | 3.0 | x
20000101 | 2 | 2.0 | y
20000102 | 2 | 4.0 | y


<a id='9'></a>
