Przygotowanie danych historycznych.
Główne funkcjonalności: \
1) czytywanie danych giełdowych z wielu plików CSV. \
2) Dodawanie informacji o tickerze (symbolu akcji). \
3) Filtrowanie danych na podstawie liczby rekordów i zakresu dat. \
4) Obliczanie zwrotu z inwestycji w oknach czasowych.

In [97]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime
import os
from pyspark.sql.functions import to_date
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [98]:
spark = SparkSession.builder \
    .appName("Stocks Data Analysis") \
    .getOrCreate()

In [99]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [100]:
# Schemat dla plików CSV
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Open", DoubleType(), True),
    StructField("Close", DoubleType(), True)
])

In [101]:
def process_file(file_path, n_steps):
    df = spark.read.csv(file_path, schema=schema, header=False)

    df = df.withColumn("Symbol", F.input_file_name()) \
        .withColumn("Symbol", F.element_at(F.split(F.col("Symbol"), "/"), -1)) \
        .withColumn("Symbol", F.element_at(F.split(F.col("Symbol"), "\\."), 1))

    # Zliczana jest liczba wierszy dla każdego Symbol.
    # Pliki z mniej niż 5 latami danych (minimum 260 dni giełdowych w roku + margines 10) są odrzucane.
    df = df.withColumn('count', F.count("Symbol").over(Window.partitionBy('Symbol'))) \
        .filter(F.col('count') > 260 * 5 + 10)

    df = df.withColumn("Date", F.to_date(F.to_timestamp(F.col('Date'), 'yy-MM-dd')))
    df = df.filter(F.col("Date") >= datetime(2013, 1, 2)) \
           .filter(F.col('Date') <= datetime(2023, 12, 29))

    # Funkcja do obliczania zwrotu (rolling window)
    window = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(1 - n_steps, 0)
    df = df.withColumn("return", \
                       (F.col("Close") - F.first("Close").over(window)) / F.first("Close").over(window))

    return df.orderBy("Symbol", "Date")

In [102]:
# Obliczany jest zwrot z inwestycji (return) jako procentowa zmiana wartości w oknie czasowym o długości n_steps dni.
# Używane są okna czasowe (Window), które grupują dane według Symbol i sortują według daty.

n_steps = 10
input_dir = '/content/drive/My Drive/BigDATA'

In [103]:
#Każdy plik jest przetwarzany funkcją process_file.
all_files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if f.endswith('.csv')]
processed_dfs = [process_file(file_path, n_steps) for file_path in all_files]

Liczba kroków jest równa 10 i jest to okno dwóch tygodni (giełda nie jest otwarta w weekedy). Obliczanie zwrotu z inwestycji dla każdego symbolu akcji w oknie czasowym o długości n_steps, a wynik jest dodawany do final_df. Teraz dane mogą być przetwarzane dalej lub eksportowane. Exportujemy dane do PandasDataFrame

In [91]:
import pandas as pd
from functools import reduce

In [96]:
df = df.withColumn("Date", to_date(df["Date"], "yyyy-MM-dd"))

In [106]:
pandas_df = final_df.toPandas()

pandas_df = pandas_df.assign(stock_returns=pandas_df['Close'])

print(pandas_df.head())

         Date         Open        Close                   Symbol  count  \
0  2013-01-02  4648.899902  4705.899902  ASX_200_historical_data   2783   
1  2013-01-03  4705.899902  4740.700195  ASX_200_historical_data   2783   
2  2013-01-04  4740.700195  4723.799805  ASX_200_historical_data   2783   
3  2013-01-07  4726.799805  4717.299805  ASX_200_historical_data   2783   
4  2013-01-08  4721.399902  4690.299805  ASX_200_historical_data   2783   

     return  rolling_return  stock_returns  
0  0.000000        0.000000    4705.899902  
1  0.007395        0.007395    4740.700195  
2  0.003804        0.003804    4723.799805  
3  0.002422        0.002422    4717.299805  
4 -0.003315       -0.003315    4690.299805  


Metoda najmniejszych kwadratów z moduły scikit-learn: Wcześniej usuwania Na

In [120]:
from sklearn.linear_model import LinearRegression
import numpy as np
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [121]:
pandas_df.replace([np.inf, -np.inf], np.nan, inplace=True)
feature_columns = list(pandas_df.columns[-6:])
pandas_df = pandas_df.dropna(subset=feature_columns + ['stock_returns'])

In [128]:
# Define the feature columns as per your original code
feature_columns =  ['Date', 'Close', 'Symbol', 'count', 'return', 'rolling_return', 'stock_return']

# Define the UDF to compute the OLS coefficients
@pandas_udf(StructType(
    [StructField("Symbol", StringType(), True)] +
    [StructField(f"coef_{feature}", DoubleType(), True) for feature in feature_columns]
))
def find_ols_coef_udf(pdf: pd.DataFrame) -> pd.DataFrame:
    # The target variable
    y = pdf['stock_returns']

    # Feature variables
    X = pdf[feature_columns]

    # Initialize the linear regression model
    regr = LinearRegression()

    # Fit the model
    regr.fit(X, y)

    # Extract coefficients
    coef_list = [pdf['Symbol'].iloc[0]] + list(regr.coef_)

    # Return as a pandas DataFrame
    return pd.DataFrame([coef_list], columns=['Symbol'] + [f"coef_{feature}" for feature in feature_columns])

# Apply the UDF to each group
coef_per_stock_df = df.groupby('Symbol').apply(find_ols_coef_udf)

# Show the result
coef_per_stock_df.show()


ValueError: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP.

In [129]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd
from sklearn.linear_model import LinearRegression

# Define the feature columns (adjust this as per your dataset)

# Define the UDF to compute the OLS coefficients
@pandas_udf(StructType(
    [StructField("Symbol", StringType(), True)] +
    [StructField(f"coef_{feature}", DoubleType(), True) for feature in feature_columns]
), functionType='GROUPED_MAP')
def find_ols_coef_udf(pdf: pd.DataFrame) -> pd.DataFrame:
    # The target variable
    y = pdf['stock_returns']

    # Feature variables
    X = pdf[feature_columns]

    # Initialize the linear regression model
    regr = LinearRegression()

    # Fit the model
    regr.fit(X, y)

    # Extract coefficients
    coef_list = [pdf['Symbol'].iloc[0]] + list(regr.coef_)

    # Return as a pandas DataFrame
    return pd.DataFrame([coef_list], columns=['Symbol'] + [f"coef_{feature}" for feature in feature_columns])

# Use groupBy and transform (instead of apply)
coef_per_stock_df = df.groupBy('Symbol').apply(find_ols_coef_udf)

# Show the result
coef_per_stock_df.show()


PySparkTypeError: [INVALID_PANDAS_UDF_TYPE] `functionType` should be one the values from PandasUDFType, got GROUPED_MAP