In [1]:
from functools import reduce
import json
from pyspark.sql import Row, SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as W
# from pyspark.sql.functions import (col, collect_list, concat,
#                                    create_map, lit, map_concat,
#                                    map_from_arrays, substring_index,
#                                    to_json, when)

In [2]:
spark = (SparkSession.builder
         .master('local[*]')
         .config('spark.driver.memory', '10G')
         .appName('generate-statistics')
         .getOrCreate())

spark

### 500 Row Dataframe

In [3]:
%%timeit -n1 -r1
df_500 = spark.read.parquet('../data/df_500/')

# This method will create a huge plan 
#  if there is a large number of columns
# An RDD based method might be better 
#  in this case
#
# def wide_to_long_sql(df):
#     df = df_500
#     df_columns = df.columns
#     df_columns_iterator = (
#         (df.select(
#             lit(column).alias('col_name'),
#             col(column).alias('col_value')))
#         for column in df_columns)
#     long_df = reduce(lambda x,y: x.union(y), df_columns_iterator)
#     return long_df

def wide_to_long_dfs(df):
    def _emit_column_records(row):
        for col, val in row.asDict().items():
            yield Row(col_name=col, col_value=val)
    
    df_dtypes = df.dtypes
    
    dtype_dfs = {}
    
    for dtype in set(d for c, d in df_dtypes):
        dtype_cols = [c for c, d in df_dtypes
                      if d == dtype]
        
        dtype_df = (
            spark.createDataFrame(
                df.select(*dtype_cols)
                .rdd.flatMap(_emit_column_records)))
        
        dtype_dfs[dtype] = dtype_df
    
    return dtype_dfs

long_dfs_by_datatype = wide_to_long_dfs(df_500)

def calculate_metrics(df):
    base_metrics_df = (
        df.groupBy('col_name')
        .agg(F.count('col_value').alias('count'),
             F.mean('col_value').alias('mean'),
             F.stddev('col_value').alias('stddev'),
             F.min('col_value').alias('min'),
             F.max('col_value').alias('max')))

    percentiles_df = (
        df
        .withColumn('percent_rank', (F.percent_rank()
                                     .over(
                                         W.partitionBy('col_name')
                                         .orderBy('col_value'))))
        .withColumn('ge_p1', (F.when(F.col('percent_rank') <= 0.01,
                                     0.01 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p5', (F.when(F.col('percent_rank') <= 0.05,
                                     0.05 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p25', (F.when(F.col('percent_rank') <= 0.25,
                                      0.25 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p50', (F.when(F.col('percent_rank') <= 0.5,
                                      0.5 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p75', (F.when(F.col('percent_rank') <= 0.75,
                                      0.75 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p95', (F.when(F.col('percent_rank') <= 0.95,
                                      0.95 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p99', (F.when(F.col('percent_rank') <= 0.99,
                                      0.99 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('min_ge_p1', (F.min(F.col('ge_p1'))
                                  .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p5', (F.min(F.col('ge_p5'))
                                  .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p25', (F.min(F.col('ge_p25'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p50', (F.min(F.col('ge_p50'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p75', (F.min(F.col('ge_p75'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p95', (F.min(F.col('ge_p95'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p99', (F.min(F.col('ge_p99'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('p1', (F.when(F.col('ge_p1') == F.col('min_ge_p1'),
                                  F.col('col_value'))
                           .otherwise(F.lit(None))))
        .withColumn('p5', (F.when(F.col('ge_p5') == F.col('min_ge_p5'),
                                  F.col('col_value'))
                           .otherwise(F.lit(None))))
        .withColumn('p25', (F.when(F.col('ge_p25') == F.col('min_ge_p25'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p50', (F.when(F.col('ge_p50') == F.col('min_ge_p50'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p75', (F.when(F.col('ge_p75') == F.col('min_ge_p75'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p95', (F.when(F.col('ge_p95') == F.col('min_ge_p95'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p99', (F.when(F.col('ge_p99') == F.col('min_ge_p99'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .groupBy('col_name')
        .agg(F.min('p1').alias('p1'),
             F.min('p5').alias('p5'),
             F.min('p25').alias('p25'),
             F.min('p50').alias('p50'),
             F.min('p75').alias('p75'),
             F.min('p95').alias('p95'),
             F.min('p99').alias('p99'))
    )

    result_df = (
        base_metrics_df
        .join(other=percentiles_df,
              on='col_name',
              how='inner')
        .withColumnRenamed('col_name', 'name'))
    
    return result_df

result_list = []
for df in long_dfs_by_datatype.values():
    res_df = calculate_metrics(df)
    result_list.extend([row.asDict() for row
                        in res_df.toLocalIterator()])

with open('../data/metrics_for_500_custom.json', 'w') as f:
    json.dump(result_list, f, indent=1)

8min 12s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


**For the Distributed Stage in the Summary Task:** Input Size / Records	28.5 MB / 25000

**Timing Information for whole job:** 8min 12s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

### 3000 Row Dataframe

In [3]:
%%timeit -n1 -r1
df_3000 = spark.read.parquet('../data/df_3000/')


def wide_to_long_dfs(df):
    def _emit_column_records(row):
        for col, val in row.asDict().items():
            yield Row(col_name=col, col_value=val)
    
    df_dtypes = df.dtypes
    
    dtype_dfs = {}
    
    for dtype in set(d for c, d in df_dtypes):
        dtype_cols = [c for c, d in df_dtypes
                      if d == dtype]
        
        dtype_df = (
            spark.createDataFrame(
                df.select(*dtype_cols)
                .rdd.flatMap(_emit_column_records)))
        
        dtype_dfs[dtype] = dtype_df
    
    return dtype_dfs

long_dfs_by_datatype = wide_to_long_dfs(df_3000)

def calculate_metrics(df):
    base_metrics_df = (
        df.groupBy('col_name')
        .agg(F.count('col_value').alias('count'),
             F.mean('col_value').alias('mean'),
             F.stddev('col_value').alias('stddev'),
             F.min('col_value').alias('min'),
             F.max('col_value').alias('max')))

    percentiles_df = (
        df
        .withColumn('percent_rank', (F.percent_rank()
                                     .over(
                                         W.partitionBy('col_name')
                                         .orderBy('col_value'))))
        .withColumn('ge_p1', (F.when(F.col('percent_rank') <= 0.01,
                                     0.01 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p5', (F.when(F.col('percent_rank') <= 0.05,
                                     0.05 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p25', (F.when(F.col('percent_rank') <= 0.25,
                                      0.25 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p50', (F.when(F.col('percent_rank') <= 0.5,
                                      0.5 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p75', (F.when(F.col('percent_rank') <= 0.75,
                                      0.75 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p95', (F.when(F.col('percent_rank') <= 0.95,
                                      0.95 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p99', (F.when(F.col('percent_rank') <= 0.99,
                                      0.99 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('min_ge_p1', (F.min(F.col('ge_p1'))
                                  .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p5', (F.min(F.col('ge_p5'))
                                  .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p25', (F.min(F.col('ge_p25'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p50', (F.min(F.col('ge_p50'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p75', (F.min(F.col('ge_p75'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p95', (F.min(F.col('ge_p95'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p99', (F.min(F.col('ge_p99'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('p1', (F.when(F.col('ge_p1') == F.col('min_ge_p1'),
                                  F.col('col_value'))
                           .otherwise(F.lit(None))))
        .withColumn('p5', (F.when(F.col('ge_p5') == F.col('min_ge_p5'),
                                  F.col('col_value'))
                           .otherwise(F.lit(None))))
        .withColumn('p25', (F.when(F.col('ge_p25') == F.col('min_ge_p25'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p50', (F.when(F.col('ge_p50') == F.col('min_ge_p50'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p75', (F.when(F.col('ge_p75') == F.col('min_ge_p75'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p95', (F.when(F.col('ge_p95') == F.col('min_ge_p95'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p99', (F.when(F.col('ge_p99') == F.col('min_ge_p99'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .groupBy('col_name')
        .agg(F.min('p1').alias('p1'),
             F.min('p5').alias('p5'),
             F.min('p25').alias('p25'),
             F.min('p50').alias('p50'),
             F.min('p75').alias('p75'),
             F.min('p95').alias('p95'),
             F.min('p99').alias('p99'))
    )

    result_df = (
        base_metrics_df
        .join(other=percentiles_df,
              on='col_name',
              how='inner')
        .withColumnRenamed('col_name', 'name'))
    
    return result_df

result_list = []
for df in long_dfs_by_datatype.values():
    res_df = calculate_metrics(df)
    result_list.extend([row.asDict() for row
                        in res_df.toLocalIterator()])

with open('../data/metrics_for_3000_custom.json', 'w') as f:
    json.dump(result_list, f, indent=1)

50min 19s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


**For the Distributed Stage in the Summary Task:** Input Size / Records	137.1 MB / 11182

**Timing Information for whole job:** 50min 19s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [3]:
%%timeit -n1 -r1
df_500_of_3000 = spark.read.parquet('../data/df_3000/')
df_500_of_3000 = (df_500_of_3000
                  .select(*[col for col
                            in df_500_of_3000.columns
                            if col.endswith('_1')]))

def wide_to_long_dfs(df):
    def _emit_column_records(row):
        for col, val in row.asDict().items():
            yield Row(col_name=col, col_value=val)
    
    df_dtypes = df.dtypes
    
    dtype_dfs = {}
    
    for dtype in set(d for c, d in df_dtypes):
        dtype_cols = [c for c, d in df_dtypes
                      if d == dtype]
        
        dtype_df = (
            spark.createDataFrame(
                df.select(*dtype_cols)
                .rdd.flatMap(_emit_column_records)))
        
        dtype_dfs[dtype] = dtype_df
    
    return dtype_dfs

long_dfs_by_datatype = wide_to_long_dfs(df_500_of_3000)

def calculate_metrics(df):
    base_metrics_df = (
        df.groupBy('col_name')
        .agg(F.count('col_value').alias('count'),
             F.mean('col_value').alias('mean'),
             F.stddev('col_value').alias('stddev'),
             F.min('col_value').alias('min'),
             F.max('col_value').alias('max')))

    percentiles_df = (
        df
        .withColumn('percent_rank', (F.percent_rank()
                                     .over(
                                         W.partitionBy('col_name')
                                         .orderBy('col_value'))))
        .withColumn('ge_p1', (F.when(F.col('percent_rank') <= 0.01,
                                     0.01 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p5', (F.when(F.col('percent_rank') <= 0.05,
                                     0.05 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p25', (F.when(F.col('percent_rank') <= 0.25,
                                      0.25 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p50', (F.when(F.col('percent_rank') <= 0.5,
                                      0.5 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p75', (F.when(F.col('percent_rank') <= 0.75,
                                      0.75 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p95', (F.when(F.col('percent_rank') <= 0.95,
                                      0.95 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('ge_p99', (F.when(F.col('percent_rank') <= 0.99,
                                      0.99 - F.col('percent_rank'))
                              .otherwise(F.lit(None))))
        .withColumn('min_ge_p1', (F.min(F.col('ge_p1'))
                                  .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p5', (F.min(F.col('ge_p5'))
                                  .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p25', (F.min(F.col('ge_p25'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p50', (F.min(F.col('ge_p50'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p75', (F.min(F.col('ge_p75'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p95', (F.min(F.col('ge_p95'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('min_ge_p99', (F.min(F.col('ge_p99'))
                                   .over(W.partitionBy('col_name'))))
        .withColumn('p1', (F.when(F.col('ge_p1') == F.col('min_ge_p1'),
                                  F.col('col_value'))
                           .otherwise(F.lit(None))))
        .withColumn('p5', (F.when(F.col('ge_p5') == F.col('min_ge_p5'),
                                  F.col('col_value'))
                           .otherwise(F.lit(None))))
        .withColumn('p25', (F.when(F.col('ge_p25') == F.col('min_ge_p25'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p50', (F.when(F.col('ge_p50') == F.col('min_ge_p50'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p75', (F.when(F.col('ge_p75') == F.col('min_ge_p75'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p95', (F.when(F.col('ge_p95') == F.col('min_ge_p95'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .withColumn('p99', (F.when(F.col('ge_p99') == F.col('min_ge_p99'),
                                   F.col('col_value'))
                            .otherwise(F.lit(None))))
        .groupBy('col_name')
        .agg(F.min('p1').alias('p1'),
             F.min('p5').alias('p5'),
             F.min('p25').alias('p25'),
             F.min('p50').alias('p50'),
             F.min('p75').alias('p75'),
             F.min('p95').alias('p95'),
             F.min('p99').alias('p99'))
    )

    result_df = (
        base_metrics_df
        .join(other=percentiles_df,
              on='col_name',
              how='inner')
        .withColumnRenamed('col_name', 'name'))
    
    return result_df

result_list = []
for df in long_dfs_by_datatype.values():
    res_df = calculate_metrics(df)
    result_list.extend([row.asDict() for row
                        in res_df.toLocalIterator()])

with open('../data/metrics_for_500_of_3000_custom.json', 'w') as f:
    json.dump(result_list, f, indent=1)

7min 45s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


**For the Distributed Stage in the Summary Task:** Input Size / Records	22.5 MB / 11182

**Timing Information for whole job:** 7min 45s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


# Conclusions

In this implementation, the summaries are getting calculated in time linear w.r.t. the number of input columns. And hence, meets with the expectation.

The choice to convert from wide format to long format was done to logically partition the metrics calculation task for each column. 

There is a further scope for improvement by combining the two separate steps being used to calculate the base metrics and the percentiles. This can potentially bring the execution time down by half. 