## Read Params

In [None]:
import datetime
import ast

def _quoteless_argument(name):
    """Return the notebook argument `name` as text with single quotes removed."""
    return str(getArgument(name)).replace("'", "")


# Taken before any work is done; the final cell subtracts it from the current
# time to report how long the run took.
startDate = datetime.datetime.now()

# --- entity -----------------------------------------------------------------
entity = _quoteless_argument("entity")
print("Param -'entity': {0}".format(entity))

# --- filesToProcess -----------------------------------------------------------
# Arrives as the text of a Python literal and is turned into the actual object
# with ast.literal_eval (which only accepts literals, never arbitrary code).
filesToProcessArg = str(getArgument("filesToProcess"))
filesToProcess = ast.literal_eval(filesToProcessArg)
print("Param -'filesToProcess': {0}".format(filesToProcess))

# --- executionMode ------------------------------------------------------------
executionMode = _quoteless_argument("executionMode")
print("Param -'executionMode': {0}".format(executionMode))

# --- etlDate ------------------------------------------------------------------
# NOTE(review): the format string has two spaces between the date and the time
# part - confirm it matches the value the caller actually sends.
etlDateString = str(getArgument("etlDate"))
etlDate = datetime.datetime.strptime(etlDateString, "%Y-%m-%d  %H:%M:%S")
print("Param -'etlDate': {0}".format(etlDate))

# Fixed identifiers of the data segment this notebook loads.
dataSegment = 'pfu'
dataSegmentFriendlyName = 'PFU'


## Read RawData Datalake

In [None]:
# Load the raw Avro data for the "instance_BB" source into `mainData`.
#
# The first entry of `filesToProcess` whose text contains "instance_BB" is used
# as a path prefix; the "*.avro" wildcard is appended to it before loading.
# The resulting DataFrame is marked for caching.
#
# Raises:
#     IndexError: when no entry of `filesToProcess` contains "instance_BB".
#     Any error raised by the Spark reader is logged through `entityLog`
#     (expected to be provided elsewhere in the notebook) and re-raised.
try:
    # Stop at the first match instead of building the full list of matches.
    instancePath = next(
        (path for path in filesToProcess if "instance_BB" in path),
        None,
    )
    if instancePath is None:
        # Same exception type as indexing an empty list, but with a message
        # that says what was actually missing.
        raise IndexError(
            'No entry containing "instance_BB" found in filesToProcess: {0}'.format(filesToProcess)
        )

    mainData = spark.read.format("avro").load(instancePath + "*.avro").cache()
except:
    # Deliberately broad: every failure is reported to the entity log and then
    # propagated unchanged so the run still fails.
    entityLog.exceptionMessage("Read raw datalake files")
    raise

## Arrange & format fields from sources

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Build `mainDataTransformed` from `mainData`: add the download date and the
# entity key column, parse the row-audit columns as timestamps, and finally
# keep only the mapped columns under their output names.
try:
    keyColumn = '{0}Id'.format(entity)

    # Source column -> output column. The final select iterates this dict, so
    # its order is the order of the output columns (relies on the dict
    # preserving insertion order).
    mappingMainData = {
        keyColumn: keyColumn,
        'ID': 'snowflakeId',
        'SOURCE_ID': 'sourceId',
        'NAME': 'name',
        'DESCRIPTION': 'description',
        'TYPE': 'type',
        'STAGE': 'stage',
        'ROW_INSERTED_TIME': 'rowInsertedDatetime',
        'ROW_UPDATED_TIME': 'rowUpdatedDatetime',
        'ROW_DELETED_TIME': 'rowDeletedDatetime',
        'downloadDate': 'downloadDate',
    }

    # Every row gets the ETL date as a literal, and the key column is a copy
    # of the source "ID" column.
    mainDataTransformed = (
        mainData
        .withColumn("downloadDate", lit(etlDate))
        .withColumn(keyColumn, col("ID"))
    )

    # Replace each audit column, in place, with its value parsed as a timestamp.
    columns2date = ["ROW_INSERTED_TIME", "ROW_UPDATED_TIME", "ROW_DELETED_TIME"]
    for colname in columns2date:
        parsed = to_timestamp(col(colname), 'yyyy-MM-dd HH:mm:ss.SSSSSSS')
        mainDataTransformed = mainDataTransformed.withColumn(colname, parsed)

    # Keep only the mapped columns, renamed to their output names.
    projection = [col(source).alias(target) for source, target in mappingMainData.items()]
    mainDataTransformed = mainDataTransformed.select(projection)

except:
    # Report the failure to the entity log, then let it propagate unchanged.
    entityLog.exceptionMessage("Arrange & format fields from sources")
    raise

## Ingest to Datalake Parquet

In [None]:
from necsiaadbutils import *
from datetime import datetime

# Hand the transformed DataFrame to DatalakeTransformation, run its full
# transformation with the partition settings below, and print the time elapsed
# since `startDate`.
try:
    # Partition settings passed to runFullTransformation.
    partitionConfig = {
        "partitionFormat": "repartition",
        "partitionValue": 20,
    }

    entityDLT = DatalakeTransformation(
        dataSegment,
        dataSegmentFriendlyName,
        entity,
        etlDate,
        mainDataTransformed,
        loadType=executionMode,
    )
    entityDLT.runFullTransformation(partitionConfig)

    # `datetime` here is the class imported by this cell, not the module.
    elapsed = datetime.now() - startDate
    print("Elapsed time: {0}".format(elapsed))
except:
    # Report the failure to the entity log, then let it propagate unchanged.
    entityLog.exceptionMessage("Ingest to Datalake")
    raise