# Update gridVeg Survey Metadata in BigQuery

This notebook appends new survey metadata from a CSV file stored in Google Cloud Storage (GCS) to the gridVeg survey metadata table in BigQuery.

**Operation**: APPEND new rows (not replace entire table)

## Requirements
- Google Cloud credentials configured
- Configuration file: copy `config.example.yml` to `config.yml` and fill in your values
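- Required packages: google-cloud-bigquery, google-cloud-storage, pandas, pyyaml, plus gcsfs (pandas needs it to read `gs://` URLs), pyarrow and db-dtypes (used by the BigQuery client's DataFrame methods)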
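Before running the cells below, a small check like the following can confirm that the required modules are importable. This is a sketch: the package-to-module mapping in `REQUIRED` and the helper name `missing_packages` are illustrative assumptions, not part of the notebook.

```python
import importlib.util

# Assumed mapping of pip package name -> importable module name
REQUIRED = {
    "pyyaml": "yaml",
    "pandas": "pandas",
    "google-cloud-bigquery": "google.cloud.bigquery",
    "google-cloud-storage": "google.cloud.storage",
    "gcsfs": "gcsfs",          # lets pandas read gs:// URLs
    "pyarrow": "pyarrow",      # used by load_table_from_dataframe
    "db-dtypes": "db_dtypes",  # used by query(...).to_dataframe()
}


def missing_packages(required=REQUIRED):
    """Return the pip names of packages whose modules cannot be found."""
    missing = []
    for package, module in required.items():
        try:
            found = importlib.util.find_spec(module) is not None
        except ModuleNotFoundError:
            # Raised when a parent package (e.g. "google") is absent
            found = False
        if not found:
            missing.append(package)
    return missing


print("Missing packages:", missing_packages() or "none")
```

If the printed list is non-empty, install those packages with pip before continuing.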


In [1]:
# Import required libraries
import yaml
import pandas as pd
from pathlib import Path
from google.cloud import bigquery
from google.cloud import storage
from datetime import datetime

print("Libraries imported successfully")


Libraries imported successfully


In [None]:
# Load configuration from YAML file
config_path = Path("../config.yml")

if not config_path.exists():
    raise FileNotFoundError(
        f"Configuration file not found: {config_path}\n"
        "Please copy config.example.yml to config.yml and fill in your values."
    )

with open(config_path, 'r') as f:
    config = yaml.safe_load(f)

# Extract configuration values for gridVeg survey metadata
GCS_CSV_URL = config['gridveg_survey_metadata']['gcs']['csv_url']
BACKUP_BUCKET = config['gridveg_survey_metadata']['gcs'].get('backup_bucket')
BACKUP_PREFIX = config['gridveg_survey_metadata']['gcs'].get('backup_prefix', 'backups/gridveg_survey_metadata')
BQ_TABLE_ID = config['gridveg_survey_metadata']['bigquery']['table_id']
BQ_PROJECT = config['gridveg_survey_metadata']['bigquery'].get('project')

# Verify required config values
if not GCS_CSV_URL or GCS_CSV_URL.startswith('gs://your-'):
    raise ValueError("Please configure gridveg_survey_metadata.gcs.csv_url in config.yml")
if not BQ_TABLE_ID or 'your-project' in BQ_TABLE_ID:
    raise ValueError("Please configure gridveg_survey_metadata.bigquery.table_id in config.yml")

print("✓ Configuration loaded successfully")
print(f"  CSV URL: {GCS_CSV_URL[:60]}..." if len(GCS_CSV_URL) > 60 else f"  CSV URL: {GCS_CSV_URL}")
print(f"  Table ID: {BQ_TABLE_ID}")
print(f"  Backup: gs://{BACKUP_BUCKET}/{BACKUP_PREFIX}" if BACKUP_BUCKET else "  Backup: Not configured")


In [3]:
# Initialize clients
bq_client = bigquery.Client(project=BQ_PROJECT) if BQ_PROJECT else bigquery.Client()
storage_client = storage.Client(project=BQ_PROJECT) if BQ_PROJECT else storage.Client()

print(f"✓ Clients initialized")
print(f"  Project: {bq_client.project}")


✓ Clients initialized
  Project: mpg-data-warehouse


In [4]:
# Read CSV from GCS (new data)
print("Reading CSV from GCS...")
df_new = pd.read_csv(GCS_CSV_URL)

print(f"✓ CSV loaded successfully:")
print(f"  Rows: {len(df_new)}")
print(f"  Columns: {list(df_new.columns)}")
print(f"\nFirst few rows:")
df_new.head()


Reading CSV from GCS...
✓ CSV loaded successfully:
  Rows: 39
  Columns: ['__kp_Survey', '_kf_Site', 'SurveyYear', 'SurveyDate', 'Surveyor1']

First few rows:


|   | __kp_Survey | _kf_Site | SurveyYear | SurveyDate | Surveyor1 |
|---|---|---|---|---|---|
| 0 | B45700C5-D391-4679-8579-217DCB1385A2 | 227 | 2025 | 5/21/25 | MLS |
| 1 | C0BD2A75-FF0B-48DC-BB9D-941267BF5838 | 190 | 2025 | 5/21/25 | MLS |
| 2 | 38A8FE64-8769-474C-BC25-01CBF006BFCC | 331 | 2025 | 5/22/25 | MLS |
| 3 | 147224CA-F0FC-4E02-B2DE-8B17F5553B29 | 45 | 2025 | 5/26/25 | MLS |
| 4 | CD7E5294-F7D8-4CD6-B35A-EDB356A88A73 | 165 | 2025 | 5/26/25 | MLS |


## Transform CSV Data

Apply schema transformations to match the BigQuery table:
- Rename columns to match the destination schema
- Convert dates from `m/d/yy` (e.g. `5/21/25`) to ISO format (`YYYY-MM-DD`)
- Create a `survey_sequence` column from `year`: 2011 and 2012 → `"2011-12"`; every other year → the year as a string
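The three transformations can be sketched as one function applied to a toy DataFrame. The sample rows are hypothetical, the date format assumes the `m/d/yy` strings seen in the CSV preview (e.g. `5/21/25`), and the recode uses `Series.where` as a vectorized alternative to the row-wise `apply` used in the notebook.

```python
import pandas as pd

COLUMN_MAPPING = {
    "__kp_Survey": "survey_ID",
    "_kf_Site": "grid_point",
    "SurveyYear": "year",
    "SurveyDate": "date",
    "Surveyor1": "surveyor",
}


def transform(df: pd.DataFrame) -> pd.DataFrame:
    out = df.rename(columns=COLUMN_MAPPING)
    # Parse m/d/yy strings (e.g. "5/21/25") and re-emit them as ISO dates
    out["date"] = pd.to_datetime(out["date"], format="%m/%d/%y").dt.strftime("%Y-%m-%d")
    # Keep the year as a string, except 2011 and 2012 which share one label
    out["survey_sequence"] = out["year"].astype(str).where(
        ~out["year"].isin([2011, 2012]), "2011-12"
    )
    return out


# Hypothetical sample rows for illustration only
sample = pd.DataFrame({
    "__kp_Survey": ["A", "B", "C"],
    "_kf_Site": [227, 190, 45],
    "SurveyYear": [2011, 2012, 2025],
    "SurveyDate": ["6/1/11", "7/15/12", "5/21/25"],
    "Surveyor1": ["MLS", "MLS", "MLS"],
})
result = transform(sample)
print(result)
```

With `%y`, two-digit years 00 to 68 map to 2000 to 2068, so `"11"` becomes 2011.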


In [5]:
# Define column mapping from CSV to BigQuery
column_mapping = {
    '__kp_Survey': 'survey_ID',
    '_kf_Site': 'grid_point',
    'SurveyYear': 'year',
    'SurveyDate': 'date',
    'Surveyor1': 'surveyor'
}

print("Column mapping:")
for csv_col, bq_col in column_mapping.items():
    print(f"  {csv_col:20s} → {bq_col}")


Column mapping:
  __kp_Survey          → survey_ID
  _kf_Site             → grid_point
  SurveyYear           → year
  SurveyDate           → date
  Surveyor1            → surveyor


In [6]:
# Verify CSV columns match expected schema
expected_csv_columns = set(column_mapping.keys())
actual_csv_columns = set(df_new.columns)

if actual_csv_columns == expected_csv_columns:
    print("✓ CSV columns match expected schema")
else:
    print("⚠ CSV column differences detected:")
    if actual_csv_columns - expected_csv_columns:
        print(f"  Unexpected columns: {actual_csv_columns - expected_csv_columns}")
    if expected_csv_columns - actual_csv_columns:
        print(f"  Missing columns: {expected_csv_columns - actual_csv_columns}")
    
print(f"\nCSV columns: {list(df_new.columns)}")


✓ CSV columns match expected schema

CSV columns: ['__kp_Survey', '_kf_Site', 'SurveyYear', 'SurveyDate', 'Surveyor1']


In [7]:
# Apply transformation: rename columns
df_transformed = df_new.copy()
df_transformed = df_transformed.rename(columns=column_mapping)

print("✓ Columns renamed")
print(f"  Transformed columns: {list(df_transformed.columns)}")


✓ Columns renamed
  Transformed columns: ['survey_ID', 'grid_point', 'year', 'date', 'surveyor']


In [None]:
# Convert date from m/d/yy (e.g. "5/21/25") to ISO format (YYYY-MM-DD)
# An explicit format avoids pandas' format-inference warning and fails loudly on unexpected values
df_transformed['date'] = pd.to_datetime(df_transformed['date'], format='%m/%d/%y').dt.strftime('%Y-%m-%d')

print("✓ Date format converted to ISO (YYYY-MM-DD)")
print(f"  Sample dates: {df_transformed['date'].head().tolist()}")


✓ Date format converted to ISO (YYYY-MM-DD)
  Sample dates: ['2025-05-21', '2025-05-21', '2025-05-22', '2025-05-26', '2025-05-26']




In [None]:
# Create survey_sequence variable from year
# Recode 2011 and 2012 to "2011-12", leave all other years as strings
def create_survey_sequence(year):
    if year in [2011, 2012]:
        return "2011-12"
    else:
        return str(year)

df_transformed['survey_sequence'] = df_transformed['year'].apply(create_survey_sequence)

print("✓ Created survey_sequence variable")
print(f"\nSurvey sequence mapping:")
for year in sorted(df_transformed['year'].unique()):
    seq = df_transformed[df_transformed['year'] == year]['survey_sequence'].iloc[0]
    count = len(df_transformed[df_transformed['year'] == year])
    print(f"  Year {year} → '{seq}' ({count} records)")


In [None]:
# Display transformed data info
print("Transformed Data Info:")
df_transformed.info()
print(f"\nTransformed data preview:")
df_transformed.head()


## Read Existing BigQuery Table

Load the current data from BigQuery to compare with the new data.
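BigQuery bills by the columns a query scans, so for a large table the comparison step only needs the key column. The sketch below assumes that; `fetch_existing_keys` is a hypothetical helper that works with any object exposing the `client.query(sql).to_dataframe()` chain, and the stub client and table ID exist only so the example runs offline.

```python
import pandas as pd


def fetch_existing_keys(client, table_id: str, key: str = "survey_ID"):
    """Return (set of keys, row count) while scanning only the key column."""
    sql = f"SELECT `{key}` FROM `{table_id}`"
    keys = client.query(sql).to_dataframe()[key]
    return set(keys), len(keys)


# Offline stand-ins (hypothetical, not part of any library)
class _StubJob:
    def __init__(self, df):
        self._df = df

    def to_dataframe(self):
        return self._df


class _StubClient:
    def __init__(self, df):
        self._df = df
        self.last_sql = None

    def query(self, sql):
        self.last_sql = sql
        return _StubJob(self._df)


stub = _StubClient(pd.DataFrame({"survey_ID": ["a", "b", "b"]}))
ids, n_rows = fetch_existing_keys(stub, "my-project.my_dataset.gridveg_survey_metadata")
print(ids, n_rows)
```

In the notebook, `bq_client` would take the place of the stub; the full `SELECT *` is still useful when you want to inspect the existing rows.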


In [None]:
# Read existing data from BigQuery
from google.api_core.exceptions import NotFound

print(f"Reading existing data from {BQ_TABLE_ID}...")
query = f"SELECT * FROM `{BQ_TABLE_ID}`"

try:
    df_existing = bq_client.query(query).to_dataframe()
    print(f"✓ Existing table loaded:")
    print(f"  Rows: {len(df_existing)}")
    print(f"  Columns: {list(df_existing.columns)}")
    print(f"\nExisting data preview:")
    display(df_existing.head())
except NotFound:
    # Only a missing table means "create a new table". Any other error
    # (permissions, network, bad table ID) is re-raised so that all CSV rows
    # are not treated as new and appended to an existing table.
    print(f"⚠ Table {BQ_TABLE_ID} not found; it will be created on append.")
    df_existing = None


In [None]:
# Display existing data info (if available)
if df_existing is not None:
    print("Existing Data Info:")
    df_existing.info()


## Compare New vs Existing Data

Identify which rows in the new data are not already in the existing table.
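The set difference used below finds IDs that are not yet in the table, but it does not flag an ID that appears more than once in the CSV itself. A left anti-join with pandas' `merge(..., indicator=True)` can report both; `split_by_key` is a hypothetical helper and the toy IDs are illustrative.

```python
import pandas as pd


def split_by_key(df_new, df_existing, key="survey_ID"):
    """Split incoming rows into (to_append, already_present, repeated_in_csv)."""
    # IDs that occur more than once in the incoming CSV itself
    repeated_in_csv = df_new[df_new[key].duplicated(keep=False)]
    # Left join with an indicator column: "left_only" marks keys absent from the table
    merged = df_new.merge(
        df_existing[[key]].drop_duplicates(), on=key, how="left", indicator=True
    )
    to_append = merged[merged["_merge"] == "left_only"].drop(columns="_merge")
    already_present = merged[merged["_merge"] == "both"].drop(columns="_merge")
    return to_append, already_present, repeated_in_csv


# Toy example (hypothetical IDs)
incoming = pd.DataFrame({"survey_ID": ["a", "b", "c", "c"], "year": [2024, 2025, 2025, 2025]})
table = pd.DataFrame({"survey_ID": ["a"], "year": [2024]})
to_append, already_present, repeated = split_by_key(incoming, table)
print(to_append)
```

Here `"c"` would be appended twice, which is exactly the case a non-empty `repeated` result should catch before loading.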


In [None]:
# Compare datasets
if df_existing is not None:
    print("=== Comparison Summary ===\n")
    
    # Row count comparison
    print(f"Row count:")
    print(f"  Existing: {len(df_existing)}")
    print(f"  New CSV:  {len(df_transformed)}")
    
    # Column comparison
    existing_cols = set(df_existing.columns)
    new_cols = set(df_transformed.columns)
    
    if existing_cols == new_cols:
        print(f"\n✓ Columns match ({len(new_cols)} columns)")
    else:
        print("\n⚠ Column differences detected:")
        if new_cols - existing_cols:
            print(f"  New columns: {new_cols - existing_cols}")
        if existing_cols - new_cols:
            print(f"  Missing columns: {existing_cols - new_cols}")
    
    print(f"\nColumns: {list(df_transformed.columns)}")
else:
    print("No existing data to compare - this will be a new table creation.")


In [None]:
# Identify new records (not in existing table)
# Use survey_ID as the unique identifier
if df_existing is not None:
    existing_ids = set(df_existing['survey_ID'])
    new_ids = set(df_transformed['survey_ID'])
    
    # Find records in new data that aren't in existing
    ids_to_append = new_ids - existing_ids
    
    if ids_to_append:
        df_to_append = df_transformed[df_transformed['survey_ID'].isin(ids_to_append)].copy()
        print(f"✓ Found {len(df_to_append)} new records to append")
        
        # Show year breakdown of new records
        print(f"\nNew records by year:")
        year_counts = df_to_append['year'].value_counts().sort_index()
        for year, count in year_counts.items():
            print(f"  {year}: {count} records")
        
        print(f"\nSample of new records:")
        display(df_to_append.head())
    else:
        df_to_append = None
        print("⚠ No new records found - all survey_IDs already exist in table")
        print("  Nothing to append.")
    
    # Report survey_IDs that are already in the table
    already_present_ids = existing_ids & new_ids
    if already_present_ids:
        print(f"\n⚠ Warning: {len(already_present_ids)} survey_IDs already exist in table")
        print(f"  These will be skipped during append.")
        print(f"  Examples (up to 10): {sorted(already_present_ids)[:10]}")
else:
    # No existing table, so all records are new
    df_to_append = df_transformed.copy()
    print(f"✓ No existing table - will create new table with {len(df_to_append)} records")


## Backup Existing Table

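Before appending, export the existing table to timestamped CSV files in GCS. The backup is skipped if the table does not exist yet, no backup bucket is configured, or there are no new records to append.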
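The backup destination is a timestamped folder with a `*` wildcard, which lets BigQuery split a large export into several numbered files. A minimal sketch of building that URI, assuming a hypothetical helper `backup_uri`; unlike the notebook, which uses local time, it defaults to UTC so timestamps from different machines sort consistently.

```python
from datetime import datetime, timezone
from typing import Optional


def backup_uri(bucket: str, prefix: str, when: Optional[datetime] = None) -> str:
    """Build a timestamped wildcard URI for a BigQuery CSV export."""
    when = when or datetime.now(timezone.utc)
    # Strip stray slashes so the prefix never produces "//" in the path
    clean_prefix = prefix.strip("/")
    # "*" lets BigQuery write the export as one or more numbered files
    return f"gs://{bucket}/{clean_prefix}/{when:%Y%m%d_%H%M%S}/*.csv"


print(backup_uri("example-bucket", "backups/gridveg_survey_metadata/",
                 datetime(2025, 5, 21, 9, 30, 0)))
```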


In [None]:
# Backup existing table to GCS
if df_existing is not None and BACKUP_BUCKET and df_to_append is not None:
    # Generate backup path with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = f"gs://{BACKUP_BUCKET}/{BACKUP_PREFIX}/{timestamp}/*.csv"
    
    print(f"Creating backup of existing table...")
    print(f"  Destination: {backup_path}")
    
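    # Export table to GCS as CSV files (header row included by default).
    # The job location must match the dataset's location; "US" assumes the US multi-region.
    extract_job = bq_client.extract_table(
        BQ_TABLE_ID,
        backup_path,
        location="US"
    )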
    
    extract_job.result()  # Wait for job to complete
    
    print(f"✓ Backup completed successfully")
    print(f"  Files: {backup_path}")
elif df_existing is None:
    print("⚠ No existing table to backup (table doesn't exist yet)")
elif not BACKUP_BUCKET:
    print("⚠ Backup bucket not configured in config.yml")
    print("  Set 'gridveg_survey_metadata.gcs.backup_bucket' to enable automatic backups")
elif df_to_append is None:
    print("⚠ No new records to append, skipping backup")


## Append New Records to BigQuery

⚠️ **IMPORTANT**: This will APPEND new rows to the existing table (not replace).

Review the comparison above before proceeding.
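A few pre-flight checks on `df_to_append` can stop a bad load before it reaches the table. This sketch is an assumption about what "bad" means here (missing or duplicate `survey_ID`s, dates not in `YYYY-MM-DD` form); `preflight_problems` is a hypothetical helper, not part of the notebook.

```python
import pandas as pd


def preflight_problems(df: pd.DataFrame, key: str = "survey_ID") -> list:
    """Return human-readable problems that should block the append."""
    problems = []
    if df.empty:
        problems.append("no rows to append")
    if df[key].isna().any():
        problems.append(f"missing {key} values")
    if df[key].duplicated().any():
        problems.append(f"duplicate {key} values")
    # Dates are expected as ISO strings (YYYY-MM-DD) after the transform step
    non_iso = ~df["date"].astype(str).str.fullmatch(r"\d{4}-\d{2}-\d{2}")
    if non_iso.any():
        problems.append(f"{int(non_iso.sum())} non-ISO date(s)")
    return problems


good = pd.DataFrame({"survey_ID": ["a", "b"], "date": ["2025-05-21", "2025-05-22"]})
print(preflight_problems(good))
```

Calling `preflight_problems(df_to_append)` and raising if the list is non-empty would make the append cell refuse to run on unexpected input.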


In [None]:
# Append new records to BigQuery
if df_to_append is not None and len(df_to_append) > 0:
    print("=" * 60)
    print("APPENDING TO BIGQUERY TABLE")
    print("=" * 60)
    print(f"\nTable: {BQ_TABLE_ID}")
    print(f"Rows to append: {len(df_to_append)}")
    print(f"Mode: WRITE_APPEND (add to existing table)")
    print(f"\nStarting append at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}...")
    
    # Configure job to append to existing table
    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_APPEND"  # Append to existing table
    )
    
    # Load dataframe to BigQuery
    load_job = bq_client.load_table_from_dataframe(
        df_to_append,
        BQ_TABLE_ID,
        job_config=job_config
    )
    
    # Wait for job to complete
    load_job.result()
    
    print(f"\n✓ Append completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"  Rows appended: {load_job.output_rows}")
    print(f"  Job ID: {load_job.job_id}")
else:
    print("=" * 60)
    print("NO RECORDS TO APPEND")
    print("=" * 60)
    print("\nNo new records to append.")
    print("Table remains unchanged.")


## Verify Append

Read back the table to verify the append was successful.
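A matching row count can hide offsetting errors, for example one missing row plus one duplicated row. Checking the appended IDs directly is a stricter test; the sketch below assumes a hypothetical helper `check_appended_ids` that takes the re-read table and the IDs that were loaded.

```python
import pandas as pd


def check_appended_ids(df_updated: pd.DataFrame, appended_ids, key: str = "survey_ID"):
    """Return (missing, repeated): appended IDs absent from, or duplicated in, the table."""
    counts = df_updated[key].value_counts()
    wanted = set(appended_ids)
    missing = sorted(wanted - set(counts.index))
    repeated = sorted(i for i in wanted if counts.get(i, 0) > 1)
    return missing, repeated


updated = pd.DataFrame({"survey_ID": ["a", "b", "b", "c"]})
print(check_appended_ids(updated, ["b", "c"]))
```

In the notebook this would be called as `check_appended_ids(df_updated, df_to_append['survey_ID'])`; both lists should be empty after a clean append.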


In [None]:
# Read updated table
print("Verifying append...")
query = f"SELECT * FROM `{BQ_TABLE_ID}`"
df_updated = bq_client.query(query).to_dataframe()

print(f"\n✓ Verification complete")
print(f"  Rows in table: {len(df_updated)}")
print(f"  Columns: {list(df_updated.columns)}")

# Show records by year
print(f"\nRecords by year:")
year_counts = df_updated['year'].value_counts().sort_index()
for year, count in year_counts.items():
    print(f"  {year}: {count} records")

print(f"\nUpdated table preview:")
df_updated.tail(10)


In [None]:
# Verify row counts
if df_to_append is not None and len(df_to_append) > 0:
    expected_rows = len(df_existing) + len(df_to_append) if df_existing is not None else len(df_to_append)
    actual_rows = len(df_updated)
    
    print("Data integrity check:")
    if df_existing is not None:
        print(f"  Previous rows:   {len(df_existing)}")
        print(f"  Rows appended:   {len(df_to_append)}")
        print(f"  Expected total:  {expected_rows}")
        print(f"  Actual total:    {actual_rows}")
    else:
        print(f"  Rows written:    {len(df_to_append)}")
        print(f"  Rows in table:   {actual_rows}")
    
    if expected_rows == actual_rows:
        print(f"\n✓ Row count verified - all {len(df_to_append)} new rows successfully appended")
    else:
        print(f"\n⚠ Row count mismatch!")
        print(f"  Expected: {expected_rows}")
        print(f"  Actual:   {actual_rows}")
        print(f"  Difference: {actual_rows - expected_rows}")
else:
    print("No new records were appended.")


## Summary Report

Complete summary of the append operation.


In [None]:
# Generate summary report
print("=" * 60)
print("GRIDVEG SURVEY METADATA APPEND SUMMARY")
print("=" * 60)

print(f"\n📅 Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print(f"\n📂 Source:")
print(f"  CSV: {GCS_CSV_URL.split('/')[-1]}")
print(f"  Location: {'/'.join(GCS_CSV_URL.split('/')[:-1])}")

print(f"\n🎯 Target:")
print(f"  Table: {BQ_TABLE_ID}")
print(f"  Project: {bq_client.project}")

print(f"\n📊 Data Changes:")
if df_existing is not None:
    print(f"  Previous rows: {len(df_existing)}")
    print(f"  Current rows:  {len(df_updated)}")
    print(f"  Net change:    {len(df_updated) - len(df_existing):+d}")
    
    if df_to_append is not None and len(df_to_append) > 0:
        print(f"\n  Appended records by year:")
        year_counts = df_to_append['year'].value_counts().sort_index()
        for year, count in year_counts.items():
            print(f"    {year}: {count} records")
else:
    print(f"  New table created with {len(df_updated)} rows")

print(f"\n🔄 Transformations Applied:")
print(f"  ✓ Renamed {len(column_mapping)} columns to match BigQuery schema")
print(f"  ✓ Converted date format to ISO (YYYY-MM-DD)")
print(f"  ✓ Created survey_sequence variable (2011/2012 → 2011-12)")

if BACKUP_BUCKET and df_existing is not None and df_to_append is not None and len(df_to_append) > 0:
    print(f"\n💾 Backup:")
    print(f"  Location: gs://{BACKUP_BUCKET}/{BACKUP_PREFIX}/")
    print(f"  Status: ✓ Created before append")

if df_to_append is not None and len(df_to_append) > 0:
    print(f"\n✅ Append completed successfully!")
else:
    print(f"\n✅ No changes needed - table is up to date!")
print("=" * 60)


## Rollback Instructions (If Needed)

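To roll back, either delete only the rows appended in this run or restore the whole table from the backup created just before the append.

```python
# Option 1: remove only the rows appended in this run
# (requires df_updated and df_to_append from the current session)
# df_rollback = df_updated[~df_updated['survey_ID'].isin(df_to_append['survey_ID'])]
# job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
# bq_client.load_table_from_dataframe(df_rollback, BQ_TABLE_ID, job_config=job_config).result()

# Option 2: restore the whole table from the CSV backup.
# pd.read_csv does not expand "*" wildcards, so load the files with BigQuery directly,
# reusing the current table schema so column types are preserved.
# backup_uri = "gs://BACKUP_BUCKET/BACKUP_PREFIX/TIMESTAMP/*.csv"
# job_config = bigquery.LoadJobConfig(
#     source_format=bigquery.SourceFormat.CSV,
#     skip_leading_rows=1,  # exported CSV files include a header row
#     schema=bq_client.get_table(BQ_TABLE_ID).schema,
#     write_disposition="WRITE_TRUNCATE",
# )
# bq_client.load_table_from_uri(backup_uri, BQ_TABLE_ID, job_config=job_config).result()
```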

The backup location was printed in the backup cell above.
