# 1. Import necessary libraries and create Spark Session #

In [27]:
# Import lib
import pyspark
import pyspark.sql.functions as F

# Import Spark Session
from pyspark.sql import SparkSession

# Import functions/Classes for sparkml

from pyspark.ml.regression import LinearRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

# Import functions/Classes for pipeline creation
from pyspark.ml import Pipeline

# Import functions/Classes for metrics (model evaluation)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 29, Finished, Available)

In [28]:

# Create (or reuse) the Spark session used by the rest of this notebook.
# NOTE(review): getOrCreate() returns an already-running session when one exists;
# in that case launch-time settings such as driver/executor memory may not be
# applied -- confirm against the runtime's effective configuration.
spark = (
    SparkSession.builder
    .appName("LinearRegression_UCRPART")
    .config('spark.cores.max', "16")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

# Show which Spark version the session is running
print("Spark version: ", spark.version)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 30, Finished, Available)

Spark version:  3.4.1.5.3-117503204


# 2. Extract data from Lakehouse and select the needed columns #

In [29]:
# Relative path of the source CSV file
file_path = "Files/boston_crime.csv"

# Load the CSV into a Spark DataFrame: the first row supplies the column names
# and column types are inferred from the data.
df_boston = spark.read.csv(file_path, header=True, inferSchema=True)

# Preview the first 5 rows
df_boston.show(5)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 31, Finished, Available)

+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+---------------+-----------+------------+--------------------+
|INCIDENT_NUMBER|OFFENSE_CODE|  OFFENSE_CODE_GROUP| OFFENSE_DESCRIPTION|DISTRICT|REPORTING_AREA|SHOOTING|   OCCURRED_ON_DATE|YEAR|MONTH|DAY_OF_WEEK|HOUR|  UCR_PART|         STREET|        Lat|        Long|            Location|
+---------------+------------+--------------------+--------------------+--------+--------------+--------+-------------------+----+-----+-----------+----+----------+---------------+-----------+------------+--------------------+
|     I162009737|        3803|Motor Vehicle Acc...|M/V ACCIDENT - PE...|     D14|           796|      No|2016-02-05 11:14:00|2016|    2|     Friday|  11|Part Three|    HARVARD AVE|42.35085371|-71.13125747|(42.35085371, -71...|
|     I162009734|        3803|Motor Vehicle Acc...|M/V ACCIDENT - PE...|     C11|           

In [30]:
# Columns kept for modelling: six input columns plus UCR_PART,
# which is indexed into the label later in the notebook.
selected_cols = [
    "OFFENSE_CODE_GROUP",
    "DISTRICT",
    "SHOOTING",
    "HOUR",
    "DAY_OF_WEEK",
    "MONTH",
    "UCR_PART",
]

# Project the raw dataframe down to the selected columns
df_boston_feature = df_boston.select(*selected_cols)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 32, Finished, Available)

In [31]:
# Print the column names and inferred types of the selected dataframe
df_boston_feature.printSchema()

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 33, Finished, Available)

root
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- SHOOTING: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)



In [32]:
# Count the rows of the selected dataframe, then report the total
total_rows = df_boston_feature.count()
print("Total number of rows: ", total_rows)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 34, Finished, Available)

Total number of rows:  295277


# 3. Create Train, Test data #

In [33]:
# Random 80/20 split into training and testing sets; the seed is fixed at 42
training_data, testing_data = df_boston_feature.randomSplit(weights=[0.8, 0.2], seed=42)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 35, Finished, Available)

In [34]:
# Number of rows in the training split
training_data.count()

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 36, Finished, Available)

236507

In [35]:
# Number of rows in the testing split
testing_data.count()

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 37, Finished, Available)

58770

# 4. Create Pipeline for Linear Regression Model #

In [36]:
# Stage 1: Index the target column UCR_PART (string values such as "Part Three")
# into the numeric column UCR_PART_Label.
# The default handleInvalid = 'error' is kept here on purpose: a null or unseen
# target value should surface as an error rather than become a label.
indexer = StringIndexer(inputCol = 'UCR_PART', outputCol = 'UCR_PART_Label')

# Stage 2: Index the categorical features OFFENSE_CODE_GROUP, DISTRICT, SHOOTING
# and DAY_OF_WEEK into numeric *_index columns.
# handleInvalid = 'keep' maps categories that were not seen during fit (and nulls)
# to one extra index instead of raising, so transform() does not fail when a rare
# category only occurs in the test split or in new data scored with the saved model.
# Indices of categories seen during fit are unchanged by this setting.
index_feature = StringIndexer(inputCols = ['OFFENSE_CODE_GROUP', 'DISTRICT', 'SHOOTING', 'DAY_OF_WEEK'],
                              outputCols = ['OFFENSE_CODE_GROUP_index', 'DISTRICT_index',
                                            'SHOOTING_index', 'DAY_OF_WEEK_index'],
                              handleInvalid = 'keep')

# Stage 3: Assemble the feature vector from the indexed categorical columns
# plus the numeric HOUR and MONTH columns.
# NOTE(review): StringIndexer orders indices by label frequency by default, so these
# indices (and the label) are category ids, not an ordinal scale; the linear model
# treats them as numbers -- consider one-hot encoding and/or a classifier.
vector = VectorAssembler(inputCols = ['OFFENSE_CODE_GROUP_index', 'DISTRICT_index', 'SHOOTING_index',
                                      'HOUR', 'DAY_OF_WEEK_index', 'MONTH'],
                         outputCol = 'features')

# Stage 4: Linear regression estimator reading 'features' and predicting UCR_PART_Label
lr = LinearRegression(featuresCol = "features", labelCol = "UCR_PART_Label")

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 38, Finished, Available)

In [37]:
# Chain the four stages in execution order:
# label indexer -> feature indexer -> vector assembler -> linear regression
pipeline_stages = [indexer, index_feature, vector, lr]
lr_pipeline = Pipeline(stages = pipeline_stages)

# Fit every stage on the training split; the result is the fitted pipeline model
lr_model = lr_pipeline.fit(training_data)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 39, Finished, Available)



In [38]:
# Summary of the pipeline: the class name of each stage, in order.
# The name is read from the stage object itself instead of being parsed out of its
# uid string, and the loop prints however many stages the pipeline contains
# (no hard-coded indices that would break if a stage is added or removed).
ps = [type(stage).__name__ for stage in lr_pipeline.getStages()]

for stage_number, stage_name in enumerate(ps, start = 1):
    print(f"Pipeline Stage {stage_number} = ", stage_name)

# Column the regression stage uses as its label
print("Label column = ", lr.getLabelCol())

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 40, Finished, Available)

Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  StringIndexer
Pipeline Stage 3 =  VectorAssembler
Pipeline Stage 4 =  LinearRegression
Label column =  UCR_PART_Label


# 5. Evaluate Model #

In [39]:
# Score the held-out test split with the fitted pipeline
predictions = lr_model.transform(testing_data)

# Preview the indexed label next to the model's prediction for 5 rows
label_vs_prediction = predictions.select("UCR_PART_Label", "prediction")
label_vs_prediction.show(5)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 41, Finished, Available)

+--------------+------------------+
|UCR_PART_Label|        prediction|
+--------------+------------------+
|           2.0|0.6683949492885153|
|           2.0|0.7087752106250108|
|           2.0|0.7127638110700087|
|           2.0|0.7116161844373631|
|           2.0|0.7035895578587156|
+--------------+------------------+
only showing top 5 rows



In [40]:
# One evaluator per metric (MSE, MAE, R squared), each comparing the
# 'prediction' column against the indexed label UCR_PART_Label.
evaluator_mse, evaluator_mae, evaluator_r2 = (
    RegressionEvaluator(labelCol="UCR_PART_Label", predictionCol="prediction", metricName=metric)
    for metric in ("mse", "mae", "r2")
)

# Evaluate each metric on the test-set predictions
mse, mae, r2 = (
    evaluator.evaluate(predictions)
    for evaluator in (evaluator_mse, evaluator_mae, evaluator_r2)
)

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 42, Finished, Available)



In [41]:

# Report each evaluation metric rounded to two decimals
metric_report = (
    ("Mean Squared Error =", mse),
    ("Mean Absolute Error =", mae),
    ("R Squared =", r2),
)
for metric_label, metric_value in metric_report:
    print(metric_label, np.round(metric_value, 2))

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 43, Finished, Available)

Mean Squared Error = 0.6
Mean Absolute Error = 0.69
R Squared = 0.01


# 6. Save model #

In [43]:
# Save the fitted pipeline model to Files/model_ucr_part.
# overwrite() replaces any model already stored at that path.
lr_model.write().overwrite().save("Files/model_ucr_part")

StatementMeta(, dcbc3bb9-3aec-449f-9340-15af0bc66182, 45, Finished, Available)