In [18]:
from pyspark.sql import SparkSession

# Build (or reuse) the notebook-wide Spark session. The
# org.postgresql:postgresql 42.7.3 package is requested via spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("btc_price_prediction")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)




In [80]:
#spark.stop()

## 1. Data Ingestion (collecte des données)

In [89]:
import requests
import os
from dotenv import load_dotenv
load_dotenv() 
def data_collection(symbol='BTCUSDT', interval='1m', limit=600, timeout=10):
    """Download recent klines (candlesticks) from the Binance REST API.

    Args:
        symbol: Trading pair sent as the ``symbol`` query parameter.
        interval: Candle interval sent as the ``interval`` query parameter
            (``'1m'`` = one minute).
        limit: Number of candles requested (``limit`` query parameter).
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests.get`` may block indefinitely.

    Returns:
        A Spark DataFrame with one row per candle and the twelve kline
        columns listed below, or ``None`` when the API answers with a
        non-200 status or with an empty payload.

    Raises:
        requests.RequestException: On network failures or timeouts.
    """
    api = 'https://api.binance.com/api/v3/klines'
    response = requests.get(
        api,
        params={
            "symbol": symbol,
            "interval": interval,
            "limit": limit,
        },
        timeout=timeout,
    )
    if response.status_code != 200:
        print(f"Erreur {response.status_code}")
        return None

    api_data = response.json()
    if not api_data:
        # Spark cannot infer a schema from an empty list, so report the empty
        # payload the same way as an HTTP error instead of crashing.
        print(f"Aucune donnée reçue pour {symbol}")
        return None

    # Column names are applied positionally to each kline row.
    columns = [
        "open_time", "open_price", "high_price", "low_price", "close_price", "volume",
        "close_time", "quote_asset_volume", "number_of_trades",
        "taker_buy_base_volume", "taker_buy_quote_volume", "ignore"
    ]
    # `spark` is the notebook-level SparkSession created in an earlier cell.
    return spark.createDataFrame(api_data, columns)
  
data = data_collection()
# data_collection() can return None (e.g. on a non-200 HTTP status); stop here
# with an explicit error instead of an AttributeError on data.show().
if data is None:
    raise RuntimeError("data_collection() n'a retourné aucune donnée")
data.show()

+-------------+--------------+--------------+--------------+--------------+-----------+-------------+------------------+----------------+---------------------+----------------------+------+
|    open_time|    open_price|    high_price|     low_price|   close_price|     volume|   close_time|quote_asset_volume|number_of_trades|taker_buy_base_volume|taker_buy_quote_volume|ignore|
+-------------+--------------+--------------+--------------+--------------+-----------+-------------+------------------+----------------+---------------------+----------------------+------+
|1768888260000|91732.17000000|91805.04000000|91732.17000000|91805.03000000|15.56320000|1768888319999|  1428293.44297580|            3151|          12.76808000|      1171744.61757530|     0|
|1768888320000|91805.03000000|91805.04000000|91726.70000000|91726.70000000|11.12407000|1768888379999|  1020929.19367620|            2973|           1.92730000|       176885.33794860|     0|
|1768888380000|91726.71000000|91734.01000000|91724

In [90]:
# Print the column names and types of the freshly loaded DataFrame.
data.printSchema()

root
 |-- open_time: long (nullable = true)
 |-- open_price: string (nullable = true)
 |-- high_price: string (nullable = true)
 |-- low_price: string (nullable = true)
 |-- close_price: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- close_time: long (nullable = true)
 |-- quote_asset_volume: string (nullable = true)
 |-- number_of_trades: long (nullable = true)
 |-- taker_buy_base_volume: string (nullable = true)
 |-- taker_buy_quote_volume: string (nullable = true)
 |-- ignore: string (nullable = true)



In [91]:
# Report how many rows (candles) the dataset contains; the message has no
# placeholders, so a plain string literal is enough.
print("nombre des lignes dana la dataset :", data.count())

nombre des lignes dana la dataset : 600


In [92]:
from pyspark.sql.functions import col
num_cols = ["open_price", "high_price", "low_price", "close_price", "volume", "quote_asset_volume", "number_of_trades", "taker_buy_base_volume", "taker_buy_quote_volume"]

# Cast all numeric columns to double in a single projection. Calling
# withColumn() once per column in a loop stacks one projection per call and
# inflates the query plan; one select() yields the same schema and column order.
data = data.select(
    *[
        col(name).cast('double').alias(name) if name in num_cols else col(name)
        for name in data.columns
    ]
)

data.printSchema()

root
 |-- open_time: long (nullable = true)
 |-- open_price: double (nullable = true)
 |-- high_price: double (nullable = true)
 |-- low_price: double (nullable = true)
 |-- close_price: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- close_time: long (nullable = true)
 |-- quote_asset_volume: double (nullable = true)
 |-- number_of_trades: double (nullable = true)
 |-- taker_buy_base_volume: double (nullable = true)
 |-- taker_buy_quote_volume: double (nullable = true)
 |-- ignore: string (nullable = true)



In [93]:
# Print summary statistics (count, mean, ...) for each column.
data.describe().show()

[Stage 78:>                                                         (0 + 4) / 4]

+-------+-------------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+------------------+-----------------+---------------------+----------------------+------+
|summary|          open_time|        open_price|       high_price|         low_price|      close_price|            volume|         close_time|quote_asset_volume| number_of_trades|taker_buy_base_volume|taker_buy_quote_volume|ignore|
+-------+-------------------+------------------+-----------------+------------------+-----------------+------------------+-------------------+------------------+-----------------+---------------------+----------------------+------+
|  count|                600|               600|              600|               600|              600|               600|                600|               600|              600|                  600|                   600|   600|
|   mean|      1.76890623E12| 91059.35965000003|      91088.60115|      

                                                                                

1. Convertir les colonnes open_time (ms) et close_time (ms) en secondes (s).

In [94]:


# open_time / close_time are in milliseconds: derive second-based columns,
# then discard the millisecond originals.
data = (
    data
    .withColumn("open_time_s", col("open_time") / 1000)
    .withColumn("close_time_s", col("close_time") / 1000)
    .drop("open_time", "close_time")
)

2. Convertir les colonnes open_time_s et close_time_s en timestamp.

In [95]:
from pyspark.sql.functions import to_timestamp, col

# Turn the second-based columns into timestamp columns, then drop the numeric
# intermediates so only the *_ts columns remain.
data = (
    data
    .withColumn("open_time_ts", to_timestamp(col("open_time_s")))
    .withColumn("close_time_ts", to_timestamp(col("close_time_s")))
    .drop("open_time_s", "close_time_s")
)
data.printSchema()



root
 |-- open_price: double (nullable = true)
 |-- high_price: double (nullable = true)
 |-- low_price: double (nullable = true)
 |-- close_price: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- quote_asset_volume: double (nullable = true)
 |-- number_of_trades: double (nullable = true)
 |-- taker_buy_base_volume: double (nullable = true)
 |-- taker_buy_quote_volume: double (nullable = true)
 |-- ignore: string (nullable = true)
 |-- open_time_ts: timestamp (nullable = true)
 |-- close_time_ts: timestamp (nullable = true)



### Création de la cible : y = close_price(t+10)

In [96]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Window ordered chronologically by candle open time. No partition is
# defined, so Spark evaluates it over a single partition.
window = Window.orderBy(F.col("open_time_ts"))

# Target column: close_price taken 10 rows ahead (10 minutes at the 1m
# interval). Rows with no value 10 rows ahead get null from lead().
data_new = data.withColumn(
    "close_t_plus_10",
    F.lead(F.col("close_price"), 10).over(window),
)
data_new.select("close_price", "close_t_plus_10", "open_time_ts").show()

26/01/20 16:50:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/20 16:50:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/20 16:50:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/20 16:50:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
26/01/20 16:50:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+-----------+---------------+-------------------+
|close_price|close_t_plus_10|       open_time_ts|
+-----------+---------------+-------------------+
|   91805.03|       91580.01|2026-01-20 06:51:00|
|    91726.7|       91592.84|2026-01-20 06:52:00|
|   91725.23|       91604.27|2026-01-20 06:53:00|
|   91699.26|       91553.74|2026-01-20 06:54:00|
|   91689.07|       91596.19|2026-01-20 06:55:00|
|   91681.32|       91583.95|2026-01-20 06:56:00|
|   91685.86|       91591.25|2026-01-20 06:57:00|
|    91650.0|       91594.05|2026-01-20 06:58:00|
|   91624.18|       91599.99|2026-01-20 06:59:00|
|   91622.11|       91578.06|2026-01-20 07:00:00|
|   91580.01|       91562.01|2026-01-20 07:01:00|
|   91592.84|        91528.0|2026-01-20 07:02:00|
|   91604.27|       91500.01|2026-01-20 07:03:00|
|   91553.74|        91457.4|2026-01-20 07:04:00|
|   91596.19|       91391.62|2026-01-20 07:05:00|
|   91583.95|       91233.76|2026-01-20 07:06:00|
|   91591.25|       90979.01|2026-01-20 07:07:00|
