### **Ingest Teams JSON**

### Configuration

In [1]:
%run /utils/general_functions

In [11]:
# `create_mounts` is not defined in this notebook; presumably it comes from /utils/general_functions (%run in the
# previous cell) and mounts the storage containers that later cells address as synfs:/<job_id>/mnt/... — TODO confirm.
create_mounts()

### Define Schema

In [12]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, TimestampType

# Bronze team record as read from teams.json: a non-nullable integer id plus six nullable string attributes.
teams_schema = StructType(
    [StructField('id', IntegerType(), False)]
    + [
        StructField(attribute, StringType(), True)
        for attribute in ('abbreviation', 'city', 'conference', 'division', 'full_name', 'name')
    ]
)

# Dimension layout: the bronze columns (same order) followed by the SCD bookkeeping columns —
# active flag, effective-date range — and the surrogate key.
teams_gold_schema = StructType(
    teams_schema.fields
    + [
        StructField('is_active', BooleanType(), True),
        StructField('eff_start_date', TimestampType(), True),
        StructField('eff_end_date', TimestampType(), True),
        StructField('team_key', IntegerType(), True),
    ]
)

### Read Teams File

In [13]:
job_id = mssparkutils.env.getJobId()

# Current source snapshot (bronze container).
teams_df = spark.read.json(f'synfs:/{job_id}/mnt/bronze/teams/teams.json', teams_schema)

# Existing dimension (gold container). Look for the table once across the whole listing, and fall
# back to an empty frame with the gold schema when it does not exist yet (first run).
gold_has_dim_team = any(t.name == 'dim_team' for t in spark.catalog.listTables('prize_picks_gold'))
if gold_has_dim_team:
    teams_gold_df = spark.read.format('delta').load(f'synfs:/{job_id}/mnt/gold/dim_team')
else:
    teams_gold_df = spark.createDataFrame([], teams_gold_schema)

# New or changed teams: source rows with no identical *current* version in the dimension.
# Only active rows are compared, so a team whose attributes revert to an earlier (now inactive)
# version is still picked up as a change.
# exceptAll matches columns by position, not name, so the SCD bookkeeping columns are dropped to
# leave the seven business columns (assumes gold stores them in teams_schema order — TODO confirm).
active_gold_df = teams_gold_df.where(teams_gold_df.is_active)
teams_diff_df = teams_df.exceptAll(
    active_gold_df.drop('is_active', 'eff_start_date', 'eff_end_date', 'team_key')
)

### Transformation

Get Max team_key From Gold Container

In [14]:
from pyspark.sql.functions import max

# Highest surrogate key already issued; new rows are numbered after it.
# A global agg(max(...)) over an empty frame still returns one row holding None, so the first run
# is detected without a separate count() (which would be an extra Spark job over the dimension).
max_value = teams_gold_df.agg(max(teams_gold_df.team_key)).collect()[0][0]

max_team_key = max_value if max_value is not None else 0

Add SCD Attributes (Active Flag, Effective Dates) and Surrogate Key

In [15]:
from pyspark.sql.functions import lit, row_number, current_timestamp
from pyspark.sql import Window

# Surrogate keys are assigned in full_name order; id breaks ties so that key assignment is
# deterministic when two new rows share a full_name. The window has no partitionBy, so Spark
# moves all rows to a single partition — acceptable assuming the team list is small.
spec = Window.orderBy(teams_diff_df.full_name.asc(), teams_diff_df.id.asc())

teams_diff_updated_df = (
    teams_diff_df
    .withColumn('is_active', lit(True))
    .withColumn('eff_start_date', current_timestamp())
    # NOTE(review): 1900-01-01 is used as the end date of rows that are still open — confirm
    # downstream consumers expect this sentinel rather than e.g. 9999-12-31.
    .withColumn('eff_end_date', lit('1900-01-01 00:00:00.000').cast('timestamp'))
    # Keys continue from the highest key already in the dimension.
    .withColumn('team_key', row_number().over(spec) + max_team_key)
)


In [16]:
# The dimension is written with its id column named `team_id` (renamed just before the write), so
# map it back to `id` here; when the frame already uses `id` (first run) the rename is a no-op.
teams_gold_df = (
    teams_gold_df
    .withColumnRenamed('team_id', 'id')
)

# Existing dimension rows followed by the newly keyed rows, with columns matched by name.
combined_teams_df = teams_gold_df.unionByName(
    teams_diff_updated_df
)

In [17]:
# Latest eff_start_date per team. The grouping key is aliased `groupby_id` so it does not clash
# with `id` when joined back onto combined_teams_df.
max_date_df = (
    combined_teams_df
    .groupBy(combined_teams_df['id'].alias('groupby_id'))
    .agg(max(combined_teams_df['eff_start_date']).alias('max_date'))
)

Mark Rows With A Dimension Change

In [18]:
# Left-join every row to its team's latest eff_start_date. Rows carrying that latest date get a
# non-null max_date; all older versions come back with max_date = null.
is_latest_version = (
    (combined_teams_df['id'] == max_date_df['groupby_id'])
    & (combined_teams_df['eff_start_date'] == max_date_df['max_date'])
)

teams_scd_df = (
    combined_teams_df
    .join(max_date_df, is_latest_version, 'left')
    .drop(max_date_df['groupby_id'])
)

Update Dimensions

In [19]:
from pyspark.sql.functions import when, col 

# Rows that are not the latest version of their team have max_date = null after the join.
# Only those that are still active are closed now (eff_end_date = this run's timestamp);
# rows closed in earlier runs keep their original eff_end_date instead of being reset each run.
# eff_end_date is computed before is_active is overwritten because its condition reads the
# original is_active value.
newly_superseded = col('max_date').isNull() & col('is_active')

teams_final_df = (
    teams_scd_df
    .withColumnRenamed('id', 'team_id')
    .withColumn('eff_end_date', when(newly_superseded, current_timestamp()).otherwise(col('eff_end_date')))
    .withColumn('is_active', when(col('max_date').isNull(), lit(False)).otherwise(col('is_active')))
    .drop('max_date')
)

### Merge into the `prize_picks_silver.dim_team` table

In [None]:
%%sql
-- NOTE(review): destructive reset. This cell has no execution count (In [None]) in the saved run.
-- Running it drops the silver dim_team table that the merge_data cell below writes into, so any
-- previously merged SCD history there is lost. Confirm this is an intentional one-off reset
-- before running it as part of the pipeline.
DROP TABLE IF EXISTS prize_picks_silver.dim_team;

In [20]:
# Target of the write: the dim_team Delta table in the silver database/container.
database = 'prize_picks_silver'
table = 'dim_team'
container = 'silver'
file_format = 'delta'

# Rows are matched on the surrogate key; how matched and unmatched rows are handled is up to
# merge_data (defined outside this notebook) — presumably update/insert, TODO confirm.
merge_condition = 'tgt.team_key == src.team_key'

merge_data(
    teams_final_df,
    container,
    database,
    table,
    file_format,
    merge_condition=merge_condition,
)