In [1]:
import pandas as pd
import requests
from sqlalchemy import create_engine, Column, String, Integer, Float
from sqlalchemy.orm import declarative_base, sessionmaker
import sqlite3
import mlflow
import schedule
import time

In [2]:
# Create the warehouse database on first run: a local SQLite file named f1.db.
# This engine is the single connection source used by the rest of the notebook.
engine = create_engine("sqlite:///f1.db")

# Declarative base class; every table model below inherits from it so that its
# table is registered on Base.metadata.
Base = declarative_base()

# Table models: each class below maps to one table in the database.
class Circuits(Base):
    """ORM model mapped to the ``Circuits`` table, keyed by ``CircuitId``."""

    __tablename__ = 'Circuits'

    # NOTE(review): CircuitId alone is the primary key while Season is an ordinary
    # column, so the table can hold at most one row per circuit -- confirm that
    # this is the intended grain.
    CircuitId = Column(String, primary_key=True, nullable=False)

    # Required descriptive text fields.
    CircuitName = Column(String, nullable=False)
    Country = Column(String, nullable=False)
    City = Column(String, nullable=False)

    # Optional fields (no nullable=False, so NULL is accepted).
    Longitude = Column(Float)
    Latitude = Column(Float)
    Season = Column(Integer)
    
class Drivers(Base):
    """ORM model mapped to the ``Drivers`` table, keyed by ``DriverId``."""

    __tablename__ = 'Drivers'

    # NOTE(review): DriverId alone is the primary key while Season is an ordinary
    # column, so the table can hold at most one row per driver -- confirm that
    # this is the intended grain.
    DriverId = Column(String, primary_key=True, nullable=False)

    # Required text fields. DOB is declared as a String column, not a Date.
    DriverName = Column(String, nullable=False)
    DOB = Column(String, nullable=False)
    Nationality = Column(String, nullable=False)

    # Optional fields (no nullable=False, so NULL is accepted).
    Number = Column(Integer)
    Season = Column(Integer)
    
class Results(Base):
    """ORM model mapped to the ``Results`` table.

    Rows are identified by ``id``, an auto-incrementing integer primary key;
    none of the other columns carry a uniqueness constraint.
    """

    __tablename__ = 'Results'

    # Surrogate key generated by the database.
    id = Column(Integer, primary_key=True, autoincrement=True)

    # Race identification.
    RaceName = Column(String, nullable=False)
    Round = Column(Integer)
    Season = Column(Integer)
    CircuitName = Column(String, nullable=False)

    # Per-driver outcome fields.
    DriverName = Column(String, nullable=False)
    grid = Column(Integer)
    laps = Column(Integer)
    status = Column(String, nullable=False)
    position = Column(Integer)
    # Stored as text and explicitly allowed to be NULL.
    Time = Column(String, nullable=True)
    Constructor = Column(String, nullable=False)

    
# Issue CREATE TABLE for every model registered on Base. create_all checks for
# existing tables first by default, so re-running this cell leaves them untouched.
Base.metadata.create_all(bind=engine)

print('Warehouse successfully created  🔥')

Warehouse successfully created  🔥


In [3]:
# Executes components.py and brings the names it defines into the notebook
# namespace. extract, transform_results, transform_drivers, transform_circuits
# and load_data are called by the pipeline but are not defined in the visible
# cells -- presumably this script provides them (TODO confirm).
%run components.py

In [4]:
#Creating the ETL Pipeline

def warehouse_pipeline():
    """Run one full extract -> transform -> load pass into the warehouse.

    Opens a SQLAlchemy session bound to the module-level ``engine`` and, inside a
    single MLflow run named ``Warehouse_Pipeline``, calls ``extract``,
    ``transform_results``, ``transform_drivers``, ``transform_circuits`` and
    ``load_data``. Those helpers are not defined in this cell; presumably they
    come from ``components.py`` -- confirm.

    MLflow records, in order:
      * tag ``pipeline run`` = ``full run``
      * metric ``rows extracted`` = ``len(raw_data)``
      * one JSON artifact per transformed frame (``results.json``,
        ``drivers.json``, ``circuits.json``) listing that frame's columns
      * tag ``status`` = ``completed``, set only after loading succeeds

    Returns:
        None.

    Raises:
        Any exception raised by a pipeline stage or by MLflow propagates
        unchanged. The session is closed in that case too.
    """
    Session = sessionmaker(bind=engine)

    # The context manager closes the session on every exit path, so a failure
    # in any stage no longer leaks the session and its database connection.
    with Session() as session:
        # Optional: uncomment to log to a tracking server / named experiment.
        #mlflow.set_tracking_uri("http://127.0.0.1:5000/")
        #mlflow.set_experiment("ETL")
        with mlflow.start_run(run_name='Warehouse_Pipeline'):
            mlflow.set_tag('pipeline run', 'full run')

            print('Extracting the data...🚀')
            raw_data = extract()
            mlflow.log_metric('rows extracted', len(raw_data))

            print('Transforming the data...🚀')
            results = transform_results(raw_data)
            mlflow.log_dict({'columns extracted': list(results.columns)}, "results.json")

            drivers = transform_drivers(raw_data)
            mlflow.log_dict({'columns extracted': list(drivers.columns)}, "drivers.json")

            circuits = transform_circuits(raw_data)
            mlflow.log_dict({'columns extracted': list(circuits.columns)}, "circuits.json")

            print('Loading data into warehouse...🚀')
            # Each pair is (transformed frame, ORM model it is loaded into).
            load_data([
                (results, Results),
                (drivers, Drivers),
                (circuits, Circuits),
            ], session)

            print('Pipeline successfully created 🔥')
            mlflow.set_tag('status', 'completed')
    
# A notebook kernel executes cells with __name__ set to "__main__", so running
# this cell triggers one full pipeline run immediately.
if __name__ == "__main__":
    warehouse_pipeline()
    

Extracting the data...🚀
Transforming the data...🚀
Loading data into warehouse...🚀
Pipeline successfully created 🔥


In [None]:
# Run the ETL pipeline every Tuesday at 1:15 pm (disabled: wrapped in a string literal below)
"""
schedule.every().tuesday.at("13:15").do(warehouse_pipeline)


while True:
    schedule.run_pending()
    time.sleep(30)
"""

In [None]:
# engine.dispose()    #useful as a last resort before deleting the database

In [None]:
# Delete the database file (disabled: wrapped in a string literal below)
"""
import os

db_file = "f1.db"

if os.path.exists(db_file):
    os.remove(db_file)
    print("Database deleted.")
else:
    print("Database does not exist.")
"""