In [31]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.functions import year
from pyspark.sql.functions import to_date
from pyspark.sql.functions import month

In [3]:
spark = SparkSession.builder.appName('ML').getOrCreate()

In [173]:
# read data
train= spark.read.csv('train.csv',header=True,inferSchema=True)

In [174]:
train.show()

+---------------+-----+--------------+-------+-------------------+---------------------+------+
|         row_id|cfips|        county|  state| first_day_of_month|microbusiness_density|active|
+---------------+-----+--------------+-------+-------------------+---------------------+------+
|1001_2019-08-01| 1001|Autauga County|Alabama|2019-08-01 00:00:00|            3.0076818|  1249|
|1001_2019-09-01| 1001|Autauga County|Alabama|2019-09-01 00:00:00|            2.8848701|  1198|
|1001_2019-10-01| 1001|Autauga County|Alabama|2019-10-01 00:00:00|            3.0558431|  1269|
|1001_2019-11-01| 1001|Autauga County|Alabama|2019-11-01 00:00:00|            2.9932332|  1243|
|1001_2019-12-01| 1001|Autauga County|Alabama|2019-12-01 00:00:00|            2.9932332|  1243|
|1001_2020-01-01| 1001|Autauga County|Alabama|2020-01-01 00:00:00|              2.96909|  1242|
|1001_2020-02-01| 1001|Autauga County|Alabama|2020-02-01 00:00:00|            2.9093256|  1217|
|1001_2020-03-01| 1001|Autauga County|Al

In [175]:
train.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- cfips: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- first_day_of_month: timestamp (nullable = true)
 |-- microbusiness_density: double (nullable = true)
 |-- active: integer (nullable = true)



In [176]:
#[cfips,county,state,first_dat_of_month,active]

In [177]:
# one hot encoder to process county 
varIdxer = StringIndexer(inputCol='county',outputCol='varcounty').fit(train)
train = varIdxer.transform(train)
encodeer = OneHotEncoder(inputCol="varcounty",outputCol="county_Vec")
train = encodeer.fit(train).transform(train)
train.show(5)

+---------------+-----+--------------+-------+-------------------+---------------------+------+---------+------------------+
|         row_id|cfips|        county|  state| first_day_of_month|microbusiness_density|active|varcounty|        county_Vec|
+---------------+-----+--------------+-------+-------------------+---------------------+------+---------+------------------+
|1001_2019-08-01| 1001|Autauga County|Alabama|2019-08-01 00:00:00|            3.0076818|  1249|    494.0|(1870,[494],[1.0])|
|1001_2019-09-01| 1001|Autauga County|Alabama|2019-09-01 00:00:00|            2.8848701|  1198|    494.0|(1870,[494],[1.0])|
|1001_2019-10-01| 1001|Autauga County|Alabama|2019-10-01 00:00:00|            3.0558431|  1269|    494.0|(1870,[494],[1.0])|
|1001_2019-11-01| 1001|Autauga County|Alabama|2019-11-01 00:00:00|            2.9932332|  1243|    494.0|(1870,[494],[1.0])|
|1001_2019-12-01| 1001|Autauga County|Alabama|2019-12-01 00:00:00|            2.9932332|  1243|    494.0|(1870,[494],[1.0])|


In [178]:
# one hot encoder to process state
varIdxer = StringIndexer(inputCol='state',outputCol='varstate').fit(train)
train = varIdxer.transform(train)
encodeer = OneHotEncoder(inputCol="varstate",outputCol="state_Vec")
train = encodeer.fit(train).transform(train)
train.select(['state_Vec']).show(5)

+---------------+
|      state_Vec|
+---------------+
|(50,[19],[1.0])|
|(50,[19],[1.0])|
|(50,[19],[1.0])|
|(50,[19],[1.0])|
|(50,[19],[1.0])|
+---------------+
only showing top 5 rows



In [179]:
#get year of the first day of month
train = train.withColumn('year',year(train.first_day_of_month))
train.select(['first_day_of_month','year']).show()

+-------------------+----+
| first_day_of_month|year|
+-------------------+----+
|2019-08-01 00:00:00|2019|
|2019-09-01 00:00:00|2019|
|2019-10-01 00:00:00|2019|
|2019-11-01 00:00:00|2019|
|2019-12-01 00:00:00|2019|
|2020-01-01 00:00:00|2020|
|2020-02-01 00:00:00|2020|
|2020-03-01 00:00:00|2020|
|2020-04-01 00:00:00|2020|
|2020-05-01 00:00:00|2020|
|2020-06-01 00:00:00|2020|
|2020-07-01 00:00:00|2020|
|2020-08-01 00:00:00|2020|
|2020-09-01 00:00:00|2020|
|2020-10-01 00:00:00|2020|
|2020-11-01 00:00:00|2020|
|2020-12-01 00:00:00|2020|
|2021-01-01 00:00:00|2021|
|2021-02-01 00:00:00|2021|
|2021-03-01 00:00:00|2021|
+-------------------+----+
only showing top 20 rows



In [180]:
### Get Month from date in pyspark
train = train.withColumn('month',month(train.first_day_of_month))
train.select(['first_day_of_month','month']).show()

+-------------------+-----+
| first_day_of_month|month|
+-------------------+-----+
|2019-08-01 00:00:00|    8|
|2019-09-01 00:00:00|    9|
|2019-10-01 00:00:00|   10|
|2019-11-01 00:00:00|   11|
|2019-12-01 00:00:00|   12|
|2020-01-01 00:00:00|    1|
|2020-02-01 00:00:00|    2|
|2020-03-01 00:00:00|    3|
|2020-04-01 00:00:00|    4|
|2020-05-01 00:00:00|    5|
|2020-06-01 00:00:00|    6|
|2020-07-01 00:00:00|    7|
|2020-08-01 00:00:00|    8|
|2020-09-01 00:00:00|    9|
|2020-10-01 00:00:00|   10|
|2020-11-01 00:00:00|   11|
|2020-12-01 00:00:00|   12|
|2021-01-01 00:00:00|    1|
|2021-02-01 00:00:00|    2|
|2021-03-01 00:00:00|    3|
+-------------------+-----+
only showing top 20 rows



In [181]:
train.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- cfips: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- first_day_of_month: timestamp (nullable = true)
 |-- microbusiness_density: double (nullable = true)
 |-- active: integer (nullable = true)
 |-- varcounty: double (nullable = false)
 |-- county_Vec: vector (nullable = true)
 |-- varstate: double (nullable = false)
 |-- state_Vec: vector (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



In [157]:
# create a input vector
assembler = VectorAssembler(inputCols=['county_Vec','state_Vec','year','month'],outputCol = "Independent_fea" )

In [158]:
output = assembler.transform(train)

In [159]:
# output.show()

+---------------+-----+--------------+-------+-------------------+---------------------+------+---------+------------------+--------+---------------+----+-----+--------------------+
|         row_id|cfips|        county|  state| first_day_of_month|microbusiness_density|active|varcounty|        county_Vec|varstate|      state_Vec|year|month|     Independent_fea|
+---------------+-----+--------------+-------+-------------------+---------------------+------+---------+------------------+--------+---------------+----+-----+--------------------+
|1001_2019-08-01| 1001|Autauga County|Alabama|2019-08-01 00:00:00|            3.0076818|  1249|    494.0|(1870,[494],[1.0])|    19.0|(50,[19],[1.0])|2019|    8|(1924,[0,1,496,18...|
|1001_2019-09-01| 1001|Autauga County|Alabama|2019-09-01 00:00:00|            2.8848701|  1198|    494.0|(1870,[494],[1.0])|    19.0|(50,[19],[1.0])|2019|    9|(1924,[0,1,496,18...|
|1001_2019-10-01| 1001|Autauga County|Alabama|2019-10-01 00:00:00|            3.0558431|  

In [160]:
# create a finalized data with only independent variable and output
finalized_data=output.select("independent_fea","microbusiness_density")
finalized_data.show()

In [161]:
finalized_data.show()

+--------------------+---------------------+
|     independent_fea|microbusiness_density|
+--------------------+---------------------+
|(1924,[0,1,496,18...|            3.0076818|
|(1924,[0,1,496,18...|            2.8848701|
|(1924,[0,1,496,18...|            3.0558431|
|(1924,[0,1,496,18...|            2.9932332|
|(1924,[0,1,496,18...|            2.9932332|
|(1924,[0,1,496,18...|              2.96909|
|(1924,[0,1,496,18...|            2.9093256|
|(1924,[0,1,496,18...|            2.9332314|
|(1924,[0,1,496,18...|            3.0001674|
|(1924,[0,1,496,18...|            3.0049484|
|(1924,[0,1,496,18...|            3.0192919|
|(1924,[0,1,496,18...|            3.0838373|
|(1924,[0,1,496,18...|             3.174679|
|(1924,[0,1,496,18...|            3.2057564|
|(1924,[0,1,496,18...|            3.1938035|
|(1924,[0,1,496,18...|            3.0384164|
|(1924,[0,1,496,18...|             3.002558|
|(1924,[0,1,496,18...|            2.9472437|
|(1924,[0,1,496,18...|            3.1061056|
|(1924,[0,

In [162]:
#train test split
train, test = finalized_data.randomSplit([0.75,0.25])
regressor = LinearRegression(featuresCol = 'independent_fea',labelCol = 'microbusiness_density')
regressor = regressor.fit(train)

In [163]:
regressor.coefficients

DenseVector([-0.0, 0.0001, 3.4334, 5.0012, 3.3372, 3.6396, 4.551, 4.4619, 3.309, 4.0051, 3.2233, 4.0532, 7.2724, 2.6413, 2.9813, 4.139, 3.9726, 5.1299, 3.1025, 2.4275, 4.9989, 4.9282, 4.4366, 3.5678, 3.1158, 3.5648, 2.3345, 2.8406, 4.7676, 3.3917, 3.3284, 3.4418, 4.834, 3.6454, 3.6524, 2.8691, 2.9549, 3.321, 3.6575, 3.6699, 3.6721, 3.1241, 4.0373, 4.1948, 4.5381, 5.4551, 3.6258, 4.0467, 3.3946, 3.2765, 4.0253, 3.4729, 3.7043, 3.7457, 2.5538, 3.1336, 3.2633, 2.8067, 3.2158, 2.5755, 2.6133, 3.9794, 7.1163, 4.6895, 3.9299, 3.5708, 3.5643, 2.6171, 3.8544, 3.3918, 4.2164, 3.2554, 4.0352, 2.6707, 3.6935, 0.8712, 2.3761, 6.081, 3.9907, 2.9628, 3.2511, 3.582, 3.1336, 0.3108, 4.1841, 3.5532, 1.96, 2.7565, 3.5775, 3.678, 14.9413, 3.8595, 4.6632, 3.0387, 3.5863, 5.2378, 3.6105, 4.9344, -0.3411, 4.318, 2.6577, -0.0351, 3.3336, 2.7215, 3.5377, 6.2943, 2.0671, 4.0891, 3.106, 1.9598, 3.9466, 2.0852, 2.8349, 3.1402, 8.7619, 5.6841, 3.8117, 1.145, 3.9272, 2.4374, 3.4749, 1.5047, 4.2155, 4.6818, 0.6085,

In [164]:
regressor.intercept

-180.16630917402625

In [165]:
pred_result = regressor.evaluate(test)

In [166]:
pred_result.predictions.show()

+--------------------+---------------------+------------------+
|     independent_fea|microbusiness_density|        prediction|
+--------------------+---------------------+------------------+
|(1924,[0,1,2,1873...|            1.2780961|2.9575622723869515|
|(1924,[0,1,2,1873...|            1.2906882| 2.950390756807707|
|(1924,[0,1,2,1873...|            1.3324115| 3.003675476155763|
|(1924,[0,1,2,1873...|            1.3391467|3.0931903737900086|
|(1924,[0,1,2,1873...|             1.364061|3.0861430583407525|
|(1924,[0,1,2,1873...|            1.4347472|  3.21257123497611|
|(1924,[0,1,2,1873...|            1.4472778|3.1543297094322043|
|(1924,[0,1,2,1873...|            1.4535431| 3.176278956624884|
|(1924,[0,1,2,1875...|            1.4857391|1.8412214257194535|
|(1924,[0,1,2,1875...|            1.4857391|1.8485171414286867|
|(1924,[0,1,2,1875...|            1.5001631|1.6553306151966183|
|(1924,[0,1,2,1875...|            1.5291183|1.7903998160147694|
|(1924,[0,1,2,1875...|             1.559

In [167]:
# mean absolute error, mean squared error
pred_result.meanAbsoluteError, pred_result.meanSquaredError

(1.0566802704570228, 12.245007133204323)