In [75]:
### BEGIN STRIP ###
import findspark
import pyspark

# Initialise findspark before the Spark session is created.
findspark.init()

# Create (or reuse) a SparkSession running on a local master, registered
# under the application name 'Tidy Data'.
spark = (
    pyspark.sql.SparkSession.builder
    .master('local')
    .appName('Tidy Data')
    .getOrCreate()
)

# Underlying SparkContext; used below to build RDDs with `parallelize`.
sc = spark.sparkContext
### END STRIP ###

# Tidy data and nested schemas

## Tidy data
Data tidying is the concept of structuring datasets to facilitate analysis.

The principles of tidy data were described in 2013 by statistician [Hadley Wickham](http://hadley.nz/) and are closely tied to the principles of relational databases and Codd's relational algebra. They provide a standard way to organize data values within a dataset and can be summarized as:

- Each variable forms a column.
- Each observation forms a row.
- Each type of observational unit forms a table.

We strongly advise you to take the time to read [the original paper by Wickham](https://vita.had.co.nz/papers/tidy-data.pdf).

## Array operations and nested schemas

In [18]:
from pyspark.sql import functions as F
from pyspark.sql import Row

Let's say we have some data about users. Here we create an RDD from a list of dicts, but in real life we would obtain it through a pipeline or a database query.

In [19]:
# Sample data: one dict per user, with the amounts of their orders in a list.
users_dct = [
    {
        'id': 1,
        'name': 'George',
        'orders': [50.61, 31.32, 20.9],
    },
    {
        'id': 2,
        'name': 'Hugues',
        'orders': [133.8, 59.0, 40.03, 27.91],
    },
]

# Distribute the records, turn each dict into a Row, and let Spark infer the
# schema from the data (no explicit schema is passed here).
users_rdd = sc.parallelize(users_dct)
users_df = spark.createDataFrame(
    users_rdd.map(lambda user: Row(**user))
)
users_df.show()

+---+------+--------------------+
| id|  name|              orders|
+---+------+--------------------+
|  1|George|[50.61, 31.32, 20.9]|
|  2|Hugues|[133.8, 59.0, 40....|
+---+------+--------------------+



In [62]:
from pyspark.sql.types import *

In [63]:
# Same two users, this time with whole-number order amounts so that an
# explicit integer schema can be applied.
users_dct = [
    {
        'id': 1,
        'name': 'George',
        'orders': [50, 31, 20],
    },
    {
        'id': 2,
        'name': 'Hugues',
        'orders': [133, 59, 40, 27],
    },
]
users_rdd = sc.parallelize(users_dct)

# Explicit schema, built field by field: every field is nullable and
# `orders` is an array of integers.
schema = (
    StructType()
    .add('id', IntegerType(), True)
    .add('name', StringType(), True)
    .add('orders', ArrayType(IntegerType()), True)
)

# Note: this rebinds `users_df`, replacing the schema-inferred DataFrame.
users_df = spark.createDataFrame(
    users_rdd.map(lambda user: Row(**user)),
    schema=schema,
)
users_df.printSchema()
users_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+---+------+-----------------+
| id|  name|           orders|
+---+------+-----------------+
|  1|George|     [50, 31, 20]|
|  2|Hugues|[133, 59, 40, 27]|
+---+------+-----------------+



#### `F.size(...)`

In [68]:
# Count the elements of each user's `orders` array, then drop the array
# itself so only the count is displayed.
(
    users_df
    .withColumn('orders_quantity', F.size(F.col('orders')))
    .drop('orders')
    .show()
)

+---+------+---------------+
| id|  name|orders_quantity|
+---+------+---------------+
|  1|George|              3|
|  2|Hugues|              4|
+---+------+---------------+



We get the size of the array, which is pretty nice, but what if we want to compute other aggregates like the sum or the average? It turns out this is not trivial. We will go through one method, but there are others; you can read more about them [here](https://databricks.com/blog/2017/05/24/working-with-nested-data-using-higher-order-functions-in-sql-on-databricks.html).

#### `F.explode(...)`

Before we try to compute aggregates, let's ask another question: what if we want one row per order?  
In effect, an order is an observational unit and, following the tidy principles, deserves its own table.

In [25]:
# Explode the `orders` array: one output row per array element, with the
# other columns repeated on each row.
# NOTE(review): the saved output below shows `long`/`double` columns, so it
# appears to come from the schema-inferred `users_df` built earlier; with the
# explicitly typed `users_df` defined just above, re-running gives integers.
orders_df = users_df.withColumn('orders', F.explode(F.col('orders')))
orders_df.printSchema()
orders_df.show()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- orders: double (nullable = true)

+---+------+------+
| id|  name|orders|
+---+------+------+
|  1|George| 50.61|
|  1|George| 31.32|
|  1|George|  20.9|
|  2|Hugues| 133.8|
|  2|Hugues|  59.0|
|  2|Hugues| 40.03|
|  2|Hugues| 27.91|
+---+------+------+



---
Now we can compute the average order by customer with a `.groupBy(...)`.

In [73]:
# Average order amount per user; the result column is named `avg(orders)`.
(
    orders_df
    .groupBy('id', 'name')
    .mean('orders')
    .show()
)

+---+------+-----------------+
| id|  name|      avg(orders)|
+---+------+-----------------+
|  1|George|34.27666666666667|
|  2|Hugues|           65.185|
+---+------+-----------------+



---

The opposite transformation is **`F.collect_list(...)`**, used as an aggregate after a `.groupBy(...)`.

In [None]:
# Gather the exploded order rows of each user back into a single array
# column, again named `orders`.
(
    orders_df
    .groupBy('id', 'name')
    .agg(F.collect_list(F.col('orders')).alias('orders'))
    .show()
)

We get our original DataFrame back (note that `collect_list` does not guarantee the order of the collected elements after a shuffle).

### Nested schema
This time our schema is a bit more difficult: we have a list of users with their orders, but each order now carries not only its amount but also further details (here, an id).

In [None]:
from pyspark.sql.types import *

In [None]:
# Users whose orders are now structs (an order id and a value) instead of
# bare amounts.
users = [
    {'id': 1, 'name': 'George', 'orders': [
        {'id': 1, 'value': 55.1},
        {'id': 2, 'value': 78.31},
        {'id': 4, 'value': 52.13}
    ]},
    {'id': 2, 'name': 'Hughes', 'orders': [
        {'id': 3, 'value': 31.19},
        {'id': 5, 'value': 131.1}
    ]}
]
users_rdd = sc.parallelize(users)

# Nested schema: `orders` is an array of structs with fields `id` and `value`.
# `value` is a DoubleType (64-bit, like a Python float). A 32-bit FloatType
# would round the amounts on ingestion and leak that rounding noise into the
# sums computed further down; DoubleType also matches the `double` type Spark
# inferred for the order amounts in the first example.
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('orders', ArrayType(
        StructType([
            StructField('id', IntegerType(), True),
            StructField('value', DoubleType(), True)
        ])
    ), True)
])

# With an explicit schema the dicts can be passed directly (no Row needed).
users_df = spark.createDataFrame(users_rdd, schema=schema)
users_df.printSchema()
users_df.show()

In [None]:
# One row per order: after the explode, `orders` holds a single struct
# (with fields `id` and `value`) instead of an array of structs.
orders_df = users_df.withColumn('orders', F.explode(F.col('orders')))
orders_df.printSchema()
orders_df.show()

We can access nested fields using `.getField(fieldname)`

In [None]:
# Pull the nested `id` field out of the `orders` struct into its own column.
(
    orders_df
    .withColumn('order_id', F.col('orders').getField('id'))
    .show()
)

Or using **`.`** notation.

In [None]:
# Same extraction, addressing the nested field with a dotted column path.
(
    orders_df
    .withColumn('order_id', F.col('orders.id'))
    .show()
)

In [None]:
# Flatten the `orders` struct: keep the user columns and promote the nested
# `id` and `value` fields to top-level columns, leaving the struct behind.
orders_df_flattened = orders_df.select(
    'id',
    'name',
    F.col('orders.id').alias('order_id'),
    F.col('orders.value').alias('order_value'),
)
orders_df_flattened.show()

In [None]:
# Total order value per user name, sorted ascending. Without an alias the
# aggregate column has to be referenced by its generated name.
(
    orders_df_flattened
    .groupBy('name')
    .sum('order_value')
    .orderBy('sum(order_value)')
    .show()
)

In [None]:
# Aliasing inline and descending sort: naming the aggregate `total_value`
# lets the sort refer to it directly.
(
    orders_df_flattened
    .groupBy('name')
    .agg(F.sum('order_value').alias('total_value'))
    .orderBy(F.col('total_value').desc())
    .show()
)

### Even harder

In [None]:
# Users -> orders -> items: two levels of nested arrays of structs.
users = [
    {'id': 1, 'name': 'George', 'orders': [
        {'id': 1, 'items': [
            {'id': 1, 'category': 'shirt', 'price': 80, 'quantity': 4},
            {'id': 2, 'category': 'jeans', 'price': 130, 'quantity': 2}
        ]},
        {'id': 4, 'items': [
            {'id': 1, 'category': 'shirt', 'price': 80, 'quantity': 1},
            {'id': 3, 'category': 'shoes', 'price': 240, 'quantity': 1}
        ]}
    ]},
    {'id': 2, 'name': 'Hughes', 'orders': [
        {'id': 2, 'items': [
            {'id': 4, 'category': 'shorts', 'price': 120, 'quantity': 3},
            {'id': 1, 'category': 'shirt', 'price': 180, 'quantity': 2},
            # The key must be 'price' (it was misspelled 'prices'): the schema
            # below looks fields up by name, so a misspelled key yields a null
            # price and therefore a null total for this item.
            {'id': 3, 'category': 'shoes', 'price': 240, 'quantity': 1}
        ]},
        {'id': 3, 'items': [
            {'id': 5, 'category': 'suit', 'price': 2000, 'quantity': 1}
        ]}
    ]}
]
users_rdd = sc.parallelize(users)

# `orders` is an array of structs, each holding an `items` array of structs.
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('orders', ArrayType(
        StructType([
            StructField('id', IntegerType(), True),
            StructField('items', ArrayType(
                StructType([
                    StructField('id', IntegerType(), True),
                    StructField('category', StringType(), True),
                    StructField('price', IntegerType(), True),
                    StructField('quantity', IntegerType(), True)
                ])
            ))
        ])
    ), True)
])

users_df = spark.createDataFrame(users_rdd, schema=schema)
users_df.printSchema()
users_df.show()

In [None]:
# First level of flattening: one row per order. Each `orders` value is now a
# struct that still contains the nested `items` array.
orders_df = users_df.withColumn('orders', F.explode(F.col('orders')))
orders_df.show()

In [None]:
# TODO: build this step by step and show intermediary results
items_df = (
    orders_df
    # Step 1: rename the user columns, lift the order id out of the struct
    # and explode the nested `items` array (one row per item).
    .select(
        F.col('id').alias('user_id'),
        F.col('name').alias('user_name'),
        F.col('orders.id').alias('order_id'),
        F.explode(F.col('orders.items')).alias('items'),
    )
    # Step 2: promote the item fields to top-level columns, compute the line
    # total and leave the `items` struct behind.
    .select(
        'user_id',
        'user_name',
        'order_id',
        F.col('items.id').alias('item_id'),
        F.col('items.category').alias('item_category'),
        F.col('items.price').alias('item_price'),
        F.col('items.quantity').alias('item_quantity'),
        (F.col('items.price') * F.col('items.quantity')).alias('total_price'),
    )
)
items_df.show()

### Advanced groupBy

In [None]:
# Quantity sold per category, largest first. The aggregate keeps its
# generated name `sum(item_quantity)`, which the sort has to reference.
(
    items_df
    .groupBy('item_category')
    .sum('item_quantity')
    .orderBy('sum(item_quantity)', ascending=False)
    .show()
)

You might want to alias the aggregated column; in that case, replace `.sum()` with `.agg()`.

In [None]:
# Same aggregate, this time aliased to `total_quantity` through `.agg()`.
(
    items_df
    .groupBy('item_category')
    .agg(F.sum('item_quantity').alias('total_quantity'))
    .orderBy(F.col('total_quantity').desc())
    .show()
)

Aliasing also works on an expression that combines several aggregates, for example the average sale price per category:

In [None]:
# Average sale price per category: total revenue divided by total quantity,
# aliased as `avg_sale` and sorted from highest to lowest.
(
    items_df
    .groupBy('item_category')
    .agg(
        (F.sum('total_price') / F.sum('item_quantity')).alias('avg_sale')
    )
    .orderBy(F.col('avg_sale').desc())
    .show()
)

## Resources

- [Automatically and Elegantly flatten DataFrame in Spark SQL](https://stackoverflow.com/questions/37471346/automatically-and-elegantly-flatten-dataframe-in-spark-sql) on StackOverflow