# Belgian Brewery Data Ingestion - Step 1

This notebook handles **Step 1: Data Infrastructure** of the Belgian Brewery "Glide Template" Strategy Project.

## Objective
Load Belgian beers and breweries data into Google BigQuery as the foundation for our analytics pipeline.

## Data Sources
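1. **Belgian Beers**: Dutch-language Wikipedia list of Belgian beers (`Lijst van Belgische bieren`)
2. **Belgian Provinces**: a hand-entered province table for now; the European data portal (data.europa.eu) is the intended source for the municipality-to-province mapping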

## Prerequisites
✅ Google Cloud CLI installed and configured  
✅ BigQuery API enabled  
✅ Project authentication set up

In [None]:
# Create a conda environment for the Belgian Brewery project
# conda create --name belgian-brewery python=3.12
# conda activate belgian-brewery

# Install required packages
# pip install -r requirements.txt

## Import Libraries & Setup

In [None]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
from google.cloud import bigquery
import json
import re
from datetime import datetime
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

print("📦 Libraries imported successfully")

## BigQuery Authentication & Setup

`bigquery.Client()` uses Application Default Credentials, which the gcloud CLI creates when you run `gcloud auth application-default login`.

In [None]:
# Initialize BigQuery client (using gcloud authentication)
try:
    # Initialize BigQuery client
    client = bigquery.Client()

    # Set project and dataset
    PROJECT_ID = client.project
    DATASET_ID = "BE_beers_breweries_province"

    print(f"✅ Connected to BigQuery project: {PROJECT_ID}")
    print(f"🎯 Target dataset: {DATASET_ID}")

    # Verify dataset exists
    dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
    dataset = client.get_dataset(dataset_id)
    print(f"✅ Dataset {DATASET_ID} already exists")

except Exception as e:
    print(f"❌ Error setting up BigQuery: {e}")
    print("💡 Make sure you've run: gcloud auth application-default login")

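If the cell above fails, it can help to see which credential source is actually present before retrying. The following is a minimal sketch, not part of the Google client library: `find_adc_source` is a hypothetical helper that only checks the two file-based sources (the `GOOGLE_APPLICATION_CREDENTIALS` variable, then the file written by `gcloud auth application-default login`). It assumes the Linux/macOS location of that file and ignores the metadata server used on Google Cloud VMs.

```python
import os
from pathlib import Path


def find_adc_source(env=None, home=None):
    """Return a label for the file-based credential source that exists, or None.

    `env` and `home` can be injected for testing; by default the real
    environment and home directory are used.
    """
    env = os.environ if env is None else env
    home = Path.home() if home is None else Path(home)

    # 1. An explicit key file named in the environment is checked first.
    key_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_path and Path(key_path).is_file():
        return "env:GOOGLE_APPLICATION_CREDENTIALS"

    # 2. File written by `gcloud auth application-default login`
    #    (assumed Linux/macOS path; Windows stores it under %APPDATA%).
    gcloud_file = home / ".config" / "gcloud" / "application_default_credentials.json"
    if gcloud_file.is_file():
        return "gcloud:application_default_credentials.json"

    return None


if __name__ == "__main__":
    print(f"Credential source found: {find_adc_source()}")
```

A result of `None` on a laptop usually means the login command has not been run yet for this user.
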
## Data Source 1: Belgian Beers from Wikipedia

Extract beer data from Wikipedia's list of Belgian beers.

In [None]:
def scrape_belgian_beers():
    """
    Scrape Belgian beer data from Wikipedia's comprehensive list
    Returns: DataFrame with beer information
    
    The Wikipedia page contains multiple sections (A-Z plus "Buiten alfabet")
    with detailed beer information in wikitable format.
    """
    url = "https://nl.wikipedia.org/wiki/Lijst_van_Belgische_bieren"
    
    try:
        print("🔄 Fetching Belgian beer data from Wikipedia...")
        
        # Use session for better connection handling
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; BeersDataProject/1.0)'
        })
        
        response = session.get(url, timeout=30)
        response.raise_for_status()
        
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # Find all wikitable tables - these contain the beer data
        tables = soup.find_all('table', {'class': 'wikitable'})
        print(f"📊 Found {len(tables)} data tables on the page")
        
        all_beers = []
        processed_count = 0
        
        for table_idx, table in enumerate(tables):
            rows = table.find_all('tr')
            
            if len(rows) < 2:  # Skip tables without data rows
                continue
                
            # Skip header row(s) and process data rows
            data_rows = rows[1:]  
            
            for row_idx, row in enumerate(data_rows):
                cells = row.find_all(['td', 'th'])
                
                # Wikipedia tables have varying structures, need at least 2 columns
                if len(cells) < 2:
                    continue
                
                try:
                    # Extract and clean beer name (first column)
                    beer_name = cells[0].get_text(strip=True)
                    if not beer_name or beer_name in ['', '|']:
                        continue
                    
                    # Extract brewery name (fourth column, index 3)
                    if len(cells) > 3:
                        brewery_cell = cells[3].get_text(strip=True)
                        # Clean brewery name from common patterns
                        brewery_name = re.sub(r'\s*\(.*?\)', '', brewery_cell)  # Remove parentheses
                        brewery_name = re.sub(r'\s*voor\s.*', '', brewery_name)  # Remove "voor" clauses
                        brewery_name = brewery_name.strip()
                    else:
                        brewery_name = None
                    
                    # Extract beer style (second column, index 1)
                    beer_style = None
                    if len(cells) > 1:
                        style_text = cells[1].get_text(strip=True)
                        if style_text and style_text not in ['', '?', '?%']:
                            beer_style = style_text
                    
                    # Extract ABV information (third column, index 2)
                    abv_text = None
                    abv_pct = None
                    if len(cells) > 2:
                        abv_cell = cells[2].get_text(strip=True)
                        if abv_cell and abv_cell not in ['', '?', '?%']:
                            abv_text = abv_cell
                            # Extract numeric ABV with improved regex
                            abv_matches = re.findall(r'(\d+(?:[.,]\d+)?)\s*%', abv_cell)
                            if abv_matches:
                                try:
                                    # Handle both comma and dot as decimal separator
                                    abv_str = abv_matches[0].replace(',', '.')
                                    abv_pct = float(abv_str)
                                    # Sanity check for ABV values
                                    if abv_pct > 50:  # Unlikely for beer
                                        abv_pct = None
                                except (ValueError, IndexError):
                                    abv_pct = None
                    
                    # Extract production period (fifth column, index 4)
                    production_period = None
                    if len(cells) > 4:
                        period_text = cells[4].get_text(strip=True)
                        if period_text and period_text not in ['', '?']:
                            production_period = period_text
                    
                    # Only add if we have at least beer name and some brewery info
                    if beer_name and brewery_name:
                        beer_data = {
                            'beer_name': beer_name,
                            'brewery_name': brewery_name,
                            'beer_style': beer_style,
                            'abv_text': abv_text,
                            'abv_pct': abv_pct,
                            'production_period': production_period,
                            'source': 'wikipedia_nl',
                            'source_table': table_idx,
                            'scraped_at': datetime.now()
                        }
                        
                        all_beers.append(beer_data)
                        processed_count += 1
                        
                except Exception as e:
                    # Log parsing errors but continue processing
                    print(f"⚠️  Error parsing row {row_idx} in table {table_idx}: {str(e)[:100]}")
                    continue
        
        # Convert to DataFrame with better error handling
        if not all_beers:
            print("❌ No beer data extracted")
            return pd.DataFrame()
        
        df_beers = pd.DataFrame(all_beers)
        
        # Data cleaning and deduplication
        print(f"🧹 Cleaning {len(df_beers)} raw records...")
        
        # Remove empty names and duplicates
        initial_count = len(df_beers)
        df_beers = df_beers[df_beers['beer_name'].str.len() > 0]
        df_beers = df_beers[df_beers['brewery_name'].str.len() > 0]
        
        # Remove exact duplicates
        df_beers = df_beers.drop_duplicates(subset=['beer_name', 'brewery_name'])
        
        # Sort by beer name for consistency
        df_beers = df_beers.sort_values('beer_name').reset_index(drop=True)
        
        final_count = len(df_beers)
        removed_count = initial_count - final_count
        
        print(f"✅ Successfully scraped {final_count} Belgian beers")
        print(f"🗑️  Removed {removed_count} invalid/duplicate records")
        
        return df_beers
        
    except requests.exceptions.RequestException as e:
        print(f"❌ Network error while fetching data: {e}")
        return pd.DataFrame()
    except Exception as e:
        print(f"❌ Unexpected error during scraping: {e}")
        return pd.DataFrame()

# Scrape the beer data with enhanced extraction
df_beers = scrape_belgian_beers()

if not df_beers.empty:
    # print("\n📊 Sample beer data:")
    # print(df_beers.head(10))
    print(f"\n📈 Data shape: {df_beers.shape}")
    print(f"🍺 Unique beers: {df_beers['beer_name'].nunique()}")
    print(f"🏭 Unique breweries: {df_beers['brewery_name'].nunique()}")
    
    # Show beer style distribution
    if 'beer_style' in df_beers.columns:
        print(f"🎨 Beer styles found: {df_beers['beer_style'].nunique()}")
        print("\n🔝 Top beer styles:")
        style_counts = df_beers['beer_style'].value_counts().head(5)
        for style, count in style_counts.items():
            if style:  # Only show non-null styles
                print(f"   {style}: {count}")
    
    # Show ABV statistics
    abv_data = df_beers['abv_pct'].dropna()
    if len(abv_data) > 0:
        print(f"\n🍻 ABV statistics (from {len(abv_data)} beers with ABV data):")
        print(f"   Min: {abv_data.min():.1f}%")
        print(f"   Max: {abv_data.max():.1f}%")
        print(f"   Average: {abv_data.mean():.1f}%")
        # print(f"   Median: {abv_data.median():.1f}%")
else:
    print("❌ No data extracted. Please check the Wikipedia page structure.")

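The two text-cleaning steps inside `scrape_belgian_beers` are easier to check when they are pulled out into small functions. The sketch below reuses the same regular expressions and the same 50% sanity bound as the scraper; the function names `parse_abv` and `clean_brewery_name` are introduced here for illustration and do not exist in the notebook.

```python
import re

# Same pattern as the scraper: a number with optional comma/dot decimals, then "%".
ABV_PATTERN = re.compile(r"(\d+(?:[.,]\d+)?)\s*%")


def parse_abv(text, max_abv=50.0):
    """Return the first percentage found in `text` as a float, or None."""
    if not text:
        return None
    match = ABV_PATTERN.search(text)
    if match is None:
        return None
    # Accept both "6,5" and "6.5" as decimal notation.
    value = float(match.group(1).replace(",", "."))
    # Values above the bound are treated as parsing noise, as in the scraper.
    return value if value <= max_abv else None


def clean_brewery_name(text):
    """Drop parenthesised notes and anything from 'voor ' onwards."""
    name = re.sub(r"\s*\(.*?\)", "", text)
    name = re.sub(r"\s*voor\s.*", "", name)
    return name.strip()


if __name__ == "__main__":
    for raw in ["6,5%", "8 %", "?", "75%"]:
        print(repr(raw), "->", parse_abv(raw))
    print(clean_brewery_name("Brouwerij Huyghe (Melle)"))
```

One thing this makes visible: because `\s*` can match zero characters, the "voor" pattern also fires inside a longer word that contains "voor" followed by whitespace. Anchoring it with `\bvoor\b` would be a possible tightening if that turns out to affect real brewery names.
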
## Data Source 2: Belgian Municipalities & Provinces

Load Belgian municipality and province data for geographic mapping.

In [None]:
def load_belgian_municipalities():
    """
    Build the province reference table.

    Despite the function name, this step does not load municipalities yet.
    It returns a hand-entered table of the ten Belgian provinces plus the
    Brussels-Capital Region, which is a region rather than a province.
    """
    # Placeholder data for now.
    # In a real scenario, you'd load this from the data.europa.eu API
    
    belgian_provinces = {
        'Antwerp': {'region': 'Flanders', 'population': 1857986},
        'East Flanders': {'region': 'Flanders', 'population': 1515064},
        'West Flanders': {'region': 'Flanders', 'population': 1195796},
        'Flemish Brabant': {'region': 'Flanders', 'population': 1146175},
        'Limburg': {'region': 'Flanders', 'population': 874048},
        'Hainaut': {'region': 'Wallonia', 'population': 1344241},
        'Liège': {'region': 'Wallonia', 'population': 1106992},
        'Namur': {'region': 'Wallonia', 'population': 494325},
        'Luxembourg': {'region': 'Wallonia', 'population': 284638},
        'Walloon Brabant': {'region': 'Wallonia', 'population': 403599},
        'Brussels': {'region': 'Brussels-Capital', 'population': 1218255}
    }
    
    df_provinces = pd.DataFrame([
        {
            'province_name': province,
            'region': data['region'],
            'population': data['population'],
            'source': 'manual_entry'
        }
        for province, data in belgian_provinces.items()
    ])
    
    print(f"✅ Loaded {len(df_provinces)} province-level records (provinces plus Brussels)")
    return df_provinces

df_provinces = load_belgian_municipalities()
print("\n📊 Belgian provinces:")
print(df_provinces)

## Data Enhancement: Assign Provinces to Breweries

Assign a province to each beer by looking for city-name keywords in the brewery name. Breweries with no matching keyword are labelled `Unknown` and will be resolved with geocoding in Step 2.

In [None]:
def assign_provinces_to_breweries(df_beers, df_provinces):
    """
    Assign a province to each row by matching city keywords in the brewery name.
    This is a simplified approach - Step 2 will use geocoding for accuracy.
    Note: df_provinces is currently unused.
    """
    # Create a simple mapping based on common brewery location patterns
    province_keywords = {
        'Antwerp': ['antwerp', 'antwerpen'],
        'East Flanders': ['ghent', 'gent', 'aalst', 'oudenaarde'],
        'West Flanders': ['bruges', 'brugge', 'kortrijk', 'ostend', 'oostende'],
        'Flemish Brabant': ['leuven', 'vilvoorde'],
        'Limburg': ['hasselt', 'genk'],
        'Hainaut': ['mons', 'charleroi', 'tournai'],
        'Liège': ['liège', 'liege', 'verviers'],
        'Namur': ['namur', 'dinant'],
        'Luxembourg': ['arlon', 'bastogne'],
        'Walloon Brabant': ['wavre', 'nivelles'],
        'Brussels': ['brussels', 'brussel', 'bruxelles']
    }
    
    def assign_province(brewery_name):
        brewery_lower = brewery_name.lower()
        for province, keywords in province_keywords.items():
            for keyword in keywords:
                if keyword in brewery_lower:
                    return province
        return 'Unknown'  # Will be geocoded in Step 2
    
    df_enhanced = df_beers.copy()
    df_enhanced['province'] = df_enhanced['brewery_name'].apply(assign_province)
    
    print("📍 Province assignment summary:")
    print(df_enhanced['province'].value_counts())
    
    return df_enhanced

# Enhance the beer data with province assignments
df_beers_enhanced = assign_provinces_to_breweries(df_beers, df_provinces)

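# Rows are beers, so count distinct brewery names rather than rows
known_mask = df_beers_enhanced['province'] != 'Unknown'
print(f"\n✅ Enhanced dataset ready: {df_beers_enhanced.shape}")
print(f"🏭 Breweries with known provinces: {df_beers_enhanced.loc[known_mask, 'brewery_name'].nunique()}")
print(f"❓ Breweries needing geocoding: {df_beers_enhanced.loc[~known_mask, 'brewery_name'].nunique()}")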

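The substring check in `assign_province` can produce false positives, because a short keyword such as `gent` or `mons` may appear inside an unrelated word. A possible tightening, sketched below, is to match whole words only. The keyword table is a shortened copy of the one above, `assign_province_strict` is a name introduced here, and the brewery names in the usage lines are made up for illustration.

```python
import re

# Shortened copy of the notebook's keyword table, for illustration only.
PROVINCE_KEYWORDS = {
    "East Flanders": ["ghent", "gent", "aalst", "oudenaarde"],
    "Hainaut": ["mons", "charleroi", "tournai"],
}


def compile_patterns(keyword_table):
    """Build one case-insensitive whole-word pattern per province."""
    return {
        province: re.compile(
            r"\b(?:" + "|".join(re.escape(k) for k in keywords) + r")\b",
            re.IGNORECASE,
        )
        for province, keywords in keyword_table.items()
    }


PATTERNS = compile_patterns(PROVINCE_KEYWORDS)


def assign_province_strict(brewery_name):
    """Return the first province whose keyword appears as a whole word."""
    for province, pattern in PATTERNS.items():
        if pattern.search(brewery_name):
            return province
    return "Unknown"


if __name__ == "__main__":
    # Hypothetical names: the first two contain a keyword only as part of a word.
    for name in ["Brouwerij Regent", "Brasserie Simons", "Stadsbrouwerij Gent"]:
        print(name, "->", assign_province_strict(name))
```

The trade-off is recall: whole-word matching no longer catches inflected forms such as "Gentse", so those would need their own keywords or be left for the geocoding in Step 2.
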
## Upload Data to BigQuery

Upload the raw data tables to BigQuery for the dbt transformation pipeline.

In [None]:
def upload_to_bigquery(df, table_name, description=""):
    """Upload DataFrame to BigQuery with proper configuration"""
    
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{table_name}"
    
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_TRUNCATE",
        autodetect=True
    )
    
    try:
        job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()  # Wait for completion
        
        # Add table description
        if description:
            table = client.get_table(table_id)
            table.description = description
            client.update_table(table, ["description"])
        
        print(f"✅ Uploaded {len(df)} rows to {table_name}")
        return True
        
    except Exception as e:
        print(f"❌ Error uploading {table_name}: {e}")
        return False

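# Upload raw beer data; an empty scrape counts as a failed upload
beers_ok = False
if not df_beers_enhanced.empty:
    beers_ok = upload_to_bigquery(
        df_beers_enhanced, 
        "raw_beers",
        "Raw Belgian beer data scraped from Wikipedia with initial province assignments"
    )

# Upload province reference data
provinces_ok = upload_to_bigquery(
    df_provinces,
    "raw_provinces", 
    "Belgian provinces reference data with population and region information"
)

# Report success only when both uploads succeeded
if beers_ok and provinces_ok:
    print("\n🎉 Data ingestion complete!")
else:
    print("\n⚠️  Data ingestion finished with errors; check the messages above")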

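Because the upload uses `WRITE_TRUNCATE`, a bad batch replaces whatever was in the table before. A lightweight check before calling `upload_to_bigquery` can catch the most obvious problems. The sketch below is one possible shape for such a check, assuming the rows are passed as a list of dicts (for example from `df.to_dict("records")`); `find_upload_problems` and `REQUIRED_COLUMNS` are names introduced here, not part of the notebook.

```python
# Columns the downstream steps are assumed to rely on (an assumption of this sketch).
REQUIRED_COLUMNS = ("beer_name", "brewery_name", "province")


def find_upload_problems(records, required=REQUIRED_COLUMNS):
    """Return human-readable problems; an empty list means no issue was found."""
    if not records:
        return ["no records to upload"]

    problems = []
    seen_pairs = set()
    for idx, record in enumerate(records):
        # Required text fields must be present and non-blank.
        for column in required:
            value = record.get(column)
            if value is None or (isinstance(value, str) and not value.strip()):
                problems.append(f"row {idx}: missing {column}")

        # The scraper deduplicates on (beer_name, brewery_name); re-check it here.
        pair = (record.get("beer_name"), record.get("brewery_name"))
        if pair in seen_pairs:
            problems.append(f"row {idx}: duplicate beer/brewery pair")
        seen_pairs.add(pair)

    return problems


if __name__ == "__main__":
    sample = [
        {"beer_name": "Beer A", "brewery_name": "Brewery X", "province": "Unknown"},
        {"beer_name": "Beer A", "brewery_name": "Brewery X", "province": "Unknown"},
        {"beer_name": "", "brewery_name": "Brewery Y", "province": "Namur"},
    ]
    for problem in find_upload_problems(sample):
        print(problem)
```

The check only looks at `None` and blank strings; numeric columns such as `abv_pct` hold `NaN` for missing values in pandas and would need a separate test.
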
## Verify Data Upload

Query the uploaded data to ensure everything worked correctly.

In [None]:
# Verify the uploaded data
tables_to_check = ['raw_beers', 'raw_provinces']

for table_name in tables_to_check:
    query = f"""
    SELECT 
        COUNT(*) as row_count,
        COUNT(DISTINCT brewery_name) as unique_breweries
    FROM `{PROJECT_ID}.{DATASET_ID}.{table_name}`
    """
    
    if table_name == 'raw_provinces':
        query = f"""
        SELECT 
            COUNT(*) as row_count,
            COUNT(DISTINCT province_name) as unique_provinces
        FROM `{PROJECT_ID}.{DATASET_ID}.{table_name}`
        """
    
    try:
        result = client.query(query).to_dataframe()
        print(f"📊 {table_name}: {result.iloc[0].to_dict()}")
    except Exception as e:
        print(f"❌ Error querying {table_name}: {e}")

# Sample the data
print("\n🔍 Sample beer data from BigQuery:")
sample_query = f"""
SELECT 
    beer_name,
    brewery_name,
    beer_style,
    abv_pct,
    province
FROM `{PROJECT_ID}.{DATASET_ID}.raw_beers`
WHERE beer_name IS NOT NULL
LIMIT 10
"""

try:
    sample_data = client.query(sample_query).to_dataframe()
    print(sample_data)
except Exception as e:
    print(f"❌ Error fetching sample: {e}")

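The verification loop above builds a default query and then overwrites it for `raw_provinces`. An alternative, sketched here under the assumption that each raw table has exactly one column worth counting distinct values for, is to drive the query from a small lookup table. `DISTINCT_COLUMNS` and `build_count_query` are names introduced for this sketch.

```python
# Column whose distinct values are counted for each raw table (taken from the notebook).
DISTINCT_COLUMNS = {
    "raw_beers": "brewery_name",
    "raw_provinces": "province_name",
}


def build_count_query(project_id, dataset_id, table_name):
    """Return a row-count query for a known raw table.

    Raises KeyError for tables not listed in DISTINCT_COLUMNS, so arbitrary
    strings are never interpolated into the SQL.
    """
    column = DISTINCT_COLUMNS[table_name]
    return (
        "SELECT COUNT(*) AS row_count, "
        f"COUNT(DISTINCT {column}) AS unique_{column} "
        f"FROM `{project_id}.{dataset_id}.{table_name}`"
    )


if __name__ == "__main__":
    # Placeholder project and dataset names, for illustration only.
    for table in DISTINCT_COLUMNS:
        print(build_count_query("my-project", "my_dataset", table))
```

Each string can then be passed to `client.query(...)` exactly as in the cell above, and adding a new raw table only requires a new entry in the lookup.
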
## ✅ Step 1 Complete: Data Infrastructure

**What we accomplished:**

✅ **Data Source Integration**: Scraped Belgian beer data from Wikipedia  
✅ **Reference Data**: Loaded Belgian provinces and regions  
✅ **Data Enhancement**: Initial province assignment to breweries  
✅ **BigQuery Upload**: Raw data tables ready for transformation  
✅ **Data Validation**: Verified row counts and sampled the uploaded rows  

**Next Steps:**

📍 **Step 2**: Data enrichment with geocoding API for brewery locations  
🔧 **Step 3**: dbt transformation pipeline setup  
📊 **Step 4**: Hex dashboard development  

**Raw Tables Created:**
- `raw_beers`: Belgian beers with brewery, style, ABV, and initial province assignment (row count is printed by the verification cell above)
- `raw_provinces`: the ten Belgian provinces plus the Brussels-Capital Region, with region and population data

Ready to proceed to geocoding and data enrichment! 🚀