In [19]:
%load_ext jupyter_black

In [125]:
import sys
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit, when
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType, DateType, FloatType, StringType

spark = (
    SparkSession.builder.config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)
spark.conf.set("spark.sql.session.timeZone", "UTC")

### Import & Prep Data

In [89]:
df = (
    spark.read.option("recursiveFileLookup", "true")
    .option("header", "true")
    .json("resources/raws/time_series")
)

In [90]:
format = "yyyy-MM-dd HH:mm:ss"
df = (
    df.withColumn("symbol", col("symbol").cast(StringType()))
    .withColumn(
        "datetime", F.unix_timestamp(col("datetime"), format).cast(TimestampType())
    )
    .withColumn("open", col("open").cast(FloatType()))
    .withColumn("close", col("close").cast(FloatType()))
    .withColumn("high", col("high").cast(FloatType()))
    .withColumn("low", col("low").cast(FloatType()))
)

### Group By Symbol

In [129]:
symbol_list = df.groupBy(col("symbol")).count().collect()
df_dict = {}
for i in symbol_list:
    symbol = i["symbol"]
    symbol_name = symbol.replace("/", "_")
    print(symbol_name)
    df_dict[symbol_name] = df.filter(col("symbol") == symbol)
    # .sort(
    #     "datetime", ascending=False
    # )
    # print(f"{symbol_name} : {df_dict[symbol_name].count()}")
    # print(df_dict[symbol_name].show())

USD_JPY
XAU_USD
GBP_USD
EUR_USD


In [130]:
# print(df.groupBy(col("symbol")).count().show())
# print(df.count())
# print(df.printSchema())
# print(df.show())

### Group By Timeframe

In [131]:
timeframe = [
    # '1 minutes',
    "5 minutes",
    "15 minutes",
    "30 minutes",
    "1 hour",
    "4 hours",
    "1 day",
    # "1week",
    # "1month",
]

In [144]:
seconds = 60 * 5
(
    df_dict["USD_JPY"]
    # .withColumn("group_window", seconds_window)
    # .groupBy(F.window(col("datetime"), "4 hour"))
    .groupBy(F.window(col("datetime"), "30 minute"))
    .agg(
        col("window.start").alias("datetime"),
        # col("window.end").alias("datetime_end"),
        F.last("open").alias("open"),
        F.first("close").alias("close"),
        F.max("high").alias("high"),
        F.min("low").alias("low"),
    )
    .sort("datetime", ascending=False)
    .drop("window")
    .show()
)

+-------------------+-------+-------+-------+-------+
|           datetime|   open|  close|   high|    low|
+-------------------+-------+-------+-------+-------+
|2023-11-21 19:30:00|148.285|148.255|  148.3| 148.25|
|2023-11-21 19:00:00|148.305|148.285| 148.34| 148.24|
|2023-11-21 18:30:00| 148.28|148.295| 148.36| 148.22|
|2023-11-21 18:00:00| 148.25|148.275| 148.34| 148.05|
|2023-11-21 17:30:00|148.265|148.235| 148.31| 148.12|
|2023-11-21 17:00:00| 148.13| 148.27| 148.28|148.085|
|2023-11-21 16:30:00|148.095| 148.15| 148.18|148.085|
|2023-11-21 16:00:00| 148.26| 148.09| 148.28| 148.02|
|2023-11-21 15:30:00| 148.31| 148.25|148.315| 148.25|
|2023-11-21 15:00:00|148.395| 148.31|148.405|148.285|
|2023-11-21 14:30:00|148.375|  148.4| 148.41| 148.34|
|2023-11-21 14:00:00| 148.38| 148.37|148.425|148.335|
|2023-11-21 13:30:00|148.305|  148.4| 148.44| 148.28|
|2023-11-21 13:00:00| 148.32|148.305|148.385|  148.3|
|2023-11-21 12:30:00|148.345|148.325|148.455| 148.31|
|2023-11-21 12:00:00| 148.36

In [94]:
df_dict["USD_JPY"].show()

+-------+-------------------+-------+-------+-------+-------+
|  close|           datetime|   high|    low|   open| symbol|
+-------+-------------------+-------+-------+-------+-------+
|148.255|2023-11-22 02:38:00|148.275| 148.25|148.265|USD/JPY|
|148.275|2023-11-22 02:37:00| 148.28| 148.25| 148.26|USD/JPY|
|148.275|2023-11-22 02:36:00|148.285| 148.26| 148.28|USD/JPY|
|148.275|2023-11-22 02:35:00|148.275|148.275|148.275|USD/JPY|
|148.275|2023-11-22 02:34:00|148.285| 148.27|148.285|USD/JPY|
|148.285|2023-11-22 02:33:00|148.285| 148.26| 148.28|USD/JPY|
| 148.28|2023-11-22 02:32:00|148.295| 148.26| 148.29|USD/JPY|
|148.285|2023-11-22 02:31:00|148.295| 148.28|148.295|USD/JPY|
|148.295|2023-11-22 02:30:00|  148.3|148.285|148.285|USD/JPY|
|148.285|2023-11-22 02:29:00|148.305| 148.28|148.305|USD/JPY|
|148.305|2023-11-22 02:28:00|148.305|148.305|148.305|USD/JPY|
| 148.31|2023-11-22 02:27:00| 148.31| 148.31| 148.31|USD/JPY|
|148.305|2023-11-22 02:26:00|148.305|148.305|148.305|USD/JPY|
|148.305