# Data Engineering Case Study
---------------

Imagine you are a data engineer working for AdvertiseX, a digital advertising technology company. AdvertiseX specializes in programmatic advertising and manages multiple online advertising campaigns for its clients. The company handles vast amounts of data generated by ad impressions, clicks, conversions, and more. Your role as a data engineer is to address the following challenges:

- ### Data Sources and Formats:

  - #### Ad Impressions:

    **Data Source:** AdvertiseX serves digital ads to various online platforms and websites. \
    **Data Format:** Ad impressions data is generated in JSON format, containing information such as ad creative ID, user ID, timestamp, and the website where the ad was displayed.

  - #### Clicks and Conversions:

    **Data Source:** AdvertiseX tracks user interactions with ads, including clicks and conversions (e.g., sign-ups, purchases). \
    **Data Format:** Click and conversion data is logged in CSV format and includes event timestamps, user IDs, ad campaign IDs, and conversion type.

  - #### Bid Requests:

    **Data Source:** AdvertiseX participates in real-time bidding (RTB) auctions to serve ads to users. \
    **Data Format:** Bid request data is received in a semi-structured format, mostly in Avro, and includes user information, auction details, and ad targeting criteria.

- ### Case Study Requirements:

  - #### Data Ingestion:

    Implement a scalable data ingestion system capable of collecting and processing ad impressions (JSON), clicks/conversions (CSV), and bid requests (Avro) data. \
    Ensure that the ingestion system can handle high data volumes generated in real-time and batch modes.

  - #### Data Processing:

    Develop data transformation processes to standardize and enrich the data. Handle data validation, filtering, and deduplication. \
    Implement logic to correlate ad impressions with clicks and conversions to provide meaningful insights.

  - #### Data Storage and Query Performance:

    Select an appropriate data storage solution for storing processed data efficiently, enabling fast querying for campaign performance analysis. \
    Optimize the storage system for analytical queries and aggregations of ad campaign data.

  - #### Error Handling and Monitoring:

    Create an error handling and monitoring system to detect data anomalies, discrepancies, or delays. \
    Implement alerting mechanisms to address data quality issues in real-time, ensuring that discrepancies are resolved promptly to maintain ad campaign effectiveness.

This Ad Tech case study scenario focuses on the challenges and data formats commonly encountered in the digital advertising industry. Candidates can use this information to design a data engineering solution that addresses the specific data processing and analysis needs of AdvertiseX.

----------------

# Solution
---------------

There are a few ways to address the challenges above. The specific service names vary by platform: a cloud provider such as GCP or AWS, or an on-premises big data platform such as Hadoop.

- ### Let’s break down each requirement:

    - #### Data Ingestion
        - Scalable Data Ingestion System:
            - Set up a robust data ingestion pipeline capable of handling high volumes of real-time and batch data.
                - Google Pub/Sub, Apache Kafka or AWS Kinesis can be used for real-time streaming data ingestion.
                - Use Apache Airflow for periodic batch data loads.
        - Data Source-Specific Adapters:
            - Develop custom adapters for each data source (ad impressions, clicks/conversions, bid requests) which can handle data retrieval, format conversion, and initial validation.
            
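                We can use Python's built-in `json` module to parse JSON and the `avro` or `fastavro` library to read Avro files. pandas provides a higher-level abstraction that loads JSON and CSV files directly into DataFrames; pandas has no native Avro reader, so the records yielded by `fastavro` are wrapped in a DataFrame instead: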

In [None]:
import pandas as pd
from fastavro import reader
from google.cloud import bigquery
from pymongo import MongoClient
from google.cloud import storage

# Data Ingestion

def ingest_ad_impressions(json_file_path):
    """
    Ingests ad impressions data from a JSON file.
    
    Args:
        json_file_path (str): Path to the JSON file containing ad impressions data.
    
    Returns:
        pd.DataFrame: Ad impressions data, one row per impression record.
    """
    # pd.read_json accepts a path directly; pass lines=True if the file is
    # newline-delimited JSON (one impression object per line).
    ad_impressions_df = pd.read_json(json_file_path)
    # Process and store ad impressions data (e.g., insert into a database)
    # Example: Insert into MongoDB or a relational database
    # ...
    return ad_impressions_df

# Usage
json_file_path = 'path/to/ad_impressions.json'
ad_impressions_df = ingest_ad_impressions(json_file_path)

def ingest_clicks_and_conversions(csv_file_path):
    """
    Ingests clicks and conversions data from a CSV file.
    
    Args:
        csv_file_path (str): Path to the CSV file containing clicks and conversions data.
    
    Returns:
        pd.DataFrame: Clicks and conversions data, one row per event.
    """
    click_conversion_df = pd.read_csv(csv_file_path)
    # Process and store click/conversion data (e.g., filter, validate, enrich)
    # Example: Validate timestamps, join with campaign details
    # ...
    return click_conversion_df

# Usage
csv_file_path = 'path/to/clicks_conversions.csv'
clicks_df = ingest_clicks_and_conversions(csv_file_path)

def ingest_bid_requests(avro_file_path):
    """
    Ingests bid requests data from an Avro file.
    
    Args:
        avro_file_path (str): Path to the Avro file containing bid requests data.
    
    Returns:
        pd.DataFrame: Bid request data, one row per Avro record.
    """
    with open(avro_file_path, 'rb') as avro_file:
        # fastavro's reader yields one dict per record; materialize them before the file closes
        bid_requests_df = pd.DataFrame(list(reader(avro_file)))
    # Process and store bid request data (e.g., extract user info, auction details)
    # Example: Extract user demographics, validate auction details
    # ...
    return bid_requests_df

# Usage
avro_file_path = 'path/to/bid_requests.avro'
bid_requests_df = ingest_bid_requests(avro_file_path)
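
The per-source adapter idea (retrieval, format conversion, initial validation) can be sketched with a small registry that maps each source to a parser plus a required-field check. This is a minimal, stdlib-only sketch under stated assumptions: the field names are guesses based on the case-study descriptions, the impressions feed is assumed to be newline-delimited JSON, and the Avro adapter is omitted because it would need a third-party reader such as `fastavro`.

```python
import csv
import io
import json
from typing import Callable, Dict, Iterable, List, Tuple

# Hypothetical required fields per source, inferred from the case-study description.
REQUIRED_FIELDS: Dict[str, set] = {
    "ad_impressions": {"ad_creative_id", "user_id", "timestamp", "website"},
    "clicks_conversions": {"timestamp", "user_id", "campaign_id", "conversion_type"},
}

def parse_json_lines(text: str) -> Iterable[dict]:
    """Parse newline-delimited JSON: one object per non-empty line."""
    for line in text.splitlines():
        if line.strip():
            yield json.loads(line)

def parse_csv(text: str) -> Iterable[dict]:
    """Parse CSV with a header row into one dict per row."""
    return csv.DictReader(io.StringIO(text))

# Registry: source name -> parser. An Avro adapter (e.g. built on fastavro) would plug in here.
ADAPTERS: Dict[str, Callable[[str], Iterable[dict]]] = {
    "ad_impressions": parse_json_lines,
    "clicks_conversions": parse_csv,
}

def ingest(source: str, text: str) -> Tuple[List[dict], List[dict]]:
    """Parse raw text for one source and split records into (valid, rejected)."""
    parser = ADAPTERS[source]
    required = REQUIRED_FIELDS[source]
    valid: List[dict] = []
    rejected: List[dict] = []
    for record in parser(text):
        # A record is valid only if every required field is present and non-empty.
        missing = {field for field in required if record.get(field) in (None, "")}
        (rejected if missing else valid).append(record)
    return valid, rejected
```

Rejected records could be written to a dead-letter location for inspection instead of being dropped silently; the rejected count is also a natural data-quality metric to monitor.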

- #### Data Processing
    - Standardization and Enrichment:
        - Transform raw data into a common schema for consistency.
            - Convert timestamps to a consistent format (e.g., ISO 8601)
            - Handle missing values (e.g., fill or drop)
            - Standardize column names (e.g., rename columns)
        - Enrich data by adding relevant metadata (e.g., campaign details, user demographics, funnel stage such as Top of Funnel (TOF) or Bottom of Funnel (BOF), conversion ratio).
        - Use tools like Apache Spark or Databricks for scalable data processing.
    - Correlation Logic:
        - Join ad impressions with clicks and conversions based on common identifiers (e.g., user ID, creative ID).
        - Calculate click-through rates (CTR) and conversion rates.
        - Identify successful campaigns and optimize targeting strategies.
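
The standardization steps above, plus the deduplication named in the requirements, can be sketched in pandas as follows. The source column names in `COLUMN_MAP` (`userId`, `ts`, `creativeId`) are hypothetical, and dropping rather than filling rows with missing keys is one possible policy, not the only one.

```python
import pandas as pd

# Hypothetical mapping from source-specific column names to a common schema.
COLUMN_MAP = {"userId": "user_id", "ts": "event_timestamp", "creativeId": "ad_creative_id"}

def standardize(df: pd.DataFrame) -> pd.DataFrame:
    """Rename columns, normalize timestamps to UTC, drop incomplete rows and duplicates."""
    out = df.rename(columns=COLUMN_MAP)  # rename returns a new DataFrame
    # Parse as UTC; unparseable values become NaT instead of raising.
    out["event_timestamp"] = pd.to_datetime(out["event_timestamp"], utc=True, errors="coerce")
    out = (
        out.dropna(subset=["user_id", "event_timestamp"])  # keys needed for correlation
        # Exact repeats of the same event (e.g. from at-least-once delivery) are kept once.
        .drop_duplicates(subset=["user_id", "ad_creative_id", "event_timestamp"])
        .reset_index(drop=True)
    )
    # ISO 8601 string form of the UTC timestamp, for storage or exchange.
    return out.assign(
        event_timestamp_iso=out["event_timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    )
```

Keeping the parsed `datetime64` column alongside the ISO string lets later steps do time arithmetic without re-parsing.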

In [None]:
# Data Processing

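def correlate_impressions_with_clicks(impressions_df, clicks_df, keys=('user_id',)):
    """
    Correlates ad impressions with clicks.
    
    Args:
        impressions_df (pd.DataFrame): DataFrame containing ad impressions data.
        clicks_df (pd.DataFrame): DataFrame containing clicks data.
        keys (tuple of str): Columns present in both DataFrames to join on.
    
    Returns:
        pd.DataFrame: Impressions left-joined with matching clicks (click columns are NaN where no click matched).
    """
    # Per the source schemas, user ID is the only identifier both feeds share
    # (impressions carry a creative ID, clicks a campaign ID). Joining on user alone
    # matches a click to every impression of that user, so add keys or a time-window
    # filter when they are available.
    # A left join keeps impressions that received no click, which CTR needs.
    correlated_df = impressions_df.merge(
        clicks_df, on=list(keys), how='left', suffixes=('_impression', '_click')
    )
    return correlated_df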

# Usage
correlated_data_df = correlate_impressions_with_clicks(ad_impressions_df, clicks_df)
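
Because a plain join on `user_id` pairs every click with every impression of that user, a common alternative is last-touch attribution within a time window. The sketch below uses `pandas.merge_asof` to attach each click to the latest impression by the same user no more than `window` earlier, then computes CTR (clicks / impressions) per creative. It assumes both frames have a comparable `event_timestamp` column and that impressions carry `ad_creative_id`; the 30-minute window is an arbitrary example value. Conversion rate (conversions / clicks) could be computed the same way.

```python
import pandas as pd

def attribute_clicks(impressions: pd.DataFrame, clicks: pd.DataFrame,
                     window: str = "30min") -> pd.DataFrame:
    """Match each click to the latest impression by the same user within `window` before it."""
    # merge_asof keeps only the left `on` column, so copy the impression time first.
    imps = (impressions.assign(impression_timestamp=impressions["event_timestamp"])
            .sort_values("event_timestamp"))
    clks = clicks.sort_values("event_timestamp")  # merge_asof requires both sides sorted on `on`
    return pd.merge_asof(
        clks, imps,
        on="event_timestamp", by="user_id",
        direction="backward",            # only impressions at or before the click
        tolerance=pd.Timedelta(window),  # ...and no older than the attribution window
    )

def ctr_by_creative(impressions: pd.DataFrame, attributed: pd.DataFrame) -> pd.Series:
    """CTR = attributed clicks / impressions, per ad creative."""
    shown = impressions.groupby("ad_creative_id").size()
    clicked = attributed.dropna(subset=["ad_creative_id"]).groupby("ad_creative_id").size()
    # Creatives with impressions but no attributed clicks get a CTR of 0.
    return (clicked.reindex(shown.index, fill_value=0) / shown).rename("ctr")
```

Clicks with no impression inside the window keep NaN impression columns, which makes unattributed clicks easy to count and monitor.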

- #### Data Storage and Query Performance
    - Storage Solution Selection:
        - Choose an appropriate storage system based on requirements:
            - **Data Warehouse:** For structured data (e.g., clicks/conversions), use solutions like Amazon Redshift, Google BigQuery, or Snowflake. BigQuery can be queried from Python with the `google-cloud-bigquery` client library.

    - Optimization for Analytical Queries
    
        - Create materialized views in BigQuery to pre-aggregate data for common queries (e.g., daily campaign performance).
        - Define the view using SQL and schedule refresh intervals.
        - Partition tables by time (hourly, daily, monthly, or yearly; daily or hourly suits most campaign data) so that queries scan only the relevant partitions.
        - Cluster tables on frequently filtered categorical columns (e.g., campaign ID, event type).
        - Use ingestion-time partitioning for streaming data.
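
The partitioning, clustering, and materialized-view points above can be illustrated with BigQuery DDL. The sketch below only builds the SQL strings; the table name `ad_events`, its columns, and the 60-minute refresh interval are hypothetical choices, and the statements have not been run here, so check them against the BigQuery DDL reference before use.

```python
def bigquery_ddl(dataset: str) -> dict:
    """Return hypothetical BigQuery DDL for a partitioned, clustered events table
    and a materialized view of daily campaign performance built on it."""
    table = f"`{dataset}.ad_events`"
    create_table = f"""
CREATE TABLE IF NOT EXISTS {table} (
  event_timestamp TIMESTAMP NOT NULL,
  event_type STRING NOT NULL,        -- 'impression', 'click' or 'conversion'
  campaign_id STRING,
  ad_creative_id STRING,
  user_id STRING
)
PARTITION BY DATE(event_timestamp)   -- daily partitions: date filters scan less data
CLUSTER BY campaign_id, event_type   -- co-locate rows for common categorical filters
""".strip()
    create_view = f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS `{dataset}.daily_campaign_performance`
OPTIONS (enable_refresh = true, refresh_interval_minutes = 60)
AS
SELECT
  DATE(event_timestamp) AS event_date,
  campaign_id,
  COUNTIF(event_type = 'impression') AS impressions,
  COUNTIF(event_type = 'click') AS clicks,
  COUNTIF(event_type = 'conversion') AS conversions
FROM {table}
GROUP BY event_date, campaign_id
""".strip()
    return {"table": create_table, "materialized_view": create_view}
```

Each statement could then be submitted with the client library, e.g. `bigquery.Client().query(sql).result()`. Queries that filter on `event_timestamp` and `campaign_id` benefit from partition pruning and clustering; dashboards can read pre-aggregated counts from the view and derive CTR as `clicks / impressions`.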

In [None]:
# Data Storage and Query Performance

def store_ad_impressions(dataframe, mongo_uri='mongodb://localhost:27017/'):
    """
    Stores ad impressions data into MongoDB.
    
    Args:
        dataframe (pd.DataFrame): DataFrame containing ad impressions data.
        mongo_uri (str): MongoDB connection string.
    
    Returns:
        None
    """
    records = dataframe.to_dict('records')
    if not records:
        return  # insert_many rejects an empty list

    # The context manager closes the connection even if the insert fails
    with MongoClient(mongo_uri) as client:
        collection = client['advertising_db']['ad_impressions']
        collection.insert_many(records)

# Usage
store_ad_impressions(ad_impressions_df)

def upload_to_gcs(local_file_path, bucket_name, object_name):
    """
    Uploads a file to Google Cloud Storage (GCS).
    
    Args:
        local_file_path (str): Path to the local file to upload.
        bucket_name (str): Name of the GCS bucket.
        object_name (str): Name of the object to create in GCS.
    
    Returns:
        None
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(object_name)

    # Upload the local file to GCS
    blob.upload_from_filename(local_file_path)

# Usage
local_file_path = 'path/to/bid_requests.avro'
bucket_name = 'your-gcs-bucket'
object_name = 'bid_requests.avro'
upload_to_gcs(local_file_path, bucket_name, object_name)
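
When raw files land in GCS, a date- and hour-partitioned object path (Hive-style `key=value` segments) lets downstream engines prune by path. The helper below is hypothetical, and the `raw/<source>/dt=.../hour=...` layout is an assumed convention rather than a GCS requirement; its result could be passed as `object_name` to `upload_to_gcs`.

```python
from datetime import datetime, timezone

def partitioned_object_name(source: str, event_time: datetime, filename: str) -> str:
    """Build a Hive-style partitioned path: raw/<source>/dt=YYYY-MM-DD/hour=HH/<filename>."""
    if event_time.tzinfo is None:
        raise ValueError("event_time must be timezone-aware")
    # Partition on UTC so every producer agrees on the date/hour boundaries.
    utc = event_time.astimezone(timezone.utc)
    return f"raw/{source}/dt={utc:%Y-%m-%d}/hour={utc:%H}/{filename}"
```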

- #### Error Handling and Monitoring

    - Set up monitoring tools (e.g., Grafana or Google Cloud Monitoring) and create a data quality dashboard to track metrics such as data completeness and latency.
    - Monitor data pipelines and identify bottlenecks or failures.
    - Implement automated retries for failed data ingestion or processing tasks.
    - Detect anomalies (e.g., sudden drops in impressions, abnormally high CTR) using statistical methods.
    - Trigger alerts via email, Slack, or SMS when discrepancies occur.
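
One simple statistical method for the anomaly detection above is a z-score check of the latest value of a metric (hourly impressions, CTR) against its recent history. This is a sketch, not a tuned detector: the threshold of 3 standard deviations is a conventional starting point, and metrics with strong daily seasonality would be better compared against the same hour on previous days. The returned message is what would be sent to email, Slack, or SMS; delivery is left out.

```python
from statistics import mean, stdev
from typing import List, Optional

def zscore_alert(history: List[float], latest: float, metric: str,
                 threshold: float = 3.0) -> Optional[str]:
    """Return an alert message if `latest` is more than `threshold` std devs from the history mean."""
    if len(history) < 2:
        return None  # not enough data to estimate spread
    mu, sigma = mean(history), stdev(history)
    if sigma == 0:
        # Flat history: any change at all is unusual.
        return None if latest == mu else f"{metric}: {latest} differs from constant baseline {mu}"
    z = (latest - mu) / sigma
    if abs(z) > threshold:
        direction = "drop" if z < 0 else "spike"
        return f"{metric}: {direction} (z={z:.1f}, latest={latest}, mean={mu:.4g})"
    return None
```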

In [None]:
# Case Study Requirements: Error Handling and Monitoring

def monitor_data_quality():
    """
    Monitors data quality metrics and triggers alerts for anomalies.
    
    Args:
        None
    
    Returns:
        None
    """
    # Implement monitoring logic to detect anomalies, discrepancies, or delays
    # Trigger alerts via email, Slack, or SMS when discrepancies occur
    # Example: Monitor sudden drops in impressions, high CTR, etc.
    pass

# Usage
monitor_data_quality()

def handle_errors():
    """
    Handles errors and exceptions in the data pipeline.
    
    Args:
        None
    
    Returns:
        None
    """
    # Implement error handling mechanisms to address data anomalies, discrepancies, or delays
    # Example: Implement automated retries for failed data ingestion or processing tasks
    pass

# Usage
handle_errors()
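
The automated retries mentioned in `handle_errors` are often implemented as exponential backoff with jitter. A minimal sketch, assuming that `ConnectionError` and `TimeoutError` are the transient failures worth retrying; which exceptions count as transient depends on the client libraries actually used.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

def retry(task: Callable[[], T], attempts: int = 4, base_delay: float = 0.5,
          retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Run `task`, retrying transient errors with exponential backoff and full jitter."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return task()
        except retry_on:
            if attempt == attempts - 1:
                # Out of attempts: re-raise so the failure reaches alerting or a dead-letter path.
                raise
            # Full jitter: wait a random time in [0, base_delay * 2**attempt] seconds.
            sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise AssertionError("unreachable")
```

`sleep` is injectable so the backoff can be tested without waiting. In Airflow, comparable behaviour is configured per task through the `retries`, `retry_delay`, and `retry_exponential_backoff` operator arguments.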