# Data Workflow Automation and Analysis Project

## Overview
The project focuses on establishing automated data workflows to streamline the process of data extraction, transformation, and loading (ETL). It involves creating Python scripts for database operations, formulating SQL queries for data manipulation, and configuring Apache Airflow DAGs for task scheduling.

## Aim
The primary aim of the project is to:
- Enhance efficiency by automating routine data tasks.
- Simplify the data analysis process, allowing for quick and accurate insights.
- Improve productivity by reducing manual data handling, freeing up resources for strategic work.

Overall, the project provides a small, reusable template for a data pipeline: query ClickHouse, store the aggregated metrics in SQLite, and schedule the refresh with Airflow.


# Task One Solution

## Database Connection and Table Creation

In [1]:
# Import necessary libraries
import clickhouse_connect  # Imports the library for connecting to ClickHouse databases.
import sqlite3  # Imports the library for interacting with SQLite databases.
from airflow import DAG  # Imports the DAG class from Airflow to define a workflow.
from airflow.operators.python import PythonOperator  # Imports the PythonOperator to execute Python functions.
from datetime import datetime, timedelta  # Imports datetime and timedelta for working with dates and time deltas.

# Connect to ClickHouse database
client = clickhouse_connect.get_client(
    host='github.demo.trial.altinity.cloud',  # The hostname of the ClickHouse server.
    port=8443,  # The port number for the connection.
    username='demo',  # The username for authentication.
    password='demo'  # The password for authentication.
)

# Show tables in ClickHouse
tables_query = "SHOW TABLES"  # SQL query to list all tables in the ClickHouse database.
tables = client.query_df(tables_query)  # Executes the query and stores the result in a DataFrame.
tables  # Displays the DataFrame containing the list of tables.


| | name |
|---|---|
| 0 | airports |
| 1 | dockerhub_repos |
| 2 | events_local |
| 3 | events_local_2 |
| 4 | github_events |
| 5 | github_events_aggregate |
| 6 | github_events_aggregate_2 |
| 7 | github_events_aggregate_3 |
| 8 | github_events_aggregate_4 |
| 9 | github_user_merges |


## Python Script Explanation

### Importing Libraries
- `clickhouse_connect`: This library is used to connect to ClickHouse databases, which are column-oriented DBMS (Database Management Systems) designed for online analytical processing.
- `sqlite3`: This is a library for interacting with SQLite databases, which are lightweight disk-based databases that don't require a separate server process.
- `airflow`: Specifically, the `DAG` class from Airflow is imported to define a Directed Acyclic Graph, which is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
- `PythonOperator`: This is an operator from Airflow that executes a Python function.
- `datetime`, `timedelta`: These are imported from the `datetime` module and are used to work with dates and time differences.

### Database Connection
- A client object is created using the `clickhouse_connect.get_client` method to connect to a ClickHouse database with specified host, port, username, and password.
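
Hard-coded credentials are acceptable for this public demo server, but in other environments the connection settings are often read from the environment instead. The following is a minimal sketch of that idea. The variable names `CLICKHOUSE_HOST`, `CLICKHOUSE_PORT`, `CLICKHOUSE_USER`, and `CLICKHOUSE_PASSWORD` and the helper `clickhouse_settings` are assumptions made for illustration and do not appear in the original notebook; the defaults mirror the demo values used above.

```python
import os


def clickhouse_settings(env=None):
    """Build keyword arguments for clickhouse_connect.get_client.

    The environment variable names are assumptions made for this sketch;
    the defaults are the demo values used in the notebook.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("CLICKHOUSE_HOST", "github.demo.trial.altinity.cloud"),
        "port": int(env.get("CLICKHOUSE_PORT", "8443")),
        "username": env.get("CLICKHOUSE_USER", "demo"),
        "password": env.get("CLICKHOUSE_PASSWORD", "demo"),
    }


# An empty mapping falls back to the demo defaults.
settings = clickhouse_settings(env={})
print(settings["host"], settings["port"])
```

The resulting dictionary could then be unpacked into the call, as in `clickhouse_connect.get_client(**clickhouse_settings())`.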

### Executing Queries
- A SQL query to show all tables in the ClickHouse database is defined as `tables_query`.
- This query is executed by the client, and the result is stored in a DataFrame called `tables`.
- Finally, the `tables` DataFrame is displayed, which contains the list of tables from the ClickHouse database.


## ClickHouse Trip Data Query Analysis

In [2]:
# Query trip data from ClickHouse
trip_data_query = """
SELECT
    formatDateTime(toStartOfMonth(pickup_datetime), '%Y-%m') AS month,  -- Truncates pickup_datetime to the first day of its month and formats it as 'YYYY-MM'.
    AVG(CASE WHEN toDayOfWeek(pickup_datetime) = 6 THEN 1 ELSE 0 END) AS Sat_mean_trip_count,  -- Mean of a 0/1 Saturday indicator: the share of the month's weekend trips that fall on a Saturday.
    AVG(CASE WHEN toDayOfWeek(pickup_datetime) = 6 THEN fare_amount ELSE NULL END) AS Sat_mean_fare_per_trip,  -- Average fare amount for trips on Saturdays (NULLs are ignored by AVG).
    AVG(CASE WHEN toDayOfWeek(pickup_datetime) = 6 THEN timestampDiff(MINUTE, pickup_datetime, dropoff_datetime) ELSE NULL END) AS Sat_mean_duration_per_trip,  -- Average duration of trips on Saturdays, in minutes.
    AVG(CASE WHEN toDayOfWeek(pickup_datetime) = 7 THEN 1 ELSE 0 END) AS Sun_mean_trip_count,  -- Mean of a 0/1 Sunday indicator: the share of the month's weekend trips that fall on a Sunday.
    AVG(CASE WHEN toDayOfWeek(pickup_datetime) = 7 THEN fare_amount ELSE NULL END) AS Sun_mean_fare_per_trip,  -- Average fare amount for trips on Sundays (NULLs are ignored by AVG).
    AVG(CASE WHEN toDayOfWeek(pickup_datetime) = 7 THEN timestampDiff(MINUTE, pickup_datetime, dropoff_datetime) ELSE NULL END) AS Sun_mean_duration_per_trip  -- Average duration of trips on Sundays, in minutes.
FROM
    tripdata  -- The table from which the data is retrieved.
WHERE
    pickup_datetime >= '2014-01-01' AND pickup_datetime < '2017-01-01'  -- Half-open range covering January 1, 2014 through the whole of December 31, 2016.
    AND toDayOfWeek(pickup_datetime) IN (6, 7)  -- Keeps only Saturdays (6) and Sundays (7).
GROUP BY
    month  -- Groups the results by the 'month' alias.
ORDER BY
    month ASC  -- Orders the results by 'month' in ascending order.
"""

result = client.query_df(trip_data_query)  # Executes the query and stores the result in a DataFrame.
result  # Displays the DataFrame containing the query results.


| | month | Sat_mean_trip_count | Sat_mean_fare_per_trip | Sat_mean_duration_per_trip | Sun_mean_trip_count | Sun_mean_fare_per_trip | Sun_mean_duration_per_trip |
|---|---|---|---|---|---|---|---|
| 0 | 2014-01 | 0.527918 | 11.351955 | 11.260639 | 0.472082 | 11.966331 | 10.985067 |
| 1 | 2014-02 | 0.533951 | 11.642552 | 11.957357 | 0.466049 | 11.955278 | 11.249512 |
| 2 | 2014-03 | 0.53894 | 11.884995 | 11.962606 | 0.46106 | 12.509796 | 11.849229 |
| 3 | 2014-04 | 0.532575 | 12.08222 | 12.485764 | 0.467425 | 12.781188 | 12.110808 |
| 4 | 2014-05 | 0.584213 | 12.528668 | 12.788996 | 0.415787 | 13.052713 | 12.493877 |
| 5 | 2014-06 | 0.47772 | 12.414685 | 12.358393 | 0.52228 | 13.369205 | 12.760957 |
| 6 | 2014-07 | 0.51926 | 12.229957 | 11.679602 | 0.48074 | 12.9275 | 11.495239 |
| 7 | 2014-08 | 0.524448 | 12.507249 | 12.321142 | 0.475552 | 13.11596 | 12.449803 |
| 8 | 2014-09 | 0.535193 | 12.478453 | 13.274195 | 0.464807 | 13.272657 | 13.122259 |
| 9 | 2014-10 | 0.531285 | 12.21671 | 12.754925 | 0.468715 | 13.19395 | 13.017479 |


## Task Explanation

The code snippet is a SQL query designed to extract and analyze trip data from a ClickHouse database. The query performs the following actions:

### Query Definition
- `trip_data_query`: A multi-line string that defines the SQL query.

### SQL Query Breakdown
- **Selection**: The query selects several calculated columns based on the `pickup_datetime` field from the `tripdata` table.
- **Formatting Date**: `toStartOfMonth` truncates `pickup_datetime` to the first day of its month, and `formatDateTime` renders that date as a `YYYY-MM` string labeled `month`.
- **Calculating Averages for Saturdays**:
  - `Sat_mean_trip_count`: Despite its name, this is the mean of a 0/1 Saturday indicator, that is, the share of the month's weekend trips that took place on a Saturday (about 0.53 for January 2014). It is not a count of trips.
  - `Sat_mean_fare_per_trip`: Computes the average fare amount for trips on Saturdays.
  - `Sat_mean_duration_per_trip`: Determines the average duration of trips on Saturdays in minutes.
- **Calculating Averages for Sundays**:
  - `Sun_mean_trip_count`: The corresponding share of the month's weekend trips that took place on a Sunday. Because only Saturdays and Sundays pass the filter, the two shares sum to 1 in every row.
  - `Sun_mean_fare_per_trip`: Computes the average fare amount for trips on Sundays.
  - `Sun_mean_duration_per_trip`: Determines the average duration of trips on Sundays in minutes.

### Filtering and Grouping
- The query filters the data for `pickup_datetime` between January 1, 2014, and December 31, 2016, and only includes Saturdays and Sundays.
- It groups the results by the 'month' and orders them in ascending order.
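
To make the aggregation logic concrete, the sketch below reproduces the same filter, grouping, and conditional averages in plain Python on a handful of made-up trips. The trip values are hypothetical and exist only for illustration. `isoweekday()` uses the same numbering as `toDayOfWeek` (Monday is 1, Sunday is 7). It also shows that averaging a 0/1 indicator yields a share rather than a count.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# Hypothetical trips: (pickup time, fare). 2014-01-04 is a Saturday.
trips = [
    (datetime(2014, 1, 4, 9, 0), 10.0),    # Saturday
    (datetime(2014, 1, 4, 18, 30), 14.0),  # Saturday
    (datetime(2014, 1, 5, 12, 0), 9.0),    # Sunday
    (datetime(2014, 1, 6, 8, 0), 20.0),    # Monday, dropped by the weekend filter
]

# WHERE toDayOfWeek(...) IN (6, 7), then GROUP BY month
by_month = defaultdict(list)
for pickup, fare in trips:
    weekday = pickup.isoweekday()
    if weekday in (6, 7):
        by_month[pickup.strftime("%Y-%m")].append((weekday, fare))

summary = {}
for month, rows in sorted(by_month.items()):
    summary[month] = {
        # AVG(CASE WHEN day = 6 THEN 1 ELSE 0 END): share of weekend trips on Saturday
        "Sat_mean_trip_count": mean(1 if d == 6 else 0 for d, _ in rows),
        # AVG(CASE WHEN day = 6 THEN fare ELSE NULL END): only Saturday fares are averaged
        "Sat_mean_fare_per_trip": mean(f for d, f in rows if d == 6),
        "Sun_mean_trip_count": mean(1 if d == 7 else 0 for d, _ in rows),
        "Sun_mean_fare_per_trip": mean(f for d, f in rows if d == 7),
    }

print(summary)
```

With two Saturday trips and one Sunday trip, the Saturday share comes out as two thirds and the Sunday share as one third, which mirrors how the two `*_mean_trip_count` columns in the query output add up to 1.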

### Execution and Result Display
- `result`: The `client.query_df` method executes the query and stores the results in a DataFrame.
- The DataFrame `result` is then displayed, showing the query results.


## SQLite Database Initialization and Table Creation

In [3]:
# Set up SQLite database
conn = sqlite3.connect('metrics.db')  # Connect to the SQLite database file 'metrics.db'.
cursor = conn.cursor()  # Create a cursor object to interact with the database.

# SQL query to create a new table if it doesn't exist
create_table_query = '''
CREATE TABLE IF NOT EXISTS metrics_table (
    month TEXT,  -- Column for the month, stored as text
    Sat_mean_trip_count REAL,  -- Column for average number of trips on Saturdays, stored as a real number
    Sat_mean_fare_per_trip REAL,  -- Column for average fare per trip on Saturdays, stored as a real number
    Sat_mean_duration_per_trip REAL,  -- Column for average duration per trip on Saturdays, stored as a real number
    Sun_mean_trip_count REAL,  -- Column for average number of trips on Sundays, stored as a real number
    Sun_mean_fare_per_trip REAL,  -- Column for average fare per trip on Sundays, stored as a real number
    Sun_mean_duration_per_trip REAL  -- Column for average duration per trip on Sundays, stored as a real number
);
'''

# Execute the SQL query using the cursor
cursor.execute(create_table_query)
# Commit the transaction to the database
conn.commit()
# Close the connection to the database
conn.close()


## Task Explanation

The provided code snippet is for setting up a SQLite database and creating a new table within it. Here's a step-by-step breakdown of what the code does:

### Set Up SQLite Database
- **Connection Establishment**: The script starts by connecting to an SQLite database file named `metrics.db`. If the file doesn't exist, SQLite will create it.
- **Cursor Creation**: A cursor object is created using the `cursor()` method. This object is used to execute SQL commands.

### SQL Query for Table Creation
- **Create Table Query**: A SQL query is defined in a multi-line string (`create_table_query`). This query creates a new table named `metrics_table` if it doesn't already exist.
- **Table Structure**: The table is structured to have columns for the month (`TEXT`) and various metrics for Saturdays and Sundays (`REAL`), such as the average number of trips, average fare per trip, and average duration per trip.

### Execute and Commit
- **Execute Query**: The cursor executes the `create_table_query` to create the `metrics_table`.
- **Commit Changes**: The `commit()` method is called to save the changes made by the SQL query.

### Close Connection
- **Close Database Connection**: Finally, the `close()` method is called to close the connection to the database.

This setup is typically used at the beginning of a script that intends to store and manipulate data within a local SQLite database.
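
The same steps can also be written with context managers so that the connection is closed even if a statement fails. This is a sketch rather than the notebook's own code: the helper name `create_metrics_table` is hypothetical, and an in-memory database (`:memory:`) stands in for `metrics.db` so the example leaves no file behind.

```python
import sqlite3
from contextlib import closing

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS metrics_table (
    month TEXT,
    Sat_mean_trip_count REAL,
    Sat_mean_fare_per_trip REAL,
    Sat_mean_duration_per_trip REAL,
    Sun_mean_trip_count REAL,
    Sun_mean_fare_per_trip REAL,
    Sun_mean_duration_per_trip REAL
);
"""


def create_metrics_table(db_path):
    """Create metrics_table if needed and return its column names."""
    # closing() guarantees conn.close(); the inner `with conn` commits on
    # success and rolls back if an exception is raised.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            conn.execute(CREATE_TABLE_SQL)
        # PRAGMA table_info yields one row per column; index 1 is the column name.
        return [row[1] for row in conn.execute("PRAGMA table_info(metrics_table)")]


columns = create_metrics_table(":memory:")
print(columns)
```

Reading the schema back with `PRAGMA table_info` is a quick way to confirm that the table exists with the expected columns.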

## Airflow DAG and Task Definition
The code below should be saved as a Python file (with the `.py` extension) and placed in the `/home/airflow/dags` folder. It is shown here for demonstration purposes, to present the complete implementation in one place.

In [4]:
# Define Airflow DAG
default_args = {
    'owner': 'airflow',  # The username of the person who owns this DAG.
    'depends_on_past': False,  # If True, a task will only run if the previous run succeeded.
    'start_date': datetime(2024, 3, 7),  # The start date of the DAG; tasks won't run before this date.
    'email': ['ndubuisijoseph47@gmail.com'],  # Email address to send failure or retry notifications.
    'email_on_failure': False,  # If True, Airflow will send an email on task failure.
    'email_on_retry': False,  # If True, Airflow will send an email on task retry.
    'retries': 1,  # The number of retries that should be attempted on task failure.
    'retry_delay': timedelta(minutes=5),  # The delay between retry attempts.
}

dag = DAG(
    'metrics_dag',  # The unique identifier for this DAG.
    default_args=default_args,  # The default configuration for the tasks in this DAG.
    description='A simple DAG to fetch metrics',  # A brief description of this DAG's purpose.
    schedule=timedelta(days=1),  # The time interval between consecutive DAG runs.
    catchup=False  # This parameter determines whether to backfill past runs
)

# Define the Python function to be used as an Airflow task
def fetch_and_write_metrics():
    # Fetches the weekend trip metrics from ClickHouse and writes them to SQLite.

    # Run the query inside the task so that every DAG run fetches fresh data.
    # 'client' and 'trip_data_query' are the module-level objects defined earlier in this file.
    result = client.query_df(trip_data_query)  # Executes the query and stores the result in a DataFrame.

    # Connect to SQLite database
    conn = sqlite3.connect('metrics.db')  # Establishes a connection to the 'metrics.db' SQLite database.
    cursor = conn.cursor()  # Creates a cursor object to interact with the database.

    # Write the fetched data to SQLite
    result.to_sql('metrics_table', conn, if_exists='replace', index=False)  # Writes the 'result' DataFrame to the 'metrics_table' in the database, replacing it if it exists.

    # Commit the changes and close the connection
    conn.commit()  # Commits any changes made during the database transaction.
    conn.close()  # Closes the connection to the database to free up resources.

# Create Airflow task
fetch_metrics_task = PythonOperator(
    task_id='fetch_and_write_metrics',  # The unique identifier for this task.
    python_callable=fetch_and_write_metrics,  # The Python function this task will call.
    dag=dag,  # The DAG to which this task is attached.
)


## Airflow DAG Setup Explanation

The code snippet defines an Airflow Directed Acyclic Graph (DAG) for scheduling tasks. It includes setting up default arguments for the DAG, defining a Python function as a task, and creating an Airflow task using the `PythonOperator`. Here's a detailed breakdown:

### Default Arguments
- **Owner**: Specifies the owner of the DAG, typically the username of the person responsible for the workflow.
- **Depends on Past**: Determines whether the task should depend on the success of the previous run.
- **Start Date**: Sets the start date for the DAG; tasks will not execute before this date.
- **Email Configuration**: Provides an email address for sending notifications and sets preferences for failure and retry notifications.
- **Retries and Delay**: Configures the number of retries on task failure and the delay between retries.
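
As a rough illustration of what `retries` and `retry_delay` mean, the sketch below models a task that gets `retries + 1` attempts in total, with a pause between them. It is a simplified model for counting attempts only. Airflow's own retry handling works differently internally, and the names `run_with_retries` and `flaky_task` are hypothetical.

```python
import time
from datetime import timedelta


def run_with_retries(task, retries=1, retry_delay=timedelta(0), sleep=time.sleep):
    """Call task() up to retries + 1 times, waiting retry_delay between attempts."""
    total_attempts = retries + 1
    for attempt in range(1, total_attempts + 1):
        try:
            return attempt, task()
        except Exception:
            if attempt == total_attempts:
                raise  # retries exhausted: the failure propagates
            sleep(retry_delay.total_seconds())


calls = []


def flaky_task():
    # Hypothetical task that fails on its first call and succeeds afterwards.
    calls.append(1)
    if len(calls) == 1:
        raise RuntimeError("transient failure")
    return "ok"


# retries=1 allows two attempts in total; the delay is zero so the sketch runs instantly.
attempt, value = run_with_retries(flaky_task, retries=1, retry_delay=timedelta(0))
print(f"succeeded on attempt {attempt} with result {value!r}")
```

With the DAG's settings (`retries` of 1 and a five-minute `retry_delay`), a failing task would be tried at most twice, roughly five minutes apart.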

### DAG Definition
- **DAG ID**: A unique identifier for the DAG.
- **Default Args**: The default configuration for tasks within the DAG.
- **Description**: A brief description of the DAG's purpose.
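- **Schedule**: The interval at which the DAG will run (here, once a day).
- **Catchup**: Set to `False` so that Airflow does not backfill runs for the intervals between the start date and the present.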

### Python Function for Task
- **Function Definition**: A Python function named `fetch_and_write_metrics` is defined to contain the logic for fetching and writing metrics.
- **Database Connection**: Establishes a connection to an SQLite database and creates a cursor for executing database operations.
- **Data Handling**: The function writes data from a DataFrame to an SQLite table and commits the changes.

### Airflow Task Creation
- **Task ID**: A unique identifier for the task within the DAG.
- **Python Callable**: The Python function that the task will execute.
- **DAG Attachment**: Associates the task with the defined DAG.

This setup is used to automate the process of fetching and writing metrics data, allowing for regular updates and maintenance of the data set.
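
One way to keep the task callable easy to test is to pass the data source and the database path in as arguments instead of relying on notebook-level variables. The sketch below does this using only the standard library. It is an illustration under stated assumptions: `write_metrics`, `fake_fetch`, and the `fetch_rows` parameter are hypothetical names, the table is shortened to three columns, and the sample values are copied from the first two rows of the query output shown earlier.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def write_metrics(rows, db_path):
    """Replace the contents of metrics_table with `rows` and return the row count."""
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commits on success, rolls back pending changes on error
            conn.execute("DROP TABLE IF EXISTS metrics_table")
            conn.execute(
                "CREATE TABLE metrics_table "
                "(month TEXT, Sat_mean_trip_count REAL, Sun_mean_trip_count REAL)"
            )
            conn.executemany("INSERT INTO metrics_table VALUES (?, ?, ?)", rows)
        return conn.execute("SELECT COUNT(*) FROM metrics_table").fetchone()[0]


def fetch_and_write_metrics(fetch_rows, db_path):
    """Task body: fetch fresh rows, then write them to SQLite."""
    return write_metrics(fetch_rows(), db_path)


def fake_fetch():
    # Stand-in for the ClickHouse query.
    return [("2014-01", 0.527918, 0.472082), ("2014-02", 0.533951, 0.466049)]


db_path = os.path.join(tempfile.mkdtemp(), "metrics.db")
fetch_and_write_metrics(fake_fetch, db_path)
row_count = fetch_and_write_metrics(fake_fetch, db_path)  # a second run replaces, not appends
print(row_count)
```

Because the table is dropped and recreated on each call, running the task twice leaves the same two rows rather than four. With `PythonOperator`, arguments like these can be supplied through `op_kwargs`.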

## Airflow Task Testing Command

In [6]:
!airflow tasks test metrics_dag fetch_and_write_metrics 2024-03-08

[2024-03-08T14:27:34.969+0100] {dagbag.py:540} INFO - Filling up the DagBag from /home/techscholarhub/airflow/dags
[2024-03-08T14:27:35.903+0100] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: metrics_dag.fetch_and_write_metrics __airflow_temporary_run_2024-03-08T12:53:04.456889+00:00__ [success]>
[2024-03-08T14:27:35.907+0100] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: metrics_dag.fetch_and_write_metrics __airflow_temporary_run_2024-03-08T12:53:04.456889+00:00__ [success]>
[2024-03-08T14:27:35.907+0100] {taskinstance.py:2193} INFO - Starting attempt 2 of 2
[2024-03-08T14:27:35.908+0100] {taskinstance.py:2214} INFO - Executing <Task(PythonOperator): fetch_and_write_metrics> on 2024-

### Airflow Task Testing Command Explanation

The command `!airflow tasks test metrics_dag fetch_and_write_metrics 2024-03-08` is used to test a specific task within an Airflow DAG. Here's a breakdown of the command:

- `!`: In a Jupyter notebook, this character allows you to run shell commands directly from the notebook cells.
- `airflow`: This is the command-line utility for Airflow, used to manage Airflow DAGs and tasks.
- `tasks`: A subcommand of the Airflow CLI to perform operations on individual tasks.
- `test`: Runs a single task within a DAG in isolation, without checking dependencies or recording its state.
- `metrics_dag`: The identifier of the DAG containing the task.
- `fetch_and_write_metrics`: The identifier of the task to be tested.
- `2024-03-08`: The date for which the task is run.

When this command is executed, Airflow runs the `fetch_and_write_metrics` task once for the given date, outside the scheduler, and prints the task log to the console. This makes it a convenient way to debug a task and confirm that it behaves as expected.
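
Outside a notebook, the same command can be assembled and launched from Python. The sketch below is an assumption-laden illustration. The helper names `build_test_command` and `run_task_test` are hypothetical, the date is assumed to be a plain `YYYY-MM-DD` string, and the command is only executed if an `airflow` executable is found on the `PATH`.

```python
import shutil
import subprocess
from datetime import date


def build_test_command(dag_id, task_id, run_date):
    """Return the argument list for `airflow tasks test`, validating the date first."""
    date.fromisoformat(run_date)  # raises ValueError for a malformed date
    return ["airflow", "tasks", "test", dag_id, task_id, run_date]


def run_task_test(command):
    """Run the command if the Airflow CLI is installed; return its exit code or None."""
    if shutil.which("airflow") is None:
        return None
    return subprocess.run(command, check=False).returncode


command = build_test_command("metrics_dag", "fetch_and_write_metrics", "2024-03-08")
print(" ".join(command))
```

Calling `run_task_test(command)` would then launch the test run and return the process exit code, or `None` when Airflow is not installed.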
