In [None]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

In [None]:
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.context import SparkContext
from pyspark.sql import SparkSession, functions as F, Window

# Columns kept when projecting game_players down to the player-info table.
# The list is passed straight to DataFrame.select(), so the order below is
# also the column order of the resulting frame — keep it stable.
D_PLAYER_INFO_COLS = [
    "player_id", "active",
    "batSide_code", "batSide_description",
    "birthCity", "birthCountry", "birthDate",
    "boxscoreName", "currentAge", "draftYear",
    "firstLastName", "firstName",
    "height",
    "lastName", "lastPlayedDate", "link",
    "middleName", "mlbDebutDate", "nameSlug",
    "pitchHand_code", "pitchHand_description",
    "primaryNumber",
    "primaryPosition_abbreviation", "primaryPosition_code",
    "primaryPosition_name", "primaryPosition_type",
    "weight",
]

In [None]:
# Session bootstrap: build the Spark/Glue handles the remaining cells rely on.
# getOrCreate() is used rather than constructing a SparkContext directly, so
# re-running this cell does not try to start a second context.
sc = SparkContext.getOrCreate()
# glueContext is the entry point used below for both the catalog read and the
# S3 sink write.
glueContext = GlueContext(sc)
# Plain SparkSession taken from the GlueContext.
spark = glueContext.spark_session
# NOTE(review): no job.init(...) / job.commit() call is visible in this
# notebook — confirm whether this Job handle is actually needed.
job = Job(glueContext)

In [None]:
# Read the game_players catalog table and reduce it to ONE row per player,
# taken from the game with the highest game_pk that player appears in.
# NOTE(review): this treats the largest game_pk as the player's latest game —
# confirm that game_pk ordering is chronological for this dataset.

# Raw table, kept under its own name so this cell can be re-run safely and the
# unfiltered frame stays available for inspection.
game_players = glueContext.create_dynamic_frame.from_catalog(
    database="zavant", table_name="game_players"
).toDF()

# Highest game_pk seen for each player, attached to every row of that player.
max_game_pk_per_player = F.max("game_pk").over(Window.partitionBy("player_id"))

df = (
    game_players
    .withColumn("max_game_pk", max_game_pk_per_player)
    .filter(F.col("game_pk") == F.col("max_game_pk"))
    .drop("max_game_pk")
    .select(D_PLAYER_INFO_COLS)
    # The max-filter keeps every row that ties on game_pk, so a duplicated
    # (player_id, game_pk) row in the source would otherwise surface as several
    # rows for the same player. Collapse to a single row per player_id.
    .dropDuplicates(["player_id"])
)

In [None]:
# Persist the player-info frame as snappy-compressed Glue parquet on S3 and
# register/update the matching table in the Glue Data Catalog.
# NOTE(review): nothing in this cell clears the target path before writing —
# confirm whether re-running the notebook appends a second copy of the data.
sink_options = {
    "connection_type": "s3",
    "path": "s3://zavant-datamart/d-player-info",
    "compression": "snappy",
    # Unpartitioned output.
    "partitionKeys": [],
    # Let the write create/update the catalog table definition.
    "enableUpdateCatalog": True,
    "updateBehavior": "UPDATE_IN_DATABASE",
    "transformation_ctx": "s3output",
}
s3output = glueContext.getSink(**sink_options)

# Catalog table that the written files are exposed as.
s3output.setCatalogInfo(
    catalogDatabase="zavant",
    catalogTableName="d_player_info",
)
s3output.setFormat("glueparquet")

s3output.writeDataFrame(df, glueContext)