- session intialization

In [110]:
from pyspark.sql import SparkSession;
from pyspark.context import SparkContext
spark = SparkSession \
    .builder \
    .appName("Linear_Regression") \
    .getOrCreate()

sc = spark.sparkContext

In [111]:
spark

In [157]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer


In [113]:
df = spark.read.csv('./data/Restaurant_Scores.csv',header=True, inferSchema=True)
df.show(5)

+-----------+--------------------+----------------+-------------+--------------+--------------------+-----------------+------------------+--------------------+---------------------+-------------+---------------+----------------+--------------------+--------------------+---------------------+-------------+-------------+---------------------+------------------------+----------------------------+----------------------+
|business_id|       business_name|business_address|business_city|business_state|business_postal_code|business_latitude|business_longitude|   business_location|business_phone_number|inspection_id|inspection_date|inspection_score|     inspection_type|        violation_id|violation_description|risk_category|Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|
+-----------+--------------------+----------------+-------------+--------------+--------------------+-----------------+------------------+--------------------+-

In [114]:
df.describe().show()

+-------+-----------------+-----------------+----------------+-------------+--------------+--------------------+--------------------+-------------------+--------------------+---------------------+-------------+---------------+-----------------+--------------------+--------------------+---------------------+-------------+-----------------+---------------------+------------------------+----------------------------+----------------------+
|summary|      business_id|    business_name|business_address|business_city|business_state|business_postal_code|   business_latitude| business_longitude|   business_location|business_phone_number|inspection_id|inspection_date| inspection_score|     inspection_type|        violation_id|violation_description|risk_category|    Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|
+-------+-----------------+-----------------+----------------+-------------+--------------+--------------------+--------

In [115]:
from pyspark.sql.functions import col, sum as spark_sum

missing_counts = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the result
missing_counts.show()

+-----------+-------------+----------------+-------------+--------------+--------------------+-----------------+------------------+-----------------+---------------------+-------------+---------------+----------------+---------------+------------+---------------------+-------------+-------------+---------------------+------------------------+----------------------------+----------------------+
|business_id|business_name|business_address|business_city|business_state|business_postal_code|business_latitude|business_longitude|business_location|business_phone_number|inspection_id|inspection_date|inspection_score|inspection_type|violation_id|violation_description|risk_category|Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|
+-----------+-------------+----------------+-------------+--------------+--------------------+-----------------+------------------+-----------------+---------------------+-------------+---------------+-----

In [116]:
df = df.dropna(how='any')

In [117]:
missing_counts = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the result
missing_counts.show()

+-----------+-------------+----------------+-------------+--------------+--------------------+-----------------+------------------+-----------------+---------------------+-------------+---------------+----------------+---------------+------------+---------------------+-------------+-------------+---------------------+------------------------+----------------------------+----------------------+
|business_id|business_name|business_address|business_city|business_state|business_postal_code|business_latitude|business_longitude|business_location|business_phone_number|inspection_id|inspection_date|inspection_score|inspection_type|violation_id|violation_description|risk_category|Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|
+-----------+-------------+----------------+-------------+--------------+--------------------+-----------------+------------------+-----------------+---------------------+-------------+---------------+-----

In [118]:
df.show(1)

+-----------+--------------------+----------------+-------------+--------------+--------------------+-----------------+------------------+--------------------+---------------------+-------------+---------------+----------------+--------------------+--------------------+---------------------+-------------+-------------+---------------------+------------------------+----------------------------+----------------------+
|business_id|       business_name|business_address|business_city|business_state|business_postal_code|business_latitude|business_longitude|   business_location|business_phone_number|inspection_id|inspection_date|inspection_score|     inspection_type|        violation_id|violation_description|risk_category|Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|
+-----------+--------------------+----------------+-------------+--------------+--------------------+-----------------+------------------+--------------------+-

In [119]:
df.printSchema()

root
 |-- business_id: integer (nullable = true)
 |-- business_name: string (nullable = true)
 |-- business_address: string (nullable = true)
 |-- business_city: string (nullable = true)
 |-- business_state: string (nullable = true)
 |-- business_postal_code: integer (nullable = true)
 |-- business_latitude: double (nullable = true)
 |-- business_longitude: double (nullable = true)
 |-- business_location: string (nullable = true)
 |-- business_phone_number: long (nullable = true)
 |-- inspection_id: string (nullable = true)
 |-- inspection_date: string (nullable = true)
 |-- inspection_score: integer (nullable = true)
 |-- inspection_type: string (nullable = true)
 |-- violation_id: string (nullable = true)
 |-- violation_description: string (nullable = true)
 |-- risk_category: string (nullable = true)
 |-- Neighborhoods: integer (nullable = true)
 |-- SF Find Neighborhoods: integer (nullable = true)
 |-- Current Police Districts: integer (nullable = true)
 |-- Current Supervisor Dist

In [120]:
df.dtypes

[('business_id', 'int'),
 ('business_name', 'string'),
 ('business_address', 'string'),
 ('business_city', 'string'),
 ('business_state', 'string'),
 ('business_postal_code', 'int'),
 ('business_latitude', 'double'),
 ('business_longitude', 'double'),
 ('business_location', 'string'),
 ('business_phone_number', 'bigint'),
 ('inspection_id', 'string'),
 ('inspection_date', 'string'),
 ('inspection_score', 'int'),
 ('inspection_type', 'string'),
 ('violation_id', 'string'),
 ('violation_description', 'string'),
 ('risk_category', 'string'),
 ('Neighborhoods', 'int'),
 ('SF Find Neighborhoods', 'int'),
 ('Current Police Districts', 'int'),
 ('Current Supervisor Districts', 'int'),
 ('Analysis Neighborhoods', 'int')]

In [121]:
from pyspark.sql.types import StringType

def is_categorical(column):
    return isinstance(df.schema[column].dataType, StringType)

# Select only categorical columns
categorical_columns = [column for column in df.columns if is_categorical(column)]

In [122]:
#categorical_columns = categorical_columns[:-1]
categorical_columns

['business_name',
 'business_address',
 'business_city',
 'business_state',
 'business_location',
 'inspection_id',
 'inspection_date',
 'inspection_type',
 'violation_id',
 'violation_description',
 'risk_category']

- Before building the model, we need to assemble the input features into a single feature vector using the VectorAssembler class. Then, we will split the dataset into a training set (80%) and a testing set (20%).

- Indexing

In [123]:
indexed_df = df
for col in categorical_columns:
    indexer = StringIndexer(inputCol=col, outputCol=col+'_indexed')
    indexed_df = indexer.fit(indexed_df).transform(indexed_df)
    indexed_df = indexed_df.drop(col)
indexed_df.show()

+-----------+--------------------+-----------------+------------------+---------------------+----------------+-------------+---------------------+------------------------+----------------------------+----------------------+---------------------+------------------------+---------------------+----------------------+-------------------------+---------------------+-----------------------+-----------------------+--------------------+-----------------------------+---------------------+
|business_id|business_postal_code|business_latitude|business_longitude|business_phone_number|inspection_score|Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|business_name_indexed|business_address_indexed|business_city_indexed|business_state_indexed|business_location_indexed|inspection_id_indexed|inspection_date_indexed|inspection_type_indexed|violation_id_indexed|violation_description_indexed|risk_category_indexed|
+-----------+-----------------

In [124]:
df = indexed_df
df.dtypes

[('business_id', 'int'),
 ('business_postal_code', 'int'),
 ('business_latitude', 'double'),
 ('business_longitude', 'double'),
 ('business_phone_number', 'bigint'),
 ('inspection_score', 'int'),
 ('Neighborhoods', 'int'),
 ('SF Find Neighborhoods', 'int'),
 ('Current Police Districts', 'int'),
 ('Current Supervisor Districts', 'int'),
 ('Analysis Neighborhoods', 'int'),
 ('business_name_indexed', 'double'),
 ('business_address_indexed', 'double'),
 ('business_city_indexed', 'double'),
 ('business_state_indexed', 'double'),
 ('business_location_indexed', 'double'),
 ('inspection_id_indexed', 'double'),
 ('inspection_date_indexed', 'double'),
 ('inspection_type_indexed', 'double'),
 ('violation_id_indexed', 'double'),
 ('violation_description_indexed', 'double'),
 ('risk_category_indexed', 'double')]

- risk_category is still a string but it need to be a numeric for regression

In [126]:
X = df.drop('risk_category_indexed')
Y = df.select('risk_category_indexed')

In [127]:
Y.show(5)

+---------------------+
|risk_category_indexed|
+---------------------+
|                  1.0|
|                  0.0|
|                  0.0|
|                  0.0|
|                  0.0|
+---------------------+
only showing top 5 rows



In [128]:
X.show(5)

+-----------+--------------------+-----------------+------------------+---------------------+----------------+-------------+---------------------+------------------------+----------------------------+----------------------+---------------------+------------------------+---------------------+----------------------+-------------------------+---------------------+-----------------------+-----------------------+--------------------+-----------------------------+
|business_id|business_postal_code|business_latitude|business_longitude|business_phone_number|inspection_score|Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|business_name_indexed|business_address_indexed|business_city_indexed|business_state_indexed|business_location_indexed|inspection_id_indexed|inspection_date_indexed|inspection_type_indexed|violation_id_indexed|violation_description_indexed|
+-----------+--------------------+-----------------+------------------+---

In [129]:
assembler = VectorAssembler(inputCols=X.columns, outputCol="features")
df = assembler.transform(df)

In [141]:
df.count()

372

- splitting Data

In [143]:
train_data, test_data = df.randomSplit([0.6, 0.4], seed=42)

In [144]:
train_data.show(5)

+-----------+--------------------+-----------------+------------------+---------------------+----------------+-------------+---------------------+------------------------+----------------------------+----------------------+---------------------+------------------------+---------------------+----------------------+-------------------------+---------------------+-----------------------+-----------------------+--------------------+-----------------------------+---------------------+--------------------+
|business_id|business_postal_code|business_latitude|business_longitude|business_phone_number|inspection_score|Neighborhoods|SF Find Neighborhoods|Current Police Districts|Current Supervisor Districts|Analysis Neighborhoods|business_name_indexed|business_address_indexed|business_city_indexed|business_state_indexed|business_location_indexed|inspection_id_indexed|inspection_date_indexed|inspection_type_indexed|violation_id_indexed|violation_description_indexed|risk_category_indexed|          

In [145]:
train_data.count()

232

In [146]:
train_data.select('risk_category_indexed').describe().show()

+-------+---------------------+
|summary|risk_category_indexed|
+-------+---------------------+
|  count|                  232|
|   mean|   0.7413793103448276|
| stddev|   0.7457897949632883|
|    min|                  0.0|
|    max|                  2.0|
+-------+---------------------+



In [152]:
linear_regression = LinearRegression( featuresCol="features", labelCol="risk_category_indexed")
model = linear_regression.fit(train_data)

In [161]:
trainingSummary = model.summary

In [162]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show(5)
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

numIterations: 53
objectiveHistory: [0.5, 0.47103108546474026, 0.45026587348723546, 0.43484623865304706, 0.4322868474320628, 0.430680663275946, 0.42921206171257176, 0.42874974015608835, 0.4283586125626996, 0.4281467034115237, 0.4279816322786544, 0.4275150883635731, 0.4270466630715034, 0.4259063594794795, 0.4255723004258086, 0.4252867966330839, 0.42510570034499573, 0.42508024931515387, 0.4250627143374529, 0.4250378434675738, 0.4250327849187513, 0.4250208201245257, 0.42501937758157304, 0.4250183592426381, 0.42501666937851745, 0.4250146396480502, 0.42501041528981454, 0.4250059511282941, 0.4250000921717244, 0.4249959402628135, 0.42499123003875205, 0.4249870028153807, 0.4249855845275149, 0.42498512064747296, 0.4249843832178001, 0.424984281689997, 0.4249841676871099, 0.4249840567094295, 0.42498403913833294, 0.42498402350014, 0.4249840172906786, 0.42498401364246796, 0.4249840105312003, 0.4249840083078268, 0.42498400504478717, 0.4249840020851777, 0.42498400060452035, 0.42498399869662784, 0.424

In [155]:
predictions = model.transform(test_data)

In [166]:
predictions.select("risk_category_indexed","features").show(5)

+---------------------+--------------------+
|risk_category_indexed|            features|
+---------------------+--------------------+
|                  1.0|[146.0,94103.0,37...|
|                  1.0|[146.0,94103.0,37...|
|                  0.0|[146.0,94103.0,37...|
|                  0.0|[146.0,94103.0,37...|
|                  0.0|[151.0,94117.0,37...|
+---------------------+--------------------+
only showing top 5 rows



In [168]:
evaluator = RegressionEvaluator( predictionCol = 'prediction', labelCol='risk_category_indexed', metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

evaluator_r2 = RegressionEvaluator(predictionCol = 'prediction', labelCol='risk_category_indexed', metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print("R-squared (R2) on test data: {:.3f}".format(r2))

Root Mean Squared Error (RMSE) on test data: 0.797
R-squared (R2) on test data: -0.125
