### Section 1: Data pipelines

Installing required packages

In [48]:
%%capture

!pip install apache-airflow kubernetes
!pip install pyngrok
!pip install pandas

Setting up Airflow

In [49]:
%%capture
# Set up Airflow environment
import os
os.environ['AIRFLOW_HOME'] = '/content/airflow'

# Initialize the airflow database
!airflow db init

# Create admin user
!airflow users create \
    --username junyang \
    --firstname X \
    --lastname Y \
    --role Admin \
    --email admin@example.com \
    --password password

# Create DAGs directory
!mkdir -p /content/airflow/dags
!airflow info

Writing the DAG to the /dags/ folder

In [50]:
%%writefile /content/airflow/dags/section_1_data_pipelines_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import os

# Defaults handed to the DAG below through its `default_args` parameter.
# NOTE(review): Airflow applies default_args to the tasks, not to the DAG
# itself, so the 'schedule_interval' entry here presumably has no effect on
# when the DAG runs -- confirm the schedule is also passed to DAG(...).
default_args = dict(
    start_date=datetime(2024, 1, 22),
    schedule_interval="10 1 * * *",
    retries=3,
    retry_delay=timedelta(minutes=5),
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    email='example@admin.com',
)

def process_data(input_file_location, output_dir='/content/airflow/dags'):
  """Clean one CSV dataset and write the result as ``processed_<original name>``.

  Steps:
    1. Drop rows whose ``name`` is missing.
    2. Split ``name`` on the first space into ``first_name`` / ``last_name``
       (``last_name`` is left empty when the name has a single token).
    3. Convert ``price`` to a numeric type; values that cannot be parsed
       become NaN.
    4. Add ``above_100``: True where ``price`` is strictly greater than 100.
    5. Drop the original ``name`` column.

  Args:
    input_file_location: Path of the CSV to read. It must contain ``name``
      and ``price`` columns.
    output_dir: Directory the processed CSV is written to. It is created if
      it does not exist. Defaults to the location the DAG has always used.

  Raises:
    KeyError: If the ``name`` or ``price`` column is missing.
  """
  df = pd.read_csv(input_file_location)
  #To remove all rows with no name column
  df = df.dropna(subset=['name'])
  #To split name column up into first_name and last_name columns.
  #reindex guarantees both columns exist even when no name contains a space
  #(or the frame is empty), where a direct two-column assignment would raise.
  name_parts = df['name'].astype(str).str.split(' ', n=1, expand=True).reindex(columns=[0, 1])
  df['first_name'] = name_parts[0]
  df['last_name'] = name_parts[1]
  #To convert price to numeric. to_numeric already ignores prepended zeros;
  #stripping them by hand turned a price of "0" into "" and then into NaN.
  df['price'] = pd.to_numeric(df['price'], errors='coerce')
  #To create the above_100 column
  df['above_100'] = df['price'] > 100
  #To drop the original name column
  df = df.drop(columns=['name'])
  original_file_name = os.path.basename(input_file_location)
  output_file_name = f'processed_{original_file_name}'
  os.makedirs(output_dir, exist_ok=True)
  output_path = os.path.join(output_dir, output_file_name)
  #To output processed CSV files
  df.to_csv(output_path, index=False)


# The schedule is passed to the DAG constructor directly: Airflow hands
# default_args to the tasks, so a 'schedule_interval' entry there does not
# schedule the DAG and it would silently fall back to the default schedule.
# catchup is disabled so the scheduler does not backfill one run per day
# between the start date and today; each run re-processes the same files.
with DAG(
    'section_1_data_pipelines_dag',
    default_args = default_args,
    schedule_interval = "10 1 * * *",
    catchup = False,
) as dag:

  process_dataset1_task = PythonOperator(
      task_id = 'process_dataset1_task',
      python_callable = process_data,
      op_kwargs = {'input_file_location': '/content/airflow/dags/dataset1.csv'}
  )

  process_dataset2_task = PythonOperator(
      task_id = 'process_dataset2_task',
      python_callable = process_data,
      op_kwargs = {'input_file_location': '/content/airflow/dags/dataset2.csv'}
  )

  # dataset2 is processed only after dataset1 has finished successfully.
  process_dataset1_task >> process_dataset2_task

Overwriting /content/airflow/dags/section_1_data_pipelines_dag.py


Starting the Airflow webserver and scheduler

In [51]:

# Launch the Airflow webserver on port 8888 as a background job (`&`) under
# nohup; its output is redirected to webserver.log in the working directory.
!nohup airflow webserver -p 8888 > webserver.log &

nohup: redirecting stderr to stdout


In [52]:
# Launch the Airflow scheduler as a background job (`&`) under nohup; its
# output is redirected to scheduler.log in the working directory.
!nohup airflow scheduler > scheduler.log &

nohup: redirecting stderr to stdout


Using ngrok to access the web UI for this instance of Airflow

In [54]:
import os
from getpass import getpass

from pyngrok import ngrok

# Never commit an ngrok authtoken to the notebook: anyone who can read the file
# can use it. Supply it through the NGROK_AUTHTOKEN environment variable, or
# type it at the prompt. A token that was previously hard-coded here should be
# revoked in the ngrok dashboard.
ngrok_auth_token = os.environ.get('NGROK_AUTHTOKEN') or getpass('ngrok authtoken: ')
ngrok.set_auth_token(ngrok_auth_token)

# Open a tunnel to local port 8888, the port the Airflow webserver was started on.
public_url = ngrok.connect(8888)
print(f"Airflow Web UI is available at: {public_url}")

Airflow Web UI is available at: NgrokTunnel: "https://fd79-34-48-13-194.ngrok-free.app" -> "http://localhost:8888"
