[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://githubtocolab.com/jkanclerz/dbs/blob/main/11--spark.ipynb)

## Instalacja Spark 

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz -O spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [None]:
import os

# Tell PySpark where the Java runtime and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
})

In [None]:
pip install pyspark

In [None]:
from pyspark.sql import SparkSession

# Configure a local-mode session named "Spark playground", then create it
# (or reuse the one that already exists in this kernel).
session_builder = SparkSession.builder.master("local").appName("Spark playground")
spark = session_builder.getOrCreate()

In [None]:
# Display the SparkSession object as the cell output.
spark

In [None]:
# Mount Google Drive at /content/drive; the data paths used below live under it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from pyspark.sql.types import (
    DecimalType,
    DoubleType,
    IntegerType,
    StructField,
    StructType,
    TimestampType,
)

# Prices need an explicit precision/scale: a bare DecimalType() means
# decimal(10, 0), which drops the fractional part of every quote.
price_type = DecimalType(18, 6)

# Explicit schema for the quote CSV files (one row per quote).
schema = StructType([
    StructField('Datetime', TimestampType()),
    StructField('Open', price_type, True),
    StructField('High', price_type, True),
    StructField('Low', price_type, True),
    StructField('Close', price_type, True),
    StructField('Volume', IntegerType(), True),
    StructField('Dividends', DoubleType(), True),
    StructField('Stock Splits', DoubleType(), True)
])

# Read every file found recursively under the folder as comma-separated CSV
# with a header row.
# NOTE(review): later cells write parquet and model files under this same
# folder, so a re-run would pick those files up as CSV too - consider a
# pathGlobFilter or a separate output location.
df = (spark.read
      .option("header", True)
      .option("recursiveFileLookup", True)
      .option("delimiter", ',')
      .schema(schema)
      .csv('/content/drive/MyDrive/fo'))

In [None]:
# Print the column names and types of the DataFrame read from CSV.
df.printSchema()

In [None]:
# Preview the first two rows.
df.show(2)

In [None]:
# Expose the raw CSV DataFrame to Spark SQL under the name `stocks`.
df.createOrReplaceTempView('stocks')

In [None]:
# Normalise the raw rows:
#   * ticker   - the "XXX.WA" fragment extracted from the source file path,
#   * datetime - the parsed timestamp shifted from UTC to Europe/Warsaw time,
#   * date     - calendar date of the original (unshifted) timestamp.
# The regex uses `[.]` for a literal dot: a backslash escape would be consumed
# by the Python and Spark SQL string parsers and end up matching any character.
# The original query ended with a dangling `limit` (no row count), which is a
# SQL parse error, so it has been removed.
stocks = spark.sql('''
  select
    regexp_replace(input_file_name(), '.*([A-Z]{3}[.]WA).*', '$1') as ticker,
    from_utc_timestamp(DateTime, 'Europe/Warsaw') datetime,
    to_date(DateTime) as date,
    Open open,
    High high,
    Close close,
    Volume volume
    from stocks
''')

In [None]:
# Persist the normalised quotes as parquet, partitioned by ticker and date.
# Overwriting is a save *mode*, not an option: `.option('overwrite', True)` is
# ignored and the write fails as soon as the target directory already exists.
# NOTE(review): the target sits inside the folder the CSV reader scans
# recursively - confirm that a re-run does not ingest these parquet files.
(stocks.write
  .mode('overwrite')
  .partitionBy(['ticker', 'date'])
  .parquet("/content/drive/MyDrive/fo/stocks"))

In [None]:
# Load the partitioned parquet dataset written by the previous cell.
loadedDf = spark.read.parquet("/content/drive/MyDrive/fo/stocks")

In [None]:
# Preview the first ten rows of the parquet-backed DataFrame.
loadedDf.show(10)


In [None]:
# Print the schema as read back from parquet.
loadedDf.printSchema()

In [None]:
pip install pandas

In [None]:
# Rebind the `stocks` SQL view: from here on it refers to the parquet-backed
# DataFrame instead of the raw CSV one registered earlier.
loadedDf.createOrReplaceTempView('stocks')

In [None]:
# Last quote of every day for PKN.WA.
# row_number() (rather than rank()) guarantees exactly one row per day even if
# two quotes share the same timestamp. The ordering is applied to the outer
# query, because an ORDER BY inside a CTE does not define the result order.
spark.sql('''
  with ranked as (
  select
    ticker,
    datetime,
    date,
    close,
    row_number() over (partition by date order by datetime DESC) as rank
  from stocks
  where ticker = "PKN.WA")

  select * from ranked where rank = 1
  order by datetime DESC
  ''').show()

In [None]:
# Daily closing price of PKN.WA as a pandas DataFrame (used for plotting).
# row_number() keeps exactly one row per day even when timestamps tie, and the
# outer ORDER BY makes the row order deterministic so that the line plot
# connects consecutive days instead of jumping around.
asPd = spark.sql('''
  with ranked as (
  select
    ticker,
    datetime,
    date,
    close,
    row_number() over (partition by date order by datetime DESC) as rank
  from stocks
  where ticker = "PKN.WA")

  select date, cast (close as float) from ranked where rank = 1
  order by date DESC
  ''').toPandas()

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [16, 5]
asPd.set_index('date').plot.line()

In [None]:
# Build the modelling table for PKN.WA: the current close/volume plus the five
# preceding quotes (lags 1..5) of each, restricted to 09:00-17:00.
#
# `datetime` in the `stocks` view was already shifted to Europe/Warsaw when the
# parquet data was produced, so it is used as-is here; applying
# from_utc_timestamp a second time would shift it again and move the
# trading-hours filter by the UTC offset.
#
# All lags share one named window, partitioned by ticker so that Spark does not
# have to treat the whole table as a single unpartitioned window.
toModelAll = spark.sql('''
  select
    ticker,
    datetime,
    date,
    close,
    lag(close, 1) over w as lag_close_1,
    lag(close, 2) over w as lag_close_2,
    lag(close, 3) over w as lag_close_3,
    lag(close, 4) over w as lag_close_4,
    lag(close, 5) over w as lag_close_5,
    volume,
    lag(volume, 1) over w as lag_volume_1,
    lag(volume, 2) over w as lag_volume_2,
    lag(volume, 3) over w as lag_volume_3,
    lag(volume, 4) over w as lag_volume_4,
    lag(volume, 5) over w as lag_volume_5
  from stocks
  where ticker = "PKN.WA"
    and date_format(datetime, 'HH:mm:ss') >= "09:00:00"
    and date_format(datetime, 'HH:mm:ss') < "17:00:00"
  window w as (partition by ticker order by datetime)
  order by datetime DESC
''')


In [None]:
# Preview the modelling table.
toModelAll.show()

# Features

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Columns that are packed into the single `features` vector column.
feature_columns = [
    'lag_close_1', 'lag_close_2', 'lag_close_3', 'lag_close_4', 'lag_close_5',
    'lag_volume_1', 'lag_volume_2', 'lag_volume_3', 'lag_volume_4', 'lag_volume_5',
]
va = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [None]:
# Print the column types of the modelling table.
toModelAll.printSchema()

In [None]:
# Discard rows with a null in any column (e.g. the first rows, which have no lag values).
toModelAll = toModelAll.na.drop()

In [None]:
# Append the assembled `features` vector column to the modelling table.
reg_df = va.transform(toModelAll)

In [None]:
# Preview the feature vector next to the `close` column for three rows.
reg_df.select('features', 'close').show(3)

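References:

- Spark ML classification and regression guide: https://spark.apache.org/docs/latest/ml-classification-regression.html
- `pyspark.ml.regression.LinearRegression` API reference: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression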



In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Print the types of the two columns used for the regression.
reg_df.select('features', 'close').printSchema()

In [None]:
# 70/30 train/test split. The fixed seed makes the split - and therefore the
# fitted coefficients and the reported R2 - reproducible across runs.
# NOTE(review): a random split of time-ordered quotes mixes past and future
# rows; confirm this is acceptable for the evaluation.
(train_df, test_df) = reg_df.randomSplit([0.7, 0.3], seed=42)


In [None]:
# Preview the training split.
train_df.show()

In [None]:
 
# Linear regression with `close` as the label and the assembled `features`
# vector as input, fitted on the training split.
lr = LinearRegression(labelCol='close', featuresCol='features')
lr_model = lr.fit(dataset=train_df)

In [None]:
# Report the fitted coefficient vector and the intercept.
print(f"Coefficients: {lr_model.coefficients!s}")
print(f"Intercept: {lr_model.intercept!s}")


$$ \hat{Y} = X_{1}{\beta}_{1} + \dots + X_{n}{\beta}_{n} + {\beta}_0 $$
$$ \hat{Y} = 0.6196 \cdot X_{1} + \dots + X_{n}{\beta}_{n} + 0.293 $$

In [None]:
# Summary object of the fitted model; its `r2` attribute is printed below.
trainingSummary = lr_model.summary

In [None]:
# R2 taken from the model's summary, printed with six decimals.
print("R2: {:f}".format(trainingSummary.r2))

In [None]:
# Apply the fitted model to the test split.
lr_predictions = lr_model.transform(test_df)

In [None]:
# Show up to 100 rows of the `prediction`, `close` and `features` columns.
lr_predictions.select("prediction","close","features").show(100)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# R2 of the predictions on the test split.
lr_evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="close",
    predictionCol="prediction",
)
test_r2 = lr_evaluator.evaluate(lr_predictions)

print("R2 on test data = %g" % test_r2)

In [None]:
# Persist the fitted model on Google Drive, replacing any previously saved copy.
model_writer = lr_model.write().overwrite()
model_writer.save("/content/drive/MyDrive/fo/models/predict/PKN.WA")