In [1]:

import pandas as pd
from pandas import json_normalize
import json

import re

import requests
from requests.auth import HTTPDigestAuth
from urllib.parse import urlparse, parse_qs


import os
from google.cloud import bigquery

# Thinking through the Medallion architecture 

In [2]:
# bronze layer
class extraction_layer:
    def __init__(self, _url, username= None, password= None, api_key= None):
        self._url= _url
        self.headers= {}
        if username and password:
            self.auth = HTTPDigestAuth(username, password)
            
        if api_key:
            self.headers['x-api-key'] = api_key
            
    
    def connect_(self, endpoint= None):
        
        url= f"{self._url}{endpoint}"
        response = requests.get(url, auth=self.auth, headers=self.headers)
        
        if response.status_code == 200:
            print(f'Connection successful: {response.status_code}')
            return response# .json()
        else:
            print(f'Error: {response.status_code}')
            return None
            

In [3]:
# Silver layer
class data_t:
    
    # try exception can used to any error
    #def __init__(self, data=None):
     #   self.data = data  # Instance variable to hold data if needed
    
    def extract_utm_params(self, url):
        parsed_url = urlparse(url)
        query_params = parse_qs(parsed_url.query)
        # Extract parameters, defaulting to None if not present
        utm_source = query_params.get('utm_source', [None])[0]
        utm_campaign = query_params.get('utm_campaign', [None])[0]
        utm_medium = query_params.get('utm_medium', [None])[0]
        gclid = query_params.get('gclid', [None])[0]
        return pd.Series([utm_source, utm_campaign, utm_medium, gclid])
         
    def map_channel(self, row):
        # Rule 1- If `utm_medium` matches organic then channel is "Organic Search"
        if row['utm_medium'] and row['utm_medium'].strip().lower() == 'organic': # to avoid AttributeError: 'NoneType' object has no attribute 'strip', checking row['utm_medium'] is not None before calling strip()
            return 'Organic Search'
        # Rule 2- If `utm_medium` matches ^(social|sm).* then channel is "Social"
        elif re.match(r'^(social|sm).*', str(row['utm_medium']), re.IGNORECASE):
            return 'Social'
        # Rule 3- If `utm_source` matches ^(facebook|instagram|ig|fb|linkedin)$ then channel is "Social"
        elif re.match(r'^(facebook|instagram|ig|fb|linkedin)$', str(row['utm_source']), re.IGNORECASE):
            return 'Social'
        # Rule 4-  If `utm_medium` matches email then channel is "Email"
        elif row['utm_medium'] and row['utm_medium'].strip().lower() == 'email': # to avoid AttributeError: 'NoneType' object has no attribute 'strip'
            return 'Email'
        # Rule 5- If `utm_medium` matches ^(cpc|ppc|paid|paidsearch)$ then channel is "Paid Search"
        elif re.match(r'^(cpc|ppc|paid|paidsearch)$', str(row['utm_medium']), re.IGNORECASE):
            return 'Paid Search'
        # Rule 6- If `gclid` is not NULL then channel is "Paid Search"
        elif pd.notna(row['gclid']):
            return 'Paid Search'
        # Rule 7- If `utm_source` matches .*(google|yahoo).* then channel is "Paid Search"
        elif re.search(r'(google|yahoo)', str(row['utm_source']), re.IGNORECASE):
            return 'Paid Search'
        # Rule 8- If `utm_source`, `utm_campaign`, and `utm_medium` are NULL and referrer_domain is not then channel is "Referral"

        elif pd.isna(row['utm_source']) and pd.isna(row['utm_campaign']) and pd.isna(row['utm_medium']) and pd.notna(row['referrer_domain']):
            return 'Referral'
        # Rule 9- If `utm_source`, `utm_campaign`, `utm_medium`, and `referrer_domain` are NULL then channel is "Direct"
        elif pd.isna(row['utm_source']) and pd.isna(row['utm_campaign']) and pd.isna(row['utm_medium']) and pd.isna(row['referrer_domain']):
            return 'Direct'
        # Rule 10
        else:
            return 'Unknown'

    def trans(self, response):
        print(response)
        r = response.text
        #print(r)
        jsondata = json.loads(r)
        df = pd.json_normalize(jsondata)
        df_n = df

        df_n[['utm_source', 'utm_campaign', 'utm_medium', 'gclid']] = df_n['location'].apply(self.extract_utm_params)
        
        df_n['channel'] = df_n.apply(self.map_channel, axis=1)
        return df_n

In [4]:
conn_obj= extraction_layer( _url='https://de-demo-api.adtriba.app',
    username='de-demo-api',
    password='woope1Pei5zieg',
    api_key= 'woope1Pei5zieg')

In [5]:
response= conn_obj.connect_('/v1/api/data')

Connection successful: 200


In [6]:
response

<Response [200]>

In [7]:
obj1= data_t()
df_m= obj1.trans(response)

<Response [200]>


# Gold layer

In [8]:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'C:\Users\bhard\Funnel\booking-26091-6274ab5fe09a.json'  # Bigquery credentials json

In [9]:

# Initialize the BigQuery client
client = bigquery.Client()

In [10]:

# Define table ID
project_id = 'booking-26091'
dataset_id = 'api_marketing_adtriba'
table_name = 'api_marketing'
table_id = f"{project_id}.{dataset_id}.{table_name}"

try:
    client.delete_table(table_id)
    print(f"Deleted table {table_id}.")
except Exception as e:
    print(f"Table {table_id} does not exist or could not be deleted: {e}")

# Load DataFrame
job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
job = client.load_table_from_dataframe(df_m, table_id, job_config=job_config)
job.result()  # Wait for the job to complete

print(f"{job.output_rows} rows are loaded into {table_id}.")


Deleted table booking-26091.api_marketing_adtriba.api_marketing.
7616 rows are loaded into booking-26091.api_marketing_adtriba.api_marketing.
