In [None]:
pip install google-cloud-bigquery

In [32]:
from pyspark.sql.functions import col, min, max, lag, lead, mean, stddev
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

In [3]:
from pyspark.sql import SparkSession

service_account_path = "creds.json"

project_id = "bigdata-421623"

spark = SparkSession.builder \
    .appName("BigQuery Integration") \
    .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.21.0") \
    .config("credentialsFile", service_account_path) \
    .config("parentProject", project_id) \
    .getOrCreate()



:: loading settings :: url = jar:file:/opt/conda/envs/bigdata/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
com.google.cloud.spark#spark-bigquery-with-dependencies_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c062adbf-a30e-4105-85f5-5a9d6f80f4c1;1.0
	confs: [default]
	found com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.21.0 in central
:: resolution report :: resolve 306ms :: artifacts dl 11ms
	:: modules in use:
	com.google.cloud.spark#spark-bigquery-with-dependencies_2.12;0.21.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	----------------------------------------------------------------

In [4]:
table = "bigdata-421623.ForEx_Big_Data.Hourly_Forex"

df = spark.read \
    .format("bigquery") \
    .option("table", table) \
    .load()

In [5]:
df.printSchema()

root
 |-- closing_price: double (nullable = true)
 |-- highest_price: double (nullable = true)
 |-- lowest_price: double (nullable = true)
 |-- transactions: long (nullable = true)
 |-- opening_price: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- avg_volume_weight: double (nullable = true)
 |-- ticker: string (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [6]:
tickers = df.select("ticker").distinct().rdd.flatMap(lambda x: x).collect()

                                                                                

In [7]:
print(tickers)

['C:JPYUSD', 'C:USDEUR', 'C:EURUSD', 'C:GBPEUR', 'C:USDJPY', 'C:GBPUSD', 'C:EURGBP', 'C:JPYEUR', 'C:EURJPY', 'C:JPYGBP', 'C:GBPJPY', 'C:USDGBP']


In [8]:
df.show(5)

                                                                                

+-------------+-------------+------------+------------+-------------+-------------------+------+-----------------+--------+--------------------+
|closing_price|highest_price|lowest_price|transactions|opening_price|               time|volume|avg_volume_weight|  ticker|          created_at|
+-------------+-------------+------------+------------+-------------+-------------------+------+-----------------+--------+--------------------+
|      1.05685|      1.05772|      1.0564|        6162|      1.05765|2023-02-28 19:00:00|  6162|           1.0568|C:EURUSD|2024-04-28 12:04:...|
|      1.05841|       1.0586|      1.0566|        7986|      1.05684|2023-02-28 20:00:00|  7986|           1.0576|C:EURUSD|2024-04-28 12:04:...|
|      1.05831|      1.05866|       1.058|        4862|      1.05848|2023-02-28 21:00:00|  4862|           1.0584|C:EURUSD|2024-04-28 12:04:...|
|      1.05929|       1.0596|      1.0581|        5067|       1.0583|2023-02-28 22:00:00|  5067|            1.059|C:EURUSD|2024-04

In [9]:
timestamp_extremes = df.select(
    min("time").alias("Earliest Timestamp"),
    max("time").alias("Latest Timestamp")
)

In [10]:
timestamp_extremes.show()

[Stage 4:>                                                          (0 + 1) / 1]

+-------------------+-------------------+
| Earliest Timestamp|   Latest Timestamp|
+-------------------+-------------------+
|2023-02-28 19:00:00|2024-04-26 17:00:00|
+-------------------+-------------------+



                                                                                

# Working on the Top 3 Tickers for Y2024

In [11]:
USDEUR_df = df.filter(col("ticker") == 'C:USDEUR')
USDJPY_df = df.filter(col("ticker") == 'C:USDJPY')
USDGBP_df = df.filter(col("ticker") == 'C:USDGBP')

In [12]:
features = ["closing_price", "highest_price", "lowest_price", "opening_price", "volume", "avg_volume_weight"]

In [13]:
assembler = VectorAssembler(inputCols=features, outputCol="features_vector")
scaler = StandardScaler(inputCol="features_vector", outputCol="scaledFeatures", withMean=True, withStd=True)

In [14]:
pipeline = Pipeline(stages=[assembler, scaler])

## Ticker USDGBP

In [15]:
USDGBP_df.show(10)

+-------------+-------------+------------+------------+-------------+-------------------+------+-----------------+--------+--------------------+
|closing_price|highest_price|lowest_price|transactions|opening_price|               time|volume|avg_volume_weight|  ticker|          created_at|
+-------------+-------------+------------+------------+-------------+-------------------+------+-----------------+--------+--------------------+
|      0.83218|      0.83231|   0.8309098|        6601|        0.831|2023-02-28 19:00:00|  6601|           0.8317|C:USDGBP|2024-04-28 12:12:...|
|       0.8309|      0.83222|      0.8306|        9253|      0.83215|2023-02-28 20:00:00|  9253|           0.8314|C:USDGBP|2024-04-28 12:12:...|
|      0.83151|       0.8322|     0.83076|        3348|      0.83096|2023-02-28 21:00:00|  3348|           0.8315|C:USDGBP|2024-04-28 12:12:...|
|      0.83057|      0.83166|      0.8303|        3407|      0.83152|2023-02-28 22:00:00|  3407|           0.8309|C:USDGBP|2024-04

In [16]:
USDGBP_df = pipeline.fit(USDGBP_df).transform(USDGBP_df)

                                                                                

In [17]:
USDGBP_df.printSchema()

root
 |-- closing_price: double (nullable = true)
 |-- highest_price: double (nullable = true)
 |-- lowest_price: double (nullable = true)
 |-- transactions: long (nullable = true)
 |-- opening_price: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- avg_volume_weight: double (nullable = true)
 |-- ticker: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- features_vector: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [None]:
USDGBP_df.select("scaledFeatures").show(1)

In [None]:
scaledFeatures = USDGBP_df.select("scaledFeatures").collect()
for row in scaledFeatures:
    print(row.scaledFeatures)

In [20]:
windowSpec = Window.orderBy("time")

In [21]:
n_steps_in = 24 
for i in range(1, n_steps_in + 1):
    USDGBP_df = USDGBP_df.withColumn(f"scaledFeatures_t-{i}", lag(col("scaledFeatures"), i).over(windowSpec))

In [22]:
USDGBP_df.printSchema()

root
 |-- closing_price: double (nullable = true)
 |-- highest_price: double (nullable = true)
 |-- lowest_price: double (nullable = true)
 |-- transactions: long (nullable = true)
 |-- opening_price: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- avg_volume_weight: double (nullable = true)
 |-- ticker: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- features_vector: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- scaledFeatures_t-1: vector (nullable = true)
 |-- scaledFeatures_t-2: vector (nullable = true)
 |-- scaledFeatures_t-3: vector (nullable = true)
 |-- scaledFeatures_t-4: vector (nullable = true)
 |-- scaledFeatures_t-5: vector (nullable = true)
 |-- scaledFeatures_t-6: vector (nullable = true)
 |-- scaledFeatures_t-7: vector (nullable = true)
 |-- scaledFeatures_t-8: vector (nullable = true)
 |-- scaledFeatures_t-9: vector (nullable = true)
 |-- scaledFeatures_t-10

In [25]:
n_steps_out = 3

for i in range(1, n_steps_out + 1):
    USDGBP_df = USDGBP_df.withColumn(f"price_t+{i}", lead(col("closing_price"), i).over(windowSpec))

In [26]:
USDGBP_df.printSchema()

root
 |-- closing_price: double (nullable = true)
 |-- highest_price: double (nullable = true)
 |-- lowest_price: double (nullable = true)
 |-- transactions: long (nullable = true)
 |-- opening_price: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- avg_volume_weight: double (nullable = true)
 |-- ticker: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- features_vector: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- scaledFeatures_t-1: vector (nullable = true)
 |-- scaledFeatures_t-2: vector (nullable = true)
 |-- scaledFeatures_t-3: vector (nullable = true)
 |-- scaledFeatures_t-4: vector (nullable = true)
 |-- scaledFeatures_t-5: vector (nullable = true)
 |-- scaledFeatures_t-6: vector (nullable = true)
 |-- scaledFeatures_t-7: vector (nullable = true)
 |-- scaledFeatures_t-8: vector (nullable = true)
 |-- scaledFeatures_t-9: vector (nullable = true)
 |-- scaledFeatures_t-10

In [27]:
USDGBP_df.count()

8315

In [28]:
for i in range(1, n_steps_out + 1):
    USDGBP_df = USDGBP_df.filter(col(f"price_t+{i}").isNotNull())

In [None]:
USDGBP_df.count()

In [30]:
for i in range(1, n_steps_in + 1):
    USDGBP_df = USDGBP_df.filter(col(f"scaledFeatures_t-{i}").isNotNull())

In [None]:
USDGBP_df.count()

In [None]:
stats_price_t1 = USDGBP_df.select(mean("price_t+1").alias("mean"), stddev("price_t+1").alias("stddev")).collect()[0]
stats_price_t2 = USDGBP_df.select(mean("price_t+2").alias("mean"), stddev("price_t+2").alias("stddev")).collect()[0]
stats_price_t3 = USDGBP_df.select(mean("price_t+3").alias("mean"), stddev("price_t+3").alias("stddev")).collect()[0]

In [37]:
print(f"The mean and standard deviation for price t+1 are : {stats_price_t1}")
print(f"The mean and standard deviation for price t+2 are : {stats_price_t2}")
print(f"The mean and standard deviation for price t+3 are : {stats_price_t3}")

The mean and standard deviation for price t+1 are : Row(mean=0.7984131664330393, stddev=0.014891596916482598)
The mean and standard deviation for price t+2 are : Row(mean=0.798409260313583, stddev=0.014886865780914332)
The mean and standard deviation for price t+3 are : Row(mean=0.7984054252050001, stddev=0.01488217843407354)


In [38]:
USDGBP_df = USDGBP_df.withColumn("price_t+1_scaled", (col("price_t+1") - stats_price_t1.mean) / stats_price_t1.stddev)
USDGBP_df = USDGBP_df.withColumn("price_t+2_scaled", (col("price_t+2") - stats_price_t2.mean) / stats_price_t2.stddev)
USDGBP_df = USDGBP_df.withColumn("price_t+3_scaled", (col("price_t+3") - stats_price_t3.mean) / stats_price_t3.stddev)

In [39]:
USDGBP_df.printSchema()

root
 |-- closing_price: double (nullable = true)
 |-- highest_price: double (nullable = true)
 |-- lowest_price: double (nullable = true)
 |-- transactions: long (nullable = true)
 |-- opening_price: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- volume: long (nullable = true)
 |-- avg_volume_weight: double (nullable = true)
 |-- ticker: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- features_vector: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- scaledFeatures_t-1: vector (nullable = true)
 |-- scaledFeatures_t-2: vector (nullable = true)
 |-- scaledFeatures_t-3: vector (nullable = true)
 |-- scaledFeatures_t-4: vector (nullable = true)
 |-- scaledFeatures_t-5: vector (nullable = true)
 |-- scaledFeatures_t-6: vector (nullable = true)
 |-- scaledFeatures_t-7: vector (nullable = true)
 |-- scaledFeatures_t-8: vector (nullable = true)
 |-- scaledFeatures_t-9: vector (nullable = true)
 |-- scaledFeatures_t-10

In [None]:
pandas_df = USDGBP_df.toPandas()

In [41]:
pandas_df.to_csv('usdgbp.csv', index=False)

## Ticker USDJPY

In [42]:
USDJPY_df = pipeline.fit(USDJPY_df).transform(USDJPY_df)

                                                                                

In [43]:
windowSpec = Window.orderBy("time")

In [44]:
n_steps_in = 24
for i in range(1, n_steps_in + 1):
    USDJPY_df = USDJPY_df.withColumn(f"scaledFeatures_t-{i}", lag(col("scaledFeatures"), i).over(windowSpec))

In [45]:
n_steps_out = 3
for i in range(1, n_steps_out + 1):
    USDJPY_df = USDJPY_df.withColumn(f"price_t+{i}", lead(col("closing_price"), i).over(windowSpec))

In [46]:
for i in range(1, n_steps_out + 1):
    USDJPY_df = USDJPY_df.filter(col(f"price_t+{i}").isNotNull())

for i in range(1, n_steps_in + 1):
    USDJPY_df = USDJPY_df.filter(col(f"scaledFeatures_t-{i}").isNotNull())

In [47]:
stats_price_t1 = USDJPY_df.select(mean("price_t+1").alias("mean"), stddev("price_t+1").alias("stddev")).collect()[0]
stats_price_t2 = USDJPY_df.select(mean("price_t+2").alias("mean"), stddev("price_t+2").alias("stddev")).collect()[0]
stats_price_t3 = USDJPY_df.select(mean("price_t+3").alias("mean"), stddev("price_t+3").alias("stddev")).collect()[0]

print(f"The mean and standard deviation for price t+1 are : {stats_price_t1}")
print(f"The mean and standard deviation for price t+2 are : {stats_price_t2}")
print(f"The mean and standard deviation for price t+3 are : {stats_price_t3}")

USDJPY_df = USDJPY_df.withColumn("price_t+1_scaled", (col("price_t+1") - stats_price_t1.mean) / stats_price_t1.stddev)
USDJPY_df = USDJPY_df.withColumn("price_t+2_scaled", (col("price_t+2") - stats_price_t2.mean) / stats_price_t2.stddev)
USDJPY_df = USDJPY_df.withColumn("price_t+3_scaled", (col("price_t+3") - stats_price_t3.mean) / stats_price_t3.stddev)

24/05/03 18:42:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:42:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:42:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:42:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:42:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:42:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 1

The mean and standard deviation for price t+1 are : Row(mean=144.2553180523532, stddev=6.2683829963715905)
The mean and standard deviation for price t+2 are : Row(mean=144.25830109860306, stddev=6.269842186716653)
The mean and standard deviation for price t+3 are : Row(mean=144.26124101451245, stddev=6.271226515656858)


In [None]:
pandas_df = USDJPY_df.toPandas()

In [49]:
pandas_df.to_csv('usdjpy.csv', index=False)

## Ticker USDEUR

In [50]:
USDEUR_df = pipeline.fit(USDEUR_df).transform(USDEUR_df)

In [51]:
windowSpec = Window.orderBy("time")

In [52]:
n_steps_in = 24
for i in range(1, n_steps_in + 1):
    USDEUR_df = USDEUR_df.withColumn(f"scaledFeatures_t-{i}", lag(col("scaledFeatures"), i).over(windowSpec))

In [53]:
n_steps_out = 3
for i in range(1, n_steps_out + 1):
    USDEUR_df = USDEUR_df.withColumn(f"price_t+{i}", lead(col("closing_price"), i).over(windowSpec))

In [54]:
for i in range(1, n_steps_out + 1):
    USDEUR_df = USDEUR_df.filter(col(f"price_t+{i}").isNotNull())

for i in range(1, n_steps_in + 1):
    USDEUR_df = USDEUR_df.filter(col(f"scaledFeatures_t-{i}").isNotNull())

In [55]:
stats_price_t1 = USDEUR_df.select(mean("price_t+1").alias("mean"), stddev("price_t+1").alias("stddev")).collect()[0]
stats_price_t2 = USDEUR_df.select(mean("price_t+2").alias("mean"), stddev("price_t+2").alias("stddev")).collect()[0]
stats_price_t3 = USDEUR_df.select(mean("price_t+3").alias("mean"), stddev("price_t+3").alias("stddev")).collect()[0]

print(f"The mean and standard deviation for price t+1 are : {stats_price_t1}")
print(f"The mean and standard deviation for price t+2 are : {stats_price_t2}")
print(f"The mean and standard deviation for price t+3 are : {stats_price_t3}")

USDEUR_df = USDEUR_df.withColumn("price_t+1_scaled", (col("price_t+1") - stats_price_t1.mean) / stats_price_t1.stddev)
USDEUR_df = USDEUR_df.withColumn("price_t+2_scaled", (col("price_t+2") - stats_price_t2.mean) / stats_price_t2.stddev)
USDEUR_df = USDEUR_df.withColumn("price_t+3_scaled", (col("price_t+3") - stats_price_t3.mean) / stats_price_t3.stddev)

24/05/03 18:44:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:44:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:44:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:44:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:44:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 18:44:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/03 1

The mean and standard deviation for price t+1 are : Row(mean=0.9234584314720805, stddev=0.012939727591783587)
The mean and standard deviation for price t+2 are : Row(mean=0.9234579169982158, stddev=0.012939192076341945)
The mean and standard deviation for price t+3 are : Row(mean=0.9234573545067903, stddev=0.012938635879544101)


In [None]:
pandas_df = USDEUR_df.toPandas()

In [57]:
pandas_df.to_csv('usdeur.csv', index=False)