In [118]:
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql.window import Window
from pyspark.sql.functions import (coalesce, col, lag, when, mean, lit, stddev, count, expr, avg, udf, collect_list, max, min, isnan)
import numpy as np

In [119]:
spark = SparkSession.builder.appName("TransformFile").config("spark.jars", "./postgresql-42.7.1.jar").getOrCreate()

In [120]:
def merge_parquet_files(input_folder="tmp1/stock_data/stock_history"):
    input_path = Path(input_folder)
    folders = [str(folder) for folder in input_path.glob("*") if folder.is_dir()]

    merged_df = None

    for folder in folders:
        df = spark.read.parquet(str(Path(folder).resolve()))
        
        if merged_df is None:
            merged_df = df
        else:
            merged_df = merged_df.union(df)

    if merged_df:
        merged_df = merged_df.withColumn("Date", col("Time").cast("date"))
        merged_df = merged_df.drop("Time")
        merged_df = merged_df.orderBy("Symbol", "Date")
        merged_df.show(10)
    else:
        print("No Parquet files found.")

    return merged_df

merged_df = merge_parquet_files(input_folder="tmp1/stock_data/stock_history")

24/02/23 16:54:32 WARN DAGScheduler: Broadcasting large task binary with size 1628.0 KiB

+------+-----+-----+-----+-----+------+----------+
|Symbol| Open| High|  Low|Close|Volume|      Date|
+------+-----+-----+-----+-----+------+----------+
|   A32|27680|27680|27680|27680|   100|2023-05-09|
|   A32|30450|30450|30450|30450|   100|2023-05-10|
|   A32|30450|30450|30450|30450|   500|2023-05-11|
|   A32|30450|30450|26760|26760|   300|2023-05-17|
|   A32|29260|29260|29260|29260|   100|2023-05-18|
|   A32|30450|30450|30450|30450|   700|2023-05-23|
|   A32|29710|29710|29710|29710|   800|2023-05-25|
|   A32|29620|29620|29620|29620|   600|2023-05-26|
|   A32|28600|28600|28600|28600|   200|2023-05-29|
|   A32|27680|27680|27680|27680|  1000|2023-05-30|
+------+-----+-----+-----+-----+------+----------+
only showing top 10 rows



                                                                                

In [121]:
description = merged_df.describe()

description.show()

24/02/23 16:54:42 WARN DAGScheduler: Broadcasting large task binary with size 1628.8 KiB
24/02/23 16:54:48 WARN DAGScheduler: Broadcasting large task binary with size 1640.5 KiB
[Stage 344:>                                                        (0 + 8) / 8]

+-------+------+------------------+------------------+------------------+------------------+------------------+
|summary|Symbol|              Open|              High|               Low|             Close|            Volume|
+-------+------+------------------+------------------+------------------+------------------+------------------+
|  count|177796|            177796|            177796|            177796|            177796|            177796|
|   mean|  NULL| 20630.62951922428|20908.305518684334|20334.405695291232|20638.249268824944| 899311.5238588045|
| stddev|  NULL|32201.543806365287|32647.166888800373| 31758.95100311469|32167.360324551755|3422855.0810003737|
|    min|   A32|               300|               300|               300|               300|               100|
|    max|   YTC|           1279000|           1279000|           1241000|           1249000|         127103200|
+-------+------+------------------+------------------+------------------+------------------+------------

                                                                                

In [122]:
def get_all_column_data_types(df):
    return [(field.name, field.dataType.typeName()) for field in df.schema.fields]

column_data_types = get_all_column_data_types(merged_df)
for column_name, data_type in column_data_types:
    print(f"The data type of the '{column_name}' column is: {data_type}")

The data type of the 'Symbol' column is: string
The data type of the 'Open' column is: long
The data type of the 'High' column is: long
The data type of the 'Low' column is: long
The data type of the 'Close' column is: long
The data type of the 'Volume' column is: long
The data type of the 'Date' column is: date


## Price Change Compared to N-days ago

In [123]:
def calculate_price_change(df, n=1):
    sorted_df = df.orderBy("Symbol", "Date")

    window_spec = Window.partitionBy("Symbol").orderBy("Date")
    df_with_price_change = sorted_df.withColumn(
        f"Price Change To {n} Day(s) Ago",
        coalesce(df["Close"] - lag(df["Close"], n).over(window_spec), lit(None))
    )

    return df_with_price_change

# result_df = calculate_price_change(merged_df, n=1)
# result_df = calculate_price_change(result_df, n=2)
# result_df = calculate_price_change(result_df, n=3)
# result_df.show(10)

## Volume Change Compared to N-days ago

In [124]:
def calculate_volume_change(df, n=1):
    sorted_df = df.orderBy("Symbol", "Date")

    window_spec = Window.partitionBy("Symbol").orderBy("Date")
    df_with_price_change = sorted_df.withColumn(
        f"Volume Change To {n} Day(s) Ago",
        coalesce(df["Volume"] - lag(df["Volume"], n).over(window_spec), lit(None))
    )

    return df_with_price_change

# result_df = calculate_volume_change(merged_df, n=1)
# result_df = calculate_volume_change(result_df, n=2)
# result_df = calculate_volume_change(result_df, n=3)
# result_df.show(10)

## Price Difference

In [125]:
def calculate_price_difference(df):
    sorted_df = df.orderBy("Symbol", "Date")

    window_spec = Window.partitionBy("Symbol").orderBy("Date")
    df_with_price_diff = sorted_df.withColumn(
        f"Price Difference",
        coalesce(df["Close"] - df["Open"], lit(None))
    )

    return df_with_price_diff

# result_df = calculate_price_difference(merged_df)
# result_df.show(10)

## Median/Std. of Volume for N-days period

In [126]:
def calculate_median_std_volume(df, n=1):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)

    count_expr = count('Volume').over(window_spec)

    median_expr = expr('percentile_approx(Volume, 0.5)')

    df = df.withColumn(f'Median Volume {n} day(s) periods', when(count_expr >= n, median_expr.over(window_spec)).otherwise(None))
    
    df = df.withColumn(f'Std Volume {n} day(s) periods', when(count_expr >= n, stddev('Volume').over(window_spec)).otherwise(None))

    return df

# result_df = calculate_median_std_volume(merged_df, n=3)
# result_df.show(10)

## Median/Std. of Price Change for N-days period

In [127]:
def calculate_median_std_price_change(df, n=1):
    sorted_df = df.orderBy("Symbol", "Date")

    window_spec = Window.partitionBy("Symbol").orderBy("Date")
    df = sorted_df.withColumn(
        "Price Change",
        coalesce(df["Close"] - lag(df["Close"]).over(window_spec), lit(None))
    )

    count_expr = count("Price Change").over(window_spec)
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)
    median_expr = expr('percentile_approx(`Price Change`, 0.5)')
    df = df.withColumn(f'Median Price Change {n} day(s) periods', when(count_expr >= n, median_expr.over(window_spec)).otherwise(None))
    
    df = df.withColumn(f'Std Price Change {n} day(s) periods', when(count_expr >= n, stddev('Price Change').over(window_spec)).otherwise(None))

    df = df.drop("Price Change") #Optional

    return df

# result_df = calculate_median_std_price_change(merged_df, n=3)
# result_df.show(10)

## Median/Std. of Volumn Change for N-days period

In [128]:
def calculate_median_std_volume_change(df, n=1):
    sorted_df = df.orderBy("Symbol", "Date")

    window_spec = Window.partitionBy("Symbol").orderBy("Date")
    df = sorted_df.withColumn(
        "Volume Change",
        coalesce(df["Volume"] - lag(df["Volume"]).over(window_spec), lit(None))
    )

    count_expr = count("Volume Change").over(window_spec)
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)
    median_expr = expr('percentile_approx(`Volume Change`, 0.5)')
    df = df.withColumn(f'Median Volume Change {n} day(s) periods', when(count_expr >= n, median_expr.over(window_spec)).otherwise(None))
    
    df = df.withColumn(f'Std Volume Change {n} day(s) periods', when(count_expr >= n, stddev('Volume').over(window_spec)).otherwise(None))

    df = df.drop("Volume Change") #Optional

    return df

# result_df = calculate_median_std_volume_change(merged_df, n=3)
# result_df.show(10)

## N-day Simple Moving Average (SMA)

In [129]:
def calculate_sma(df, n=1):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)

    df = df.orderBy('Symbol', 'Date')

    sma_expr = avg('Close').over(window_spec)
    count_expr = count("Close").over(window_spec)

    df = df.withColumn(f'SMA {n} Days', when(count_expr >= n, sma_expr).otherwise(None))

    return df

# result_df = calculate_sma(merged_df, n=3)
# result_df.show(10)

## N-day Weighted Moving Average (WMA)

In [130]:
import pandas as pd

def calculate_wma(df, n):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)

    def wma(values, n):
        if len(values) < n:
            return None

        return float(sum(values[i] * (n - i) for i in range(n)) / n)
        
    wma_udf = udf(wma)

    df = df.withColumn(f"WMA {n} Days", wma_udf(collect_list('Close').over(window_spec), lit(n)))
    df = df.withColumn(f"WMA {n} Days", col(f"WMA {n} Days").cast("double"))

    return df

# result_df = calculate_wma(merged_df, n=3)
# result_df.show(10)

## N-day Exponential Moving Average (EMA)

In [131]:
def calculate_ema(df, n):

    pandas_df = df.toPandas()

    pandas_df.sort_values(["Symbol", "Date"], inplace=True)
    pandas_df[f"EMA {n} Days"] = pandas_df.groupby("Symbol")["Close"].transform(lambda x: x.ewm(span=n, min_periods=n).mean())
    pandas_df = pandas_df.replace({np.nan: None})
    result_df = spark.createDataFrame(pandas_df)
    return result_df

# result_df = calculate_ema(merged_df, n=3)
# result_df = calculate_ema(result_df, n=5)
# result_df.show(10)

## Moving Average Convergence Divergence - MACD

In [132]:
def calculate_macd(df, ema_1=3, ema_2=5):
    df1 = calculate_ema(df, ema_1)
    df2 = calculate_ema(df1, ema_2)
    df2 = df2.withColumn(f"MACD of EMA{ema_1} and EMA{ema_2}", 
                   df2[f"EMA {ema_1} Days"] - df2[f"EMA {ema_2} Days"])
    
    # df2 = df2.drop(f"EMA {ema_1} Days", f"EMA {ema_2} Days") #Optional

    return df2

# result_df = calculate_macd(merged_df)
# result_df.show(10)

## %K of the Stochastic Oscillator for a N-day period

In [133]:
def calculate_K_indicator(df, n=3):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)

    df = df.withColumn('HH', expr(f'max(Close)').over(window_spec).cast('double'))
    df = df.withColumn('LL', expr(f'min(Close)').over(window_spec).cast('double'))

    count_expr = count("Close").over(window_spec)

    df = df.withColumn(f'%K {n} Days', when(count_expr >= n, 100 * (col('Close') - col('LL')) / (col('HH') - col('LL'))).otherwise(None))

    df = df.drop('HH', 'LL') #Optional

    return df

# result_df = calculate_K_indicator(merged_df, n=3)
# result_df.show(10)

## %D of the Stochastic Oscillator

In [134]:
def calculate_D_indicator(df, n=3):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)

    df = calculate_K_indicator(df, n=n)
    df = df.withColumn(f'%D {n} Days', avg(col(f'%K {n} Days')).over(window_spec))

    # df = df.drop(f'%K {n} Days') #Optional

    return df

# result_df = calculate_D_indicator(merged_df, n=3)
# result_df.show(10)

## Relative Strength Index (RSI) for N-day period

In [135]:
def calculate_rsi(df, n=3):

    df = df.withColumn('change', col('Close') - lag('Close').over(Window.partitionBy('Symbol').orderBy('Date')))

    df = df.withColumn('gain', when(col('change') < 0, 0.0).otherwise(col('change')))
    df = df.withColumn('loss', when(col('change') > 0, 0.0).otherwise(-col('change')))

    def rma(values, n):
        if len(values) < n:
            return None
        
        x = np.array(values)

        a = np.full_like(x, np.nan)
        a[n-1] = x[:n].mean()
        for i in range(n, len(x)):
            a[i] = (a[i-1] * (n - 1) + x[i]) / n
        return float(a[-1])

    rma_udf = udf(rma)

    window_spec = Window.partitionBy('Symbol').orderBy('Date')
    df = df.withColumn('avg_gain', rma_udf(collect_list('gain').over(window_spec), lit(n)))
    df = df.withColumn('avg_loss', rma_udf(collect_list('loss').over(window_spec), lit(n)))

    df = df.withColumn('rs', col('avg_gain') / col('avg_loss'))
    df = df.withColumn(f'RSI {n} Days', 100 - (100 / (1 + col('rs'))))

    df = df.drop('change', 'gain', 'loss', 'avg_gain', 'avg_loss', 'rs') #Optional

    return df

# result_df = calculate_rsi(merged_df, n=3)
# result_df.show(10)

## Williams %R for a N-day period

In [136]:
def calculate_williams_r(df, n):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(- n + 1, 0)

    df = df.withColumn('Highest High', max('High').over(window_spec))
    df = df.withColumn('Lowest Low', min('Low').over(window_spec))

    df = df.withColumn(f'Williams %R {n} Days', ((col('Highest High') - col('Close')) / (col('Highest High') - col('Lowest Low'))) * -100)

    df = df.drop('Highest High', 'Lowest Low') #Optional

    return df
# result_df = calculate_williams_r(merged_df, n=3)
# result_df.show(10)

## Accumulation/Distribution Indicator (A/D)

In [137]:
def calculate_money_flow_and_ad(df, n=3):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(- n + 1, 0)

    df = df.withColumn('Highest High', max('High').over(window_spec))
    df = df.withColumn('Lowest Low', min('Low').over(window_spec))

    money_flow_expr = ((col('Close') - col('Lowest Low')) - (col('Highest High') - col('Close'))) / (col('Highest High') - col('Lowest Low'))
    df = df.withColumn('MoneyFlow', money_flow_expr)

    window_spec = Window().partitionBy('Symbol').orderBy('Date')
    ad_expr = lag('MoneyFlow').over(window_spec) + col('MoneyFlow')
    df = df.withColumn(f'AD {n} Days', ad_expr)

    df = df.drop('Highest High', 'Lowest Low', 'MoneyFlow') #Optional

    return df
# result_df = calculate_money_flow_and_ad(merged_df, n=3)
# result_df.show(10)

## Commodity Channel Index (CCI)

[Reference](https://www.google.com/search?q=stock+cci+example+calculation&tbm=isch&ved=2ahUKEwilxcW6hfSDAxUvWPUHHYcrBfcQ2-cCegQIABAA&oq=stock+cci+example+calculation&gs_lcp=CgNpbWcQAzoECCMQJ1DuDVjdGWD7GmgAcAB4AIABiwGIAeUMkgEEMC4xM5gBAKABAaoBC2d3cy13aXotaW1nwAEB&sclient=img&ei=WvivZaWwGK-w1e8Ph9eUuA8&bih=992&biw=902#imgrc=ERE0qeFnlpRTSM)

In [138]:
def calculate_cci(df, n=3):
    df = df.withColumn('TypicalPrice', (col('High') + col('Low') + col('Close')) / 3)

    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(- n + 1, 0)
    count_expr = count("Close").over(window_spec)

    sma_expr = avg('TypicalPrice').over(window_spec)
    df = df.withColumn(f'SMA', when(count_expr >= n, sma_expr).otherwise(None))

    def md(values, n):
        if len(values) < n:
            return None

        x = np.array(values)
        avg = np.sum(x) / n
        summ = np.sum(np.abs(avg - x))
        result = summ / n
        return float(result)
        
    md_udf = udf(md)

    df = df.withColumn('MeanDeviation', md_udf(collect_list('TypicalPrice').over(window_spec), lit(n)))

    cci_expr = (col('TypicalPrice') - col('SMA')) / (0.015 * col('MeanDeviation'))
    df = df.withColumn(f'CCI {n} Days', cci_expr)

    df = df.drop('SMA', 'MeanDeviation', 'TypicalPrice') #Optional

    return df

# data = {
#     "High": [24.20, 24.07, 24.04, 23.87, 23.67, 23.59, 23.80, 23.80, 24.30, 24.15, 24.05, 24.06, 23.88, 25.14, 25.20, 25.07, 25.22, 25.37, 25.36, 25.26, 24.82, 24.44],
#     "Low": [23.85, 23.72, 23.64, 23.37, 23.46, 23.18, 23.40, 23.57, 24.05, 23.77, 23.6, 23.84, 23.64, 23.94, 24.74, 24.77, 24.90, 24.93, 24.96, 24.93, 24.21, 24.21],
#     "Close": [23.89, 23.95, 23.67, 23.78, 23.50, 23.32, 23.75, 23.79, 24.14, 23.81, 23.78, 23.86, 23.70, 24.96, 24.88, 24.96, 25.18, 25.07, 25.27, 25.00, 24.46, 24.28],
# }
# from pyspark.sql import Row

# # Create a list of Row objects from the data
# rows = [Row(High=high, Low=low, Close=close, ) for close, high, low in zip(data['Close'], data['High'], data['Low'])]

# df = spark.createDataFrame(rows)

# df = df.withColumn('Symbol', lit('AAPL'))
# df = df.withColumn("Date", date_add(lit("2022-01-01"), row_number().over(Window.orderBy("Symbol")) - 1))


# result_df = calculate_cci(merged_df, n=3)
# result_df.show(10)

## %Difference from n days

In [139]:
def calculate_percentage_change(df, n):
    window_spec = Window().partitionBy('Symbol').orderBy('Date')

    df = df.withColumn(f'Percentage Change {n} Day(s)', (col('Close') - lag('Close', n).over(window_spec)) / col('Close'))

    return df
# result_df = calculate_percentage_change(merged_df, n=1)
# result_df = calculate_percentage_change(result_df, n=2)
# result_df = calculate_percentage_change(result_df, n=3)
# result_df.show(10)

## %Difference from lowest low i for a N-day period

In [140]:
def calculate_percentage_to_lowest(df, n):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)
    count_expr = count("Close").over(window_spec)

    df = df.withColumn('LL', when(count_expr >= n, expr(f'min(Close)').over(window_spec).cast('double')).otherwise(None))

    df = df.withColumn(f'Percentage Change {n} Day(s)', (col('Close') - col('LL')) / col('Close'))

    df = df.drop('LL') #Optional

    return df

# result_df = calculate_percentage_to_lowest(merged_df, n=3)
# result_df.show(10)

## %Difference from highest high for a N-day period

In [141]:
def calculate_percentage_to_highest(df, n):
    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n+1, 0)
    count_expr = count("Close").over(window_spec)

    df = df.withColumn('HH', when(count_expr >= n, expr(f'max(Close)').over(window_spec).cast('double')).otherwise(None))

    df = df.withColumn(f'Percentage Change {n} Day(s)', (col('Close') - col('HH')) / col('Close'))

    df = df.drop('HH') #Optional

    return df

# result_df = calculate_percentage_to_highest(merged_df, n=3)
# result_df.show(10)

## On-balance volume (OBV)

In [142]:
def calculate_obv(df):
    df = df.withColumn('OBV', lit(0))
    window_spec = Window().partitionBy('Symbol').orderBy('Date')

    obv_expr = (
        when(col('Close') > lag('Close').over(window_spec), 1)
        .when(col('Close') < lag('Close').over(window_spec), -1)
        .otherwise(0)
    ) * col('Volume') + lag('OBV').over(window_spec)

    df = df.withColumn('OBV', coalesce(obv_expr, lit(None)))

    return df
    
# result_df = calculate_obv(merged_df)
# result_df.show(10)

## FT-Min

In [143]:
def ft_min_udf(df, n=3):
    def ft_min(values):
        if len(values) == 0:
            return None

        fft_result = np.fft.fft(values)
        m = len(values)
        frequencies = np.fft.fftfreq(m, d=1)

        non_zero_frequencies = frequencies[np.nonzero(frequencies)]

        if len(non_zero_frequencies) > 0:
            min_frequency = np.min(np.abs(non_zero_frequencies))
            return float(min_frequency)
        else:
            return None
        
        return 1

    ft_min_udf = udf(ft_min)

    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n + 1, 0)
    df = df.withColumn(f'FT_Min {n} days', ft_min_udf(collect_list('Close').over(window_spec)))
    df = df.withColumn(f'FT_Min {n} days', col(f'FT_Min {n} days').cast("double"))

    return df

# result_df = ft_min_udf(merged_df, n=6)
# result_df.show(10)

## FT-Max

In [144]:
def ft_max_udf(df, n=3):
    def ft_max(values):
        if len(values) == 0:
            return None

        fft_result = np.fft.fft(values)
        m = len(values)
        frequencies = np.fft.fftfreq(m, d=1)

        non_zero_frequencies = frequencies[np.nonzero(frequencies)]

        if len(non_zero_frequencies) > 0:
            max_frequency = np.max(np.abs(non_zero_frequencies))
            return float(max_frequency)
        else:
            return None
        
        return 1

    ft_max_udf = udf(ft_max)

    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n + 1, 0)
    df = df.withColumn(f'FT_Max {n} days', ft_max_udf(collect_list('Close').over(window_spec)))
    df = df.withColumn(f'FT_Max {n} days', col(f'FT_Max {n} days').cast("double"))

    return df

# result_df = ft_max_udf(merged_df, n=6)

## FT-Mean

In [145]:
def ft_mean_udf(df, n=3):
    def ft_mean(values):
        if len(values) == 0:
            return None

        fft_result = np.fft.fft(values)
        m = len(values)
        frequencies = np.fft.fftfreq(m, d=1)

        non_zero_frequencies = frequencies[np.nonzero(frequencies)]

        if len(non_zero_frequencies) > 0:
            mean_frequency = np.mean(np.abs(non_zero_frequencies))
            return float(mean_frequency)
        else:
            return None
        
        return 1

    ft_mean_udf = udf(ft_mean)

    window_spec = Window().partitionBy('Symbol').orderBy('Date').rowsBetween(-n + 1, 0)
    df = df.withColumn(f'FT_Mean {n} days', ft_mean_udf(collect_list('Close').over(window_spec)))
    df = df.withColumn(f'FT_Mean {n} days', col(f'FT_Mean {n} days').cast("double"))

    return df

# result_df = ft_mean_udf(merged_df, n=6)
# result_df.show(10)

# Boolean

## Simple Boolean for Moving Areas

In [146]:
def calculate_simple_bool(df, col1, col2):
    obv_expr = (
        when(col(col1) > col(col2), 1)
        .when(col(col2).isNull(), None)
        .otherwise(-1)
    )

    df = df.withColumn(f'Bool {col2}', coalesce(obv_expr, lit(None)))
    return df

# result_df = calculate_simple_bool(result_df, "Close", "SMA 3 Days")
# result_df = calculate_simple_bool(result_df, "Close", "SMA 5 Days")
# result_df = calculate_simple_bool(result_df, "Close", "WMA 3 Days")
# result_df = calculate_simple_bool(result_df, "Close", "WMA 5 Days")
# result_df = calculate_simple_bool(result_df, "Close", "EMA 3 Days")
# result_df = calculate_simple_bool(result_df, "Close", "EMA 5 Days")

## Complex Boolean for Stochastic, RSI, CCI, ...

In [147]:
def calculate_complex_bool(df, col_name, upper, lower):
    window_spec = Window().partitionBy('Symbol').orderBy('Date')
    obv_expr = (
        when(col(col_name) >= upper, 1)
        .when(col(col_name) <= lower, -1)
        .when((col(col_name) > lower) & (col(col_name) < upper) & (col(col_name) > lag(col_name).over(window_spec)), 1)
        .when(col(col_name).isNull(), None)
        .otherwise(-1)
    )

    df = df.withColumn(f'Bool {col_name}', coalesce(obv_expr, lit(None)))
    return df

## Complex Boollean 2 for A/D and MACD

In [148]:
def calculate_complex2_bool(df, col_name):
    window_spec = Window().partitionBy('Symbol').orderBy('Date')
    obv_expr = (
        when(col(col_name) > lag(col_name).over(window_spec), 1)
        .when(col(col_name).isNull(), None)
        .otherwise(-1)
    )

    df = df.withColumn(f'Bool {col_name}', coalesce(obv_expr, lit(None)))
    return df

## Combine

In [149]:
functions_and_parameters = [
    (calculate_price_change, {'n': 1}),
    (calculate_price_change, {'n': 2}),
    (calculate_price_change, {'n': 3}),
    (calculate_volume_change, {'n': 1}),
    (calculate_volume_change, {'n': 2}),
    (calculate_volume_change, {'n': 3}),
    (calculate_price_difference, {}),
    (calculate_median_std_volume, {'n': 3}),
    (calculate_median_std_price_change, {'n': 3}),
    (calculate_median_std_volume_change, {'n': 3}),
    (calculate_sma, {'n': 3}),
    (calculate_sma, {'n': 5}),
    (calculate_wma, {'n': 3}),
    (calculate_wma, {'n': 5}),
    (calculate_ema, {'n': 3}),
    (calculate_ema, {'n': 5}),
    (calculate_simple_bool, {'col1': 'Close', 'col2': 'SMA 3 Days'}),
    (calculate_simple_bool, {'col1': 'Close', 'col2': 'SMA 5 Days'}),
    (calculate_simple_bool, {'col1': 'Close', 'col2': 'WMA 3 Days'}),
    (calculate_simple_bool, {'col1': 'Close', 'col2': 'WMA 5 Days'}),
    (calculate_simple_bool, {'col1': 'Close', 'col2': 'EMA 3 Days'}),
    (calculate_simple_bool, {'col1': 'Close', 'col2': 'EMA 5 Days'}),
    (calculate_macd, {'ema_1': 3, 'ema_2': 5}),
    (calculate_K_indicator, {'n': 3}),
    (calculate_K_indicator, {'n': 5}),
    (calculate_D_indicator, {'n': 3}),
    (calculate_D_indicator, {'n': 5}),
    (calculate_rsi, {'n': 3}),
    (calculate_rsi, {'n': 5}),
    (calculate_williams_r, {'n': 3}),
    (calculate_money_flow_and_ad, {'n': 3}),
    (calculate_cci, {'n': 3}),
    (calculate_complex2_bool, {'col_name': 'AD 3 Days'}),
    (calculate_complex2_bool, {'col_name': "MACD of EMA3 and EMA5"}),
    (calculate_complex_bool, {'col_name': '%K 3 Days', 'upper': 70, 'lower': 30}),
    (calculate_complex_bool, {'col_name': '%K 5 Days', 'upper': 70, 'lower': 30}),
    (calculate_complex_bool, {'col_name': '%D 3 Days', 'upper': 70, 'lower': 30}),
    (calculate_complex_bool, {'col_name': '%D 5 Days', 'upper': 70, 'lower': 30}),
    (calculate_complex_bool, {'col_name': 'CCI 3 Days', 'upper': 200, 'lower': -200}),
    (calculate_complex_bool, {'col_name': 'Williams %R 3 Days', 'upper': -20, 'lower': -80}),
    (calculate_percentage_change, {'n': 1}),
    (calculate_percentage_change, {'n': 2}),
    (calculate_percentage_change, {'n': 3}),
    (calculate_percentage_to_lowest, {'n': 3}),
    (calculate_percentage_to_highest, {'n': 3}),
    (calculate_obv, {}),
    (ft_min_udf, {'n': 6}),
    (ft_max_udf, {'n': 6}),
    (ft_mean_udf, {'n': 6}),
]

result_df = merged_df
for func, params in functions_and_parameters:
    result_df = func(result_df, **params)
result_df.show()

24/02/23 16:55:03 WARN DAGScheduler: Broadcasting large task binary with size 1628.8 KiB
24/02/23 16:55:09 WARN DAGScheduler: Broadcasting large task binary with size 1640.5 KiB
24/02/23 16:55:43 WARN TaskSetManager: Stage 745 contains a task of very large size (3093 KiB). The maximum recommended task size is 1000 KiB.
24/02/23 16:55:57 WARN TaskSetManager: Stage 746 contains a task of very large size (3242 KiB). The maximum recommended task size is 1000 KiB.
24/02/23 16:56:13 WARN TaskSetManager: Stage 747 contains a task of very large size (4140 KiB). The maximum recommended task size is 1000 KiB.
24/02/23 16:56:32 WARN TaskSetManager: Stage 748 contains a task of very large size (4140 KiB). The maximum recommended task size is 1000 KiB.
[Stage 757:=====>                                                  (1 + 9) / 10]

+------+----+----+----+-----+-------+----------+----------------------------+----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------+------------------------------+---------------------------+------------------------------------+---------------------------------+-------------------------------------+----------------------------------+-----------------+----------+-----------------+----------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-------------------+--------------------+-------------------+--------------+--------------------------+--------------+--------------+--------------+--------------+---------------+-----------------------+-------------------------

                                                                                

In [150]:
result_pd_df = result_df.toPandas()
result_pd_df.to_excel('demo_transform.xlsx', index=False)

24/02/23 16:56:39 WARN TaskSetManager: Stage 763 contains a task of very large size (4140 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [151]:
for field in result_df.schema.fields:
    print(f"{field.name}: {str(field.dataType)}")

Symbol: StringType()
Open: LongType()
High: LongType()
Low: LongType()
Close: LongType()
Volume: LongType()
Date: DateType()
Price Change To 1 Day(s) Ago: DoubleType()
Price Change To 2 Day(s) Ago: DoubleType()
Price Change To 3 Day(s) Ago: DoubleType()
Volume Change To 1 Day(s) Ago: DoubleType()
Volume Change To 2 Day(s) Ago: DoubleType()
Volume Change To 3 Day(s) Ago: DoubleType()
Price Difference: LongType()
Median Volume 3 day(s) periods: DoubleType()
Std Volume 3 day(s) periods: DoubleType()
Median Price Change 3 day(s) periods: DoubleType()
Std Price Change 3 day(s) periods: DoubleType()
Median Volume Change 3 day(s) periods: DoubleType()
Std Volume Change 3 day(s) periods: DoubleType()
SMA 3 Days: DoubleType()
SMA 5 Days: DoubleType()
WMA 3 Days: DoubleType()
WMA 5 Days: DoubleType()
EMA 3 Days: DoubleType()
EMA 5 Days: DoubleType()
Bool SMA 3 Days: DoubleType()
Bool SMA 5 Days: DoubleType()
Bool WMA 3 Days: DoubleType()
Bool WMA 5 Days: DoubleType()
Bool EMA 3 Days: DoubleType(

# Write to Postgres

In [152]:
result_df = result_df.toDF(*[col_name.lower().replace(' ', '_') for col_name in result_df.columns])
result_df.show(10)

24/02/23 16:59:41 WARN TaskSetManager: Stage 778 contains a task of very large size (4140 KiB). The maximum recommended task size is 1000 KiB.
[Stage 787:>                                                      (0 + 10) / 10]

+------+----+----+----+-----+-------+----------+----------------------------+----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+----------------+------------------------------+---------------------------+------------------------------------+---------------------------------+-------------------------------------+----------------------------------+-----------------+----------+-----------------+----------+------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------------+---------+------------------+------------------+-----------------+-----------------+-----------------+-------------------+--------------------+------------------+--------------+--------------------------+--------------+--------------+--------------+--------------+---------------+-----------------------+--------------------------+----------

                                                                                

In [153]:
import psycopg2
from pyspark.sql.types import StringType, IntegerType, DoubleType, LongType, DateType

db_params = {
    "host": "localhost",
    "port": "5432",
    "database": "vnstockdw",
    "user": "postgres",
    "password": "postgres"
}

table_name = "stock_history"
table_schema = "public"

def spark_to_postgres_type(spark_type):
    if spark_type == DoubleType() or spark_type == LongType():
        return "DOUBLE PRECISION"
    elif spark_type == IntegerType():
        return "INT"
    elif spark_type == DateType():
        return "DATE"
    else:
        return "VARCHAR(255)"

additional_columns = {
    "created_at": f"TIMESTAMP DEFAULT current_timestamp",
    "updated_at": f"TIMESTAMP DEFAULT current_timestamp",
    "status": "BOOLEAN DEFAULT True"
}

column_definitions = {field.name: spark_to_postgres_type(field.dataType) for field in result_df.schema.fields}

column_definitions = {**additional_columns, **column_definitions}

conn = psycopg2.connect(**db_params)
cur = conn.cursor()
cur.execute(f"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = '{table_name}' AND table_schema = '{table_schema}')")
table_exists = cur.fetchone()[0]

if not table_exists:
    columns_str = ', '.join([f""""{col}" {col_type}""" for col, col_type in column_definitions.items()])
    create_table_query = f"CREATE TABLE {table_schema}.{table_name} ({columns_str})"
    print(create_table_query)
    cur.execute(create_table_query)

# for col, col_type in column_definitions.items():
#     cur.execute(f"SELECT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = '{table_name}' AND table_schema = '{table_schema}' AND column_name = '{col}')")
#     column_exists = cur.fetchone()[0]
#     print(column_exists)
#     if not column_exists:
#         alter_table_query = f"ALTER TABLE {table_schema}.{table_name} ADD COLUMN {col} {col_type}"
#         cur.execute(alter_table_query)

conn.commit()
cur.close()
conn.close()


In [154]:
result_df.write.format("jdbc").mode("overwrite") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", f"jdbc:postgresql://{db_params['host']}:{db_params['port']}/{db_params['database']}") \
    .option("dbtable", f"{table_schema}.{table_name}") \
    .option("user", db_params["user"]).option("password", db_params["password"]).mode("append").save()

24/02/23 16:59:48 WARN TaskSetManager: Stage 793 contains a task of very large size (4140 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [155]:
spark.stop()