In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, day, from_unixtime, month, timestamp_seconds, to_timestamp, year

# Initialize SparkSession (getOrCreate reuses a session the kernel already started, if any).
# Every later cell reads/writes through this `spark` handle; it is stopped in the last cell.
spark = SparkSession.builder.getOrCreate()

# --- Glue Data Catalog target: fill these in before running -------------------
awsRegion, glueDatabaseName, glueTableName = "", "", ""

# --- S3 locations ---------------------------------------------------------------
sourceDirectory = "s3://"       # raw, unpartitioned parquet input
destinationDirectory = "s3://"  # partitioned parquet output; also used as the Glue table location

# --- Time column ------------------------------------------------------------------
# Name of the column holding epoch seconds. If the source stores time in another
# format, change the conversion expression in the load cell accordingly.
epochColumn = ""

In [None]:
# Load the raw parquet data and turn the epoch-seconds column into a timestamp.
# timestamp_seconds converts the numeric value directly (keeping fractional seconds).
# The previous from_unixtime approach formatted a string in the session time zone that
# had to be re-parsed later, which loses sub-second precision and is ambiguous during a
# DST fall-back hour when the session time zone is not UTC.
# Assumes epochColumn is numeric epoch seconds -- confirm against the source schema.
# Parquet compression is read from each file's metadata, so no read option is needed.
df = spark.read.format("parquet").load(sourceDirectory)
df = df.withColumn(epochColumn, timestamp_seconds(epochColumn))

In [None]:
# Derive the partition columns from the converted time column in one projection;
# the keys become the Year=/Month=/Day= directory levels on write.
df = df.withColumns({
    "Year": year(epochColumn),
    "Month": month(epochColumn),
    "Day": day(epochColumn),
})

In [None]:
from pyspark.sql.types import TimestampType

# Force the time column to Spark's TimestampType. The explicit cast presumably matters
# when to_timestamp yields a different timestamp flavour (e.g. TIMESTAMP_NTZ via
# spark.sql.timestampType) -- TODO confirm for your Spark configuration.
df = df.withColumn(
    epochColumn,
    to_timestamp(epochColumn).cast(TimestampType()),
)

# Write zstd-compressed parquet partitioned by Year/Month/Day.
# NOTE: append mode -- re-running this cell writes the same rows a second time.
df.write.parquet(
    destinationDirectory,
    mode="append",
    partitionBy=["Year", "Month", "Day"],
    compression="zstd",
)

# Read the whole destination back (including data from earlier appends) to collect
# its distinct partitions and the schema used for the Glue table definition.
df = spark.read.parquet(destinationDirectory)
partitions = df.select("Year", "Month", "Day").distinct().collect()

schema = df.schema
print(schema)

In [None]:
# setup deps for creating Glue Table
import boto3
from pyspark.sql.types import (
    ArrayType,
    BinaryType,
    BooleanType,
    ByteType,
    DateType,
    DecimalType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    MapType,
    ShortType,
    StringType,
    StructType,
    TimestampType,
)

glue = boto3.client("glue", region_name=awsRegion)

# Glue/Hive type names (as used by Athena) for Spark's non-parameterized types.
# Matched on the exact class, as before.
_ATHENA_PRIMITIVE_TYPES = {
    ByteType: "tinyint",
    ShortType: "smallint",
    IntegerType: "int",
    LongType: "bigint",
    FloatType: "float",
    DoubleType: "double",
    StringType: "string",
    BooleanType: "boolean",
    BinaryType: "binary",
    DateType: "date",
    TimestampType: "timestamp",
}


def sparkDataTypeToAthenaDataType(sparkDataType):
    """Translate a Spark SQL data type into a Glue/Athena column type string.

    Parameterized and nested types are rendered recursively: decimals keep their
    precision and scale, and arrays, maps and structs become ``array<...>``,
    ``map<k,v>`` and ``struct<name:type,...>``. Declaring these as ``string`` would
    not match the parquet files and makes Athena fail when reading the column.

    Args:
        sparkDataType: a ``pyspark.sql.types.DataType`` instance (e.g. a schema field's ``dataType``).

    Returns:
        The Glue type string; any type not handled here still falls back to ``"string"``.
    """
    if isinstance(sparkDataType, DecimalType):
        return f"decimal({sparkDataType.precision},{sparkDataType.scale})"
    if isinstance(sparkDataType, ArrayType):
        return f"array<{sparkDataTypeToAthenaDataType(sparkDataType.elementType)}>"
    if isinstance(sparkDataType, MapType):
        keyType = sparkDataTypeToAthenaDataType(sparkDataType.keyType)
        valueType = sparkDataTypeToAthenaDataType(sparkDataType.valueType)
        return f"map<{keyType},{valueType}>"
    if isinstance(sparkDataType, StructType):
        fieldTypes = ",".join(
            f"{field.name}:{sparkDataTypeToAthenaDataType(field.dataType)}"
            for field in sparkDataType.fields
        )
        return f"struct<{fieldTypes}>"
    return _ATHENA_PRIMITIVE_TYPES.get(type(sparkDataType), "string")  # Default to string type if unknown

In [None]:

def getGlueTableColumns(schema, partitionKeys):
    """Build the Glue ``Columns`` list for a Spark schema, leaving out partition keys.

    Args:
        schema: Spark schema whose ``fields`` describe the table columns.
        partitionKeys: column names to exclude; they are declared separately as
            the table's partition keys.

    Returns:
        A list of ``{"Name": ..., "Type": ...}`` dicts, in schema field order.
    """
    return [
        {"Name": field.name, "Type": sparkDataTypeToAthenaDataType(field.dataType)}
        for field in schema.fields
        if field.name not in partitionKeys
    ]

def createGlueTable(glueDatabaseName, glueTableName, columns, partitionKeys, destinationDirectory, partitionKeyType="string"):
    """Create an external, parquet-backed Glue table located at ``destinationDirectory``.

    Args:
        glueDatabaseName: Glue database to create the table in.
        glueTableName: name of the new table.
        columns: list of ``{"Name": ..., "Type": ...}`` dicts for the data columns
            (partition columns excluded).
        partitionKeys: partition column names. Order matters: partition values
            registered later are matched to these keys positionally.
        destinationDirectory: S3 URI used as the table location.
        partitionKeyType: Glue type declared for every partition key. Defaults to
            ``"string"``; pass e.g. ``"int"`` to compare partition values numerically.

    Returns:
        The ``create_table`` response from the Glue client.

    Raises:
        Whatever the boto3 Glue client raises, e.g.
        ``glue.exceptions.AlreadyExistsException`` when the table already exists.
    """
    return glue.create_table(
        DatabaseName=glueDatabaseName,
        TableInput={
            "Name": glueTableName,
            "StorageDescriptor": {
                "Columns": columns,
                "Location": destinationDirectory,
                "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
                },
            },
            "PartitionKeys": [{"Name": key, "Type": partitionKeyType} for key in partitionKeys],
            "TableType": "EXTERNAL_TABLE",
        },
    )

# Partition columns become Glue partition keys, so they are left out of the regular column list.
partitionKeys = ["Year", "Month", "Day"]  # must match the partitionBy(...) columns used when writing
columns = getGlueTableColumns(schema=schema, partitionKeys=partitionKeys)

In [None]:
# Create the Glue table on the first run. The data is written in append mode, so later
# runs are expected to find the table already in place; it is then kept as-is (its
# definition is not updated). Any other Glue error propagates unchanged.
try:
    createTableResponse = createGlueTable(glueDatabaseName, glueTableName, columns, partitionKeys, destinationDirectory)
except glue.exceptions.AlreadyExistsException:
    print(f"table {glueDatabaseName}.{glueTableName} already exists; skipping creation")
else:
    print("table created successfully")
    print(createTableResponse)

In [None]:
def addPartitionsToTable(glueDatabaseName, glueTableName, partitions, destinationDirectory):
    """Register one batch of Year/Month/Day partitions on an existing Glue table.

    Args:
        glueDatabaseName: Glue database that holds the table.
        glueTableName: table to register the partitions on.
        partitions: iterable of rows (or mappings) exposing "Year", "Month" and "Day";
            the caller keeps each batch small enough for one batch_create_partition call.
        destinationDirectory: S3 URI of the table root; a trailing "/" is tolerated.

    Returns:
        The batch_create_partition response, or None when ``partitions`` is empty.

    Raises:
        RuntimeError: if Glue rejected any partition for a reason other than it
            already being registered (expected when the notebook is re-run).
    """
    # Avoid "//" in partition locations when the configured URI ends with a slash.
    tableRoot = destinationDirectory.rstrip("/")

    partitionInputs = []
    for partition in partitions:
        yearValue, monthValue, dayValue = partition["Year"], partition["Month"], partition["Day"]
        partitionInputs.append({
            # Values are positional and follow the table's partition key order.
            "Values": [str(yearValue), str(monthValue), str(dayValue)],
            "StorageDescriptor": {
                "Columns": [],  # This can be empty as columns are defined at the table level
                # Mirrors the Year=/Month=/Day= directories written by partitionBy.
                "Location": f"{tableRoot}/Year={yearValue}/Month={monthValue}/Day={dayValue}",
                "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
                },
            },
        })

    # Nothing to register: skip the API call entirely.
    if not partitionInputs:
        return None

    response = glue.batch_create_partition(
        DatabaseName=glueDatabaseName,
        TableName=glueTableName,
        PartitionInputList=partitionInputs,
    )

    # batch_create_partition reports per-partition failures in the response body
    # instead of raising. Partitions that already exist are fine (the destination is
    # re-read in full, so partitions from earlier runs show up again); anything else
    # means data would be invisible to queries, so fail loudly.
    failures = [
        error for error in response.get("Errors", [])
        if error.get("ErrorDetail", {}).get("ErrorCode") != "AlreadyExistsException"
    ]
    if failures:
        raise RuntimeError(
            f"Glue rejected {len(failures)} partition(s) for "
            f"{glueDatabaseName}.{glueTableName}: {failures}"
        )
    return response

def createUniqueChunks(data, maxPartitionCount=95):
    """Deduplicate ``data`` and split it into lists of at most ``maxPartitionCount`` items.

    Duplicates are dropped with the first occurrence kept, and input order is
    preserved, so the same input always yields the same batches (unlike ``set()``,
    whose iteration order can change between interpreter runs).

    Args:
        data: iterable of hashable items (e.g. collected partition rows).
        maxPartitionCount: maximum batch size; must be at least 1.

    Returns:
        A list of lists; empty when ``data`` is empty.

    Raises:
        ValueError: if ``maxPartitionCount`` is less than 1. A non-positive step
            would otherwise produce no batches and silently drop every item.
    """
    if maxPartitionCount < 1:
        raise ValueError(f"maxPartitionCount must be >= 1, got {maxPartitionCount}")
    uniqueItems = list(dict.fromkeys(data))  # order-preserving dedup
    return [
        uniqueItems[start:start + maxPartitionCount]
        for start in range(0, len(uniqueItems), maxPartitionCount)
    ]

def partitionAndProcess(data, maxPartitionCount=95):
    """Register every partition in ``data`` on the Glue table, in batches.

    Uses the notebook-level ``glueDatabaseName``, ``glueTableName`` and
    ``destinationDirectory`` settings.

    Args:
        data: partition rows (e.g. the collected distinct Year/Month/Day rows).
        maxPartitionCount: largest batch sent per ``addPartitionsToTable`` call; the
            default of 95 keeps each request under Glue's batch_create_partition cap
            of 100 partitions.

    Returns:
        The list of batches that were submitted (empty when ``data`` is empty).
    """
    # Every input takes the same dedupe-and-batch path: at most maxPartitionCount
    # items becomes a single batch, larger inputs become several.
    batches = createUniqueChunks(data, maxPartitionCount)
    for batch in batches:
        addPartitionsToTable(glueDatabaseName, glueTableName, batch, destinationDirectory)
    return batches

# Register the partitions read back from the destination with the Glue table.
# partitionAndProcess handles batching (chunks of 95 when there are more than 95).
partitionChunk = partitionAndProcess(data=partitions)

In [None]:
# Stop the Spark session (and its underlying SparkContext) now that the data has been
# written and the Glue table/partitions registered. Re-run the first cell to start a new one.
spark.stop()