In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import *
from functools import reduce  
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

In [0]:
# Explicit schema for the input CSV files. Every column is nullable.
# The two date columns are declared as strings here; they are converted
# with to_date() in a later cell.
dataSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("sender_id", StringType()),
        ("employee_ids", StringType()),
        ("assigned_date", StringType()),
        ("task_name", StringType()),
        ("task_details", StringType()),
        ("last_date", StringType()),
        ("severity", IntegerType()),
        ("cc", StringType()),
        ("time_after_cc", IntegerType()),
        ("bcc", StringType()),
        ("time_after_bcc", IntegerType()),
    ]
])

In [0]:
# Helper that reads one input file into a DataFrame

def extract_df(element):
    """Load a single CSV file into a Spark DataFrame.

    Args:
        element: Object exposing a ``path`` attribute that points at the CSV
            file (the notebook passes entries returned by ``dbutils.fs.ls``).

    Returns:
        The DataFrame produced by reading ``element.path`` with the first
        line treated as a header and the columns typed by ``dataSchema``.
    """
    csv_reader = (
        spark.read
        .option("header", True)
        .schema(dataSchema)
    )
    return csv_reader.csv(element.path)

In [0]:
# List the input directory and read every entry into its own DataFrame

data_loc_arr = dbutils.fs.ls("/mnt/rupamemailautomation/managersdata")
list_df = [extract_df(file_info) for file_info in data_loc_arr]

In [0]:
# display the data
# [display(df) for df in list_df]

In [0]:
# Combine the per-file DataFrames into a single DataFrame. They are all read
# with the same explicit schema, so the positional `union` lines columns up.
# NOTE(review): `reduce` must be functools.reduce; it is imported after the
# pyspark.sql.functions star import, which may export its own `reduce` in
# newer PySpark versions -- keep that import order.
if not list_df:
    # Without this guard reduce() fails with the opaque
    # "reduce() of empty sequence with no initial value" TypeError.
    raise ValueError("No input files were found, so there is nothing to combine")
df = reduce(DataFrame.union, list_df)

In [0]:
# display(df)

In [0]:
# Convert the two date columns from "dd.MM.yy" strings to dates.
# to_date() already returns a date column, so no additional cast is needed.
df1 = (
    df
    .withColumn("assigned_date", to_date(col("assigned_date"), "dd.MM.yy"))
    .withColumn("last_date", to_date(col("last_date"), "dd.MM.yy"))
)

In [0]:
# display(df)
# display(df1)
# df1.printSchema()

In [0]:
# Turn the comma-separated employee_ids string into one row per employee:
# split on ",", explode the resulting array into an employee_id column,
# then drop the original multi-valued column.
df2 = (
    df1
    .select("*", explode(split(col("employee_ids"), ",")).alias("employee_id"))
    .drop("employee_ids")
)

In [0]:
# display(df2)

In [0]:
# Remove leading/trailing spaces left around each id by the comma split.
df2 = df2.withColumn("employee_id", trim(col("employee_id")))

In [0]:
# display(df2)

In [0]:
# Add the tracking columns with their initial values:
#   completed             -> False
#   no_of_times_mail_sent -> 0

df3 = (
    df2
    .withColumn("completed", lit(False))
    .withColumn("no_of_times_mail_sent", lit(0))
)

In [0]:
# display(df3)

In [0]:
# df3.select(col("sender_id"), col("employee_id"), col("assigned_date")).filter(col("employee_id") == "senior.manager.one@gofirst.onmicrosoft.com").display()

# df100 = df3.filter(col("employee_id") == "senior.manager.one@gofirst.onmicrosoft.com")

# window_spec_100 = Window.partitionBy(col("sender_id"), col("employee_id"), col("assigned_date")).orderBy(col("last_date"))

# df101 = df100.withColumn("rank_by_employee_sender_date", row_number().over(window_spec_100))

# df101.display()

In [0]:
# Window used to number tasks within each (sender, employee, assigned_date) group.
# Ordering by last_date alone leaves tied rows in an arbitrary order, so the row
# number -- and the task_id built from it -- could differ between runs. The other
# input columns are appended as tie-breakers so that only fully identical rows
# can still tie.
window_spec = (
    Window
    .partitionBy(col("sender_id"), col("employee_id"), col("assigned_date"))
    .orderBy(
        col("last_date"),
        col("task_name"),
        col("task_details"),
        col("severity"),
        col("cc"),
        col("time_after_cc"),
        col("bcc"),
        col("time_after_bcc"),
    )
)

In [0]:
# Append a 1-based sequence number per window partition, following the
# partitioning and ordering defined by window_spec.
df4 = df3.select(
    "*",
    row_number().over(window_spec).alias("rank_by_employee_sender_date"),
)

In [0]:
# display(df4)

In [0]:
# Build the task identifier as
#   <sender_id before "@">-<employee_id before "@">-<assigned_date without dashes>-<rank>
# and strip any whitespace from the result.
# NOTE(review): concat() presumably yields NULL when any part is NULL (e.g. an
# assigned_date that failed to parse) -- confirm NULL task_ids are acceptable.
task_id = regexp_replace(
    concat(
        substring_index(col("sender_id"), "@", 1),
        lit("-"),
        substring_index(col("employee_id"), "@", 1),
        lit("-"),
        regexp_replace(col("assigned_date").cast(StringType()), "-", ""),
        lit("-"),
        col("rank_by_employee_sender_date").cast(StringType()),
    ),
    r"\s+",  # raw string: "\s" is not a valid escape sequence in a normal Python literal
    "",
)

df5 = df4.withColumn("task_id", task_id)

In [0]:
# Show the final DataFrame, including the generated task_id column.
display(df5)

In [0]:
# df5.printSchema()

In [0]:
### Connection settings for the Azure SQL Database (datamart)
### The JDBC URL, user name and password are fetched from the Databricks
### secret scope so that none of them is hard-coded in the notebook.

sqldburl = dbutils.secrets.get(
    scope="databricks-scope",
    key="databricks-app-sqldb-url",
)
user = dbutils.secrets.get(
    scope="databricks-scope",
    key="databricks-scope-sqldb-user",
)
password = dbutils.secrets.get(
    scope="databricks-scope",
    key="databricks-scope-sqldb-password",
)

In [0]:
# JDBC settings used when writing to the Azure SQL database:
#   jdbc_url   - connection URL taken from the secret store
#                (expected to include the database name)
#   table_name - destination table in the database
#   properties - login credentials plus the SQL Server JDBC driver class
jdbc_url = sqldburl
table_name = "task_details"
properties = dict(
    user=user,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

In [0]:
# Append the final DataFrame to the destination table over JDBC.
# NOTE(review): "append" adds rows on every run, so re-running the notebook on
# the same input files presumably inserts duplicates -- confirm this is intended.
df5.write.mode("append").jdbc(url=jdbc_url, table=table_name, properties=properties)

In [0]:
# End the notebook run and hand the string "success" back as its exit value.
dbutils.notebook.exit("success")