In [0]:
from pyspark.sql.functions import col, min

In [0]:
runs_fact_df = spark.read.format("delta").table("dea_speedrun.gold.runs_fact")
users_dim_df = spark.read.format("delta").table("dea_speedrun.gold.users_dim")
guests_dim_df = spark.read.format("delta").table("dea_speedrun.gold.guests_dim")
games_dim_df = spark.read.format("delta").table("dea_speedrun.gold.games_dim")
reviewers_dim_df = spark.read.format("delta").table("dea_speedrun.gold.reviewers_dim")
categories_dim_df = spark.read.format("delta").table("dea_speedrun.gold.categories_dim")
levels_dim_df = spark.read.format("delta").table("dea_speedrun.gold.levels_dim")

In [0]:
runs_status_df = runs_fact_df.groupBy("status").count()

display(runs_status_df)

runs_status_df.write.mode("overwrite").format("delta").saveAsTable("dea_speedrun.metrics.runs_by_status")

In [0]:
%sql
SELECT * FROM dea_speedrun.metrics.runs_by_status

Databricks visualization. Run in Databricks to view.

In [0]:
runs_by_game = runs_fact_df.groupBy("game_sk").count()
runs_by_game_name = runs_by_game.join(games_dim_df, runs_by_game.game_sk == games_dim_df.game_sk).select("name", "count")
display(runs_by_game_name)

runs_by_game_name.write.mode("overwrite").format("delta").saveAsTable("dea_speedrun.metrics.most_played_games")

In [0]:
%sql
SELECT * FROM dea_speedrun.metrics.most_played_games ORDER BY count DESC LIMIT 25

Databricks visualization. Run in Databricks to view.

In [0]:
display(runs_fact_df)

In [0]:
most_active_reviewers = runs_fact_df.groupBy("reviewer_sk").count().dropna() \
                                    .join(reviewers_dim_df, runs_fact_df.reviewer_sk == reviewers_dim_df.reviewer_sk) \
                                    .select("name", "count").orderBy("count", ascending=False)
display(most_active_reviewers)

most_active_reviewers.write.mode("overwrite").format("delta").saveAsTable("dea_speedrun.metrics.most_active_reviewers")

In [0]:
%sql
SELECT * FROM dea_speedrun.metrics.most_active_reviewers LIMIT 25

Databricks visualization. Run in Databricks to view.

In [0]:
display(levels_dim_df)

In [0]:
top25games = [game.name for game in runs_by_game_name.orderBy("count", ascending=False).limit(25).select("name").collect()]
categories_renamed_df = categories_dim_df.withColumnRenamed("name", "category_name")
levels_renamed_df = levels_dim_df.withColumnRenamed("name", "level_name")

top25games_df = runs_fact_df.join(games_dim_df, runs_fact_df.game_sk == games_dim_df.game_sk) \
                            .join(categories_renamed_df, runs_fact_df.category_sk == categories_renamed_df.category_sk) \
                            .join(levels_renamed_df, runs_fact_df.level_sk == levels_renamed_df.level_sk, "left") \
                            .filter(col("name").isin(top25games))

top25games_fastest_times = top25games_df.groupBy("name", "level_name", "category_name").agg(min("primary_time").alias("fastest_time")).sort("name")
display(top25games_fastest_times)

top25games_fastest_times.write.mode("overwrite").option("mergeSchema", "true").format("delta").saveAsTable("dea_speedrun.metrics.top25games_fastest_times")

In [0]:
%sql
SELECT * FROM dea_speedrun.metrics.top25games_fastest_times

Databricks visualization. Run in Databricks to view.