In [49]:
import org.apache.spark.sql.types.{StringType, IntegerType, FloatType, StructType, StructField}

// Column layout for the invoice window data; every field is nullable (the builder default).
// NOTE(review): invoicevalue is declared as FloatType, and the running sums shown further
// down in this notebook carry trailing noise (e.g. 617.1499938964844) — likely from
// float-to-double widening during aggregation. Consider DoubleType or DecimalType; confirm
// with downstream consumers before changing the column type.
val schema = new StructType()
  .add("country", StringType)
  .add("weeknum", IntegerType)
  .add("numInvoices", IntegerType)
  .add("totalquantity", IntegerType)
  .add("invoicevalue", FloatType)

// Read the CSV without a header row, applying the explicit schema instead of inferring one.
val InvoiceDF = spark.read
  .schema(schema)
  .option("header", false)
  .format("csv")
  .load("/user/itv004483/sparkinput/windowdata.csv")


schema = StructType(StructField(country,StringType,true), StructField(weeknum,IntegerType,true), StructField(numInvoices,IntegerType,true), StructField(totalquantity,IntegerType,true), StructField(invoicevalue,FloatType,true))
InvoiceDF = [country: string, weeknum: int ... 3 more fields]


[country: string, weeknum: int ... 3 more fields]

In [50]:
//find the running total per week for each country

import org.apache.spark.sql.functions.{col, sum}
import org.apache.spark.sql.expressions.Window
                                      
// Running-total frame: rows of one country, ordered by week, spanning from the first row
// of the partition up to and including the current row.
val period = Window
  .partitionBy("country")
  .orderBy("weeknum")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)


// Append the cumulative invoice value computed over `period`, then order the rows by
// country and week so the running total reads top to bottom.
val RunInvDF = InvoiceDF
  .withColumn("RunningInvoice", sum(col("invoicevalue")).over(period))
  .orderBy(col("country"), col("weeknum"))

// Print the first rows of the result.
RunInvDF.show()

+---------------+-------+-----------+-------------+------------+------------------+
|        country|weeknum|numInvoices|totalquantity|invoicevalue|    RunningInvoice|
+---------------+-------+-----------+-------------+------------+------------------+
|      Australia|     48|          1|          107|      358.25|            358.25|
|      Australia|     49|          1|          214|       258.9| 617.1499938964844|
|      Australia|     50|          2|          133|      387.95|1005.1000061035156|
|        Austria|     50|          2|            3|      257.04| 257.0400085449219|
|        Bahrain|     51|          1|           54|      205.74|205.74000549316406|
|        Belgium|     48|          1|          528|       346.1| 346.1000061035156|
|        Belgium|     50|          2|          285|      625.16| 971.2599792480469|
|        Belgium|     51|          2|          942|      838.65|1809.9100036621094|
|Channel Islands|     49|          1|           80|      363.53| 363.5299987

period = org.apache.spark.sql.expressions.WindowSpec@2e286e31
RunInvDF = [country: string, weeknum: int ... 4 more fields]


[country: string, weeknum: int ... 4 more fields]

In [51]:
// Cross-check the DataFrame running total with an equivalent Spark SQL query.
// The frame is spelled out as ROWS ... CURRENT ROW so that it matches a
// rowsBetween(Window.unboundedPreceding, Window.currentRow) window spec. With ORDER BY
// alone, Spark SQL falls back to a RANGE frame, which adds every row tied on weeknum in
// one step and would disagree with the row-based version if a country ever has two rows
// for the same week.
InvoiceDF.createOrReplaceTempView("InvoiceDFTBL")

spark.sql("""
select country, weeknum, invoicevalue,
       sum(invoicevalue) over(
         partition by country
         order by weeknum
         rows between unbounded preceding and current row
       ) as RunningTotal
from InvoiceDFTBL order by country, weeknum
""").show()

+---------------+-------+------------+------------------+
|        country|weeknum|invoicevalue|      RunningTotal|
+---------------+-------+------------+------------------+
|      Australia|     48|      358.25|            358.25|
|      Australia|     49|       258.9| 617.1499938964844|
|      Australia|     50|      387.95|1005.1000061035156|
|        Austria|     50|      257.04| 257.0400085449219|
|        Bahrain|     51|      205.74|205.74000549316406|
|        Belgium|     48|       346.1| 346.1000061035156|
|        Belgium|     50|      625.16| 971.2599792480469|
|        Belgium|     51|      838.65|1809.9100036621094|
|Channel Islands|     49|      363.53| 363.5299987792969|
|         Cyprus|     50|     1590.82|1590.8199462890625|
|        Denmark|     49|      1281.5|            1281.5|
|        Finland|     50|       892.8| 892.7999877929688|
|         France|     48|     2808.16| 2808.159912109375|
|         France|     49|     4527.01| 7335.169677734375|
|         Fran

In [52]:
//add total invoice value per country

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Window covering all rows of a country; no ordering is given, so the aggregate is the
// same for every row of the partition (as the repeated totals in the output show).
val totInvWindow = Window.partitionBy("country")

// Attach the per-country invoice total next to the existing running total.
val totInvDF = {
  val countryTotal = sum("invoicevalue").over(totInvWindow)
  RunInvDF
    .withColumn("Total Invoice value", countryTotal)
    .orderBy("country", "weeknum")
}

totInvDF.show()


+---------------+-------+-----------+-------------+------------+------------------+-------------------+
|        country|weeknum|numInvoices|totalquantity|invoicevalue|    RunningInvoice|Total Invoice value|
+---------------+-------+-----------+-------------+------------+------------------+-------------------+
|      Australia|     48|          1|          107|      358.25|            358.25| 1005.1000061035156|
|      Australia|     49|          1|          214|       258.9| 617.1499938964844| 1005.1000061035156|
|      Australia|     50|          2|          133|      387.95|1005.1000061035156| 1005.1000061035156|
|        Austria|     50|          2|            3|      257.04| 257.0400085449219|  257.0400085449219|
|        Bahrain|     51|          1|           54|      205.74|205.74000549316406| 205.74000549316406|
|        Belgium|     48|          1|          528|       346.1| 346.1000061035156| 1809.9100036621094|
|        Belgium|     50|          2|          285|      625.16|

totInvWindow = org.apache.spark.sql.expressions.WindowSpec@3077a9c9
totInvDF = [country: string, weeknum: int ... 5 more fields]


[country: string, weeknum: int ... 5 more fields]

In [53]:
// Cross-check the per-country total with Spark SQL: expose RunInvDF as a temp view and
// compute the same partition-wide sum with a window clause.
RunInvDF.createOrReplaceTempView("RunInvDF")

spark
  .sql("""
select country, weeknum, invoicevalue, RunningInvoice, sum(invoicevalue) over(partition by country) as TotalInvoice
from RunInvDF order by country, weeknum
""")
  .show()

+---------------+-------+------------+------------------+------------------+
|        country|weeknum|invoicevalue|    RunningInvoice|      TotalInvoice|
+---------------+-------+------------+------------------+------------------+
|      Australia|     48|      358.25|            358.25|1005.1000061035156|
|      Australia|     49|       258.9| 617.1499938964844|1005.1000061035156|
|      Australia|     50|      387.95|1005.1000061035156|1005.1000061035156|
|        Austria|     50|      257.04| 257.0400085449219| 257.0400085449219|
|        Bahrain|     51|      205.74|205.74000549316406|205.74000549316406|
|        Belgium|     48|       346.1| 346.1000061035156|1809.9100036621094|
|        Belgium|     50|      625.16| 971.2599792480469|1809.9100036621094|
|        Belgium|     51|      838.65|1809.9100036621094|1809.9100036621094|
|Channel Islands|     49|      363.53| 363.5299987792969| 363.5299987792969|
|         Cyprus|     50|     1590.82|1590.8199462890625|1590.8199462890625|