<a href="https://colab.research.google.com/github/pkant-0/Power_BI-fabric/blob/main/Untitled54.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data ingestion layer

In [None]:
import pandas as pd
import numpy as np
from google.ads import GoogleAdsClient
from facebook_business.api import FacebookAdsApi
from azure.storage.blob import BlobServiceClient
import sqlalchemy

class DataIngestionManager:
    """Pull marketing data from ad platforms, a CRM database and Excel files.

    The ``config`` mapping is read for these keys:

    * ``azure_storage_connection_string`` -- used when the manager is built.
    * ``google_ads_customer_id`` -- account queried by
      ``ingest_google_ads_data``.
    * ``meta_ads_token`` and ``ad_account_id`` -- used by
      ``ingest_meta_ads_data``.
    * ``crm_database_connection`` -- SQLAlchemy URL used by
      ``ingest_crm_data``.
    """

    # Column layout produced by both ad-platform transforms, so that their
    # frames line up when concatenated.
    AD_COLUMNS = ['platform', 'campaign_id', 'impressions', 'clicks', 'spend']

    def __init__(self, config):
        """Store ``config`` and create the Azure Blob service client."""
        self.config = config
        self.blob_service_client = BlobServiceClient.from_connection_string(
            config['azure_storage_connection_string']
        )

    def ingest_google_ads_data(self):
        """Ingest campaign metrics from the Google Ads API.

        Returns:
            A DataFrame with the columns listed in ``AD_COLUMNS``.

        Raises:
            KeyError: if ``google_ads_customer_id`` is missing from the config.
        """
        # NOTE(review): the google-ads package exposes the client as
        # google.ads.googleads.client.GoogleAdsClient -- confirm that the
        # module-level import of GoogleAdsClient resolves.
        google_ads_client = GoogleAdsClient.load_from_storage()
        google_ads_service = google_ads_client.get_service('GoogleAdsService')

        query = '''
        SELECT
            campaign.id,
            metrics.impressions,
            metrics.clicks,
            metrics.cost_micros
        FROM campaign
        '''

        # ``search`` lives on the service (not on the client) and needs the
        # customer id of the account to query.
        google_ads_data = google_ads_service.search(
            customer_id=str(self.config['google_ads_customer_id']),
            query=query,
        )
        return self._transform_google_ads_data(google_ads_data)

    def ingest_meta_ads_data(self):
        """Ingest campaign-level ad insights from the Meta Ads API.

        Returns:
            A DataFrame with the columns listed in ``AD_COLUMNS``.
        """
        # Imported here so the name is in scope without touching the
        # file-level import block.
        from facebook_business.adobjects.adaccount import AdAccount

        FacebookAdsApi.init(access_token=self.config['meta_ads_token'])

        # Request the same metrics as the Google Ads query so that both
        # sources can share one schema.
        ad_insights = AdAccount(self.config['ad_account_id']).get_insights(
            fields=['campaign_id', 'impressions', 'clicks', 'spend'],
            params={'level': 'campaign'},
        )
        return self._transform_meta_ads_data(ad_insights)

    def ingest_crm_data(self):
        """Extract CRM data using SQLAlchemy.

        Returns:
            A DataFrame with ``customer_id``, ``total_spend`` and
            ``last_purchase_date`` from ``customer_transactions``.
        """
        engine = sqlalchemy.create_engine(self.config['crm_database_connection'])

        query = """
        SELECT
            customer_id,
            total_spend,
            last_purchase_date
        FROM customer_transactions
        """

        try:
            return pd.read_sql(query, engine)
        finally:
            # Release pooled connections; the engine is only used for this call.
            engine.dispose()

    def ingest_excel_data(self, file_path):
        """Ingest a manually uploaded Excel file and return it as a DataFrame."""
        return pd.read_excel(file_path)

    def upload_to_blob_storage(self, data, container_name, blob_name,
                               overwrite=False):
        """Upload ``data`` as CSV (without the index) to Azure Blob Storage.

        Args:
            data: DataFrame to serialize.
            container_name: Target container.
            blob_name: Target blob name.
            overwrite: Replace an existing blob of the same name. Defaults to
                ``False``, which keeps the original behaviour of failing when
                the blob already exists; scheduled re-runs will usually want
                ``True``.
        """
        blob_client = self.blob_service_client.get_blob_client(
            container=container_name,
            blob=blob_name
        )
        blob_client.upload_blob(data.to_csv(index=False), overwrite=overwrite)

    @staticmethod
    def _transform_google_ads_data(rows):
        """Flatten Google Ads result rows into the shared ad schema.

        Each row is expected to expose ``campaign.id`` and the three metrics
        selected by the query in ``ingest_google_ads_data``.
        """
        records = [
            {
                'platform': 'google_ads',
                'campaign_id': row.campaign.id,
                'impressions': row.metrics.impressions,
                'clicks': row.metrics.clicks,
                # cost_micros is expressed in millionths of the currency unit.
                'spend': row.metrics.cost_micros / 1_000_000,
            }
            for row in rows
        ]
        return pd.DataFrame(records, columns=DataIngestionManager.AD_COLUMNS)

    @staticmethod
    def _transform_meta_ads_data(insights):
        """Flatten Meta insight records into the shared ad schema.

        ``insights`` is any iterable of mapping-like records; the metric
        columns are converted to numbers because they may arrive as strings.
        """
        frame = pd.DataFrame(
            [dict(row) for row in insights],
            columns=['campaign_id', 'impressions', 'clicks', 'spend'],
        )
        for column in ('impressions', 'clicks', 'spend'):
            frame[column] = pd.to_numeric(frame[column])
        frame.insert(0, 'platform', 'meta_ads')
        return frame

# Data Transformation
def data_transformation_pipeline(ingested_data):
    """Standardize and clean ingested data.

    Rows containing any missing value are dropped, the index is renumbered
    from zero, and a ``conversion_rate`` column (conversions per click) is
    added. The input frame is not modified.

    Args:
        ingested_data: DataFrame with at least ``conversions`` and ``clicks``
            columns.

    Returns:
        A new DataFrame with the cleaned rows and the extra column.

    Raises:
        KeyError: if ``conversions`` or ``clicks`` is missing.
    """
    # Data cleaning steps
    cleaned_data = ingested_data.dropna().reset_index(drop=True)

    # Feature engineering
    clicks = cleaned_data['clicks']
    # Mask zero clicks before dividing: x / 0 would otherwise yield inf (or
    # NaN for 0 / 0). Rows without clicks get a rate of 0.0 so the column
    # stays finite for downstream consumers.
    rate = cleaned_data['conversions'] / clicks.where(clicks != 0)
    cleaned_data['conversion_rate'] = rate.fillna(0.0)

    return cleaned_data

# Machine learning model

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
import mlflow

class MarketingPerformancePredictor:
    """Gradient-boosting regressor that predicts ``total_revenue``.

    Features are standardized with a ``StandardScaler`` that is fitted during
    ``train_model`` and reused by ``predict``.
    """

    def __init__(self, random_state=None):
        """Create an untrained predictor.

        Args:
            random_state: Seed forwarded to the train/test split and to the
                model. ``None`` (the default) keeps the original unseeded
                behaviour; pass an int for reproducible runs.
        """
        self.random_state = random_state
        self.model = None
        self.scaler = StandardScaler()

    def prepare_data(self, data, test_size=0.2):
        """Prepare features and target variable.

        Args:
            data: DataFrame containing the feature columns listed below and a
                ``total_revenue`` column.
            test_size: Share of rows held out for testing.

        Returns:
            ``X_train, X_test, y_train, y_test`` as produced by
            ``train_test_split``.
        """
        features = [
            'spend',
            'impressions',
            'clicks',
            'conversion_rate'
        ]

        X = data[features]
        y = data['total_revenue']

        return train_test_split(
            X, y, test_size=test_size, random_state=self.random_state
        )

    def train_model(self, X_train, y_train):
        """Train the Gradient Boosting Regressor inside an MLflow run.

        Fits the scaler on ``X_train``, fits the model on the scaled features
        and logs the hyper-parameters plus the R^2 score on the training data.
        """
        params = {
            'n_estimators': 100,
            'learning_rate': 0.1,
            'max_depth': 3,
            'random_state': self.random_state,
        }

        with mlflow.start_run():
            # Scale features
            X_train_scaled = self.scaler.fit_transform(X_train)

            # Train model; only publish it on self once fitting succeeded so
            # a failed fit never leaves a half-initialized model behind.
            model = GradientBoostingRegressor(**params)
            model.fit(X_train_scaled, y_train)
            self.model = model

            # Log hyper-parameters so runs can be compared and reproduced.
            mlflow.log_params(params)
            # NOTE: this is the score on the training data, not a held-out set.
            mlflow.log_metric("r2_score", model.score(X_train_scaled, y_train))

    def predict(self, X_test):
        """Make predictions for ``X_test`` using the fitted scaler and model.

        Raises:
            sklearn.exceptions.NotFittedError: if ``train_model`` has not been
                called yet.
        """
        if self.model is None:
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                "MarketingPerformancePredictor is not trained yet; "
                "call train_model() before predict()."
            )
        X_test_scaled = self.scaler.transform(X_test)
        return self.model.predict(X_test_scaled)

# Dashboard and visualization

In [None]:
import plotly.express as px
import dash
import dash_core_components as dcc
import dash_html_components as html

class MarketingDashboard:
    """Dash application showing spend and conversion-rate charts.

    ``data`` must provide the columns ``platform``, ``spend``, ``date`` and
    ``conversion_rate``, which are the ones read by ``create_dashboard``.
    """

    def __init__(self, data):
        """Keep a reference to ``data`` and create the Dash app."""
        self.data = data
        self.app = dash.Dash(__name__)

    def create_dashboard(self):
        """Create interactive marketing performance dashboard."""
        spend_by_platform = self.data.groupby('platform')['spend'].sum()

        # Lines connect points in row order, so sort by date first; a stable
        # sort keeps the original order of rows sharing a date.
        trend_data = self.data.sort_values('date', kind='stable')

        self.app.layout = html.Div([
            # Spend by Platform
            dcc.Graph(
                figure=px.bar(
                    spend_by_platform,
                    title='Total Spend by Platform'
                )
            ),

            # Conversion Rate Trends
            dcc.Graph(
                figure=px.line(
                    trend_data,
                    x='date',
                    y='conversion_rate',
                    color='platform',
                    title='Conversion Rate Trends'
                )
            )
        ])

    def run_dashboard(self, debug=True):
        """Start the Dash development server.

        Args:
            debug: Enable Dash debug mode. Defaults to ``True`` to match the
                previous hard-coded behaviour; pass ``False`` outside local
                development, since debug mode exposes an interactive debugger.
        """
        # ``run_server`` was removed in Dash 3 in favour of ``run``; prefer
        # ``run`` and fall back only on Dash versions that lack it.
        if hasattr(self.app, 'run'):
            self.app.run(debug=debug)
        else:
            self.app.run_server(debug=debug)

# Orchestration and workflow

In [None]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def marketing_data_pipeline(pipeline_config=None):
    """Run ingestion, transformation, model training and dashboard creation.

    Args:
        pipeline_config: Configuration mapping handed to
            ``DataIngestionManager``. When omitted, the module-level ``config``
            is used, which is what the function read before this parameter
            existed.

    Returns:
        None. The dashboard layout is built but the server is not started.
    """
    if pipeline_config is None:
        pipeline_config = config

    # Initialize data ingestion
    ingestion_manager = DataIngestionManager(pipeline_config)

    # Collect data from multiple sources
    google_ads_data = ingestion_manager.ingest_google_ads_data()
    meta_ads_data = ingestion_manager.ingest_meta_ads_data()
    crm_data = ingestion_manager.ingest_crm_data()

    # Combine and transform data
    # NOTE(review): the CRM query selects customer-level columns
    # (customer_id, total_spend, last_purchase_date), and the transformation
    # step drops every row containing a missing value -- confirm that stacking
    # CRM rows under the ad rows is the intended way to combine the sources.
    combined_data = pd.concat([
        google_ads_data,
        meta_ads_data,
        crm_data
    ])

    transformed_data = data_transformation_pipeline(combined_data)

    # Train ML model; the held-out split is not used in this function.
    predictor = MarketingPerformancePredictor()
    X_train, _X_test, y_train, _y_test = predictor.prepare_data(transformed_data)
    predictor.train_model(X_train, y_train)

    # Create dashboard
    dashboard = MarketingDashboard(transformed_data)
    dashboard.create_dashboard()

Key Components & Best Practices
Data Ingestion

Multi-source data collection
API integrations
Secure credential management
Error handling
Data Transformation

Standardized cleaning
Feature engineering
Data quality checks
Machine Learning

Gradient Boosting Regressor
MLflow for experiment tracking
Model performance logging
Visualization

Interactive Plotly Dash
Real-time updates
Multiple visualization types
Orchestration

Apache Airflow for workflow management
Scheduled data pipeline execution
Recommended Cloud Infrastructure
Azure Services
Data Factory
Blob Storage
Machine Learning Studio
Power BI
Databricks
Deployment Considerations
Containerize with Docker
Use Kubernetes for scaling
Implement CI/CD pipelines
Set up monitoring and logging
Estimated Cost & Performance
Monthly Estimated Cost: $500–$1,500
Data Processing: 1-10 TB/month
Update Frequency: Daily/Weekly
Latency: Near real-time
Compliance & Security
GDPR compliance
Data encryption
Role-based access control
Audit logging
This comprehensive solution provides a robust, scalable marketing data pipeline that integrates multiple data sources, applies machine learning for predictive insights, and creates interactive visualizations.