In [0]:
from pyspark.sql import functions as f
from pyspark.sql import Window

In [0]:
invoice_df = spark.read \
    .format("csv")  \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/FileStore/sample_data/invoices.csv")

In [0]:
invoice_df.select(f.count("*").alias("Count *"),
                 f.sum("Quantity").alias("TotalQuantity"),
                 f.avg("UnitPrice").alias("AvgPrice"),
                 f.countDistinct("InvoiceNo").alias("CounDistinct")
                 ).show()

+-------+-------------+-----------------+------------+
|Count *|TotalQuantity|         AvgPrice|CounDistinct|
+-------+-------------+-----------------+------------+
| 541909|      5176450|4.611113626088477|       25900|
+-------+-------------+-----------------+------------+



In [0]:
invoice_df.selectExpr(
    "count(1) as `count 1`",
    "count(StockCode) as `count field`",
    "sum(Quantity) as TotalQuantity",
    "avg(UnitPrice) as AvgUnitPrice"
).show()

+-------+-----------+-------------+-----------------+
|count 1|count field|TotalQuantity|     AvgUnitPrice|
+-------+-----------+-------------+-----------------+
| 541909|     541908|      5176450|4.611113626089747|
+-------+-----------+-------------+-----------------+



In [0]:
%sql
drop view if exists sales;

In [0]:

invoice_df.createOrReplaceGlobalTempView("sales")
summary_sql = spark.sql("""
                        SELECT Country, InvoiceNo,
                        sum(Quantity) as TotalQuantity,
                        round(sum(Quantity*UnitPrice),2) as InvoiceValue
                        FROM global_temp.sales
                        GROUP BY Country, InvoiceNo
                        """)
display(summary_sql)                    

Country,InvoiceNo,TotalQuantity,InvoiceValue
United Kingdom,536446,329,440.89
United Kingdom,536508,216,155.52
United Kingdom,537018,-3,0.0
United Kingdom,537401,-24,0.0
United Kingdom,537811,74,268.86
United Kingdom,C537824,-2,-14.9
United Kingdom,538895,370,247.38
United Kingdom,540453,341,302.45
United Kingdom,541291,217,305.81
United Kingdom,536627,64,306.2


In [0]:
display(invoice_df)

In [0]:
summary_df = invoice_df.groupBy("Country", "InvoiceNo" ) \
        .agg(f.sum("Quantity").alias("TotalQuantity"),
            f.round(f.sum(f.expr("Quantity * UnitPrice")), 2).alias("InvoiceValue")
        )
summary_df.show()

+--------------+---------+-------------+------------+
|       Country|InvoiceNo|TotalQuantity|InvoiceValue|
+--------------+---------+-------------+------------+
|United Kingdom|   536446|          329|      440.89|
|United Kingdom|   536508|          216|      155.52|
|United Kingdom|   537018|           -3|         0.0|
|United Kingdom|   537401|          -24|         0.0|
|United Kingdom|   537811|           74|      268.86|
|United Kingdom|  C537824|           -2|       -14.9|
|United Kingdom|   538895|          370|      247.38|
|United Kingdom|   540453|          341|      302.45|
|United Kingdom|   541291|          217|      305.81|
|United Kingdom|   536627|           64|       306.2|
|United Kingdom|   537224|          700|     1415.97|
|United Kingdom|   537230|            1|        2.95|
|United Kingdom|   537682|           72|        99.6|
|United Kingdom|  C538749|           -1|       -9.95|
|        Israel|  C539037|          -56|     -227.44|
|United Kingdom|   539076|  

In [0]:
display(invoice_df)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,,WHITE HANGING HEART T-LIGHT HOLDER,6,01-12-2010 8.26,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,01-12-2010 8.26,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,01-12-2010 8.26,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,01-12-2010 8.26,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,01-12-2010 8.26,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,01-12-2010 8.26,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,01-12-2010 8.26,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,01-12-2010 8.28,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,01-12-2010 8.28,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,01-12-2010 8.34,1.69,13047.0,United Kingdom


In [0]:
invoice_df.createOrReplaceGlobalTempView("sales")
summary_sql = spark.sql("""
                        SELECT Country, weekofyear(to_timestamp(InvoiceDate, 'dd-MM-yyyy H.mm')) as WeekNumber,
                        count("InvoiceNo") as NumInvoices,
                        sum(Quantity) as TotalQuantity,
                        round(sum(Quantity*UnitPrice),2) as InvoiceValue
                        FROM global_temp.sales
                        GROUP BY Country, WeekNumber
                        """)
display(summary_sql)

Country,WeekNumber,NumInvoices,TotalQuantity,InvoiceValue
Spain,49,111,323,446.15
Germany,48,413,3837,8610.98
Italy,4,31,206,419.35
Spain,2,128,2126,6280.15
Lithuania,48,34,622,1598.06
Germany,49,374,5040,10412.52
Bahrain,51,1,54,205.74
Iceland,49,42,511,936.61
EIRE,51,66,95,276.84
Sweden,1,14,292,507.56


In [0]:
invoice_df_with_week = invoice_df.withColumn(
    "WeekNumber", 
    f.weekofyear(f.to_timestamp("InvoiceDate", 'dd-MM-yyyy H.mm'))
)
summary_df = invoice_df_with_week.groupBy("Country", "WeekNumber") \
        .agg(
            f.countDistinct("InvoiceNo").alias("NumInvoices"),
            f.sum("Quantity").alias("TotalQuantity"),
            f.round(f.sum(f.expr("Quantity * UnitPrice")), 2).alias("InvoiceValue")
        )
display(summary_df)

Country,WeekNumber,NumInvoices,TotalQuantity,InvoiceValue
EIRE,17,1,163,362.9
Channel Islands,46,1,78,211.63
Norway,47,1,594,2016.78
France,10,4,471,998.49
France,6,3,971,978.44
Germany,17,4,1184,2553.76
Spain,17,1,267,436.3
Japan,30,1,2208,3680.4
Austria,21,1,434,1083.58
Iceland,44,1,676,1294.32


In [0]:
NumInvoices = f.countDistinct("InvoiceDate").alias("NumInvoices")
TotalQuantity = f.sum("Quantity").alias("Total Quantity")
InvoiceValue = f.expr("round(sum(Quantity * UnitPrice), 2) as InvoiceValue")
exSummary = invoice_df \
    .withColumn("InvoiceDate", f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm"))\
    .withColumn("WeekNumber", f.weekofyear("InvoiceDate")) \
    .groupBy("Country","WeekNumber") \
    .agg(NumInvoices, TotalQuantity, InvoiceValue) \
    .sort("Country", "WeekNumber")

display(exSummary)

Country,WeekNumber,NumInvoices,Total Quantity,InvoiceValue
Australia,1,1,4802,7154.38
Australia,2,3,394,721.1
Australia,3,3,338,757.55
Australia,4,1,110,384.68
Australia,6,1,58,192.0
Australia,7,1,8384,14022.92
Australia,8,1,217,412.55
Australia,9,1,10162,16558.14
Australia,10,1,90,210.9
Australia,12,1,77,286.25


In [0]:
exSummary.coalesce(1) \
    .write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/FileStore/sample_data/dataSink/agg_output")


In [0]:
new_summary_df = spark.read \
    .format("parquet") \
    .load("/FileStore/sample_data/dataSink/output.parquet/part-00000-tid-2228203500283238251-60e56e7e-6d7b-4f89-a8f0-26af6c3297cf-247-1-c000.snappy.parquet")


In [0]:
running_total_window = Window.partitionBy("Country") \
    .orderBy("WeekNumber") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [0]:
final_df = new_summary_df.withColumn("RunningTotal", f.sum("InvoiceValue").over(running_total_window))

In [0]:
display(final_df)

Country,WeekNumber,NumInvoices,Total Quantity,InvoiceValue,RunningTotal
Australia,1,1,4802,7154.38,7154.38
Australia,2,3,394,721.1,7875.48
Australia,3,3,338,757.55,8633.03
Australia,4,1,110,384.68,9017.71
Australia,6,1,58,192.0,9209.71
Australia,7,1,8384,14022.92,23232.63
Australia,8,1,217,412.55,23645.18
Australia,9,1,10162,16558.14,40203.32
Australia,10,1,90,210.9,40414.22
Australia,12,1,77,286.25,40700.47
