In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType

spark = SparkSession.builder.appName("pipeline").getOrCreate()

In [None]:
# pipeline:
# input -> transformation -> output -> validation -> analysis

In [2]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("pratyushpuri/crypto-market-sentiment-and-price-dataset-2025")

print("Path to dataset files:", path)

  from .autonotebook import tqdm as notebook_tqdm


Downloading from https://www.kaggle.com/api/v1/datasets/download/pratyushpuri/crypto-market-sentiment-and-price-dataset-2025?dataset_version_number=1...


100%|██████████| 91.0k/91.0k [00:00<00:00, 534kB/s]

Extracting model files...





Path to dataset files: C:\Users\kizer\.cache\kagglehub\datasets\pratyushpuri\crypto-market-sentiment-and-price-dataset-2025\versions\1


In [13]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("cryptocurrency", StringType(), True),
    StructField("current_price_usd", FloatType(), True),
    StructField("price_change_24h_percent", FloatType(), True),
    StructField("trading_volume_24h", FloatType(), True),
    StructField("market_cap_usd", FloatType(), True),
    StructField("social_sentiment_score", FloatType(), True),
    StructField("news_sentiment_score", FloatType(), True),
    StructField("news_impact_score", FloatType(), True),
    StructField("social_mentions_count", FloatType(), True),
    StructField("fear_greed_index", FloatType(), True),
    StructField("volatility_index", FloatType(), True),
    StructField("rsi_technical_indicator", FloatType(), True),
    StructField("prediction_confidence", FloatType(), True),
])

In [14]:
full_path = "C:\\Users\\kizer\\.cache\\kagglehub\\datasets\\pratyushpuri\\crypto-market-sentiment-and-price-dataset-2025\\versions\\1\\crypto_sentiment_prediction_dataset.csv"
# during csv loading we can automatically removes blank lines and remove comments using optional argument 'comment'
# argument 'sep' to point type of separator in file
df = spark.read.csv(full_path, header=True, schema=schema)
df.show()

+-------------------+--------------+-----------------+------------------------+------------------+--------------+----------------------+--------------------+-----------------+---------------------+----------------+----------------+-----------------------+---------------------+
|          timestamp|cryptocurrency|current_price_usd|price_change_24h_percent|trading_volume_24h|market_cap_usd|social_sentiment_score|news_sentiment_score|news_impact_score|social_mentions_count|fear_greed_index|volatility_index|rsi_technical_indicator|prediction_confidence|
+-------------------+--------------+-----------------+------------------------+------------------+--------------+----------------------+--------------------+-----------------+---------------------+----------------+----------------+-----------------------+---------------------+
|2025-06-04 20:36:49|      Algorand|           0.3427|                   -5.35|         1716266.1|  1.76212403E9|                 0.367|               0.374|         

In [15]:
df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- cryptocurrency: string (nullable = true)
 |-- current_price_usd: float (nullable = true)
 |-- price_change_24h_percent: float (nullable = true)
 |-- trading_volume_24h: float (nullable = true)
 |-- market_cap_usd: float (nullable = true)
 |-- social_sentiment_score: float (nullable = true)
 |-- news_sentiment_score: float (nullable = true)
 |-- news_impact_score: float (nullable = true)
 |-- social_mentions_count: float (nullable = true)
 |-- fear_greed_index: float (nullable = true)
 |-- volatility_index: float (nullable = true)
 |-- rsi_technical_indicator: float (nullable = true)
 |-- prediction_confidence: float (nullable = true)



In [4]:
social_sentiment_negative_count = df.filter(df['social_sentiment_score'] < 0).count()
print(social_sentiment_negative_count)
print(social_sentiment_negative_count / df.count() * 100)

1003
48.61851672321861


In [None]:
# validation via joins:
# taking two files, we do right join on file which is valid so only company that match valid company (for example) will be taken
#complex rule validation:
# calculations, against external source, udfs
import pyspark.sql.functions as F
valid_df = df.withColumnRenamed()
joined_df = df.join()
F.broadcast(joined_df)
#need two data frame one on which validation?join could be perform

In [11]:
def avg_price_cahnge(crypto_list):
    total_change = 0
    count = 0
    for crypto in crypto_list:
        total_change += crypto
        count += 1
    return total_change / count
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

udfGetAvgChange = udf(avg_price_cahnge, FloatType())
avg_change_all_crypto = udfGetAvgChange(df['price_change_24h_percent'])
avg_change_all_crypto.collect()

+-----------------------------+
|avg(price_change_24h_percent)|
+-----------------------------+
|         -0.01804168686379...|
+-----------------------------+



In [12]:
from pyspark.sql.functions import avg
avg_price_change_percentage = df.select(avg("price_change_24h_percent"))
avg_price_change_percentage.show()

+-----------------------------+
|avg(price_change_24h_percent)|
+-----------------------------+
|         -0.01804168686379...|
+-----------------------------+



In [None]:
#data camp example of udf usage in analysis
# Create a function to return the number and type of dogs as a tuple
#def dogParse(doglist):
#  dogs = []
#  for dog in doglist:
#    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
#    dogs.append((StringType(breed), int(start_x), int(start_y), int(end_x), int(end_y)))
#  return dogs
# Create a UDF
#udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to list of dogs
#joined_df = joined_df.withColumn('dogs', udfDogParse(joined_df['dog_list']))

# Show the number of dogs in the first 10 rows
#joined_df.select('dogs').show(10)