In [29]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import boto3
import pathlib

In [30]:
# pulling the parquets from s3 and loading into pandas dataframes

storage_options = {
    "client_kwargs": {"endpoint_url": "http://localhost:4566"},
    "key": "test",
    "secret": "test"
}
df = pd.read_parquet('s3://ev-data/raw/', 
                           storage_options=storage_options,
                           engine='pyarrow')
df.head()

Unnamed: 0,session_id,garage_id,user_id,user_type,shared_id,start_plugin,start_plugin_hour,end_plugout,end_plugout_hour,el_kwh,duration_hours,month_plugin,weekdays_plugin,plugin_category,duration_category
0,1,AdO3,AdO3-4,Private,,2018-12-21 10:20:00,10,2018-12-21 10:23:00,10.0,0.3,0.05,Dec,Friday,late morning (9-12),Less than 3 hours
1,2,AdO3,AdO3-4,Private,,2018-12-21 10:24:00,10,2018-12-21 10:32:00,10.0,0.87,0.136667,Dec,Friday,late morning (9-12),Less than 3 hours
2,3,AdO3,AdO3-4,Private,,2018-12-21 11:33:00,11,2018-12-21 19:46:00,19.0,29.87,8.216389,Dec,Friday,late morning (9-12),Between 6 and 9 hours
3,4,AdO3,AdO3-2,Private,,2018-12-22 16:15:00,16,2018-12-23 16:40:00,16.0,15.56,24.419722,Dec,Saturday,late afternoon (15-18),More than 18 hours
4,5,AdO3,AdO3-2,Private,,2018-12-24 22:03:00,22,2018-12-24 23:02:00,23.0,3.62,0.970556,Dec,Monday,late evening (21-midnight),Less than 3 hours


In [31]:
#standardizing column names
df.columns = df.columns.str.lower().str.replace(' ', '_')
df.columns

Index(['session_id', 'garage_id', 'user_id', 'user_type', 'shared_id',
       'start_plugin', 'start_plugin_hour', 'end_plugout', 'end_plugout_hour',
       'el_kwh', 'duration_hours', 'month_plugin', 'weekdays_plugin',
       'plugin_category', 'duration_category'],
      dtype='object')

In [32]:
#converting el_kwh and duration_hours to numeric, coercing errors to NaN
df['el_kwh'] = pd.to_numeric(df['el_kwh'], errors='coerce')
df['duration_hours'] = pd.to_numeric(df['duration_hours'], errors='coerce')

In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6878 entries, 0 to 6877
Data columns (total 15 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   session_id         6878 non-null   int64         
 1   garage_id          6878 non-null   object        
 2   user_id            6878 non-null   object        
 3   user_type          6878 non-null   object        
 4   shared_id          1412 non-null   object        
 5   start_plugin       6878 non-null   datetime64[ns]
 6   start_plugin_hour  6878 non-null   int64         
 7   end_plugout        6844 non-null   datetime64[ns]
 8   end_plugout_hour   6844 non-null   float64       
 9   el_kwh             6878 non-null   float64       
 10  duration_hours     6844 non-null   float64       
 11  month_plugin       6878 non-null   object        
 12  weekdays_plugin    6878 non-null   object        
 13  plugin_category    6878 non-null   object        
 14  duration

In [33]:
df.isna().sum()

session_id              0
garage_id               0
user_id                 0
user_type               0
shared_id            5466
start_plugin            0
start_plugin_hour       0
end_plugout            34
end_plugout_hour       34
el_kwh                  0
duration_hours         34
month_plugin            0
weekdays_plugin         0
plugin_category         0
duration_category      34
dtype: int64

In [36]:
#fixing missing values in 'end_plugout' and 'duration_hours' by assigning correct values
missing_both = df[df['end_plugout'].isna() & df['duration_hours'].isna() & df['duration_category'].isna()]
print("Rows with both end_plugout and duration_hours NULL:", len(missing_both))
missing_both.sample(1)

Rows with both end_plugout and duration_hours NULL: 34


Unnamed: 0,session_id,garage_id,user_id,user_type,shared_id,start_plugin,start_plugin_hour,end_plugout,end_plugout_hour,el_kwh,duration_hours,month_plugin,weekdays_plugin,plugin_category,duration_category
5172,5173,SR2,SR2-3,Private,,2019-12-19 16:23:00,16,NaT,,24.36,,Dec,Thursday,late afternoon (15-18),


In [None]:
# Step 1: calculate average charging rate
complete_sessions = df.dropna(subset=['end_plugout', 'duration_hours']).copy()
complete_sessions['charging_rate'] = complete_sessions['el_kwh'] / complete_sessions['duration_hours']
avg_charging_rate = complete_sessions['charging_rate'].median()

print(f"Average charging rate: {avg_charging_rate:.2f} kWh/hour")

# Step 2: Get indices of missing rows before fixing them
missing_indices = df[df['end_plugout'].isna() & df['duration_hours'].isna()].index
print(f"Found {len(missing_indices)} missing sessions")

# Step 3: Estimate missing durations using average rate
df.loc[missing_indices, 'duration_hours'] = df.loc[missing_indices, 'el_kwh'] / avg_charging_rate

# Step 4: Calculate end_plugout from estimated duration
df.loc[missing_indices, 'end_plugout'] = df.loc[missing_indices, 'start_plugin'] + pd.to_timedelta(df.loc[missing_indices, 'duration_hours'], unit='h')

# Step 5: Update end_plugout_hour
df.loc[missing_indices, 'end_plugout_hour'] = df.loc[missing_indices, 'end_plugout'].dt.hour

# Step 6: Update duration_category based on estimated duration
for idx in missing_indices:
    duration = df.loc[idx, 'duration_hours']
    if duration > 18:
        df.loc[idx, 'duration_category'] = "More than 18 hours"
    elif duration > 15:
        df.loc[idx, 'duration_category'] = "Between 15 and 18 hours"
    elif duration > 12:
        df.loc[idx, 'duration_category'] = "Between 12 and 15 hours"
    elif duration > 9:
        df.loc[idx, 'duration_category'] = "Between 9 and 12 hours"
    elif duration > 6:
        df.loc[idx, 'duration_category'] = "Between 6 and 9 hours"
    elif duration > 3:
        df.loc[idx, 'duration_category'] = "Between 3 and 6 hours"
    else:
        df.loc[idx, 'duration_category'] = "Less than 3 hours"

print(f"Fixed {len(missing_indices)} missing sessions")
print(df.isna().sum())

Average charging rate: 1.52 kWh/hour
Found 0 missing sessions
Fixed 0 missing sessions
session_id              0
garage_id               0
user_id                 0
user_type               0
shared_id            5466
start_plugin            0
start_plugin_hour       0
end_plugout             0
end_plugout_hour        0
el_kwh                  0
duration_hours          0
month_plugin            0
weekdays_plugin         0
plugin_category         0
duration_category       0
dtype: int64


0            Less than 3 hours
1            Less than 3 hours
2       Between 6 and 9  hours
3           More than 18 hours
4            Less than 3 hours
                 ...          
6873     Between 3 and 6 hours
6874     Between 3 and 6 hours
6875         Less than 3 hours
6876         Less than 3 hours
6877     Between 3 and 6 hours
Name: duration_category, Length: 6878, dtype: category
Categories (7, object): ['Between 12 and 15 hours', 'Between 15 and 18 hours', 'Between 3 and 6 hours', 'Between 6 and 9  hours', 'Between 9 and 12 hours', 'Less than 3 hours', 'More than 18 hours']

In [19]:
# Fix duration mismatches
df['charging_duration'] = (df['end_plugout'] - df['start_plugin']).dt.total_seconds() / 3600
duration_diff = abs(df['duration_hours'] - df['charging_duration'])
df.loc[duration_diff > 0.1, 'duration_hours'] = df['charging_duration']
df = df.drop('charging_duration', axis=1)

In [20]:
category_columns = [
    'user_type', 'shared_id', 'month_plugin', 'weekdays_plugin', 'plugin_category', 'duration_category'
]
for col in category_columns:
    df[col] = df[col].astype('category')
df['end_plugout_hour'] = df['end_plugout_hour'].astype('int64')
print(df.dtypes)

IntCastingNaNError: Cannot convert non-finite values (NA or inf) to integer

In [None]:
#verifying start_plugin_hour
df['hour'] = df['start_plugin'].dt.hour


df['start_plugin_hour'] = np.where(
    df['start_plugin_hour'] != df['hour'],
    df['hour'],                             
    df['start_plugin_hour']                 
)

In [None]:
print("Mismatches left:", (df['start_plugin_hour'] != df['hour']).sum())
df=df.drop(columns='hour')

Mismatches left: 0


In [None]:
# aggregating data on hourly basis
df['hour'] = df['start_plugin'].dt.floor('h')

In [None]:
df.shape

(6844, 16)

In [None]:
df = df.drop_duplicates()

# Saving the df into a new bucket

In [None]:
'''
import boto3
from pathlib import Path

local_processed_dir = "C:/Users/GIGABYTE/Documents/ml/mlops/data/processed"
local_file_path = f"{local_processed_dir}/cleaned_ev_data.parquet"
s3_bucket = "ev-data"
s3_key = "processed/cleaned_ev_data.parquet"

# Create directory if it doesn't exist
Path(local_processed_dir).mkdir(parents=True, exist_ok=True)

# Save locally
df.to_parquet(local_file_path, 
                     engine='pyarrow', 
                     compression='snappy',
                     index=False)
print(f"✅ Cleaned data saved locally: {local_file_path}")

# Uploading to S3
s3 = boto3.client('s3', 
                 endpoint_url="http://localhost:4566",
                 aws_access_key_id="test", 
                 aws_secret_access_key="test")

s3.upload_file(local_file_path, s3_bucket, s3_key)
print(f"✅ Cleaned data uploaded to S3: s3://{s3_bucket}/{s3_key}")

# Verify upload
response = s3.list_objects_v2(Bucket=s3_bucket, Prefix="processed/")
if 'Contents' in response:
    print("Files in S3 processed folder:")
    for obj in response['Contents']:
        print(f"   - {obj['Key']}")'''

✅ Cleaned data saved locally: C:/Users/GIGABYTE/Documents/ml/mlops/data/processed/cleaned_ev_data.parquet
✅ Cleaned data uploaded to S3: s3://ev-data/processed/cleaned_ev_data.parquet
Files in S3 processed folder:
   - processed/cleaned_ev_data.parquet


In [None]:
# uploading the partitioned data from local to s3
'''
import boto3
import os

# LocalStack endpoint
LOCALSTACK_ENDPOINT = "http://localhost:4566"
s3 = boto3.client('s3', endpoint_url=LOCALSTACK_ENDPOINT,
                  aws_access_key_id="test", aws_secret_access_key="test")

# create bucket if not exists (LocalStack ignores region)
try:
    s3.create_bucket(Bucket="ev-data")
except Exception as e:
    print("create_bucket:", e)

# if you saved a partitioned folder, upload recursively:
import pathlib
folder = pathlib.Path('../data/processed/raw_parquet/trondheim_sessions_v1_partitioned')
for p in folder.rglob('*.parquet'):
    # compute key relative to folder
    relative = p.relative_to(folder)
    key = f'raw/trondheim_partitioned/{relative.as_posix()}'
    s3.upload_file(str(p), "ev-data", key)
    print("Uploaded:", key)
'''

'\nimport boto3\nimport os\n\n# LocalStack endpoint\nLOCALSTACK_ENDPOINT = "http://localhost:4566"\ns3 = boto3.client(\'s3\', endpoint_url=LOCALSTACK_ENDPOINT,\n                  aws_access_key_id="test", aws_secret_access_key="test")\n\n# create bucket if not exists (LocalStack ignores region)\ntry:\n    s3.create_bucket(Bucket="ev-data")\nexcept Exception as e:\n    print("create_bucket:", e)\n\n# if you saved a partitioned folder, upload recursively:\nimport pathlib\nfolder = pathlib.Path(\'../data/processed/raw_parquet/trondheim_sessions_v1_partitioned\')\nfor p in folder.rglob(\'*.parquet\'):\n    # compute key relative to folder\n    relative = p.relative_to(folder)\n    key = f\'raw/trondheim_partitioned/{relative.as_posix()}\'\n    s3.upload_file(str(p), "ev-data", key)\n    print("Uploaded:", key)\n'

# Verifying the files existance

In [None]:
# List all objects recursively in the bucket
response = s3.list_objects_v2(Bucket="ev-data-clean", Prefix="ev-clean/")

# Print each file
for obj in response.get('Contents', []):
    print(f"Key: {obj['Key']}, Size: {obj['Size']} bytes")

NoSuchBucket: An error occurred (NoSuchBucket) when calling the ListObjectsV2 operation: The specified bucket does not exist

# Reading from the s3 bucket

In [None]:
# downloading then reading the downloaded file
'''
download_path = '../data/downloaded/ev_downloaded.parquet'
s3.download_file("ev-data-clean", "ev-clean/year=2020/month=1/39b0d0debd45456488561c24e7466d83-0.parquet", download_path)
df2 = pd.read_parquet(download_path, engine='pyarrow')
df2.head()
'''

'\ndownload_path = \'../data/downloaded/ev_downloaded.parquet\'\ns3.download_file("ev-data-clean", "ev-clean/year=2020/month=1/39b0d0debd45456488561c24e7466d83-0.parquet", download_path)\ndf2 = pd.read_parquet(download_path, engine=\'pyarrow\')\ndf2.head()\n'