In [0]:
%spark.pyspark

from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import random

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["date", "price"]

# Seed the generator so that re-running the notebook from a fresh interpreter
# produces the same sample prices (and therefore the same tables and plots).
SEED = 42
random.seed(SEED)

# 100 consecutive daily rows starting on 2023-01-01. Each row is a
# ("YYYY-MM-DD" string, price) tuple; the price is drawn uniformly from
# [10, 100] and rounded to 2 decimals.
start_date = datetime(2023, 1, 1)
data = [
    (
        (start_date + timedelta(days=day_offset)).strftime("%Y-%m-%d"),
        round(random.uniform(10, 100), 2),
    )
    for day_offset in range(100)
]

# Column names are supplied explicitly; the column types are inferred from the data.
df = spark.createDataFrame(data, schema=columns)

df.show()

# Cumulative Moving Average
## $$ \text{CMA}(k) = \frac{x_1 + x_2 + \ldots + x_k}{k} $$


In [2]:
%spark.pyspark
%matplotlib inline
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd

def cumulative_moving_average(df):
    """Add a cumulative moving average of ``price`` as the column ``cma``.

    Rows are ordered by ``date``; for each row the average is taken over every
    row from the first one up to and including the current one, then rounded
    to 2 decimals.

    Args:
        df: Spark DataFrame with at least the columns ``date`` and ``price``.

    Returns:
        A DataFrame restricted to the columns ``date``, ``price`` and ``cma``.
    """
    # Frame grows from the very first row to the current row.
    running_frame = (
        Window.orderBy("date")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    running_mean = F.round(F.avg("price").over(running_frame), 2)
    return df.withColumn('cma', running_mean).select("date", "price", "cma")

df_cma = cumulative_moving_average(df)
df_cma.show(truncate=False)

# Convert the Spark result to pandas for plotting and parse the date strings
# into real timestamps for the x-axis.
pandas_df = df_cma.toPandas()
pandas_df['date'] = pd.to_datetime(pandas_df['date'])

# Draw the price and its cumulative moving average on the same figure.
plt.figure(figsize=(22, 8))
for series_name, series_label in (('price', 'Price'), ('cma', 'CMA')):
    plt.plot(pandas_df['date'], pandas_df[series_name], label=series_label)
plt.title('Price and Cumulative Moving Average (CMA)')
plt.xlabel('Date')
plt.ylabel('Value')
plt.xticks(rotation=45)
plt.legend()

plt.show()



# Simple Moving Average
## $$ \text{SMA}(k) = \frac{x_{k-T+1} + x_{k-T+2} + \ldots + x_k}{T} $$
### Với T là chu kì tính (tổng số phiên tính: các phiên trước + phiên hiện tại)





In [5]:
%spark.pyspark
%matplotlib inline
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd

def simple_moving_average(df, period=3):
    """Add a simple moving average of ``price`` as the column ``sma``.

    Rows are ordered by ``date``; each row's average covers the current row
    and the ``period - 1`` rows before it, rounded to 2 decimals. Near the
    start of the data, where fewer than ``period`` rows exist, the average is
    taken over the rows that are available.

    Args:
        df: Spark DataFrame with at least the columns ``date`` and ``price``.
        period: Number of rows in the averaging window (current row included).

    Returns:
        A DataFrame restricted to the columns ``date``, ``price`` and ``sma``.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    # A non-positive period would put the frame start after its end; reject it
    # here with a clear message instead of failing inside Spark.
    if period < 1:
        raise ValueError("period must be a positive integer, got {!r}".format(period))

    trailing_frame = Window.orderBy("date").rowsBetween(1 - period, Window.currentRow)
    df = df.withColumn("sma", F.round(F.avg("price").over(trailing_frame), 2))
    df_sma = df.select("date", "price", "sma")
    return df_sma

# Window length comes from the notebook input form (default 3).
period_sma = int(z.input("Enter SMA period: ", 3))
df_sma = simple_moving_average(df, period_sma)
df_sma.show(truncate=False)
# Optional: df_sma.createOrReplaceTempView('sma') to switch to SQL.

# Convert the Spark result to pandas for plotting and parse the date strings
# into real timestamps for the x-axis.
pandas_df = df_sma.toPandas()
pandas_df['date'] = pd.to_datetime(pandas_df['date'])

# Draw the price and its simple moving average on the same figure.
plt.figure(figsize=(22, 8))
for series_name, series_label in (('price', 'Price'), ('sma', 'SMA')):
    plt.plot(pandas_df['date'], pandas_df[series_name], label=series_label)
plt.title('Price and Simple Moving Average (SMA)')
plt.xlabel('Date')
plt.ylabel('Value')
plt.xticks(rotation=45)
plt.legend()

plt.show()



# Exponential Moving Average

## $$ \text{EMA}_k = \begin{cases} x_0 & \text{if } k = 0 \\ a \cdot x_k + (1 - a) \cdot \text{EMA}_{k-1} & \text{otherwise} \end{cases} $$
## Biểu thức biến đổi : $$ y_k = \frac{x_k + (1-a) \cdot x_{k-1} + (1-a)^2 \cdot x_{k-2} + \ldots + (1-a)^n \cdot x_{k-n} }{1 + (1-a) + (1-a)^2 + \ldots + (1-a)^n} $$
### Với a : $$\frac{2}{T + 1} $$ 
###     T : chu kì tính (12)


In [7]:
%spark.pyspark
%matplotlib inline
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

def ema_calculate(t, values, N):
    """Return the exponentially weighted average of ``values[0..t]``.

    The element at index ``i`` is weighted by ``(1 - alpha) ** (t - i)`` with
    ``alpha = 2 / (N + 1)``, so index ``t`` carries weight 1 and earlier
    indices carry geometrically smaller weights. The weighted sum is divided
    by the sum of the weights.

    Args:
        t: Zero-based index of the current observation.
        values: Sequence of observations; only the first ``t + 1`` are used.
        N: Period used to derive the smoothing factor ``alpha``.

    Returns:
        The weighted average as a float.
    """
    smoothing = 2.0 / (N + 1)  # alpha
    decay = 1 - smoothing
    # Weights aligned with values[0..t]: (1-a)^t, (1-a)^(t-1), ..., (1-a)^0.
    weights = [decay ** (t - position) for position in range(t + 1)]
    # Numerator: element-wise product of weights and observations, summed.
    weighted_total = sum(w * v for w, v in zip(weights, values[:t + 1]))
    weight_total = sum(weights)
    return weighted_total / weight_total

def exponential_moving_average(df, period = 10):
    """Add an exponential moving average of ``price`` as the column ``ema``.

    Rows are ordered by ``date``. For each row, the prices collected from the
    start of the data up to that row are passed, together with the row's
    zero-based position and ``period``, to ``ema_calculate`` through a UDF.

    Args:
        df: Spark DataFrame with at least the columns ``date`` and ``price``.
        period: Period handed to ``ema_calculate`` to derive its smoothing factor.

    Returns:
        A DataFrame restricted to the columns ``date``, ``price`` and ``ema``,
        where ``ema`` is a double column.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer, got {!r}".format(period))

    column = "price"
    orderByColumn = "date"

    # Declare the return type explicitly: a UDF without one yields a string
    # column, which would break numeric use of 'ema' (e.g. plotting).
    ema_udf = F.udf(ema_calculate, "double")
    window_spec = Window.orderBy(orderByColumn)

    # Zero-based position of each row in date order.
    df_with_t = df.withColumn("t", F.row_number().over(window_spec) - 1)
    # With an ordered window and no explicit frame, the collected list runs
    # from the first row up to the current row.
    price_history = F.collect_list(column).over(window_spec)
    df_ema = df_with_t.withColumn("ema", ema_udf(F.col("t"), price_history, F.lit(period)))

    return df_ema.select("date", "price", "ema")

# Period comes from the notebook input form (default 12).
period_ema = int(z.input("Enter EMA period: ", 12))
df_ema = exponential_moving_average(df, period_ema)
df_ema.show(truncate=False)

# Convert the Spark result to pandas for plotting and parse the date strings
# into real timestamps for the x-axis.
pandas_df = df_ema.toPandas()
pandas_df['date'] = pd.to_datetime(pandas_df['date'])
# Make sure 'ema' is numeric: a Spark UDF column without a declared return
# type arrives as strings, which matplotlib would draw on a categorical axis.
pandas_df['ema'] = pd.to_numeric(pandas_df['ema'])

# Draw the price and its exponential moving average on the same figure.
plt.figure(figsize=(22, 8))
plt.plot(pandas_df['date'], pandas_df['price'], label='Price')
plt.plot(pandas_df['date'], pandas_df['ema'], label='EMA')
plt.xlabel('Date')
plt.ylabel('Value')
plt.title('Price and Exponential Moving Average (EMA)')
plt.legend()
# 11 points give ticks at 0, 10, ..., 100 (10 points would step by 11.11).
plt.yticks(np.linspace(0, 100, 11))
plt.xticks(rotation=45)

plt.show()


# Relative Strength - Relative Strength Index

## $$ \text{RS} = \frac{\text{AVG(GAIN)}_T}{\text{AVG(LOSS)}_T} $$
## $$ \text{RSI} = 100 - \frac{100}{1 + \text{RS} } $$
### Với: T : chu kì tính (14)

In [9]:
%spark.pyspark
%matplotlib inline
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

def relative_strength_index(df, period = 14):
    """Add the Relative Strength Index of ``price`` as the column ``rsi``.

    Rows are ordered by ``date``. For each row the change against the previous
    row is split into a gain (``up``) and a loss (``down``); a row that is not
    a gain contributes 0 to the gains and vice versa. Gains and losses are then
    averaged over the last ``period`` changes and combined as

        RSI = 100 - 100 / (1 + avg_up / avg_down)
            = 100 * avg_up / (avg_up + avg_down)

    The second form is used so that a window without any loss yields 100
    instead of a division by zero.

    Args:
        df: Spark DataFrame with at least the columns ``date`` and ``price``.
        period: Number of consecutive price changes in each averaging window.

    Returns:
        A DataFrame restricted to the columns ``date``, ``price`` and ``rsi``
        (rounded to 2 decimals). ``rsi`` is NULL for the first ``period`` rows,
        where a full window of changes is not yet available, and for windows
        in which the price did not move at all.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer, got {!r}".format(period))

    order_window = Window.orderBy('date')
    # Current row plus the (period - 1) rows before it, i.e. the last `period` changes.
    rolling_window = order_window.rowsBetween(1 - period, Window.currentRow)

    # Change between two consecutive rows; NULL on the first row (no predecessor).
    df_dis = df.withColumn('dis', F.col('price') - F.lag(F.col('price'), 1).over(order_window))
    # Split into gains and losses. The opposite direction counts as 0 so that it
    # still takes part in the average; the NULL of the first row stays NULL.
    df_dis = df_dis.withColumn(
        'up',
        F.when(F.col('dis') >= 0, F.col('dis')).when(F.col('dis') < 0, F.lit(0.0)))
    df_dis = df_dis.withColumn(
        'down',
        F.when(F.col('dis') < 0, -F.col('dis')).when(F.col('dis') >= 0, F.lit(0.0)))
    df_dis = df_dis.drop('dis')

    # Average gain/loss over the rolling window, only once `period` changes exist.
    has_full_window = F.row_number().over(order_window) > period
    df_avg_dis = df_dis.withColumn('avg_up', F.when(has_full_window, F.avg('up').over(rolling_window)))
    df_avg_dis = df_avg_dis.withColumn('avg_down', F.when(has_full_window, F.avg('down').over(rolling_window)))

    # Derive RSI from the unrounded averages; round only the final value.
    total_move = F.col('avg_up') + F.col('avg_down')
    rsi = F.when(total_move > 0, 100 * F.col('avg_up') / total_move)
    df_rsi = df_avg_dis.withColumn('rsi', F.round(rsi, 2))
    df_rsi = df_rsi.select("date", "price", "rsi")

    return df_rsi

# Period comes from the notebook input form (default 14).
period_rsi = int(z.input("Enter RSI period: ", 14))
df_rsi = relative_strength_index(df, period_rsi)
df_rsi.show(truncate=False)

# Convert the Spark result to pandas for plotting and parse the date strings
# into real timestamps for the x-axis.
pandas_df = df_rsi.toPandas()
pandas_df['date'] = pd.to_datetime(pandas_df['date'])

# Draw the RSI series over time.
plt.figure(figsize=(22, 8))
plt.plot(pandas_df['date'], pandas_df['rsi'], label='RSI')
plt.title('Price and Relative Strength Index (RSI)')
plt.xlabel('Date')
plt.ylabel('RSI')
plt.xticks(rotation=45)
plt.legend()

plt.show()
