Define a schema for the CSV file and load it as a DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, ShortType, StringType, LongType

# Column layout of the bronze CSV, listed in file order. When an explicit
# schema is supplied, Spark's CSV reader applies it by position (the header
# row is skipped, not used for naming), so this order must match the file.
df_schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("STATISTIC", StringType()),
            ("STATISTIC Label", StringType()),
            ("TLIST(A1)", ShortType()),
            ("Year", ShortType()),
            ("Town", StringType()),
            ("C02172V02618", StringType()),
            ("Type of Vehicle Registration", StringType()),
            ("C01841V02268", StringType()),
            ("Type of Fuel", StringType()),
            ("UNIT", StringType()),
            ("VALUE", LongType()),
        )
    ]
)

# Load the raw bronze extract using the explicit schema above.
df = spark.read.csv('/bronze/vehicle_licensing.csv', header=True, schema=df_schema)

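Drop columns that carry no useful information:
- The first 2 columns (`STATISTIC`, `STATISTIC Label`) are redundant.
- The 3rd column (`TLIST(A1)`) duplicates the 4th column (`Year`).
- The 6th and 8th columns (`C02172V02618`, `C01841V02268`) are opaque codes with no analytical meaning.
- The 10th column (`UNIT`) is redundant.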

In [0]:
# Drop the unwanted columns by name rather than by position. Positional
# indices break if the CSV column order changes, and they make the cell fail
# on re-run (the narrowed frame no longer has an index 5, 7 or 9).
# DataFrame.drop ignores names that are not present, so this cell is safe to
# run more than once.
columns_to_drop = [
    "STATISTIC",        # redundant
    "STATISTIC Label",  # redundant
    "TLIST(A1)",        # duplicate of Year
    "C02172V02618",     # code column with no meaning
    "C01841V02268",     # code column with no meaning
    "UNIT",             # redundant
]

df = df.drop(*columns_to_drop)


Rename columns to space-free names

In [0]:
# Original CSV header -> the column name used from this point on.
columns_to_rename = {
    "Type of Vehicle Registration": "Type_of_vehicle_registration",
    "Type of Fuel": "Type_of_fuel",
    "VALUE": "No_of_licensed_vehicles",
}

# Apply the renames one at a time. withColumnRenamed returns the frame
# unchanged when the source name is absent, so re-running this is harmless.
for source_name in columns_to_rename:
    target_name = columns_to_rename[source_name]
    df = df.withColumnRenamed(source_name, target_name)

Replace null and blank values in string columns with "Unknown"

In [0]:
from pyspark.sql.functions import col, lit, trim, when

# Text columns that receive the "Unknown" placeholder when missing.
string_cols = ['Town', 'Type_of_vehicle_registration', 'Type_of_fuel']

for col_name in string_cols:
    # A value counts as missing when it is null, empty, or only spaces. Without
    # the trim, whitespace-only values slip through unchanged and later end up
    # as empty strings once the columns are trimmed.
    is_missing = col(col_name).isNull() | (trim(col(col_name)) == "")
    df = df.withColumn(
        col_name,
        when(is_missing, lit("Unknown")).otherwise(col(col_name)),
    )

Keep only the first word of **Town** (the text before the first space, hyphen or underscore), using a regular expression

In [0]:
from pyspark.sql.functions import col, lit, regexp_extract, trim, when

# Leading run of characters that are not whitespace, '_' or '-'.
# '-' is placed last in the class so it is unambiguously a literal hyphen.
# The original "[\s-_]" reads like a range, which some regex engines
# (e.g. Python's re) reject.
town_pattern = r"^[^\s_-]+"

# Trim first: with leading spaces the anchored pattern cannot match, and
# regexp_extract would return "" instead of the town name.
town_first_word = regexp_extract(trim(col("Town")), town_pattern, 0)

# regexp_extract returns "" when the pattern does not match (e.g. a
# whitespace-only value or one starting with a delimiter). Map that back to
# the "Unknown" placeholder rather than leaving a blank town.
df = df.withColumn(
    "Town",
    when(town_first_word == "", lit("Unknown")).otherwise(town_first_word),
)

Trim leading and trailing spaces from the string columns

In [0]:
from pyspark.sql.functions import trim

# Strip surrounding spaces from each text column, one column at a time.
for text_column in string_cols:
    trimmed = trim(df[text_column])
    df = df.withColumn(text_column, trimmed)

Write to the silver layer as a Delta table

In [0]:
from delta.tables import DeltaTable

# Persist the cleaned frame to the silver Delta location. This notebook
# reloads the entire bronze CSV on every run, so "overwrite" keeps the silver
# table equal to one copy of the source. "append" would duplicate every row
# each time the notebook is re-run.
df.write.format("delta").mode("overwrite").save("/delta/vehicle_licensing_silver")