In [None]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
  
# Reuse the running SparkContext if one exists, otherwise start a new one.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; the cells below use it to read and write DynamicFrames.
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job handle bound to this context (not referenced again in the visible cells).
job = Job(glueContext)

In [None]:
def _load_raw_json(s3_path):
    """Read every JSON document under ``s3_path`` into a Glue DynamicFrame.

    The reader recurses into sub-prefixes and is configured with the
    ``multiLine`` format option, matching the settings used for both raw
    datasets.
    """
    return glueContext.create_dynamic_frame_from_options(
        connection_type='s3',
        connection_options={"paths": [s3_path], "recurse": True},
        format="json",
        format_options={"multiLine": True},
    )


# Raw race and qualifying dumps, loaded with identical reader settings.
race_dyf = _load_raw_json('s3://[BUCKET_NAME]/raw/race')
quali_dyf = _load_raw_json('s3://[BUCKET_NAME]/raw/qualifying')

In [None]:
# Convert the Glue DynamicFrames into Spark DataFrames; the flattening helpers
# defined below rely on DataFrame methods (dtypes, select, withColumn).
race_df = race_dyf.toDF()
quali_df = quali_dyf.toDF()

In [15]:
def struct_unravel(nested_df):
    """Flatten every struct column of ``nested_df`` into top-level columns.

    Struct columns are descended into (at any depth); every column whose
    dtype string does not start with ``"struct"`` is kept as a leaf and
    selected by its dotted path, aliased to the same path joined with
    underscores (``a.b.c`` -> ``a_b_c``). Arrays count as leaves here and
    are therefore left as they are.

    Args:
        nested_df: DataFrame-like object exposing ``dtypes`` and ``select``.

    Returns:
        ``nested_df.select(...)`` over the collected leaf columns.
    """
    # Stack of (path-of-parent-structs, frame projected onto that struct).
    pending = [((), nested_df)]
    selected = []

    while pending:
        path, frame = pending.pop()

        nested_names = []
        for name, dtype in frame.dtypes:
            if dtype.startswith("struct"):
                nested_names.append(name)
            else:
                full_path = path + (name,)
                selected.append(col(".".join(full_path)).alias("_".join(full_path)))

        # Project each struct onto its own fields and queue it for a later pass.
        for name in nested_names:
            pending.append((path + (name,), frame.select(name + ".*")))

    return nested_df.select(selected)




In [None]:
def array_unnesting(df):
    """Explode every array column and flatten every struct column of ``df``.

    Each pass explodes the current top-level array columns with
    ``explode_outer`` (one output row per element) and then flattens structs
    with ``struct_unravel``. Passes repeat until no column has a dtype
    starting with ``"array"`` or ``"struct"``.

    Unlike a loop driven only by the presence of array columns, this also
    handles frames whose arrays are nested inside top-level structs (or that
    contain structs only): those structs are flattened first, which exposes
    any arrays inside them to the next pass. Frames that already have
    top-level arrays go through the same explode-then-flatten sequence as
    before.

    Args:
        df: DataFrame-like object exposing ``dtypes`` and ``withColumn``.

    Returns:
        The fully exploded and flattened frame; ``df`` itself when it has no
        array or struct columns.
    """
    while True:
        array_cols = [name for name, dtype in df.dtypes if dtype.startswith("array")]
        has_structs = any(dtype.startswith("struct") for _, dtype in df.dtypes)
        if not array_cols and not has_structs:
            return df

        for c in array_cols:
            df = df.withColumn(c, explode_outer(c))
        # Exploding array<struct> yields struct columns; flattening them may
        # surface further arrays, which the next pass picks up.
        df = struct_unravel(df)

In [None]:
# Rebind both frames to the result of array_unnesting (arrays exploded,
# structs flattened by struct_unravel).
race_df = array_unnesting(race_df)
quali_df = array_unnesting(quali_df)

In [None]:
def column_name_correction(df):    
    for i in df.columns:
        if "_url" in i:
            df = df.drop(i)
        else:
            temp = i.removeprefix("data_").lower()
            df = df.withColumnRenamed(i,temp)

In [None]:
column_name_correction(race_df)
column_name_correction(quali_df)

In [None]:
# Wrap the Spark DataFrames back into Glue DynamicFrames, which is what the
# Glue sinks' writeFrame calls below take.
race_dyf = DynamicFrame.fromDF(race_df,glueContext,"race_dynamic_frame")
quali_dyf = DynamicFrame.fromDF(quali_df,glueContext,"quali_dynamic_frame")

In [21]:
# Write the race frame to S3 in the "glueparquet" format with Snappy
# compression, partitioned by season and round, and keep the Data Catalog
# table [DB_NAME].raceResults in sync.
# NOTE(review): assumes race_dyf has top-level "season" and "round" columns - confirm.
s3outputRace = glueContext.getSink(
  path="s3://[BUCKET_NAME]/Processed/race/",
  connection_type="s3",
  # The documented values are "UPDATE_IN_DATABASE" (update the catalog schema
  # and add new partitions) and "LOG"; the previous free-text value
  # "UPDATE IN DATABASE AND APPEND" is not one of them.
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=["season","round"],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3outputRace.setCatalogInfo(
  catalogDatabase="[DB_NAME]", catalogTableName="raceResults"
)
s3outputRace.setFormat("glueparquet")
s3outputRace.writeFrame(race_dyf)

<awsglue.dynamicframe.DynamicFrame object at 0x7f4eae1f4bd0>


In [None]:
# Write the qualifying frame to S3 in the "glueparquet" format with Snappy
# compression, partitioned by season and round, and keep the Data Catalog
# table [DB_NAME].qualiResults in sync.
# NOTE(review): assumes quali_dyf has top-level "season" and "round" columns - confirm.
# The sink gets its own variable name and transformation_ctx so it no longer
# rebinds the race sink's variable or shares its context string.
s3outputQuali = glueContext.getSink(
  path="s3://[BUCKET_NAME]/Processed/quali/",
  connection_type="s3",
  # The documented values are "UPDATE_IN_DATABASE" and "LOG"; "APPEND" is not
  # one of them. "UPDATE_IN_DATABASE" matches the race sink; switch to "LOG"
  # if the catalog schema must not be overwritten.
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=["season","round"],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output_quali",
)
s3outputQuali.setCatalogInfo(
  catalogDatabase="[DB_NAME]", catalogTableName="qualiResults"
)
s3outputQuali.setFormat("glueparquet")
s3outputQuali.writeFrame(quali_dyf)