Step 2: Data Cleaning & Validation

Create Spark Session

In [9]:
# code for Data cleaning and validation
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, when, count, isnan

spark = SparkSession.builder \
    .appName("Crypto Data Cleaning") \
    .getOrCreate()


Load CSV File

In [10]:
url = r"C:\Users\bnsah\OneDrive\文档\PROJECTS(FINAL)\PROJECTS\Academic_project\BDA_BITCOIN\CryptoPredict\data\synthetic_bitcoin_2014_2023.csv"
df = spark.read.csv(url, header=True, inferSchema=True)
df.show(5)
print(f"Total Rows: {df.count()}")


+----------+------------------+------------------+------------------+------------------+------+------------------+
|      Date|              Open|              High|               Low|             Close|Volume|         MarketCap|
+----------+------------------+------------------+------------------+------------------+------+------------------+
|2014-01-01|20968.082996411034| 20829.91171069015|20870.791135428568|20945.270521984607| 19033|3986513.3384493305|
|2014-01-02| 19522.47194008754|19797.146770887925|19423.537425631155|19674.704767049698| 36088| 7100207.456332895|
|2014-01-03| 20701.45138135251| 20785.33115376397|20329.565015906777| 20650.36962066113| 44465| 9182186.851826971|
|2014-01-04|  20938.4878660132| 20980.01940620715| 20754.12859635343|20866.669464301012| 18527|3865967.8516510488|
|2014-01-05|20456.964154911486|20312.142864650312|20309.665064519202| 20509.47348177957| 12964|2658848.1421779036|
+----------+------------------+------------------+------------------+-----------

Handle Missing / Null Values

In [11]:
from pyspark.sql.functions import col, when, count

df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()


+----+----+----+---+-----+------+---------+
|Date|Open|High|Low|Close|Volume|MarketCap|
+----+----+----+---+-----+------+---------+
|   0|   0|   0|  0|    0|     0|        0|
+----+----+----+---+-----+------+---------+



Remove Duplicates

In [12]:
before = df.count()
df_no_duplicates = df.dropDuplicates()
after = df_no_duplicates.count()
print(f"Removed {before - after} duplicate rows")


Removed 0 duplicate rows


Detect & Remove Outliers (on “Close”)

In [13]:
stats = df_no_duplicates.select(
    mean(col("Close")).alias("mean"),
    stddev(col("Close")).alias("stddev")
).collect()

mean_val = stats[0]["mean"]
std_val = stats[0]["stddev"]

print(f"Mean Close Price: {mean_val:.2f}, Std Dev: {std_val:.2f}")

df_no_outliers = df_no_duplicates.filter(
    (col("Close") >= mean_val - 3 * std_val) &
    (col("Close") <= mean_val + 3 * std_val)
)

print(f"After removing outliers: {df_no_outliers.count()} rows")


Mean Close Price: 20518.57, Std Dev: 1012.62
After removing outliers: 3642 rows


Validate Cleaned Data

In [14]:
df_no_outliers.describe(["Open", "High", "Low", "Close", "Volume"]).show()

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|              Open|              High|               Low|             Close|            Volume|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|              3642|              3642|              3642|              3642|              3642|
|   mean| 20514.83483401755|20616.696558847096| 20412.37077296476| 20514.97795462582|24989.030203185062|
| stddev|1000.3915896959207|1004.2657343796701|1004.2033573771114| 1000.015953448771|14303.377773427568|
|    min|17554.468854404855| 17552.02489278787|17327.930855040104|17483.292078958923|              1010|
|    max| 23528.89543327984|23808.953125325203|23406.057555410658| 23519.65383486466|             49975|
+-------+------------------+------------------+------------------+------------------+------------------+



Save Cleaned Data (optional)

In [None]:
#df_no_outliers.write.csv("cleaned_bitcoin_data.csv", header=True, mode="overwrite")

Step 3: Feature Engineering

In [None]:
# Feature Engineering for feature extraction 