<a href="https://colab.research.google.com/github/veydantkatyal/mlops-apache-overflow/blob/main/apache_overflow_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **DATA PREPARATION**

**IMPORT LIBRARIES**

In [1]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

**DATA LOADING**

In [2]:
data = pd.read_csv('screentime_analysis.csv')

**REMOVE DUPLICATES**

In [9]:
print(data.isnull().sum())

Date                           0
Usage (minutes)                0
Notifications                  0
Times Opened                   0
DayOfWeek                      0
Month                          0
App_Facebook                   0
App_Instagram                  0
App_LinkedIn                   0
App_Netflix                    0
App_Safari                     0
App_WhatsApp                   0
App_X                          0
Previous_Day_Usage             1
Notifications_x_TimesOpened    0
dtype: int64


In [10]:
print(data.duplicated().sum())

0


# **DATA PRE-PROCESSING**

In [4]:
data['Date'] = pd.to_datetime(data['Date'])
data['DayOfWeek'] = data['Date'].dt.dayofweek
data['Month'] = data['Date'].dt.month

# encode the categorical 'App' column using one-hot encoding
data = pd.get_dummies(data, columns=['App'], drop_first=True)

# scale numerical features using MinMaxScaler
scaler = MinMaxScaler()
data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

# feature engineering
data['Previous_Day_Usage'] = data['Usage (minutes)'].shift(1)
data['Notifications_x_TimesOpened'] = data['Notifications'] * data['Times Opened']

data.to_csv('preprocessed_screentime_analysis.csv', index=False)

# **MODEL TRAINING**

In [5]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# split data into features and target variable
X = data.drop(columns=['Usage (minutes)', 'Date'])
y = data['Usage (minutes)']

In [6]:
# train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# train the model
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)

In [7]:
# evaluate the model
predictions = model.predict(X_test)
mae = mean_absolute_error(y_test, predictions)
print(f'Mean Absolute Error: {mae}')

Mean Absolute Error: 15.398500000000002


In [8]:
!pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-2.10.5-py3-none-any.whl.metadata (45 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/45.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.4/45.4 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Downloading alembic-1.15.1-py3-none-any.whl.metadata (7.2 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.6.0-py3-none-any.whl.metadata (16 kB)
Collecting asgiref>=2.3.0 (from apache-airflow)
  Downloading asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting colorlog>=6.8.2 (from apache-airflow)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Downloading ConfigUpdater-3.2-py2.py3-none-any.whl.metadata (10 kB)
Collecting connexion<3.0,>=2.14.2 (from connexion[flask]<3.0,>=2.14.2->apache-airf

In [11]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# define the data preprocessing function
def preprocess_data():
    file_path = 'screentime_analysis.csv'
    data = pd.read_csv(file_path)

    data['Date'] = pd.to_datetime(data['Date'])
    data['DayOfWeek'] = data['Date'].dt.dayofweek
    data['Month'] = data['Date'].dt.month

    data = data.drop(columns=['Date'])

    data = pd.get_dummies(data, columns=['App'], drop_first=True)

    scaler = MinMaxScaler()
    data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

    preprocessed_path = 'preprocessed_screentime_analysis.csv'
    data.to_csv(preprocessed_path, index=False)
    print(f"Preprocessed data saved to {preprocessed_path}")


In [12]:
# define the DAG
dag = DAG(
    dag_id='data_preprocessing',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

In [13]:
# define the task
preprocess_task = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_data,
    dag=dag,
)

In [15]:
!airflow db init

DB: sqlite:////root/airflow/airflow.db
[[34m2025-03-19T08:24:02.460+0000[0m] {[34mmigration.py:[0m207} INFO[0m - Context impl [1mSQLiteImpl[22m.[0m
[[34m2025-03-19T08:24:02.464+0000[0m] {[34mmigration.py:[0m210} INFO[0m - Will assume [1mnon-transactional[22m DDL.[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision  -> 5f2621c13b39
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done


In [17]:
!airflow webserver --port 8080

[[34m2025-03-19T08:24:17.335+0000[0m] {[34mconfiguration.py:[0m2112} INFO[0m - Creating new FAB webserver config file in: [1m/root/airflow/webserver_config.py[22m[0m
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat: 
[[34m2025-03-19T08:24:19.917+0000[0m] {[34moverride.py:[0m1526} INFO[0m - Inserted Role: [1mAdmin[22m[0m
[[34m2025-03-19T08:24:20.016+0000[0m] {[34moverride.py:[0m1930} INFO[0m - Created Permission View: [1mcan edit on Passwords[22m[0m
[[34m2025-03-19T08:24:20.033+0000[0m] {[34moverride.py:[0m1981} INFO[0m - Added Permission [1mcan edit on Passwords[22m to role [1mAdmin[22m[0m
[[34m2025-03-19T08:24:20.059+0000[0m] {[34moverride.py:[0m1930} I

In [18]:
!airflow scheduler

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[[34m2025-03-19T08:25:21.649+0000[0m] {[34mutils.py:[0m162} INFO[0m - NumExpr defaulting to 2 threads.[0m
[[34m2025-03-19T08:25:22.077+0000[0m] {[34mexecutor_loader.py:[0m258} INFO[0m - Loaded executor: [1mSequentialExecutor[22m[0m
[2025-03-19 08:25:22 +0000] [32620] [INFO] Starting gunicorn 23.0.0
[2025-03-19 08:25:22 +0000] [32620] [INFO] Listening at: http://[::]:8793 (32620)
[2025-03-19 08:25:22 +0000] [32620] [INFO] Using worker: sync
[[34m2025-03-19T08:25:22.142+0000[0m] {[34mscheduler_job_runner.py:[0m950} INFO[0m - Starting the scheduler[0m
[[34m2025-03-19T08:25:22.143+0000[0m] {[34mscheduler_job_runner.py:[0m957} INFO[0m - Processing each file at most -1 times[0m
[2025-03-19 08:25:22 +0000] [32621] [INFO] Booting worker