In [None]:
pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=c35dbb1a868cc2d2dd877d742e7e5cecba0b6046d42f85312d3cb65537f20965
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
# @title Set Up Spark Streaming


In [None]:
import time
import yfinance as yf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
from pyspark.sql.functions import col

# Ticker symbol to track; replace with any symbol Yahoo Finance recognises (e.g. 'AAPL' for Apple)
SYMBOL = 'CTS'

# Fetch stock data using Yahoo Finance (yfinance)
def fetch_stock_data_yfinance(symbol=None, period="1d", interval="1m"):
    """Fetch OHLCV bars for a ticker from Yahoo Finance as plain Python records.

    Args:
        symbol: Ticker to fetch. Defaults to the module-level ``SYMBOL``, looked
            up at call time so reassigning ``SYMBOL`` later still takes effect.
        period: History window passed to ``Ticker.history`` (default "1d").
        interval: Bar size passed to ``Ticker.history`` (default "1m").

    Returns:
        A list of dicts with keys ``symbol``, ``timestamp`` (``str`` of the
        bar's index label), ``low``, ``open``, ``close`` (native ``float``) and
        ``volume`` (native ``int``). Returns ``[]`` when the history frame is
        empty. Bars missing any of low/open/close/volume are skipped, because
        NaN cannot be converted to ``int``.
    """
    if symbol is None:
        symbol = SYMBOL

    hist = yf.Ticker(symbol).history(period=period, interval=interval)
    if hist.empty:
        # An empty frame may not even carry the OHLCV columns; bail out before
        # touching them.
        return []

    # Drop incomplete bars: int(float('nan')) would raise ValueError below.
    hist = hist.dropna(subset=["Low", "Open", "Close", "Volume"])

    return [
        {
            'symbol': symbol,
            'timestamp': str(index),
            'low': float(row['Low']),      # native float, not numpy.float64
            'open': float(row['Open']),
            'close': float(row['Close']),
            'volume': int(row['Volume']),  # native int for Spark's LongType
        }
        for index, row in hist.iterrows()
    ]

# Convert API data into Spark DataFrame rows
def to_spark_row(stock_data):
    """Convert one record from ``fetch_stock_data_yfinance`` into a Spark ``Row``.

    Fields are emitted in exactly the column order of ``schema``:
    symbol, low, open, close, volume, timestamp. Keep the two in sync.
    ``spark.createDataFrame(rows, schema)`` assigns Row values to schema
    columns by position. A mismatched order (e.g. open before low) therefore
    silently swaps columns rather than raising.
    """
    return Row(
        symbol=stock_data['symbol'],
        low=stock_data['low'],
        open=stock_data['open'],
        close=stock_data['close'],
        volume=stock_data['volume'],
        timestamp=stock_data['timestamp'],
    )

# Create the Spark session for this notebook, or reuse the active one if it already exists
spark = (
    SparkSession.builder
    .appName("RealTimeStockDataYahooFinance")
    .getOrCreate()
)

# Schema for one stock bar: (column name, Spark type) pairs, every column nullable.
# The list order is the DataFrame's column order.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("symbol", StringType()),
        ("low", DoubleType()),
        ("open", DoubleType()),
        ("close", DoubleType()),
        ("volume", LongType()),
        ("timestamp", StringType()),
    ]
])

# Fetch and process data with stopping condition
def stream_data_to_spark(max_batches=5, interval_seconds=60):
    """Poll ``fetch_stock_data_yfinance`` repeatedly and show each batch in Spark.

    Each non-empty batch becomes a Spark DataFrame built with ``schema`` and is
    displayed with ``show()``. An empty fetch only prints a message.
    Consecutive batches are separated by ``interval_seconds`` of sleep. No
    sleep follows the final batch, so a one-batch test run returns as soon as
    it has shown its data.

    Args:
        max_batches: Number of polling iterations to run (default 5).
        interval_seconds: Seconds to wait between consecutive batches (default 60).
    """
    batch_count = 0
    while batch_count < max_batches:
        data = fetch_stock_data_yfinance()
        if data:
            rows = [to_spark_row(d) for d in data]
            df = spark.createDataFrame(rows, schema)

            # Example processing: replace show() with real logic as needed.
            df.show()
        else:
            print("No data fetched. Retrying...")

        batch_count += 1
        if batch_count < max_batches:
            # Throttle between fetches only; sleeping after the last batch
            # would just block the cell.
            time.sleep(interval_seconds)

    print("Streaming process completed after", batch_count, "batches.")

# Smoke-test the pipeline with a single polling batch
stream_data_to_spark(1)



+------+------------------+------------------+------------------+------+--------------------+
|symbol|               low|              open|             close|volume|           timestamp|
+------+------------------+------------------+------------------+------+--------------------+
|   CTS|46.970001220703125| 46.78499984741211| 47.08000183105469|     0|2024-09-18 09:30:...|
|   CTS|  46.8650016784668|  46.8650016784668|  46.8650016784668|   112|2024-09-18 09:35:...|
|   CTS| 46.82500076293945| 46.82500076293945|  46.8650016784668|   575|2024-09-18 09:45:...|
|   CTS|46.904998779296875|46.904998779296875|46.904998779296875|   461|2024-09-18 09:46:...|
|   CTS| 46.96500015258789| 46.96500015258789| 46.96500015258789|   590|2024-09-18 10:03:...|
|   CTS| 46.96500015258789| 46.96500015258789| 46.96500015258789|   199|2024-09-18 10:08:...|
|   CTS|46.790000915527344|46.790000915527344|46.810001373291016|   729|2024-09-18 10:14:...|
|   CTS| 46.82500076293945| 46.82500076293945| 46.8250007629

In [None]:
# @title Anomaly Detection


In [None]:
from pyspark.sql.functions import col, avg, stddev, abs

# Function to perform anomaly detection
def detect_anomalies(df, window_rows=5, threshold_price=0.05, threshold_volume=1.5):
    """Flag rows whose close price or volume deviates from a trailing window.

    Rows are ordered by ``timestamp`` (and grouped by ``symbol`` when that
    column exists). Each row is compared against a trailing window made of
    itself plus the ``window_rows`` preceding rows. The window counts rows,
    not minutes: consecutive bars are not guaranteed to be one minute apart.

    Adds ``moving_avg`` and ``moving_stddev`` (both of ``close``) plus the
    boolean flags ``price_anomaly`` and ``volume_anomaly``. Returns only the
    rows where at least one flag is true.

    Args:
        df: Spark DataFrame with at least ``timestamp``, ``close`` and
            ``volume`` columns (``symbol`` optional).
        window_rows: Number of preceding rows in the trailing window
            (default 5, i.e. a 6-row window including the current row).
        threshold_price: Relative deviation of ``close`` from ``moving_avg``
            that counts as a price anomaly (default 0.05 = 5%).
        threshold_volume: Multiple of the window's average volume above which
            ``volume`` counts as a spike (default 1.5).

    Returns:
        Spark DataFrame of anomalous rows, including the added columns.
    """
    # Local imports keep this function independent of notebook state. No
    # earlier cell imports Window, and the cell-level `abs` import shadows the
    # builtin; using F.abs makes the intent explicit.
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Grouping by symbol gives identical results for a single ticker. It also
    # avoids moving every row into one partition and keeps tickers separate.
    base = Window.partitionBy("symbol") if "symbol" in df.columns else Window
    window_spec = base.orderBy("timestamp").rowsBetween(-window_rows, Window.currentRow)

    close = F.col("close")
    volume = F.col("volume")

    return (
        df.withColumn("moving_avg", F.avg(close).over(window_spec))
        .withColumn("moving_stddev", F.stddev(close).over(window_spec))
        # Price anomaly: |close - moving_avg| exceeds threshold_price * moving_avg.
        .withColumn(
            "price_anomaly",
            F.abs(close - F.col("moving_avg")) > (F.col("moving_avg") * threshold_price),
        )
        # Volume anomaly: volume exceeds threshold_volume * window-average volume.
        .withColumn(
            "volume_anomaly",
            volume > (F.avg(volume).over(window_spec) * threshold_volume),
        )
        .filter(F.col("price_anomaly") | F.col("volume_anomaly"))
    )

def stream_data_to_spark(max_batches=5, interval_seconds=60):
    """Poll ``fetch_stock_data_yfinance`` and report anomalies in each batch.

    NOTE: this redefines the earlier ``stream_data_to_spark``. From here on,
    each batch is passed through ``detect_anomalies`` and only flagged rows
    are shown. Consecutive batches are separated by ``interval_seconds`` of
    sleep, with none after the final batch.

    Args:
        max_batches: Number of polling iterations to run (default 5).
        interval_seconds: Seconds to wait between consecutive batches (default 60).
    """
    batch_count = 0
    while batch_count < max_batches:
        data = fetch_stock_data_yfinance()
        if data:
            rows = [to_spark_row(d) for d in data]
            df = spark.createDataFrame(rows, schema)

            # Perform anomaly detection
            anomalies = detect_anomalies(df)

            # isEmpty() only has to find one row; count() > 0 would run a
            # full job just to compare the total with zero.
            if anomalies.isEmpty():
                print("No anomalies detected in this batch.")
            else:
                anomalies.show()

        else:
            print("No data fetched. Retrying...")

        batch_count += 1
        if batch_count < max_batches:
            # Throttle between fetches only, not after the final batch.
            time.sleep(interval_seconds)

    print("Streaming process completed after", batch_count, "batches.")

# Run one polling batch through the anomaly-detection pipeline
stream_data_to_spark(1)

+------+------------------+------------------+------------------+------+--------------------+------------------+--------------------+-------------+--------------+
|symbol|               low|              open|             close|volume|           timestamp|        moving_avg|       moving_stddev|price_anomaly|volume_anomaly|
+------+------------------+------------------+------------------+------+--------------------+------------------+--------------------+-------------+--------------+
|   CTS|  46.8650016784668|  46.8650016784668|  46.8650016784668|   112|2024-09-18 09:35:...| 46.97250175476074|  0.1520280658510399|        false|          true|
|   CTS| 46.82500076293945| 46.82500076293945|  46.8650016784668|   575|2024-09-18 09:45:...|46.936668395996094| 0.12413039597242927|        false|          true|
|   CTS|46.904998779296875|46.904998779296875|46.904998779296875|   461|2024-09-18 09:46:...| 46.92875099182129| 0.10258156734188123|        false|          true|
|   CTS| 46.9650001525