In [19]:
import json
from pymongo import MongoClient

# Step 1: read the exported JSON file and reset the MongoDB collection it will be uploaded to.
cl=MongoClient('mongodb://localhost:27017/')
dab=cl['ev_project']
coll=dab['ev_registrations']

# Parse the export BEFORE touching the collection, so a missing or malformed file
# cannot leave the collection dropped with nothing to reload it from.
# JSON is read explicitly as UTF-8 instead of the platform default encoding.
with open('ev_registrations.json','r',encoding='utf-8') as file:
    json_da=json.load(file)
da=json_da['data']
# Column names are listed under meta.view.columns; every entry of 'data' is paired
# with those names positionally to build one dict per record.
colm=[col['name'] for col in json_da['meta']['view']['columns']]
data_dt=[dict(zip(colm,recd)) for recd in da]

# Only now empty the collection, so the upload starts from a clean state.
coll.drop()
 


In [20]:
# Upload the records in batches of `cz` documents per insert_many call
# instead of sending everything in one request.
cz=1000
total_rec=len(data_dt)
start=0
while start<total_rec:
    coll.insert_many(data_dt[start:start+cz])
    start+=cz

# Report how many documents the collection holds after the upload.
stored=coll.count_documents({})
print(f"Total in MongoDB: {stored}")

Total in MongoDB: 770809


In [21]:
import pandas as pd
from pymongo import MongoClient
from datetime import datetime

# Connect to the local MongoDB and select the collection used by the analysis cells.
cl=MongoClient('mongodb://localhost:27017/')
dab=cl.get_database('ev_project')
coll=dab.get_collection('ev_registrations')




In [22]:
# Date window for the analysis (both bounds are passed to the query below).
sd=datetime(2023,1,1)
ed=datetime(2025,6,30)

date_fmt='%Y-%m-%d'
ev_filter={
    'Fuel Type':'Electric',
    'Transaction Date':{'$gte':sd.strftime(date_fmt),'$lte':ed.strftime(date_fmt)},
}
# NOTE(review): the bounds are sent as 'YYYY-MM-DD' strings, so MongoDB compares them
# as text. Presumably 'Transaction Date' is stored as an ISO-formatted string; if it
# carries a time suffix, rows on the end date may not match '$lte' - confirm.
evda=list(coll.find(ev_filter))


In [23]:
# Bookkeeping columns that are not needed downstream; any that are absent are
# skipped silently because of errors='ignore'.
unused_cols=['_id','sid','id','position','created_at','updated_at','updated_meta','meta','Fiscal Year']

df=(
    pd.DataFrame(evda)
    .drop(columns=unused_cols,errors='ignore')
    # Non-numeric 'Counts' values become NaN rather than raising.
    .assign(Counts=lambda frame: pd.to_numeric(frame['Counts'],errors='coerce'))
    .drop_duplicates()
)
print(len(df))


8797


In [24]:
# Quick sanity checks: number of distinct values per key column, then the largest count.
for label,column in (('uniq residential county','Residential County'),('unique fuel type','Fuel Type')):
    print(label,df[column].nunique())
print(df['Counts'].max())


uniq residential county 39
unique fuel type 1
8448


In [25]:
# Roll the daily transactions up to monthly totals and export them.
df['Transaction Date']=pd.to_datetime(df['Transaction Date'])
df['YearMonth']=df['Transaction Date'].dt.to_period('M')

# Strip whitespace from county names BEFORE grouping. Stripping afterwards would keep
# separate rows for names that differ only by surrounding whitespace (e.g. 'Adams'
# and 'Adams '), producing duplicate keys in the aggregated table.
df_group=(
    df.assign(**{'Residential County':df['Residential County'].str.strip()})
    .groupby(['Residential County','YearMonth','Fuel Type','Primary Use Class'],as_index=False)['Counts']
    .sum()
)

print('first 5 rows',df_group.head())
df_group.to_csv('ev_cleaned_monthly.csv',index=False)

first 5 rows   Residential County YearMonth Fuel Type  Primary Use Class  Counts
0              Adams   2023-01  Electric         Commercial       1
1              Adams   2023-01  Electric  Passenger Vehicle       1
2              Adams   2023-02  Electric              Truck       1
3              Adams   2023-03  Electric  Passenger Vehicle       1
4              Adams   2023-04  Electric  Passenger Vehicle       4


In [1]:
from dagster import job, op, In, Out
import pandas as pd
from pymongo import MongoClient
from sqlalchemy import create_engine
from datetime import datetime

In [2]:
@op(out=Out(pd.DataFrame))
def transform_evdata():
    """Load electric-vehicle registrations from MongoDB and aggregate them per month.

    Queries the ``ev_registrations`` collection of the ``ev_project`` database on the
    local MongoDB for documents whose ``Fuel Type`` is ``'Electric'`` and whose
    ``Transaction Date`` lies between 2023-01-01 and 2025-12-31. Bookkeeping columns
    and exact duplicate rows are dropped, then ``Counts`` is summed per residential
    county, month, fuel type and primary use class. Diagnostic figures are printed
    along the way.

    Returns:
        pd.DataFrame: one row per (``Residential County``, ``YearMonth``,
        ``Fuel Type``, ``Primary Use Class``) with the summed ``Counts``;
        ``YearMonth`` is converted to a string.

    Raises:
        ValueError: if the query matches no documents.
    """
    sd=datetime(2023,1,1)
    ed=datetime(2025,12,31)
    # NOTE(review): the bounds are sent as 'YYYY-MM-DD' strings, so MongoDB compares
    # them as text. Presumably 'Transaction Date' is stored as an ISO-formatted string;
    # if it carries a time suffix, rows on the end date may not match '$lte' - confirm.
    query={'Fuel Type':'Electric','Transaction Date':{'$gte':sd.strftime('%Y-%m-%d'),'$lte':ed.strftime('%Y-%m-%d')}}

    # The context manager closes the client once the documents have been fetched.
    with MongoClient('mongodb://localhost:27017/') as cl:
        evda=list(cl['ev_project']['ev_registrations'].find(query))

    # An empty result would otherwise fail below with an opaque KeyError on 'Counts'.
    if not evda:
        raise ValueError(f"No 'Electric' registrations found between {sd:%Y-%m-%d} and {ed:%Y-%m-%d}")

    df=pd.DataFrame(evda).drop(columns=['_id','sid','id','position','created_at','updated_at','updated_meta','meta','Fiscal Year'],errors='ignore')
    df['Counts']=pd.to_numeric(df['Counts'],errors='coerce')
    df=df.drop_duplicates()
    print("raw data",len(df))

    # Derive the month of each transaction and report how many months the raw data spans.
    df['Transaction Date']=pd.to_datetime(df['Transaction Date'])
    df['YearMonth']=df['Transaction Date'].dt.to_period('M')
    print("Unique YearMonths \t",df['YearMonth'].nunique())

    # Strip whitespace from county names BEFORE grouping, so names that differ only by
    # surrounding whitespace are summed into one row instead of yielding duplicate keys.
    df['Residential County']=df['Residential County'].str.strip()
    df_group=df.groupby(['Residential County','YearMonth','Fuel Type','Primary Use Class'],as_index=False)['Counts'].sum()
    df_group['YearMonth']=df_group['YearMonth'].astype(str)

    # Summary of the aggregated table.
    print("\nAggregated data \t")
    print("Total rows \t",len(df_group))
    print("Unique Residential Counties \t",df_group['Residential County'].nunique())
    print("Unique YearMonths \t",df_group['YearMonth'].nunique())
    print("Unique Primary Use Classes \t",df_group['Primary Use Class'].nunique())
    print("Aggregated Counts summary \n",df_group['Counts'].describe())
    return df_group

In [6]:
@op(ins={"df":In(pd.DataFrame)},out=Out(str))
def save_to_postgres(df):
    """Write the aggregated frame to the ``ev_registrations_cleaned`` PostgreSQL table.

    The table is replaced if it already exists. The connection URL is read from the
    ``EV_POSTGRES_URL`` environment variable and falls back to the previous local
    default when the variable is not set.

    Args:
        df (pd.DataFrame): the frame to store; the index is not written.

    Returns:
        str: ``"Saved to PostgreSQL"`` when the write succeeds.

    Raises:
        Exception: any error raised while connecting or writing is logged and
        re-raised, so the pipeline run is marked as failed instead of reporting
        success with nothing written.
    """
    import os  # local import: keeps this cell self-contained

    # Prefer credentials supplied through the environment over the built-in default.
    db_url=os.environ.get('EV_POSTGRES_URL','postgresql://postgres:password@localhost:5432/ev_project_cleaned')
    engine=None
    try:
        engine=create_engine(db_url)
        df.to_sql('ev_registrations_cleaned',engine,if_exists='replace',index=False)
    except Exception as e:
        print(f"Error saving to PostgreSQL:{e}")
        raise
    finally:
        # Release pooled connections whether or not the write succeeded.
        if engine is not None:
            engine.dispose()
    print("Data saved into the PostgreSQL")
    return "Saved to PostgreSQL"

In [7]:
@job
def ev_data_pipeline():
    """Wire the pipeline: the output of transform_evdata feeds save_to_postgres."""
    save_to_postgres(transform_evdata())