# 1. Import libraries and create Spark Session #

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build the SparkSession with explicit resource settings, or reuse one that is
# already running (getOrCreate returns the existing session in that case).
spark = (
    SparkSession.builder
    .config("spark.cores.max", "16")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

24/01/23 23:44:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# 2. Read csv file into Dataframe and Convert to RDD #

In [3]:
# Load the VCB price history: the first CSV line is the header and column
# types are inferred from the data.
df_vcb = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("./VCB.csv")
)

# Inspect the inferred column types
df_vcb.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [4]:
# Preview the first 10 rows of the DataFrame
df_vcb.show(n=10)

+----------+------------+------------+------------+------------+------------+-------+
|      Date|        Open|        High|         Low|       Close|   Adj Close| Volume|
+----------+------------+------------+------------+------------+------------+-------+
|2022-01-24|75698.562500|79170.195313|74682.468750|78746.828125|78746.828125|3221886|
|2022-01-25|78323.453125|81287.046875|75359.867188|81117.695313|81117.695313|2830030|
|2022-01-26|81117.695313|81117.695313|78408.125000|80016.937500|80016.937500|3108982|
|2022-01-27|78746.828125|80016.937500|76545.296875|77053.343750|77053.343750|2046554|
|2022-01-28|77053.343750|77900.085938|75359.867188|75359.867188|75359.867188|2528166|
|2022-01-31|        null|        null|        null|        null|        null|   null|
|2022-02-01|        null|        null|        null|        null|        null|   null|
|2022-02-02|        null|        null|        null|        null|        null|   null|
|2022-02-03|        null|        null|        null|   

In [5]:
# View the DataFrame as an RDD of Row objects, then peek at the first 10 rows
rdd_vcb = df_vcb.rdd
rdd_vcb.take(num=10)

[Stage 3:>                                                          (0 + 1) / 1]                                                                                

[Row(Date=datetime.date(2022, 1, 24), Open='75698.562500', High='79170.195313', Low='74682.468750', Close='78746.828125', Adj Close='78746.828125', Volume='3221886'),
 Row(Date=datetime.date(2022, 1, 25), Open='78323.453125', High='81287.046875', Low='75359.867188', Close='81117.695313', Adj Close='81117.695313', Volume='2830030'),
 Row(Date=datetime.date(2022, 1, 26), Open='81117.695313', High='81117.695313', Low='78408.125000', Close='80016.937500', Adj Close='80016.937500', Volume='3108982'),
 Row(Date=datetime.date(2022, 1, 27), Open='78746.828125', High='80016.937500', Low='76545.296875', Close='77053.343750', Adj Close='77053.343750', Volume='2046554'),
 Row(Date=datetime.date(2022, 1, 28), Open='77053.343750', High='77900.085938', Low='75359.867188', Close='75359.867188', Adj Close='75359.867188', Volume='2528166'),
 Row(Date=datetime.date(2022, 1, 31), Open='null', High='null', Low='null', Close='null', Adj Close='null', Volume='null'),
 Row(Date=datetime.date(2022, 2, 1), Open

# 3. Pre-Processing data #

In [6]:
# Row count of the raw RDD, before any cleaning is applied
print(
    "Number of rows before Pre-Processing: ",
    rdd_vcb.count(),
)

Number of rows before Pre-Processing:  521


In [7]:
# Keep only rows in which EVERY required field holds a real value.
# Missing values appear in this data as the literal string 'null'; a real None
# is rejected as well. The checks must be combined with "and" (all fields
# present): with "or", a row with only some fields missing would be kept and
# would later fail when its 'null' text is passed to float().
rdd_vcb = rdd_vcb.filter(
    lambda row: all(
        row[column] is not None and row[column] != 'null'
        for column in ('Open', 'Close', 'High', 'Volume')
    )
)
rdd_vcb.take(10)

[Row(Date=datetime.date(2022, 1, 24), Open='75698.562500', High='79170.195313', Low='74682.468750', Close='78746.828125', Adj Close='78746.828125', Volume='3221886'),
 Row(Date=datetime.date(2022, 1, 25), Open='78323.453125', High='81287.046875', Low='75359.867188', Close='81117.695313', Adj Close='81117.695313', Volume='2830030'),
 Row(Date=datetime.date(2022, 1, 26), Open='81117.695313', High='81117.695313', Low='78408.125000', Close='80016.937500', Adj Close='80016.937500', Volume='3108982'),
 Row(Date=datetime.date(2022, 1, 27), Open='78746.828125', High='80016.937500', Low='76545.296875', Close='77053.343750', Adj Close='77053.343750', Volume='2046554'),
 Row(Date=datetime.date(2022, 1, 28), Open='77053.343750', High='77900.085938', Low='75359.867188', Close='75359.867188', Adj Close='75359.867188', Volume='2528166'),
 Row(Date=datetime.date(2022, 2, 7), Open='76714.648438', High='78662.148438', Low='75783.234375', Close='78238.781250', Adj Close='78238.781250', Volume='2037933'),

In [8]:
# Row count after pre-processing (rows with missing values removed)
print(
    "Number of rows After Pre-Processing: ",
    rdd_vcb.count(),
)

Number of rows After Pre-Processing:  492


# 4. Create Simple Linear Regression #

## Formula: predict Close based on Open ##

Close = alpha + beta * Open

(Yt = alpha + beta * Xt)

## Step 1: Calculate mean for both Open and Close ##

In [9]:
# Function for calculate mean
def calculate_mean_rdd(rdd_base, row_name):
    """Return the arithmetic mean of one field across all rows of an RDD.

    Parameters
    ----------
    rdd_base : RDD
        RDD whose elements can be indexed by ``row_name`` (e.g. Spark ``Row``).
    row_name : str
        Name of the field to average; each value is converted with ``float``.

    Returns
    -------
    float
        Sum of the converted values divided by the number of rows.

    Raises
    ------
    ValueError
        If the RDD contains no rows, or a value cannot be converted to float.
    """
    # Create specific rdd (Open_rdd, Close_rdd)
    values = rdd_base.map(lambda row: float(row[row_name]))

    # Accumulate (running sum, row count) in a single pass so the source data
    # is scanned and parsed once instead of once for count() and again for
    # reduce().
    total, n_values = values.aggregate(
        (0.0, 0),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )

    if n_values == 0:
        raise ValueError(f"Cannot compute the mean of '{row_name}': the RDD is empty")

    return total / n_values

In [10]:
# Calculate average for Close
close_mean = calculate_mean_rdd(rdd_vcb, 'Close')
close_mean

75103.89530947953

In [11]:
# Calculate average for Open
open_mean = calculate_mean_rdd(rdd_vcb, 'Open')
open_mean

75070.28237755678

## Step 2: Calculate real_value - mean ##

In [12]:
# Function for calculate real_value - mean_value for each columns
def calculate_real_minus_mean(rdd_base, row_name, mean):
    """Return an RDD of deviations from ``mean`` for one field.

    Each row's ``row_name`` value is converted to float and ``mean`` is
    subtracted from it. The result is a new (lazily evaluated) RDD with one
    deviation per input row.
    """
    def _deviation(row):
        return float(row[row_name]) - mean

    return rdd_base.map(_deviation)

In [13]:
close_real_mean_rdd = calculate_real_minus_mean(rdd_vcb, 'Close', close_mean)
close_real_mean_rdd.take(10)

[3642.932815520471,
 6013.800003520475,
 4913.042190520471,
 1949.4484405204712,
 255.971878520475,
 3134.885940520471,
 2796.190628520475,
 1780.1046905204712,
 2203.471878520475,
 1102.706253520475]

In [14]:
open_real_mean_rdd = calculate_real_minus_mean(rdd_vcb, 'Open', open_mean)
open_real_mean_rdd.take(10)

[628.2801224432187,
 3253.1707474432187,
 6047.4129354432225,
 3676.5457474432187,
 1983.0613724432187,
 1644.3660604432225,
 3168.4988724432187,
 2152.4129354432225,
 2067.7332474432187,
 2237.0848104432225]

## Step 3: Calculate Alpha and Beta ##

In [24]:
# Function for calculate beta
def calculate_beta(ytb_rdd, xtb_rdd):
    """Return the regression slope from two RDDs of mean-centred values.

    Computes sum(y_i * x_i) / sum(x_i * x_i), where ``ytb_rdd`` supplies the
    y deviations and ``xtb_rdd`` the x deviations. Elements are paired by
    position via ``zip``.
    NOTE(review): zip presumably needs both RDDs to share the same
    partitioning (e.g. both mapped from one parent RDD) - confirm.

    Raises ZeroDivisionError when the sum of squared x deviations is zero.
    """
    # Coerce every element to float (no values are dropped here)
    y_dev = ytb_rdd.map(float)
    x_dev = xtb_rdd.map(float)

    # Denominator: sum of squared x deviations
    sum_sq_x = x_dev.map(lambda value: value * value).sum()

    # Numerator: sum of the pairwise products y_i * x_i
    paired = y_dev.zip(x_dev)
    sum_xy = paired.map(lambda pair: pair[0] * pair[1]).sum()

    # Slope = numerator / denominator
    return sum_xy / sum_sq_x

In [25]:
beta = calculate_beta(close_real_mean_rdd, open_real_mean_rdd)
beta

1.0004552459956448

In [26]:
# Function for calculate alpha
def calculate_alpha(ytb, xtb, beta):
    """Return the regression intercept: ``ytb - xtb * beta``.

    ``ytb`` and ``xtb`` are the means of the y and x variables and ``beta``
    is the fitted slope.
    """
    slope_term = xtb * beta
    return ytb - slope_term

In [27]:
alpha = calculate_alpha(close_mean, open_mean, beta)
alpha

-0.5625135215668706

# 5. Predict new value #

**Close = -0.5625135215668706 + 1.0004552459956448 * Open**

In [28]:
# New open
new_open_values = [3000, 3500, 4000, 4500, 5000]

# Predict_Close
def predict_close(open_value, alpha=-0.5625135215668706, beta=1.0004552459956448):
    """Predict a Close value from an Open value with the fitted line.

    Parameters
    ----------
    open_value : float
        The Open value to predict from.
    alpha : float, optional
        Intercept of the line. Defaults to the value fitted in this notebook.
    beta : float, optional
        Slope of the line. Defaults to the value fitted in this notebook.

    Returns
    -------
    float
        ``alpha + beta * open_value``.

    The defaults reproduce the coefficients that were previously hard-coded in
    the body; pass the freshly computed intercept and slope explicitly after
    re-fitting on different data so the prediction stays in sync.
    """
    return alpha + beta * open_value

# Predict a Close value for every candidate Open value
predicted_closes = list(map(predict_close, new_open_values))

# Result: print each Open value next to its predicted Close
for open_value, predicted_close in zip(new_open_values, predicted_closes):
    print(f'Predict value for Close base on Open {open_value} là: {predicted_close}')

Predict value for Close base on Open 3000 là: 3000.8032244653677
Predict value for Close base on Open 3500 là: 3501.03084746319
Predict value for Close base on Open 4000 là: 4001.2584704610126
Predict value for Close base on Open 4500 là: 4501.486093458835
Predict value for Close base on Open 5000 là: 5001.7137164566575
