In [1]:
# Record the kernel / OS of the machine this notebook was run on (IPython shell escape).
!uname -a

Linux rajesh 5.15.153.1-microsoft-standard-WSL2 #1 SMP Fri Mar 29 23:14:13 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux


### Streaming Bitcoin Data from Socket to HDFS

This code is used to stream real-time Bitcoin price data from a socket and perform windowed aggregation in Apache Spark. The processed data is then written to HDFS in Parquet format.

#### Steps:
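1. **Initialize Spark Session**: A Spark session is obtained to run the streaming job, with its `appName` set to `"BitcoinSocketStream"`. `getOrCreate()` returns the already-running session when one exists (for example, one created by an earlier cell). In that case this name is not applied; it only takes effect if this cell creates the session.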
   
2. **Define Schema**: The schema for the incoming JSON data is defined, which contains two fields:
   - `timestamp`: The timestamp of the data in string format.
   - `price`: The Bitcoin price in USD (double type).

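3. **Read Streaming Data from Socket**: Data is read from a socket source on host `localhost`, port `5002`. The socket source produces one row per line of text (in a `value` column), and each line is expected to be a single JSON object. Spark documents the socket source as intended for testing only.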

4. **Parse JSON Data**: The `from_json` function is used to parse the incoming stream of JSON data into a structured format, which is then split into individual columns.

5. **Data Filtering**: Records whose `price` is `null` are dropped. This removes records with a missing or unparseable price, as well as lines that were not valid JSON.

6. **Timestamp Conversion**: The `timestamp` field is cast to a `timestamp` type, which allows it to be used for time-based operations.

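7. **Windowed Aggregation**: A watermark of `1 minute` is applied to the `timestamp` column. Events arriving up to 1 minute behind the latest event time seen so far are still counted, while later ones may be dropped. The data is then grouped into 1-minute tumbling windows, and the average price of each window is computed. Because the output mode is `append`, a window's result is written only once the watermark passes the end of that window, i.e. roughly one minute after the window closes.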

8. **Write Stream to HDFS**: The aggregated data is written to HDFS in Parquet format at `hdfs://localhost:9000/user/rajesh/bitcoin_data`. A checkpoint directory records the query's progress and aggregation state, so the query can resume after a restart. With a socket source this does not give end-to-end fault tolerance, because data sent while the query was down cannot be replayed.

#### Output:
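The data is stored in Parquet format at the specified HDFS path. Each row holds one 1-minute `window` (a struct with `start` and `end` timestamps) and the average Bitcoin price for that window, `avg_price`. A checkpoint for the stream is maintained in a separate HDFS directory.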



In [5]:
from pyspark.sql import SparkSession

# Create the SparkSession that the later cells pick up again via getOrCreate().
# local[*] runs Spark in-process using all local cores.
# The app name matches the one used by the streaming cell and its description
# ("BitcoinSocketStream"): once this session exists, later builder calls
# return it unchanged and cannot rename it, so the name chosen here is the
# one Spark actually reports.
#
# NOTE(review): spark.streaming.stopGracefullyOnShutdown is read by the legacy
# DStream API (StreamingContext). It does not control Structured Streaming
# queries, which are stopped explicitly with query.stop().
spark = SparkSession.builder \
    .appName("BitcoinSocketStream") \
    .master("local[*]") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .getOrCreate()

# Keep cell output readable: only ERROR-level log messages are printed.
spark.sparkContext.setLogLevel("ERROR")

25/05/01 09:49:51 WARN Utils: Your hostname, rajesh resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/05/01 09:49:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/01 09:49:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/01 09:49:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, from_json, window
from pyspark.sql.types import DoubleType, StringType, StructType

# Streaming source and sink locations.
SOCKET_HOST = "localhost"
SOCKET_PORT = 5002
OUTPUT_PATH = "hdfs://localhost:9000/user/rajesh/bitcoin_data"
# Keep this stable once the query has run: the checkpoint holds the query's
# progress and window state, and a new location restarts the query from scratch.
CHECKPOINT_PATH = "hdfs://localhost:9000/user/rajesh/checkpoint"

# Reuse the running SparkSession if one exists; otherwise create it.
spark = SparkSession.builder \
    .appName("BitcoinSocketStream") \
    .getOrCreate()

# Expected shape of each JSON line: {"timestamp": <string>, "price": <number>}
schema = StructType() \
    .add("timestamp", StringType()) \
    .add("price", DoubleType())

# Socket source: one row per received text line, in a single string column "value".
lines = spark.readStream \
    .format("socket") \
    .option("host", SOCKET_HOST) \
    .option("port", SOCKET_PORT) \
    .load()

# Parse each line against the schema and flatten the struct into top-level columns.
json_df = lines.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Drop bad records in a single chain:
#  - a null price covers missing or unparseable prices and lines that were not valid JSON;
#  - the timestamp string is converted to TimestampType;
#  - rows whose timestamp did not parse are dropped explicitly. With ANSI mode off,
#    the cast yields null for them; the windowed aggregation cannot place such rows
#    anyway, so filtering here makes that loss visible instead of implicit.
filtered_df = json_df \
    .filter(col("price").isNotNull()) \
    .withColumn("timestamp", col("timestamp").cast("timestamp")) \
    .filter(col("timestamp").isNotNull())

# Average price per 1-minute tumbling window. The 1-minute watermark bounds how late
# an event may arrive and still be counted, and lets Spark finalize old windows and
# discard their state.
aggregated_df = filtered_df \
    .withWatermark("timestamp", "1 minute") \
    .groupBy(window(col("timestamp"), "1 minute")) \
    .agg(avg("price").alias("avg_price"))

# Append mode writes each window once, after the watermark passes the window's end.
# The query runs in the background; stop it with query.stop().
query = aggregated_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", OUTPUT_PATH) \
    .option("checkpointLocation", CHECKPOINT_PATH) \
    .start()



                                                                                

# Inspecting Bitcoin Data from HDFS

This section loads the Bitcoin data that the streaming query wrote to HDFS and inspects it. It extracts each window's start time so the full timestamp can be displayed.


In [41]:

# Batch-read the Parquet files that the streaming query writes to this HDFS path.
df = spark.read.parquet("hdfs://localhost:9000/user/rajesh/bitcoin_data")

# Inspect the schema and the earliest windows. Spark does not guarantee row order
# when reading files, so sort by window start to make "the first 5 rows" meaningful.
# truncate=False shows the full window struct (start and end timestamps).
df.printSchema()
df.orderBy("window.start").show(5, truncate=False)

                                                                                

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- avg_price: double (nullable = true)

+--------------------+---------+
|              window|avg_price|
+--------------------+---------+
|{2025-05-01 20:07...|  96526.0|
|{2025-05-01 20:08...|  96529.6|
|{2025-05-01 20:09...|  96544.0|
|{2025-05-01 20:10...|  96536.0|
|{2025-05-01 20:11...|  96529.2|
+--------------------+---------+
only showing top 5 rows



In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Get the SparkSession so this cell runs on its own. If a session is already active,
# getOrCreate() returns it and this app name is not applied.
spark = SparkSession.builder.appName("InspectBitcoinData").getOrCreate()

df = spark.read.parquet("hdfs://localhost:9000/user/rajesh/bitcoin_data")

# Pull the window start out of the window struct so the full timestamp is visible.
df_with_full_time = df.withColumn("datetime", col("window.start"))

# Show the results in chronological order (file read order is not guaranteed).
df_with_full_time.select("datetime", "avg_price").orderBy("datetime").show(truncate=False)


+-------------------+---------+
|datetime           |avg_price|
+-------------------+---------+
|2025-05-01 20:07:00|96526.0  |
|2025-05-01 20:08:00|96529.6  |
|2025-05-01 20:09:00|96544.0  |
|2025-05-01 20:10:00|96536.0  |
|2025-05-01 20:11:00|96529.2  |
|2025-05-01 20:14:00|96562.0  |
|2025-05-01 20:15:00|96604.0  |
|2025-05-01 20:16:00|96607.0  |
|2025-05-01 20:17:00|96607.0  |
|2025-05-01 20:18:00|96627.8  |
|2025-05-01 20:19:00|96625.0  |
|2025-05-01 20:20:00|96625.0  |
|2025-05-01 21:28:00|97106.0  |
|2025-05-01 21:29:00|97106.0  |
|2025-05-01 21:30:00|97093.0  |
|2025-05-01 21:31:00|97061.2  |
|2025-05-01 21:32:00|97054.0  |
|2025-05-01 21:33:00|97027.0  |
|2025-05-01 21:34:00|97007.0  |
|2025-05-01 21:35:00|97007.0  |
+-------------------+---------+
only showing top 20 rows



In [51]:
# Preprocess data: expose each window's start as a top-level "timestamp" column,
# which the time-series cells below order by.
from pyspark.sql.functions import col

df_with_full_time = df.withColumn("timestamp", col("window.start"))

# Show the results in chronological order (file read order is not guaranteed).
df_with_full_time.select("timestamp", "avg_price").orderBy("timestamp").show(truncate=False)



[Stage 658:>                                                        (0 + 1) / 1]

+-------------------+---------+
|timestamp          |avg_price|
+-------------------+---------+
|2025-05-01 20:07:00|96526.0  |
|2025-05-01 20:08:00|96529.6  |
|2025-05-01 20:09:00|96544.0  |
|2025-05-01 20:10:00|96536.0  |
|2025-05-01 20:11:00|96529.2  |
|2025-05-01 20:14:00|96562.0  |
|2025-05-01 20:15:00|96604.0  |
|2025-05-01 20:16:00|96607.0  |
|2025-05-01 20:17:00|96607.0  |
|2025-05-01 20:18:00|96627.8  |
|2025-05-01 20:19:00|96625.0  |
|2025-05-01 20:20:00|96625.0  |
|2025-05-01 21:28:00|97106.0  |
|2025-05-01 21:29:00|97106.0  |
|2025-05-01 21:30:00|97093.0  |
|2025-05-01 21:31:00|97061.2  |
|2025-05-01 21:32:00|97054.0  |
|2025-05-01 21:33:00|97027.0  |
|2025-05-01 21:34:00|97007.0  |
|2025-05-01 21:35:00|97007.0  |
+-------------------+---------+
only showing top 20 rows



                                                                                

In [52]:
# Time series analysis: trailing moving average of the per-minute average prices.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import avg, col, lag
from pyspark.sql.window import Window

# Minutes before the current row included in the moving average (current row included too).
MOVING_AVG_LOOKBACK_MINUTES = 5

# Trailing window over event time rather than row count. A row-based frame would
# average across gaps in the data (e.g. 20:20 -> 21:28), mixing prices recorded over
# an hour apart. Ordering by epoch seconds lets rangeBetween express the lookback in
# seconds; for contiguous 1-minute data this covers the same 6 rows as rowsBetween(-5, 0).
# NOTE: there is no partitionBy, so Spark moves all rows into a single partition for
# this window. That is acceptable for one small series.
window_spec = Window \
    .orderBy(col("timestamp").cast("long")) \
    .rangeBetween(-MOVING_AVG_LOOKBACK_MINUTES * 60, 0)

df_clean = df_with_full_time.withColumn("moving_avg", avg(col("avg_price")).over(window_spec))

# Show the results with the moving average, in chronological order.
df_clean.select("timestamp", "avg_price", "moving_avg").orderBy("timestamp").show(10)




+-------------------+---------+-----------------+
|          timestamp|avg_price|       moving_avg|
+-------------------+---------+-----------------+
|2025-05-01 20:07:00|  96526.0|          96526.0|
|2025-05-01 20:08:00|  96529.6|          96527.8|
|2025-05-01 20:09:00|  96544.0|          96533.2|
|2025-05-01 20:10:00|  96536.0|          96533.9|
|2025-05-01 20:11:00|  96529.2|96532.95999999999|
|2025-05-01 20:12:00|  96525.0|96531.63333333335|
|2025-05-01 20:13:00|  96562.0|96537.63333333335|
|2025-05-01 20:14:00|  96562.0|96543.03333333333|
|2025-05-01 20:15:00|  96604.0|96553.03333333333|
|2025-05-01 20:16:00|  96607.0|96564.86666666665|
+-------------------+---------+-----------------+
only showing top 10 rows



                                                                                

In [53]:
# Forecasting features: attach the previous row's average price as "lagged_price".
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# Rows are taken in timestamp order. lag() is row-based: the first row has no
# predecessor and gets null. After a gap in the data (e.g. 20:20 -> 21:28), the
# "previous" price can be from much earlier than one minute before.
lag_window = Window.orderBy("timestamp")

df_clean = df_clean.withColumn("lagged_price", lag("avg_price", offset=1).over(lag_window))

# Display the current price next to its lagged value.
df_clean \
    .select("timestamp", "avg_price", "lagged_price") \
    .show(10, truncate=False)




+-------------------+---------+------------+
|timestamp          |avg_price|lagged_price|
+-------------------+---------+------------+
|2025-05-01 20:07:00|96526.0  |null        |
|2025-05-01 20:08:00|96529.6  |96526.0     |
|2025-05-01 20:09:00|96544.0  |96529.6     |
|2025-05-01 20:10:00|96536.0  |96544.0     |
|2025-05-01 20:11:00|96529.2  |96536.0     |
|2025-05-01 20:12:00|96525.0  |96529.2     |
|2025-05-01 20:13:00|96562.0  |96525.0     |
|2025-05-01 20:14:00|96562.0  |96562.0     |
|2025-05-01 20:15:00|96604.0  |96562.0     |
|2025-05-01 20:16:00|96607.0  |96604.0     |
+-------------------+---------+------------+
only showing top 10 rows



                                                                                