### test 2

#### Lưu ý
- trong spark streaming dùng readStream để xử lý dữ liệu
- và dùng writeStreaming để ghi dữ liệu ra màn hình hoặc lưu dữ liệu


In [13]:
import numpy as np
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-streaming')
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5")
         .getOrCreate())
sc = spark.sparkContext

# Khởi tạo Kafka consumer
kafka_bootstrap_servers = 'kafka1:19091,kafka2:19092,kafka3:19093' # Địa chỉ Kafka của các container Kafka
kafka_topic = 'USDCHF_i30'  # Tên topic Kafka

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("subscribe", kafka_topic) \
  .load()

In [14]:
df.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, arrays_zip, lag, when, avg, pow, sqrt, mean, stddev
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType
from pyspark.sql.window import Window
import pandas as pd
# Tạo SparkSession
spark = SparkSession.builder.appName("jsonStreaming").getOrCreate()

# Định nghĩa schema cho dữ liệu JSON
jsonSchema = StructType([
    StructField("date", ArrayType(StringType())),
    StructField("open", ArrayType(DoubleType())),
    StructField("high", ArrayType(DoubleType())),
    StructField("low", ArrayType(DoubleType())),
    StructField("close", ArrayType(DoubleType())),
    StructField("volume", ArrayType(IntegerType()))
])

# Đọc dữ liệu từ nguồn (ví dụ: Kafka, file, ...)

# Hàm tính EMA
def calculate_ema(column, period):
    alpha = 1 - (2 / (period + 1))  # Smoothing factor for EMA
    window = Window.orderBy('Date').rowsBetween(Window.unboundedPreceding, 0)
    ema = avg(column).over(window)
    return ema

# Hàm tính RSI
def calculate_rsi(df, column, period):
    window = Window.orderBy('Date')
    daily_change = col(column) - lag(column, 1).over(window)
    gain = when(daily_change > 0, daily_change).otherwise(0)
    loss = when(daily_change < 0, -daily_change).otherwise(0)
    avg_gain = avg(gain).over(Window.orderBy('Date').rowsBetween(-period, -1))
    avg_loss = avg(loss).over(Window.orderBy('Date').rowsBetween(-period, -1))
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

# Hàm tính Turbulence
def calculate_turbulence(df, window_period):
    windowSpec = Window.orderBy('Date').rowsBetween(-window_period, -1)
    mean_close = mean('Close').over(windowSpec)
    stddev_close = stddev('Close').over(windowSpec)
    turbulence = (col('Close') - mean_close) / stddev_close
    return df.withColumn('Turbulence', turbulence)
import requests

# Hàm xử lý từng batch
import requests
import json
def write_to_pandas(df, epoch_id):
    # Check if the DataFrame is empty
    if df.rdd.isEmpty():
        print(f"Batch {epoch_id} is empty, skipping.")
        return

    # Existing processing code
    df = df.withColumn('EMA', calculate_ema('Close', 12))
    df = df.withColumn('RSI', calculate_rsi(df, 'Close', 14))
    df = calculate_turbulence(df, 20)

    # Convert to Pandas DataFrame
    pandas_df = df.toPandas()

    # Handle NaN values for JSON conversion
    pandas_df = pandas_df.where(pd.notnull(pandas_df), None)

    # Convert the DataFrame to a JSON string
    json_payload = json.dumps(pandas_df.to_json(orient='records'))
#     print(json_payload)
    # Define the URL of your FastAPI endpoint
    url = 'http://host.docker.internal:8000/api/sparkreceive/predictions_values'

    # Make a POST request to the FastAPI endpoint
    try:
        headers = {'Content-Type': 'application/json'}
        response = requests.post(url, data=json_payload, headers=headers)
        response.raise_for_status()  # Raise an exception for HTTP errors
        print(f"Batch {epoch_id} successfully sent. Response: {response.text}")
    except requests.RequestException as e:
        print(f"Error sending batch {epoch_id}: {e}")

# Rest of your Spark streaming code

    
# Chuyển đổi và phân tích cú pháp JSON
df_parsed = df.selectExpr("CAST(value AS STRING)") \
              .select(from_json(col("value"), jsonSchema).alias("data")) \
              .select("data.*")

# Kết hợp các mảng thành dòng riêng biệt
df_exploded = df_parsed.select(
    explode(arrays_zip("date", "open", "high", "low", "close", "volume")).alias("data")
).select(
    col("data.date").alias("Date"),
    col("data.open").alias("Open"),
    col("data.high").alias("High"),
    col("data.low").alias("Low"),
    col("data.close").alias("Close"),
    col("data.volume").alias("Volume")
)

# Áp dụng hàm xử lý cho từng batch
query = df_exploded.writeStream \
                   .outputMode("append") \
                   .foreachBatch(write_to_pandas) \
                   .start()

query.awaitTermination()


Batch 0 is empty, skipping.
"[{\"Date\":\"202312230030\",\"Open\":0.85624,\"High\":0.8564,\"Low\":0.85575,\"Close\":0.85594,\"Volume\":4511,\"EMA\":0.85594,\"RSI\":null,\"Turbulence\":null},{\"Date\":\"202312230100\",\"Open\":0.85595,\"High\":0.85622,\"Low\":0.8556,\"Close\":0.85566,\"Volume\":4356,\"EMA\":0.8558,\"RSI\":null,\"Turbulence\":null},{\"Date\":\"202312230130\",\"Open\":0.85565,\"High\":0.85627,\"Low\":0.85555,\"Close\":0.85611,\"Volume\":2997,\"EMA\":0.8559033333,\"RSI\":0.0,\"Turbulence\":1.5657364441},{\"Date\":\"202312230200\",\"Open\":0.8561,\"High\":0.85712,\"Low\":0.85582,\"Close\":0.85709,\"Volume\":3680,\"EMA\":0.8562,\"RSI\":61.6438356164,\"Turbulence\":5.2223221866},{\"Date\":\"202312230230\",\"Open\":0.85708,\"High\":0.85733,\"Low\":0.85601,\"Close\":0.85602,\"Volume\":3229,\"EMA\":0.856164,\"RSI\":83.6257309941,\"Turbulence\":-0.2895452765},{\"Date\":\"202312230300\",\"Open\":0.85603,\"High\":0.85605,\"Low\":0.85567,\"Close\":0.85585,\"Volume\":2292,\"EMA\":0.8

In [12]:
spark.stop()

In [39]:
import requests

# Define the URL of your FastAPI endpoint
url = 'http://host.docker.internal:8000/api/predictions_values'

# Sample JSON payload as a string
sample_payload = json.dumps([
    {
        "Date": "2023-01-01",
        "Open": 100.5,
        "High": 110.2,
        "Low": 99.8,
        "Close": 105.3,
        "Volume": 5000,
        "EMA": 104.7,
        "RSI": 60.5,
        "Turbulence": 0.3
    },
    {
        "Date": "2023-01-02",
        "Open": 105.4,
        "High": 115.2,
        "Low": 104.8,
        "Close": 110.3,
        "Volume": 6000,
        "EMA": 109.7,
        "RSI": 65.5,
        "Turbulence": 0.4
    }
])


# Make a POST request to the FastAPI endpoint
try:
    headers = {'Content-Type': 'text/plain'}
    response = requests.post(url, data=sample_payload, headers=headers)
    response.raise_for_status()  # Raise an exception for HTTP errors
    print(f"Request successfully sent. Response: {response.text}")
except requests.RequestException as e:
    print(f"Error sending request: {e}")


Error sending request: HTTPConnectionPool(host='host.docker.internal', port=8000): Max retries exceeded with url: /api/predictions_values (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7feb7a190d90>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))


In [38]:
import requests

# Define the URL for the FastAPI endpoint
url = "http://host.docker.internal:8000/api/models/"

# Set the parameters for the request
params = {
    "skip": 0,
    "limit": 10
}

# Attempt to send a GET request to the FastAPI endpoint
try:
    response = requests.get(url, params=params)
    if response.status_code == 200:
        # If the request is successful, print the response content
        result = response.json()
    else:
        # If the request fails, print the status code
        result = f"Request failed with status code: {response.status_code}"
except requests.RequestException as e:
    # If there is an error in sending the request, print the error
    result = str(e)

result



"HTTPConnectionPool(host='host.docker.internal', port=8000): Max retries exceeded with url: /api/models/?skip=0&limit=10 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7feb93473ad0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))"

In [28]:
spark.stop()

In [29]:
last_df

[]