<a href="https://colab.research.google.com/github/usshaa/GoogleColab/blob/main/Airflow_with_python_data_preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-2.10.4-py3-none-any.whl.metadata (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━[0m [32m41.0/43.5 kB[0m [31m4.8 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m780.0 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Downloading alembic-1.14.1-py3-none-any.whl.metadata (7.4 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.5.3-py3-none-any.whl.metadata (16 kB)
Collecting asgiref>=2.3.0 (from apache-airflow)
  Downloading asgiref-3.8.1-py3-none-any.whl.metadata (9.3 kB)
Collecting colorlog>=6.8.2 (from apache-airflow)
  Downloading colorlog-6.9.0-py3-none-any.whl.metadata (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Downloadin

In [1]:
!curl -o screentime_analysis.csv https://statso.io/wp-content/uploads/2025/01/screentime_analysis.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  5523  100  5523    0     0  29233      0 --:--:-- --:--:-- --:--:-- 29377


In [2]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# load the dataset
data = pd.read_csv('screentime_analysis.csv')

# check for missing values and duplicates
print(data.isnull().sum())
print(data.duplicated().sum())

# convert Date column to datetime and extract features
data['Date'] = pd.to_datetime(data['Date'])
data['DayOfWeek'] = data['Date'].dt.dayofweek
data['Month'] = data['Date'].dt.month

# encode the categorical 'App' column using one-hot encoding
data = pd.get_dummies(data, columns=['App'], drop_first=True)

# scale numerical features using MinMaxScaler
scaler = MinMaxScaler()
data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

# feature engineering
data['Previous_Day_Usage'] = data['Usage (minutes)'].shift(1)
data['Notifications_x_TimesOpened'] = data['Notifications'] * data['Times Opened']

# save the preprocessed data to a file
data.to_csv('preprocessed_screentime_analysis.csv', index=False)

Date               0
App                0
Usage (minutes)    0
Notifications      0
Times Opened       0
dtype: int64
0


In [3]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error

# split data into features and target variable
X = data.drop(columns=['Usage (minutes)', 'Date'])
y = data['Usage (minutes)']

# train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# train the model
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)

# evaluate the model
predictions = model.predict(X_test)
mae = mean_absolute_error(y_test, predictions)
print(f'Mean Absolute Error: {mae}')

Mean Absolute Error: 15.398500000000002


In [4]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# define the data preprocessing function
def preprocess_data():
    file_path = 'screentime_analysis.csv'
    data = pd.read_csv(file_path)

    data['Date'] = pd.to_datetime(data['Date'])
    data['DayOfWeek'] = data['Date'].dt.dayofweek
    data['Month'] = data['Date'].dt.month

    data = data.drop(columns=['Date'])

    data = pd.get_dummies(data, columns=['App'], drop_first=True)

    scaler = MinMaxScaler()
    data[['Notifications', 'Times Opened']] = scaler.fit_transform(data[['Notifications', 'Times Opened']])

    preprocessed_path = 'preprocessed_screentime_analysis.csv'
    data.to_csv(preprocessed_path, index=False)
    print(f"Preprocessed data saved to {preprocessed_path}")

# define the DAG
dag = DAG(
    dag_id='data_preprocessing',
    schedule='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# define the task
preprocess_task = PythonOperator(
    task_id='preprocess',
    python_callable=preprocess_data,
    dag=dag,
)

In [5]:
!airflow db init

DB: sqlite:////root/airflow/airflow.db
[[34m2025-02-05T04:10:51.043+0000[0m] {[34mmigration.py:[0m207} INFO[0m - Context impl [1mSQLiteImpl[22m.[0m
[[34m2025-02-05T04:10:51.045+0000[0m] {[34mmigration.py:[0m210} INFO[0m - Will assume [1mnon-transactional[22m DDL.[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision  -> 5f2621c13b39
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done


In [None]:
!airflow webserver --port 8081

In [None]:
!hostname -I


172.28.0.12 


In [None]:
!pip install colab_ssh

Collecting colab_ssh
  Downloading colab_ssh-0.3.27-py3-none-any.whl.metadata (7.7 kB)
Downloading colab_ssh-0.3.27-py3-none-any.whl (26 kB)
Installing collected packages: colab_ssh
Successfully installed colab_ssh-0.3.27


In [6]:
!wget https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -O cloudflared
!chmod +x cloudflared
!mv cloudflared /usr/local/bin/


--2025-02-05 04:11:01--  https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://github.com/cloudflare/cloudflared/releases/download/2025.1.1/cloudflared-linux-amd64 [following]
--2025-02-05 04:11:01--  https://github.com/cloudflare/cloudflared/releases/download/2025.1.1/cloudflared-linux-amd64
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/106867604/df805b98-f2b9-4323-ba62-1081c44a2925?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20250205%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250205T041101Z&X-Amz-Expires=300&X-Amz-Signature=dff7fa8fa006f2992bfd4c979cc1396bf4d9c2f3f8d8ef79ef3a3bf5d0eb04bb&X-Amz-S

In [7]:
!cloudflared --version

cloudflared version 2025.1.1 (built 2025-01-30-1649 UTC)


In [None]:
# !apt install cloudflared

In [None]:
!kill -9 2381

In [8]:
!airflow webserver --port 8081 -D

[[34m2025-02-05T04:11:16.102+0000[0m] {[34mconfiguration.py:[0m2112} INFO[0m - Creating new FAB webserver config file in: [1m/root/airflow/webserver_config.py[22m[0m
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8081
Timeout: 120
Logfiles: - -
Access Logformat: 
[[34m2025-02-05T04:11:16.329+0000[0m] {[34mdagbag.py:[0m588} INFO[0m - Filling up the DagBag from [1m/dev/null[22m[0m
[[34m2025-02-05T04:11:16.587+0000[0m] {[34moverride.py:[0m1508} INFO[0m - Inserted Role: [1mAdmin[22m[0m
[[34m2025-02-05T04:11:16.678+0000[0m] {[34moverride.py:[0m1912} INFO[0m - Created Permission View: [1mcan edit on Passwords[22m[0m
[[34m2025-02-05T04:11:16.692+0000[0m] {[34moverride.py:[0m1963} INFO[0m - Added Permission 

In [9]:
!airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

[[34m2025-02-05T04:11:36.776+0000[0m] {[34mutils.py:[0m162} INFO[0m - NumExpr defaulting to 2 threads.[0m
[[34m2025-02-05T04:11:37.294+0000[0m] {[34moverride.py:[0m1597} INFO[0m - Added user [1madmin[22m[0m
User "admin" created with role "Admin"


In [11]:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2025, 2, 2), schedule="0 0 * * *") as dag:
    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()

In [None]:
!cloudflared tunnel --url http://localhost:8081

[90m2025-02-05T04:17:59Z[0m [32mINF[0m Thank you for trying Cloudflare Tunnel. Doing so, without a Cloudflare account, is a quick way to experiment and try it out. However, be aware that these account-less Tunnels have no uptime guarantee, are subject to the Cloudflare Online Services Terms of Use (https://www.cloudflare.com/website-terms/), and Cloudflare reserves the right to investigate your use of Tunnels for violations of such terms. If you intend to use Tunnels in production you should use a pre-created named tunnel by following: https://developers.cloudflare.com/cloudflare-one/connections/connect-apps
[90m2025-02-05T04:17:59Z[0m [32mINF[0m Requesting new quick Tunnel on trycloudflare.com...
[90m2025-02-05T04:18:03Z[0m [32mINF[0m +--------------------------------------------------------------------------------------------+
[90m2025-02-05T04:18:03Z[0m [32mINF[0m |  Your quick Tunnel has been created! Visit it at (it may take some time to be reachable):  |
[90m2025