
Batch Processing: Data Pipeline for a Recruitment Start-Up

Table of Contents

  1. Introduction
  2. Implementation overview
  3. Design
  4. Project Structure
  5. Settings
  6. Implementation
  7. Visualize result

1. Introduction

Data is collected from a start-up company about its recruitment activity. We build ETL pipelines that transform this raw data into actionable insights and store the results in a data lake (Cassandra) and data warehouses (MySQL, Amazon Redshift) for enhanced data analytics capabilities.

Technologies used

  • Python
  • PySpark
  • Cassandra
  • MySQL
  • Airflow
  • AWS services: S3, Redshift (data warehouse)
  • Docker
  • Grafana

2. Implementation overview

Design data models for the OLTP database (Cassandra) and the data warehouses (MySQL, Amazon Redshift). Generate data from MySQL into Cassandra. Build an ETL pipeline that transforms the raw data from Cassandra and stages it in S3, then a second ETL pipeline that processes the staged data from S3 and loads it into Amazon Redshift for enhanced data analytics. Airflow orchestrates the pipeline workflow, and Docker containerizes the project, allowing it to be built, tested, and deployed quickly.

Airflow conceptual view

3. Design

Data model

Data model for Data Warehouse



Airflow workflow
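
In code, the workflow in the diagram corresponds roughly to the task skeleton below. This is only a sketch with placeholder callables: the DAG id, schedule, and start date are assumptions, and the real callables live in airflow/dags/ (fake_data.py, ETL_Cassandra_S3.py, create_redshift.py, wired together in dags_setup.py).

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder callable so the skeleton runs stand-alone; the project uses the
# real ETL functions shown in section 6.
def _noop(*args, **kwargs):
    pass

with DAG(
    dag_id="recruitment_batch_pipeline",   # assumed DAG id
    start_date=datetime(2024, 1, 1),       # assumed start date
    schedule_interval="@daily",            # assumed schedule
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract_data_from_cassandra", python_callable=_noop)
    transforms = [
        PythonOperator(task_id=name, python_callable=_noop)
        for name in ("process_click_data", "process_conversion_data",
                     "process_qualified_data", "process_unqualified_data")
    ]
    combine = PythonOperator(task_id="Transform_data", python_callable=_noop)
    load = PythonOperator(task_id="Load_data_to_s3", python_callable=_noop)

    # Fan out from extract to the four transforms, then combine and load.
    extract >> transforms >> combine >> load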

4. Project Structure

Batch-Processing-Project
├── airflow
│   ├── dags
│   │   ├── create_redshift.py
│   │   ├── dags_setup.py
│   │   ├── ETL_Cassandra_S3.py
│   │   └── fake_data.py
│   └── logs
│       ├── dag_processor_manager
│       │   └── dag_processor_manager.log
│       └── scheduler
│           └── latest
├── assets
│   ├── airflow_workflow.png
│   ├── data_visualization.png
│   ├── data_warehouse.png
│   ├── ddl.png
│   ├── extract.png
│   ├── grafana_config.png
│   ├── load.png
│   ├── project_architect_overview.png
│   └── transform.png
├── docker-compose.yml
├── Input_Data
│   ├── data_cassandra
│   │   ├── search.csv
│   │   └── tracking.csv
│   └── data_mysql
│       ├── application.csv
│       ├── campaign.csv
│       ├── company.csv
│       ├── conversation.csv
│       ├── dev_user.csv
│       ├── events.csv
│       ├── group.csv
│       ├── job.csv
│       ├── job_location.csv
│       ├── master_city.csv
│       ├── master_major_category.csv
│       ├── master_minor_category.csv
│       ├── master_publisher.csv
│       ├── master_role.csv
│       ├── user_client_assignee.csv
│       └── user_role.csv
├── Makefile
├── README.md
├── requirements.txt
└── Transformed_Data
    └── Transformed_Data.csv

5. Settings

Prerequisites

  • AWS account
  • AWS services (S3, Redshift)
  • Docker
  • Airflow

Important note

You must specify AWS credentials for each of the following:

  • S3 access: create an IAM user "S3-admin" with the S3FullAccess policy
  • Redshift access: create an IAM user "Redshift-admin" with the RedshiftFullAccess policy
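
How you supply these credentials depends on your setup (environment variables, ~/.aws/credentials profiles, or Airflow connections). A quick sanity check in Python that the S3 credentials work, assuming boto3 is installed and that the profile name below matches the one you configured:

import boto3

# "s3-admin" is an assumed profile name; replace it with your own.
session = boto3.Session(profile_name="s3-admin")
s3 = session.client("s3")
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])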

Running

# Make must be installed first

# Start docker containers on your local computer
make up

# Add pip packages
make python

6. Implementation

Import data from the Input_Data directory

6.1 Generate recruitment data into Data Lake (Cassandra)

  python ./airflow/dags/fake_data.py

Generate data file
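
For reference, a minimal sketch of the kind of insert fake_data.py performs against Cassandra. The keyspace, table, and column names below are assumptions (they mirror Input_Data/data_cassandra/tracking.csv); the real script may generate synthetic tracking events rather than replaying the CSV verbatim, and type casts depend on the actual schema.

import csv
from cassandra.cluster import Cluster

# Connect to the local Cassandra container started by docker-compose.
cluster = Cluster(["localhost"], port=9042)
session = cluster.connect("study_de")   # assumed keyspace name

insert_stmt = session.prepare(
    "INSERT INTO tracking (create_time, job_id, custom_track, bid, "
    "campaign_id, group_id, publisher_id) VALUES (?, ?, ?, ?, ?, ?, ?)"
)   # assumed table and columns

with open("Input_Data/data_cassandra/tracking.csv") as f:
    for row in csv.DictReader(f):
        session.execute(insert_stmt, (
            row["create_time"], row["job_id"], row["custom_track"], row["bid"],
            row["campaign_id"], row["group_id"], row["publisher_id"],
        ))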

6.2 DDL in Redshift

Airflow tasks

create_redshift = PythonOperator(
    task_id='create_redshift_schema_and_copy_data_to_redshift',
    python_callable=create_redshift_schema
)
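
A sketch of what create_redshift_schema might do: create the reporting table and COPY the staged file from S3. The connection parameters, table layout, bucket, and IAM role below are all assumptions for illustration; the project's actual DDL lives in airflow/dags/create_redshift.py.

import psycopg2

# Connection details are placeholders; use your own cluster endpoint and credentials.
conn = psycopg2.connect(
    host="your-cluster.abc123.us-east-1.redshift.amazonaws.com",
    port=5439, dbname="dev", user="awsuser", password="...",
)
cur = conn.cursor()

# Assumed reporting table; the real schema is defined in create_redshift.py.
cur.execute("""
    CREATE TABLE IF NOT EXISTS recruitment_metrics (
        job_id       INT,
        dates        DATE,
        hours        INT,
        clicks       INT,
        conversions  INT,
        qualified    INT,
        unqualified  INT,
        spend_hour   FLOAT
    );
""")

# Load the file staged by the pipeline (bucket name and IAM role are placeholders).
cur.execute("""
    COPY recruitment_metrics
    FROM 's3://your-staging-bucket/Transformed_Data.csv'
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-s3-read'
    CSV IGNOREHEADER 1;
""")
conn.commit()
cur.close()
conn.close()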

6.3 Extract data from data lake

Airflow tasks

Extract = PythonOperator(
    task_id='extract_data_from_cassandra',
    python_callable=main_process.extract_data_from_cassandra,
)
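
extract_data_from_cassandra most plausibly reads the tracking table through the spark-cassandra-connector; a minimal sketch under that assumption (keyspace and table names are also assumptions):

from pyspark.sql import SparkSession

# Assumes a spark-cassandra-connector package matching your Spark version is available.
spark = (
    SparkSession.builder
    .appName("extract_from_cassandra")
    .config("spark.cassandra.connection.host", "localhost")
    .getOrCreate()
)

tracking_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="tracking", keyspace="study_de")   # assumed names
    .load()
)
tracking_df.show(5)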

6.4 Transform

Airflow tasks

process_click_data = PythonOperator(
    task_id='process_click_data',
    python_callable=main_process.process_click_data,
    op_args=[Extract.output]
)

process_conversion_data = PythonOperator(
    task_id='process_conversion_data',
    python_callable=main_process.process_conversion_data,
    op_args=[Extract.output]
)

process_qualified_data = PythonOperator(
    task_id='process_qualified_data',
    python_callable=main_process.process_qualified_data,
    op_args=[Extract.output]
)

process_unqualified_data = PythonOperator(
    task_id='process_unqualified_data',
    python_callable=main_process.process_unqualified_data,
    op_args=[Extract.output]
)

Transform = PythonOperator(
    task_id='Transform_data',
    python_callable=main_process.process_result,
    op_args=[process_click_data.output, process_conversion_data.output,
              process_qualified_data.output, process_unqualified_data.output]
)
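
Each process_* callable filters the extracted events by type and aggregates them; a sketch of what process_click_data could look like in PySpark (the custom_track and bid column names are assumptions carried over from the extract step):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def process_click_data(df: DataFrame) -> DataFrame:
    """Aggregate click events per job/campaign/hour (column names are assumed)."""
    clicks = df.filter(F.col("custom_track") == "click")
    return (
        clicks
        .withColumn("dates", F.to_date("create_time"))
        .withColumn("hours", F.hour("create_time"))
        .groupBy("job_id", "campaign_id", "group_id", "publisher_id", "dates", "hours")
        .agg(
            F.count("*").alias("clicks"),
            F.round(F.sum("bid"), 2).alias("spend_hour"),
        )
    )

process_result (the Transform_data task) would then join the four aggregated outputs on their shared keys before the load step.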

6.5 Load data to Redshift

Airflow tasks

Load = PythonOperator(
    task_id='Load_data_to_s3',
    python_callable=main_process.write_data_to_s3,
    op_args=[Transform.output]
)
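
write_data_to_s3 receives the output of the Transform task and stages it in S3 so the Redshift COPY in 6.2 can pick it up. A minimal sketch with boto3, assuming the task passes a local file path; the bucket name and object key are placeholders, and the default path matches Transformed_Data/Transformed_Data.csv from the project tree.

import boto3

def write_data_to_s3(transformed_path: str = "Transformed_Data/Transformed_Data.csv") -> None:
    """Upload the transformed CSV to the staging bucket (bucket and key are assumed)."""
    s3 = boto3.client("s3")
    s3.upload_file(transformed_path, "recruitment-batch-staging", "Transformed_Data.csv")

In the project the write may instead happen directly from Spark; either way, the staged object in S3 is what the Redshift COPY consumes.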

7. Visualize result

Connect Redshift to Grafana and visualize the results.

Grafana Redshift connection

Connect to Grafana

Visualization
