In [1]:
pip install kafka-python pandas avro-python3

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     ---------------------------------------- 0.0/246.5 kB ? eta -:--:--
     ----------------- -------------------- 112.6/246.5 kB 2.2 MB/s eta 0:00:01
     -------------------------------------- 246.5/246.5 kB 3.8 MB/s eta 0:00:00
Collecting avro-python3
  Downloading avro-python3-1.10.2.tar.gz (38 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: avro-python3
  Building wheel for avro-python3 (setup.py): started
  Building wheel for avro-python3 (setup.py): finished with status 'done'
  Created wheel for avro-python3: filename=avro_python3-1.10.2-py3-none-any.whl size=44035 sha256=6b4edd808c083a4f34d1a2d8fab47baaa7ddbd65ff52f49a28bd785356a76344
  Stored in directory: c:\users\hp\appdata\local\pip\cache\wheels\5a\29\4d\510c0e098c49c5e49519f430481a5425e60b8752682d7b1e55
Successfully built avro-python3
Ins

## Data Ingestion:

In [1]:
from kafka import KafkaProducer
import json
import csv

In [None]:
# Broker address shared by both producers created in this cell.
bootstrap_servers = 'localhost:9092'


def _encode_json_value(value):
    """Serialize a Python object to UTF-8 encoded JSON bytes for Kafka."""
    return json.dumps(value).encode('utf-8')


# producer_json serializes values itself; producer_csv is created without a
# value_serializer, so callers must hand it bytes.
producer_json = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=_encode_json_value,
)
producer_csv = KafkaProducer(bootstrap_servers=bootstrap_servers)

# One topic per upstream data source.
topic_ad_impressions = 'ad_impressions'
topic_clicks_conversions = 'clicks_conversions'
topic_bid_requests = 'bid_requests'

# Function to publish JSON data to Kafka
def publish_json_data(topic, data, flush=True):
    """Publish one JSON-serializable value to a Kafka topic via ``producer_json``.

    ``producer_json`` is configured with a JSON ``value_serializer``, so ``data``
    is passed through unchanged.

    Args:
        topic: Kafka topic name.
        data: Any JSON-serializable object.
        flush: When True (the default, matching the previous behaviour) block
            until buffered messages are sent. Pass False when publishing many
            messages and call ``producer_json.flush()`` once at the end, so the
            producer can batch sends.

    Returns:
        The future returned by ``producer_json.send``. Call ``.get(timeout=...)``
        on it to surface delivery errors, which ``flush()`` alone does not raise.
    """
    future = producer_json.send(topic, value=data)
    if flush:
        producer_json.flush()
    return future

# Function to publish CSV data to Kafka
def publish_csv_data(topic, data, flush=True):
    """Publish one CSV-formatted line to a Kafka topic via ``producer_csv``.

    ``producer_csv`` has no ``value_serializer``, so text is encoded to UTF-8
    bytes here. A ``bytes``/``bytearray`` payload is sent unchanged.

    Args:
        topic: Kafka topic name.
        data: The CSV line as ``str``, or already-encoded bytes.
        flush: When True (the default, matching the previous behaviour) block
            until buffered messages are sent. Pass False for bulk publishing and
            flush once at the end.

    Returns:
        The future returned by ``producer_csv.send``. Call ``.get(timeout=...)``
        on it to surface delivery errors.
    """
    if isinstance(data, (bytes, bytearray)):
        payload = data
    else:
        payload = data.encode('utf-8')
    future = producer_csv.send(topic, value=payload)
    if flush:
        producer_csv.flush()
    return future

# Function to read CSV file and publish data to Kafka
def publish_csv_file_to_kafka(file_path, topic, encoding=None):
    """Publish every data row of a CSV file to ``topic``, one message per row.

    The first row is treated as the header and is not published. Blank lines
    are skipped (the same rows ``csv.DictReader`` used to skip). Each row is
    re-serialized with ``csv.writer``, so fields containing commas, quotes or
    newlines stay correctly quoted; a plain ``','.join`` would split them into
    extra columns. Rows shorter or longer than the header are published as-is
    instead of raising ``TypeError`` on the ``None`` values ``DictReader``
    inserts.

    Args:
        file_path: Path of the CSV file to read.
        topic: Kafka topic to publish to.
        encoding: File encoding. The default ``None`` keeps the platform default,
            which is what the previous implementation used.

    Returns:
        The number of rows published.
    """
    import io  # local import keeps this cell self-contained

    published = 0
    # newline='' is required by the csv module so quoted fields with embedded
    # newlines are parsed correctly.
    with open(file_path, 'r', newline='', encoding=encoding) as file:
        csv_reader = csv.reader(file)
        next(csv_reader, None)  # skip the header row
        for fields in csv_reader:
            if not fields:
                continue  # blank line
            buffer = io.StringIO()
            csv.writer(buffer).writerow(fields)
            # writerow appends the default '\r\n' terminator; one message = one record.
            publish_csv_data(topic, buffer.getvalue()[:-2])
            published += 1
    return published


In [None]:
from kafka import KafkaProducer
from avro.datafile import DataFileReader
from avro.io import DatumReader

# Kafka configuration -- the same broker address and bid-request topic name as
# the first ingestion cell, repeated here.
bootstrap_servers = 'localhost:9092'

# Producer for Avro-sourced records. It is created without a value_serializer,
# so every value handed to producer_avro.send() must already be bytes.
producer_avro = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
)

# Topic that receives the bid-request records
topic_bid_requests = 'bid_requests'

# Function to publish Avro data to Kafka
def publish_avro_data(topic, data, flush=True):
    """Publish one decoded Avro record to a Kafka topic as UTF-8 JSON bytes.

    ``producer_avro`` has no ``value_serializer``, and kafka-python only accepts
    bytes values in that case. Records yielded by ``DataFileReader`` are Python
    objects (presumably dicts for record schemas -- confirm against the file's
    schema), so passing them through unchanged failed at ``send``. They are
    JSON-encoded here because the Data Processing consumer decodes every topic
    with ``json.loads``. Values that are already ``bytes``/``bytearray`` are sent
    unchanged.

    Args:
        topic: Kafka topic name.
        data: A decoded Avro datum, or a ready-made bytes payload.
        flush: When True (the default, matching the previous behaviour) block
            until buffered messages are sent. Pass False for bulk publishing.

    Returns:
        The future returned by ``producer_avro.send``. Call ``.get(timeout=...)``
        on it to surface delivery errors.
    """
    import json  # local import keeps this cell self-contained

    if isinstance(data, (bytes, bytearray)):
        payload = data
    else:
        # default=str: fall back to str() for values json cannot encode natively
        # (e.g. datetime/Decimal from Avro logical types, if the schema has any).
        payload = json.dumps(data, default=str).encode('utf-8')
    future = producer_avro.send(topic, value=payload)
    if flush:
        producer_avro.flush()
    return future

def publish_avro_file_to_kafka(file_path, topic):
    """Stream every record of an Avro container file to ``topic``.

    Each decoded record is handed, one at a time, to ``publish_avro_data``.
    The underlying file is closed when iteration finishes or fails.
    """
    with open(file_path, 'rb') as avro_stream:
        decoded_records = DataFileReader(avro_stream, DatumReader())
        for avro_record in decoded_records:
            publish_avro_data(topic, avro_record)

# Example usage: publish every record of the sample Avro file to the bid-requests topic.
# Set the AVRO_FILE_PATH environment variable to point at the file on your machine;
# the original user-specific absolute path is kept only as the fallback.
import os

avro_file_path = os.environ.get('AVRO_FILE_PATH', r"C:\Users\HP\Downloads\userdata5.avro")
publish_avro_file_to_kafka(avro_file_path, topic_bid_requests)

## Data Processing:

In [2]:
import pandas as pd
from kafka import KafkaConsumer
import json

In [None]:
# Broker the consumers below connect to (same address as the ingestion cells).
bootstrap_servers = "localhost:9092"

# Function to consume data from Kafka topic
def consume_data_from_kafka(topic, consumer_timeout_ms=10000, auto_offset_reset='earliest'):
    """Yield JSON-decoded message values from a Kafka topic until it goes idle.

    By default ``KafkaConsumer`` iteration blocks forever waiting for new
    messages, so ``pd.DataFrame(consume_data_from_kafka(...))`` never returned.
    ``consumer_timeout_ms`` ends iteration after that many milliseconds without
    a new message. Also, a fresh consumer defaults to ``auto_offset_reset='latest'``
    and would skip everything produced before it connected. ``'earliest'``
    reads the topic from the beginning.

    Args:
        topic: Kafka topic to read.
        consumer_timeout_ms: Idle time in ms after which the generator stops.
            Pass ``float('inf')`` to block forever (the old behaviour).
        auto_offset_reset: Where to start when no committed offset exists
            (``'earliest'`` or ``'latest'``).

    Yields:
        Each message value decoded from UTF-8 JSON.
    """
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset=auto_offset_reset,
        consumer_timeout_ms=consumer_timeout_ms,
        value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
    )
    try:
        for message in consumer:
            yield message.value
    finally:
        # Runs on exhaustion, on error, and when an unfinished generator is closed.
        consumer.close()

# One message generator per topic. Calling a generator function runs none of
# its body, so creating all three up front changes nothing.
ad_impressions_data = consume_data_from_kafka('ad_impressions')
clicks_conversions_data = consume_data_from_kafka('clicks_conversions')
bid_requests_data = consume_data_from_kafka('bid_requests')  # bid requests assumed to be JSON

# pd.DataFrame drains each generator completely, in this order. NOTE(review):
# this only returns if the consumer inside consume_data_from_kafka stops
# iterating (e.g. via a consumer timeout).
ad_impressions_df = pd.DataFrame(ad_impressions_data)
clicks_conversions_df = pd.DataFrame(clicks_conversions_data)
bid_requests_df = pd.DataFrame(bid_requests_data)

In [None]:
# Data Processing

# Fail early with a clear message when a topic produced no usable rows
# (pd.DataFrame of an empty generator has no columns at all).
for frame_name, frame in (('ad_impressions_df', ad_impressions_df),
                          ('clicks_conversions_df', clicks_conversions_df)):
    if 'user_id' not in frame.columns:
        raise KeyError(f"{frame_name} has no 'user_id' column (empty topic or unexpected schema)")

# Join on user_id, plus ad_id when BOTH frames carry it. Joining on user_id
# alone pairs each impression with every click/conversion of that user across
# all ads. If clicks_conversions_df also has 'ad_id', pandas would rename the
# two copies to 'ad_id_x'/'ad_id_y', and the groupby on 'ad_id' below would
# raise KeyError.
join_keys = ['user_id'] + [
    col for col in ('ad_id',)
    if col in ad_impressions_df.columns and col in clicks_conversions_df.columns
]

# Left-join impressions with clicks/conversions
merged_data = pd.merge(ad_impressions_df, clicks_conversions_df, on=join_keys, how='left')

# Keep only rows with a conversion_type. This drops impressions that matched
# no click/conversion as well as matched rows whose conversion_type is missing.
valid_data = merged_data.dropna(subset=['conversion_type'])

# Deduplicate data
deduplicated_data = valid_data.drop_duplicates()

# Count matched events per (ad_id, conversion_type)
correlation_results = (
    deduplicated_data
    .groupby(['ad_id', 'conversion_type'])
    .size()
    .reset_index(name='counts')
)

# Display processed data
print("Processed Data:")
print(correlation_results.head())

## Data Storage & Query Performance:

In [None]:
import os
import psycopg2

# Fetch Redshift cluster details from environment variables
redshift_cluster_endpoint = os.environ.get('REDSHIFT_CLUSTER_ENDPOINT')
redshift_port = os.environ.get('REDSHIFT_PORT')
redshift_dbname = os.environ.get('REDSHIFT_DBNAME')
redshift_username = os.environ.get('REDSHIFT_USERNAME')
redshift_password = os.environ.get('REDSHIFT_PASSWORD')
# COPY source and the IAM role Redshift assumes to read it. The role ARN comes
# from the environment instead of embedding AWS access keys in the notebook.
redshift_copy_source = os.environ.get(
    'REDSHIFT_COPY_SOURCE', 's3://your-s3-bucket/.hidden-folder/processed_data.csv'
)
redshift_iam_role = os.environ.get('REDSHIFT_IAM_ROLE')

# Ensure all required environment variables are set
if not all([redshift_cluster_endpoint, redshift_port, redshift_dbname,
            redshift_username, redshift_password, redshift_iam_role]):
    raise ValueError("One or more Redshift environment variables are not set")

# Create tables in Redshift
create_table_query = """
CREATE TABLE IF NOT EXISTS ad_campaign_data (
    ad_id INT,
    conversion_type VARCHAR(100),
    counts INT
)
"""

# Load processed data into Redshift.
# NOTE(review): nothing in this notebook writes correlation_results to
# redshift_copy_source -- it must be uploaded beforehand. If it is written with
# pandas' default header row, COPY also needs IGNOREHEADER 1. COPY appends, so
# re-running this cell duplicates rows and inflates SUM(counts) below.
# The S3 path and role ARN are bound as parameters; psycopg2 quotes them as literals.
copy_query = """
COPY ad_campaign_data FROM %s
IAM_ROLE %s
CSV;
"""

# Perform analytical queries
query = """
SELECT ad_id, conversion_type, SUM(counts) AS total_count
FROM ad_campaign_data
GROUP BY ad_id, conversion_type
ORDER BY total_count DESC;
"""

# Connect to Redshift cluster
conn = psycopg2.connect(
    host=redshift_cluster_endpoint,
    port=redshift_port,
    dbname=redshift_dbname,
    user=redshift_username,
    password=redshift_password
)
try:
    with conn.cursor() as cur:
        cur.execute(create_table_query)
    conn.commit()

    with conn.cursor() as cur:
        cur.execute(copy_query, (redshift_copy_source, redshift_iam_role))
    conn.commit()

    with conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
        for row in rows:
            print(row)
finally:
    # Close the connection even when a statement fails, so the session is not leaked.
    conn.close()

## Error Handling and Monitoring:

In [None]:
import logging

# Send ERROR-and-above records to data_pipeline.log with a timestamped format.
# NOTE: logging.basicConfig does nothing if the root logger already has handlers
# (e.g. configured earlier in this kernel) -- check the file is actually written.
logging.basicConfig(
    level=logging.ERROR,
    filename='data_pipeline.log',
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# function for data processing
def process_data(data):
    """Double ``data`` -- a placeholder for the real processing step.

    Args:
        data: Any value supporting ``* 2`` (numbers, strings, lists, ...).

    Returns:
        ``data * 2``.

    Raises:
        Whatever ``data * 2`` raises (e.g. ``TypeError`` for ``None``). The error
        is logged at ERROR level together with its traceback, then re-raised.
    """
    try:
        return data * 2
    except Exception as e:
        # logging.exception logs at ERROR level and attaches the traceback, which
        # logging.error(f"...") discarded. %-style args defer string formatting.
        logging.exception("Error processing data: %s", e)
        raise

# Demonstrate process_data on a sample value; a failure is reported rather than
# stopping the cell (process_data has already logged it).
try:
    input_data = 10
    output_data = process_data(input_data)
    print("Processed data:", output_data)
except Exception as exc:
    print("An error occurred:", exc)

import random
# for monitoring
def monitor_pipeline(min_throughput=70, max_latency=0.3):
    """Run one simulated monitoring pass: sample metrics, alert on anomalies, report.

    The metrics are random placeholders drawn with ``random.randint`` and
    ``random.uniform``; they stand in for real pipeline measurements.

    Args:
        min_throughput: Alert when throughput (messages per second) is below this.
        max_latency: Alert when latency (seconds) is above this.

    Returns:
        A dict with the sampled ``throughput`` and ``latency``, so callers can
        record or check them. The function previously returned None.
    """
    # Simulate retrieving performance metrics (sampling order kept, so seeded runs repeat)
    throughput = random.randint(50, 100)  # throughput in messages per second
    latency = random.uniform(0.1, 0.5)     # latency in seconds

    # Check for anomalies or deviations from expected behavior
    if throughput < min_throughput:
        send_alert(f"Low throughput detected: {throughput} messages per second")
    if latency > max_latency:
        send_alert(f"High latency detected: {latency} seconds")

    # Report metrics
    print(f"Throughput: {throughput} messages per second")
    print(f"Latency: {latency} seconds")
    return {'throughput': throughput, 'latency': latency}

# for alerting mechanism
def send_alert(message):
    """Emit an alert notification. Simulated: the alert is written to stdout."""
    alert_line = "Alert: " + str(message)
    print(alert_line)


# Run one monitoring pass; any unexpected failure is turned into an alert
# instead of a traceback.
try:
    monitor_pipeline()
except Exception as exc:
    send_alert(f"Data pipeline failure: {exc}")