In [2]:
# Airflow using Bash Script

# Task 1.0: Import Libraries
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator

# Task 1.1: DAG Arguments
# Defaults applied to every task in the DAG.
default_args = {
    "owner": "Shounak",
    "start_date": days_ago(0),
    # The operator parameter is ``email``. Airflow only forwards default_args
    # keys that match an operator parameter, so an "emails" key is dropped and
    # no recipient would ever be configured.
    "email": ["shounak@mail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Task 1.2: DAG Definition
# Runs the toll-data ETL pipeline once per day.
dag = DAG(
    dag_id="ETL_toll_data",
    default_args=default_args,
    description="Apache Airflow Final Assignment",
    schedule=timedelta(days=1),
)

# Shared locations: the archive and its extracted files live in
# FINAL_ASSIGNMENT_DIR; intermediate and final outputs go to STAGING_DIR.
# STAGING_DIR must already exist -- none of the tasks below creates it.
FINAL_ASSIGNMENT_DIR = "/home/project/airflow/dags/finalassignment"
STAGING_DIR = f"{FINAL_ASSIGNMENT_DIR}/staging"

# Task 1.3: Unzip Data
# BashOperator runs its command in a temporary working directory, so the
# archive must be extracted explicitly (-C) into FINAL_ASSIGNMENT_DIR, which is
# where the extract tasks below read the files from.
unzip_data = BashOperator(
    task_id="unzip_data",
    bash_command=(
        f"sudo tar -xzvf {FINAL_ASSIGNMENT_DIR}/tolldata.tgz "
        f"-C {FINAL_ASSIGNMENT_DIR}"
    ),
    dag=dag,
)

# Task 1.4: Extract data from .csv
# Keep the first four comma-separated fields of vehicle-data.csv.
# No sudo here: it would elevate only `cut`, not the `>` redirection done by
# the calling shell, and the other extract tasks already run without it.
extract_data_from_csv = BashOperator(
    task_id="extract_data_from_csv",
    bash_command=(
        f'cut -d"," -f1-4 {FINAL_ASSIGNMENT_DIR}/vehicle-data.csv '
        f"> {STAGING_DIR}/csv_data.csv"
    ),
    dag=dag,
)

# Task 1.5: Extract data from .tsv
# Keep fields 5-7 of the tab-separated tollplaza-data.tsv and turn the tabs
# into commas so the staged file is comma-separated like csv_data.csv.
extract_data_from_tsv = BashOperator(
    task_id="extract_data_from_tsv",
    bash_command=(
        f'cut -f5-7 {FINAL_ASSIGNMENT_DIR}/tollplaza-data.tsv | tr "\\t" "," '
        f"> {STAGING_DIR}/tsv_data.csv"
    ),
    dag=dag,
)

# Task 1.6: Extract data from fixed width file
# Keep character columns 59-62 and 63-67 of payment-data.txt.
# NOTE(review): `cut -c` writes the selected ranges back to back with no
# delimiter, so these two fields are not comma-separated in the output --
# confirm the column layout of payment-data.txt and add a separator
# (e.g. --output-delimiter="," or a `tr` step) to match the other staged files.
extract_data_from_fixed_width = BashOperator(
    task_id="extract_data_from_fixed_width",
    bash_command=(
        f"cut -c59-62,63-67 {FINAL_ASSIGNMENT_DIR}/payment-data.txt "
        f"> {STAGING_DIR}/fixed_width_data.csv"
    ),
    dag=dag,
)

# Task 1.7: Consolidate data
# Join the three staged files line by line. paste separates the joined
# columns with a tab by default; -d "," keeps extracted_data.csv a real CSV.
consolidate_data = BashOperator(
    task_id="consolidate_data",
    bash_command=(
        f'paste -d "," {STAGING_DIR}/csv_data.csv {STAGING_DIR}/tsv_data.csv '
        f"{STAGING_DIR}/fixed_width_data.csv > {STAGING_DIR}/extracted_data.csv"
    ),
    dag=dag,
)

# Task 1.8: Transform data
# Upper-case every character of the consolidated file.
transform_data = BashOperator(
    task_id="transform_data",
    bash_command=(
        f'tr "[:lower:]" "[:upper:]" < {STAGING_DIR}/extracted_data.csv '
        f"> {STAGING_DIR}/transformed_data.csv"
    ),
    dag=dag,
)

# Task 1.9: Task Pipeline
# The tasks run strictly one after another.
(
    unzip_data
    >> extract_data_from_csv
    >> extract_data_from_tsv
    >> extract_data_from_fixed_width
    >> consolidate_data
    >> transform_data
)



ModuleNotFoundError: No module named 'airflow'

In [None]:
# Airflow using Bash Script

# Task 1.0: Import Libraries
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator

# Task 1.1: DAG Arguments
# Defaults applied to every task in the DAG.
default_args = {
    "owner": "Shounak",
    "start_date": days_ago(0),
    # The operator parameter is ``email``. Airflow only forwards default_args
    # keys that match an operator parameter, so an "emails" key is dropped and
    # no recipient would ever be configured.
    "email": ["shounak@mail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Task 1.2: DAG Definition
# Runs the toll-data ETL pipeline once per day.
dag = DAG(
    dag_id="ETL_toll_data",
    default_args=default_args,
    description="Apache Airflow Final Assignment",
    schedule=timedelta(days=1),
)

# Shared locations: the archive and its extracted files live in
# FINAL_ASSIGNMENT_DIR; intermediate and final outputs go to STAGING_DIR.
# STAGING_DIR must already exist -- none of the tasks below creates it.
FINAL_ASSIGNMENT_DIR = "/home/project/airflow/dags/finalassignment"
STAGING_DIR = f"{FINAL_ASSIGNMENT_DIR}/staging"

# Task 1.3: Unzip Data
# BashOperator runs its command in a temporary working directory, so the
# archive must be extracted explicitly (-C) into FINAL_ASSIGNMENT_DIR, which is
# where the extract tasks below read the files from.
unzip_data = BashOperator(
    task_id="unzip_data",
    bash_command=(
        f"sudo tar -xzvf {FINAL_ASSIGNMENT_DIR}/tolldata.tgz "
        f"-C {FINAL_ASSIGNMENT_DIR}"
    ),
    dag=dag,
)

# Task 1.4: Extract data from .csv
# Keep the first four comma-separated fields of vehicle-data.csv.
# No sudo here: it would elevate only `cut`, not the `>` redirection done by
# the calling shell, and the other extract tasks already run without it.
extract_data_from_csv = BashOperator(
    task_id="extract_data_from_csv",
    bash_command=(
        f'cut -d"," -f1-4 {FINAL_ASSIGNMENT_DIR}/vehicle-data.csv '
        f"> {STAGING_DIR}/csv_data.csv"
    ),
    dag=dag,
)

# Task 1.5: Extract data from .tsv
# Keep fields 5-7 of the tab-separated tollplaza-data.tsv and turn the tabs
# into commas so the staged file is comma-separated like csv_data.csv.
extract_data_from_tsv = BashOperator(
    task_id="extract_data_from_tsv",
    bash_command=(
        f'cut -f5-7 {FINAL_ASSIGNMENT_DIR}/tollplaza-data.tsv | tr "\\t" "," '
        f"> {STAGING_DIR}/tsv_data.csv"
    ),
    dag=dag,
)

# Task 1.6: Extract data from fixed width file
# Keep character columns 59-62 and 63-67 of payment-data.txt.
# NOTE(review): `cut -c` writes the selected ranges back to back with no
# delimiter, so these two fields are not comma-separated in the output --
# confirm the column layout of payment-data.txt and add a separator
# (e.g. --output-delimiter="," or a `tr` step) to match the other staged files.
extract_data_from_fixed_width = BashOperator(
    task_id="extract_data_from_fixed_width",
    bash_command=(
        f"cut -c59-62,63-67 {FINAL_ASSIGNMENT_DIR}/payment-data.txt "
        f"> {STAGING_DIR}/fixed_width_data.csv"
    ),
    dag=dag,
)

# Task 1.7: Consolidate data
# Join the three staged files line by line. paste separates the joined
# columns with a tab by default; -d "," keeps extracted_data.csv a real CSV.
consolidate_data = BashOperator(
    task_id="consolidate_data",
    bash_command=(
        f'paste -d "," {STAGING_DIR}/csv_data.csv {STAGING_DIR}/tsv_data.csv '
        f"{STAGING_DIR}/fixed_width_data.csv > {STAGING_DIR}/extracted_data.csv"
    ),
    dag=dag,
)

# Task 1.8: Transform data
# Upper-case every character of the consolidated file.
transform_data = BashOperator(
    task_id="transform_data",
    bash_command=(
        f'tr "[:lower:]" "[:upper:]" < {STAGING_DIR}/extracted_data.csv '
        f"> {STAGING_DIR}/transformed_data.csv"
    ),
    dag=dag,
)

# Task 1.9: Task Pipeline
# The tasks run strictly one after another.
(
    unzip_data
    >> extract_data_from_csv
    >> extract_data_from_tsv
    >> extract_data_from_fixed_width
    >> consolidate_data
    >> transform_data
)

# Kafka

1. Download and extract Kafka:
```bash
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar -xzf kafka_2.12-2.8.0.tgz
```

2. Start MySQL, then create the database `tolldata` and, inside it, the table `livetolldata`:
```bash
start_mysql
mysql --host=127.0.0.1 --port=3306 --user=root -p
```
Enter the MySQL root password when prompted (do not hard-code it in this notebook), then run at the `mysql>` prompt:
```sql
create database tolldata;
use tolldata;
create table livetolldata(timestamp datetime,vehicle_id int,vehicle_type char(15),toll_plaza_id smallint);
exit
```

3. Install the Python client libraries for Kafka and MySQL:
```bash
pip3 install kafka-python
pip3 install mysql-connector-python
```

4. Start ZooKeeper (in its own terminal; the command keeps running in the foreground):
```bash
cd kafka_2.12-2.8.0
bin/zookeeper-server-start.sh config/zookeeper.properties
```

5. Start the Kafka server (in a second terminal, once ZooKeeper is running):
```bash
cd kafka_2.12-2.8.0
bin/kafka-server-start.sh config/server.properties
```

6. Create the topic `toll`:
```bash
cd kafka_2.12-2.8.0
bin/kafka-topics.sh --create --topic toll --bootstrap-server localhost:9092
```



Download the toll traffic generator script using wget:
```bash
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py
```


Code in the downloaded file, shown as downloaded (Task 2.5 changes `TOPIC` to `'toll'` before it is run):
```python
"""
Top Traffic Simulator
"""
from time import sleep, time, ctime
from random import random, randint, choice
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

TOPIC = 'set your topic here'

VEHICLE_TYPES = ("car", "car", "car", "car", "car", "car", "car", "car",
                 "car", "car", "car", "truck", "truck", "truck",
                 "truck", "van", "van")
for _ in range(100000):
    vehicle_id = randint(10000, 10000000)
    vehicle_type = choice(VEHICLE_TYPES)
    now = ctime(time())
    plaza_id = randint(4000, 4010)
    message = f"{now},{vehicle_id},{vehicle_type},{plaza_id}"
    message = bytearray(message.encode("utf-8"))
    print(f"A {vehicle_type} has passed by the toll plaza {plaza_id} at {now}.")
    producer.send(TOPIC, message)
    sleep(random() * 2)
```

In [None]:
Exercise 2 - Start Kafka
Task 2.1 - Start Zookeeper
Start zookeeper server.

Take a screenshot of the command you run.

Name the screenshot start_zookeeper.jpg. (Images can be saved with either the .jpg or .png extension.)

Task 2.2 - Start Kafka server
Start Kafka server

Take a screenshot of the command you run.

Name the screenshot start_kafka.jpg. (Images can be saved with either the .jpg or .png extension.)

Task 2.3 - Create a topic named toll
Create a Kafka topic named toll.

Take a screenshot of the command you run.

Name the screenshot create_toll_topic.jpg. (Images can be saved with either the .jpg or .png extension.)

Task 2.4 - Download the Toll Traffic Simulator
Download the toll_traffic_generator.py from the url given below using 'wget'.

https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/toll_traffic_generator.py

Open the code in the Theia editor using the "Menu --> File --> Open" option.

Take a screenshot of the task code.

Name the screenshot download_simulator.jpg. (Images can be saved with either the .jpg or .png extension.)

Task 2.5 - Configure the Toll Traffic Simulator
Open the toll_traffic_generator.py and set the topic to toll.

Take a screenshot of the task code with the topic clearly visible.

Name the screenshot configure_simulator.jpg. (Images can be saved with either the .jpg or .png extension.)

Task 2.6 - Run the Toll Traffic Simulator
Run the toll_traffic_generator.py.

Hint: `python3 <pythonfilename>` runs a Python program in the Theia lab.

Take a screenshot of the output of the simulator.

Name the screenshot simulator_output.jpg. (Images can be saved with either the .jpg or .png extension.)

Task 2.7 - Configure streaming_data_reader.py
Download the streaming_data_reader.py from the url below using 'wget'.

https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0250EN-SkillsNetwork/labs/Final%20Assignment/streaming_data_reader.py

Open streaming_data_reader.py and modify the following details so that the program reads from the toll topic and can connect to your MySQL server:

TOPIC

DATABASE

USERNAME

PASSWORD

Take a screenshot of the code you modified.

Name the screenshot streaming_reader_code.jpg. (Images can be saved with either the .jpg or .png extension.)

Task 2.8 - Run streaming_data_reader.py
Run the streaming_data_reader.py

Take a screenshot of the output of streaming_data_reader.py.

Name the screenshot data_reader_output.jpg. (Images can be saved with either the .jpg or .png extension.)

Command: `python3 streaming_data_reader.py`
Task 2.9 - Health check of the streaming data pipeline.
If you have done all the steps till here correctly, the streaming toll data would get stored in the table livetolldata.

List the top 10 rows in the table livetolldata.

Take a screenshot of the command and the output.

Name the screenshot output_rows.jpg. (Images can be saved with either the .jpg or .png extension.)

This concludes the assignment.