# Aggregations

In [1]:
%run ./Includes/paths.py

In [3]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

from delta import *
import pandas as pd

# start spark
builder = (pyspark.sql.SparkSession.builder.appName("Spark-Course")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# setting log-level to ERROR to decrease verbosity
# log4j log-levels are: OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL
spark.sparkContext.setLogLevel("ERROR")

# so that we can register UDFs in SQL
spark.builder.enableHiveSupport()

spark

In [4]:
pd.set_option('display.max_columns', None)

In [5]:
def display(spark_df, rows=10):
    return spark_df.limit(rows).toPandas().head(rows)

In [6]:
%load_ext sparksql_magic

In [7]:
titles_schema = StructType([
                            StructField('tconst', StringType(), False),
                            StructField('titleType', StringType(), True),
                            StructField('primaryTitle', StringType(), True),
                            StructField('originalTitle', StringType(), True),
                            StructField('isAdult', IntegerType(), True),
                            StructField('startYear', IntegerType(), True),
                            StructField('endYear', IntegerType(), True),
                            StructField('runtimeMinutes', IntegerType(), True),
                            StructField('genres', StringType(), True)
])

titles_sdf = (spark
                .read
                .schema(titles_schema)
                .option('header', True)
                .option('delimiter', '\t')
                .csv(raw_data_path + 'title_basics.tsv')
)

display(titles_sdf)

                                                                                

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,,1,"Comedy,Short"
5,tt0000006,short,Chinese Opium Den,Chinese Opium Den,0,1894,,1,Short
6,tt0000007,short,Corbett and Courtney Before the Kinetograph,Corbett and Courtney Before the Kinetograph,0,1894,,1,"Short,Sport"
7,tt0000008,short,Edison Kinetoscopic Record of a Sneeze,Edison Kinetoscopic Record of a Sneeze,0,1894,,1,"Documentary,Short"
8,tt0000009,short,Miss Jerry,Miss Jerry,0,1894,,40,"Romance,Short"
9,tt0000010,short,Leaving the Factory,La sortie de l'usine Lumière à Lyon,0,1895,,1,"Documentary,Short"


In [8]:
ratings_schema = StructType([
                            StructField('tconst_ratings', StringType(), False),
                            StructField('averageRating', FloatType(), True),
                            StructField('numVotes', LongType(), True)
])

ratings_sdf = (spark
                .read
                .schema(ratings_schema)
                .option('header', True)
                .option('delimiter', '\t')
                .csv(raw_data_path + 'title_ratings.tsv')
)

display(ratings_sdf)

Unnamed: 0,tconst_ratings,averageRating,numVotes
0,tt0000001,5.7,1876
1,tt0000002,5.9,248
2,tt0000003,6.5,1648
3,tt0000004,5.8,160
4,tt0000005,6.2,2475
5,tt0000006,5.2,165
6,tt0000007,5.4,771
7,tt0000008,5.4,2016
8,tt0000009,5.3,193
9,tt0000010,6.9,6775


In [9]:
# join the two dataframes and cache it
movies_sdf = (titles_sdf
                .join(ratings_sdf,
                titles_sdf['tconst'] == ratings_sdf['tconst_ratings'],
                how='inner')
                .drop('tconst_ratings')
                .coalesce(8) # my executor has 8 cores, so coalescing the dataframe to 8 partitions
)

movies_sdf.cache()

display(movies_sdf)

                                                                                

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres,averageRating,numVotes
0,tt0000658,short,The Puppet's Nightmare,Le cauchemar de Fantoche,0,1908,,2.0,"Animation,Short",6.5,227
1,tt0001732,short,The Lighthouse Keeper,The Lighthouse Keeper,0,1911,,,"Drama,Short",7.1,15
2,tt0002253,short,Home Folks,Home Folks,0,1912,,17.0,"Drama,Short",4.5,14
3,tt0002473,short,The Sands of Dee,The Sands of Dee,0,1912,,17.0,"Romance,Short",6.5,81
4,tt0002588,movie,Zigomar contre Nick Carter,Zigomar contre Nick Carter,0,1912,,48.0,"Crime,Thriller",6.0,37
5,tt0004091,short,His Favorite Pastime,His Favorite Pastime,0,1914,,16.0,"Comedy,Short",5.0,1104
6,tt0004272,movie,Lucille Love: The Girl of Mystery,Lucille Love: The Girl of Mystery,0,1914,,300.0,Action,6.7,24
7,tt0004336,movie,The Million Dollar Mystery,The Million Dollar Mystery,0,1914,,,"Adventure,Mystery",6.2,37
8,tt0005209,movie,Don Quixote,Don Quixote,0,1915,,50.0,Drama,5.6,44
9,tt0005793,movie,Mysteries of London,Mysteries of London,0,1915,,49.0,"Crime,Drama",6.1,11


## Count, Sum, Average, Variance, Covariance

In [55]:
# count() as action
movies_sdf.count()

                                                                                

1239452

In [56]:
# count() as transformation
movies_sdf.selectExpr('count(*) as ct')

DataFrame[ct: bigint]

In [57]:
display(
    movies_sdf.select(count_distinct('primaryTitle'))
)

                                                                                

Unnamed: 0,count(DISTINCT primaryTitle)
0,932725


In [58]:
display(
    movies_sdf.select(approx_count_distinct('primaryTitle', 0.1))
)

Unnamed: 0,approx_count_distinct(primaryTitle)
0,928817


In [59]:
display(
    movies_sdf.select(sum('runtimeMinutes')/60)
)

Unnamed: 0,(sum(runtimeMinutes) / 60)
0,829201.466667


In [60]:
display(
    movies_sdf.select(sum_distinct('startYear')/count_distinct('startYear'))
)

Unnamed: 0,(sum(DISTINCT startYear) / count(DISTINCT startYear))
0,1950.853147


In [61]:
display(
    movies_sdf.select(avg('averageRating'))
)

Unnamed: 0,avg(averageRating)
0,6.911798


In [62]:
display(
    movies_sdf.select(
        var_pop('averageRating'),
        var_samp('averageRating'),
        stddev_pop('averageRating'),
        stddev_samp('averageRating'))
)

Unnamed: 0,var_pop(averageRating),var_samp(averageRating),stddev_pop(averageRating),stddev_samp(averageRating)
0,1.936695,1.936697,1.391652,1.391653


In [63]:
display(
    movies_sdf.select(
        corr('startYear', 'averageRating'),
        covar_pop('startYear', 'averageRating'),
        covar_samp('startYear', 'averageRating')
    )
)

Unnamed: 0,"corr(startYear, averageRating)","covar_pop(startYear, averageRating)","covar_samp(startYear, averageRating)"
0,0.148062,4.396827,4.396831


## Collect List, Collect Set

In [64]:
display(
    movies_sdf.select(collect_set('genres'))
)

Unnamed: 0,collect_set(genres)
0,"[Music,Romance,Sport, Horror,Music,Short, Anim..."


In [65]:
display(
    movies_sdf.limit(10).select(collect_set('primaryTitle'))
)

Unnamed: 0,collect_set(primaryTitle)
0,"[The Lighthouse Keeper, Zigomar contre Nick Ca..."


## Grouping

In [71]:
# count() as action
display(
    movies_sdf.groupby('genres').count()
)

Unnamed: 0,genres,count
0,"Comedy,Sport",1056
1,"Action,Adventure,Fantasy",2888
2,"Documentary,Drama,Fantasy",28
3,"Crime,Horror,Short",92
4,"Animation,Sci-Fi,War",10
5,"Fantasy,Horror,Musical",6
6,"Adult,Horror,Sci-Fi",2
7,"Adult,Comedy,Musical",12
8,"Documentary,News,Reality-TV",11
9,"Animation,Sport,Thriller",27


In [73]:
# count() as transformation
display(
    movies_sdf
        .groupby('genres')
        .agg(count('*'))
)

Unnamed: 0,genres,count(1)
0,"Comedy,Sport",1056
1,"Action,Adventure,Fantasy",2888
2,"Documentary,Drama,Fantasy",28
3,"Crime,Horror,Short",92
4,"Animation,Sci-Fi,War",10
5,"Fantasy,Horror,Musical",6
6,"Adult,Horror,Sci-Fi",2
7,"Adult,Comedy,Musical",12
8,"Documentary,News,Reality-TV",11
9,"Animation,Sport,Thriller",27


In [24]:
# grouping with dict
agg_dict = {
    'averageRating' : 'avg',
    'startYear' : 'min'
}

display(
    movies_sdf
        .groupby('genres')
        .agg(agg_dict)
)

Unnamed: 0,genres,avg(averageRating),min(startYear)
0,"Comedy,Sport",7.185417,1916
1,"Action,Adventure,Fantasy",7.020152,1920
2,"Documentary,Drama,Fantasy",7.046429,1970
3,"Crime,Horror,Short",6.895652,1958
4,"Adult,Comedy,Musical",5.991667,1976
5,"Action,Adult,Short",5.5,1921
6,"Animation,Sport,Thriller",7.285185,2005
7,"Drama,Reality-TV,Short",6.5,2017
8,"Adventure,Family,Fantasy",6.939207,1910
9,"Game-Show,Reality-TV",6.989661,1956


### Grouping Sets

#### Rollups

In [None]:
display(
    movies_sdf
        .where('startYear >= 2021')
        .rollup('isAdult', 'startYear')
        .agg(avg('averageRating').alias('avg_rating'), grouping_id()) # grouping_id() gives the level of aggregation
        .orderBy('isAdult', 'startYear')
)

Unnamed: 0,isAdult,startYear,avg_rating,grouping_id()
0,,,7.155399,3
1,0.0,,7.155663,1
2,0.0,2021.0,7.123715,0
3,0.0,2022.0,7.287559,0
4,1.0,,6.634483,1
5,1.0,2021.0,6.569231,0
6,1.0,2022.0,7.2,0


In [None]:
movies_sdf.createOrReplaceTempView('movies')

In [None]:
%%sparksql
-- equivalent to above
select isAdult, startYear, avg(averageRating), grouping_id()
from movies
where startYear >= 2021
group by rollup(isAdult, startYear)
order by 1, 2

0,1,2,3
isAdult,startYear,avg(averageRating),grouping_id()
,,7.1553990770576075,3
0,,7.155662523958407,1
0,2021,7.12371483200285,0
0,2022,7.287559260239639,0
1,,6.634482794794543,1
1,2021,6.569230813246507,0
1,2022,7.199999968210856,0


In [None]:
%%sparksql
-- equivalent to above
select isAdult, startYear, avg(averageRating), grouping_id()
from movies
where startYear >= 2021
group by isAdult, startYear grouping sets ((isAdult, startYear), (isAdult), ())
order by 1, 2

0,1,2,3
isAdult,startYear,avg(averageRating),grouping_id()
,,7.1553990770576075,3
0,,7.155662523958407,1
0,2021,7.12371483200285,0
0,2022,7.287559260239639,0
1,,6.634482794794543,1
1,2021,6.569230813246507,0
1,2022,7.199999968210856,0


#### Cubes

In [None]:
display(
    movies_sdf
        .where('startYear >= 2021')
        .cube('isAdult', 'startYear')
        .agg(avg('averageRating').alias('avg_rating'), grouping_id())
        .orderBy('isAdult', 'startYear')
)

Unnamed: 0,isAdult,startYear,avg_rating,grouping_id()
0,,,7.155399,3
1,,2021.0,7.123403,2
2,,2022.0,7.287536,2
3,0.0,,7.155663,1
4,0.0,2021.0,7.123715,0
5,0.0,2022.0,7.287559,0
6,1.0,,6.634483,1
7,1.0,2021.0,6.569231,0
8,1.0,2022.0,7.2,0


In [None]:
%%sparksql
-- equivalent to above
select isAdult, startYear, avg(averageRating), grouping_id()
from movies
where startYear > 2020
group by cube(isAdult, startYear)
order by 1, 2

0,1,2,3
isAdult,startYear,avg(averageRating),grouping_id()
,,7.1553990770576075,3
,2021,7.123402696889341,2
,2022,7.287535773305081,2
0,,7.155662523958407,1
0,2021,7.12371483200285,0
0,2022,7.287559260239639,0
1,,6.634482794794543,1
1,2021,6.569230813246507,0
1,2022,7.199999968210856,0


In [None]:
%%sparksql
-- equivalent to above
select isAdult, startYear, avg(averageRating), grouping_id()
from movies
where startYear > 2020
group by isAdult, startYear grouping sets ((isAdult, startYear), (isAdult), (startYear), ())
order by 1, 2

0,1,2,3
isAdult,startYear,avg(averageRating),grouping_id()
,,7.1553990770576075,3
,2021,7.123402696889341,2
,2022,7.287535773305081,2
0,,7.155662523958407,1
0,2021,7.12371483200285,0
0,2022,7.287559260239639,0
1,,6.634482794794543,1
1,2021,6.569230813246507,0
1,2022,7.199999968210856,0


When using grouping sets, it's useful to think of what totals you want. For e.g. the above has totals by `isAdult`, by `startYear`, and the full total. 

## Window Functions

In [26]:
from pyspark.sql.window import Window

window_spec = (
    Window.partitionBy('genres')
        .orderBy(desc('startYear'))
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

window_spec

<pyspark.sql.window.WindowSpec at 0x7f0da6a4b040>

In [27]:
max_rating_till_year_genre = max(col('averageRating')).over(window_spec)

# max_rating_till_year_genre is a Column transformation; it won't be populated until it is used on a dataframe. 
max_rating_till_year_genre

Column<'max(averageRating) OVER (PARTITION BY genres ORDER BY startYear DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

In [30]:
display(
    movies_sdf.select('genres',
                        'startYear',
                        max_rating_till_year_genre.alias('maxRating'))               
)

Unnamed: 0,genres,startYear,maxRating
0,"Action,Adult,Fantasy",2018,7.7
1,"Action,Adult,Fantasy",2017,7.7
2,"Action,Adult,Fantasy",2016,7.7
3,"Action,Adult,Fantasy",2015,7.7
4,"Action,Adult,Fantasy",2012,7.7
5,"Action,Adult,Fantasy",2011,7.7
6,"Action,Adult,Fantasy",2007,7.7
7,"Action,Adult,Fantasy",1987,7.7
8,"Action,Adult,Short",1979,5.2
9,"Action,Adult,Short",1921,5.8


In [41]:
# dense_rank() gives ranks w/o any gap: e.g. 1, 1, 1, 2 instead of 1, 1, 1, 4
highest_rated = dense_rank().over(Window.partitionBy('genres')
                                        .orderBy(desc('averageRating')))

display(
    movies_sdf.select(col('primaryTitle').alias('bestMovie'), 'genres', 'startYear', highest_rated.alias('dense_rank'))
                .where('genres = "Action,Animation,Sci-Fi"')
                .orderBy('dense_rank')
)

Unnamed: 0,bestMovie,genres,startYear,dense_rank
0,The Race,"Action,Animation,Sci-Fi",2001,1
1,Extreme,"Action,Animation,Sci-Fi",2001,1
2,Fun in the Sun,"Action,Animation,Sci-Fi",2000,1
3,Strong Will,"Action,Animation,Sci-Fi",2021,2
4,Trouble in Paradise,"Action,Animation,Sci-Fi",2006,3
5,Shooting Stars,"Action,Animation,Sci-Fi",2001,3
6,Tango,"Action,Animation,Sci-Fi",2016,4
7,Captain,"Action,Animation,Sci-Fi",2021,4
8,Sphinxes,"Action,Animation,Sci-Fi",2000,4
9,Showdown,"Action,Animation,Sci-Fi",2021,4


## Pivot

In [42]:
display(
    movies_sdf
        .where('startYear >= 2018')
        .groupby('startYear')
        .pivot('titleType')
        .agg(avg('averageRating').alias('avg_rating'), max('averageRating').alias('max_rating'))
        .orderBy('startYear')
)

Unnamed: 0,startYear,movie_avg_rating,movie_max_rating,short_avg_rating,short_max_rating,tvEpisode_avg_rating,tvEpisode_max_rating,tvMiniSeries_avg_rating,tvMiniSeries_max_rating,tvMovie_avg_rating,tvMovie_max_rating,tvSeries_avg_rating,tvSeries_max_rating,tvShort_avg_rating,tvShort_max_rating,tvSpecial_avg_rating,tvSpecial_max_rating,video_avg_rating,video_max_rating,videoGame_avg_rating,videoGame_max_rating
0,2018,6.146288,10.0,7.116452,10.0,7.419993,10.0,7.007593,9.8,6.472547,9.8,6.890508,10.0,7.073438,9.7,6.534366,10.0,6.832857,10.0,6.921596,9.7
1,2019,6.1519,10.0,7.054535,10.0,7.443993,10.0,7.103475,10.0,6.439578,10.0,6.905398,10.0,6.951064,9.7,6.768992,10.0,6.878863,10.0,6.976515,9.6
2,2020,6.178183,10.0,7.160732,10.0,7.320513,10.0,7.21099,10.0,6.44011,9.8,6.888847,10.0,7.287879,9.6,6.735861,10.0,7.014048,10.0,7.047962,9.5
3,2021,6.336892,10.0,7.6095,10.0,7.318597,10.0,7.248395,9.8,6.386696,9.9,6.865513,10.0,6.847059,9.4,6.678125,10.0,7.474721,10.0,7.170996,9.7
4,2022,6.630151,10.0,8.207925,10.0,7.446491,10.0,7.19619,10.0,6.338764,10.0,7.054702,10.0,7.0,7.7,6.86125,9.6,7.845833,10.0,7.468085,9.5


In [43]:
# spark.stop()