# ETL Pipeline: From Raw Sensor CSV to Clean Database

This notebook covers the **E**xtract, **T**ransform, and **L**oad (ETL) process.

1.  **Extract:** We'll read the `raw_sensor_data.csv` file.
2.  **Transform:** We'll clean the data, handle missing values, and engineer a new feature.
3.  **Load:** We'll save the clean data into a SQLite database (`oilfield_data.db`) for our dashboard.

## 1. Setup

Import necessary libraries.

In [4]:
import pandas as pd
import sqlite3
import os

# Define file paths
RAW_DATA_PATH = 'raw_sensor_data.csv'
DB_PATH = 'oilfield_data.db'

## 2. EXTRACT (E)

Read the raw data from the CSV file generated by our script.

In [5]:
# Use the path constant from Setup rather than a machine-specific absolute path.
# Adjust RAW_DATA_PATH if the CSV lives elsewhere (e.g. in the scripts/ folder).
df = pd.read_csv(RAW_DATA_PATH)

print(f"Loaded {len(df)} rows of raw data.")
df.head()

Loaded 5050 rows of raw data.


| | timestamp | rig_id | well_id | depth_m | rop_m_hr | wob_tons | torque_kNm | mud_pressure_psi |
|---|---|---|---|---|---|---|---|---|
| 0 | 2025-10-25 06:00:00 | RIG-001 | WELL-101A | 2.546784 | 30.561408 | 15.623125 | 19.492452 | 2520.727381 |
| 1 | 2025-10-25 06:05:00 | RIG-002 | WELL-101A | 1.933487 | 23.201849 | 20.67253 | 18.773712 | 2755.545004 |
| 2 | 2025-10-25 06:10:00 | RIG-001 | WELL-101A | 4.814381 | 27.211169 | 13.204087 | 26.997291 | 2427.802702 |
| 3 | 2025-10-25 06:15:00 | RIG-002 | WELL-101A | 4.875088 | 35.299206 | 14.953101 | 18.672886 | 2464.998837 |
| 4 | 2025-10-25 06:20:00 | RIG-001 | WELL-101A | 7.234371 | 29.039876 | 14.508497 | 21.671093 | 2361.908202 |


In [6]:
# Get initial info about our data
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5050 entries, 0 to 5049
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   timestamp         5050 non-null   object 
 1   rig_id            5050 non-null   object 
 2   well_id           5050 non-null   object 
 3   depth_m           5050 non-null   float64
 4   rop_m_hr          5000 non-null   float64
 5   wob_tons          5050 non-null   float64
 6   torque_kNm        5050 non-null   float64
 7   mud_pressure_psi  5050 non-null   float64
dtypes: float64(5), object(3)
memory usage: 315.8+ KB
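
Beyond eyeballing `df.info()`, the extract step can check programmatically that the expected columns arrived. The following is a minimal sketch, not part of the original pipeline: `EXPECTED_COLUMNS` and `check_schema` are hypothetical names, and the in-memory CSV is a two-row stand-in for `raw_sensor_data.csv` with one `rop_m_hr` value blanked to mimic a missing reading.

```python
import io

import pandas as pd

# Column names taken from the df.info() output above.
EXPECTED_COLUMNS = [
    "timestamp", "rig_id", "well_id", "depth_m",
    "rop_m_hr", "wob_tons", "torque_kNm", "mud_pressure_psi",
]


def check_schema(frame):
    """Return the expected columns missing from the frame (hypothetical helper)."""
    return [col for col in EXPECTED_COLUMNS if col not in frame.columns]


# Two-row stand-in for raw_sensor_data.csv; the second rop_m_hr is left empty on purpose.
sample_csv = io.StringIO(
    "timestamp,rig_id,well_id,depth_m,rop_m_hr,wob_tons,torque_kNm,mud_pressure_psi\n"
    "2025-10-25 06:00:00,RIG-001,WELL-101A,2.546784,30.561408,15.623125,19.492452,2520.727381\n"
    "2025-10-25 06:05:00,RIG-002,WELL-101A,1.933487,,20.67253,18.773712,2755.545004\n"
)
sample = pd.read_csv(sample_csv)

missing_columns = check_schema(sample)  # empty list when the schema matches
null_counts = sample.isnull().sum()     # per-column count of missing values
```

Failing fast on a non-empty `missing_columns` keeps a malformed export from reaching the transform step.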


## 3. TRANSFORM (T)

This is the data cleaning and feature engineering step.

In [7]:
# Step 3.1: Fix data types
# The 'timestamp' column was loaded as a string (object); we need it as a datetime.
df['timestamp'] = pd.to_datetime(df['timestamp'])

print("Corrected 'timestamp' data type:")
df.info()

Corrected 'timestamp' data type:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5050 entries, 0 to 5049
Data columns (total 8 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   timestamp         5050 non-null   datetime64[ns]
 1   rig_id            5050 non-null   object        
 2   well_id           5050 non-null   object        
 3   depth_m           5050 non-null   float64       
 4   rop_m_hr          5000 non-null   float64       
 5   wob_tons          5050 non-null   float64       
 6   torque_kNm        5050 non-null   float64       
 7   mud_pressure_psi  5050 non-null   float64       
dtypes: datetime64[ns](1), float64(5), object(2)
memory usage: 315.8+ KB
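
If the raw file could contain malformed timestamps, a stricter conversion may be worth considering. This sketch assumes the `YYYY-MM-DD HH:MM:SS` layout seen in the preview and uses a made-up bad value; `errors="coerce"` turns anything unparseable into `NaT` so it can be counted instead of raising.

```python
import pandas as pd

# Two valid timestamps in the notebook's format plus one deliberately bad value.
raw = pd.Series(["2025-10-25 06:00:00", "2025-10-25 06:05:00", "not a timestamp"])

# An explicit format avoids guessing; errors="coerce" maps bad values to NaT.
parsed = pd.to_datetime(raw, format="%Y-%m-%d %H:%M:%S", errors="coerce")

n_bad = int(parsed.isna().sum())  # number of rows that failed to parse
```

Whether to drop, repair, or reject rows with `NaT` depends on the pipeline; the point is only that they become visible.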


In [8]:
# Step 3.2: Handle Duplicates
# Our simulation script added some duplicates. Let's find and remove them.
print(f"Rows before duplicate removal: {len(df)}")
df = df.drop_duplicates()
print(f"Rows after duplicate removal: {len(df)}")

Rows before duplicate removal: 5050
Rows after duplicate removal: 5000
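
Before dropping duplicates it can help to look at them. The sketch below uses a small made-up frame; treating `timestamp` plus `rig_id` as the identity of a reading is an assumption, not something the notebook states.

```python
import pandas as pd

frame = pd.DataFrame({
    "timestamp": ["2025-10-25 06:00:00", "2025-10-25 06:05:00", "2025-10-25 06:00:00"],
    "rig_id": ["RIG-001", "RIG-002", "RIG-001"],
    "rop_m_hr": [30.56, 23.20, 30.56],
})

# keep=False flags every member of a duplicate group, which makes inspection easier.
dupes = frame[frame.duplicated(keep=False)]

# Deduplicate on the columns assumed to identify a reading, keeping the first copy.
deduped = frame.drop_duplicates(subset=["timestamp", "rig_id"], keep="first")
```

With `subset`, rows that share a key but differ in a sensor value are also collapsed, so this is stricter than the plain `drop_duplicates()` used above.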


In [9]:
# Step 3.3: Handle Missing Values (NaN)
# Let's check which columns have missing data.
print("Missing values per column:")
print(df.isnull().sum())

Missing values per column:
timestamp            0
rig_id               0
well_id              0
depth_m              0
rop_m_hr            50
wob_tons             0
torque_kNm           0
mud_pressure_psi     0
dtype: int64


In [10]:
# The 'rop_m_hr' column has missing data.
# For time-series sensor data, a common strategy is forward-fill (ffill),
# which assumes the value is unchanged since the last known reading.
# We sort by timestamp first so the fill is chronological.

df = df.sort_values(by='timestamp')
df['rop_m_hr'] = df['rop_m_hr'].ffill()

print("\nMissing values after ffill:")
print(df.isnull().sum())
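# A NaN in the very first row has no earlier reading to copy, so ffill leaves it in place.
# Fill any such leftovers with 0, in 'rop_m_hr' only, so other columns are never silently zeroed.
df['rop_m_hr'] = df['rop_m_hr'].fillna(0)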


Missing values after ffill:
timestamp           0
rig_id              0
well_id             0
depth_m             0
rop_m_hr            0
wob_tons            0
torque_kNm          0
mud_pressure_psi    0
dtype: int64
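
One caveat with a global forward-fill: the preview shows readings from `RIG-001` and `RIG-002` interleaved in time, so a gap may be filled with the other rig's value. A per-rig fill is one possible alternative. The sketch below uses a small made-up frame to contrast the two.

```python
import numpy as np
import pandas as pd

frame = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2025-10-25 06:00:00", "2025-10-25 06:05:00",
        "2025-10-25 06:10:00", "2025-10-25 06:15:00",
    ]),
    "rig_id": ["RIG-001", "RIG-002", "RIG-001", "RIG-002"],
    "rop_m_hr": [30.0, 20.0, np.nan, np.nan],
})
frame = frame.sort_values("timestamp")

# Global forward-fill: each gap copies the previous row, whichever rig it came from.
global_fill = frame["rop_m_hr"].ffill()

# Per-rig forward-fill: each gap copies the last reading from the same rig.
per_rig_fill = frame.groupby("rig_id")["rop_m_hr"].ffill()
```

Here the global fill gives the 06:10 `RIG-001` gap the `RIG-002` value of 20.0, while the per-rig fill gives it 30.0.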


In [11]:
# Step 3.4: Feature Engineering
# Let's create a new metric: 'drilling_efficiency'.
# This is a simplified metric. A real one would be more complex.
# We'll define it as ROP / (WOB * Torque). Higher is better.
# We need to handle potential division by zero if WOB or Torque is 0.

def calculate_efficiency(row):
    denominator = row['wob_tons'] * row['torque_kNm']
    if denominator == 0:
        return 0  # Avoid division by zero
    return row['rop_m_hr'] / denominator

df['drilling_efficiency'] = df.apply(calculate_efficiency, axis=1)

print("DataFrame with new 'drilling_efficiency' feature:")
df.head()

DataFrame with new 'drilling_efficiency' feature:


| | timestamp | rig_id | well_id | depth_m | rop_m_hr | wob_tons | torque_kNm | mud_pressure_psi | drilling_efficiency |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-10-25 06:00:00 | RIG-001 | WELL-101A | 2.546784 | 30.561408 | 15.623125 | 19.492452 | 2520.727381 | 0.100355 |
| 1 | 2025-10-25 06:05:00 | RIG-002 | WELL-101A | 1.933487 | 23.201849 | 20.67253 | 18.773712 | 2755.545004 | 0.059783 |
| 2 | 2025-10-25 06:10:00 | RIG-001 | WELL-101A | 4.814381 | 27.211169 | 13.204087 | 26.997291 | 2427.802702 | 0.076334 |
| 3 | 2025-10-25 06:15:00 | RIG-002 | WELL-101A | 4.875088 | 35.299206 | 14.953101 | 18.672886 | 2464.998837 | 0.126422 |
| 4 | 2025-10-25 06:20:00 | RIG-001 | WELL-101A | 7.234371 | 29.039876 | 14.508497 | 21.671093 | 2361.908202 | 0.092362 |
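
Row-wise `apply` runs a Python function for every row, which tends to be slow on larger frames. The same ROP / (WOB * Torque) calculation can be sketched with column arithmetic. The first two rows below are copied from the preview; the third is a made-up row with a zero WOB to exercise the division-by-zero guard.

```python
import numpy as np
import pandas as pd

frame = pd.DataFrame({
    "rop_m_hr": [30.561408, 23.201849, 10.0],
    "wob_tons": [15.623125, 20.67253, 0.0],
    "torque_kNm": [19.492452, 18.773712, 25.0],
})

denominator = frame["wob_tons"] * frame["torque_kNm"]

# Swap zero denominators for NaN so the division yields NaN there,
# then map those rows to 0, matching the row-wise function's guard.
frame["drilling_efficiency"] = (
    frame["rop_m_hr"] / denominator.replace(0, np.nan)
).fillna(0)
```

One difference to be aware of: a missing `rop_m_hr` would also end up as 0 here, whereas the row-wise version would return NaN. That does not matter once missing values have been filled beforehand.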


In [12]:
# Step 3.5: Final Data Rounding
# Let's round our float columns to 2 decimal places for cleaner storage.
float_cols = df.select_dtypes(include='float').columns
df[float_cols] = df[float_cols].round(2)

df.head()

| | timestamp | rig_id | well_id | depth_m | rop_m_hr | wob_tons | torque_kNm | mud_pressure_psi | drilling_efficiency |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-10-25 06:00:00 | RIG-001 | WELL-101A | 2.55 | 30.56 | 15.62 | 19.49 | 2520.73 | 0.1 |
| 1 | 2025-10-25 06:05:00 | RIG-002 | WELL-101A | 1.93 | 23.2 | 20.67 | 18.77 | 2755.55 | 0.06 |
| 2 | 2025-10-25 06:10:00 | RIG-001 | WELL-101A | 4.81 | 27.21 | 13.2 | 27.0 | 2427.8 | 0.08 |
| 3 | 2025-10-25 06:15:00 | RIG-002 | WELL-101A | 4.88 | 35.3 | 14.95 | 18.67 | 2465.0 | 0.13 |
| 4 | 2025-10-25 06:20:00 | RIG-001 | WELL-101A | 7.23 | 29.04 | 14.51 | 21.67 | 2361.91 | 0.09 |
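
Rounding every float column to 2 decimals leaves `drilling_efficiency` (values around 0.1) with only one or two significant digits, as the output above shows. If that matters for the dashboard, `DataFrame.round` also accepts a per-column mapping. The 4-decimal choice below is an arbitrary illustration, not a recommendation from the original notebook.

```python
import pandas as pd

# Values copied from the first two preview rows.
frame = pd.DataFrame({
    "depth_m": [2.546784, 1.933487],
    "drilling_efficiency": [0.100355, 0.059783],
})

# A dict maps column name -> number of decimals; unlisted columns are left untouched.
rounded = frame.round({"depth_m": 2, "drilling_efficiency": 4})
```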


## 4. LOAD (L)

Now that our data is clean and transformed, we load it into our SQLite database.
This database will be the 'single source of truth' for our dashboard.

In [13]:
# Establish a connection to the SQLite database.
# This will create the file 'oilfield_data.db' if it doesn't exist.
conn = sqlite3.connect(DB_PATH)

# Define table name
TABLE_NAME = 'drilling_metrics'

# Use pandas to_sql to load the DataFrame into the database.
# if_exists='replace': This will drop the table every time the ETL runs and create a new one.
# For a real production pipeline, you might use 'append'.
df.to_sql(TABLE_NAME, conn, if_exists='replace', index=False)

# Close the connection
conn.close()

print(f"Successfully loaded {len(df)} rows into '{TABLE_NAME}' table in '{DB_PATH}'")

Successfully loaded 5000 rows into 'drilling_metrics' table in 'oilfield_data.db'
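
If `to_sql` raises, the explicit `conn.close()` above is never reached. Wrapping the connection in `contextlib.closing` is one way to guarantee cleanup. The sketch below uses a small made-up frame and an in-memory database (`":memory:"`) so it never touches `oilfield_data.db`. It also shows that running a `'replace'` load twice does not duplicate rows.

```python
import sqlite3
from contextlib import closing

import pandas as pd

frame = pd.DataFrame({
    "rig_id": ["RIG-001", "RIG-002"],
    "rop_m_hr": [30.56, 23.20],
})

# closing() calls conn.close() on exit, even if the load raises.
with closing(sqlite3.connect(":memory:")) as conn:
    frame.to_sql("drilling_metrics", conn, if_exists="replace", index=False)
    # A second run drops and recreates the table, so the data is not doubled.
    frame.to_sql("drilling_metrics", conn, if_exists="replace", index=False)
    row_count = conn.execute("SELECT COUNT(*) FROM drilling_metrics").fetchone()[0]
```

With `if_exists='append'` the second call would add the rows again, so an append-based pipeline needs its own guard against reloading the same batch.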


## 5. Verification

Let's quickly read back from the database to make sure everything worked.

In [14]:
conn = sqlite3.connect(DB_PATH)

# Read data back using a SQL query
check_df = pd.read_sql(f'SELECT * FROM {TABLE_NAME} LIMIT 5', conn)

conn.close()

check_df

| | timestamp | rig_id | well_id | depth_m | rop_m_hr | wob_tons | torque_kNm | mud_pressure_psi | drilling_efficiency |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-10-25 06:00:00 | RIG-001 | WELL-101A | 2.55 | 30.56 | 15.62 | 19.49 | 2520.73 | 0.1 |
| 1 | 2025-10-25 06:05:00 | RIG-002 | WELL-101A | 1.93 | 23.2 | 20.67 | 18.77 | 2755.55 | 0.06 |
| 2 | 2025-10-25 06:10:00 | RIG-001 | WELL-101A | 4.81 | 27.21 | 13.2 | 27.0 | 2427.8 | 0.08 |
| 3 | 2025-10-25 06:15:00 | RIG-002 | WELL-101A | 4.88 | 35.3 | 14.95 | 18.67 | 2465.0 | 0.13 |
| 4 | 2025-10-25 06:20:00 | RIG-001 | WELL-101A | 7.23 | 29.04 | 14.51 | 21.67 | 2361.91 | 0.09 |
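
Reading five rows confirms the table exists, but not that every row arrived. A slightly stronger check compares the stored row count with the source frame. The sketch below uses a small made-up frame and an in-memory database. It also passes `parse_dates`, because SQLite has no native datetime type and timestamps typically come back as plain strings otherwise.

```python
import sqlite3
from contextlib import closing

import pandas as pd

# Timestamps are kept as text here, which is how SQLite stores them.
source = pd.DataFrame({
    "timestamp": ["2025-10-25 06:00:00", "2025-10-25 06:05:00"],
    "rig_id": ["RIG-001", "RIG-002"],
    "rop_m_hr": [30.56, 23.20],
})

with closing(sqlite3.connect(":memory:")) as conn:
    source.to_sql("drilling_metrics", conn, if_exists="replace", index=False)

    stored_rows = conn.execute("SELECT COUNT(*) FROM drilling_metrics").fetchone()[0]
    check = pd.read_sql(
        "SELECT * FROM drilling_metrics ORDER BY timestamp",
        conn,
        parse_dates=["timestamp"],  # convert the text column back to datetime
    )

counts_match = stored_rows == len(source)
timestamps_parsed = pd.api.types.is_datetime64_any_dtype(check["timestamp"])
```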


**ETL Process Complete!**

Our clean, transformed data is now ready in `oilfield_data.db`. We can now run our Streamlit dashboard.