In [1]:
import sys
# Fail fast on interpreters older than Python 3.5.
assert sys.version_info >= (3, 5) # make sure we have Python 3.5+

from pyspark.sql import SparkSession, functions, types
# NOTE(review): the app name looks carried over from another exercise; this
# notebook trains a tmax model. Confirm before renaming (shows in the Spark UI).
spark = SparkSession.builder.appName('colour prediction').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# Compare (major, minor) numerically: a plain string comparison would order
# e.g. '2.10' before '2.4' and misjudge multi-digit version components.
assert tuple(int(part) for part in spark.version.split('.')[:2]) >= (2, 4), \
    'Spark 2.4+ required, found %s' % spark.version # make sure we have Spark 2.4+

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, SQLTransformer
from pyspark.ml.regression import *
from pyspark.ml.evaluation import RegressionEvaluator


# Explicit schema for the tmax CSV input. spark.read.csv matches these fields
# to the file's columns by position, so the order below must mirror the files.
# Every field uses StructField's default nullability (nullable=True).
tmax_schema = types.StructType([
    types.StructField(column_name, column_type)
    for column_name, column_type in (
        ('station', types.StringType()),
        ('date', types.DateType()),
        ('latitude', types.FloatType()),
        ('longitude', types.FloatType()),
        ('elevation', types.FloatType()),
        ('tmax', types.FloatType()),
    )
])


def _build_pipeline():
    """Return an unfitted Pipeline that predicts the ``tmax`` column.

    Stages: an SQLTransformer adding a numeric ``dayofyear`` column derived
    from ``date``, a VectorAssembler packing the numeric inputs into
    ``features``, and a GBTRegressor fitted on ``features`` -> ``tmax``.
    """
    # Turn the date into a day-of-year number so it can be used as a model
    # feature; every input column is also passed through unchanged.
    dayofyear_statement = """
        SELECT t.station AS station,
               t.date,
               dayofyear(t.date) AS dayofyear,
               t.latitude AS latitude,
               t.longitude AS longitude,
               t.elevation AS elevation,
               t.tmax AS tmax
        FROM __THIS__ AS t
    """
    transformer = SQLTransformer(statement=dayofyear_statement)

    # Model inputs; the label (tmax) is deliberately not among them.
    assembler = VectorAssembler(
        inputCols=['dayofyear', 'latitude', 'longitude', 'elevation'],
        outputCol='features')

    regressor = GBTRegressor(featuresCol='features', labelCol='tmax',
                             maxIter=20, maxDepth=10)

    return Pipeline(stages=[transformer, assembler, regressor])


def main(inputs, output, seed=None):
    """Train, evaluate and save a GBT model that predicts ``tmax``.

    Reads *inputs* as CSV with ``tmax_schema`` and holds out 25% of the rows
    for validation. It then fits the pipeline from ``_build_pipeline`` on the
    rest and prints a sample of validation predictions followed by their r2
    and RMSE. Finally it writes the fitted PipelineModel to *output*,
    overwriting anything already there.

    :param inputs: path passed to ``spark.read.csv``.
    :param output: path the fitted PipelineModel is saved to.
    :param seed: optional seed for the train/validation split. Pass an int to
        make the split (and therefore the reported scores) reproducible;
        ``None`` keeps the previous behaviour of a fresh random split per run.
    """
    data = spark.read.csv(inputs, schema=tmax_schema)

    # 75% / 25% train/validation split.
    train, validation = data.randomSplit([0.75, 0.25], seed=seed)
    train = train.cache()
    validation = validation.cache()

    model = _build_pipeline().fit(train)

    # The predictions feed show() plus two metric evaluations; cache them so
    # later actions reuse the result instead of re-running the pipeline.
    predictions = model.transform(validation).cache()
    predictions.show()

    # One evaluator; the metric is overridden per call via a param map.
    evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='tmax')
    r2 = evaluator.evaluate(predictions, {evaluator.metricName: 'r2'})
    rmse = evaluator.evaluate(predictions, {evaluator.metricName: 'rmse'})

    model.write().overwrite().save(output)

    # Release cached data so repeated runs in one session don't pile it up.
    for cached in (predictions, validation, train):
        cached.unpersist()

    # %-formatting rather than f-strings: the file only requires Python 3.5.
    print("r2: %f" % r2)
    print("rmse: %f" % rmse)


if __name__ == '__main__':
    # Hard-coded input data path and output model path for this run; both are
    # kept as module-level names.
    inputs, output = 'tmax-3', 'weather-model'
    main(inputs, output)

+-----------+----------+---------+--------+---------+---------+----+--------------------+------------------+
|    station|      date|dayofyear|latitude|longitude|elevation|tmax|            features|        prediction|
+-----------+----------+---------+--------+---------+---------+----+--------------------+------------------+
|AG000060390|1973-09-06|      249| 36.7167|     3.25|     24.0|29.2|[249.0,36.7167015...|29.827628461733227|
|AG000060390|1973-09-30|      273| 36.7167|     3.25|     24.0|29.3|[273.0,36.7167015...| 26.26146630500702|
|AG000060390|1988-05-06|      127| 36.7167|     3.25|     24.0|23.1|[127.0,36.7167015...|22.959274273946637|
|AG000060611|1965-01-02|        2|   28.05|   9.6331|    561.0|12.6|[2.0,28.049999237...|19.387651933328403|
|AG000060680|1965-04-29|      119|    22.8|   5.4331|   1362.0|28.8|[119.0,22.7999992...| 26.05643550621891|
|AGE00147716|1890-10-26|      299|    35.1|    -1.85|     83.0|21.0|[299.0,35.0999984...|22.322371338922267|
|AGE00147716|1890-1