# ETL Pipeline
Load data from SQL Server into Python (pandas) for cleaning and merging.

In [5]:
from sqlalchemy import create_engine
import pandas as pd
import dask.dataframe as dd

# Extract
Load all three tables (`drivers`, `trips`, `sensor_data`) from SQL Server into pandas DataFrames.

In [6]:
%%time
# connect to MS-SQL
server = "WATER\SQLEXPRESS" # SQL Server Name
database = "JustTaxi" # database name
con_string = f'mssql+pyodbc://{server}/{database}?driver=SQL Server'
engine = create_engine(con_string)

# retrieve data
connection = engine.connect()

# driver data
drivers = connection.execute('SELECT * FROM drivers')
driver_data = pd.DataFrame(data=drivers.fetchall(), columns=drivers.keys())

# trip data
trips = connection.execute('SELECT * FROM trips')
trip_data = pd.DataFrame(data=trips.fetchall(), columns=trips.keys())

connection.close() # close connection explicitly

Wall time: 660 ms


In [7]:
%%time
# get sensor data by chunksize
# connection = engine.connect().execution_options(stream_results=True)
sensor_data_generator = pd.read_sql_query('SELECT * FROM sensor_data', con_string, chunksize=100000)
sensor_data = pd.concat([chunk for chunk in sensor_data_generator])

Wall time: 7min


In [8]:
# Inspect column dtypes and null counts of the raw driver table.
# NOTE(review): in the recorded output every column is `object`, including
# date_of_birth, car_make_year and rating — consider casting them during Transform.
driver_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 148 entries, 0 to 147
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   driver_id      148 non-null    object
 1   name           148 non-null    object
 2   date_of_birth  148 non-null    object
 3   gender         148 non-null    object
 4   car_model      148 non-null    object
 5   car_make_year  148 non-null    object
 6   rating         148 non-null    object
dtypes: object(7)
memory usage: 8.2+ KB


In [9]:
# Inspect the raw trips table; booking_id and driver_id are the keys used to
# join it with sensor_data and driver_data in the merge step.
trip_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000 entries, 0 to 19999
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   booking_id    20000 non-null  object
 1   driver_id     20000 non-null  object
 2   safety_label  20000 non-null  object
dtypes: object(3)
memory usage: 468.9+ KB


In [10]:
# Inspect the concatenated sensor table.
# NOTE(review): the recorded output omits the Non-Null Count column, presumably
# because the frame exceeds pandas' display.max_info_rows; pass
# show_counts=True (pandas >= 1.2) to force null counts.
# NOTE(review): the recorded index range "0 to 69655" for 7,469,656 entries
# shows the per-chunk indexes were not reset on concat, so labels repeat.
sensor_data.info()  

<class 'pandas.core.frame.DataFrame'>
Int64Index: 7469656 entries, 0 to 69655
Data columns (total 11 columns):
 #   Column          Dtype  
---  ------          -----  
 0   booking_id      object 
 1   accuracy        float64
 2   bearing         float64
 3   acceleration_x  float64
 4   acceleration_y  float64
 5   acceleration_z  float64
 6   gyro_x          float64
 7   gyro_y          float64
 8   gyro_z          float64
 9   second          float64
 10  speed           float64
dtypes: float64(10), object(1)
memory usage: 683.9+ MB


# Transform
Clean the three DataFrames, then merge them into a single table keyed by driver and booking.

## Remove duplicate rows

In [11]:
# Drop rows that are exact duplicates (all columns equal) from each of the
# three extracted tables; the first occurrence of each row is kept.
driver_data, trip_data, sensor_data = (
    frame.drop_duplicates() for frame in (driver_data, trip_data, sensor_data)
)

## Merge DataFrames

In [12]:
%%time
# merge driver and trip data
driver_trips = trip_data.merge(driver_data, on='driver_id', how='left')

# merge driver_trips and sensor data
driver_trips_sensor = sensor_data.merge(driver_trips, on='booking_id', how='left')

Wall time: 4.1 s
