# PySpark Assignment — Bitcoin Transactions Analysis

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, udf
from pyspark.sql.types import DoubleType, StringType

## 1. Building the SparkSession and Loading the Dataset

In [None]:
import os

# Initialize (or reuse) the Spark session for the analysis.
spark = (
    SparkSession.builder
    .appName("Bitcoin Transactions Analysis")
    # Upper bound on the bytes packed into one input partition when reading files;
    # tune this to the data size and the machine running the notebook.
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .getOrCreate()
)

# Only ERROR-level messages from here on. Start-up WARN lines are printed before
# this call runs, so they still appear in the cell output.
spark.sparkContext.setLogLevel("ERROR")

# Dataset location: a path relative to the notebook's working directory (the same
# path the ML section reloads from). Point BITCOIN_CSV at another location, e.g. a
# local download or cloud bucket, instead of hard-coding a machine-specific path.
file_path = os.environ.get("BITCOIN_CSV", "bitcoin_transactions.csv")

# Read the CSV using its header row and let Spark infer column types.
df = spark.read.csv(file_path, header=True, inferSchema=True)




24/11/05 08:21:49 WARN Utils: Your hostname, Anmols-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.4.2.150 instead (on interface en0)
24/11/05 08:21:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/05 08:21:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [6]:
# Inspect the inferred column types first, then preview a handful of raw rows.
df.printSchema()
df.show(n=10)

root
 |-- trade_id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- price: double (nullable = true)
 |-- side: string (nullable = true)
 |-- volume(quote): double (nullable = true)
 |-- size(base): double (nullable = true)

+------------------+-------------+--------+----+-------------+----------+
|          trade_id|    timestamp|   price|side|volume(quote)|size(base)|
+------------------+-------------+--------+----+-------------+----------+
|728569744763912192|1609430484000|28814.99| buy|   2103.49427|     0.073|
|728574157171720192|1609431536000| 28627.2| buy|     74.43072|    0.0026|
|728708911770632192|1609463664000| 29335.0|sell|    1041.3925|    0.0355|
|728711638068232192|1609464314000|29301.46| buy|    102.55511|    0.0035|
|728713911381000192|1609464856000|29345.12| buy|     88.03536|     0.003|
|728720152505352192|1609466344000|29390.18| buy|   111.682684|    0.0038|
|728725932256264192|1609467722000|29310.74| buy|  2142.615094|    0.0731|
|7287268256430161

In [7]:
# How many trades does the raw dataset hold? count() is an action, so this
# launches a Spark job over the whole file.
total_rows = df.count()
print(
    f"Total number of rows: {total_rows}"
)




Total number of rows: 52345506


                                                                                

## 2. Data Cleaning and Transformation

#### Handling missing values: Remove or impute missing values.

In [10]:
# Step 1: Handling Missing Values
# Strategy: remove incomplete rows rather than impute them.
initial_count = df.count()
print(f"Row count before dropping missing values: {initial_count}")

# With the default how="any", a single null in any column drops the whole row.
df_cleaned = df.dropna()

after_nan_count = df_cleaned.count()
print(f"Row count after dropping missing values: {after_nan_count}")


                                                                                

Row count before dropping missing values: 52345506




Row count after dropping missing values: 52345506


                                                                                

In [11]:
# Step 2: Dealing with Duplicate Records
# With no column subset given, a row counts as a duplicate only when every
# column matches another row.
count_before_duplicates = df_cleaned.count()
print(f"Row count before removing duplicates: {count_before_duplicates}")

df_no_duplicates = df_cleaned.dropDuplicates()
count_after_duplicates = df_no_duplicates.count()
print(f"Row count after removing duplicates: {count_after_duplicates}")

# Preview the de-duplicated data as a table.
print("First 10 rows after removing duplicates:")
df_no_duplicates.show(10)


                                                                                

Row count before removing duplicates: 52345506


                                                                                

Row count after removing duplicates: 52345504
First 10 rows after removing duplicates:




+------------------+-------------+--------+----+-------------+----------+
|          trade_id|    timestamp|   price|side|volume(quote)|size(base)|
+------------------+-------------+--------+----+-------------+----------+
|729911318064136195|1609750340000|32033.83| buy|   634.269834|    0.0198|
|729934147660808194|1609755783000| 28700.0|sell|      1854.02|    0.0646|
|732579646188552210|1610386519000|33098.71| buy|   433.593101|    0.0131|
|732902582430728192|1610463513000|33408.74|sell|    651.47043|    0.0195|
|733214286326792193|1610537829000|35196.17| buy|   239.333956|    0.0068|
|734663925869576193|1610883450000|35120.83| buy|    17.560415|    5.0E-4|
|735050342902792192|1610975579000|36877.94| buy|   999.392174|    0.0271|
|735076490193928193|1610981813000|36400.01| buy|    47.320013|    0.0013|
|736144355798024193|1611236412000| 32561.0| buy|     182.3416|    0.0056|
|736406051007496192|1611298805000|31556.51|sell|   211.428617|    0.0067|
+------------------+-------------+----

                                                                                

In [12]:
from pyspark.sql.functions import to_timestamp, col

# Step 3: Data Type Conversion
print("Schema before conversion:")
df_no_duplicates.printSchema()

# `timestamp` arrives as epoch milliseconds (long). Dividing by 1000 gives
# seconds, which to_timestamp turns into a real timestamp column.
# `trade_id` becomes a string so it is handled as an identifier, not a number.
epoch_seconds = col("timestamp") / 1000
df_converted = (
    df_no_duplicates
    .withColumn("timestamp", to_timestamp(epoch_seconds))
    .withColumn("trade_id", col("trade_id").cast("string"))
)

print("Schema after conversion:")
df_converted.printSchema()

print("First 10 rows after data type conversion:")
df_converted.show(10)


Schema before conversion:
root
 |-- trade_id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- price: double (nullable = true)
 |-- side: string (nullable = true)
 |-- volume(quote): double (nullable = true)
 |-- size(base): double (nullable = true)

Schema after conversion:
root
 |-- trade_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- side: string (nullable = true)
 |-- volume(quote): double (nullable = true)
 |-- size(base): double (nullable = true)

First 10 rows after data type conversion:




+------------------+-------------------+--------+----+-------------+----------+
|          trade_id|          timestamp|   price|side|volume(quote)|size(base)|
+------------------+-------------------+--------+----+-------------+----------+
|729911318064136195|2021-01-04 03:52:20|32033.83| buy|   634.269834|    0.0198|
|729934147660808194|2021-01-04 05:23:03| 28700.0|sell|      1854.02|    0.0646|
|732579646188552210|2021-01-11 12:35:19|33098.71| buy|   433.593101|    0.0131|
|732902582430728192|2021-01-12 09:58:33|33408.74|sell|    651.47043|    0.0195|
|733214286326792193|2021-01-13 06:37:09|35196.17| buy|   239.333956|    0.0068|
|734663925869576193|2021-01-17 06:37:30|35120.83| buy|    17.560415|    5.0E-4|
|735050342902792192|2021-01-18 08:12:59|36877.94| buy|   999.392174|    0.0271|
|735076490193928193|2021-01-18 09:56:53|36400.01| buy|    47.320013|    0.0013|
|736144355798024193|2021-01-21 08:40:12| 32561.0| buy|     182.3416|    0.0056|
|736406051007496192|2021-01-22 02:00:05|

                                                                                

In [13]:
# Step 4: Filtering Out Invalid Rows
# Assumed business rules: price cannot be negative and the quote volume must be
# strictly positive. The rule is written once. Its negation defines "invalid",
# so the invalid count and the kept rows can never disagree.
is_valid_trade = (col("price") >= 0) & (col("volume(quote)") > 0)

invalid_rows_count = df_converted.filter(~is_valid_trade).count()
print(f"Number of invalid rows found: {invalid_rows_count}")

# Keep only rows that satisfy the rule.
df_valid = df_converted.filter(is_valid_trade)

valid_rows_count = df_valid.count()
print(f"Row count after filtering invalid rows: {valid_rows_count}")

# Preview the filtered DataFrame (the data that survives Step 4).
df_valid.show(10)


                                                                                

Number of invalid rows found: 0


                                                                                

Row count after filtering invalid rows: 52345504




+------------------+-------------------+--------+----+-------------+----------+
|          trade_id|          timestamp|   price|side|volume(quote)|size(base)|
+------------------+-------------------+--------+----+-------------+----------+
|729911318064136195|2021-01-04 03:52:20|32033.83| buy|   634.269834|    0.0198|
|729934147660808194|2021-01-04 05:23:03| 28700.0|sell|      1854.02|    0.0646|
|732579646188552210|2021-01-11 12:35:19|33098.71| buy|   433.593101|    0.0131|
|732902582430728192|2021-01-12 09:58:33|33408.74|sell|    651.47043|    0.0195|
|733214286326792193|2021-01-13 06:37:09|35196.17| buy|   239.333956|    0.0068|
|734663925869576193|2021-01-17 06:37:30|35120.83| buy|    17.560415|    5.0E-4|
|735050342902792192|2021-01-18 08:12:59|36877.94| buy|   999.392174|    0.0271|
|735076490193928193|2021-01-18 09:56:53|36400.01| buy|    47.320013|    0.0013|
|736144355798024193|2021-01-21 08:40:12| 32561.0| buy|     182.3416|    0.0056|
|736406051007496192|2021-01-22 02:00:05|

                                                                                

In [14]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array

# Min-max normalisation of the numeric trade columns.
# The input is df_valid, the output of the Step 4 filter. Rows rejected as
# invalid therefore never reach the scaler or influence its min/max statistics.

# Columns to scale, in the order their scaled values are unpacked in Step 3.
numeric_columns = ["price", "volume(quote)", "size(base)"]

# Step 1: Assemble the numeric columns into a single feature vector.
assembler = VectorAssembler(inputCols=numeric_columns, outputCol="features")
df_features = assembler.transform(df_valid)

# Step 2: Fit MinMaxScaler (learns per-feature min/max), then rescale.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(df_features)
scaled_data = scaler_model.transform(df_features)

# Step 3: Convert the scaled vector to an array and extract one column per feature.
scaled_df = scaled_data.withColumn(
    "scaled_array", vector_to_array("scaledFeatures")
).select(
    "trade_id", "timestamp", "price", "volume(quote)", "size(base)", "side",
    col("scaled_array")[0].alias("scaledPrice"),
    col("scaled_array")[1].alias("scaledVolume"),
    col("scaled_array")[2].alias("scaledSize"),
)

# Step 4: Display the normalized table with the first 10 rows.
print("First 10 rows after normalization:")
scaled_df.show(10, truncate=False)

                                                                                

First 10 rows after normalization:




+------------------+-------------------+--------+-------------+----------+----+-------------------+---------------------+---------------------+
|trade_id          |timestamp          |price   |volume(quote)|size(base)|side|scaledPrice        |scaledVolume         |scaledSize           |
+------------------+-------------------+--------+-------------+----------+----+-------------------+---------------------+---------------------+
|729911318064136195|2021-01-04 03:52:20|32033.83|634.269834   |0.0198    |buy |0.3093404058307332 |1.0255716472090987E-4|7.7799377014779E-5   |
|729934147660808194|2021-01-04 05:23:03|28700.0 |1854.02      |0.0646    |sell|0.24698980289359698|2.9979829285690056E-4|2.5383918156359967E-4|
|732579646188552210|2021-01-11 12:35:19|33098.71|433.593101   |0.0131    |buy |0.32925621139241373|7.010649874775602E-5 |5.1471995530915214E-5|
|732902582430728192|2021-01-12 09:58:33|33408.74|651.47043    |0.0195    |sell|0.3350545146738049 |1.0533860722792521E-4|7.6620539037889

                                                                                

## 3. Data Analysis Using Spark SQL

In [16]:
# Expose the normalised DataFrame to Spark SQL under the name "trades".
# Any earlier view registered with that name is replaced.
scaled_df.createOrReplaceTempView(name="trades")


In [17]:
# Mean and standard deviation of each numeric column over the whole `trades` view.
# Column names containing parentheses must be quoted with backticks in Spark SQL.
summary_stats_query = """
SELECT 
    AVG(price) AS avg_price,
    AVG(`volume(quote)`) AS avg_volume_quote,  -- Use backticks to reference columns with special characters
    AVG(`size(base)`) AS avg_size_base,        -- Same here for size(base)
    STDDEV(price) AS stddev_price,
    STDDEV(`volume(quote)`) AS stddev_volume_quote,
    STDDEV(`size(base)`) AS stddev_size_base
FROM trades
"""
summary_stats = spark.sql(summary_stats_query)
summary_stats.show()





+-----------------+------------------+-------------------+------------------+-------------------+------------------+
|        avg_price|  avg_volume_quote|      avg_size_base|      stddev_price|stddev_volume_quote|  stddev_size_base|
+-----------------+------------------+-------------------+------------------+-------------------+------------------+
|33051.15537243369|4012.0078153470267|0.14212028672807997|11497.223661123615|  7913.010948233298|0.3057933820355615|
+-----------------+------------------+-------------------+------------------+-------------------+------------------+



                                                                                

In [18]:
# Grouping and filtering: group data by a category and calculate aggregations for each group.
# Group by 'side' and calculate the total quote volume and average price for each side.
# NOTE: the HAVING threshold of 1000 is only an illustrative filter; both sides'
# totals are far larger, so it removes nothing here.
side_stats = spark.sql("""
SELECT 
    side, 
    SUM(`volume(quote)`) AS total_volume_quote,
    AVG(price) AS avg_price
FROM trades
GROUP BY side
HAVING total_volume_quote > 1000  -- Example filter condition
""")

# Show the grouped stats without truncation. The summed volumes are long
# floating-point numbers that the default 20-character column width cuts off,
# hiding their magnitude.
side_stats.show(truncate=False)





+----+--------------------+------------------+
|side|  total_volume_quote|         avg_price|
+----+--------------------+------------------+
| buy|1.048673583319543...|33117.025192550886|
|sell|1.051432128143213...| 32985.63472709124|
+----+--------------------+------------------+



                                                                                

In [19]:
# Time-based analysis: average trade price per calendar day, oldest day first.
# NOTE(review): DATE(timestamp) presumably buckets by the Spark session time zone.
# Confirm this is intended if UTC calendar days are required.
daily_trends_query = """
SELECT 
    DATE(timestamp) AS trade_date, 
    AVG(price) AS avg_daily_price
FROM trades
GROUP BY trade_date
ORDER BY trade_date
"""
daily_trends = spark.sql(daily_trends_query)

# show() without arguments prints only the first 20 days.
daily_trends.show()





+----------+------------------+
|trade_date|   avg_daily_price|
+----------+------------------+
|2020-12-31|29192.522222222222|
|2021-01-01|29235.046451612903|
|2021-01-02| 31085.51416666667|
|2021-01-03| 33514.34820512821|
|2021-01-04|31233.909856115108|
|2021-01-05| 32658.13606557377|
|2021-01-06| 35513.20644230769|
|2021-01-07| 38173.10241935485|
|2021-01-08| 40384.07938271606|
|2021-01-09|        40236.8292|
|2021-01-10|          37931.26|
|2021-01-11|33892.100806451606|
|2021-01-12|34299.704556962024|
|2021-01-13| 35270.78153846154|
|2021-01-14| 38840.59712121212|
|2021-01-15| 36712.81170454545|
|2021-01-16|37088.703333333346|
|2021-01-17|         35435.637|
|2021-01-18| 36612.22265822785|
|2021-01-19|36754.923571428575|
+----------+------------------+
only showing top 20 rows



                                                                                

#### Joins

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# getOrCreate() reuses the session already running in this kernel, if any.
spark = (
    SparkSession.builder
    .appName("Trade Data Analysis")
    .getOrCreate()
)

# Column name -> Spark type for the small demo trades table (all nullable).
# `timestamp` is kept as a plain string here, unlike the timestamp column of `trades`.
trade_columns = [
    ("trade_id", StringType()),
    ("timestamp", StringType()),
    ("price", DoubleType()),
    ("volume_quote", DoubleType()),
    ("size_base", DoubleType()),
    ("side", StringType()),
    ("scaledPrice", DoubleType()),
    ("scaledVolume", DoubleType()),
    ("scaledSize", DoubleType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in trade_columns])

# Ten rows copied from the normalised output above, used to demonstrate a join.
data = [
    ("729911318064136195", "2021-01-04 03:52:20", 32033.83, 634.269834, 0.0198, "buy", 0.3093404058307332, 1.0255716472090987E-4, 7.7799377014779E-5),
    ("729934147660808194", "2021-01-04 05:23:03", 28700.0, 1854.02, 0.0646, "sell", 0.24698980289359698, 2.9979829285690056E-4, 2.5383918156359967E-4),
    ("732579646188552210", "2021-01-11 12:35:19", 33098.71, 433.593101, 0.0131, "buy", 0.32925621139241373, 7.010649874775602E-5, 5.1471995530915214E-5),
    ("732902582430728192", "2021-01-12 09:58:33", 33408.74, 651.47043, 0.0195, "sell", 0.3350545146738049, 1.0533860722792521E-4, 7.662053903788958E-5),
    ("733214286326792193", "2021-01-13 06:37:09", 35196.17, 239.333956, 0.0068, "buy", 0.36848373518232674, 3.8693596353010224E-5, 2.6716398016237305E-5),
    ("734663925869576193", "2021-01-17 06:37:30", 35120.83, 17.560415, 5.0E-4, "buy", 0.36707469680146043, 2.831446351422617E-6, 1.960800501559408E-6),
    ("735050342902792192", "2021-01-18 08:12:59", 36877.94, 999.392174, 0.0271, "buy", 0.3999368607170984, 1.6159970003404116E-4, 1.0648443445242165E-4),
    ("735076490193928193", "2021-01-18 09:56:53", 36400.01, 47.320013, 0.0013, "buy", 0.39099842544543023, 7.643756975361955E-6, 5.104368439931205E-6),
    ("736144355798024193", "2021-01-21 08:40:12", 32561.0, 182.3416, 0.0056, "buy", 0.3191997469940109, 2.9477580724636045E-5, 2.2001046108679613E-5),
    ("736406051007496192", "2021-01-22 02:00:05", 31556.51, 211.428617, 0.0067, "sell", 0.30041337901954185, 3.4181130850810605E-5, 2.6323452023940836E-5),
]

trades_df = spark.createDataFrame(data, schema)
trades_df.show()


+------------------+-------------------+--------+------------+---------+----+-------------------+--------------------+--------------------+
|          trade_id|          timestamp|   price|volume_quote|size_base|side|        scaledPrice|        scaledVolume|          scaledSize|
+------------------+-------------------+--------+------------+---------+----+-------------------+--------------------+--------------------+
|729911318064136195|2021-01-04 03:52:20|32033.83|  634.269834|   0.0198| buy| 0.3093404058307332|1.025571647209098...|  7.7799377014779E-5|
|729934147660808194|2021-01-04 05:23:03| 28700.0|     1854.02|   0.0646|sell|0.24698980289359698|2.997982928569005...|2.538391815635996...|
|732579646188552210|2021-01-11 12:35:19|33098.71|  433.593101|   0.0131| buy|0.32925621139241373|7.010649874775602E-5|5.147199553091521...|
|732902582430728192|2021-01-12 09:58:33|33408.74|   651.47043|   0.0195|sell| 0.3350545146738049|1.053386072279252...|7.662053903788958E-5|
|733214286326792193|

In [22]:
# Schema of the sample lookup table: trade_id -> user_id (both nullable strings).
user_schema = StructType([
    StructField("trade_id", StringType(), True),
    StructField("user_id", StringType(), True),
])

# Sample mapping of five of the demo trades to user IDs; the remaining
# trades deliberately have no user, which the inner join below will drop.
user_data = list(zip(
    [
        "729911318064136195",
        "729934147660808194",
        "732579646188552210",
        "732902582430728192",
        "733214286326792193",
    ],
    ["user1", "user2", "user1", "user3", "user2"],
))

user_df = spark.createDataFrame(user_data, user_schema)
user_df.show()


+------------------+-------+
|          trade_id|user_id|
+------------------+-------+
|729911318064136195|  user1|
|729934147660808194|  user2|
|732579646188552210|  user1|
|732902582430728192|  user3|
|733214286326792193|  user2|
+------------------+-------+



In [23]:
# Inner join on the shared `trade_id` column: only trades that have a matching
# user mapping appear in the result, with `user_id` appended as the last column.
joined_df = trades_df.join(user_df, "trade_id", "inner")
joined_df.show()


+------------------+-------------------+--------+------------+---------+----+-------------------+--------------------+--------------------+-------+
|          trade_id|          timestamp|   price|volume_quote|size_base|side|        scaledPrice|        scaledVolume|          scaledSize|user_id|
+------------------+-------------------+--------+------------+---------+----+-------------------+--------------------+--------------------+-------+
|729911318064136195|2021-01-04 03:52:20|32033.83|  634.269834|   0.0198| buy| 0.3093404058307332|1.025571647209098...|  7.7799377014779E-5|  user1|
|729934147660808194|2021-01-04 05:23:03| 28700.0|     1854.02|   0.0646|sell|0.24698980289359698|2.997982928569005...|2.538391815635996...|  user2|
|732579646188552210|2021-01-11 12:35:19|33098.71|  433.593101|   0.0131| buy|0.32925621139241373|7.010649874775602E-5|5.147199553091521...|  user1|
|732902582430728192|2021-01-12 09:58:33|33408.74|   651.47043|   0.0195|sell| 0.3350545146738049|1.0533860722792

## 4. Machine Learning Model (Regression/Classification)

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline



In [3]:
# Initialize Spark session for the ML section, with explicit driver/executor memory.
spark = (
    SparkSession.builder
    .appName("TradeSideClassification")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Reload the dataset from a path relative to the notebook's working directory.
# NOTE(review): this is the raw CSV. The cleaning and deduplication from section 2
# are not applied to the data the classifier is trained on.
df = spark.read.csv("bitcoin_transactions.csv", header=True, inferSchema=True)

# Confirm the inferred column types, then preview a sample of the data.
df.printSchema()
df.show(10)

24/11/05 08:47:42 WARN Utils: Your hostname, Anmols-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.4.2.150 instead (on interface en0)
24/11/05 08:47:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/05 08:47:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

root
 |-- trade_id: long (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- price: double (nullable = true)
 |-- side: string (nullable = true)
 |-- volume(quote): double (nullable = true)
 |-- size(base): double (nullable = true)

+------------------+-------------+--------+----+-------------+----------+
|          trade_id|    timestamp|   price|side|volume(quote)|size(base)|
+------------------+-------------+--------+----+-------------+----------+
|728569744763912192|1609430484000|28814.99| buy|   2103.49427|     0.073|
|728574157171720192|1609431536000| 28627.2| buy|     74.43072|    0.0026|
|728708911770632192|1609463664000| 29335.0|sell|    1041.3925|    0.0355|
|728711638068232192|1609464314000|29301.46| buy|    102.55511|    0.0035|
|728713911381000192|1609464856000|29345.12| buy|     88.03536|     0.003|
|728720152505352192|1609466344000|29390.18| buy|   111.682684|    0.0038|
|728725932256264192|1609467722000|29310.74| buy|  2142.615094|    0.0731|
|7287268256430161

                                                                                

In [4]:
# Feature and label columns for the trade-side classifier.
# The 80/20 train/test split happens in the next cell, after the `side` label
# has been indexed. Splitting here as well would only create DataFrames that
# are immediately replaced.
feature_columns = ["price", "volume(quote)", "size(base)"]
label_column = "side"

In [5]:
# Step 1: Encode the string label `side` (buy/sell) as a numeric `label` column.
# NOTE(review): the indexer is fit on the full dataset before splitting. It only
# learns the label vocabulary, but confirm that is acceptable.
indexer = StringIndexer(inputCol=label_column, outputCol="label")
indexer_model = indexer.fit(df)
indexed_df = indexer_model.transform(df)

# 80/20 train/test split of the indexed data, with a fixed seed for repeatability.
train_data, test_data = indexed_df.randomSplit([0.8, 0.2], seed=42)

# Step 2: Pack the feature columns into a single `features` vector.
vector_assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features",
)


24/11/05 08:47:54 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

In [9]:
# Step 3: Logistic regression on the assembled features, capped at 5 iterations.
logistic_regression = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=5,
)

# Step 4: Pipeline stages run in order: assemble features, then fit/apply the classifier.
pipeline = Pipeline(stages=[vector_assembler, logistic_regression])


In [11]:

# 10% sample of the training split, drawn without replacement (seed 42).
# NOTE: the next cell re-creates train_data_sample with the same fraction and seed.
train_data_sample = train_data.sample(False, 0.1, 42)


In [13]:
# Down-sample both splits to 10% with a fixed seed so fitting and scoring stay fast.
sample_fraction, sample_seed = 0.1, 42
train_data_sample = train_data.sample(fraction=sample_fraction, seed=sample_seed)
test_data_sample = test_data.sample(fraction=sample_fraction, seed=sample_seed)

# Fit the whole pipeline (assembler + logistic regression) on the training sample...
model = pipeline.fit(train_data_sample)
# ...then score the held-out test sample.
predictions = model.transform(test_data_sample)

24/11/05 08:48:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/11/05 08:48:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [17]:
# Two evaluators over the same label/prediction columns, differing only in metric.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

# Score the baseline model's test-sample predictions.
accuracy = accuracy_evaluator.evaluate(predictions)
f1_score = f1_evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1_score:.4f}")





Accuracy: 0.5011
F1 Score: 0.4979


                                                                                

## 5. Model Tuning and Evaluation

In [20]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Hyperparameter grid for the logistic-regression stage.
# Each entry is one grid axis: 3 x 3 x 3 = 27 candidate settings.
grid_axes = [
    (logistic_regression.regParam, [0.01, 0.1, 1.0]),        # regularisation strength
    (logistic_regression.elasticNetParam, [0.0, 0.5, 1.0]),  # L2 / mixed / L1 penalty
    (logistic_regression.maxIter, [5, 10, 20]),
]
grid_builder = ParamGridBuilder()
for grid_param, grid_values in grid_axes:
    grid_builder = grid_builder.addGrid(grid_param, grid_values)
param_grid = grid_builder.build()



In [22]:
# Step 2: Set up the evaluator (F1-score is the model-selection metric).
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Step 3: Initialize cross-validator.
# An explicit seed makes the fold assignment reproducible, matching the seed=42
# used for the splits and samples. PySpark's default seed is derived from
# hash(type(self).__name__), and Python string hashing varies between interpreter
# sessions unless PYTHONHASHSEED is fixed.
cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,  # 27 param maps x 5 folds = 135 pipeline fits
    seed=42,
)



In [24]:
# Step 4: Evaluate every grid point with 5-fold CV on the training sample.
# The resulting model wraps the best-scoring pipeline.
cv_model = cross_validator.fit(train_data_sample)

# Step 5: Score the test sample; cv_model delegates transform() to that best model.
best_model_predictions = cv_model.transform(test_data_sample)



                                                                                

In [26]:
# Score the cross-validated model with the same evaluators used for the baseline.
accuracy = accuracy_evaluator.evaluate(best_model_predictions)
f1_score = f1_evaluator.evaluate(best_model_predictions)

# Print the report as a single block: header followed by one metric per line.
final_report_lines = [
    "Final Model Evaluation Metrics:",
    f"Accuracy: {accuracy:.4f}",
    f"F1 Score: {f1_score:.4f}",
]
print("\n".join(final_report_lines))




Final Model Evaluation Metrics:
Accuracy: 0.5011
F1 Score: 0.4989


                                                                                