In [6]:
from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType, LongType, DecimalType
import os
from pyspark.sql.functions import lit, count,sum,avg,collect_list,min,max,percentile_approx,stddev_pop,stddev_samp,var_pop,var_samp,first,last,col
from pyspark.sql.window import Window

In [2]:
# Set to False to build the session against the Kubernetes master instead of a local one.
local = True

if local:
    # Local session using the "local[4]" master.
    spark = (
        SparkSession.builder
        .master("local[4]")
        .appName("VTLAnalytic")
        .getOrCreate()
    )
else:
    # Cluster session: the service account and namespace are read from the
    # environment, so a missing variable raises KeyError here.
    spark = (
        SparkSession.builder
        .master("k8s://https://kubernetes.default.svc:443")
        .appName("VTLAnalytic")
        .config("spark.kubernetes.container.image", "inseefrlab/jupyter-datascience:py3.9.7-spark3.2.0")
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ["KUBERNETES_SERVICE_ACCOUNT"])
        .config("spark.executor.instances", "4")
        .config("spark.executor.memory", "8g")
        .config("spark.kubernetes.namespace", os.environ["KUBERNETES_NAMESPACE"])
        .getOrCreate()
    )

23/12/18 10:44:27 WARN Utils: Your hostname, pengfei-Virtual-Machine resolves to a loopback address: 127.0.1.1; using 10.50.2.80 instead (on interface eth0)
23/12/18 10:44:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/18 10:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [27]:
# Small three-row fixture: a single Id_2 group ("XX") over consecutive years.
test_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Row_id", StringType()),
        ("Id_2", StringType()),
        ("Year", IntegerType()),
        ("Me_1", IntegerType()),
        ("Me_2", DoubleType()),
    )
])

test_data = [
    ("1", "XX", 1993, 3, 1.0),
    ("2", "XX", 1994, 4, 9.0),
    ("3", "XX", 1995, 7, 5.0),
]

test_ds = spark.createDataFrame(test_data, schema=test_schema)
test_ds.show()

+------+----+----+----+----+
|Row_id|Id_2|Year|Me_1|Me_2|
+------+----+----+----+----+
|     1|  XX|1993|   3| 1.0|
|     2|  XX|1994|   4| 9.0|
|     3|  XX|1995|   7| 5.0|
+------+----+----+----+----+


In [3]:
# Fixture ds1: two Id_2 groups ("XX", "YY") with one row per year from 2000 to 2003.
schema1 = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Id_1", StringType()),
        ("Id_2", StringType()),
        ("Year", IntegerType()),
        ("Me_1", IntegerType()),
        ("Me_2", DoubleType()),
    )
])

data1 = [
    ("A", "XX", 2000, 3, 1.0),
    ("A", "XX", 2001, 4, 9.0),
    ("A", "XX", 2002, 7, 5.0),
    ("A", "XX", 2003, 6, 8.0),
    ("A", "YY", 2000, 9, 3.0),
    ("A", "YY", 2001, 5, 4.0),
    ("A", "YY", 2002, 10, 2.0),
    ("A", "YY", 2003, 5, 7.0),
]

ds1 = spark.createDataFrame(data1, schema=schema1)
ds1.show()

                                                                                

+----+----+----+----+----+
|Id_1|Id_2|Year|Me_1|Me_2|
+----+----+----+----+----+
|   A|  XX|2000|   3| 1.0|
|   A|  XX|2001|   4| 9.0|
|   A|  XX|2002|   7| 5.0|
|   A|  XX|2003|   6| 8.0|
|   A|  YY|2000|   9| 3.0|
|   A|  YY|2001|   5| 4.0|
|   A|  YY|2002|  10| 2.0|
|   A|  YY|2003|   5| 7.0|
+----+----+----+----+----+


In [4]:
# Fixture ds2: two Id_2 groups ("XX", "YY") with one row per year from 1993 to 1996.
schema2 = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Id_1", StringType()),
        ("Id_2", StringType()),
        ("Year", IntegerType()),
        ("Me_1", IntegerType()),
        ("Me_2", DoubleType()),
    )
])

data2 = [
    ("A", "XX", 1993, 3, 1.0),
    ("A", "XX", 1994, 4, 9.0),
    ("A", "XX", 1995, 7, 5.0),
    ("A", "XX", 1996, 6, 8.0),
    ("A", "YY", 1993, 9, 3.0),
    ("A", "YY", 1994, 5, 4.0),
    ("A", "YY", 1995, 10, 2.0),
    ("A", "YY", 1996, 2, 7.0),
]

ds2 = spark.createDataFrame(data2, schema=schema2)
ds2.show()

+----+----+----+----+----+
|Id_1|Id_2|Year|Me_1|Me_2|
+----+----+----+----+----+
|   A|  XX|1993|   3| 1.0|
|   A|  XX|1994|   4| 9.0|
|   A|  XX|1995|   7| 5.0|
|   A|  XX|1996|   6| 8.0|
|   A|  YY|1993|   9| 3.0|
|   A|  YY|1994|   5| 4.0|
|   A|  YY|1995|  10| 2.0|
|   A|  YY|1996|   2| 7.0|
+----+----+----+----+----+


## 1. First function

### 1.1 First with range between

**Range between**: defines a sliding window as a numerical offset relative to the value of the current data point (according to the orderBy clause). In other words, it defines the `frame boundaries based on the values of the order column in the partition that fall within a specified range relative to the current row's value`.


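In the example below, the frame is ordered by the value of the `Year` column, so `rangeBetween(-1, 1)` covers every row of the partition whose `Year` is within one year of the current row's `Year`.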


In [29]:
# Range frame: for each row, take the rows of the same Id_2 partition whose
# Year lies between (current Year - 1) and (current Year + 1), ordered by Year.
win_with_custom_range = Window.partitionBy("Id_2").orderBy("Year").rangeBetween(-1, 1)

# first() returns the first value of the column inside that frame.
res = (
    test_ds
    .withColumn("first_Me_1", first(col("Me_1")).over(win_with_custom_range))
    .withColumn("first_Me_2", first(col("Me_2")).over(win_with_custom_range))
)

# show() prints the table itself and returns None; wrapping it in print()
# only appended a stray "None" line to the cell output.
res.show()

+------+----+----+----+----+----------+----------+
|Row_id|Id_2|Year|Me_1|Me_2|first_Me_1|first_Me_2|
+------+----+----+----+----+----------+----------+
|     1|  XX|1993|   3| 1.0|         3|       1.0|
|     2|  XX|1994|   4| 9.0|         3|       1.0|
|     3|  XX|1995|   7| 5.0|         4|       9.0|
+------+----+----+----+----+----------+----------+

None


```
# VTL syntax: 

res := first_value ( ds2 over ( partition by Id_1 order by Year range between -1 preceding and 1 following) );

# pyspark syntax: 
win_with_custom_range = Window.partitionBy("Id_1").orderBy("Year").rangeBetween(-1,1)
res = ds2.withColumn("first_Me_1", first(col("Me_1")).over(win_with_custom_range))\
         .withColumn("first_Me_2", first(col("Me_2")).over(win_with_custom_range))

# input
+----+----+----+----+----+
|Id_1|Id_2|Year|Me_1|Me_2|
+----+----+----+----+----+
|   A|  XX|1993|   3| 1.0|
|   A|  XX|1994|   4| 9.0|
|   A|  XX|1995|   7| 5.0|
|   A|  XX|1996|   6| 8.0|
|   A|  YY|1993|   9| 3.0|
|   A|  YY|1994|   5| 4.0|
|   A|  YY|1995|  10| 2.0|
|   A|  YY|1996|   2| 7.0|
+----+----+----+----+----+

# output
+----+----+----+----+----+----------+----------+
|Id_1|Id_2|Year|Me_1|Me_2|first_Me_1|first_Me_2|
+----+----+----+----+----+----------+----------+
|   A|  XX|1993|   3| 1.0|         3|       1.0|
|   A|  YY|1993|   9| 3.0|         3|       1.0|
|   A|  XX|1994|   4| 9.0|         3|       1.0|
|   A|  YY|1994|   5| 4.0|         3|       1.0|
|   A|  XX|1995|   7| 5.0|         4|       9.0|
|   A|  YY|1995|  10| 2.0|         4|       9.0|
|   A|  XX|1996|   6| 8.0|         7|       5.0|
|   A|  YY|1996|   2| 7.0|         7|       5.0|
+----+----+----+----+----+----------+----------+
```

In [22]:
# Partition on both identifiers so every (Id_1, Id_2) series gets its own frame.
# Alternative kept for reference: partitioning on "Id_1" alone puts the XX and YY
# rows into the same partition.
win_with_custom_range = (
    Window
    .partitionBy("Id_1", "Id_2")
    .orderBy("Year")
    .rangeBetween(-1, 1)
)
res = (
    ds2
    .withColumn("first_Me_1", first(col("Me_1")).over(win_with_custom_range))
    .withColumn("first_Me_2", first(col("Me_2")).over(win_with_custom_range))
)

In [24]:
# show() prints up to 20 rows and returns None, so it must not be wrapped in
# print() (that only adds a stray "None" to the output).
res.show(20)

+----+----+----+----+----+----------+----------+
|Id_1|Id_2|Year|Me_1|Me_2|first_Me_1|first_Me_2|
+----+----+----+----+----+----------+----------+
|   A|  XX|1993|   3| 1.0|         3|       1.0|
|   A|  XX|1994|   4| 9.0|         3|       1.0|
|   A|  XX|1995|   7| 5.0|         4|       9.0|
|   A|  XX|1996|   6| 8.0|         7|       5.0|
|   A|  YY|1993|   9| 3.0|         9|       3.0|
|   A|  YY|1994|   5| 4.0|         9|       3.0|
|   A|  YY|1995|  10| 2.0|         5|       4.0|
|   A|  YY|1996|   2| 7.0|        10|       2.0|
+----+----+----+----+----+----------+----------+

None


### 1.2 First with data points (rows) between

**Data points between**: defines a sliding window using a specified number of preceding and following data points relative to the current data point (according to the orderBy clause). In other words, it defines the frame boundaries based on `the number of rows before and after the current row` within the partition.

For example, consider a dataset with three rows:
```
+------+----+----+----+----+
|Row_id|Id_2|Year|Me_1|Me_2|
+------+----+----+----+----+
|     1|  XX|1993|   3| 1.0|
|     2|  XX|1994|   4| 9.0|
|     3|  XX|1995|   7| 5.0|
+------+----+----+----+----+
```



+------+----+----+----+----+
|Row_id|Id_2|Year|Me_1|Me_2|
+------+----+----+----+----+
|     1|  XX|1993|   3| 1.0|
|     2|  XX|1994|   4| 9.0|
|     3|  XX|1995|   7| 5.0|
+------+----+----+----+----+


In [26]:
# Row frame: for each row, take the previous row, the current row and the next
# row of the same Id_2 partition (ordered by Year), whatever their Year values.
win_with_custom_range = Window.partitionBy("Id_2").orderBy("Year").rowsBetween(-1, 1)

# first() returns the first value of the column inside that frame.
res = (
    test_ds
    .withColumn("first_Me_1", first(col("Me_1")).over(win_with_custom_range))
    .withColumn("first_Me_2", first(col("Me_2")).over(win_with_custom_range))
)

# show() prints the table itself and returns None; wrapping it in print()
# only appended a stray "None" line to the cell output.
res.show()

+------+----+----+----+----+----------+----------+
|Row_id|Id_2|Year|Me_1|Me_2|first_Me_1|first_Me_2|
+------+----+----+----+----+----------+----------+
|     1|  XX|1993|   3| 1.0|         3|       1.0|
|     2|  XX|1994|   4| 9.0|         3|       1.0|
|     3|  XX|1995|   7| 5.0|         4|       9.0|
+------+----+----+----+----+----------+----------+

None


In [16]:
# Row frame spanning two rows before through two rows after the current row,
# within each (Id_1, Id_2) partition ordered by Year.
win_with_custom_range = (
    Window
    .partitionBy("Id_1", "Id_2")
    .orderBy("Year")
    .rowsBetween(-2, 2)
)
res = (
    ds2
    .withColumn("first_Me_1", first(col("Me_1")).over(win_with_custom_range))
    .withColumn("first_Me_2", first(col("Me_2")).over(win_with_custom_range))
)

In [17]:
# Print up to 20 rows of the rows-between result.
res.show(n=20)

+----+----+----+----+----+----------+----------+
|Id_1|Id_2|Year|Me_1|Me_2|first_Me_1|first_Me_2|
+----+----+----+----+----+----------+----------+
|   A|  XX|1993|   3| 1.0|         3|       1.0|
|   A|  XX|1994|   4| 9.0|         3|       1.0|
|   A|  XX|1995|   7| 5.0|         3|       1.0|
|   A|  XX|1996|   6| 8.0|         4|       9.0|
|   A|  YY|1993|   9| 3.0|         9|       3.0|
|   A|  YY|1994|   5| 4.0|         9|       3.0|
|   A|  YY|1995|  10| 2.0|         9|       3.0|
|   A|  YY|1996|   2| 7.0|         5|       4.0|
+----+----+----+----+----+----------+----------+
