# Spark job on Databricks

## 1. Extract Data

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ShortType, DateType
from pyspark.sql.functions import col, datediff

# Explicit schema for the listing JSON records under /mnt/bronze.
# Every field is declared nullable.
schema = StructType([
    StructField("Rego", StringType(), True),
    StructField("Brand", StringType(), True),
    StructField("Model", StringType(), True),
    StructField("Trim", StringType(), True),
    StructField("Year", ShortType(), True),
    StructField("Odometer", IntegerType(), True),
    StructField("Price", IntegerType(), True),
    StructField("Date listed", DateType(), True),
    StructField("Date removed", DateType(), True),
    StructField("Turnover", ShortType(), True),
])

# Read the JSON files into a Spark DataFrame
df = spark.read.json("/mnt/bronze", schema=schema)

# Overwrite "Turnover" with the number of days between "Date listed" and
# "Date removed". This step used to be applied twice back to back; the second
# pass recomputed the identical column, so a single pass is kept.
df = df.withColumn("Turnover", datediff(col("Date removed"), col("Date listed")).cast("int"))

# Preview the result
df.show()

## 2. Transform Data

In [None]:
# Inspect the bronze mount: list its entries, then display how many there are
bronze_entries = dbutils.fs.ls('/mnt/bronze')
len(bronze_entries)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ShortType, DateType
from pyspark.sql.functions import col, datediff

# Column layout of the listing JSON files as (name, Spark type) pairs.
# Every column is declared nullable when the schema is built below.
listing_columns = [
    ("Rego", StringType()),
    ("Brand", StringType()),
    ("Model", StringType()),
    ("Trim", StringType()),
    ("Year", ShortType()),
    ("Odometer", IntegerType()),
    ("Price", IntegerType()),
    ("Date listed", DateType()),
    ("Date removed", DateType()),
    ("Turnover", ShortType()),
]
schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in listing_columns]
)

# Load every JSON file under the bronze mount into a single DataFrame
raw_df = spark.read.json("/mnt/bronze", schema=schema)

# Turnover = days between "Date listed" and "Date removed"; rows sorted by brand
days_listed = datediff(col("Date removed"), col("Date listed"))
df = raw_df.withColumn("Turnover", days_listed.cast("int")).orderBy("Brand")

# Preview the transformed DataFrame
df.show()

## 3. Load Data

In [None]:
# Mount the "silver" folder (processed zone) of the storage container at /mnt/silver.

# Storage coordinates are assigned here so this cell does not rely on a later
# cell having defined them (the archive-mount cell uses the same values).
adlsAccountName = "demodatalake23020801"
adlsContainerName = "demo-container"
adlsFolderName = "silver"
mountPoint_silver = "/mnt/silver"

# The storage account key must never be embedded in the notebook. Read it from
# a Databricks secret scope instead.
# TODO(review): create this scope/key (or point at the existing ones) and
# rotate the key that was previously committed in plain text.
storageAccountKey = dbutils.secrets.get(scope="demo-datalake", key="storage-account-key")

# Mounting onto a mount point that is already in use fails, so only mount when
# /mnt/silver is not present yet; this keeps the cell safe to re-run.
if not any(m.mountPoint == mountPoint_silver for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://{adlsContainerName}@{adlsAccountName}.blob.core.windows.net/{adlsFolderName}",
        mount_point=mountPoint_silver,
        extra_configs={
            f"fs.azure.account.key.{adlsAccountName}.blob.core.windows.net": storageAccountKey
        },
    )

In [None]:
# Parquet output: write df to the silver mount under a timestamped name
from datetime import datetime

# Timestamp of this run, e.g. 2023-02-08_14-30-00, used to make the name unique
now = format(datetime.now(), "%Y-%m-%d_%H-%M-%S")
filename = "processed_" + now + ".parquet"

df.write.format("parquet").save("/mnt/silver/" + filename)

In [None]:
# CSV output: write df (with a header row) to the silver mount under a
# timestamped name, replacing anything already at that path
from datetime import datetime

# Timestamp of this run, e.g. 2023-02-08_14-30-00, used to make the name unique
now = format(datetime.now(), "%Y-%m-%d_%H-%M-%S")
filename = "processed_" + now + ".csv"

df.write.mode("overwrite").csv("/mnt/silver/" + filename, header=True)

## The code below is invalid

#### Reason: Databricks doesn't support wildcard search

In [None]:
# Mount the "archive" folder (cold storage) of the storage container at /mnt/archive.
adlsAccountName = "demodatalake23020801"
adlsContainerName = "demo-container"
adlsFolderName = "archive"
mountPoint_archive = "/mnt/archive"

# The storage account key must never be embedded in the notebook. Read it from
# a Databricks secret scope instead.
# TODO(review): create this scope/key (or point at the existing ones) and
# rotate the key that was previously committed in plain text.
storageAccountKey = dbutils.secrets.get(scope="demo-datalake", key="storage-account-key")

# Mounting onto a mount point that is already in use fails, so only mount when
# /mnt/archive is not present yet; this keeps the cell safe to re-run.
if not any(m.mountPoint == mountPoint_archive for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://{adlsContainerName}@{adlsAccountName}.blob.core.windows.net/{adlsFolderName}",
        mount_point=mountPoint_archive,
        extra_configs={
            f"fs.azure.account.key.{adlsAccountName}.blob.core.windows.net": storageAccountKey
        },
    )

In [None]:
# List everything directly under the bronze mount
files = dbutils.fs.ls("/mnt/bronze")

# Pick the entries whose name begins with new_listing_ ...
new_listings = [entry for entry in files if entry.name.startswith("new_listing_")]

# ... and move each of them to the archive mount
for entry in new_listings:
    dbutils.fs.mv(entry.path, "/mnt/archive/")

In [None]:
import os

# List everything directly under the bronze mount
files = dbutils.fs.ls("/mnt/bronze")

# Select the paths whose final path component starts with "new".
# NOTE(review): this prefix is broader than the new_listing_* pattern used by
# the other archive cell - confirm which one is intended.
paths_to_archive = [
    entry.path for entry in files if os.path.basename(entry.path).startswith("new")
]

# Move each selected path to the archive mount
for source_path in paths_to_archive:
    dbutils.fs.mv(source_path, "/mnt/archive/")