# Cleaning And Transformations

### Creating Spark Session

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CapstoneSession") \
    .config("fs.azure.account.key.<redacted>.blob.core.windows.net", "redacted") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()



### Connecting to cosmos DB

In [0]:
endpoint = 'https://<redacted>.documents.azure.com:443/'
accountkey = '<redacted>'
database = 'auditdatabase'
container = 'auditcapstone'

df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.accountEndpoint", endpoint) \
    .option("spark.cosmos.accountKey", accountkey) \
    .option("spark.cosmos.database", database) \
    .option("spark.cosmos.container", container) \
    .load()

In [0]:
# cache the data for optimization

cached_df = df.cache()

In [0]:
# check if the data is complete

cached_df.count()

141712

### Data Cleaning

In [0]:
# drop the id column added by cosmos
cached_df = cached_df.drop('id')

In [0]:
# drop duplicate values in the dataframe
cached_df  = cached_df.dropDuplicates()

In [0]:
# count after removing duplicates

cached_df.count()

141712

In [0]:
# reorder the columns as per the original document

selected_columns = [
    "number", "incident_state", "active", "reassignment_count", "reopen_count", "sys_mod_count", 
    "made_sla", "caller_id", "opened_by", "opened_at", "sys_created_by", "sys_created_at", 
    "sys_updated_by", "sys_updated_at", "contact_type", "location", "category", "subcategory", 
    "u_symptom", "cmdb_ci", "impact", "urgency", "priority", "assignment_group", "assigned_to", 
    "knowledge", "u_priority_confirmation", "notify", "problem_id", "rfc", "vendor", 
    "caused_by", "closed_code", "resolved_by", "resolved_at", "closed_at"
]

cached_df = cached_df.select(*selected_columns)


In [0]:
# remove the unwanted row (added as a reference to cosmos)

from pyspark.sql.functions import to_timestamp, col
cached_df = cached_df.where((col('caused_by') != 'code 5'))


In [0]:
# sort each incidents by the order of occurance
 
from pyspark.sql.functions import to_timestamp, col, expr

sort_order = {
    "Active": 1,
    "New": 2,
    "Awaiting User Info": 3,
    "Awaiting Evidence": 4,
    "Awaiting Vendor": 5,
    "Awaiting Problem": 6,
    "Resolved": 7,
    "Closed": 8,
    "-100": 9,
}

df_with_order = cached_df.withColumn("sort_order", expr(f"""
    CASE incident_state
        {' '.join([f'WHEN "{key}" THEN {value}' for key, value in sort_order.items()])}
        ELSE 99  -- Default value for unexpected states
    END
"""))

# Sort the DataFrame by 'number', 'sys_mod_at', and the new 'sort_order'
sorted_df = df_with_order.orderBy("number", "sys_updated_at", "sort_order")

# Drop the helper column if necessary
sorted_df = sorted_df.drop("sort_order")

In [0]:
sorted_df.toPandas()

Unnamed: 0,number,incident_state,active,reassignment_count,reopen_count,sys_mod_count,made_sla,caller_id,opened_by,opened_at,sys_created_by,sys_created_at,sys_updated_by,sys_updated_at,contact_type,location,category,subcategory,u_symptom,cmdb_ci,impact,urgency,priority,assignment_group,assigned_to,knowledge,u_priority_confirmation,notify,problem_id,rfc,vendor,caused_by,closed_code,resolved_by,resolved_at,closed_at
0,INC0000045,New,True,0,0,0,True,Caller 2403,Opened by 8,29/2/2016 01:16,Created by 6,29/2/2016 01:23,Updated by 21,29/2/2016 01:23,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,?,2 - Medium,2 - Medium,3 - Moderate,Group 56,?,True,False,Do Not Notify,?,?,?,?,code 5,Resolved by 149,29/2/2016 11:29,5/3/2016 12:00
1,INC0000045,Resolved,True,0,0,2,True,Caller 2403,Opened by 8,29/2/2016 01:16,Created by 6,29/2/2016 01:23,Updated by 642,29/2/2016 08:53,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,?,2 - Medium,2 - Medium,3 - Moderate,Group 56,?,True,False,Do Not Notify,?,?,?,?,code 5,Resolved by 149,29/2/2016 11:29,5/3/2016 12:00
2,INC0000045,Resolved,True,0,0,3,True,Caller 2403,Opened by 8,29/2/2016 01:16,Created by 6,29/2/2016 01:23,Updated by 804,29/2/2016 11:29,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,?,2 - Medium,2 - Medium,3 - Moderate,Group 56,?,True,False,Do Not Notify,?,?,?,?,code 5,Resolved by 149,29/2/2016 11:29,5/3/2016 12:00
3,INC0000045,Closed,False,0,0,4,True,Caller 2403,Opened by 8,29/2/2016 01:16,Created by 6,29/2/2016 01:23,Updated by 908,5/3/2016 12:00,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,?,2 - Medium,2 - Medium,3 - Moderate,Group 56,?,True,False,Do Not Notify,?,?,?,?,code 5,Resolved by 149,29/2/2016 11:29,5/3/2016 12:00
4,INC0000047,Active,True,1,0,5,True,Caller 2403,Opened by 397,29/2/2016 04:40,Created by 171,29/2/2016 04:57,Updated by 332,1/3/2016 09:14,Phone,Location 165,Category 40,Subcategory 215,Symptom 471,?,2 - Medium,2 - Medium,3 - Moderate,Group 24,Resolver 31,True,False,Do Not Notify,?,?,?,?,code 5,Resolved by 81,1/3/2016 09:52,6/3/2016 10:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
141707,INC0120835,Closed,False,1,0,4,True,Caller 116,Opened by 12,16/2/2017 09:09,?,?,Updated by 27,16/2/2017 09:53,Email,Location 204,Category 42,Subcategory 223,Symptom 494,?,2 - Medium,2 - Medium,3 - Moderate,Group 31,Resolver 10,False,True,Do Not Notify,?,?,?,?,code 9,Resolved by 9,16/2/2017 09:53,16/2/2017 09:53
141708,INC0121064,Active,True,0,0,0,True,Caller 116,Opened by 12,16/2/2017 14:17,?,?,Updated by 908,16/2/2017 14:17,Email,Location 204,Category 42,Subcategory 223,Symptom 494,?,2 - Medium,2 - Medium,3 - Moderate,Group 70,Resolver 10,False,False,Do Not Notify,?,?,?,?,code 6,Resolved by 9,16/2/2017 16:38,16/2/2017 16:38
141709,INC0121064,Active,True,1,0,1,True,Caller 116,Opened by 12,16/2/2017 14:17,?,?,Updated by 60,16/2/2017 15:20,Email,Location 204,Category 42,Subcategory 223,Symptom 494,?,2 - Medium,2 - Medium,3 - Moderate,Group 31,?,False,False,Do Not Notify,?,?,?,?,code 6,Resolved by 9,16/2/2017 16:38,16/2/2017 16:38
141710,INC0121064,Resolved,True,1,0,2,True,Caller 116,Opened by 12,16/2/2017 14:17,?,?,Updated by 27,16/2/2017 16:38,Email,Location 204,Category 42,Subcategory 223,Symptom 494,?,2 - Medium,2 - Medium,3 - Moderate,Group 31,Resolver 10,False,True,Do Not Notify,?,?,?,?,code 6,Resolved by 9,16/2/2017 16:38,16/2/2017 16:38


### Handling Null Values

In [0]:
# replace question marks with Null
sorted_df = sorted_df.na.replace("?", None)

In [0]:
total_rows = sorted_df.count()

null_count = sorted_df.select([(F.count(F.when(F.col(c).isNull(), c)) / total_rows * 100).alias(c) for c in sorted_df.columns])

# Convert the result to a Pandas DataFrame
null_count_pd = null_count.toPandas()

# Filter columns with null counts greater than 0
null_count_pd.loc[:, null_count_pd.iloc[0] > 0]

Unnamed: 0,caller_id,opened_by,sys_created_by,sys_created_at,location,category,subcategory,u_symptom,cmdb_ci,assignment_group,assigned_to,problem_id,rfc,vendor,caused_by,closed_code,resolved_by,resolved_at
0,0.020464,3.411849,37.453427,37.453427,0.05363,0.055041,0.078328,23.261262,99.685983,10.029496,19.402732,98.380518,99.300694,99.82782,99.98377,0.503839,0.159478,2.216467


In [0]:
# columns that having more than 90% of null values are removed
null_removed = sorted_df.drop("problem_id", "rfc", "vendor","caused_by","cmdb_ci")

In [0]:
cleaned = null_removed.fillna('Unknown')

In [0]:
total_rows = sorted_df.count()

null_count = cleaned.select([(F.count(F.when(F.col(c).isNull(), c)) / total_rows * 100).alias(c) for c in null_removed.columns])

null_count_pd = null_count.toPandas()

null_count_pd.loc[:, null_count_pd.iloc[0] > 0]

0


### Type Casting

In [0]:
cleaned.printSchema()

root
 |-- number: string (nullable = false)
 |-- incident_state: string (nullable = false)
 |-- active: string (nullable = false)
 |-- reassignment_count: string (nullable = false)
 |-- reopen_count: string (nullable = false)
 |-- sys_mod_count: string (nullable = false)
 |-- made_sla: string (nullable = false)
 |-- caller_id: string (nullable = false)
 |-- opened_by: string (nullable = false)
 |-- opened_at: string (nullable = false)
 |-- sys_created_by: string (nullable = false)
 |-- sys_created_at: string (nullable = false)
 |-- sys_updated_by: string (nullable = false)
 |-- sys_updated_at: string (nullable = false)
 |-- contact_type: string (nullable = false)
 |-- location: string (nullable = false)
 |-- category: string (nullable = false)
 |-- subcategory: string (nullable = false)
 |-- u_symptom: string (nullable = false)
 |-- impact: string (nullable = false)
 |-- urgency: string (nullable = false)
 |-- priority: string (nullable = false)
 |-- assignment_group: string (nullable 

In [0]:
from pyspark.sql.functions import col, to_timestamp, to_date, year, month
 
# Cast columns to original data types
final = cleaned \
    .withColumn("active", col("active").cast("boolean")) \
    .withColumn("reassignment_count", col("reassignment_count").cast("integer")) \
    .withColumn("reopen_count", col("reopen_count").cast("integer")) \
    .withColumn("sys_mod_count", col("sys_mod_count").cast("integer")) \
    .withColumn("made_sla", col("made_sla").cast("boolean")) \
    .withColumn("knowledge", col("knowledge").cast("boolean")) \
    .withColumn("u_priority_confirmation", col("u_priority_confirmation").cast("boolean")) \
    .withColumn("opened_at", to_timestamp(col("opened_at"), "dd/MM/yyyy HH:mm")) \
    .withColumn("sys_created_at", to_timestamp(col("sys_created_at"), "dd/MM/yyyy HH:mm")) \
    .withColumn("sys_updated_at", to_timestamp(col("sys_updated_at"), "dd/MM/yyyy HH:mm")) \
    .withColumn("resolved_at", to_timestamp(col("resolved_at"), "dd/MM/yyyy HH:mm")) \
    .withColumn("closed_at", to_timestamp(col("closed_at"), "dd/MM/yyyy HH:mm"))
 

In [0]:
final.printSchema()

root
 |-- number: string (nullable = false)
 |-- incident_state: string (nullable = false)
 |-- active: boolean (nullable = true)
 |-- reassignment_count: integer (nullable = true)
 |-- reopen_count: integer (nullable = true)
 |-- sys_mod_count: integer (nullable = true)
 |-- made_sla: boolean (nullable = true)
 |-- caller_id: string (nullable = false)
 |-- opened_by: string (nullable = false)
 |-- opened_at: timestamp (nullable = true)
 |-- sys_created_by: string (nullable = false)
 |-- sys_created_at: timestamp (nullable = true)
 |-- sys_updated_by: string (nullable = false)
 |-- sys_updated_at: timestamp (nullable = true)
 |-- contact_type: string (nullable = false)
 |-- location: string (nullable = false)
 |-- category: string (nullable = false)
 |-- subcategory: string (nullable = false)
 |-- u_symptom: string (nullable = false)
 |-- impact: string (nullable = false)
 |-- urgency: string (nullable = false)
 |-- priority: string (nullable = false)
 |-- assignment_group: string (nul

In [0]:
# add opened month, opened year, closed month, closed year columns

from pyspark.sql.functions import month, year

final = final \
    .withColumn("opened_month", month(to_timestamp(col("opened_at"), "dd/MM/yyyy HH:mm")))\
    .withColumn("opened_year", year(to_timestamp(col("opened_at"), "dd/MM/yyyy HH:mm")))\
    .withColumn("closed_month", month(to_timestamp(col("closed_at"), "dd/MM/yyyy HH:mm")))\
    .withColumn("closed_year", year(to_timestamp(col("closed_at"), "dd/MM/yyyy HH:mm")))
 

In [0]:
final.withColumn("time_taken", col('closed_at') - col('opened_at')).select('time_taken').show(truncate=False)

+-----------------------------------+
|time_taken                         |
+-----------------------------------+
|INTERVAL '5 10:44:00' DAY TO SECOND|
|INTERVAL '5 10:44:00' DAY TO SECOND|
|INTERVAL '5 10:44:00' DAY TO SECOND|
|INTERVAL '5 10:44:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '6 05:20:00' DAY TO SECOND|
|INTERVAL '5 20:50:00' DAY TO SECOND|
|INTERVAL '5 20:50:00' DAY TO SECOND|
|INTERVAL '5 20:50:00' DAY TO SECOND|
|INTERVAL '5 20:50:00' DAY TO SECOND|
|INTERVAL '5 20:50:00' DAY TO SECOND|
|INTERVAL '5 20:50:00' DAY TO SECOND|
|INTERVAL '5 20:50:00' DAY TO SECOND|
+-----------------------------------+
only showing top 20 rows



In [0]:
# add time taken to close an open incident in seconds

from pyspark.sql.functions import from_unixtime, unix_timestamp

final2 = final.withColumn("time_taken", col('closed_at') - col('opened_at'))
final2 = final2.withColumn("total_seconds", expr("unix_timestamp(closed_at) - unix_timestamp(opened_at)"))
final2.select("total_seconds").show(truncate=False)

+-------------+
|total_seconds|
+-------------+
|470640       |
|470640       |
|470640       |
|470640       |
|537600       |
|537600       |
|537600       |
|537600       |
|537600       |
|537600       |
|537600       |
|537600       |
|537600       |
|507000       |
|507000       |
|507000       |
|507000       |
|507000       |
|507000       |
|507000       |
+-------------+
only showing top 20 rows



In [0]:
final2.printSchema()

root
 |-- number: string (nullable = false)
 |-- incident_state: string (nullable = false)
 |-- active: boolean (nullable = true)
 |-- reassignment_count: integer (nullable = true)
 |-- reopen_count: integer (nullable = true)
 |-- sys_mod_count: integer (nullable = true)
 |-- made_sla: boolean (nullable = true)
 |-- caller_id: string (nullable = false)
 |-- opened_by: string (nullable = false)
 |-- opened_at: timestamp (nullable = true)
 |-- sys_created_by: string (nullable = false)
 |-- sys_created_at: timestamp (nullable = true)
 |-- sys_updated_by: string (nullable = false)
 |-- sys_updated_at: timestamp (nullable = true)
 |-- contact_type: string (nullable = false)
 |-- location: string (nullable = false)
 |-- category: string (nullable = false)
 |-- subcategory: string (nullable = false)
 |-- u_symptom: string (nullable = false)
 |-- impact: string (nullable = false)
 |-- urgency: string (nullable = false)
 |-- priority: string (nullable = false)
 |-- assignment_group: string (nul

In [0]:
total_rows = sorted_df.count()

null_count = final.select([(F.count(F.when(F.col(c).isNull(), c)) / total_rows * 100).alias(c) for c in final.columns])

null_count_pd = null_count.toPandas()

null_count_pd.loc[:, null_count_pd.iloc[0] > 0]

Unnamed: 0,sys_created_at,resolved_at
0,37.453427,2.216467


In [0]:
final2.toPandas()

Unnamed: 0,number,incident_state,active,reassignment_count,reopen_count,sys_mod_count,made_sla,caller_id,opened_by,opened_at,sys_created_by,sys_created_at,sys_updated_by,sys_updated_at,contact_type,location,category,subcategory,u_symptom,impact,urgency,priority,assignment_group,assigned_to,knowledge,u_priority_confirmation,notify,closed_code,resolved_by,resolved_at,closed_at,opened_month,opened_year,closed_month,closed_year,time_taken,total_seconds
0,INC0000045,New,True,0,0,0,True,Caller 2403,Opened by 8,2016-02-29 01:16:00,Created by 6,2016-02-29 01:23:00,Updated by 21,2016-02-29 01:23:00,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,2 - Medium,2 - Medium,3 - Moderate,Group 56,Unknown,True,False,Do Not Notify,code 5,Resolved by 149,2016-02-29 11:29:00,2016-03-05 12:00:00,2,2016,3,2016,5 days 10:44:00,470640
1,INC0000045,Resolved,True,0,0,2,True,Caller 2403,Opened by 8,2016-02-29 01:16:00,Created by 6,2016-02-29 01:23:00,Updated by 642,2016-02-29 08:53:00,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,2 - Medium,2 - Medium,3 - Moderate,Group 56,Unknown,True,False,Do Not Notify,code 5,Resolved by 149,2016-02-29 11:29:00,2016-03-05 12:00:00,2,2016,3,2016,5 days 10:44:00,470640
2,INC0000045,Resolved,True,0,0,3,True,Caller 2403,Opened by 8,2016-02-29 01:16:00,Created by 6,2016-02-29 01:23:00,Updated by 804,2016-02-29 11:29:00,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,2 - Medium,2 - Medium,3 - Moderate,Group 56,Unknown,True,False,Do Not Notify,code 5,Resolved by 149,2016-02-29 11:29:00,2016-03-05 12:00:00,2,2016,3,2016,5 days 10:44:00,470640
3,INC0000045,Closed,False,0,0,4,True,Caller 2403,Opened by 8,2016-02-29 01:16:00,Created by 6,2016-02-29 01:23:00,Updated by 908,2016-03-05 12:00:00,Phone,Location 143,Category 55,Subcategory 170,Symptom 72,2 - Medium,2 - Medium,3 - Moderate,Group 56,Unknown,True,False,Do Not Notify,code 5,Resolved by 149,2016-02-29 11:29:00,2016-03-05 12:00:00,2,2016,3,2016,5 days 10:44:00,470640
4,INC0000047,Active,True,1,0,5,True,Caller 2403,Opened by 397,2016-02-29 04:40:00,Created by 171,2016-02-29 04:57:00,Updated by 332,2016-03-01 09:14:00,Phone,Location 165,Category 40,Subcategory 215,Symptom 471,2 - Medium,2 - Medium,3 - Moderate,Group 24,Resolver 31,True,False,Do Not Notify,code 5,Resolved by 81,2016-03-01 09:52:00,2016-03-06 10:00:00,2,2016,3,2016,6 days 05:20:00,537600
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
141707,INC0120835,Closed,False,1,0,4,True,Caller 116,Opened by 12,2017-02-16 09:09:00,Unknown,NaT,Updated by 27,2017-02-16 09:53:00,Email,Location 204,Category 42,Subcategory 223,Symptom 494,2 - Medium,2 - Medium,3 - Moderate,Group 31,Resolver 10,False,True,Do Not Notify,code 9,Resolved by 9,2017-02-16 09:53:00,2017-02-16 09:53:00,2,2017,2,2017,0 days 00:44:00,2640
141708,INC0121064,Active,True,0,0,0,True,Caller 116,Opened by 12,2017-02-16 14:17:00,Unknown,NaT,Updated by 908,2017-02-16 14:17:00,Email,Location 204,Category 42,Subcategory 223,Symptom 494,2 - Medium,2 - Medium,3 - Moderate,Group 70,Resolver 10,False,False,Do Not Notify,code 6,Resolved by 9,2017-02-16 16:38:00,2017-02-16 16:38:00,2,2017,2,2017,0 days 02:21:00,8460
141709,INC0121064,Active,True,1,0,1,True,Caller 116,Opened by 12,2017-02-16 14:17:00,Unknown,NaT,Updated by 60,2017-02-16 15:20:00,Email,Location 204,Category 42,Subcategory 223,Symptom 494,2 - Medium,2 - Medium,3 - Moderate,Group 31,Unknown,False,False,Do Not Notify,code 6,Resolved by 9,2017-02-16 16:38:00,2017-02-16 16:38:00,2,2017,2,2017,0 days 02:21:00,8460
141710,INC0121064,Resolved,True,1,0,2,True,Caller 116,Opened by 12,2017-02-16 14:17:00,Unknown,NaT,Updated by 27,2017-02-16 16:38:00,Email,Location 204,Category 42,Subcategory 223,Symptom 494,2 - Medium,2 - Medium,3 - Moderate,Group 31,Resolver 10,False,True,Do Not Notify,code 6,Resolved by 9,2017-02-16 16:38:00,2017-02-16 16:38:00,2,2017,2,2017,0 days 02:21:00,8460


In [0]:
final2.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("wasbs://capstonetransformed@ustcapstonestorage.blob.core.windows.net/transformed.parquet")