# 0. Setup

### Objectives:
### 1. Create and refresh a dataset of JPJ (Jabatan Pengangkutan Jalan, Malaysia's Road Transport Department) car registrations.
### 2. Make it easy to study the top brands and models of new cars in Malaysia.
### 3. Track the adoption of EVs/hybrids over time.

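##### Source data:  
https://storage.data.gov.my/transportation/cars_2025.csv  
https://storage.data.gov.my/transportation/cars_2024.csv  
(The download cell below pulls the Parquet version of the 2025 file: https://storage.data.gov.my/transportation/cars_2025.parquet)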

In [0]:
# Step 1: make sure the target schema (database) exists in the 'workspace' catalog.
# IF NOT EXISTS keeps this cell safe to re-run.
TARGET_SCHEMA = "workspace.data_gov_my"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {TARGET_SCHEMA}")

In [0]:
# Step 2: create the landing and archive volumes (folder-like objects inside a schema).
# Both volumes live in the workspace.mixture schema, which Step 1 does not create.
# CREATE VOLUME fails when its schema is missing, so ensure the schema exists first.
# Every statement uses IF NOT EXISTS, so this cell is safe to re-run.
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.mixture")
spark.sql("CREATE VOLUME IF NOT EXISTS workspace.mixture.jpj_landing")
spark.sql("CREATE VOLUME IF NOT EXISTS workspace.mixture.jpj_archive")


In [0]:
# Step 3: put the latest file into the landing volume "workspace.mixture.jpj_landing", either by manual upload or by downloading it directly from the source (as the next cell does).

In [0]:
import pandas as pd
from datetime import datetime

# Parquet snapshot of the 2025 car registrations published on data.gov.my.
URL_DATA = 'https://storage.data.gov.my/transportation/cars_2025.parquet'
LANDING_DIR = "/Volumes/workspace/mixture/jpj_landing"

df = pd.read_parquet(URL_DATA)

# Parse a 'date' column into datetimes when the file has one.
# NOTE(review): the refresh cells below key on 'date_reg', so this may be a
# no-op for this dataset -- confirm the column names in the Parquet file.
if 'date' in df.columns:
    df['date'] = pd.to_datetime(df['date'])

# Name the landing file after today's date (YYYYMMDD) so each pull is distinct
# and still matches the cars*.csv pattern the refresh step reads.
today_str = datetime.now().strftime('%Y%m%d')
csv_file_path = f"{LANDING_DIR}/cars_{today_str}.csv"
df.to_csv(csv_file_path, index=False)

# Show what is now sitting in the landing volume.
display(dbutils.fs.ls(f"{LANDING_DIR}/"))

# 1. Initial ingestion

In [0]:
# #initial ingestion
# csv_file_path = "/Volumes/workspace/mixture/jpj_landing/cars_2000.csv"

# df = spark.read.format('com.databricks.spark.csv') \
#     .options(header='true', inferschema='true') \
#     .load(csv_file_path) # this is your csv file

# df.show()

In [0]:
# # #initial ingestion
# df.write.format("delta").mode("overwrite").saveAsTable("workspace.data_gov_my.cars")

In [0]:
# #initial ingestion
# %sql 
# select * from workspace.data_gov_my.cars limit 5

In [0]:
# # #initial ingestion
# import shutil

# src_path = "/Volumes/workspace/mixture/jpj_landing/cars_2000.csv"
# dst_path = "/Volumes/workspace/mixture/jpj_archive/cars_2000.csv"

# shutil.move(src_path, dst_path)

# 2. Operational update/refresh

In [0]:
# Snapshot the current Delta table and register it as the df_existing view
# so the following SQL cells can query it by name.
existing_df = spark.table("workspace.data_gov_my.cars")
existing_df.createOrReplaceTempView("df_existing")

In [0]:
# Row counts per registration date already in the table, newest date first;
# the cell displays the first five rows as a freshness check.
counts_by_date_sql = """
select date_reg, count(*) from df_existing  group by 1 order by 1 desc
"""
spark.sql(counts_by_date_sql).toPandas().head()

In [0]:
from pyspark.sql.functions import col, desc

# Read every cars*.csv currently in the landing volume as the incoming batch.
csv_file_path = "/Volumes/workspace/mixture/jpj_landing/cars*.csv"

csv_reader = (
    spark.read.format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
)
new_df = csv_reader.load(csv_file_path)

# Print the incoming rows with the latest registration dates first.
latest_first = new_df.orderBy(desc("date_reg"))
latest_first.show()

In [0]:
# Expose the incoming landing data to SQL under the name df_new.
new_view_name = "df_new"
new_df.createOrReplaceTempView(new_view_name)

In [0]:
## use this block if there are additional records added on top of previous reports
# Keep only the incoming rows that are not already in the table.
#
# Rows appear to be one per registration (the monthly count(*) check treats
# them that way), so identical rows (same date/type/maker/model/colour/fuel/
# state) are legitimate and must be counted, not collapsed.
#
# EXCEPT ALL is a multiset difference: for each distinct row it keeps
# (copies in df_new - copies in df_existing), floored at zero. Registrations
# added later to an already-loaded day are therefore picked up.
#
# Set operations also compare NULLs as equal. A `=`-based NOT EXISTS never
# matches a row that has a NULL in any key column, so it would re-append such
# rows on every run.
#
# EXCEPT ALL matches columns by position, so both sides list them explicitly.
new_df_filtered = spark.sql("""
SELECT date_reg, type, maker, model, colour, fuel, state FROM df_new
EXCEPT ALL
SELECT date_reg, type, maker, model, colour, fuel, state FROM df_existing
""")

## use this block if the records are clean, no additional records added on top of previous reports
# new_df_filtered= spark.sql("""
# SELECT * FROM df_new 
# WHERE date_reg > (select max(date_reg) from df_existing)
# """)


In [0]:
# Preview the rows about to be appended (first 20 rows, long values truncated).
new_df_filtered.show(n=20, truncate=True)

In [0]:
# Append only the not-yet-loaded rows to the Delta table.
(
    new_df_filtered.write
    .mode("append")
    .format("delta")
    .saveAsTable("workspace.data_gov_my.cars")
)

In [0]:
%sql
-- Row counts per registration month (first 7 chars of date_reg, i.e. YYYY-MM),
-- newest month first, to sanity-check the refresh.
SELECT
  substr(date_reg, 1, 7) AS month_reg,
  count(*)
FROM workspace.data_gov_my.cars
GROUP BY 1
ORDER BY 1 DESC

In [0]:
## archiving
# Move ingested landing files into the archive volume so the next refresh
# does not read them again.
from os import listdir
import os
import shutil
import fnmatch

source_dir = "/Volumes/workspace/mixture/jpj_landing"
target_dir = "/Volumes/workspace/mixture/jpj_archive"

# Archive exactly what the refresh step read: its loader globs cars*.csv.
# Any other CSV in landing was never ingested. Leave it in place, where it
# stays visible, instead of silently hiding it in the archive.
ingested_pattern = "cars*.csv"

for file_name in listdir(source_dir):
    if not fnmatch.fnmatchcase(file_name, ingested_pattern):
        if file_name.endswith('.csv'):
            print("skipped (not ingested): " + file_name)
        continue
    source_file = os.path.join(source_dir, file_name)
    target_file = os.path.join(target_dir, file_name)
    shutil.move(source_file, target_file)
    print("moved file: " + file_name)

# z. references:  