In [1]:
from pyspark.sql import SparkSession;

# warehouse_location points to the default location for managed databases and tables
from os.path import abspath
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("ISM6562 PySpark Tutorials") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()


# Let's get the SparkContext object. It's the entry point to the Spark API. It's created when you create a sparksession
sc = spark.sparkContext

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

23/10/26 15:19:09 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.21.5.100 instead (on interface eth0)
23/10/26 15:19:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 15:19:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session WebUI Port: 4040


In [2]:
spark

# Estimating Incident Duration using Linear Regression

The Incident Management dataset has about 141712 records of 24918 incidents. Each state of the incident is being captured as an individual record with few exceptions where the closed state of an incident is recorded more than once. With the help of the below segment of the code, we load and clean the Incident Management data so that only one record representing the truly closed state per incident is obtained.

### Loading our data

If you've successfully run the previous incident management notebook, you should have the data in the hive store.

In [3]:
spark.sql("show tables").show()

23/10/26 15:19:14 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/10/26 15:19:14 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/10/26 15:19:16 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/10/26 15:19:16 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore student@127.0.0.1
23/10/26 15:19:16 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|  default|fake_friends|      false|
|  default|   incidents|      false|
|  default|movieratings|      false|
|  default|      movies|      false|
+---------+------------+-----------+



In [4]:
df = spark.sql("select * from incidents")
df.show(5)

23/10/26 15:19:18 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 0:>                                                          (0 + 1) / 1]

+----------+--------------+------+------------------+------------+-------------+--------+-----------+--------------+---------------+--------------+---------------+--------------+--------------+------------+------------+-----------+---------------+-----------+-------+----------+----------+------------+----------------+------------+---------+-----------------------+-------------+-------------+---+------+---------+-----------+---------------+---------------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+----------+----------+-----------+-----------+----------+--------+
|    number|incident_state|active|reassignment_count|reopen_count|sys_mod_count|made_sla|  caller_id|     opened_by|      opened_at|sys_created_by| sys_created_at|sys_updated_by|sys_updated_at|contact_type|    location|   category|    subcategory|  u_symptom|cmdb_ci|    impact|   urgency|    priority|assignment_group| assigned_to|knowledge|u_priority_confir

                                                                                

The data set has multiple states(New, Active, Awaiting user info, Resolved, Closed etc. ) of an incident. With the help  of the below command, we are just filtering one record per incident, that has the truly closed state of the incident. 

In [5]:
df_closed_incidents=df.filter("incident_state=='Closed'").\
    sort("sys_mod_count",ascending=False).\
    dropDuplicates(["number"])

In [6]:
df_closed_incidents.show()



+----------+--------------+------+------------------+------------+-------------+--------+-----------+--------------+---------------+--------------+---------------+--------------+----------------+------------+------------+-----------+---------------+-----------+-------+----------+----------+------------+----------------+------------+---------+-----------------------+-------------+-------------+---+------+---------+-----------+---------------+----------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+----------+----------+-----------+-----------+----------+--------+
|    number|incident_state|active|reassignment_count|reopen_count|sys_mod_count|made_sla|  caller_id|     opened_by|      opened_at|sys_created_by| sys_created_at|sys_updated_by|  sys_updated_at|contact_type|    location|   category|    subcategory|  u_symptom|cmdb_ci|    impact|   urgency|    priority|assignment_group| assigned_to|knowledge|u_priority

                                                                                

Let's select a the subset of columns we are interested in for our analysis.

In [7]:
df_closed_incidents =df_closed_incidents.select([
    'reassignment_count',
    'reopen_count',
    'sys_mod_count',
    'opened_by',
    'location',
    'category',
    'subcategory',
    'priority',
    'assignment_group',
    'assigned_to',
    'knowledge',
    'resolved_by',
    'duration'
    ]
)

In [8]:
df_closed_incidents.count()

24918

In [9]:
df_closed_incidents = df_closed_incidents.dropna()

In [10]:
df_closed_incidents.count()

23362

In [11]:
df_closed_incidents.show()

+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+---------------+--------+
|reassignment_count|reopen_count|sys_mod_count|     opened_by|    location|   category|    subcategory|    priority|assignment_group| assigned_to|knowledge|    resolved_by|duration|
+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+---------------+--------+
|                 0|           0|            4|  Opened by  8|Location 143|Category 55|Subcategory 170|3 - Moderate|        Group 56|           ?|     true|Resolved by 149|       0|
|                 1|           0|            8|Opened by  397|Location 165|Category 40|Subcategory 215|3 - Moderate|        Group 24| Resolver 89|     true| Resolved by 81|       1|
|                 0|           0|            6|  Opened by  8|Location 204|Category 20|Sub

## Model Training

We earlier created the incident table with 'inferSchema' set to true. This means that the schema of the table is inferred from the data. The knowledge column was properly inferred as a boolean, however, in the pipeline below with use StringIndexer on this column - and therefore, need to convert this column data type to a string.

In [12]:
df_closed_incidents.printSchema()

root
 |-- reassignment_count: integer (nullable = true)
 |-- reopen_count: integer (nullable = true)
 |-- sys_mod_count: integer (nullable = true)
 |-- opened_by: string (nullable = true)
 |-- location: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- assignment_group: string (nullable = true)
 |-- assigned_to: string (nullable = true)
 |-- knowledge: boolean (nullable = true)
 |-- resolved_by: string (nullable = true)
 |-- duration: integer (nullable = true)



In [13]:
from pyspark.sql.types import IntegerType,BooleanType,DateType, StringType

df_closed_incidents = df_closed_incidents.withColumn("knowledge",df_closed_incidents.knowledge.cast(StringType()))

In [14]:
df_closed_incidents.printSchema()

root
 |-- reassignment_count: integer (nullable = true)
 |-- reopen_count: integer (nullable = true)
 |-- sys_mod_count: integer (nullable = true)
 |-- opened_by: string (nullable = true)
 |-- location: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- priority: string (nullable = true)
 |-- assignment_group: string (nullable = true)
 |-- assigned_to: string (nullable = true)
 |-- knowledge: string (nullable = true)
 |-- resolved_by: string (nullable = true)
 |-- duration: integer (nullable = true)



Now that we have our data ready, let's do a train test split (70/30).

In [15]:
train_data,test_data=df_closed_incidents.randomSplit([0.7,0.3])

In [16]:
from pyspark.ml.feature import StringIndexer
# Use StringIndexer to convert the categorical columns to hold numerical data
 
opened_by_indexer = StringIndexer(inputCol='opened_by',outputCol='opened_by_index',handleInvalid='keep')
location_indexer = StringIndexer(inputCol='location',outputCol='location_index',handleInvalid='keep')
category_indexer = StringIndexer(inputCol='category',outputCol='category_index',handleInvalid='keep')
subcategory_indexer = StringIndexer(inputCol='subcategory',outputCol='subcategory_index',handleInvalid='keep')
priority_indexer = StringIndexer(inputCol='priority',outputCol='priority_index',handleInvalid='keep')
assignment_group_indexer = StringIndexer(inputCol='assignment_group',outputCol='assignment_group_index',handleInvalid='keep')
assigned_to_indexer = StringIndexer(inputCol='assigned_to',outputCol='assigned_to_index',handleInvalid='keep')
knowledge_indexer = StringIndexer(inputCol='knowledge',outputCol='knowledge_index',handleInvalid='keep')
resolved_by_indexer = StringIndexer(inputCol='resolved_by',outputCol='resolved_by_index',handleInvalid='keep')

In [17]:
from pyspark.ml.feature import VectorAssembler
# Vector assembler is used to create a vector of input features
 
assembler = VectorAssembler(
    inputCols=[
        "opened_by_index",
        'location_index',
        'category_index',
        'subcategory_index',
        'priority_index',
        'assignment_group_index',
        'assigned_to_index',
        'knowledge_index',
        'resolved_by_index'
    ],
    outputCol="features"
)

In [18]:
from pyspark.ml import Pipeline

# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data
# https://spark.apache.org/docs/latest/ml-pipeline.html
 
pipe = Pipeline(stages=[
    opened_by_indexer,
    location_indexer,
    category_indexer,
    subcategory_indexer,
    priority_indexer,
    assignment_group_indexer,
    assigned_to_indexer,
    knowledge_indexer,
    resolved_by_indexer,
    assembler
    ]
)

In [19]:
fitted_pipe=pipe.fit(train_data)

In [20]:
train_data=fitted_pipe.transform(train_data)
train_data.show()

+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+--------------+--------+---------------+--------------+--------------+-----------------+--------------+----------------------+-----------------+---------------+-----------------+--------------------+
|reassignment_count|reopen_count|sys_mod_count|     opened_by|    location|   category|    subcategory|    priority|assignment_group| assigned_to|knowledge|   resolved_by|duration|opened_by_index|location_index|category_index|subcategory_index|priority_index|assignment_group_index|assigned_to_index|knowledge_index|resolved_by_index|            features|
+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+--------------+--------+---------------+--------------+--------------+-----------------+--------------+----------------------+------

In [21]:
test_data=fitted_pipe.transform(test_data)
test_data.show()

+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+--------------+--------+---------------+--------------+--------------+-----------------+--------------+----------------------+-----------------+---------------+-----------------+--------------------+
|reassignment_count|reopen_count|sys_mod_count|     opened_by|    location|   category|    subcategory|    priority|assignment_group| assigned_to|knowledge|   resolved_by|duration|opened_by_index|location_index|category_index|subcategory_index|priority_index|assignment_group_index|assigned_to_index|knowledge_index|resolved_by_index|            features|
+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+--------------+--------+---------------+--------------+--------------+-----------------+--------------+----------------------+------

In [22]:
# For those interested in utilizing the ML/AI power of Tensorflow with Spark....
# https://github.com/tensorflow/ecosystem/tree/master/spark/spark-tensorflow-distributor

# In this course, we'll use the SparkML (admitedely, it's not as powerful as Tensorflow, but 
# it's easy to use and demonstrate ML on a Spark Cluster)

from pyspark.ml.regression import LinearRegression

lr_model = LinearRegression(labelCol='duration')
fit_model = lr_model.fit(train_data.select(['features','duration']))


23/10/26 15:19:38 WARN Instrumentation: [a81c48bd] regParam is zero, which might cause numerical instability and overfitting.


In [23]:
results = fit_model.transform(test_data)
results.show()

+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+--------------+--------+---------------+--------------+--------------+-----------------+--------------+----------------------+-----------------+---------------+-----------------+--------------------+------------------+
|reassignment_count|reopen_count|sys_mod_count|     opened_by|    location|   category|    subcategory|    priority|assignment_group| assigned_to|knowledge|   resolved_by|duration|opened_by_index|location_index|category_index|subcategory_index|priority_index|assignment_group_index|assigned_to_index|knowledge_index|resolved_by_index|            features|        prediction|
+------------------+------------+-------------+--------------+------------+-----------+---------------+------------+----------------+------------+---------+--------------+--------+---------------+--------------+--------------+-----------------+------

In [24]:
results.select(['duration','prediction']).show()

+--------+------------------+
|duration|        prediction|
+--------+------------------+
|       0| 8.404583441744268|
|       0|0.2547233818362512|
|       0| -4.22413012244369|
|       0|-5.130376022163256|
|       0|0.4598020920678584|
|       0| -5.76256244943567|
|       0|   10.802162486883|
|       0|10.213336681295155|
|       0| 4.482917557095654|
|       0|6.2164837280935075|
|       0|6.2164837280935075|
|       0|6.2164837280935075|
|       0|6.2164837280935075|
|       0|6.2164837280935075|
|       0|6.2164837280935075|
|       0| 15.87182587673362|
|       0| 5.692393690834956|
|       0| 5.113970510698998|
|       0|  7.06816162023801|
|       0| 7.149880359440196|
+--------+------------------+
only showing top 20 rows



## Evaluate the peformance of the Linear Regression Model

In [25]:
test_results = fit_model.evaluate(test_data)

In [26]:
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| -8.404583441744268|
|-0.2547233818362512|
|   4.22413012244369|
|  5.130376022163256|
|-0.4598020920678584|
|   5.76256244943567|
|   -10.802162486883|
|-10.213336681295155|
| -4.482917557095654|
|-6.2164837280935075|
|-6.2164837280935075|
|-6.2164837280935075|
|-6.2164837280935075|
|-6.2164837280935075|
|-6.2164837280935075|
| -15.87182587673362|
| -5.692393690834956|
| -5.113970510698998|
|  -7.06816162023801|
| -7.149880359440196|
+-------------------+
only showing top 20 rows



In [27]:
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'Ex Var:':7s} {test_results.explainedVariance:>7.3f}")
print(f"{'MAE:':7s} {test_results.meanAbsoluteError:>7.3f}")
print(f"{'MSE:':7s} {test_results.meanSquaredError:>7.3f}")
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'R2:':7s} {test_results.r2:>7.3f}")

RMSE:    21.640
Ex Var:  45.647
MAE:      9.126
MSE:    468.289
RMSE:    21.640
R2:       0.076


In [28]:
spark.stop()