In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [53]:
# Toy transaction log, one row per user per day.
# Row layout: [userid, date, received, spent]
data = [
    # --- user 1: fourteen consecutive days ---
    [1, '2018-01-01', 0, 0],
    [1, '2018-01-02', 10, 0],
    [1, '2018-01-03', 50, 0],
    [1, '2018-01-04', 1, 40],
    [1, '2018-01-05', 0, 10],
    [1, '2018-01-06', 0, 0],
    [1, '2018-01-07', 0, 0],
    [1, '2018-01-08', 0, 0],
    [1, '2018-01-09', 0, 0],
    [1, '2018-01-10', 70, 0],
    [1, '2018-01-11', 1, 70],
    [1, '2018-01-12', 0, 0],
    [1, '2018-01-13', 0, 0],
    [1, '2018-01-14', 0, 0],
    # --- user 2: four days ---
    [2, '2018-01-01', 0, 0],
    [2, '2018-01-02', 0, 0],
    [2, '2018-01-03', 100, 0],
    [2, '2018-01-04', 0, 100],
    # --- user 3: six days ---
    [3, '2018-01-01', 5, 0],
    [3, '2018-01-02', 0, 10],
    [3, '2018-01-03', 0, 0],
    [3, '2018-01-04', 200, 200],
    [3, '2018-01-05', 1, 0],
    [3, '2018-01-06', 2, 2],
]

# Dates stay as ISO-formatted strings; the frame gets the default RangeIndex.
df = pd.DataFrame(
    data=data,
    columns=['userid', 'date', 'received', 'spent'],
)

In [58]:
with SparkSession.builder.appName('test').getOrCreate() as spark:
    # Expose the pandas frame `df` to Spark SQL as the view `features_all`.
    spark_df = spark.createDataFrame(df)
    spark_df.createOrReplaceTempView('features_all')

    # Per user, sum `received` and `spent` (each rounded to the nearest 10 via
    # ROUND(x, -1)) over a forward-looking window of three rows: the current
    # row and the two that follow it when ordered by `date`.
    # `date` is a string column here; the ISO 'YYYY-MM-DD' format makes its
    # lexicographic order match chronological order.
    res_df = spark.sql(
        '''
        SELECT
            *,
            SUM(ROUND(received, -1)) OVER(PARTITION BY userid
                                          ORDER BY date
                                          ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) AS received_sum,
            SUM(ROUND(spent, -1)) OVER(PARTITION BY userid
                                       ORDER BY date
                                       ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) AS spent_sum
        FROM features_all
        ''')

    def test_udf_core(x):
        """Return the Python string representation of the value passed in."""
        return str(x)

    def test_udf_core1(x):
        """Ignore the input and return the constant 77."""
        return 77

    # Declare the return types explicitly. Without a returnType, F.udf falls
    # back to StringType, which would make `spent_sum_agg` a string column even
    # though test_udf_core1 returns an int.
    test_udf = F.udf(test_udf_core, T.StringType())
    test_udf1 = F.udf(test_udf_core1, T.IntegerType())

    # Collect each user's windowed sums into a list, then run the Python UDFs
    # over the collected lists.
    # NOTE: collect_list does not guarantee element order after the groupBy
    # shuffle, so the lists are not guaranteed to be in date order.
    res_df = res_df.groupBy('userid')\
                   .agg(F.collect_list('received_sum').alias('received_sum_agg'),
                        F.collect_list('spent_sum').alias('spent_sum_agg'))\
                   .withColumn('received_sum_agg', test_udf('received_sum_agg'))\
                   .withColumn('spent_sum_agg', test_udf1('spent_sum_agg'))

    display(res_df.toPandas())
    

Unnamed: 0,userid,received_sum_agg,spent_sum_agg
0,1,"[60, 60, 50, 0, 0, 0, 0, 70, 70, 70, 0, 0, 0, 0]",77
1,3,"[10, 200, 200, 200, 0, 0]",77
2,2,"[100, 100, 100, 0]",77


In [51]:
with SparkSession.builder.appName('test').getOrCreate() as spark:
    # Bound to `names_df` rather than `df` so this cell does not overwrite the
    # pandas frame `df` that is defined earlier in the notebook.
    names_df = spark.createDataFrame(
        [['A', 1],
         ['A', 1],
         ['A', 0],
         ['B', 0],
         ['B', 0],
         ['B', 1]],
        schema=['name', 'val'])

    def smooth_mean(x, pseudo_count=5):
        """Return the mean of `x` smoothed towards 1.

        `pseudo_count` is added to both the sum and the number of values,
        which is the same as appending `pseudo_count` extra observations
        that are all equal to 1. With the default of 5 this computes
        (sum(x) + 5) / (len(x) + 5).

        Args:
            x: sequence of numbers (here, the list built by collect_list).
            pseudo_count: number of pseudo-observations of value 1 to add.

        Returns:
            The smoothed mean as a float.
        """
        # float() keeps the result a Python float; a DoubleType UDF needs that.
        return float(sum(x) + pseudo_count) / (len(x) + pseudo_count)

    # Explicit DoubleType: without a returnType F.udf defaults to StringType,
    # which would leave the smoothed mean as a string column.
    smooth_mean_udf = F.udf(smooth_mean, T.DoubleType())

    # Gather each name's values into a list and apply the smoothed mean to it.
    names_df.groupBy('name')\
            .agg(F.collect_list('val').alias('val'))\
            .withColumn('val', smooth_mean_udf('val')).show()

+----+-----+
|name|  val|
+----+-----+
|   B| 0.75|
|   A|0.875|
+----+-----+



Reference (usage examples for `pyspark.sql.functions.collect_list`): https://www.programcreek.com/python/example/98235/pyspark.sql.functions.collect_list