### Set Parameters

In [1]:
import os

# GCP settings used to build the Spark/GCS session in the next cell.
# Each value can be overridden through an environment variable so other users,
# projects or service accounts can run the notebook without editing this cell;
# the defaults are the values this notebook was originally run with.
project_id = os.environ.get("FOREX_AI_PROJECT_ID", "forex-ai-413616")
email = os.environ.get(
    "FOREX_AI_SA_EMAIL",
    "forex-ai-bucket-admin@forex-ai-413616.iam.gserviceaccount.com",
)
# Path to the service-account key file (kept out of the notebook itself).
credentials_key = os.environ.get("FOREX_AI_CREDENTIALS_KEY", "credentials-admin.p12")

### Startup

In [2]:
%load_ext jupyter_black
%load_ext autoreload
%autoreload 2

import os
import sys

sys.path.insert(0, os.path.abspath("../"))
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    FloatType,
    TimestampType,
    DoubleType,
    LongType,
)
from utils.pyspark_gcs import get_spark_gcs_session


# Spark session wired to Google Cloud Storage, authenticated with the
# service-account settings from the parameter cell.
spark = get_spark_gcs_session(
    service_account_keyfile_path=credentials_key,
    email=email,
    project=project_id,
)

24/03/16 00:48:35 WARN Utils: Your hostname, Kittawees-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.23 instead (on interface en0)
24/03/16 00:48:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/03/16 00:48:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Collect & Prep Data

In [3]:
# Read only the two identifying fields from the raw JSON.
# NOTE(review): the saved printSchema() output shows both fields as
# nullable = true, so the JSON reader does not enforce the False flag below.
custom_schema = StructType(
    [
        StructField(field_name, field_type, False)
        for field_name, field_type in (
            ("symbol", StringType()),
            ("datetime", TimestampType()),
        )
    ]
)

### Get Data

In [4]:
# Load the raw EUR/USD series with the explicit schema, keep the identifying
# columns, drop exact duplicate rows and order chronologically for display.
df = spark.read.json("gs://forex-ai/raws/time_series_eur_usd", schema=custom_schema)
df = df.select("symbol", "datetime").distinct().sort("datetime")
print(df.count())
# printSchema() and show() write to stdout themselves and return None;
# wrapping them in print() only added stray "None" lines to the output.
df.printSchema()
df.show()

                                                                                

1431603
root
 |-- symbol: string (nullable = true)
 |-- datetime: timestamp (nullable = true)

None


                                                                                

+-------+-------------------+
| symbol|           datetime|
+-------+-------------------+
|EUR/USD|2020-04-07 06:54:00|
|EUR/USD|2020-04-07 06:55:00|
|EUR/USD|2020-04-07 06:56:00|
|EUR/USD|2020-04-07 06:57:00|
|EUR/USD|2020-04-07 06:58:00|
|EUR/USD|2020-04-07 06:59:00|
|EUR/USD|2020-04-07 07:00:00|
|EUR/USD|2020-04-07 07:01:00|
|EUR/USD|2020-04-07 07:02:00|
|EUR/USD|2020-04-07 07:03:00|
|EUR/USD|2020-04-07 07:04:00|
|EUR/USD|2020-04-07 07:05:00|
|EUR/USD|2020-04-07 07:06:00|
|EUR/USD|2020-04-07 07:07:00|
|EUR/USD|2020-04-07 07:08:00|
|EUR/USD|2020-04-07 07:09:00|
|EUR/USD|2020-04-07 07:10:00|
|EUR/USD|2020-04-07 07:11:00|
|EUR/USD|2020-04-07 07:12:00|
|EUR/USD|2020-04-07 07:13:00|
+-------+-------------------+
only showing top 20 rows

None


In [5]:
# Per-symbol window in chronological order, used to look up the next timestamp.
w = Window.partitionBy("symbol").orderBy("datetime")
next_date = F.lead(col("datetime"), 1).over(w)

# date_diff: seconds between each row's datetime and the next row's datetime for
# the same symbol. The last row of each symbol has no successor, so its
# next_date/date_diff are null. No pre-sort is needed: the window orders rows.
df_date_diff = (
    # Column *names* keep this a no-op when the columns are absent (the schema only
    # has symbol/datetime) and avoid the TypeError PySpark < 3.4 raises when
    # drop() receives more than one Column object.
    df.drop("open", "close", "high", "low")
    .withColumn("next_date", next_date)
    .withColumn(
        "date_diff",
        F.abs(F.unix_timestamp(col("next_date")) - F.unix_timestamp(col("datetime"))),
    )
)

### Explore gaps between consecutive timestamps

In [6]:
# Newest rows first so the most recent gaps appear at the top of each table.
df_date_diff = df_date_diff.sort(col("datetime").desc())

# Gap thresholds in seconds. Consecutive rows in the sample output are 60 s apart,
# so anything above 60 s is a gap; 1000 s separates short gaps from long ones
# (presumably market closes such as the Friday -> Sunday jumps in the output).
SHORT_GAP_LOWER = 60  # exclusive lower bound for a short gap
LONG_GAP_LOWER = 1000  # inclusive lower bound for a long gap (exclusive upper for short)

short_gaps = df_date_diff.filter(
    (col("date_diff") > SHORT_GAP_LOWER) & (col("date_diff") < LONG_GAP_LOWER)
)
long_gaps = df_date_diff.filter(col("date_diff") >= LONG_GAP_LOWER)

# show() prints the table itself and returns None, so it is not wrapped in print().
print("------------------------------")
short_gaps.show()
print(short_gaps.count())
print("------------------------------")
long_gaps.show()
print(long_gaps.count())

------------------------------


                                                                                

+-------+-------------------+-------------------+---------+
| symbol|           datetime|          next_date|date_diff|
+-------+-------------------+-------------------+---------+
|EUR/USD|2024-02-14 22:01:00|2024-02-14 22:04:00|      180|
|EUR/USD|2024-02-13 23:42:00|2024-02-13 23:44:00|      120|
|EUR/USD|2024-02-13 04:21:00|2024-02-13 04:23:00|      120|
|EUR/USD|2024-02-12 23:06:00|2024-02-12 23:09:00|      180|
|EUR/USD|2024-02-12 22:57:00|2024-02-12 23:00:00|      180|
|EUR/USD|2024-02-12 22:03:00|2024-02-12 22:05:00|      120|
|EUR/USD|2024-02-11 22:27:00|2024-02-11 22:30:00|      180|
|EUR/USD|2024-02-11 22:18:00|2024-02-11 22:20:00|      120|
|EUR/USD|2024-02-11 22:07:00|2024-02-11 22:09:00|      120|
|EUR/USD|2024-02-11 22:01:00|2024-02-11 22:04:00|      180|
|EUR/USD|2024-02-09 05:30:00|2024-02-09 05:32:00|      120|
|EUR/USD|2024-02-09 04:16:00|2024-02-09 04:18:00|      120|
|EUR/USD|2024-02-08 23:08:00|2024-02-08 23:10:00|      120|
|EUR/USD|2024-02-08 22:19:00|2024-02-08 

                                                                                

5573
------------------------------


                                                                                

+-------+-------------------+-------------------+---------+
| symbol|           datetime|          next_date|date_diff|
+-------+-------------------+-------------------+---------+
|EUR/USD|2024-02-09 21:59:00|2024-02-11 22:00:00|   172860|
|EUR/USD|2024-02-04 20:17:00|2024-02-04 22:00:00|     6180|
|EUR/USD|2024-02-04 20:00:00|2024-02-04 20:17:00|     1020|
|EUR/USD|2024-02-02 19:59:00|2024-02-04 20:00:00|   172860|
|EUR/USD|2024-01-28 21:26:00|2024-01-28 21:55:00|     1740|
|EUR/USD|2024-01-28 20:33:00|2024-01-28 21:11:00|     2280|
|EUR/USD|2024-01-26 19:59:00|2024-01-28 20:33:00|   174840|
|EUR/USD|2024-01-19 19:59:00|2024-01-21 21:49:00|   179400|
|EUR/USD|2024-01-12 19:59:00|2024-01-14 22:00:00|   180060|
|EUR/USD|2024-01-05 19:59:00|2024-01-07 22:00:00|   180060|
|EUR/USD|2024-01-01 20:02:00|2024-01-01 20:28:00|     1560|
|EUR/USD|2024-01-01 17:37:00|2024-01-01 18:10:00|     1980|
|EUR/USD|2023-12-29 19:59:00|2024-01-01 17:31:00|   250320|
|EUR/USD|2023-12-25 18:48:00|2023-12-25 

                                                                                

404


In [7]:
# Sanity check: list consecutive rows less than 60 s apart; an empty table means none.
df_date_diff.where(col("date_diff") < 60).show()

                                                                                

+------+--------+---------+---------+
|symbol|datetime|next_date|date_diff|
+------+--------+---------+---------+
+------+--------+---------+---------+

