# Overview

This notebook represents a glue job that reads .CSV files from S3, converts them into Spark DataFrames and performs transformations on those DataFrames before writing the frames as a table in the delta lake. To run the notebook locally, comment out the additional *script_location* and *temp_dir* configurations.

In [None]:
%idle_timeout 2880
%glue_version 4.0
%number_of_workers 2
%worker_type G.1X

In [None]:
%%configure
{
    "datalake-formats": "delta",
    
    # Replace BUCKET_NAME with the name of your S3 bucket
    # Comment out these lines if running the notebook locally
    
    "script_location": "s3://your-name-delta-lake-project-1/scripts/",
    "temp_dir": "s3://your-name-delta-lake-project-1/scripts/temp/",
}

In [None]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext

from pyspark.sql.functions import col, lit, rank
from pyspark.sql.window import Window

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

In [None]:
BUCKET_NAME=''

In [None]:
df_total = glueContext.create_data_frame.from_catalog(
    database="delta_lake",
    table_name="top_performers_delta")

In [None]:
import boto3

s3_resource = boto3.resource('s3')

In [None]:
# Iterate through files in an s3 folder with boto3
bucket = s3_resource.Bucket(f'{BUCKET_NAME}')

for obj in bucket.objects.filter(Prefix='database/raw/', ):

    if not obj.key.endswith('.csv'):
        continue

    # Get year from file name
    year = int(obj.key.split('/')[2].split('.')[0])

    # Read csv file from s3 and convert to DataFrame
    df = glueContext.create_data_frame.from_options(
        connection_type='s3',
        connection_options={'paths': [f's3a://{BUCKET_NAME}/{obj.key}']},
        format='csv',
        format_options={'withHeader': True},
        transformation_ctx='datasource0')
    
    df = df.withColumn("FantasyPoints", df["FantasyPoints"].cast("double"))
    
    # Create a window specification to get top 12 players by position
    # Note: This will actually return all players that have a top 12 fantasy score
    # which means that there may be more than 12 players for a given position
    windowSpec = Window.partitionBy('Pos').orderBy(df['FantasyPoints'].desc())
    
    df_csv = df.filter(col("Pos").isin(["QB", "WR", "RB"])) \
        .select('Pos', 'Player', 'FantasyPoints') \
        .withColumn("rank", rank().over(windowSpec)) \
        .filter(col("rank") <= 12)
    
    # Clean up additional columns and add year
    df_csv = df_csv.withColumnRenamed("Pos", "Position") \
        .withColumn("Year", lit(year)) \
        .select("Year", "Position", "Player", "FantasyPoints")

    df_total = df_total.union(df_csv)

In [None]:
# Write pyspark dataframe to delta lake
df_total.write.format("delta").mode("overwrite").save(f"s3a://{BUCKET_NAME}/database/top_performers_delta")