In [1]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
                    .appName("Spark Grouping Function Demo") \
                    .master("local[3]") \
                    .enableHiveSupport() \
                    .getOrCreate()

In [2]:
invoice_df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("invoices.csv")
invoice_df.show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|     NULL|WHITE HANGING HEA...|       6|01-12-2010 8.26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 8.26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



In [3]:
NumInvoices = f.countDistinct("InvoiceNo").alias("NumInvoices")
TotalQuantity = f.sum("Quantity").alias("TotalQuantity")
InvoiceValue = f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValue")


In [4]:
exSummary_df = invoice_df \
        .withColumn("InvoiceDate", f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm")) \
        .where("year(InvoiceDate) == 2010") \
        .withColumn("WeekNumber", f.weekofyear(f.col("InvoiceDate"))) \
        .groupBy("Country", "WeekNumber") \
        .agg(NumInvoices, TotalQuantity, InvoiceValue)

exSummary_df.coalesce(1) \
        .write \
        .format("parquet") \
        .mode("overwrite") \
        .save("output")

exSummary_df.sort("Country", "WeekNumber").show()

+---------------+----------+-----------+-------------+------------+
|        Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|
+---------------+----------+-----------+-------------+------------+
|      Australia|        48|          1|          107|      358.25|
|      Australia|        49|          1|          214|       258.9|
|      Australia|        50|          2|          133|      387.95|
|        Austria|        50|          2|            3|      257.04|
|        Bahrain|        51|          1|           54|      205.74|
|        Belgium|        48|          1|          528|       346.1|
|        Belgium|        50|          2|          285|      625.16|
|        Belgium|        51|          2|          942|      838.65|
|Channel Islands|        49|          1|           80|      363.53|
|         Cyprus|        50|          1|          917|     1590.82|
|        Denmark|        49|          1|          454|      1281.5|
|           EIRE|        48|          7|        