# Data Extract and Transformation

Script to download and transform Stock Market Dataset - historical daily prices of Nasdaq-traded stocks and ETFs.

In [9]:
# Import the necessary dependencies.
import pandas as pd
import numpy as np
import os
import sys
import zipfile
import shutil
import pyspark

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, Row
from pyspark.sql.types import StructField, StructType, FloatType, StringType, DateType
from pyspark.sql.functions import min, max, year, month, dayofmonth



In [2]:
# Download the dataset from kaggle
# !kaggle datasets download -d jacksoncrow/stock-market-dataset -p "C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\raw_data"

In [3]:
# Download the metadata
# !kaggle datasets metadata jacksoncrow/stock-market-dataset -p "C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\raw_data"

In [4]:
# Path to the downloaded zip file and the folder to extarct the files to.
zipped_file = r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\raw_data\stock-market-dataset.zip"
extracted = r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\extracted"

In [5]:
# Extracting the zip file
if not os.path.exists(extracted):
    os.makedirs(extracted)
    print(f"Directory created: '{extracted}'")
else:
    print(f"Directory '{extracted}' already exists.")

# Extract files to the destination directory
with zipfile.ZipFile(zipped_file, 'r') as zip_ref:
    zip_ref.extractall(path=extracted)

print(f"All files extracted to: '{extracted}'")

Directory 'C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\extracted' already exists.


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\USER\\Desktop\\Projects\\Data-Profiling-and-Quality-Testing\\data\\extracted\\etfs\\PRN.csv'

Further manual operations on the zip file shows that the error above is due to fact that the PRN.csv file is named after some windows reserved names for specific operations. In this case, PRN - print, for print operation.
Thus, to extract the files in the zip file, the PRN.csv file would be renamed to PRN_FILE.csv before extraction.

In [6]:
# Extract the zip file
with zipfile.ZipFile(zipped_file, 'r') as zip_ref:
    for file in zip_ref.infolist():
        original_filename = file.filename
        # Ensure filename replacements are correct and assigned properly
        filename = original_filename.replace('PRN.csv', 'PRN_FILE.csv')
        
        # Define the full path for the extracted file
        path = os.path.join(extracted, filename)

        # Ensure the directory exists where the file will be extracted.
        directory = os.path.dirname(path)
        if not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)  # Use exist_ok=True
    
        # Extracting to path. Overwrite if files already exist in path.
        with zip_ref.open(file) as source, open(path, 'wb') as target:
            shutil.copyfileobj(source, target)

print(f"All files extracted to: {extracted}")


All files extracted to: C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\extracted


In [7]:
# Merge the stock files into one
def merge_stock_files(input_path, output_file):
    """
    Merge multiple stock CSV files in a director into a single CSV file.
    A new column 'Stock' will be added to the single CSV, containig the stock name derived from each file name.
    
    Arg:
    input_path: The path to the directory containing all the CSV files to be merged.
    output_path: The path to where the merged CSV file wil be saved.
    """

    # Empty list for the stocks names.
    stocks = []

    # Iterate over each file in the input path.
    for filename in os.listdir(input_path):
        if filename.endswith('.csv'):
            # Define the full path to the file
            file_path = os.path.join(input_path, filename)

            # Read the CSV file paths into a dataframe
            df = pd.read_csv(file_path)

            # Extract the stock name from the filename
            stock_name = filename.replace('.csv', '')

            # Add a new column with the stock name
            df['Stcoks'] = stock_name

            # Append the dataframe to the list
            stocks.append(df)

    # Concatenate all dataframes into one
    combined_df = pd.concat(stocks)

    # Save the combined dataframe to a CSV file
    combined_df.to_csv(output_file, index=True)

    print(f"Combined CSV file created at '{output_file}'")


In [10]:
# Merge the etfs CSV
input_path = r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\extracted\etfs"
output_file = r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\merged\etfs\etfs.csv"
merge_stock_files(input_path, output_file)

Combined CSV file created at 'C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\merged\etfs\etfs.csv'


In [11]:
# Merge the stocks CSV
input_path = r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\extracted\stocks"
output_file = r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\merged\stocks\stocks.csv"
merge_stock_files(input_path, output_file)

Combined CSV file created at 'C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\merged\stocks\stocks.csv'


Performing exploratory analysis on the created files

In [12]:
# Set up the environment vairables for PySpark
os.environ['PYSPARK_PYTHON'] = sys.executable

os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [13]:
# Define SparkContext and SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [14]:
# Read the CSV files
etfs = spark.read.csv(r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\merged\etfs\etfs.csv", header=True)
stocks = spark.read.csv(r"C:\Users\USER\Desktop\Projects\Data-Profiling-and-Quality-Testing\data\merged\stocks\stocks.csv", header=True)

In [15]:
# Dimension of the datasets

print(f"The etfs dataset has ({etfs.count()}, {len(etfs.columns)}) dimension.")
print(f"The stocks dataset has ({stocks.count()}, {len(stocks.columns)}) dimension.")

The etfs dataset has (3954316, 9) dimension.
The stocks dataset has (24197442, 9) dimension.


In [16]:
# Check dataset schema
etfs.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Stcoks: string (nullable = true)



In [17]:
stocks.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Stcoks: string (nullable = true)



In [18]:
etfs.show(5)

+---+----------+------------------+------------------+------------------+------------------+------------------+--------+------+
|_c0|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|Stcoks|
+---+----------+------------------+------------------+------------------+------------------+------------------+--------+------+
|  0|2018-08-15| 11.84000015258789| 11.84000015258789|11.739999771118164|11.739999771118164|11.739999771118164| 27300.0|  AAAU|
|  1|2018-08-16|11.779999732971191|11.800000190734863|11.739999771118164|11.739999771118164|11.739999771118164|428400.0|  AAAU|
|  2|2018-08-17|11.800000190734863| 11.81999969482422|11.770000457763672| 11.81999969482422| 11.81999969482422| 52400.0|  AAAU|
|  3|2018-08-20|11.880000114440918| 11.90999984741211|11.850000381469728|11.899999618530272|11.899999618530272| 28700.0|  AAAU|
|  4|2018-08-21|11.920000076293944|11.949999809265137|11.890000343322754| 11.93000030517578| 11.93000030

In [19]:
stocks.show(5)

+---+----------+------------------+------------------+------------------+------------------+------------------+----------+------+
|_c0|      Date|              Open|              High|               Low|             Close|         Adj Close|    Volume|Stcoks|
+---+----------+------------------+------------------+------------------+------------------+------------------+----------+------+
|  0|1999-11-18| 32.54649353027344|   35.765380859375|28.612302780151367|31.473533630371094| 27.06866455078125|62546300.0|     A|
|  1|1999-11-19|30.713520050048828| 30.75822639465332| 28.47818374633789|28.880542755126957| 24.83857727050781|15234100.0|     A|
|  2|1999-11-22|29.551143646240234|31.473533630371094| 28.65700912475586|31.473533630371094| 27.06866455078125| 6577800.0|     A|
|  3|1999-11-23| 30.40057182312012|31.205293655395508|28.612302780151367|28.612302780151367|24.607879638671875| 5975600.0|     A|
|  4|1999-11-24|28.701717376708984|29.998210906982425|28.612302780151367|29.37231826782226

In [20]:
# Loop through the columns in both dataset.
# Convert to lowercase and replace white space with underscore.

for column in etfs.columns:
    etfs = etfs.withColumnRenamed(column, '_'.join(column.split()).lower())

for column in stocks.columns:
    stocks = stocks.withColumnRenamed(column, '_'.join(column.split()).lower())

In [21]:
print(etfs.columns)
print(stocks.columns)

['_c0', 'date', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'stcoks']
['_c0', 'date', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'stcoks']


In [22]:
# Drop the _c0 column in both datasets
etfs = etfs.drop("_c0")
stocks = stocks.drop("_c0")

In [23]:
# Confirm the drop action by checking dimension.

print(f"The etfs dataset has ({etfs.count()}, {len(etfs.columns)}) dimension.")
print(etfs.columns)
print(f"The stocks dataset has ({stocks.count()}, {len(stocks.columns)}) dimension.")
print(stocks.columns)

The etfs dataset has (3954316, 8) dimension.
['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'stcoks']
The stocks dataset has (24197442, 8) dimension.
['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'stcoks']


In [24]:
# Typecast to convert the stringtypes datatypes in the dataset to their appropriate datatypes.

schema = StructType([
    StructField('date', DateType(), True),
    StructField('open', FloatType(), True),
    StructField('high', FloatType(), True),
    StructField('low', FloatType(), True),
    StructField('close', FloatType(), True),
    StructField('adj_close', FloatType(), True),
    StructField('volume', FloatType(), True),
    StructField('stocks', StringType(), True),
])

In [25]:
# Apply the schema to the Dataframe

etfs_data = etfs.select([etfs[column].cast(schema.fields[i].dataType) for i, column in enumerate(etfs.columns)])
stocks_data = stocks.select([stocks[column].cast(schema.fields[i].dataType) for i, column in enumerate(stocks.columns)])

In [26]:
etfs_data.printSchema()

root
 |-- date: date (nullable = true)
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- adj_close: float (nullable = true)
 |-- volume: float (nullable = true)
 |-- stcoks: string (nullable = true)



In [27]:
stocks_data.printSchema()

root
 |-- date: date (nullable = true)
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- adj_close: float (nullable = true)
 |-- volume: float (nullable = true)
 |-- stcoks: string (nullable = true)



In [28]:
# Obtaining the date range for the etfs dataset.
date_range = etfs_data.select(min("date").alias("min_date"), max("date").alias("max_date")).first()

# Retrieve the minimum and maximum dates
min_date = date_range["min_date"]
max_date = date_range["max_date"]

print(f"The date range for the etfs stocks data is: ({min_date}, {max_date}).")

The date range for the etfs stocks data is: (1986-04-03, 2020-04-01).


In [29]:
# Obtaining the date range for the stocks dataset.
date_range = stocks_data.select(min("date").alias("min_date"), max("date").alias("max_date")).first()

# Retrieve the minimum and maximum dates
min_date = date_range["min_date"]
max_date = date_range["max_date"]

print(f"The date range for the stocks data is: ({min_date}, {max_date}).")

The date range for the stocks data is: (1962-01-02, 2020-04-02).


In [None]:
def organize_by_date(data, output_dir, file_name):
    """
    Function to re-organize a stock historical dataset.
    Organize by date and save in a directory such as ".../year/month/day/file.csv"

    Args:
        data: the dataset to be organized.
        output_dir: the output directory.
        file_name: specifies the file name to be created.
    """
    # Extract year, month, and day from the date column.
    data = data.withColumn("year", F.year('date'))
    data = data.withColumn("month", F.month('date'))
    data = data.withColumn("day", F.dayofmonth('date'))

    # Repartition data to improve parallelism
    data = data.repartition("year", "month", "day")

    # Group data by year, month, and day and aggregate to a single CSV file
    grouped_df = data.groupBy("year", "month", "day").agg(F.collect_list(F.struct(*data.columns)).alias("data"))

    # Iterate over each group and write to a single CSV file
    for row in grouped_df.collect():
        year_val = row["year"]
        month_val = row["month"]
        day_val = row["day"]

        # Create the directory structure
        year_dir = os.path.join(output_dir, str(year_val))
        month_dir = os.path.join(year_dir, str(month_val))
        day_dir = os.path.join(month_dir, str(day_val))
        os.makedirs(day_dir, exist_ok=True)

        # Write data to CSV file
        file_path = os.path.join(day_dir, file_name)
        spark.createDataFrame(row["data"]).coalesce(1).write.csv(file_path, header=True, mode="overwrite")
    
    print(f"'{data}' re-organized and saved to: '{output_dir}'")

In [None]:

data = etfs_data  
output_dir = ''
file_name = "etf.csv"

organize_by_date(data, output_dir, file_name)
