In [None]:
# Attach Google Drive to this Colab runtime at /content/drive; the dataset
# paths used by the preprocessing cells all live under that mount point.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


Merged dataset preprocessing

In [None]:
# Install PySpark if not already installed
!pip install -q pyspark

# Import libraries
from pyspark.sql import SparkSession
from google.colab import drive
import os
import shutil
import glob

# Mount Google Drive
drive.mount('/content/drive')


spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()


input_path = "/content/drive/MyDrive/Big Data Final Project/Dataset/merge dataset/merged_data_w_sectors.csv"
temp_output_dir = "/content/cleaned_temp_output"
final_dir = "/content/drive/MyDrive/Big Data Final Project/Dataset/merge dataset/Cleaned_Dataset"
final_output_path = f"{final_dir}/final_merge_dataset.csv"


df = spark.read.option("header", "true").csv(input_path)

# Drop 'symbol' and 'Listing Date' columns
df_cleaned = df.drop("symbol", "Listing Date")

# Sort alphabetically by 'Company Name'
df_sorted = df_cleaned.orderBy("Company Name")

# Show cleaned and sorted sample
print("Cleaned & Sorted Data Preview:")
df_sorted.show()

# Save the cleaned and sorted data
os.makedirs(final_dir, exist_ok=True)

# Write to a temporary output directory
df_sorted.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_dir)

# Move the actual part file to the final location and rename
part_files = glob.glob(f"{temp_output_dir}/part-*.csv")
if len(part_files) != 1:
    raise ValueError("Expected exactly one part file, but found multiple or none.")
shutil.move(part_files[0], final_output_path)

# Remove the temporary directory
shutil.rmtree(temp_output_dir)

print("✅ Cleaned and sorted dataset saved as 'final_merge_dataset.csv' in your Google Drive!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Cleaned & Sorted Data Preview:
+----+----+----+----+----------+---------+----+---+---+---+---+----+------+------------+-------------------+------------+--------+---------+
|   c|   h|   l|   o|         t|        v|   y|  m|  d|  w| wd|last|change|     pchange|       Company Name|Stock Symbol|  Sector|Subsector|
+----+----+----+----+----------+---------+----+---+---+---+---+----+------+------------+-------------------+------------+--------+---------+
| 6.7| 6.7| 6.7| 6.7|10/12/2013|  53800.0|2013| 12| 10| 50|  1|6.66|  0.04| 0.006006006|8990 Holdings, Inc.|       HOUSE|Property| Property|
|7.97|8.18|7.96|8.18|15/07/2014|3709800.0|2014|  7| 15| 29|  1| 8.0| -0.03|    -0.00375|8990 Holdings, Inc.|       HOUSE|Property| Property|
| 5.4|5.54|5.28| 5.3| 3/11/2017|1213800.0|2017| 11|  3| 44|  4|5.33|  0.07| 0.013133208|8990 Holdings, Inc.|       HOUSE|Property| Prop

Final preprocessing: data_with_nulls.csv and stocks_with_nulls.csv

In [None]:
# Install PySpark if not already installed
!pip install -q pyspark

# Import libraries
from pyspark.sql import SparkSession
from google.colab import drive
import os
import shutil
import glob

# Mount Google Drive
drive.mount('/content/drive')

# Initialize Spark session
spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()

# Define paths
base_path = "/content/drive/MyDrive/Big Data Final Project/Dataset"
input_file = f"{base_path}/dataset_with_NULL_values/data_with_nulls.csv"
output_folder = f"{base_path}/Cleaned_Dataset"
final_output_path = f"{output_folder}/cleaned_data_V2.csv"
temp_output_dir = f"{output_folder}/temp_output"
final_dir = output_folder

# Load dataset
df = spark.read.option("header", "true").csv(input_file)

# Drop rows with null values
df = df.dropna()

# Drop unnecessary columns
df_cleaned = df.drop("w", "wd", "last", "pchange")

# Sort alphabetically by 'symbol'
df_sorted = df_cleaned.orderBy("symbol")

# Show cleaned and sorted sample
print("Cleaned & Sorted Data Preview:")
df_sorted.show(10)

# Save the cleaned and sorted data
os.makedirs(final_dir, exist_ok=True)

# Write to a temporary output directory
df_sorted.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_dir)

# Move the actual part file to the final location and rename
part_files = glob.glob(f"{temp_output_dir}/part-*.csv")
if len(part_files) != 1:
    raise ValueError("Expected exactly one part file, but found multiple or none.")
shutil.move(part_files[0], final_output_path)

# Remove the temporary directory
shutil.rmtree(temp_output_dir)

print("✅ Cleaned and sorted dataset saved as 'cleaned_data_V2.csv' in your Google Drive!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Cleaned & Sorted Data Preview:
+----+----+----+----+----------+------+----+---+---+------+------+
|   c|   h|   l|   o|         t|     v|   y|  m|  d|change|symbol|
+----+----+----+----+----------+------+----+---+---+------+------+
|1.66|1.66|1.66|1.66|03/04/2006|250000|2006|  4|  3|     0|   2GO|
|1.64|1.64|1.64|1.64|04/04/2006| 13000|2006|  4|  4| -0.02|   2GO|
|1.64|1.64| 1.6| 1.6|12/04/2006|320000|2006|  4| 12|     0|   2GO|
|1.68|1.68|1.68|1.68|20/04/2006|  1000|2006|  4| 20|  0.04|   2GO|
|1.68|1.68|1.68|1.68|21/04/2006|  3000|2006|  4| 21|     0|   2GO|
|1.68|1.68|1.66|1.66|24/04/2006|  3000|2006|  4| 24|     0|   2GO|
|1.66|1.68|1.66|1.68|25/04/2006| 23000|2006|  4| 25| -0.02|   2GO|
|1.66|1.66|1.66|1.66|26/04/2006|  3000|2006|  4| 26|     0|   2GO|
| 1.6|1.66| 1.6|1.66|27/04/2006| 76000|2006|  4| 27| -0.06|   2GO|
|1.22|1.22|1.22|1.22|17/08/2006|  20

In [None]:
# Install PySpark if not already installed
!pip install -q pyspark

# Import required libraries
from pyspark.sql import SparkSession
from google.colab import drive
import os
import shutil
import glob

# Step 1: Mount Google Drive
drive.mount('/content/drive')

# Step 2: Start a Spark session
spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()

# Step 3: Define correct paths
base_path = "/content/drive/MyDrive/Big Data Final Project/Dataset"
input_file = f"{base_path}/dataset_with_NULL_values/stocks_with_nulls.csv"
output_folder = f"{base_path}/Cleaned_Dataset"
final_output_path = f"{output_folder}/cleaned_stocks_V2.csv"
temp_output_dir = "/content/cleaned_temp_output"

# Step 4: Validate input file
if not os.path.exists(input_file):
    raise FileNotFoundError(f"Input file not found: {input_file}")

# Step 5: Load the dataset
df = spark.read.option("header", "true").csv(input_file)

# Step 6: Drop rows with null values and duplicates
df_cleaned = df.dropna().dropDuplicates()

# Sort alphabetically by 'Stock Name'
df_sorted = df_cleaned.orderBy("Stock Name")

# Step 7: Show sample of cleaned data
print("Cleaned Data:")
df_sorted.show()
print(f"Cleaned Row Count: {df_sorted.count()}")

# Step 8: Save the cleaned data as a single CSV
os.makedirs(output_folder, exist_ok=True)

# Write to temporary output directory
df_sorted.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_output_dir)

# Move the actual part file to the final location and rename
part_files = glob.glob(f"{temp_output_dir}/part-*.csv")
if len(part_files) != 1:
    raise ValueError("Expected exactly one part file, but found multiple or none.")
shutil.move(part_files[0], final_output_path)

# Remove temporary Spark output folder
shutil.rmtree(temp_output_dir)

print("✅ Cleaned dataset saved as a single CSV in your Google Drive!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Cleaned Data:
+-------------+----+------------+-----+-----+-----+-----+-------+-------+
|   Stock Name|Code|        Date|Price| Open| High|  Low| Volume|Change%|
+-------------+----+------------+-----+-----+-----+-----+-------+-------+
|2GO Group Inc| 2GO|Aug 26, 2016|  7.3| 7.25|  7.3| 7.25|128.10K|  0.69%|
|2GO Group Inc| 2GO|Nov 20, 2019| 10.1| 10.5| 10.5|   10| 17.70K|  1.00%|
|2GO Group Inc| 2GO|Dec 20, 2018|14.18|14.12|14.48|14.02|356.90K|  1.14%|
|2GO Group Inc| 2GO|Aug 19, 2015|   10| 9.48|   10| 9.48|803.10K|  5.49%|
|2GO Group Inc| 2GO|Oct 02, 2015|  7.6|  7.9|  7.9| 7.55|318.50K| -3.18%|
|2GO Group Inc| 2GO|Aug 09, 2017| 21.2| 22.3| 22.7| 20.4|579.00K| -7.02%|
|2GO Group Inc| 2GO|Sep 14, 2016| 7.28| 7.19| 7.28| 7.14|108.00K|  1.11%|
|2GO Group Inc| 2GO|Jan 09, 2020|  9.6| 9.52|  9.9| 9.52| 11.90K|  0.84%|
|2GO Group Inc| 2GO|Aug 27, 2015|  9.2| 9.4