---
Author: Mustapha Bouhsen <br>
[LinkedIn](https://www.linkedin.com/in/mustapha-bouhsen/)<br>
[Git](https://github.com/mus514)<br>
Date: February 2, 2024<br>
---

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.sql import functions as F
import pandas as pd
import numpy as np
from datetime import datetime
import json


### Load files from Azure Blob Storage: set the data location and type


In [0]:
# storage_account_name = "mymlprojects"
# storage_key = dbutils.secrets.get(scope="<secret-scope>", key="<sas-token-key>")  # never commit SAS tokens in source; the token previously hard-coded here should be revoked

# container_name = "prod"
# mount_point = "/mnt/prod"

# dbutils.fs.mount(
#   source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
#   mount_point = mount_point,
#   extra_configs = {f"fs.azure.sas.{container_name}.{storage_account_name}.blob.core.windows.net":storage_key})

In [0]:
def json_to_spark_df(file_path, schema):
    """
    Reads a JSON file with Spark, reshapes its payload with pandas, and returns a new Spark DataFrame.

    Only the first row read from the file is used. Its second column is expected to
    hold a struct whose keys (e.g. dates) become rows: the struct is transposed so each
    key becomes one row, with the key in the first column and the struct's values in
    the following columns.

    Parameters:
    - file_path (str): The path to the JSON file.
    - schema (pyspark.sql.types.StructType): The schema of the resulting Spark DataFrame.
      Its first field receives the key column produced by `reset_index`, the remaining
      fields receive the struct values in order.

    Returns:
    - DataFrame: Spark DataFrame containing the processed data.

    Raises:
    - ValueError: If Spark reads no rows from the file.
    """
    # Only the first row is needed, so fetch just that one instead of collecting
    # the whole DataFrame onto the driver.
    first_rows = spark.read.json(file_path).head(1)
    if not first_rows:
        raise ValueError(f"No rows could be read from JSON file: {file_path}")

    # NOTE(review): positional access [1] depends on the column order of the inferred
    # schema (Spark orders inferred JSON fields by name) — confirm the payload column
    # is always the second one for these files.
    payload = first_rows[0][1]

    # Transpose so every struct key becomes a row; reset_index moves the key out of
    # the index into the first column.
    df = pd.DataFrame(payload.asDict()).T.reset_index()

    # Create a Spark DataFrame from the pandas DataFrame, specifying the schema
    return spark.createDataFrame(df, schema=schema)

In [0]:
def get_files_paths_from_folders(folder_path, endsWith=None):
    """
    Recursively retrieves the paths of all files within the specified folder and its subfolders.

    Parameters:
    - folder_path (str): The path to the folder for which file paths are to be retrieved.
    - endsWith (str, optional): If given, only paths ending with this suffix are kept
      (e.g. ".parquet"). Defaults to None, which disables filtering.

    Returns:
    - List[str]: The paths of all files under the folder (searched recursively),
      filtered by `endsWith` when it is provided.
    """
    my_paths = []

    for entry in dbutils.fs.ls(folder_path):
        if entry.isFile():
            my_paths.append(entry.path)
        else:
            # Subfolder: gather its files unfiltered; the suffix filter is applied
            # once, below, on the combined list.
            my_paths.extend(get_files_paths_from_folders(entry.path))

    if endsWith is not None:
        my_paths = [path for path in my_paths if path.endswith(endsWith)]

    return my_paths

In [0]:
def delete_contents_recursively(folder_path):
    """
    Deletes a folder together with every file and subfolder it contains.

    Parameters:
    - folder_path (str): Path of the folder to remove.
    """
    # A recursive rm removes the folder and its whole subtree in a single call,
    # instead of listing and deleting every entry one request at a time.
    # NOTE(review): the previous listing-based walk raised if the folder was missing
    # (dbutils.fs.ls fails); confirm no caller relies on that error.
    dbutils.fs.rm(folder_path, recurse=True)

In [0]:
def ingest_and_transform_to_parquet(files_paths, prod_folder_path, stock_name):
    """
    Copies Spark-partitioned Parquet files into the prod folder layout for one stock.

    Each source path is expected to end with `.../year=<YYYY>/month=<M>/<file>`. The
    year and month are read from those two parent folders, and the file is copied to
    `<prod_folder_path><stock_name lower-cased>/year=<YYYY>/month=<MM>/<stock_name>.parquet`.

    If several source files map to the same year/month (Spark may write more than one
    part file per partition), the second and later ones are copied as
    `<stock_name>_1.parquet`, `<stock_name>_2.parquet`, ... so no copy overwrites
    another one from the same call.

    Parameters:
    - files_paths (list): List of file paths to be ingested.
    - prod_folder_path (str): Base path for the Parquet output; joined by plain string
      concatenation, so it should end with "/".
    - stock_name (str) : the stock name
    """
    # How many files have already been copied into each destination folder.
    copies_per_folder = {}

    for file in files_paths:
        # The two parent folders carry the partition values, e.g. "year=2024", "month=1".
        year_dir, month_dir = file.split("/")[-3:-1]
        year = int(year_dir.split("=", 1)[1])
        month = int(month_dir.split("=", 1)[1])

        dest_folder = f"{prod_folder_path}{stock_name.lower()}/year={year}/month={month:02}/"

        # First file keeps the plain name; any further file in the same folder gets a
        # numeric suffix instead of silently replacing the earlier copy.
        index = copies_per_folder.get(dest_folder, 0)
        copies_per_folder[dest_folder] = index + 1
        suffix = "" if index == 0 else f"_{index}"

        dbutils.fs.cp(file, f"{dest_folder}{stock_name}{suffix}.parquet")

In [0]:
#-----------------------------------------
# Set the raw and the prod folder paths
#-----------------------------------------
# Input (raw) and output (prod) mount roots. Both end with "/" because the rest of
# the notebook builds paths from them by plain string concatenation.
raw_folder_path, prod_folder_path = "/mnt/raw/", "/mnt/prod/"

# Full paths of the entries directly under the raw folder (non-recursive listing).
raw_files_paths = [entry.path for entry in dbutils.fs.ls(raw_folder_path)]

In [0]:
#-----------------------------------------
# The schema
#-----------------------------------------
# Numeric columns: they are loaded as strings and cast to float in process_stocks.
col_float = ["open", "high", "low", "close", "volume"]

# Every field is a nullable string: "date" first, then the numeric columns in the
# same order as col_float.
schema = StructType(
    [StructField(name, StringType(), True) for name in ["date", *col_float]]
)

In [0]:
def process_stocks(raw_files_paths):
    """
    Loads each raw JSON stock file and writes it to the prod folder as Parquet,
    partitioned by year and month.

    For every path, the stock name is taken from the file name (text before the first
    "."). The data is loaded with `json_to_spark_df`, cast to typed columns, written
    partitioned by year/month into a temp folder, copied to its final location under
    `prod_folder_path`, and the temp folder is then removed.

    A failure for one stock is printed together with the stock name, and processing
    continues with the next file, so one bad input does not block the others.

    Parameters:
    - raw_files_paths (list of str): Paths of the raw JSON files to process.
    """
    # Scratch location for the intermediate partitioned write (same for every stock).
    temp_folder = prod_folder_path + "temp/"

    for file in raw_files_paths:
        stock_name = file.split("/")[-1].split(".")[0]
        print(f'process start for {stock_name}')
        try:
            # Load the data (every column arrives as a string, per `schema`)
            df = json_to_spark_df(file, schema=schema)

            # Convert the column types
            # Date column
            df = df.withColumn("date", F.col("date").cast(DateType()))
            # Float columns
            # NOTE(review): FloatType is 32-bit, so values above 2**24 (e.g. large
            # daily volumes) are rounded; DoubleType/LongType would be lossless but
            # would change the output Parquet schema.
            for col in col_float:
                df = df.withColumn(col, F.col(col).cast(FloatType()))

            # Add year and month columns to partition on
            df = df.withColumn("year", F.year(F.col("date")))
            df = df.withColumn("month", F.month(F.col("date")))

            # Co-locate each year/month in one Spark partition so the write produces a
            # single part file per year/month folder, which the copy step then maps to
            # one <stock>.parquet per month.
            df = df.repartition("year", "month")

            # Partition files in folders by year and month
            df.write.partitionBy("year", "month").mode("overwrite").parquet(temp_folder)
            # Delete the _SUCCESS marker file Spark writes alongside the output
            dbutils.fs.rm(temp_folder + "_SUCCESS")

            # Get all file paths ending with .parquet
            files_paths = get_files_paths_from_folders(temp_folder, ".parquet")

            # Copy the parquet files to their final destination
            ingest_and_transform_to_parquet(files_paths, prod_folder_path, stock_name)

            # Delete the temp folder
            delete_contents_recursively(temp_folder)
            print(f'process end for {stock_name}')
            print('--------------------------------')
        except Exception as e:
            # Report and move on; the temp folder is overwritten by the next stock's write.
            print(f"The error for {stock_name} is: ", e)
            print('--------------------------------')
    


In [0]:
# Run the ingestion for every entry listed under the raw folder.
process_stocks(raw_files_paths=raw_files_paths)

process start for AAPL
process end for AAPL
--------------------------------
process start for AMZN
process end for AMZN
--------------------------------
process start for MSFT
process end for MSFT
--------------------------------
process start for TSLA
process end for TSLA
--------------------------------
