In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("EV Analytics Pipeline") \
    .config("spark.jars", "/opt/spark/jars/hudi-spark3-bundle_2.12-1.0.0.jar") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED") \
    .master("spark://spark-master:7077")\
    .getOrCreate()

In [16]:
df = spark.read.load("hdfs://namenode:9000/data/Electric_Vehicle_Population_Data.csv", format="csv", sep=",", inferSchema="true", header="true")

In [17]:
df.show()

+----------+---------+----------+-----+-----------+----------+---------+-----------+---------------------+-------------------------------------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|VIN (1-10)|   County|      City|State|Postal Code|Model Year|     Make|      Model|Electric Vehicle Type|Clean Alternative Fuel Vehicle (CAFV) Eligibility|Electric Range|Base MSRP|Legislative District|DOL Vehicle ID|    Vehicle Location|    Electric Utility|2020 Census Tract|
+----------+---------+----------+-----+-----------+----------+---------+-----------+---------------------+-------------------------------------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|5YJ3E1EB6K|     King|   Seattle|   WA|      98178|      2019|    TESLA|    MODEL 3| Battery Electric ...|                             Clean Alternative...|          

In [18]:
df.schema

StructType([StructField('VIN (1-10)', StringType(), True), StructField('County', StringType(), True), StructField('City', StringType(), True), StructField('State', StringType(), True), StructField('Postal Code', IntegerType(), True), StructField('Model Year', IntegerType(), True), StructField('Make', StringType(), True), StructField('Model', StringType(), True), StructField('Electric Vehicle Type', StringType(), True), StructField('Clean Alternative Fuel Vehicle (CAFV) Eligibility', StringType(), True), StructField('Electric Range', IntegerType(), True), StructField('Base MSRP', IntegerType(), True), StructField('Legislative District', IntegerType(), True), StructField('DOL Vehicle ID', IntegerType(), True), StructField('Vehicle Location', StringType(), True), StructField('Electric Utility', StringType(), True), StructField('2020 Census Tract', LongType(), True)])

In [40]:
#raname all the columns as per choice general convention lower case an snake case
rename_cols = {
    'VIN (1-10)': 'vehical_number',
    'Postal Code': 'postal_code',
    'Electric Vehicle Type': 'vehical_type',
    'Clean Alternative Fuel Vehicle (CAFV) Eligibility': 'cavf_eligibility',
    'Electric Range': 'electric_range',
    'Base MSRP': 'base_msrp',
    'Legislative District': 'legislative_district',
    'Vehicle Location': 'vehicle_location',
    'Electric Utility': 'electric_utility',
    '2020 Cencus Tract': 'cencus_tract',
    'Model Year':'model_year'
}


for old_name, new_name in rename_cols.items():
    df = df.withColumnRenamed(old_name, new_name)


In [41]:
#Make all columns lower case
for col in df.columns:
    df = df.withColumnRenamed(col, col.lower())

In [42]:
df.show()

+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|vehical_number|   county|      city|state|postal_code|model_year|     make|      model|        vehical_type|    cavf_eligibility|electric_range|base_msrp|legislative_district|dol vehicle id|    vehicle_location|    electric_utility|2020 census tract|
+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|    5YJ3E1EB6K|     King|   Seattle|   WA|      98178|      2019|    TESLA|    MODEL 3|Battery Electric ...|Clean Alternative...|           220|        0|                  37|     101250425|POINT (-122.23825...|CITY OF SEATTLE -...|      53033

In [35]:
from pyspark.sql.functions import col
df_state_not_wa = df.filter(col('state')!="WA")
df_state_not_wa.show()
# So data of multiple states exists

+--------------+--------------+--------------+-----+-----------+----------+----------+--------------+--------------------+--------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|vehical_number|        county|          city|state|postal_code|model year|      make|         model|        vehical_type|    cavf_eligibility|electric_range|base_msrp|legislative_district|dol vehicle id|    vehicle_location|    electric_utility|2020 census tract|
+--------------+--------------+--------------+-----+-----------+----------+----------+--------------+--------------------+--------------------+--------------+---------+--------------------+--------------+--------------------+--------------------+-----------------+
|    WBAJA9C50K|          NULL|          NULL|   AE|       NULL|      2019|       BMW|          530E|Plug-in Hybrid El...|Not eligible due ...|            16|    53400|                NULL|     244582593| 

In [43]:
## See which column has null values
from pyspark.sql.functions import col, count, when

null_counts = df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
])

null_counts.show()

+--------------+------+----+-----+-----------+----------+----+-----+------------+----------------+--------------+---------+--------------------+--------------+----------------+----------------+-----------------+
|vehical_number|county|city|state|postal_code|model_year|make|model|vehical_type|cavf_eligibility|electric_range|base_msrp|legislative_district|dol vehicle id|vehicle_location|electric_utility|2020 census tract|
+--------------+------+----+-----+-----------+----------+----+-----+------------+----------------+--------------+---------+--------------------+--------------+----------------+----------------+-----------------+
|             0|     4|   4|    0|          4|         0|   0|    0|           0|               0|            17|       17|                 540|             0|              11|               4|                4|
+--------------+------+----+-----+-----------+----------+----+-----+------------+----------------+--------------+---------+--------------------+--------

In [46]:
#Handling Null values 
# county - Unknown
# city - Unknown
# postal_code = 0
df_filled = df.fillna({
    'city':'Unknown',
    'county':'Unknown',
    'postal_code': 0,
    'electric_range': 0,
    'electric_utility': 'Unknown',
    'model_year': 0,
    'vehicle_location': 'Unknown',
    'legislative_district':0,
})

In [53]:
#Dropping unnecessary column
new_df = df_filled.drop('2020 census tract', 'dol vehicle id')

In [54]:
new_df.show()

+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------------------+
|vehical_number|   county|      city|state|postal_code|model_year|     make|      model|        vehical_type|    cavf_eligibility|electric_range|base_msrp|legislative_district|    vehicle_location|    electric_utility|
+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------------------+
|    5YJ3E1EB6K|     King|   Seattle|   WA|      98178|      2019|    TESLA|    MODEL 3|Battery Electric ...|Clean Alternative...|           220|        0|                  37|POINT (-122.23825...|CITY OF SEATTLE -...|
|    5YJYGAEE5M|   Yakima|     Selah|   WA|      98942|      2021|    TESLA|    MODEL Y|Battery Electric ...|Eligibility unk

In [64]:
#Make lat and lon columns 
from pyspark.sql.functions import col
new_df = new_df.withColumn('longitude', regexp_extract(col("vehicle_location"), r'POINT \((-?\d+\.\d+)', 1).cast(DoubleType()))
new_df = new_df.withColumn("latitude", regexp_extract(col("vehicle_location"), r'POINT \(-?\d+\.\d+ (-?\d+\.\d+)\)', 1).cast(DoubleType()))

In [68]:
new_df = new_df.drop('vehicle_location')

In [69]:
new_df.show()

+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------+----------+
|vehical_number|   county|      city|state|postal_code|model_year|     make|      model|        vehical_type|    cavf_eligibility|electric_range|base_msrp|legislative_district|    electric_utility|latitude| longitude|
+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------+----------+
|    5YJ3E1EB6K|     King|   Seattle|   WA|      98178|      2019|    TESLA|    MODEL 3|Battery Electric ...|Clean Alternative...|           220|        0|                  37|CITY OF SEATTLE -...|47.49461|-122.23825|
|    5YJYGAEE5M|   Yakima|     Selah|   WA|      98942|      2021|    TESLA|    MODEL Y|Battery Electric ...|Eligibility unkno..

In [76]:
new_df = new_df.fillna({
    'latitude':0,
    'longitude':0,
    'base_msrp':0
})

In [74]:
new_df.show()

+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------+----------+
|vehical_number|   county|      city|state|postal_code|model_year|     make|      model|        vehical_type|    cavf_eligibility|electric_range|base_msrp|legislative_district|    electric_utility|latitude| longitude|
+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------+----------+
|    5YJ3E1EB6K|     King|   Seattle|   WA|      98178|      2019|    TESLA|    MODEL 3|Battery Electric ...|Clean Alternative...|           220|        0|                  37|CITY OF SEATTLE -...|47.49461|-122.23825|
|    5YJYGAEE5M|   Yakima|     Selah|   WA|      98942|      2021|    TESLA|    MODEL Y|Battery Electric ...|Eligibility unkno..

In [79]:
new_df.schema

StructType([StructField('vehical_number', StringType(), True), StructField('county', StringType(), False), StructField('city', StringType(), False), StructField('state', StringType(), True), StructField('postal_code', IntegerType(), False), StructField('model_year', IntegerType(), False), StructField('make', StringType(), True), StructField('model', StringType(), True), StructField('vehical_type', StringType(), True), StructField('cavf_eligibility', StringType(), True), StructField('electric_range', IntegerType(), False), StructField('base_msrp', IntegerType(), False), StructField('legislative_district', IntegerType(), False), StructField('electric_utility', StringType(), False), StructField('latitude', DoubleType(), False), StructField('longitude', DoubleType(), False)])

In [83]:
from pyspark.sql.functions import current_timestamp

new_df = new_df.withColumn('event_time', current_timestamp())

In [85]:
new_df.show()

+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------+----------+--------------------+
|vehical_number|   county|      city|state|postal_code|model_year|     make|      model|        vehical_type|    cavf_eligibility|electric_range|base_msrp|legislative_district|    electric_utility|latitude| longitude|          event_time|
+--------------+---------+----------+-----+-----------+----------+---------+-----------+--------------------+--------------------+--------------+---------+--------------------+--------------------+--------+----------+--------------------+
|    5YJ3E1EB6K|     King|   Seattle|   WA|      98178|      2019|    TESLA|    MODEL 3|Battery Electric ...|Clean Alternative...|           220|        0|                  37|CITY OF SEATTLE -...|47.49461|-122.23825|2025-05-22 10:45:...|
|    5YJYGAEE5M|   Yakima|     Selah|   WA| 

In [86]:
# Define Hudi write options
hudi_options = {
    'hoodie.table.name': 'ev_data_cleaned',
    'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'vehical_number',
    'hoodie.datasource.write.partitionpath.field': 'state',
    'hoodie.datasource.write.precombine.field': 'event_time',
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    
}

In [89]:
output_path = "hdfs://namenode:9000/data/hudi/ev_data_cleaned"

# Write to Hudi
new_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(output_path)