In [1]:
import pyspark
from pyspark.sql import SparkSession, functions as F, types as T, Window as W
from functools import reduce

In [None]:
# If pyspark is not installed, uncomment and run the next line, then re-run the import cell:
# %pip install pyspark

In [7]:
# Create the notebook's Spark session (getOrCreate returns an already
# running session instead of starting a second one).
spark_session = (
    SparkSession.builder
    .appName("Learn_Cube_Rollup")
    .getOrCreate()
)

# Handles derived from the session: the SparkContext and a SQLContext built on it.
sc = spark_session.sparkContext
sqlCtx = pyspark.SQLContext(sc)

In [8]:
# Read the sample CSV, taking column names from the first row.
# NOTE(review): no schema or inferSchema option is set, so Spark's reader
# defaults apply to the column types — confirm this is intended for M1/M2.
dataframe_spark = (
    spark_session.read
    .format("csv")
    .option("header", "true")
    .load("../input/sample-sc")
)

In [9]:
# Preview the first 10 rows of the loaded data.
dataframe_spark.show(n=10)

+-----+-----------+---------------+----------------+-----------+-----------+
|   D1|         D2|             D3|            Date|         M1|         M2|
+-----+-----------+---------------+----------------+-----------+-----------+
|LATAM|Home Office|Central America|2015-01-01T00:00|129.9900055|         26|
|LATAM|Home Office|Central America|2015-01-01T00:00|         50|        8.5|
|LATAM|Home Office|Central America|2015-01-01T00:00| 399.980011|         20|
|LATAM|   Consumer|  South America|2015-01-01T00:00|239.9600067|31.19000053|
|LATAM|Home Office|Central America|2015-01-01T00:00|        100|         13|
|LATAM|  Corporate|  South America|2015-01-01T00:00|         50|          8|
|LATAM|  Corporate|  South America|2015-01-01T00:00| 299.980011|          0|
|LATAM|  Corporate|  South America|2015-01-01T00:00| 399.980011|         12|
|LATAM|  Corporate|  South America|2015-01-01T00:00| 399.980011|         16|
|LATAM|  Corporate|Central America|2015-01-01T00:00|74.97000122|18.73999977|

In [10]:
# Column roles in the sample data: D1, D2 and D3 are the dimension columns;
# the measure columns are listed below.
measure_cols = [
    "M1",
    "M2",
]

In [11]:
# Rollup over (D2, D3, D1, Date): sums each measure for the full grouping and
# for every hierarchical prefix of it, down to the grand total. Columns that
# are rolled up appear as null in the subtotal rows.
# The complete result (including subtotal rows) is cached under its own name.
dataframe_spark_rollup_all = (
    dataframe_spark
    .select(["D1", "D2", "D3", "Date", *measure_cols])
    .rollup(["D2", "D3", "D1", "Date"])
    .agg(*[F.sum(c).alias(c) for c in measure_cols])
    .sort(["D1", "D2", "D3", "Date", *measure_cols])
    .cache()
)

# Keep only rows that still carry a Date value.
# NOTE(review): Date is the last rollup level, so (assuming the source data has
# no null dates) this drops every subtotal row — confirm that is intended.
dataframe_spark_rollup = dataframe_spark_rollup_all.filter(F.col("Date").isNotNull())

# show() only prints and returns None, so it must not be part of the
# assignment above; otherwise the variable would end up holding None.
dataframe_spark_rollup.show()

+--------+---------------+------+----------------+------------------+------------------+
|      D2|             D3|    D1|            Date|                M1|                M2|
+--------+---------------+------+----------------+------------------+------------------+
|Consumer|Northern Europe|Europe|2015-05-31T00:00|     3629.37003334|     298.070001601|
|Consumer|Northern Europe|Europe|2015-06-01T00:00| 5551.330070579998|     492.690002441|
|Consumer|Northern Europe|Europe|2015-06-02T00:00|       999.9500275|             165.0|
|Consumer|Northern Europe|Europe|2015-06-03T00:00|     4017.52008836|378.07000016999996|
|Consumer|Northern Europe|Europe|2015-06-04T00:00|     1389.64001466|     146.719999785|
|Consumer|Northern Europe|Europe|2015-06-05T00:00|     2439.69005204|      241.48999977|
|Consumer|Northern Europe|Europe|2015-06-06T00:00|      759.92002112|      65.799999715|
|Consumer|Northern Europe|Europe|2015-06-07T00:00|     3149.78006746|310.38000107000005|
|Consumer|Northern Eu

In [12]:
# Cube over (D2, D3, D1, Date): sums each measure for every combination of
# the grouping columns. Columns that are aggregated away appear as null in
# the corresponding subtotal rows.
# The complete result (including all subtotal rows) is cached under its own name.
dataframe_spark_cube_all = (
    dataframe_spark
    .select(["D1", "D2", "D3", "Date", *measure_cols])
    .cube(["D2", "D3", "D1", "Date"])
    .agg(*[F.sum(c).alias(c) for c in measure_cols])
    .sort(["D1", "D2", "D3", "Date", *measure_cols])
    .cache()
)

# Keep only rows that still carry a Date value, i.e. drop the subtotals in
# which Date itself was aggregated away.
dataframe_spark_cube = dataframe_spark_cube_all.filter(F.col("Date").isNotNull())

# show() only prints and returns None, so it must not be part of the
# assignment above; otherwise the variable would end up holding None.
dataframe_spark_cube.show()

+----+----+----+----------------+------------------+------------------+
|  D2|  D3|  D1|            Date|                M1|                M2|
+----+----+----+----------------+------------------+------------------+
|null|null|null|2015-01-01T00:00| 32806.09068968997|3426.4000041880004|
|null|null|null|2015-01-02T00:00|29818.210575399968|3016.3599987919997|
|null|null|null|2015-01-03T00:00|36348.710647989974|3489.4499867789987|
|null|null|null|2015-01-04T00:00|35738.970669140974|3869.5100001929995|
|null|null|null|2015-01-05T00:00|31067.910602679956|    3534.920024937|
|null|null|null|2015-01-06T00:00|33395.710577099955|3320.5900055659995|
|null|null|null|2015-01-07T00:00| 32923.22068821996|2989.8399987519997|
|null|null|null|2015-01-08T00:00| 37590.17087038999|    3718.580007559|
|null|null|null|2015-01-09T00:00|26004.930517179982|2596.4800077089994|
|null|null|null|2015-01-10T00:00| 37053.86075832997|3535.1099916819985|
|null|null|null|2015-01-11T00:00| 34164.16064508997|      3266.8