In [7]:
import os

from loguru import logger
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

In [2]:
# Build (or reuse, via getOrCreate) the session used by every cell below.
# The Hive metastore credentials are taken from the environment when set
# (HIVE_METASTORE_USER / HIVE_METASTORE_PASSWORD); the fallbacks are the
# values this notebook used before, so an unconfigured kernel behaves the same.
spark: SparkSession = (
    SparkSession.builder.master("spark://spark-master:7077")
    .appName("tft-analyzer")
    # Driver JVM, executor JVMs and the SQL session all use a zero-offset zone.
    .config("spark.driver.extraJavaOptions", "-Duser.timezone=GMT")
    .config("spark.executor.extraJavaOptions", "-Duser.timezone=GMT")
    .config("spark.sql.session.timeZone", "UTC")
    # Extra packages: Delta core, the Kafka SQL connector and Avro support.
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:2.2.0,"
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
        "org.apache.spark:spark-avro_2.12:3.3.0",
    )
    # Delta SQL extension, log store and catalog implementation.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.delta.logStore.class",
        "org.apache.spark.sql.delta.storage.HDFSLogStore",
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse")
    # JDBC settings for the metastore database.
    .config(
        "spark.hadoop.javax.jdo.option.ConnectionURL",
        "jdbc:postgresql://hive-metastore-postgresql/metastore",
    )
    .config(
        "spark.hadoop.javax.jdo.option.ConnectionDriverName",
        "org.postgresql.Driver",
    )
    .config(
        "spark.hadoop.javax.jdo.option.ConnectionUserName",
        os.environ.get("HIVE_METASTORE_USER", "hive"),
    )
    .config(
        "spark.hadoop.javax.jdo.option.ConnectionPassword",
        os.environ.get("HIVE_METASTORE_PASSWORD", "hive"),
    )
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.pyspark.python", "python3")
    .enableHiveSupport()
    .getOrCreate()
)

In [8]:
def read_delta(table: str, spark: "SparkSession") -> "DataFrame":
    """Look up ``table`` through the session's Delta reader and return the result.

    One info-level message is logged before the lookup and one after it.

    Args:
        table: Table identifier handed to ``spark.read.format("delta").table``
            (for example ``"gold.composition_metrics"``).
        spark: Session object whose ``read`` attribute is used for the lookup.

    Returns:
        The object returned by ``spark.read.format("delta").table(table)``.
    """
    logger.info(f'Started reading from "{table}"...')
    # The result type lives on the signature as a string forward reference, so
    # the function does not need ``DataFrame`` to be resolvable when it is defined.
    # NOTE(review): Spark DataFrames are evaluated lazily, so the "Finished"
    # message presumably marks the table being resolved, not rows being
    # loaded — confirm before relying on the timing between the two log lines.
    df = spark.read.format("delta").table(table)
    logger.info(f'Finished reading from "{table}".')
    return df

In [9]:
# Load the "gold.composition_metrics" table through the read_delta helper.
composition_metrics = read_delta(table="gold.composition_metrics", spark=spark)

[32m2024-01-25 02:07:58.116[0m | [1mINFO    [0m | [36m__main__[0m:[36mread_delta[0m:[36m2[0m - [1mStarted reading from "gold.composition_metrics"...[0m
[32m2024-01-25 02:08:09.843[0m | [1mINFO    [0m | [36m__main__[0m:[36mread_delta[0m:[36m4[0m - [1mFinished reading from "gold.composition_metrics".[0m


In [11]:
# Print the first 25 rows with column truncation turned off.
composition_metrics.show(n=25, truncate=False)

+-----------------+-----------------+--------------------+------------------+-------------------+------------------+------------------+----+
|trait1           |trait2           |strength            |avg_placement     |pick_rate          |top_4_rate        |top_1_rate        |rank|
+-----------------+-----------------+--------------------+------------------+-------------------+------------------+------------------+----+
|Set10_Dazzler    |Set10_Funk       |0.0958270017278458  |4.638658774536127 |0.11490588949605345|42.9611790403181  |9.594945542146233 |1   |
|Set10_Hyperpop   |Set10_Funk       |0.08321607780839112 |3.5652207159070763|0.13615664845173042|67.17268301652224 |21.580913623962164|2   |
|Set10_Brawler    |Set10_Funk       |0.054045472351984085|4.661235243123546 |0.12234365513054038|46.51819872409271 |12.241406786490645|3   |
|Set10_Fighter    |Set10_Country    |0.04434958223544184 |4.008341508372336 |0.09517304189435337|58.45189423417696 |18.58585983142065 |4   |
|Set10_Brawle