# Modul 8 Analisis Big Data
# Time Series Analysis dengan PySpark

#Import useful libraries

In [None]:
# Notebook (IPython/Colab) command, not plain Python: installs PySpark into the
# notebook's environment. `%pip install pyspark` is the explicit magic form.
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=6b988507945d55fce97abe06fc352e1f3ef43fe6aab4b90d89e7174dbb868cef
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


menginstal pyspark terlebih dahulu menggunakan pip install pyspark

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

kemudian import library yang diperlukan

#Read dataset from csv

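**NB**: Yahoo Finance responded with HTTP 403 when the dataset was downloaded directly with `wget`, so the CSV file (included in the archive) must be uploaded manually. The code below reads it from the local path `/content/BTC-USD.csv`; on Databricks, upload it to DBFS instead (e.g. `dbfs:/FileStore/BTC_USD/BTC_USD.csv`) and change the path in the code accordingly.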

In [None]:
from pyspark.sql import SparkSession

# Location of the BTC-USD daily price CSV. This is a local path; when running on
# Databricks, upload the file to DBFS and point this constant there instead.
DATA_PATH = "/content/BTC-USD.csv"

# Initialize SparkSession (getOrCreate reuses an already-running session if any)
spark = SparkSession.builder \
    .appName("YourAppName") \
    .getOrCreate()

# Read the CSV from DATA_PATH: first row is the header, column types are inferred
# from the data (Date -> date, prices -> double, Volume -> long in this dataset).
df = spark.read.csv(DATA_PATH, sep=",", header=True, inferSchema=True)

# Show the DataFrame schema
df.printSchema()

# Show the first few rows of the DataFrame
df.show()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2014-09-17|465.864014|468.174011|452.421997|457.334015|457.334015|21056800|
|2014-09-18|456.859985|456.859985|413.104004|424.440002|424.440002|34483200|
|2014-09-19|424.102997|427.834991|384.532013| 394.79599| 394.79599|37919700|
|2014-09-20|394.673004| 423.29599|389.882996|408.903992|408.903992|36863600|
|2014-09-21|408.084991|412.425995|   393.181|398.821014|398.821014|26580100|
|2014-09-22|399.100006|406.915985|397.130005|402.152008|402.152008|24127600|
|2014-09-23| 402.09201|441.557007|396.196991|4

lalu import data dan tampilkan data dalam bentuk dataframe schema

#Train-test split

A separate validation set is not needed because CrossValidator uses part of the train set as the validation set (k-fold).

Proportion of split is ~ 70/30

In [None]:
# Chronological split on a fixed cut-off date: every test day comes after every
# train day (no shuffling across the boundary).
SPLIT_DATE = "2019-09-17"
before_split = col("Date") < SPLIT_DATE

train_set = df.filter(before_split)
test_set = df.filter(~before_split)

# Report how many rows ended up on each side of the cut-off
for split_name, subset in (("train", train_set), ("test", test_set)):
    print(f"Number of rows in {split_name} set:", subset.count())

Number of rows in train set: 1826
Number of rows in test set: 861


lalu bagi dataset menjadi data uji dan data latih

#Feature importance analysis

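Pearson and Spearman correlation matrices are used to study the correlation between each pair of features (Open, Close, High, Low, Volume).

Since the price features are almost perfectly correlated with one another (≥ 0.998) and also clearly correlated with Volume (≈ 0.73 Pearson, ≈ 0.93 Spearman), I keep just one of them (the close price), which lets the models use a window of more days.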

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler


def show_matrix(matrix):
    """
    Print the matrix held in the first cell of a Correlation.corr result as a
    NumPy array, followed by an empty line.
    """
    first_row = matrix.collect()[0]
    matrix_column = matrix.columns[0]
    print(first_row[matrix_column].toArray())
    print()


# Correlation.corr works on a single vector column, so pack the candidate
# features into one.
vector_col = "features"
feature_columns = ["Open", "Close", "High", "Low", "Volume"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

matrix_pearson = Correlation.corr(df_vector, vector_col, "pearson")
matrix_spearman = Correlation.corr(df_vector, vector_col, "spearman")
for corr_matrix in (matrix_pearson, matrix_spearman):
    show_matrix(corr_matrix)

[[1.         0.99883087 0.99953588 0.99908664 0.73391176]
 [0.99883087 1.         0.99948583 0.99940123 0.73284666]
 [0.99953588 0.99948583 1.         0.99904118 0.73738097]
 [0.99908664 0.99940123 0.99904118 1.         0.72644921]
 [0.73391176 0.73284666 0.73738097 0.72644921 1.        ]]

[[1.         0.99909409 0.99955709 0.99941132 0.93222606]
 [0.99909409 1.         0.99951155 0.99957452 0.93249602]
 [0.99955709 0.99951155 1.         0.9991788  0.93351421]
 [0.99941132 0.99957452 0.9991788  1.         0.9314874 ]
 [0.93222606 0.93249602 0.93351421 0.9314874  1.        ]]



lalu lakukan perhitungan korelasi antara kolom-kolom dalam DataFrame PySpark.

#Feature scaling

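The tanh estimator is used to scale the close price (the only feature kept): $x' = \frac{1}{2}\left[\tanh\left(0.01 \cdot \frac{x - \mu}{\sigma}\right) + 1\right]$, where $\mu$ and $\sigma$ are the mean and standard deviation of the close prices.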

In [None]:
import math

In [None]:
import math

from pyspark.sql.functions import avg, col, stddev, udf
from pyspark.sql.types import FloatType


def tanh_estimator(x, mean, std):
    """
    Apply the tanh-estimator formula to a single value.

    Returns ``0.5 * (tanh(0.01 * (x - mean) / std) + 1)``, a value between 0 and 1.

    Parameters
    ----------
    x : float or None
        Value to scale. ``None`` (a SQL NULL) is returned unchanged.
    mean, std : float
        Centre and spread used inside the tanh; ``std`` must be non-zero.
    """
    if x is None:
        return None
    return 0.5 * (math.tanh(0.01 * (x - mean) / std) + 1)


def _close_stats(df):
    """Return the mean and sample standard deviation of ``df``'s ``Close`` column."""
    stats = df.select(avg("Close").alias("mean"), stddev("Close").alias("std")).first()
    return stats["mean"], stats["std"]


def scale_transform(df, mean=None, std=None):
    """
    Return ``df`` with ``Close`` cast to FloatType and an added ``Scaled_Close``
    column holding the tanh-estimator transform of ``Close``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain a ``Close`` column.
    mean, std : float, optional
        Statistics used by the transform. Pass the training-set values when
        scaling the test set so both splits share one transform; any value left
        as ``None`` is computed from ``df`` itself.

    Raises
    ------
    ValueError
        If ``Close`` is missing, or the statistics are undefined (e.g. an empty
        ``df``) or ``std`` is zero/NaN.
    """
    if "Close" not in df.columns:
        raise ValueError("Column 'Close' not found in DataFrame.")

    if mean is None or std is None:
        own_mean, own_std = _close_stats(df)
        mean = own_mean if mean is None else mean
        std = own_std if std is None else std
    if mean is None or std is None or std == 0 or math.isnan(std):
        raise ValueError("Cannot apply the tanh estimator: mean/std undefined or std is zero.")
    mean, std = float(mean), float(std)

    # Convert "Close" column to FloatType if it's not already
    if df.schema["Close"].dataType != FloatType():
        df = df.withColumn("Close", col("Close").cast(FloatType()))

    # mean/std are plain floats captured by the closure and serialized with the
    # UDF. The previous version read undefined globals `mean`/`std`, which the
    # `from pyspark.sql.functions import *` star import had bound to Spark
    # functions, so every row failed.
    scaler = udf(lambda x: tanh_estimator(x, mean, std), FloatType())
    return df.withColumn("Scaled_Close", scaler(col("Close")))


# Fit the statistics on the training set only and reuse them for the test set,
# so both splits go through the same transform and test prices do not shape it.
train_mean, train_std = _close_stats(train_set)
scaled_train_set = scale_transform(train_set, train_mean, train_std)
scaled_test_set = scale_transform(test_set, train_mean, train_std)

# Show the first few rows of scaled DataFrame
scaled_train_set.show()
scaled_test_set.show()

mengaplikasikan transformasi skala pada DataFrame PySpark dengan menggunakan estimasi Tanh sebagai fungsi pemodelan skala

#Sliding window

A 30-day window is slid over the close prices to build the train and test sets, which consist of examples of the form:

x={day(i), ... , day(i+29)}, y={day(i+30)}

In [None]:
from pyspark.sql.window import Window


def slide_window(df, window_size, value_col=None):
    """
    Turn a daily price series into supervised (window -> next day) examples.

    Rows of ``df`` are ordered by ``Date``. For every 1-based position i with
    ``window_size < i <= df.count()`` one example is produced:

    * in ``X``: a row whose ``Close`` column is the array of the prices at
      positions ``i - window_size .. i - 1``, oldest first;
    * in ``y``: a row whose ``value`` column is the price at position i.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain ``Date`` and the price column (see ``value_col``).
    window_size : int
        Number of consecutive past prices per example; must be >= 1.
    value_col : str, optional
        Price column to window. When omitted, ``"Scaled_Close"`` is used if
        ``df`` has it (the column added by ``scale_transform``), otherwise
        ``"Close"``.

    Returns
    -------
    (X, y) : tuple of DataFrames
        ``X`` has one ``Close: array<float>`` column and ``y`` one
        ``value: float`` column. Both come from the same Date-ordered window
        with no partitionBy (Spark gathers such a window into one partition),
        so row k of ``X`` pairs with row k of ``y``, as ``merge_X_y`` expects.

    Raises
    ------
    ValueError
        If ``window_size`` < 1.
    """
    from pyspark.sql.functions import array, col, lag, row_number
    from pyspark.sql.types import FloatType

    if window_size < 1:
        raise ValueError("window_size must be >= 1")
    if value_col is None:
        value_col = "Scaled_Close" if "Scaled_Close" in df.columns else "Close"

    w = Window.orderBy("Date")
    price = col(value_col).cast(FloatType())
    # lag(price, k) is the price k days earlier; listing k from window_size down
    # to 1 yields the window oldest-first. Unlike collect_list after a groupless
    # aggregation, array(...) fixes the element order explicitly, and everything
    # is computed in one plan instead of one Spark job + union per row.
    past_prices = array(*[lag(price, k).over(w) for k in range(window_size, 0, -1)])

    examples = (
        df.select(
            row_number().over(w).alias("Index"),
            past_prices.alias("Close"),
            price.alias("value"),
        )
        # The first window_size rows lack a full history, as in the original loop
        .where(col("Index") > window_size)
    )

    X = examples.select("Close")
    y = examples.select("value")
    return X, y

Kode di atas digunakan untuk membuat sliding window pada DataFrame dengan mengumpulkan harga penutupan dari sejumlah hari tertentu dalam setiap window, lalu menghasilkan DataFrame baru untuk X (isi setiap sliding window) dan y (harga satu hari setelah hari terakhir dalam window tersebut).

In [None]:
window = 30    # days of price history in each example

# Windows are built separately per split, so no example straddles the split date.
X_train, y_train = slide_window(scaled_train_set, window)
X_test, y_test = slide_window(scaled_test_set, window)

for caption, frame in (("X_train shape:", X_train), ("y_train shape:", y_train)):
    print(caption, frame.schema)

X_train shape: StructType([StructField('Close', ArrayType(FloatType(), True), False)])
y_train shape: StructType([StructField('value', FloatType(), True)])


kode tersebut digunakan untuk menerapkan Sliding window pada data pelatihan dan pengujian yang telah disesuaikan skala, dan kemudian menampilkan bentuk atau skema DataFrame yang dihasilkan setelah proses Sliding window

#Merging X and y

X and y (for both train and test) need to be merged as the Pyspark regression models require them in a single dataframe

In [None]:
def merge_X_y(X, y):
    """
    Place two DataFrames side by side: row k of X is concatenated with row k of y.

    The result's schema is X's fields followed by y's fields. Rows are paired
    by position with RDD.zip, so X and y must have the same number of
    partitions and the same number of rows in each partition.
    """
    def _concat(pair):
        left, right = pair
        return left + right

    merged_schema = StructType([*X.schema.fields, *y.schema.fields])
    paired_rows = X.rdd.zip(y.rdd).map(_concat)
    return spark.createDataFrame(paired_rows, merged_schema)


X_y_train = merge_X_y(X_train, y_train)
X_y_test = merge_X_y(X_test, y_test)

menggabungkan dua DataFrame menjadi satu DataFrame dengan metode yang spesifik, di mana kolom-kolom dari kedua DataFrame digabungkan secara berdampingan

#Vectorization of windows

Windows represented as lists of days need to be converted to vectors (rows) of features as Pyspark regression models require dataframes in this form

In [None]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark import StorageLevel


def _prices_to_dense_vector(prices):
    """Wrap a list of prices in a DenseVector."""
    return Vectors.dense(prices)


# Spark UDF turning the array-of-prices column into an ML vector column
list_to_vector_udf = udf(_prices_to_dense_vector, VectorUDT())


def assemble_window(X_y):
    """
    Reshape a merged window DataFrame into the two columns the regressors read:
    the ``Close`` price list becomes a dense ``features`` vector and ``value``
    is renamed to ``label``.
    """
    features = list_to_vector_udf(X_y["Close"]).alias("features")
    label = X_y["value"].alias("label")
    return X_y.select(features, label)


X_y_train_vec = assemble_window(X_y_train)
X_y_test_vec = assemble_window(X_y_test)

digunakan untuk mengonversi daftar harga dalam DataFrame menjadi vektor fitur dan menggabungkannya dengan label yang sesuai

#Hyperparameter tuning/model selection/evaluation

In this section, linear regression and gradient-boosted tree regression models are cross-validated by partitioning the train set into 3 folds, in order to tune their hyperparameters and find the best model.

Then the best models found are tested on unseen data (test set) and the actual and predicted prices are plotted.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator


def cross_validate(model, param_grid, df, num_folds=3, seed=None):
    """
    Grid-search ``model`` over ``param_grid`` with k-fold cross-validation on
    ``df``, scoring every candidate by RMSE, and return the fitted
    CrossValidatorModel (its ``bestModel`` is refit with the best parameters
    and ``transform`` uses it).

    Parameters
    ----------
    model : pyspark.ml Estimator
    param_grid : list of ParamMap, e.g. from ParamGridBuilder().build()
    df : pyspark.sql.DataFrame
        Training data with the columns the estimator reads.
    num_folds : int, default 3
        Number of folds; must be >= 2.
    seed : int, optional
        Seed for the random fold assignment. When omitted, CrossValidator's own
        default seed is left in place.

    Raises
    ------
    ValueError
        If ``num_folds`` < 2.

    NOTE(review): CrossValidator assigns rows to folds at random, so for a time
    series a fold may validate on days that precede its training days
    (look-ahead). A time-ordered split (e.g. CrossValidator's ``foldCol``) would
    avoid that.
    """
    if num_folds < 2:
        raise ValueError("num_folds must be >= 2")

    evaluator = RegressionEvaluator(metricName="rmse")
    cv = CrossValidator(estimator=model, estimatorParamMaps=param_grid,
                        evaluator=evaluator, numFolds=num_folds)
    if seed is not None:
        # Set only on request: passing None into the Params map would clobber the default
        cv.setSeed(seed)
    return cv.fit(df)

digunakan untuk mengoptimalkan model pembelajaran mesin dengan melakukan pencarian grid dan validasi silang untuk menemukan set parameter terbaik, lalu mengembalikan model terbaik yang telah divalidasi.

#Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression

evaluator = RegressionEvaluator(metricName="rmse")

# Prices were already scaled by the tanh estimator, so Spark's own
# standardization is switched off.
lr = LinearRegression(standardization=False)
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.33, 0.66])
    .addGrid(lr.elasticNetParam, [0.33, 0.5, 0.66])
    .build()
)
validated_lr_model = cross_validate(lr, param_grid, X_y_train_vec)

# Hyperparameters chosen by cross-validation
best_lr = validated_lr_model.bestModel
elasticNet = best_lr.getElasticNetParam()
reg = best_lr.getRegParam()

print("ElasicNetParam of best model -> ", elasticNet)
print("RegParam of best model -> = ", reg)

# Score the tuned model on the held-out (later) days by root mean squared error
predictions = validated_lr_model.transform(X_y_test_vec)
RMSE = evaluator.evaluate(predictions)
print("RMSE of best model on unseen data -> ", RMSE)

Kode di atas digunakan untuk mengoptimalkan model regresi linear dengan pencarian grid dan validasi silang, lalu mengevaluasi performanya pada data uji menggunakan metrik RMSE.

In [None]:
def plot_predictions(predictions):
    """
    Draw the actual (``label``, blue) and predicted (``prediction``, red) prices
    of a predictions DataFrame against their row position.
    """
    frame = predictions.select('label', 'prediction').toPandas()
    positions = range(len(frame))

    plt.figure(figsize=(20, 7))
    series = (
        ('label', 'Actual Price', 'blue'),
        ('prediction', 'Predicted Price', 'red'),
    )
    for column, legend_text, line_colour in series:
        plt.plot(positions, frame[column].values, label=legend_text, color=line_colour)
    plt.xticks(np.arange(100, frame.shape[0], 200))
    plt.xlabel('Time')
    plt.ylabel('Price (scaled)')
    plt.legend()
    plt.show()


plot_predictions(predictions)    # linear regression: actual vs predicted

#Gradient-boosted trees

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor

# Defined here so this cell no longer depends on the linear-regression cell
# having been run first (it previously reused that cell's `evaluator`).
evaluator = RegressionEvaluator(metricName="rmse")

gbt_r = GBTRegressor()
param_grid = (
    ParamGridBuilder()
    .addGrid(gbt_r.maxDepth, [4, 8, 12])
    # fraction of features considered at each tree split
    .addGrid(gbt_r.featureSubsetStrategy, ['0.33', '0.66'])
    .build()
)
validated_gbt_model = cross_validate(gbt_r, param_grid, X_y_train_vec)

# Hyperparameters chosen by cross-validation
best_gbt = validated_gbt_model.bestModel
max_depth = best_gbt.getMaxDepth()
# NB: despite its name, `subsample` holds featureSubsetStrategy (features per
# split), not GBT's row-subsampling rate.
subsample = best_gbt.getFeatureSubsetStrategy()

print("maxDepth of best model -> ", max_depth)
print("featureSubsetStrategy of best model -> = ", subsample)

predictions = validated_gbt_model.transform(X_y_test_vec)    # test on unseen data
RMSE = evaluator.evaluate(predictions)    # root mean squared error
print("RMSE of best model on unseen data ->  ", RMSE)

In [None]:
plot_predictions(predictions)    # plot gradient-boosted trees' predictions (actual vs predicted)