In [1]:
# Create (or reuse) the Spark session that the data-loading cell references as
# `spark_session`. getOrCreate() hands back the already-active session when one
# exists, so running this cell more than once is safe.
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName('abc').getOrCreate()

In [2]:
# Stage 1 - type transformer: casts every selected column to int/double and
# exposes `cnt` under the name `label`.
from pyspark.ml.feature import SQLTransformer

cast_statement = """
        SELECT
            cast(season as int),
            cast(yr as int),
            cast(mnth as int),
            cast(holiday as int),
            cast(weekday as int),
            cast(workingday as int),
            cast(weathersit as int),
            cast(temp as double),
            cast(atemp as double),
            cast(hum as double),
            cast(windspeed as double),
            cast(cnt as int) as label
        FROM __THIS__
    """

sql_transformer01 = SQLTransformer(statement=cast_statement)

In [4]:
# Stage 2 - assembly transformer: gathers the predictor columns into a single
# vector column named "features".
from pyspark.ml.feature import VectorAssembler

feature_columns = [
    "season", "yr", "mnth", "holiday", "weekday", "workingday", "weathersit",
    "temp", "atemp", "hum", "windspeed",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [3]:
# Stage 3 - selection transformer: keeps only the `features` and `label`
# columns and drops everything else.
select_statement = """
        SELECT
            features,
            label
        FROM __THIS__
    """

sql_transformer02 = SQLTransformer(statement=select_statement)

In [5]:
# Final stage: a linear regression estimator created with its default parameters.
# NOTE(review): presumably relies on the defaults reading the `features` and
# `label` columns produced by the earlier stages - confirm against the pyspark docs.
from pyspark.ml.regression import LinearRegression
lr = LinearRegression()

In [6]:
# Assemble the pipeline: the stages run in the order they are listed here
# (type casts, feature vector assembly, column selection, linear regression).
from pyspark.ml import Pipeline

pipeline_stages = [sql_transformer01, assembler, sql_transformer02, lr]
pipeline = Pipeline(stages=pipeline_stages)

In [8]:
# Load the training data; the first line of the CSV is treated as the header.
train_data = spark_session.read.option("header", True).csv("day.csv")

In [9]:
# Fit the whole pipeline on the training DataFrame; the result is kept as
# `pipeline_model` and reused by the cells below for prediction and saving.
pipeline_model = pipeline.fit(train_data)

In [10]:
# The fitted pipeline model is applied as a transformer to the same DataFrame
# it was fitted on, so these are predictions for the training rows.
train_evaluate = pipeline_model.transform(train_data)

# Preview only the first five rows, converted to a pandas DataFrame.
train_evaluate.limit(5).toPandas()

Unnamed: 0,features,label,prediction
0,"[1.0, 0.0, 1.0, 0.0, 6.0, 0.0, 2.0, 0.344167, ...",985,1898.431797
1,"[1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 2.0, 0.363478, ...",801,1374.425553
2,"[1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.196364, ...",1349,1512.844903
3,"[1.0, 0.0, 1.0, 0.0, 2.0, 1.0, 1.0, 0.2, 0.212...",1562,1739.506787
4,"[1.0, 0.0, 1.0, 0.0, 3.0, 1.0, 1.0, 0.226957, ...",1600,2012.868269


In [15]:
# Evaluate the predictions: one evaluator instance is reused, with the metric
# name overridden per call through the param map.

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()

r2, rmse = (
    evaluator.evaluate(train_evaluate, {evaluator.metricName: metric_name})
    for metric_name in ("r2", "rmse")
)

print("r2 {:.3f} rmse: {:.3f}".format(r2, rmse))

r2 0.800 rmse: 865.226


In [16]:
# Save the fitted pipeline model.
# A plain `save()` raises when the target path already exists, which breaks a
# re-run of the notebook; the writer's overwrite mode replaces the earlier copy.
pipeline_model.write().overwrite().save("pipeline_model")