# Funkcje oknowe (Window functions)

Źródło: [A Guide to Advanced SQL Window Functions (Towards Data Science)](https://towardsdatascience.com/a-guide-to-advanced-sql-window-functions-f63f2642cbf9)

In [1]:
# If PySpark is missing, install it into this kernel with: %pip install pyspark
import pyspark
from pyspark.sql import SparkSession

# A single local Spark session, reused by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Windows functions")
    .getOrCreate()
)

23/08/20 09:12:51 WARN Utils: Your hostname, md-ASUS resolves to a loopback address: 127.0.1.1; using 192.168.30.211 instead (on interface eth0)
23/08/20 09:12:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/20 09:12:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import random
from pyspark.sql import Row

# Fixed seed so the synthetic data (and therefore every output below) is reproducible.
SEED = 42
# Number of synthetic records. Every generator uses it, so the lists stay aligned for zip().
N_ROWS = 400

REGIONS = ["regA", "regB", "regC", "regD", "regE", "regF"]
PRODUCTS = ["prodA", "prodB", "prodC", "prodD", "prodE", "prodF",
            "prodG", "prodH", "prodI", "prodJ", "prodK", "prodL", "prodM"]
YEARS = ["2015-", "2016-", "2017-"]
MONTHS = ["01-", "02-", "03-", "04-", "05-", "06-", "07-", "08-", "09-", "10-", "11-", "12-"]

random.seed(SEED)

# The generators run in a fixed order (geo, prod, date, value, volume), and each date
# draws the year before the month. Keeping this order keeps the random stream, and
# hence the data, unchanged.
geo_id = [random.choice(REGIONS) for _ in range(N_ROWS)]

prod_id = [random.choice(PRODUCTS) for _ in range(N_ROWS)]

# Dates are always the first day of a month, as strings in "YYYY-MM-01" form.
date = [random.choice(YEARS) + random.choice(MONTHS) + "01" for _ in range(N_ROWS)]

value = [random.uniform(10000, 100000) for _ in range(N_ROWS)]

volume = [random.uniform(1000, 10000) for _ in range(N_ROWS)]

In [3]:
# One Row per synthetic record. The keyword order gives the column order
# (prod, geo, val, vol, dt), as the printed schema shows.
records = zip(prod_id, geo_id, value, volume, date)
rows = [Row(prod=prod, geo=geo, val=val, vol=vol, dt=dt)
        for prod, geo, val, vol, dt in records]
df = spark.createDataFrame(rows)

# At this point dt is still a string column.
df.printSchema()

root
 |-- prod: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- val: double (nullable = true)
 |-- vol: double (nullable = true)
 |-- dt: string (nullable = true)



In [4]:
from pyspark.sql import functions as f

# Convert the "YYYY-MM-01" strings into a Spark DateType column (a date, not a timestamp).
df = df.withColumn("dt", f.to_date(f.col("dt")))
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-----+----+------------------+------------------+----------+
| prod| geo|               val|               vol|        dt|
+-----+----+------------------+------------------+----------+
|prodB|regF| 95087.40786730956| 5119.116197026179|2016-06-01|
|prodD|regA|13902.853320925453| 3640.983140575281|2016-05-01|
|prodF|regA| 80490.42963823416| 1394.256480932108|2016-05-01|
|prodE|regF| 88028.28169838544| 2795.228503436695|2015-08-01|
|prodC|regC| 56930.60932417757| 1377.153477373438|2015-12-01|
|prodH|regB|51223.826988791385| 9400.338819553575|2017-01-01|
|prodI|regB| 96762.35648098258| 5638.452303290489|2016-04-01|
|prodL|regB|15474.286674505525|  9902.10432066511|2017-02-01|
|prodE|regF|  53108.3719898527| 5887.276278887673|2017-01-01|
|prodJ|regA| 46145.55290613044|3279.8238868235567|2015-04-01|
|prodM|regF| 71748.77464560093| 7779.618269369784|2015-01-01|
|prodK|regF|54124.196872980276| 2719.930876605198|2017-03-01|
|prodI|regE| 91873.07462037064| 4212.767584318271|2015-03-01|
|prodA|r

                                                                                

## Ranking – różne warianty numerowania wierszy (`rank`, `dense_rank`, `row_number`)

In [5]:
from pyspark.sql.window import Window

# Rows of each product, ordered by date. Several rows can share a dt, and that is
# exactly where the three functions differ:
#   rank       -> tied rows share a rank and the next rank skips (e.g. 5, 5, 7)
#   dense_rank -> tied rows share a rank with no gap             (e.g. 5, 5, 6)
#   row_number -> always unique; the order among tied rows is not guaranteed
windowSpec = Window.partitionBy("prod").orderBy("dt")

ranked_df = (
    df
    .withColumn("ranked", f.rank().over(windowSpec))
    .withColumn("ranked_d", f.dense_rank().over(windowSpec))
    .withColumn("row_num", f.row_number().over(windowSpec))
)
ranked_df.show(50)

+-----+----+------------------+------------------+----------+------+--------+-------+
| prod| geo|               val|               vol|        dt|ranked|ranked_d|row_num|
+-----+----+------------------+------------------+----------+------+--------+-------+
|prodA|regA| 76166.82863310802|  8890.68851126131|2015-01-01|     1|       1|      1|
|prodA|regA| 72863.19665306929| 5582.102646026639|2015-02-01|     2|       2|      2|
|prodA|regE|  85146.6789354143| 7265.541705821403|2015-03-01|     3|       3|      3|
|prodA|regE| 69106.26726893369| 7111.558512220555|2015-04-01|     4|       4|      4|
|prodA|regC|   82649.923403711|6846.0466303542835|2015-05-01|     5|       5|      5|
|prodA|regF| 80966.80146515637| 4572.951168504696|2015-05-01|     5|       5|      6|
|prodA|regA| 74895.46108715344|1605.1002675589793|2015-06-01|     7|       6|      7|
|prodA|regC| 69859.70767542781|7207.8832760093765|2015-07-01|     8|       7|      8|
|prodA|regA|35929.142023530745| 7028.505684037936|2015

## Różnica względem pierwszej wartości w partycji (produkcie)

In [6]:
windowSpec = Window.partitionBy("prod").orderBy("dt")

# first() over the ordered window returns the value from the earliest-dated row of each
# product. If several rows share that earliest date, which one is picked is not guaranteed.
first_val = f.first(f.col("val")).over(windowSpec)
df.withColumn("diff_from_first", f.col("val") - first_val).show(50)

+-----+----+------------------+------------------+----------+-------------------+
| prod| geo|               val|               vol|        dt|    diff_from_first|
+-----+----+------------------+------------------+----------+-------------------+
|prodA|regA| 76166.82863310802|  8890.68851126131|2015-01-01|                0.0|
|prodA|regA| 72863.19665306929| 5582.102646026639|2015-02-01| -3303.631980038728|
|prodA|regE|  85146.6789354143| 7265.541705821403|2015-03-01|   8979.85030230628|
|prodA|regE| 69106.26726893369| 7111.558512220555|2015-04-01| -7060.561364174326|
|prodA|regC|   82649.923403711|6846.0466303542835|2015-05-01|  6483.094770602984|
|prodA|regF| 80966.80146515637| 4572.951168504696|2015-05-01| 4799.9728320483555|
|prodA|regA| 74895.46108715344|1605.1002675589793|2015-06-01|-1271.3675459545775|
|prodA|regC| 69859.70767542781|7207.8832760093765|2015-07-01| -6307.120957680207|
|prodA|regA|35929.142023530745| 7028.505684037936|2015-07-01| -40237.68660957727|
|prodA|regD| 906

## Średnia ruchoma (krocząca) – okno centrowane: poprzedni, bieżący i następny wiersz

In [7]:
# Centred moving average over at most 3 rows: the previous, current and next row of the
# same product. At the edges of a partition the frame is shorter (2 rows).
windowSpec = (
    Window.partitionBy("prod")
    .orderBy("dt")
    .rowsBetween(-1, 1)
)

df.withColumn("moving_avg", f.avg("val").over(windowSpec)).show(50)

+-----+----+------------------+------------------+----------+------------------+
| prod| geo|               val|               vol|        dt|        moving_avg|
+-----+----+------------------+------------------+----------+------------------+
|prodA|regA| 76166.82863310802|  8890.68851126131|2015-01-01| 74515.01264308864|
|prodA|regA| 72863.19665306929| 5582.102646026639|2015-02-01| 78058.90140719719|
|prodA|regE|  85146.6789354143| 7265.541705821403|2015-03-01| 75705.38095247243|
|prodA|regE| 69106.26726893369| 7111.558512220555|2015-04-01| 78967.62320268633|
|prodA|regC|   82649.923403711|6846.0466303542835|2015-05-01| 77574.33071260036|
|prodA|regF| 80966.80146515637| 4572.951168504696|2015-05-01| 79504.06198534027|
|prodA|regA| 74895.46108715344|1605.1002675589793|2015-06-01|  75240.6567425792|
|prodA|regC| 69859.70767542781|7207.8832760093765|2015-07-01| 60228.10359537066|
|prodA|regA|35929.142023530745| 7028.505684037936|2015-07-01| 65472.58619969026|
|prodA|regD| 90628.908900112

## Suma narastająca od początku partycji do bieżącej daty (suma skumulowana)

In [8]:
# Running total per product, ordered by date. The frame written here is Spark's default
# for an ordered window: a RANGE from the partition start to the current row. Rows with
# the same dt are peers, so they all receive the same cumulative sum.
windowSpec = (
    Window.partitionBy("prod")
    .orderBy("dt")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("sum_from_start", f.sum("val").over(windowSpec)).show(50)

+-----+----+------------------+------------------+----------+------------------+
| prod| geo|               val|               vol|        dt|    sum_from_start|
+-----+----+------------------+------------------+----------+------------------+
|prodA|regA| 76166.82863310802|  8890.68851126131|2015-01-01| 76166.82863310802|
|prodA|regA| 72863.19665306929| 5582.102646026639|2015-02-01| 149030.0252861773|
|prodA|regE|  85146.6789354143| 7265.541705821403|2015-03-01| 234176.7042215916|
|prodA|regE| 69106.26726893369| 7111.558512220555|2015-04-01| 303282.9714905253|
|prodA|regC|   82649.923403711|6846.0466303542835|2015-05-01|466899.69635939266|
|prodA|regF| 80966.80146515637| 4572.951168504696|2015-05-01|466899.69635939266|
|prodA|regA| 74895.46108715344|1605.1002675589793|2015-06-01| 541795.1574465461|
|prodA|regC| 69859.70767542781|7207.8832760093765|2015-07-01| 647584.0071455047|
|prodA|regA|35929.142023530745| 7028.505684037936|2015-07-01| 647584.0071455047|
|prodA|regD| 90628.908900112

## ZADANIA

> 1. Pogrupuj dane po dacie i produkcie, po czym porównaj wolumen produktów ze średnim wolumenem z trzech wcześniejszych okresów

In [9]:
# The three preceding rows of the same product, excluding the current row. After the
# groupBy below there is one row per (dt, prod), so this means the previous 3 dates
# that are *present in the data*. Months with no record for a product are skipped,
# not counted as empty periods. A product's first rows average fewer than 3 values,
# and the very first row has no prior value at all, so its average is null.
windowSpec = Window.partitionBy("prod").orderBy("dt").rowsBetween(-3, -1)

monthly_vol = (
    df.groupBy("dt", "prod")
    .agg(f.sum("vol").alias("vol"))
    .withColumn("avg_past_3", f.avg(f.col("vol")).over(windowSpec))
    # Reuse the column computed above instead of evaluating the same window a second time.
    .withColumn("diff_avg_past_3", f.col("vol") - f.col("avg_past_3"))
)
monthly_vol.show()

+----------+-----+------------------+------------------+-------------------+
|        dt| prod|               vol|        avg_past_3|    diff_avg_past_3|
+----------+-----+------------------+------------------+-------------------+
|2015-01-01|prodA|  8890.68851126131|              null|               null|
|2015-02-01|prodA| 5582.102646026639|  8890.68851126131|-3308.5858652346715|
|2015-03-01|prodA| 7265.541705821403| 7236.395578643975| 29.146127177428752|
|2015-04-01|prodA| 7111.558512220555| 7246.110954369785|-134.55244214923005|
|2015-05-01|prodA| 11418.99779885898| 6653.067621356199|  4765.930177502781|
|2015-06-01|prodA|1605.1002675589793| 8598.699338966979|    -6993.599071408|
|2015-07-01|prodA|14236.388960047312| 6711.885526212838|  7524.503433834474|
|2015-08-01|prodA| 13617.98177963201| 9086.829008821756|  4531.152770810253|
|2015-10-01|prodA| 4117.271626860549| 9819.823669079435| -5702.552042218886|
|2015-11-01|prodA| 6190.941083296411|10657.214122179956| -4466.273038883544|

> 2. Stwórz kolumnę z rankingiem opartym na dacie dla kombinacji produkt-region


In [12]:
# Rank each record by date within its (product, region) pair.
# Records with the same date share a rank, and the next rank skips.
windowSpec = Window.partitionBy(f.col("prod"), f.col("geo")).orderBy(f.col("dt"))

ranked_prod_geo = df.withColumn("ranked", f.rank().over(windowSpec))
ranked_prod_geo.show()

+-----+----+------------------+------------------+----------+------+
| prod| geo|               val|               vol|        dt|ranked|
+-----+----+------------------+------------------+----------+------+
|prodA|regA| 76166.82863310802|  8890.68851126131|2015-01-01|     1|
|prodA|regA| 72863.19665306929| 5582.102646026639|2015-02-01|     2|
|prodA|regA| 74895.46108715344|1605.1002675589793|2015-06-01|     3|
|prodA|regA|35929.142023530745| 7028.505684037936|2015-07-01|     4|
|prodA|regA| 72646.47897183853| 4117.271626860549|2015-10-01|     5|
|prodA|regA|18390.709153330965| 3470.998240673718|2016-10-01|     6|
|prodA|regA|16614.164418980545| 8027.574102805825|2016-11-01|     7|
|prodA|regA| 97092.68569234702|2791.5209339720495|2016-11-01|     7|
|prodA|regA| 84859.46996273691| 9983.355379957442|2017-11-01|     9|
|prodA|regA|48692.227005810135| 5964.822405337847|2017-11-01|     9|
|prodA|regB| 20365.60353974184|3255.4477808379916|2016-02-01|     1|
|prodA|regB| 40680.63835923995|158

> 3. Dla każdego regionu oblicz różnicę wolumenu pomiędzy kolejnymi datami

In [11]:
windowSpec = Window.partitionBy("geo").orderBy("dt")

# Total volume per region and date, then the change from the region's previous date
# in the data. The first date of each region has no predecessor, so it gets null.
geo_vol = df.groupBy("geo", "dt").agg(f.sum("vol").alias("vol"))
geo_vol_prev = geo_vol.withColumn("prev", f.lag("vol").over(windowSpec))
geo_vol_diff = geo_vol_prev.withColumn("diff_from_prev", f.col("vol") - f.col("prev"))
geo_vol_diff.show()

+----+----------+------------------+------------------+-------------------+
| geo|        dt|               vol|              prev|     diff_from_prev|
+----+----------+------------------+------------------+-------------------+
|regA|2015-01-01|  8890.68851126131|              null|               null|
|regA|2015-02-01| 18441.19938267378|  8890.68851126131|   9550.51087141247|
|regA|2015-03-01|14278.838642846531| 18441.19938267378| -4162.360739827251|
|regA|2015-04-01| 7094.258174584014|14278.838642846531| -7184.580468262517|
|regA|2015-05-01| 11905.43345355526| 7094.258174584014|  4811.175278971246|
|regA|2015-06-01|10510.696861187034| 11905.43345355526|-1394.7365923682264|
|regA|2015-07-01|10275.010561878516|10510.696861187034|-235.68629930851785|
|regA|2015-08-01|14982.787846579866|10275.010561878516|  4707.777284701349|
|regA|2015-09-01| 9775.745338724057|14982.787846579866| -5207.042507855809|
|regA|2015-10-01|15919.128299342528| 9775.745338724057|  6143.382960618472|
|regA|2015-1

> 4. Oblicz różnicę rok do roku w wartości i wolumenie dla produktów

In [13]:
windowSpec = Window.partitionBy("prod").orderBy("year")

# lag() returns the previous *row*. That is the previous year only when the product has
# data for every year. Guarding on the year gap makes a missing year yield null, instead
# of silently comparing non-adjacent years (e.g. 2017 vs 2015). The first year of each
# product has no lag, so the condition is null and the diff stays null, as before.
is_next_year = (f.col("year") - f.lag("year").over(windowSpec)) == 1

yearly = (
    df.withColumn("year", f.year("dt"))
    .groupBy("year", "prod")
    .agg(f.sum("vol").alias("vol"), f.sum("val").alias("val"))
    .withColumn("vol_diff", f.when(is_next_year, f.col("vol") - f.lag(f.col("vol")).over(windowSpec)))
    .withColumn("val_diff", f.when(is_next_year, f.col("val") - f.lag(f.col("val")).over(windowSpec)))
)
yearly.show()

+----+-----+------------------+------------------+-------------------+-------------------+
|year| prod|               vol|               val|           vol_diff|           val_diff|
+----+-----+------------------+------------------+-------------------+-------------------+
|2015|prodA| 85926.27724359691|  940365.555332412|               null|               null|
|2016|prodA| 44036.23110696496| 293624.4981208674| -41890.04613663195| -646741.0572115446|
|2017|prodA|26598.881579084387|316809.85504606774| -17437.34952788057|  23185.35692520032|
|2015|prodB|17841.448640157232|42757.389755904675|               null|               null|
|2016|prodB| 93794.69250873126| 926651.4973501854|  75953.24386857402|  883894.1075942807|
|2017|prodB|45580.045786015726|402188.95044995466| -48214.64672271553| -524462.5469002307|
|2015|prodC|53595.687006463966| 639695.9475292781|               null|               null|
|2016|prodC| 65339.18471943844| 525533.5446220278| 11743.497712974473|-114162.40290725033|

> 5. Oblicz udział każdego regionu w całkowitej wartości poszczególnych produktów w danym roku

In [14]:
# Share of each region in a product's total value for a given year. total_val sums val
# over all regions of the same (prod, year); with no orderBy the frame is the whole partition.
windowSpec = Window.partitionBy("prod", "year")

region_totals = (
    df.withColumn("year", f.year("dt"))
    .groupBy("year", "prod", "geo")
    .agg(f.sum("val").alias("val"))
    .withColumn("total_val", f.sum("val").over(windowSpec))
)
region_totals.withColumn("share", f.col("val") / f.col("total_val")).show()

+----+-----+----+------------------+------------------+-------------------+
|year| prod| geo|               val|         total_val|              share|
+----+-----+----+------------------+------------------+-------------------+
|2015|prodA|regC| 152509.6310791388|  940365.555332412| 0.1621812179469162|
|2015|prodA|regF| 80966.80146515637|  940365.555332412|0.08610141131395994|
|2015|prodA|regD|112423.32262876796|  940365.555332412| 0.1195527866703148|
|2015|prodA|regE|261964.69279064887|  940365.555332412| 0.2785775077629745|
|2015|prodA|regA|    332501.1073687|  940365.555332412|0.35358707630583447|
|2016|prodA|regD| 82775.44100837356| 293624.4981208675|0.28190917834893975|
|2016|prodA|regB|61046.241898981796| 293624.4981208675|0.20790581947236822|
|2016|prodA|regA|132097.55926465854| 293624.4981208675|  0.449886028277797|
|2016|prodA|regE| 17705.25594885356| 293624.4981208675|0.06029897390089493|
|2017|prodA|regA|133551.69696854704| 316809.8550460678| 0.4215515863580919|
|2017|prodA|