In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DoubleType, IntegerType, LongType
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
from pyspark.sql import SparkSession


keyfile_path = "btcanalysishust-b10a2ef12088.json"

spark = SparkSession.builder \
    .appName("KafkaConsumer") \
  .config("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.gs.auth.type", "OAuth2") \
    .config("spark.hadoop.fs.gs.project.id", "btcanalysishust") \
    .config("spark.hadoop.fs.gs.input.close.input.streams.after.task.complete", "true") \
    .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", keyfile_path) \
    .getOrCreate()



In [2]:
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("prices", StructType([
        StructField("bitcoin", FloatType()),
        StructField("ethereum", FloatType()),
        StructField("tether", FloatType()),
        StructField("usd-coin", FloatType()),
        StructField("ripple", FloatType()),
        StructField("cardano", FloatType()),
        StructField("dogecoin", FloatType()),
        StructField("matic-network", FloatType()),
        StructField("solana", FloatType()),
        StructField("litecoin", FloatType()),
        StructField("polkadot", FloatType()),
        StructField("shiba-inu", FloatType()),
        StructField("tron", FloatType()),
        StructField("cosmos", FloatType()),
        StructField("chainlink", FloatType()),
        StructField("stellar", FloatType()),
        StructField("near", FloatType()),
    ]))
])

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "35.206.252.44:9092") \
    .option("subscribe", "crypto-prices") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 1000) \
    .load()
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(F.from_json(F.col("value"), schema).alias("data"))

crypto_parsed_df = parsed_df.select(
    F.to_timestamp(F.col("data.timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX").alias("DATE"),
    F.col("data.prices.bitcoin").alias("BTC"),
    F.col("data.prices.ethereum").alias("ETH"),
    F.col("data.prices.tether").alias("USDT"),
    F.col("data.prices.usd-coin").alias("USDC"),
    F.col("data.prices.ripple").alias("XRP"),
    F.col("data.prices.cardano").alias("ADA"),
    F.col("data.prices.dogecoin").alias("DOGE"),
    F.col("data.prices.matic-network").alias("MATIC"),
    F.col("data.prices.solana").alias("SOL")
)


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [None]:
# In ra kết quả để kiểm tra
crypto_parsed_df.show(truncate=False)  # Dùng .show() để in ra một vài dòng đầu tiên của DataFram

In [3]:
from google.cloud import storage
from datetime import datetime
from pyspark.sql.functions import to_date, col
from dateutil.relativedelta import relativedelta
def get_last_saved_date(crypto_id, storage_path : str):
    """
    Kiểm tra ngày cuối cùng đã được lưu trữ trong GCS hoặc HDFS.
    """
    storage_client = storage.Client.from_service_account_json("btcanalysishust-b10a2ef12088.json")
    blobs = storage_client.list_blobs(storage_path, prefix=f"ver2/{crypto_id}/")
    last_saved_date = None
    for blob in blobs:
        # Trích xuất ngày từ tên thư mục
        path_parts = blob.name.split('/')

        if len(path_parts) > 2: 
            year, month = path_parts[2], path_parts[3]
            date = datetime.strptime(f"{year}-{month.zfill(2)}","%Y-%m")
            if last_saved_date is None or date > last_saved_date:
                last_saved_date = date

    return last_saved_date.strftime("%Y-%m")

def get_gcs_price(crypto_id : str, start_date : str , end_date : str):
    """
    Lấy dữ liệu giá từ GCS trong khoảng thời gian chỉ định và hợp nhất thành một bảng Spark.

    Args:
        crypto_id (str): Tên tài sản (crypto, cổ phiếu, v.v.).
        start_date (str): Ngày bắt đầu (định dạng 'YYYY-MM').
        end_date (str): Ngày kết thúc (định dạng 'YYYY-MM').

    Returns:
        DataFrame: Bảng Spark chứa tất cả dữ liệu giá hợp nhất.
    """
    # Chuyển đổi chuỗi ngày thành đối tượng datetime
    start = datetime.strptime(start_date, "%Y-%m")
    end = datetime.strptime(end_date, "%Y-%m")
    
    # Kiểm tra ngày bắt đầu phải nhỏ hơn hoặc bằng ngày kết thúc
    if start > end:
        raise ValueError("start_date phải nhỏ hơn hoặc bằng end_date")
    
    # Khởi tạo danh sách kết quả
    all_prices = []
    
    # Lặp qua từng tháng
    current = start.replace(day=1)  # Đặt ngày thành ngày đầu tiên của tháng
    while current <= end:
        curr_price_dir=f"gs://crypto-historical-data-2/ver2/{crypto_id}/{current.year}/{current.month:02}/data.parquet"
        all_prices.append(spark.read.parquet(curr_price_dir))
        if current.month == 12:  # Nếu là tháng 12, chuyển sang tháng 1 năm sau
            current = current.replace(year=current.year + 1, month=1)
        else:
            current = current.replace(month=current.month + 1)
    if all_prices:
        merged_data = all_prices[0]
        for df in all_prices[1:]:
            merged_data = merged_data.union(df)
        merged_data = merged_data.withColumn('DATE', to_date('DATE', 'yyyy-MM-dd'))
        # Sort by the transformed date column
        sorted_data = merged_data.orderBy('DATE', ascending= True)

        # Sort the DataFrame by the date column
        return sorted_data.select(col('DATE'),col('HIGH'),col('LOW'),col('CLOSE'))
    else:
        raise ValueError("Không tìm thấy dữ liệu trong khoảng thời gian được chỉ định.")

In [4]:
data=get_gcs_price('BTC','2024-09','2024-10')
data.show()

+----------+--------+--------+--------+
|      DATE|    HIGH|     LOW|   CLOSE|
+----------+--------+--------+--------+
|2024-09-01|59070.55| 57200.0| 57299.0|
|2024-09-02| 59423.0|57119.01|59139.83|
|2024-09-03| 59825.7|57394.49|57468.84|
|2024-09-04|58531.25| 55555.0| 57971.0|
|2024-09-05|58326.12|55628.04|56156.82|
|2024-09-06| 56995.0| 52530.0|53950.01|
|2024-09-07| 54847.0| 53733.1|54156.33|
|2024-09-08|55315.95|53623.95|54881.11|
|2024-09-09|58119.97|54565.56| 57053.9|
|2024-09-10|58050.35|56377.76|57645.59|
|2024-09-11|58014.35|55534.41|57352.79|
|2024-09-12| 58600.0|57311.15|58137.54|
|2024-09-13| 60670.0|57630.01|60543.35|
|2024-09-14| 60660.0| 59436.8|60012.35|
|2024-09-15|60402.34|58695.75|59122.33|
|2024-09-16|59214.15| 57477.0|58208.75|
|2024-09-17|61373.41|57620.27| 60312.6|
|2024-09-18| 61800.0| 59174.5|61769.18|
|2024-09-19|63891.82|61569.16|62960.14|
|2024-09-20|64140.67| 62340.0|63210.69|
+----------+--------+--------+--------+
only showing top 20 rows



In [None]:

from pyspark.sql.functions import col, max as spark_max, min as spark_min, avg
from pyspark.sql.window import Window
class STOCK:
    def __init__(self, period_k=9, period_d=6, storage_path='crypto-historical-data-2' ):
        self.k=period_k
        self.d=period_d
        self.storage_path=storage_path
    def get_data(self,crypto_id):
        lastest_month=get_last_saved_date(crypto_id,self.storage_path)
        comeback_month=(self.k+self.d-1)//28+1
        start_month=datetime.strptime(lastest_month,'%Y-%m')-relativedelta(months=comeback_month)
        historical_data_df=get_gcs_price(crypto_id,start_month.strftime('%Y-%m'), lastest_month)
        # gia tri gia su
        current_close=95285.96
        timestap='2024-11-26'
        
        current_close_df = spark.createDataFrame([(current_close,current_close,current_close, timestap)], ["CLOSE",'HIGH',"LOW",'DATE'])
        combined_data_df = historical_data_df.unionByName(current_close_df,allowMissingColumns=True)
        return spark.createDataFrame(combined_data_df.tail(self.k+self.d-1))
    def calculate(self, crypto_id):
        combined_data_df=self.get_data(crypto_id)
        # Kiểm tra cột HIGH và LOW
        if not {'HIGH', 'LOW'}.issubset(combined_data_df.columns):
            raise ValueError("Dữ liệu thiếu cột HIGH hoặc LOW.")
        window_k = Window.orderBy("DATE").rowsBetween(-(self.k - 1), 0)
        combined_data_df = combined_data_df.withColumn(f"HIGH_{self.k}", spark_max("HIGH").over(window_k))
        combined_data_df = combined_data_df.withColumn(f"LOW_{self.k}", spark_min("LOW").over(window_k))

        # Tính %K
        combined_data_df = combined_data_df.withColumn(
            "%K", 
            100 * (col("CLOSE") - col(f"LOW_{self.k}")) / (col(f"HIGH_{self.k}") - col(f"LOW_{self.k}"))
        ).fillna(0, subset=["%K"])  # Thay NaN bằng 0 nếu (HIGH_k - LOW_k) = 0.
        
        # Sử dụng cửa sổ để tính trung bình động cho %D
        window_d = Window.orderBy("DATE").rowsBetween(-(self.d - 1), 0)
        combined_data_df = combined_data_df.withColumn("%D", avg("%K").over(window_d))
        
        # Trả về DataFrame với %K và %D
        return combined_data_df.select( "%K", "%D").tail(1)[0].asDict()
    def calculate__for_all(self,crypto_ids):
        all_stocks={}
        for crypto_id in crypto_ids:
            all_stocks[crypto_id] = self.calculate(crypto_id)
        return all_stocks
cal_stock=STOCK()
df=cal_stock.calculate('BTC')
df

{'%K': 56.3851687986003, '%D': 78.55468205756692}

In [None]:
class MACD:
    def __init__(self, phien_1, phien_2, storage_path ='crypto-historical-data-2') -> None:
        self.phien_1=phien_1
        self.phien_2=phien_2
        self.storage_path=storage_path
    def get_data(self, crypto_id):
        ema_dir=f'{crypto_id}'
        ema_phien1, ema_phien2=spark.read.parquet(ema_dir)
        return ema_phien1,ema_phien2
    def calculate(self, crypto_id):
        ema_phien1,ema_phien2=self.get_data(crypto_id)
        return ema_phien1-ema_phien2
    def calculate__for_all(self,crypto_ids):
        all_stocks={}
        for crypto_id in crypto_ids:
            all_stocks[crypto_id] = self.calculate(crypto_id)
        return all_stocks