# Import Libraries

In [37]:
%run utilities/global_class

In [38]:
%run utilities/package

In [39]:
# Source table handled by this notebook; the staging, curated and mart folders are all derived from it.
table_name = "humanresources_shift"

# Storage location shared by every layer (passed to PathConstructor together with a layer folder).
container_name = "hanifsystem"
account_name = "hanifdatalake"

# Staging layer format.
# Delimiter/header settings only matter for text formats such as CSV; they have no effect for Parquet.
stg_fileformat = "parquet"
stg_delimiter = ","
stg_withheader = "true"
stg_main_folder = "synapse/workspaces/AdventureWorks2022/" + table_name

# Curated layer format. This layer is the target of the MERGE INTO statement further down,
# which requires Delta.
cur_fileformat = "delta"
cur_delimiter = ","
cur_withheader = "true"
# NOTE(review): this folder uses "AdventureWorks" while staging and mart use
# "AdventureWorks2022" — confirm the difference is intentional.
cur_main_folder = "synapse/workspaces/curated/AdventureWorks/" + table_name

# Mart layer format.
mart_fileformat = "parquet"
mart_delimiter = ","
mart_withheader = "true"
mart_main_folder = "synapse/workspaces/datamart/AdventureWorks2022/" + table_name

In [40]:
# Day offset applied to "today" to get the lower bound of the incremental window
# (negative values look back in time).
ingest_range_day_structured = -2

# Today at 00:00:00 as a naive local datetime, plus its yyyymmdd rendering.
date_1 = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
start_date = date_1.strftime('%Y%m%d')

# Lower bound of the window, rendered as yyyymmdd for the MERGE filter.
result_1 = date_1 + timedelta(days=ingest_range_day_structured)
filter_date = result_1.strftime('%Y%m%d')

print(filter_date)

In [41]:
# Build the full storage paths for the staging and curated layers with the project
# PathConstructor helper.
set_stg_path = PathConstructor(container_name, account_name, stg_main_folder)
set_cur_path = PathConstructor(container_name, account_name, cur_main_folder)

stg_path = set_stg_path.pathconstructor()
cur_path = set_cur_path.pathconstructor()

print(stg_path)
print(cur_path)

# Staging Section

In [42]:
# Load the raw staging extract for this table.
# 'header' and 'inferSchema' are only meaningful for text sources (e.g. CSV); Parquet carries
# its own schema, so they are effectively no-ops while stg_fileformat is "parquet".
df_stg_without_schema = (
    spark.read
    .format(stg_fileformat)
    .option('header', stg_withheader)
    .option('inferSchema', 'true')
    .load(stg_path)
)
display(df_stg_without_schema.limit(10))

In [43]:
# Inspect the schema of the raw staging data (column names still in their source casing).
df_stg_without_schema.printSchema()

In [44]:
# Normalise every column name to lower case so later cells can use lowercase identifiers
# (e.g. 'modifieddate', 'shiftid').
col_name = df_stg_without_schema.columns
lower_col_name = list(map(str.lower, col_name))
df_stg = df_stg_without_schema.toDF(*lower_col_name)

In [45]:
# Confirm every column name is now lower case.
df_stg.printSchema()

In [46]:
# Preview the first 10 rows of the lower-cased staging frame.
display(df_stg.limit(10))

In [47]:
# The MERGE and mart cells refer to the audit timestamp as `modifiedutcdate`.
df_stg = df_stg.withColumnRenamed(existing='modifieddate', new='modifiedutcdate')

df_stg.printSchema()

In [48]:
partition_column = ["shiftid"]

# Keep only the latest staging row per key (highest modifiedutcdate first; ties are broken
# arbitrarily by row_number). Then stamp every surviving row with the time this run processed it.
df_stg_final = (
    df_stg
    .withColumn(
        "rank",
        row_number().over(
            Window.partitionBy(*partition_column).orderBy(desc("modifiedutcdate"))
        ),
    )
    .filter("rank == 1")
    .drop("rank")
    .withColumn("curated_date", F.lit(datetime.now()))
)

# Curated Section

In [49]:
# Load the curated table. On the first run the path does not exist yet, so the table is
# bootstrapped from the de-duplicated staging data and then read back.
# If the path already exists, a read failure is re-raised instead of falling through to the
# overwrite below. Otherwise a transient or permission error would replace existing curated
# data with the staging extract.
try:
    set_df_cur = ReadFile(cur_path, cur_fileformat, cur_delimiter, cur_withheader)
    df_cur = set_df_cur.readfrompath()
except Exception as ex:
    if mssparkutils.fs.exists(cur_path):
        raise
    print("Curated path not found, bootstrapping from staging:", cur_path, str(ex))
    df_final = df_stg_final.coalesce(1)
    # Write in the same format the read above expects (Delta, required by the MERGE later on).
    df_final.write.format(cur_fileformat) \
            .mode('overwrite') \
            .save(cur_path)

    set_df_cur = ReadFile(cur_path, cur_fileformat, cur_delimiter, cur_withheader)
    df_cur = set_df_cur.readfrompath()

In [50]:
# Preview the curated table (first 10 rows, consistent with the staging previews above).
display(df_cur.limit(10))

In [51]:
# Expose both sides of the upsert to Spark SQL under the names the MERGE statement uses.
for _view_name, _view_df in (("targetTableName", df_cur), ("updatesTableName", df_stg_final)):
    _view_df.createOrReplaceTempView(_view_name)
del _view_name, _view_df

In [52]:
# Schema of the curated table; the MERGE below assigns name, starttime, endtime,
# modifiedutcdate and curated_date by column name.
df_cur.printSchema()

In [53]:
# Upsert staging rows modified on or after `filter_date` into the curated table, keyed on shiftid.
# The date window is applied once to the source, in the USING subquery. Previously it was
# repeated in both the ON clause and the NOT MATCHED predicate. Rows outside the window were
# skipped by both branches there too, so the result is unchanged.
# to_date(...) gives a DATE on both sides instead of comparing a formatted string to a date.
spark.sql("""
        MERGE INTO targetTableName
        USING (
            SELECT *
            FROM updatesTableName
            WHERE to_date(modifiedutcdate) >= to_date('{filter_date}', 'yyyyMMdd')
        ) AS updates
        ON targetTableName.shiftid = updates.shiftid
        WHEN MATCHED THEN UPDATE SET
                targetTableName.name = updates.name,
                targetTableName.starttime = updates.starttime,
                targetTableName.endtime = updates.endtime,
                targetTableName.modifiedutcdate = updates.modifiedutcdate,
                targetTableName.curated_date = updates.curated_date
        WHEN NOT MATCHED THEN INSERT *
""".format(filter_date=filter_date))

# Mart Section

In [54]:
def WriteFile(df, final_path, file_format='parquet', raise_on_error=False):
    """Write ``df`` to ``final_path`` as a single output partition, replacing existing content.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Data to write; coalesced to one partition before writing.
    final_path : str
        Destination folder, written with ``mode('overwrite')``.
    file_format : str, optional
        Spark output format. Defaults to ``'parquet'``, the previously hard-coded value.
    raise_on_error : bool, optional
        If True, re-raise the write exception after logging it. The default (False) keeps the
        historical behaviour of only printing "Write Failed".

    Returns
    -------
    bool
        True if the write succeeded, False if it failed and the error was swallowed.
        Callers that delete the destination before calling this should check the result,
        because a swallowed failure leaves the destination empty.
    """
    try:
        # Turn off the Hadoop output committer's _SUCCESS marker for this write. This sets a
        # session-level Spark conf, so it also applies to later writes in this session.
        spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
        df.coalesce(1).write.format(file_format) \
            .mode('overwrite') \
            .save(final_path)

        print("Write Success")
        return True
    except Exception as ex:
        print("Write Failed", str(ex))
        if raise_on_error:
            raise
        return False

In [55]:
# Build the mart destination with the same PathConstructor helper used for the other layers.
set_mart_path = PathConstructor(container_name, account_name, mart_main_folder)
mart_path = set_mart_path.pathconstructor()

# Log the read (curated) and write (mart) locations side by side.
print("Source: ", cur_path, "Mart: ", mart_path, sep=" ")

In [56]:
# The mart is built from the curated table loaded earlier in this notebook.
# (The previous empty createDataFrame placeholder was immediately overwritten and has been removed.)
# NOTE(review): this relies on df_cur reflecting the MERGE above. A plain Delta read resolves
# the latest snapshot on each action, but ReadFile is a project helper — confirm it does not
# cache or pin a snapshot.
df_mart = df_cur

In [57]:
from dateutil.relativedelta import relativedelta

# Rebuild the mart for rows curated since the first day of the month `mart_lookback_months`
# months ago. With 2, this covers the current month plus the two previous months.
mart_lookback_months = 2
periode = (datetime.now() - relativedelta(months=mart_lookback_months)).replace(day=1).strftime("%Y-%m-%d")
print(periode)
# periode is a 'yyyy-MM-dd' string; Spark casts it when comparing with the curated_date column.
df_mart_final = df_mart.filter(col("curated_date") >= lit(periode))

In [58]:
# Distinct (year, month) pairs of curated_date in the mart window, in chronological order.
# Both sort keys go in a single orderBy call. Chaining two orderBy calls keeps only the last
# one, which sorted by month alone and put January before November/December across a year boundary.
partition_list = (
    df_mart_final
    .select(year(col("curated_date")).alias("year"), month(col("curated_date")).alias("month"))
    .dropDuplicates()
    .orderBy(col("year").asc(), col("month").asc())
    .collect()
)

print(partition_list)

In [59]:
for partition in partition_list:
    # One output folder per month: <mart_path>/<yyyy><mm>.
    final_path = mart_path + '/' + str(partition.year) + str(partition.month).zfill(2)
    print('Partition path', final_path)

    # Clear the previous output for this month. A missing folder (first run for the month) is
    # expected and skipped. Any other delete failure now surfaces instead of being silently ignored.
    if mssparkutils.fs.exists(final_path):
        mssparkutils.fs.rm(final_path, True)

    df_final = df_mart_final.filter(year(col("curated_date")) == partition.year).filter(month(col("curated_date")) == partition.month)

    WriteFile(df_final, final_path)

In [60]:
# Preview the last month written by the partition loop above (first 10 rows).
# NOTE(review): if partition_list was empty, df_final does not come from that loop. It may be the
# bootstrap frame left over from the curated cell, or undefined on a fresh kernel.
display(df_final.limit(10))

In [61]:
# Schema of the last partition frame written by the loop above.
# NOTE(review): if partition_list was empty, df_final is not from that loop. It may be the
# bootstrap frame from the curated cell, or undefined on a fresh kernel.
df_final.printSchema()