In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, isnan, when, count, lag, monotonically_increasing_id, last, lit, month, year, date_format
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType, FloatType, DateType

In [None]:
# Mount the ADLS Gen2 container that holds the raw and transformed price data.
#
# The service-principal credentials are read from a Databricks secret scope rather
# than being written into the notebook, so they never end up in version control or
# in exported copies of this notebook.
# NOTE(review): the scope and key names below are placeholders - create them (or
# rename them) to match your workspace. The client secret that used to be hardcoded
# in this cell must be treated as leaked and rotated.
SECRET_SCOPE = "gold-price-kv"
STORAGE_ACCOUNT = "goldpricedata"
CONTAINER = "gold-price-data"
MOUNT_POINT = "/mnt/db_data2"

tenant_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-tenant-id")

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope=SECRET_SCOPE, key="sp-client-secret"),
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

# Mounting an already-mounted path raises an error, so skip the call when the mount
# point exists. This keeps the cell safe under "Run All" on an existing workspace.
if not any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net",  # container@storage-account
        mount_point=MOUNT_POINT,
        extra_configs=configs,
    )

Out[4]: True

In [None]:
# List the contents of the mounted container (same listing as `%fs ls`).
display(dbutils.fs.ls("/mnt/db_data2"))

path,name,size,modificationTime
dbfs:/mnt/db_data2/raw-data/,raw-data/,0,1693307157000
dbfs:/mnt/db_data2/transformed-data/,transformed-data/,0,1693307180000


In [None]:
# Echo the SparkSession object. It is not created anywhere in this notebook, so it is
# presumably supplied by the Databricks runtime - TODO confirm when running elsewhere.
spark

## Loading data from the raw-data folder in the storage account

In [None]:
# Both raw files are CSVs with a header row; column types are inferred by Spark.
# One reader carries the shared options and is reused for each file.
raw_csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")

gold = raw_csv_reader.load("/mnt/db_data2/raw-data/gold.csv")
silver = raw_csv_reader.load("/mnt/db_data2/raw-data/silver.csv")

## Transformation of Data

In [None]:
# Preview the gold DataFrame as loaded from the CSV, before any transformation.
gold.show()

+----------+----------+------+------+------+------+
|      Date|Close/Last|Volume|  Open|  High|   Low|
+----------+----------+------+------+------+------+
|2023-08-17|    1915.2|146770|1922.4|1933.5|1914.2|
|2023-08-16|    1928.3|124766|1933.1|1938.2|1922.0|
|2023-08-15|    1935.2|161512|1939.4|1944.3|1927.5|
|2023-08-14|    1944.0|117514|1945.6|1948.2|1934.2|
|2023-08-11|    1946.6|119090|1944.9|1953.6|1942.7|
|2023-08-10|    1948.9|163805|1947.7|1963.5|1944.4|
|2023-08-09|    1950.6|130739|1959.3|1966.1|1947.2|
|2023-08-08|    1959.9|138581|1971.5|1972.8|1956.5|
|2023-08-07|    1970.0|101170|1977.6|1981.7|1966.1|
|2023-08-04|    1939.6|   290|1934.9|1945.0|1920.0|
|2023-08-03|    1932.0|   773|1934.2|1936.5|1928.0|
|2023-08-02|    1937.4|   652|1948.4|1953.6|1933.2|
|2023-08-01|    1940.7|   688|1964.9|1965.4|1940.8|
|2023-07-31|    1970.5|  1025|1959.2|1971.6|1950.0|
|2023-07-28|    1960.4| 22705|1945.5|1962.2|1944.2|
|2023-07-27|    1945.7|192370|1973.3|1982.6|1941.7|
|2023-07-26|

In [None]:
# Inspect the inferred schema of the raw gold data before casting.
gold.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Close/Last: double (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)



In [None]:

# Target Spark type for each gold column. Volume is inferred as a string, so it is
# cast to a numeric type here; values that do not parse presumably become null,
# which the missing-value count that follows would then report.
gold_target_types = {
    "Date": DateType(),
    "Close/Last": FloatType(),
    "Volume": DoubleType(),
    "Open": FloatType(),
    "High": FloatType(),
    "Low": FloatType(),
}

# withColumn on an existing name replaces the column in place, so column order is kept.
for field_name, field_type in gold_target_types.items():
    gold = gold.withColumn(field_name, col(field_name).cast(field_type))

In [None]:
# Re-check the schema to confirm the casts took effect.
gold.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Close/Last: float (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)



In [None]:
# Count missing values per column in the gold DataFrame.
gold_Columns = ["Date", "Close/Last", "Volume", "Open", "High", "Low"]

# isnan() only applies to float/double columns. Deriving that set from the schema
# (instead of a hand-written list) means Volume - a double after the cast - is
# checked for NaN as well, and the check still works if a column's type changes.
nan_capable_columns = {
    field.name
    for field in gold.schema.fields
    if isinstance(field.dataType, (FloatType, DoubleType))
}

missing_counts_exprs = []
for c in gold_Columns:
    is_missing = col(c).isNull()
    if c in nan_capable_columns:
        is_missing = is_missing | isnan(col(c))
    # when() yields a non-null value only for missing rows, and count() skips nulls,
    # so this counts exactly the missing rows.
    missing_counts_exprs.append(count(when(is_missing, c)).alias(c))

missing_counts = gold.select(missing_counts_exprs)
missing_counts.show()

+----+----------+------+----+----+---+
|Date|Close/Last|Volume|Open|High|Low|
+----+----------+------+----+----+---+
|   0|         0|    28|   0|   0|  0|
+----+----------+------+----+----+---+



### Filling null values in the Volume column

In [None]:
# Total number of rows in the gold DataFrame.
gold.count()

Out[29]: 2539

In [None]:
# Specify the column name
column_name = "Volume"

# Order the window by Date instead of monotonically_increasing_id(): the generated
# ids depend on how the data is partitioned, so they are not a reliable stand-in for
# the original row order. Descending Date matches the newest-first order shown by
# gold.show(), so each gap is still filled from the closest row above it (the nearest
# later trading day).
# NOTE(review): assumes one row per Date and a file sorted newest-first - confirm,
# and confirm whether a chronological (ascending) fill was actually intended.
# The window has no partitionBy, so Spark evaluates it on a single partition; that is
# acceptable for a dataset of a few thousand rows.
window_spec = (
    Window.orderBy(col("Date").desc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Replace null values with the most recent non-null value inside the frame
gold = gold.withColumn(column_name, last(column_name, ignorenulls=True).over(window_spec))
gold.show()

+----------+----------+--------+------+------+------+
|      Date|Close/Last|  Volume|  Open|  High|   Low|
+----------+----------+--------+------+------+------+
|2023-08-17|    1915.2|146770.0|1922.4|1933.5|1914.2|
|2023-08-16|    1928.3|124766.0|1933.1|1938.2|1922.0|
|2023-08-15|    1935.2|161512.0|1939.4|1944.3|1927.5|
|2023-08-14|    1944.0|117514.0|1945.6|1948.2|1934.2|
|2023-08-11|    1946.6|119090.0|1944.9|1953.6|1942.7|
|2023-08-10|    1948.9|163805.0|1947.7|1963.5|1944.4|
|2023-08-09|    1950.6|130739.0|1959.3|1966.1|1947.2|
|2023-08-08|    1959.9|138581.0|1971.5|1972.8|1956.5|
|2023-08-07|    1970.0|101170.0|1977.6|1981.7|1966.1|
|2023-08-04|    1939.6|   290.0|1934.9|1945.0|1920.0|
|2023-08-03|    1932.0|   773.0|1934.2|1936.5|1928.0|
|2023-08-02|    1937.4|   652.0|1948.4|1953.6|1933.2|
|2023-08-01|    1940.7|   688.0|1964.9|1965.4|1940.8|
|2023-07-31|    1970.5|  1025.0|1959.2|1971.6|1950.0|
|2023-07-28|    1960.4| 22705.0|1945.5|1962.2|1944.2|
|2023-07-27|    1945.7|19237

In [None]:
# Re-count missing values per column in the gold DataFrame after the Volume fill.
gold_Columns = ["Date", "Close/Last", "Volume", "Open", "High", "Low"]

# isnan() only applies to float/double columns. Deriving that set from the schema
# (instead of a hand-written list) means Volume - a double column - is checked for
# NaN as well, and the check still works if a column's type changes.
nan_capable_columns = {
    field.name
    for field in gold.schema.fields
    if isinstance(field.dataType, (FloatType, DoubleType))
}

missing_counts_exprs = []
for c in gold_Columns:
    is_missing = col(c).isNull()
    if c in nan_capable_columns:
        is_missing = is_missing | isnan(col(c))
    # when() yields a non-null value only for missing rows, and count() skips nulls,
    # so this counts exactly the missing rows.
    missing_counts_exprs.append(count(when(is_missing, c)).alias(c))

missing_counts = gold.select(missing_counts_exprs)
missing_counts.show()

+----+----------+------+----+----+---+
|Date|Close/Last|Volume|Open|High|Low|
+----+----------+------+----+----+---+
|   0|         0|     0|   0|   0|  0|
+----+----------+------+----+----+---+



In [None]:
# Inspect the inferred schema of the raw silver data before casting.
silver.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Close/Last: double (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)



In [None]:

# Target Spark type for each silver column. Volume is inferred as a string, so it is
# cast to a numeric type here; values that do not parse presumably become null,
# which the missing-value count that follows would then report.
silver_target_types = {
    "Date": DateType(),
    "Close/Last": FloatType(),
    "Volume": DoubleType(),
    "Open": FloatType(),
    "High": FloatType(),
    "Low": FloatType(),
}

# withColumn on an existing name replaces the column in place, so column order is kept.
for field_name, field_type in silver_target_types.items():
    silver = silver.withColumn(field_name, col(field_name).cast(field_type))

In [None]:
# Re-check the schema to confirm the casts took effect.
silver.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Close/Last: float (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)



In [None]:
# Count missing values per column in the silver DataFrame.
silver_Columns = ["Date", "Close/Last", "Volume", "Open", "High", "Low"]

# isnan() only applies to float/double columns. Deriving that set from the schema
# (instead of a hand-written list) means Volume - a double after the cast - is
# checked for NaN as well, and the check still works if a column's type changes.
nan_capable_columns = {
    field.name
    for field in silver.schema.fields
    if isinstance(field.dataType, (FloatType, DoubleType))
}

missing_counts_exprs = []
for c in silver_Columns:
    is_missing = col(c).isNull()
    if c in nan_capable_columns:
        is_missing = is_missing | isnan(col(c))
    # when() yields a non-null value only for missing rows, and count() skips nulls,
    # so this counts exactly the missing rows.
    missing_counts_exprs.append(count(when(is_missing, c)).alias(c))

missing_counts = silver.select(missing_counts_exprs)
missing_counts.show()

+----+----------+------+----+----+---+
|Date|Close/Last|Volume|Open|High|Low|
+----+----------+------+----+----+---+
|   0|         0|    63|   0|   0|  0|
+----+----------+------+----+----+---+



In [None]:
# Specify the column name
column_name = "Volume"

# Order the window by Date instead of monotonically_increasing_id(): the generated
# ids depend on how the data is partitioned, so they are not a reliable stand-in for
# the original row order. Descending Date matches the newest-first order shown by
# silver.show(), so each gap is still filled from the closest row above it (the
# nearest later trading day).
# NOTE(review): assumes one row per Date and a file sorted newest-first - confirm,
# and confirm whether a chronological (ascending) fill was actually intended.
# The window has no partitionBy, so Spark evaluates it on a single partition;
# presumably fine for a dataset of this size - verify if the input grows.
window_spec = (
    Window.orderBy(col("Date").desc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Replace null values with the most recent non-null value inside the frame
silver = silver.withColumn(column_name, last(column_name, ignorenulls=True).over(window_spec))
silver.show()

+----------+----------+-------+------+------+------+
|      Date|Close/Last| Volume|  Open|  High|   Low|
+----------+----------+-------+------+------+------+
|2023-08-17|    23.042|11441.0|  22.8|23.385|22.725|
|2023-08-16|    22.856|10802.0|22.905| 23.17| 22.77|
|2023-08-15|    22.656|60396.0| 22.65| 22.77|22.265|
|2023-08-14|    22.708|51908.0| 22.74| 22.82| 22.41|
|2023-08-11|    22.743|48043.0| 22.76| 22.91| 22.61|
|2023-08-10|    22.821|71226.0| 22.73| 23.06|22.665|
|2023-08-09|    22.731|60561.0| 22.82| 22.99| 22.68|
|2023-08-08|    22.807|73338.0|  23.2|23.255| 22.72|
|2023-08-07|    23.232|55345.0| 23.72|23.775|23.145|
|2023-08-04|    23.716|56747.0| 23.71|23.895|23.275|
|2023-08-03|    23.697|58992.0|23.835| 23.94| 23.41|
|2023-08-02|    23.872|71917.0| 24.48| 24.63| 23.76|
|2023-08-01|    24.326|51464.0|  24.9|24.905|24.255|
|2023-07-31|    24.972|44254.0|24.475|24.985| 24.32|
|2023-07-28|    24.495|38405.0|24.265|24.545| 24.24|
|2023-07-27|    24.367|81985.0|25.075|25.325| 

In [None]:
# Re-count missing values per column in the silver DataFrame after the Volume fill.
silver_Columns = ["Date", "Close/Last", "Volume", "Open", "High", "Low"]

# isnan() only applies to float/double columns. Deriving that set from the schema
# (instead of a hand-written list) means Volume - a double column - is checked for
# NaN as well, and the check still works if a column's type changes.
nan_capable_columns = {
    field.name
    for field in silver.schema.fields
    if isinstance(field.dataType, (FloatType, DoubleType))
}

missing_counts_exprs = []
for c in silver_Columns:
    is_missing = col(c).isNull()
    if c in nan_capable_columns:
        is_missing = is_missing | isnan(col(c))
    # when() yields a non-null value only for missing rows, and count() skips nulls,
    # so this counts exactly the missing rows.
    missing_counts_exprs.append(count(when(is_missing, c)).alias(c))

missing_counts = silver.select(missing_counts_exprs)
missing_counts.show()

+----+----------+------+----+----+---+
|Date|Close/Last|Volume|Open|High|Low|
+----+----------+------+----+----+---+
|   0|         0|     0|   0|   0|  0|
+----+----------+------+----+----+---+



In [None]:
# Attach constant Category and Currency label columns to every gold row.
gold = (
    gold
    .withColumn("Category", lit("gold"))
    .withColumn("Currency", lit("USD"))
)


In [None]:
# Attach constant Category and Currency label columns to every silver row.
silver = (
    silver
    .withColumn("Category", lit("silver"))
    .withColumn("Currency", lit("USD"))
)

In [None]:
# Derive calendar columns from Date: numeric month and year, plus the full weekday
# name ("EEEE" pattern, e.g. "Thursday").
gold = (
    gold
    .withColumn("Month", month(col("Date")))
    .withColumn("Year", year(col("Date")))
    .withColumn("DayOfWeek", date_format("Date", "EEEE"))
)

In [None]:
# Derive calendar columns from Date: numeric month and year, plus the full weekday
# name ("EEEE" pattern, e.g. "Thursday").
silver = (
    silver
    .withColumn("Month", month(col("Date")))
    .withColumn("Year", year(col("Date")))
    .withColumn("DayOfWeek", date_format("Date", "EEEE"))
)


In [None]:
# Preview silver with the added Category, Currency, Month, Year and DayOfWeek columns.
silver.show()

+----------+----------+-------+------+------+------+--------+--------+-----+----+---------+
|      Date|Close/Last| Volume|  Open|  High|   Low|Category|Currency|Month|Year|DayOfWeek|
+----------+----------+-------+------+------+------+--------+--------+-----+----+---------+
|2023-08-17|    23.042|11441.0|  22.8|23.385|22.725|  silver|     USD|    8|2023| Thursday|
|2023-08-16|    22.856|10802.0|22.905| 23.17| 22.77|  silver|     USD|    8|2023|Wednesday|
|2023-08-15|    22.656|60396.0| 22.65| 22.77|22.265|  silver|     USD|    8|2023|  Tuesday|
|2023-08-14|    22.708|51908.0| 22.74| 22.82| 22.41|  silver|     USD|    8|2023|   Monday|
|2023-08-11|    22.743|48043.0| 22.76| 22.91| 22.61|  silver|     USD|    8|2023|   Friday|
|2023-08-10|    22.821|71226.0| 22.73| 23.06|22.665|  silver|     USD|    8|2023| Thursday|
|2023-08-09|    22.731|60561.0| 22.82| 22.99| 22.68|  silver|     USD|    8|2023|Wednesday|
|2023-08-08|    22.807|73338.0|  23.2|23.255| 22.72|  silver|     USD|    8|2023

In [None]:
# Preview gold with the added Category, Currency, Month, Year and DayOfWeek columns.
gold.show()

+----------+----------+--------+------+------+------+--------+--------+-----+----+---------+
|      Date|Close/Last|  Volume|  Open|  High|   Low|Category|Currency|Month|Year|DayOfWeek|
+----------+----------+--------+------+------+------+--------+--------+-----+----+---------+
|2023-08-17|    1915.2|146770.0|1922.4|1933.5|1914.2|    gold|     USD|    8|2023| Thursday|
|2023-08-16|    1928.3|124766.0|1933.1|1938.2|1922.0|    gold|     USD|    8|2023|Wednesday|
|2023-08-15|    1935.2|161512.0|1939.4|1944.3|1927.5|    gold|     USD|    8|2023|  Tuesday|
|2023-08-14|    1944.0|117514.0|1945.6|1948.2|1934.2|    gold|     USD|    8|2023|   Monday|
|2023-08-11|    1946.6|119090.0|1944.9|1953.6|1942.7|    gold|     USD|    8|2023|   Friday|
|2023-08-10|    1948.9|163805.0|1947.7|1963.5|1944.4|    gold|     USD|    8|2023| Thursday|
|2023-08-09|    1950.6|130739.0|1959.3|1966.1|1947.2|    gold|     USD|    8|2023|Wednesday|
|2023-08-08|    1959.9|138581.0|1971.5|1972.8|1956.5|    gold|     USD

## Saving transformed data to the transformed-data folder in the storage account

In [None]:
# Write each transformed DataFrame as CSV with a header row, replacing any previous
# output. repartition(1) collapses the data to one partition before the write.
transformed_outputs = (
    (gold, "/mnt/db_data2/transformed-data/gold"),
    (silver, "/mnt/db_data2/transformed-data/silver"),
)

for frame, target_path in transformed_outputs:
    frame.repartition(1).write.csv(target_path, mode="overwrite", header="true")

In [None]:
# Print the final schema of the gold DataFrame, including the derived columns.
gold.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Close/Last: float (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open: float (nullable = true)
 |-- High: float (nullable = true)
 |-- Low: float (nullable = true)
 |-- Category: string (nullable = false)
 |-- Currency: string (nullable = false)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- DayOfWeek: string (nullable = true)

