In [1]:
from datetime import datetime

from pyspark import SparkContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import unix_timestamp, monotonically_increasing_id, col
from pyspark.sql.functions import percent_rank

## Create (or reuse) the Spark session. getOrCreate() keeps this cell safe to
## re-run in the same kernel, whereas constructing a second SparkContext() fails.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
## Load the train/test dataset from a CSV file that has a header row.
def data_import(file=""):
    """Read a CSV file into a Spark DataFrame and print a short preview.

    Args:
        file: Path of the CSV file. The first row is used as the header and
            Spark infers the column types (``inferSchema=True``).

    Returns:
        The loaded DataFrame, or None when no path is given ("" or None).
    """
    # `not file` covers both the "" default and an explicit None; the latter
    # would otherwise be handed straight to Spark's reader.
    if not file:
        return None

    df = spark.read.format("csv").load(file, inferSchema=True, header=True)

    print("\n\n\nImported Train/Test Dataset Preview:")
    df.show(5)

    return df


def data_preprocessing(df, label, exclude_cols=None):
    """Pick the feature columns and normalise the 'Date' column.

    Every column except 'Date', ``label`` and any ``exclude_cols`` is treated
    as a feature. 'Date' is not a feature, but it is kept (cast to timestamp)
    so the series can be ordered and plotted later.

    Args:
        df: Spark DataFrame containing a 'Date' column and the label column.
        label: Name of the label column (e.g. 'Close').
        exclude_cols: Optional column name, or iterable of names, to leave out
            of the features as well. NOTE(review): in the daily price data used
            in this notebook, 'Adj Close' is presumably the adjusted closing
            price, so keeping it as a feature likely leaks the label. Consider
            passing ``exclude_cols=['Adj Close']``.

    Returns:
        ``(df_sorted, feature_cols)``:
        - ``df_sorted``: the DataFrame with 'Date' cast to timestamp, sorted
          ascending by 'Date'.
        - ``feature_cols``: the feature column names, in their original order.
    """
    if exclude_cols is None:
        exclude_cols = []
    elif isinstance(exclude_cols, str):
        exclude_cols = [exclude_cols]
    else:
        exclude_cols = list(exclude_cols)

    # drop() is a no-op for names that are not in the schema, so unknown
    # entries in exclude_cols are harmless.
    feature_cols = df.drop('Date', label, *exclude_cols).columns

    # 'Date' is already in a castable text format, so a direct cast to
    # timestamp suffices (no explicit format conversion needed).
    df = df.withColumn('Date', df['Date'].cast('timestamp'))

    return df.sort('Date'), feature_cols


## Assemble the feature vector used by the Linear Regressor later.
## This adds a new vector column (named "features" by default) that holds, per
## row, the values of every nominated feature column.
def feature_vector(feature_columns, df=None, output_col="features"):
    """Append a vector column assembled from ``feature_columns``.

    Args:
        feature_columns: Names of the columns to assemble into one vector.
        df: DataFrame to transform. When omitted, this falls back to the
            notebook-global ``data`` (the original behaviour). Pass it
            explicitly to avoid depending on hidden kernel state.
        output_col: Name of the new vector column. The default "features" is
            the column name ``lregresor`` reads.

    Returns:
        A new DataFrame: ``df`` plus the ``output_col`` vector column.
    """
    if df is None:
        # Backward compatibility: the original version always read the global `data`.
        df = data
    assembler = VectorAssembler(inputCols=feature_columns, outputCol=output_col)
    return assembler.transform(df)


def lregresor(train, test, label, features_col="features"):
    """Fit a linear regression on ``train``, then evaluate and predict on ``test``.

    Args:
        train: Training DataFrame holding the ``features_col`` vector column
            and the ``label`` column.
        test: Held-out DataFrame with the same columns. It must contain the
            label as well, because ``model.evaluate`` scores the predictions
            against it.
        label: Name of the label column (e.g. 'Close').
        features_col: Name of the assembled feature-vector column.

    Returns:
        ``(predictions, evaluation)``:
        - ``predictions``: ``test`` with the model's "prediction" column added.
        - ``evaluation``: the summary returned by ``model.evaluate(test)``.
    """
    lr = LinearRegression(featuresCol=features_col, labelCol=label)

    # Fit on the labeled training split.
    model = lr.fit(train)

    # Score the test split; this compares against the true labels in `test`.
    evaluation = model.evaluate(test)
    print("Absolute mean error: ", evaluation.meanAbsoluteError)
    print("Mean squared error, root: ", evaluation.rootMeanSquaredError)
    print("r2: ", evaluation.r2)

    # Predictions for the same test rows.
    predict = model.transform(test)
    return predict, evaluation


## Parse a date string into a datetime object.
def get_timeline(str_date, fmt="%Y-%m-%d"):
    """Return ``str_date`` parsed as a naive ``datetime`` using ``fmt``.

    Args:
        str_date: Date text, e.g. "2015-07-09".
        fmt: ``strptime`` format string; defaults to year-month-day as before.

    Raises:
        ValueError: if ``str_date`` does not match ``fmt``.
    """
    return datetime.strptime(str_date, fmt)


## Export selected columns (e.g. predictions next to the true values) to CSV so
## they can be plotted outside Spark. A time series is first sorted by its
## timeline column so the exported rows follow that order.
def fields_to_csv(df, fields=None, timeline=False, timeline_col=None, file="output",
                  header=False, mode="errorifexists"):
    """Write ``df.select(fields)`` as CSV to ``<file>.csv`` and print a preview.

    Args:
        df: Spark DataFrame to export.
        fields: Column name(s) to select. Nothing is written when None or empty.
        timeline: If true, sort ascending by ``timeline_col`` before writing.
        timeline_col: Column to sort by; required when ``timeline`` is true.
        file: Output path prefix; ".csv" is appended. Spark writes a directory
            of part files at that path rather than a single file.
        header: Whether to write a header row (default False, as before).
        mode: Spark save mode. Use "overwrite" to allow re-running the cell;
            the default "errorifexists" fails if the path already exists.

    Returns:
        None in all cases. Invalid arguments skip the export without building
        a query.
    """
    if not fields:
        return None
    if timeline and timeline_col is None:
        return None

    selected = df.select(fields)
    if timeline:
        selected = selected.sort(timeline_col, ascending=True)

    # Cache so the write and the preview share one computation, and always
    # release the cache afterwards (the original never unpersisted it).
    selected = selected.persist()
    try:
        selected.write.save(file + ".csv", format="csv", mode=mode, header=header)

        print("\n\n\nSaved " + file + " Dataset Preview:")
        selected.show(n=20, truncate=False)
    finally:
        selected.unpersist()

In [3]:
## Load the raw daily prices, then prepare them for modelling.
## Later cells rely on `data` (prepared frame) and `feature_columns`.
label = 'Close'
raw_data = data_import(file="appl_daily.csv")
data, feature_columns = data_preprocessing(df=raw_data, label=label)




Imported Train/Test Dataset Preview:
+----------+----------+----------+----------+----------+--------+----------+
|      Date|      Open|      High|       Low|     Close|  Volume| Adj Close|
+----------+----------+----------+----------+----------+--------+----------+
|2015-07-09|123.849998|124.059998|119.220001|    120.07|77821600|    120.07|
|2015-07-08|124.480003|124.639999|122.540001|    122.57|60490200|    122.57|
|2015-07-07|125.889999|126.150002|123.769997|125.690002|46716100|125.690002|
|2015-07-06|124.940002|126.230003|124.849998|     126.0|27900200|     126.0|
|2015-07-02|    126.43|126.690002|125.769997|126.440002|27122500|126.440002|
+----------+----------+----------+----------+----------+--------+----------+
only showing top 5 rows



In [5]:
## Assemble the feature vectors from the prepared `data`.
assembled_data = feature_vector(feature_columns=feature_columns)

## Chronological train/test split.
## percent_rank() over Date gives each row its relative position in time
## (0.0 = earliest row, 1.0 = latest). The earliest TRAIN_FRACTION of the series
## is used for training and the rest for testing, so no future prices end up in
## the training set (a random split would allow that).
## NOTE: a window without partitionBy moves every row into a single partition.
## That is fine for a daily series of this size, but it does not scale.
TRAIN_FRACTION = 0.8

assembled_data = assembled_data.withColumn(
    "rank", percent_rank().over(Window.partitionBy().orderBy("Date"))
)
print("Assembled Data:")
assembled_data.show(5)

train_df = assembled_data.where(col("rank") <= TRAIN_FRACTION).drop("rank")
print("Train Split:")
train_df.show(5)

test_df = assembled_data.where(col("rank") > TRAIN_FRACTION).drop("rank")
print("Test Split:")
test_df.show(5)

predictions, _ = lregresor(train=train_df, test=test_df, label=label)

print("Predicted values: ")
predictions.show(5)

## 'Date' is still a timestamp here: data_preprocessing casts it, and the
## assembler, the rank/filter steps and the regressor only add or drop other
## columns. The earlier re-cast assumed the regressor turned it into bigint,
## which it does not, so the dtype is verified instead of re-cast.
print(predictions.dtypes)
assert dict(predictions.dtypes)['Date'].startswith('timestamp'), predictions.dtypes

Assembled Data:
+-------------------+---------+---------+---------+---------+---------+---------+--------------------+--------------------+
|               Date|     Open|     High|      Low|    Close|   Volume|Adj Close|            features|                rank|
+-------------------+---------+---------+---------+---------+---------+---------+--------------------+--------------------+
|1980-12-12 00:00:00| 28.74984| 28.87472| 28.74984| 28.74984|117258400| 0.440188|[28.74984,28.8747...|                 0.0|
|1980-12-15 00:00:00|27.375041|27.375041| 27.25016| 27.25016| 43971200| 0.417226|[27.375041,27.375...|1.147183664104623...|
|1980-12-16 00:00:00| 25.37472| 25.37472| 25.24984| 25.24984| 26432000|   0.3866|[25.37472,25.3747...|2.294367328209246...|
|1980-12-17 00:00:00|25.874799| 26.00024|25.874799|25.874799| 21610400| 0.396168|[25.874799,26.000...|3.441550992313869...|
|1980-12-18 00:00:00|26.625201| 26.75008|26.625201|26.625201| 18362400| 0.407658|[26.625201,26.750...|4.588734656418

+-------------------+----------+----------+----------+----------+---------+---------+--------------------+------------------+
|               Date|      Open|      High|       Low|     Close|   Volume|Adj Close|            features|        prediction|
+-------------------+----------+----------+----------+----------+---------+---------+--------------------+------------------+
|2008-08-05 00:00:00|155.420019|160.800009|154.819979|160.639992|172092900|21.556347|[155.420019,160.8...| 159.6243652067771|
|2008-08-06 00:00:00|159.970016|167.400026|158.000011|164.189966|197852200| 22.03272|[159.970016,167.4...|164.89001969102202|
|2008-08-07 00:00:00|162.710026|166.149971|161.500011|163.569979|168093100|21.949523|[162.710026,166.1...|164.78768248091257|
|2008-08-08 00:00:00|163.859985|169.649971|163.750023|169.550009|178499300|22.751986|[163.859985,169.6...|168.78474209846112|
|2008-08-11 00:00:00|170.069967|176.500034|169.669985|173.560034|222826100|23.290093|[170.069967,176.5...|175.31209902