<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Introduction" data-toc-modified-id="Introduction-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Introduction</a></span></li><li><span><a href="#Weighted-moving-average" data-toc-modified-id="Weighted-moving-average-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Weighted moving average</a></span></li><li><span><a href="#Weighted-Rolling-Average" data-toc-modified-id="Weighted-Rolling-Average-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Weighted Rolling Average</a></span></li></ul></div>

# Introduction
- https://stackoverflow.com/questions/47622447/weighted-moving-average-in-pyspark

Weighted moving average in pyspark.

In [48]:
import numpy as np
import pandas as pd

# Let long cell values (e.g. the collected window arrays shown later) print up to 500 characters.
pd.options.display.max_colwidth = 500

In [49]:
# pyspark
import pyspark
# Reuse the active Spark session if there is one; otherwise start one named 'app'.
spark = (
    pyspark.sql.SparkSession.builder
    .appName('app')
    .getOrCreate()
)

# sql
from pyspark.sql.functions import col as _col
from pyspark.sql.functions import udf

# Example UDF definition:
#   @udf("integer")
#   def myfunc(x, y): return x - y
# Other useful names in pyspark.sql.functions: stddev, format_number, date_format, dayofyear, when
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import (mean as _mean, min as _min,
                                   max as _max, avg as _avg,
                                   when as _when
                                  )

from pyspark.sql.types import (StructField,StringType,
                               IntegerType, FloatType,
                               DoubleType,StructType)

from pyspark import SparkConf, SparkContext, SQLContext

# Convenience handles derived from the `spark` session.
sc = spark.sparkContext
# `sqlContext` and its short alias `sqc` refer to the same SQLContext instance.
# NOTE(review): SQLContext is presumably a legacy entry point on recent Spark
# versions — confirm whether `spark` alone would do before relying on it.
sqlContext = sqc = SQLContext(sc)
# Example: spark_df = sqlContext.createDataFrame(pandas_df)

# Weighted moving average

In [50]:
from pyspark.sql.functions import coalesce, lit, col, lead, lag
from operator import add
from functools import reduce

def weighted_average(c, window, offsets, weights):
    """Build a Column holding a weighted sum of ``c`` over neighbouring rows.

    For each ``(offset, weight)`` pair, the value of ``c`` at that relative
    row position inside ``window`` is multiplied by ``weight``; all products
    are then added together.

    Parameters
    ----------
    c : Column
        Column expression whose values are combined.
    window : WindowSpec
        Window handed to ``lag`` / ``lead`` to locate the neighbouring rows.
    offsets : sequence of int
        Relative row positions: negative values look back (``lag``),
        positive values look ahead (``lead``), ``0`` is the current row.
    weights : sequence of numbers
        One multiplier per entry of ``offsets``.

    Returns
    -------
    Column
        Sum of the weighted values. A product that evaluates to NULL
        (e.g. the neighbour lies outside the partition) contributes 0.
        The result is NOT divided by the sum of the weights.

    Raises
    ------
    AssertionError
        If ``offsets`` and ``weights`` differ in length.
    """
    assert len(weights) == len(offsets)

    def shifted(offset):
        # Look ahead for positive offsets, back for negative ones,
        # and use the column itself for the current row.
        if offset > 0:
            return lead(c, offset).over(window)
        if offset < 0:
            return lag(c, -offset).over(window)
        return c

    terms = []
    for offset, weight in zip(offsets, weights):
        # `value * weight`, or literal 0 when that product is NULL.
        terms.append(coalesce(shifted(offset) * weight, lit(0)))

    # Start from a literal 0 so the result is a Column even with no terms.
    return sum(terms, lit(0))

In [51]:
from pyspark.sql.window import Window

# Toy series: a single id ("a") with eight time-ordered observations.
sdf = spark.createDataFrame(
    [
        ("a", 1, 1.4),
        ("a", 2, 8.0),
        ("a", 3, -1.0),
        ("a", 4, 2.4),
        ("a", 5, 99.0),
        ("a", 6, 3.0),
        ("a", 7, -1.0),
        ("a", 8, 0.0),
    ]
).toDF("id", "time", "value")

# Rows are ordered by time within each id, so lag/lead reach neighbouring time steps.
w = Window.partitionBy("id").orderBy("time")

# Two rows back .. two rows ahead.
offsets = [-2, -1, 0, 1, 2]
# NOTE: despite its name, `delays` holds the per-offset weights (they sum to 1.0);
# the name is kept because a later cell refers to it.
delays = [0.1, 0.20, 0.4, 0.20, 0.1]

result = sdf.withColumn(
    "avg",
    weighted_average(col("value"), w, offsets, delays),
)
result.toPandas()

Unnamed: 0,id,time,value,avg
0,a,1,1.4,2.06
1,a,2,8.0,3.52
2,a,3,-1.0,11.72
3,a,4,2.4,21.66
4,a,5,99.0,40.48
5,a,6,3.0,21.04
6,a,7,-1.0,10.1
7,a,8,0.0,0.1


In [52]:
"""
time delay value weight value*wt
1    0     1.4   0.4    0.56
2    1     8     0.2    1.6
3    2     -1    0.1    -0.1
=============================
                         2.06
""";

In [53]:
"""
time delay value weight value*wt
1    -2    1.4   0.1    0.14
2    -1    8.0   0.2    1.6
3    0     -1.0  0.4    -0.4
4    1     2.4   0.2    0.48
5    2     99.0  0.1    9.9
==============================
                        11.72
""";

In [54]:
# Weighting a constant 1 instead of `value` yields, per row, the sum of the
# weights whose neighbouring row actually exists; dividing `avg` by it
# rescales the rows near the partition edges.
(
    result
    .withColumn(
        "normalization_factor",
        weighted_average(lit(1), w, offsets, delays),
    )
    .withColumn(
        "normalized_avg",
        col("avg") / col("normalization_factor"),
    )
    .toPandas()
)

Unnamed: 0,id,time,value,avg,normalization_factor,normalized_avg
0,a,1,1.4,2.06,0.7,2.942857
1,a,2,8.0,3.52,0.9,3.911111
2,a,3,-1.0,11.72,1.0,11.72
3,a,4,2.4,21.66,1.0,21.66
4,a,5,99.0,40.48,1.0,40.48
5,a,6,3.0,21.04,1.0,21.04
6,a,7,-1.0,10.1,0.9,11.222222
7,a,8,0.0,0.1,0.7,0.142857


# Weighted Rolling Average

- https://stackoverflow.com/questions/63158118/efficiently-calculating-weighted-rolling-average-in-pyspark-with-some-caveats#63240195

I’m trying to calculate a rolling weighted avg over a window (partition by id1, id2 ORDER BY unixTime) in Pyspark and wanted to know if anyone had ideas on how to do this.

The rolling avg will take the current row’s value for a column, the 9 previous row values for that column and the 9 following row values for that column, and weight each value based on how far it is from the current row. So the current row is weighted 10x, the lag 1/lead 1 values are weighted 9x, and so on down to 1x for the lag 9/lead 9 values.

If none of the values are null, then the denominator for the weighted avg would be 100. The one caveat is that if there are null values, we still want to calculate a moving average (unless a little over 1/2 of the values are null).

So, for example, if the 9 values before the current value are null, the denominator would be 55. If over 1/2 of the values are null, then we would output NULL for the weighted average. We could also use the logic where we say that if the denominator is less than 40 or so, we output NULL.

I've attached a screenshot to explain what I am saying in case it is confusing, hopefully this clears things up:

![](https://i.stack.imgur.com/osPpz.png)

In [55]:
from pyspark.sql.functions import expr, sort_array, collect_list, struct
from pyspark.sql import Window

# Ten observations: `time` runs 0..9, paired position-by-position with `val`.
sdf = spark.createDataFrame(
    list(zip(
        range(10),
        [0.5, 0.6, 0.65, 0.7, 0.77, 0.8, 0.7, 0.9, 0.99, 0.95],
    )),
    schema=["time", "val"],
)

sdf.show()

+----+----+
|time| val|
+----+----+
|   0| 0.5|
|   1| 0.6|
|   2|0.65|
|   3| 0.7|
|   4|0.77|
|   5| 0.8|
|   6| 0.7|
|   7| 0.9|
|   8|0.99|
|   9|0.95|
+----+----+



In [56]:
# Half-width of the demo window: up to N rows on each side of the current row.
N = 3

# No partition columns: one window over the whole frame, ordered by time.
w1 = Window.partitionBy().orderBy('time').rowsBetween(-N, N)

# data    : (time, val) structs of every row in the window, sorted
# idx     : 0-based position of the current row inside `data`
#           (array_position is 1-based, hence the -1; `i` in transform is 0-based)
#           NOTE(review): array_position returns the first match, so this
#           presumably relies on (time, val) being unique within the window.
# weights : 10 for the current row, one less per step away from it
#           (the 10 is fixed in the expression and does not follow N)
sdf1 = (
    sdf
    .withColumn('data', sort_array(collect_list(struct('time', 'val')).over(w1)))
    .withColumn('idx', expr("array_position(data, (time,val))-1"))
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))
)

sdf1.toPandas()

Unnamed: 0,time,val,data,idx,weights
0,0,0.5,"[(0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7)]",0,"[10, 9, 8, 7]"
1,1,0.6,"[(0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77)]",1,"[9, 10, 9, 8, 7]"
2,2,0.65,"[(0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77), (5, 0.8)]",2,"[8, 9, 10, 9, 8, 7]"
3,3,0.7,"[(0, 0.5), (1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77), (5, 0.8), (6, 0.7)]",3,"[7, 8, 9, 10, 9, 8, 7]"
4,4,0.77,"[(1, 0.6), (2, 0.65), (3, 0.7), (4, 0.77), (5, 0.8), (6, 0.7), (7, 0.9)]",3,"[7, 8, 9, 10, 9, 8, 7]"
5,5,0.8,"[(2, 0.65), (3, 0.7), (4, 0.77), (5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99)]",3,"[7, 8, 9, 10, 9, 8, 7]"
6,6,0.7,"[(3, 0.7), (4, 0.77), (5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)]",3,"[7, 8, 9, 10, 9, 8, 7]"
7,7,0.9,"[(4, 0.77), (5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)]",3,"[7, 8, 9, 10, 9, 8]"
8,8,0.99,"[(5, 0.8), (6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)]",3,"[7, 8, 9, 10, 9]"
9,9,0.95,"[(6, 0.7), (7, 0.9), (8, 0.99), (9, 0.95)]",3,"[7, 8, 9, 10]"


In [57]:
# Window half-width: 9 rows before and 9 rows after the current row.
N = 9

w1 = Window.partitionBy().orderBy('time').rowsBetween(-N, N)

sdf2 = (
    sdf
    # data: sorted (time, val) structs of the rows inside the window
    .withColumn(
        'data',
        sort_array(collect_list(struct('time', 'val')).over(w1)),
    )
    # idx: 0-based position of the current row inside `data`
    .withColumn('idx', expr("array_position(data, (time,val))-1"))
    # weights: 10 at the current row, decreasing by 1 per step of distance
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))
    # sum_weights: denominator, i.e. the total of the weights actually present
    .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)"))
    # weighted_val: sum(val * weight), divided by sum_weights in the finish lambda
    .withColumn(
        'weighted_val',
        expr("""
      aggregate(
        zip_with(data,weights, (x,y) -> x.val*y),
        0D, 
        (acc,x) -> acc+x,
        acc -> acc/sum_weights
      )"""),
    )
    # keep only the input columns and the final result
    .drop("data", "idx", "sum_weights", "weights")
)

sdf2.toPandas()

Unnamed: 0,time,val,weighted_val
0,0,0.5,0.682727
1,1,0.6,0.700159
2,2,0.65,0.716957
3,3,0.7,0.733288
4,4,0.77,0.7492
5,5,0.8,0.764133
6,6,0.7,0.778493
7,7,0.9,0.796377
8,8,0.99,0.81381
9,9,0.95,0.829273


In [58]:
# Add three scaled copies of `val` (x2, x3, x4), one withColumn per
# (name, factor) pair, applied in the order listed.
sdf = reduce(
    lambda frame, spec: frame.withColumn(spec[0], col('val') * spec[1]),
    (('val1', 2), ('val2', 3), ('val3', 4)),
    sdf,
)

sdf.toPandas()

Unnamed: 0,time,val,val1,val2,val3
0,0,0.5,1.0,1.5,2.0
1,1,0.6,1.2,1.8,2.4
2,2,0.65,1.3,1.95,2.6
3,3,0.7,1.4,2.1,2.8
4,4,0.77,1.54,2.31,3.08
5,5,0.8,1.6,2.4,3.2
6,6,0.7,1.4,2.1,2.8
7,7,0.9,1.8,2.7,3.6
8,8,0.99,1.98,2.97,3.96
9,9,0.95,1.9,2.85,3.8


In [59]:
# Value columns for which a weighted rolling average is computed.
cols = ['val1', 'val2', 'val3']


def weighted_vals(val):
    """Return the Spark SQL expression for the weighted average of one column.

    The expression multiplies field ``val`` of every struct in ``data`` by the
    matching entry of ``weights``, sums the products, divides the total by
    ``sum_weights`` and aliases the result as ``weighted_<val>``.  It expects
    columns named ``data``, ``weights`` and ``sum_weights`` to exist in the
    frame it is evaluated against.

    Parameters
    ----------
    val : str
        Name of the struct field (value column) to average.

    Returns
    -------
    str
        SQL text suitable for ``DataFrame.selectExpr``.
    """
    # A `def` replaces the original name-bound lambda so the helper has a
    # real name and a docstring; the generated text is the same.
    return """
    aggregate(
      zip_with(data,weights, (x,y) -> x.{0}*y),
      0D,(acc,x) -> acc+x, acc -> acc/sum_weights
    ) as weighted_{0}
""".format(val)

# Same pipeline as the single-column version, but `data` now carries every
# column listed in `cols`; `w1` is the rolling window defined in an earlier cell.
sdf2 = (
    sdf
    # data: sorted structs (time, *cols) of the rows inside the window
    .withColumn(
        'data',
        sort_array(collect_list(struct('time', *cols)).over(w1)),
    )
    # idx: 0-based position of the current row inside `data`
    .withColumn(
        'idx',
        expr("array_position(data, (time,{}))-1".format(','.join(cols))),
    )
    # weights: 10 at the current row, decreasing by 1 per step of distance
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))
    # sum_weights: denominator shared by all weighted averages
    .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)"))
    # keep the input columns and add one weighted_<col> per entry of `cols`
    .selectExpr(*sdf.columns, *(weighted_vals(c) for c in cols))
)

sdf2.toPandas()

Unnamed: 0,time,val,val1,val2,val3,weighted_val1,weighted_val2,weighted_val3
0,0,0.5,1.0,1.5,2.0,1.365455,2.048182,2.730909
1,1,0.6,1.2,1.8,2.4,1.400317,2.100476,2.800635
2,2,0.65,1.3,1.95,2.6,1.433913,2.15087,2.867826
3,3,0.7,1.4,2.1,2.8,1.466575,2.199863,2.933151
4,4,0.77,1.54,2.31,3.08,1.4984,2.2476,2.9968
5,5,0.8,1.6,2.4,3.2,1.528267,2.2924,3.056533
6,6,0.7,1.4,2.1,2.8,1.556986,2.335479,3.113973
7,7,0.9,1.8,2.7,3.6,1.592754,2.38913,3.185507
8,8,0.99,1.98,2.97,3.96,1.627619,2.441429,3.255238
9,9,0.95,1.9,2.85,3.8,1.658545,2.487818,3.317091


In [60]:
# Variant that computes several weighted averages in ONE aggregate call by
# carrying a struct accumulator.  NOTE: only val1 and val2 are aggregated
# here, although `data` is built from every column in `cols`.
sdf2 = (
    sdf
    # data: sorted structs (time, *cols) of the rows inside the window `w1`
    .withColumn(
        'data',
        sort_array(collect_list(struct('time', *cols)).over(w1)),
    )
    # idx: 0-based position of the current row inside `data`
    .withColumn(
        'idx',
        expr("array_position(data, (time,{}))-1".format(','.join(cols))),
    )
    # weights: 10 at the current row, decreasing by 1 per step of distance
    .withColumn('weights', expr("transform(data, (x,i) ->  10 - abs(i-idx))"))
    # sum_weights: denominator shared by both weighted averages
    .withColumn('sum_weights', expr("aggregate(weights, 0D, (acc,x) -> acc+x)"))
    # vals: struct (weighted_val1, weighted_val2) built by a struct-valued fold
    .withColumn(
        "vals",
        expr(""" 
   aggregate( 
     zip_with(data, weights, (x,y) -> (x.val1*y as val1, x.val2*y as val2)),
     (0D as val1, 0D as val2), 
     (acc,x) -> (acc.val1 + x.val1, acc.val2 + x.val2),
     acc -> (acc.val1/sum_weights as weighted_val1, acc.val2/sum_weights as weighted_val2)
   )     
   """),
    )
    # keep the input columns and flatten the fields of `vals` into columns
    .select(*sdf.columns, "vals.*")
)

sdf2.toPandas()

Unnamed: 0,time,val,val1,val2,val3,weighted_val1,weighted_val2
0,0,0.5,1.0,1.5,2.0,1.365455,2.048182
1,1,0.6,1.2,1.8,2.4,1.400317,2.100476
2,2,0.65,1.3,1.95,2.6,1.433913,2.15087
3,3,0.7,1.4,2.1,2.8,1.466575,2.199863
4,4,0.77,1.54,2.31,3.08,1.4984,2.2476
5,5,0.8,1.6,2.4,3.2,1.528267,2.2924
6,6,0.7,1.4,2.1,2.8,1.556986,2.335479
7,7,0.9,1.8,2.7,3.6,1.592754,2.38913
8,8,0.99,1.98,2.97,3.96,1.627619,2.441429
9,9,0.95,1.9,2.85,3.8,1.658545,2.487818
