In [1]:
import findspark

In [2]:
import os

# Locate the Spark installation. Honour SPARK_HOME when it is set so the notebook
# runs on other machines; otherwise fall back to the path it was originally run with.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
findspark.init(SPARK_HOME)

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

# from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
# Start (or reuse) the Spark session that every later cell reads data through.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

In [5]:
# Explicit schema for the input CSV: an integer '_c0' column followed by the
# positional columns '0'..'6'. All are integers except column '4', which is read
# as text. Every field is nullable.
schema_columns = [
    ('_c0', IntegerType()),
    ('0', IntegerType()),
    ('1', IntegerType()),
    ('2', IntegerType()),
    ('3', IntegerType()),
    ('4', StringType()),
    ('5', IntegerType()),
    ('6', IntegerType()),
]
data_schema = [StructField(col_name, col_type, True)
               for col_name, col_type in schema_columns]
final_struc = StructType(fields=data_schema)

In [6]:
# Positional CSV headers -> descriptive column names.
column_renames = {
    '_c0': 'index',
    '0': 'station',
    '1': 'day_of_week',
    '2': 'time_of_day',
    '3': 'net_flow',
    '4': 'date',
    '5': 'num_of_incid',
    '6': 'incid_in_effect',
}

df = spark.read.csv('clean_input_sample.csv', header=True, schema=final_struc)
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)
df.head(2)

[Row(index=0, station=0, day_of_week=4, time_of_day=0, net_flow=2488, date='1012016', num_of_incid=0, incid_in_effect=0),
 Row(index=1, station=0, day_of_week=0, time_of_day=2, net_flow=-6066, date='1042016', num_of_incid=0, incid_in_effect=0)]

In [7]:
# Model inputs. net_flow is the regression target and is deliberately excluded.
feature_columns = ['station', 'day_of_week', 'time_of_day',
                   'num_of_incid', 'incid_in_effect']

# Pack the input columns into the single vector column that Spark ML estimators consume.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
output = assembler.transform(df)
output.printSchema()

root
 |-- index: integer (nullable = true)
 |-- station: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- time_of_day: integer (nullable = true)
 |-- net_flow: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- num_of_incid: integer (nullable = true)
 |-- incid_in_effect: integer (nullable = true)
 |-- features: vector (nullable = true)



In [8]:
# Keep only what the model needs: the assembled feature vector and the label.
model_columns = ['features', 'net_flow']
final_data = output.select(*model_columns)
final_data.show()

+--------------------+--------+
|            features|net_flow|
+--------------------+--------+
|       (5,[1],[4.0])|    2488|
|       (5,[2],[2.0])|   -6066|
| (5,[1,2],[1.0,2.0])|   -6975|
| (5,[1,2],[2.0,2.0])|   -6740|
|       (5,[1],[3.0])|    5058|
|       (5,[1],[4.0])|    5198|
|       (5,[1],[5.0])|    3362|
| (5,[1,2],[6.0,4.0])|    4616|
|[0.0,0.0,1.0,1.0,...|    -152|
| (5,[1,2],[1.0,1.0])|    1016|
| (5,[1,2],[2.0,1.0])|      82|
| (5,[1,2],[3.0,1.0])|     100|
| (5,[1,2],[5.0,3.0])|     386|
| (5,[1,2],[6.0,3.0])|     650|
|           (5,[],[])|    2258|
|       (5,[1],[1.0])|    4678|
| (5,[1,2],[2.0,4.0])|   11070|
| (5,[1,2],[3.0,4.0])|   11018|
| (5,[1,2],[4.0,2.0])|   -5860|
| (5,[1,2],[5.0,2.0])|   -1720|
+--------------------+--------+
only showing top 20 rows



In [9]:
# Treat features with at most maxCategories distinct values as categorical and
# index them; features with more distinct values stay continuous.
# NOTE(review): this is fitted on the full dataset before the train/test split.
# That matches the Spark docs example and guarantees every category value is known
# at transform time, but it lets test rows shape the indexing. Confirm that is
# acceptable. maxCategories=7 presumably targets day_of_week (7 values); verify.
indexer = VectorIndexer(inputCol="features",
                        outputCol="indexedFeatures",
                        maxCategories=7)
featureIndexer = indexer.fit(final_data)

In [10]:
# 70/30 train/test split. The fixed seed makes the split, and therefore every
# metric below, reproducible across kernel restarts. Without it, PySpark draws
# a fresh random seed on each run.
(train_data, test_data) = final_data.randomSplit([0.7, 0.3], seed=100)

In [11]:
# count/mean/stddev/min/max for the training split. Only net_flow appears,
# because describe() skips the vector-typed 'features' column.
train_stats = train_data.describe()
train_stats.show()

+-------+------------------+
|summary|          net_flow|
+-------+------------------+
|  count|             21144|
|   mean| 662.6977393113885|
| stddev|2684.4736844061204|
|    min|            -21476|
|    max|             27975|
+-------+------------------+



In [12]:
# Same summary for the held-out split, to check it resembles the training split.
test_stats = test_data.describe()
test_stats.show()

+-------+------------------+
|summary|          net_flow|
+-------+------------------+
|  count|              8856|
|   mean| 641.8454155374887|
| stddev|2624.7603781545367|
|    min|            -17266|
|    max|             25397|
+-------+------------------+



In [13]:
# Random forest regressor predicting net_flow from the indexed feature vector.
# The fixed seed makes repeated fits on the same data produce the same forest.
rf = RandomForestRegressor(featuresCol='indexedFeatures',
                           labelCol="net_flow",
                           seed=100)

In [14]:
# Chain the feature indexer with the regressor. featureIndexer is already a
# fitted model (see its .fit call above), so fitting this pipeline only trains rf.
pipeline = Pipeline(stages=[featureIndexer, rf])

In [15]:
# Fit on the training split only. The result is a PipelineModel holding both stages.
rf_model = pipeline.fit(dataset=train_data)

In [16]:
# In-sample fit quality: score the model on the data it was trained on.
# Train predictions get their own name so they cannot be confused with the
# test-set `predictions` produced later.
train_predictions = rf_model.transform(train_data)
evaluator = RegressionEvaluator(
    labelCol="net_flow", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(train_predictions)
print("R2 on train data = %g" % r2)

R2 on train data = 0.24448


In [17]:
# Score the held-out split with the fitted pipeline (indexing, then the forest).
predictions = rf_model.transform(dataset=test_data)

In [18]:
# Show a few test predictions next to the true label and their input features.
preview_columns = ["prediction", "net_flow", "features"]
predictions.select(*preview_columns).show(n=5)

+------------------+--------+-------------+
|        prediction|net_flow|     features|
+------------------+--------+-------------+
|2559.4922198841623|    2164|    (5,[],[])|
|2559.4922198841623|     460|(5,[0],[1.0])|
|2559.4922198841623|     842|(5,[0],[1.0])|
|2559.4922198841623|     900|(5,[0],[2.0])|
|2559.4922198841623|    1730|(5,[0],[2.0])|
+------------------+--------+-------------+
only showing top 5 rows



In [19]:
# Out-of-sample R^2. This reuses the r2 evaluator configured for the train score
# instead of rebuilding an identical one, so both numbers are computed the same way.
test_r2 = evaluator.evaluate(predictions)
print("R2 on test data = %g" % test_r2)

R2 on test data = 0.24384
