In [9]:
import requests
import json
import os
from datetime import datetime


## Importing from pyspark after setting JAVA_HOME
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home'
os.environ['PATH'] = os.path.join(os.environ['JAVA_HOME'], 'bin') + ':' + os.environ.get('PATH', '')
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, trim, current_timestamp, monotonically_increasing_id



In [10]:
spark = SparkSession.builder \
    .appName("AutomobileAPIIngestion") \
    .master("local[*]") \
    .getOrCreate()

spark


In [11]:
api_url = "https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json"

In [12]:
response = requests.get(api_url)
data_json = response.json()

Bronze Layer for Getting all the Raw Data

In [None]:
from datetime import datetime

BRONZE_PATH = "data/bronze"
os.makedirs(BRONZE_PATH, exist_ok=True)

MAKES = ["honda", "toyota", "ford", "nissan", "bmw"]

def fetch_models(make):
    url = f"https://vpic.nhtsa.dot.gov/api/vehicles/getmodelsformake/{make}?format=json"
    print("Fetching:", url)
    response = requests.get(url)
    return response.json().get("Results", [])

# Fetch & saving the  raw JSON 
all_models = []
for make in MAKES:
    rows = fetch_models(make)
    for r in rows:
        r["QueriedMake"] = make
    all_models.extend(rows)

timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
bronze_file = f"{BRONZE_PATH}/automobiles_{timestamp}.json"

with open(bronze_file, "w") as f:
    json.dump(all_models, f)

print("Bronze snapshot saved:", bronze_file)


Fetching: https://vpic.nhtsa.dot.gov/api/vehicles/getmodelsformake/honda?format=json
Fetching: https://vpic.nhtsa.dot.gov/api/vehicles/getmodelsformake/toyota?format=json
Fetching: https://vpic.nhtsa.dot.gov/api/vehicles/getmodelsformake/ford?format=json
Fetching: https://vpic.nhtsa.dot.gov/api/vehicles/getmodelsformake/nissan?format=json
Fetching: https://vpic.nhtsa.dot.gov/api/vehicles/getmodelsformake/bmw?format=json
Bronze snapshot saved: data/bronze/automobiles_20251209_054452.json


  timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")


In [14]:
df_bronze = spark.read.json(BRONZE_PATH)
df_bronze.show(5, truncate=False)
df_bronze.printSchema()

+-------+---------+--------+----------+-----------+
|Make_ID|Make_Name|Model_ID|Model_Name|QueriedMake|
+-------+---------+--------+----------+-----------+
|474    |Honda    |1861    |Accord    |honda      |
|474    |Honda    |1863    |Civic     |honda      |
|474    |Honda    |1864    |Pilot     |honda      |
|474    |Honda    |1865    |CR-V      |honda      |
|474    |Honda    |1866    |Ridgeline |honda      |
+-------+---------+--------+----------+-----------+
only showing top 5 rows
root
 |-- Make_ID: long (nullable = true)
 |-- Make_Name: string (nullable = true)
 |-- Model_ID: long (nullable = true)
 |-- Model_Name: string (nullable = true)
 |-- QueriedMake: string (nullable = true)



Silver Layer for Cleaning and adding Data Schema Layer

In [None]:
SILVER_PATH = "data/silver"
os.makedirs(SILVER_PATH, exist_ok=True)

df_silver = (
    df_bronze
    .withColumn("Make_Name", upper(trim(col("Make_Name"))))
    .withColumn("Model_Name", trim(col("Model_Name")))
    .withColumn("IngestionTS", current_timestamp())
)

# adding duplicate price, since the api doesn;t have any price info
df_silver = df_silver.withColumn(
    "PriceUSD",
    (monotonically_increasing_id() % 20000) + 15000 
)

df_silver.write.mode("overwrite").parquet(SILVER_PATH)
df_silver.show(5, truncate=False)


[Stage 4:>                                                          (0 + 3) / 3]

+-------+---------+--------+----------+-----------+--------------------------+--------+
|Make_ID|Make_Name|Model_ID|Model_Name|QueriedMake|IngestionTS               |PriceUSD|
+-------+---------+--------+----------+-----------+--------------------------+--------+
|474    |HONDA    |1861    |Accord    |honda      |2025-12-08 23:44:54.037037|15000   |
|474    |HONDA    |1863    |Civic     |honda      |2025-12-08 23:44:54.037037|15001   |
|474    |HONDA    |1864    |Pilot     |honda      |2025-12-08 23:44:54.037037|15002   |
|474    |HONDA    |1865    |CR-V      |honda      |2025-12-08 23:44:54.037037|15003   |
|474    |HONDA    |1866    |Ridgeline |honda      |2025-12-08 23:44:54.037037|15004   |
+-------+---------+--------+----------+-----------+--------------------------+--------+
only showing top 5 rows


                                                                                

In [16]:
df_silver_loaded = spark.read.parquet(SILVER_PATH)
df_silver_loaded.show(5)
df_silver_loaded.printSchema()


+-------+---------+--------+----------+-----------+--------------------+--------+
|Make_ID|Make_Name|Model_ID|Model_Name|QueriedMake|         IngestionTS|PriceUSD|
+-------+---------+--------+----------+-----------+--------------------+--------+
|    474|    HONDA|    1861|    Accord|      honda|2025-12-08 23:44:...|   24184|
|    474|    HONDA|    1863|     Civic|      honda|2025-12-08 23:44:...|   24185|
|    474|    HONDA|    1864|     Pilot|      honda|2025-12-08 23:44:...|   24186|
|    474|    HONDA|    1865|      CR-V|      honda|2025-12-08 23:44:...|   24187|
|    474|    HONDA|    1866| Ridgeline|      honda|2025-12-08 23:44:...|   24188|
+-------+---------+--------+----------+-----------+--------------------+--------+
only showing top 5 rows
root
 |-- Make_ID: long (nullable = true)
 |-- Make_Name: string (nullable = true)
 |-- Model_ID: long (nullable = true)
 |-- Model_Name: string (nullable = true)
 |-- QueriedMake: string (nullable = true)
 |-- IngestionTS: timestamp (nul

Performing Aggregation in the Gold Layer

In [18]:
from pyspark.sql.functions import count, avg


GOLD_PATH = "data/gold"
os.makedirs(GOLD_PATH, exist_ok=True)

gold_make_summary = (
    df_silver_loaded
    .groupBy("Make_Name")
    .agg(
        count("*").alias("Model_Count"),
        avg("PriceUSD").alias("Avg_Price_USD")
    )
)

gold_make_summary.write.mode("overwrite").parquet(f"{GOLD_PATH}/make_summary")

gold_make_summary.show(truncate=False)


+---------------------------------+-----------+------------------+
|Make_Name                        |Model_Count|Avg_Price_USD     |
+---------------------------------+-----------+------------------+
|ASHFORD MFG                      |3          |23667.333333333332|
|WATERFORD TANK AND FABRICATION   |3          |23593.333333333332|
|STANFORD CUSTOMS                 |9          |23695.333333333332|
|SUNDIRO  HONDA MOTORCYCLE CO. LTD|3          |23300.333333333332|
|TOYOTA                           |171        |23511.333333333332|
|FORDS TRAILER SALES              |3          |23646.333333333332|
|LYFORD OVERLAND                  |3          |23707.333333333332|
|NISSAN                           |132        |23729.833333333332|
|FORD                             |447        |23618.957494407157|
|STAFFORD'S TRAILERS              |3          |23698.333333333332|
|MEDFORD STEEL                    |3          |23699.333333333332|
|MILFORD WELDING & MANUFACTURING  |3          |23586.333333333

In [19]:
print("Total Rows:", df_silver_loaded.count())

df_silver_loaded.groupBy("Make_Name").count().show()
df_silver_loaded.describe("PriceUSD").show()

# Most common models
df_silver_loaded.groupBy("Model_Name").count().orderBy(col("count").desc()).show(10)


Total Rows: 3255
+--------------------+-----+
|           Make_Name|count|
+--------------------+-----+
|         ASHFORD MFG|    3|
|WATERFORD TANK AN...|    3|
|    STANFORD CUSTOMS|    9|
|SUNDIRO  HONDA MO...|    3|
|              TOYOTA|  171|
| FORDS TRAILER SALES|    3|
|     LYFORD OVERLAND|    3|
|              NISSAN|  132|
|                FORD|  447|
| STAFFORD'S TRAILERS|    3|
|       MEDFORD STEEL|    3|
|MILFORD WELDING &...|    3|
|      BRADFORD BUILT|    3|
|         BRADFORD #1|    3|
|CRANFORD RADIATOR...|    3|
|        SWINFORD MFG|    3|
|                 BMW|  774|
|               HONDA| 1671|
| AFFORDABLE ALUMINUM|    3|
|FORD MOTOR COMPAN...|    3|
+--------------------+-----+
only showing top 20 rows
+-------+------------------+
|summary|          PriceUSD|
+-------+------------------+
|  count|              3255|
|   mean|23467.333333333332|
| stddev| 6032.341697063684|
|    min|             15000|
|    max|             30676|
+-------+------------------+



In [20]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

assembler = VectorAssembler(
    inputCols=["Make_ID", "Model_ID", "PriceUSD"],
    outputCol="features"
)

vec_df = assembler.transform(df_silver_loaded).select("features")

corr_matrix = Correlation.corr(vec_df, "features").head()[0]
print("Correlation Matrix:\n", corr_matrix)


25/12/08 23:48:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

Correlation Matrix:
 DenseMatrix([[1.        , 0.06874098, 0.00345268],
             [0.06874098, 1.        , 0.00391901],
             [0.00345268, 0.00391901, 1.        ]])


In [22]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

make_indexer = StringIndexer(inputCol="Make_Name", outputCol="MakeIndex")
model_indexer = StringIndexer(inputCol="Model_Name", outputCol="ModelIndex")

assembler = VectorAssembler(
    inputCols=["Make_ID", "Model_ID", "MakeIndex", "ModelIndex"],
    outputCol="features"
)

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="PriceUSD",
    predictionCol="PredictedPrice",
    maxBins=1100,  # increase to handle 1084 categories
    numTrees=10,
    maxDepth=5
)

pipeline = Pipeline(stages=[make_indexer, model_indexer, assembler, rf])

# Train model
model = pipeline.fit(df_silver_loaded)

print("Model Trained Successfully!")

Model Trained Successfully!


Giving my own data for the Model to Predict the Output 

In [23]:
new_data = spark.createDataFrame([
    ("HONDA", "CIVIC", 474, 1863),
    ("TOYOTA", "CAMRY", 452, 1861),
    ("BMW", "X5", 4520, 9250)
], ["Make_Name", "Model_Name", "Make_ID", "Model_ID"])

pred = model.transform(new_data)
pred.select("Make_Name", "Model_Name", "PredictedPrice").show(truncate=False)


25/12/08 23:50:33 ERROR Executor: Exception in task 1.0 in stage 71.0 (TID 110) 
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] User defined function (`StringIndexerModel$$Lambda$5230/0x000000e8026590f0`: (string) => double) failed due to: org.apache.spark.SparkException: Unseen label: CIVIC. To handle unseen labels, set Param handleInvalid to keep.. SQLSTATE: 39000
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:195)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	

Py4JJavaError: An error occurred while calling o785.showString.
: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] User defined function (`StringIndexerModel$$Lambda$5230/0x000000e8026590f0`: (string) => double) failed due to: org.apache.spark.SparkException: Unseen label: CIVIC. To handle unseen labels, set Param handleInvalid to keep.. SQLSTATE: 39000
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:195)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:544)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:497)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:58)
	at org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2244)
	at org.apache.spark.sql.classic.Dataset.$anonfun$head$1(Dataset.scala:1379)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2234)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2232)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
	at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2232)
	at org.apache.spark.sql.classic.Dataset.head(Dataset.scala:1379)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2810)
	at org.apache.spark.sql.classic.Dataset.getRows(Dataset.scala:339)
	at org.apache.spark.sql.classic.Dataset.showString(Dataset.scala:375)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Unseen label: CIVIC. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:387)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:372)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:402)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:901)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:901)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [25]:
MODEL_PATH = "/Users/sandeepreddy/Desktop/WorldPac/ml/saved_model"
os.makedirs("ml", exist_ok=True)

model.write().overwrite().save(MODEL_PATH)

print("Model saved to:", MODEL_PATH)


Model saved to: /Users/sandeepreddy/Desktop/WorldPac/ml/saved_model
