# GDELT Demo Notebook

This notebook demonstrates working with GDELT (Global Database of Events, Language, and Tone) data for graph analysis.


In [None]:
# Notebook configuration: edit the destination values for your own environment.

# --- Destination: your own GCP project -------------------------------------
GCP_PROJECT_ID = "graph-demo-471710"  # replace with your GCP project ID
PROJECT_REGION = "us-central1"
BIGQUERY_DATASET = "gdelt"  # replace with your BigQuery dataset name
BIGQUERY_TABLES = [  # tables to copy
    "gkg_partitioned",
    "events_partitioned",
    "eventmentions_partitioned",
]
GCS_BUCKET = "gdelt_graph"

# --- Source: the GDELT project / dataset read by later cells ----------------
GDELT_PROJECT_ID = "gdelt-bq"
GDELT_DATASET = "gdeltv2"
GDELT_REGION = "us"

# Echo the active settings so the run is self-describing.
# PROJECT_REGION is not part of the echoed summary.
print("Configuration loaded:")
for _label, _value in (
    ("GCP Project", GCP_PROJECT_ID),
    ("BigQuery Dataset", BIGQUERY_DATASET),
    ("BigQuery Tables", BIGQUERY_TABLES),
    ("GDELT Project", GDELT_PROJECT_ID),
    ("GDELT Dataset", GDELT_DATASET),
    ("GDELT Region", GDELT_REGION),
    ("GCS Bucket", GCS_BUCKET),
):
    print(f"  {_label}: {_value}")

Configuration loaded:
  GCP Project: graph-demo-471710
  BigQuery Dataset: gdelt
  BigQuery Tables: ['gkg_partitioned', 'events_partitioned']
  GDELT Project: gdelt-bq
  GDELT Dataset: gdeltv2
  GDELT Region: us
  GCS Bucket: gdelt_graph


In [None]:
# GCP Authentication Setup
import subprocess
import os
import shutil
from google.auth import default
from google.auth.exceptions import DefaultCredentialsError

def setup_gcp_authentication():
    """Obtain Application Default Credentials (ADC) for ``GCP_PROJECT_ID``.

    Flow:
      1. Reuse existing ADC when ``google.auth.default()`` reports the target
         project.
      2. Otherwise confirm the ``gcloud`` CLI is on PATH, remove the old ADC
         file, point gcloud at the target project and run the interactive
         ``gcloud auth application-default login``.
      3. Best effort: set the ADC quota project (a failure here is reported
         but does not abort the setup).
      4. Reload the credentials and export ``GOOGLE_CLOUD_PROJECT``.

    Returns:
        tuple: ``(credentials, GCP_PROJECT_ID)`` on success, or ``(None, None)``
        when gcloud is missing, a gcloud command fails, or any other error
        occurs. Errors are printed, not raised.
    """
    print("🔐 Setting up GCP Authentication...")

    try:
        # Step 1: Try to use existing credentials first
        print("🔍 Checking for existing credentials...")
        try:
            credentials, default_project = default()
            print(f"✅ Found existing credentials for project: {default_project}")

            # If the project matches, we're good
            if default_project == GCP_PROJECT_ID:
                print(f"🎯 Project matches target project: {GCP_PROJECT_ID}")
                os.environ['GOOGLE_CLOUD_PROJECT'] = GCP_PROJECT_ID
                return credentials, GCP_PROJECT_ID

            print(f"⚠️  Project mismatch: {default_project} vs {GCP_PROJECT_ID}")
            print("🔄 Will re-authenticate with correct project...")
        except DefaultCredentialsError:
            print("❌ No existing credentials found")
            print("🔄 Will authenticate from scratch...")

        # Every remaining step shells out to gcloud. Check for it BEFORE deleting
        # anything, so a missing CLI does not leave the user without credentials.
        if shutil.which('gcloud') is None:
            print("❌ gcloud CLI not found on PATH")
            print("💡 Install the Google Cloud SDK, then re-run this cell")
            return None, None

        # Step 2: Clear old credentials if needed
        print("🗑️  Clearing old credentials...")
        adc_path = os.path.expanduser("~/.config/gcloud/application_default_credentials.json")
        if os.path.exists(adc_path):
            os.remove(adc_path)
            print("✅ Removed old application default credentials")

        # Step 3: Set the correct project
        print(f"🎯 Setting gcloud project to: {GCP_PROJECT_ID}")
        subprocess.run(['gcloud', 'config', 'set', 'project', GCP_PROJECT_ID],
                       capture_output=True, text=True, check=True)
        print("✅ Project set successfully")

        # Step 4: Re-authenticate
        print("🔄 Re-authenticating with application default credentials...")
        print("   This will open a browser window for authentication...")

        # Output is deliberately not captured so the login prompt/URL stays visible.
        subprocess.run(['gcloud', 'auth', 'application-default', 'login'],
                       check=True)
        print("✅ Re-authentication successful")

        # Step 5: Set quota project to avoid warnings
        print("💰 Setting quota project...")
        try:
            subprocess.run(['gcloud', 'auth', 'application-default', 'set-quota-project', GCP_PROJECT_ID],
                           capture_output=True, text=True, check=True)
            print("✅ Quota project set successfully")
        except Exception as quota_error:
            # Still best-effort, but no longer a bare `except:` (which would also
            # swallow KeyboardInterrupt/SystemExit), and the reason is reported.
            print("⚠️  Could not set quota project (this is usually fine)")
            print(f"   Reason: {quota_error}")

        # Step 6: Verify the setup
        print("🧪 Verifying authentication...")
        credentials, project = default()
        print(f"✅ Authentication successful - Project: {project}")

        # Set environment variable
        os.environ['GOOGLE_CLOUD_PROJECT'] = GCP_PROJECT_ID
        print(f"🌍 Set GOOGLE_CLOUD_PROJECT environment variable to: {GCP_PROJECT_ID}")

        return credentials, GCP_PROJECT_ID

    except subprocess.CalledProcessError as e:
        print(f"❌ Command failed: {e}")
        # stderr is only populated for the commands run with capture_output=True.
        if e.stderr:
            print(f"   stderr: {e.stderr.strip()}")
        print("💡 Manual steps required:")
        print(f"   1. gcloud config set project {GCP_PROJECT_ID}")
        print("   2. gcloud auth application-default login")
        print(f"   3. gcloud auth application-default set-quota-project {GCP_PROJECT_ID}")
        return None, None
    except Exception as e:
        print(f"❌ Error: {e}")
        return None, None

# Run authentication setup.
# `credentials` and `authenticated_project` become notebook-level names that the
# connectivity-test cell reads; both are None when the setup reports a failure.
credentials, authenticated_project = setup_gcp_authentication()


In [None]:
# Import required libraries
# NOTE: `os` is also imported by the authentication cell, and `bigquery` / `datetime`
# are imported again by the cross-region copy cell; the repeats are harmless.
import os
import pandas as pd
from google.cloud import bigquery
from google.cloud import storage
import json
from datetime import datetime

# Reaching this line means every import above succeeded.
print("Libraries imported successfully!")


In [None]:
# Test GCP connectivity
def test_gcp_connectivity():
    """Run three sequential connectivity checks against GCP.

    Uses the notebook-level ``credentials`` and ``authenticated_project``
    produced by the authentication cell. The checks are:
      1. a trivial BigQuery query (``SELECT 1``),
      2. access to ``BIGQUERY_DATASET`` plus a listing of its tables,
      3. listing Cloud Storage buckets.

    Returns:
        bool: ``True`` only when all three checks pass. ``False`` as soon as
        authentication is missing or any check fails (the failure is printed,
        not raised).
    """
    print("🔍 Testing GCP connectivity...")

    # Check if authentication was successful
    if not credentials or not authenticated_project:
        print("❌ Authentication required - please run the authentication cell first")
        return False

    print(f"✅ Using authenticated project: {authenticated_project}")

    # Test 1: Test BigQuery connectivity
    try:
        # Use explicit credentials and project; this client is reused by Test 2.
        client = bigquery.Client(credentials=credentials, project=authenticated_project)
        print(f"🔗 BigQuery client created for project: {client.project}")

        # Simple query to test connectivity
        query = "SELECT 1 as test_value"
        result = client.query(query).result()
        for row in result:
            print(f"✅ BigQuery connectivity successful - Test query result: {row.test_value}")
            break  # Only need first row
    except Exception as e:
        error_str = str(e)
        # These substrings are treated as the signature of credentials bound to
        # a different (or deleted) project; anything else is reported verbatim.
        if "has been deleted" in error_str or "USER_PROJECT_DENIED" in error_str:
            print("❌ BigQuery connectivity failed: Project mismatch detected")
            print(f"   Error: {e}")
            print("🔧 This usually means your credentials are cached for a different project")
            print("   💡 Try running the authentication cell again")
            print("   📋 Or manually run: gcloud auth application-default login")
        else:
            print(f"❌ BigQuery connectivity failed: {e}")
        return False

    # Test 2: Test BigQuery dataset access
    try:
        # Build the reference directly: Client.dataset() is deprecated, and the
        # copy cell already constructs bigquery.DatasetReference the same way.
        dataset_ref = bigquery.DatasetReference(authenticated_project, BIGQUERY_DATASET)
        client.get_dataset(dataset_ref)  # raises if the dataset is missing or inaccessible
        print(f"✅ BigQuery dataset '{BIGQUERY_DATASET}' accessible")

        # List tables in the dataset
        tables = list(client.list_tables(dataset_ref))
        print(f"📊 Found {len(tables)} tables in dataset")
        for table in tables[:5]:  # Show first 5 tables
            print(f"   - {table.table_id}")
        if len(tables) > 5:
            print(f"   ... and {len(tables) - 5} more tables")

    except Exception as e:
        print(f"❌ BigQuery dataset access failed: {e}")
        print(f"   Make sure dataset '{BIGQUERY_DATASET}' exists in project '{authenticated_project}'")
        return False

    # Test 3: Test Cloud Storage connectivity
    try:
        storage_client = storage.Client(credentials=credentials, project=authenticated_project)
        # List buckets to test connectivity
        buckets = list(storage_client.list_buckets())
        print(f"✅ Cloud Storage connectivity successful - Found {len(buckets)} buckets")
    except Exception as e:
        print(f"❌ Cloud Storage connectivity failed: {e}")
        return False

    print("🎉 All GCP connectivity tests passed!")
    return True

# Run the connectivity test.
# The call returns True only when every check passed; as the last expression
# of the cell, that boolean is what the cell displays.
test_gcp_connectivity()


In [None]:
# Ready for GDELT analysis!
# Closing banner that restates the configured project and dataset.
# It is printed unconditionally: this cell does not itself check whether the
# authentication or connectivity cells succeeded.
ready_banner = "\n".join([
    "🎉 Setup complete! Ready to work with GDELT data.",
    f"📊 Project: {GCP_PROJECT_ID}",
    f"🗄️  Dataset: {BIGQUERY_DATASET}",
    "🚀 You can now run queries against your GDELT data!",
])
print(ready_banner)


In [None]:
# List datasets in the GDELT_PROJECT_ID project
def list_gdelt_datasets():
    """List every dataset in ``GDELT_PROJECT_ID`` with basic metadata.

    For each dataset this fetches the full dataset resource and its table
    listing, prints a summary (including up to five table names), and collects
    the details.

    Returns:
        list[dict]: one dict per dataset with the keys ``dataset_id``,
        ``description``, ``created``, ``modified``, ``location`` and
        ``table_count``. An empty list is returned when the project has no
        datasets or when any error occurs (the error is printed, not raised).
    """
    print(f"🔍 Listing datasets in GDELT project: {GDELT_PROJECT_ID}")

    try:
        # Create BigQuery client for the GDELT project
        gdelt_client = bigquery.Client(project=GDELT_PROJECT_ID)
        print(f"✅ Connected to GDELT project: {gdelt_client.project}")

        # List all datasets in the project
        datasets = list(gdelt_client.list_datasets())

        if not datasets:
            print("📭 No datasets found in the GDELT project")
            return []

        print(f"📊 Found {len(datasets)} datasets in {GDELT_PROJECT_ID}:")
        print("-" * 60)

        dataset_info = []
        for dataset in datasets:
            # Each list item already carries its DatasetReference, so use it
            # instead of rebuilding one through the deprecated Client.dataset().
            dataset_ref = dataset.reference
            full_dataset = gdelt_client.get_dataset(dataset_ref)

            # Count tables in the dataset
            tables = list(gdelt_client.list_tables(dataset_ref))

            info = {
                'dataset_id': dataset.dataset_id,
                'description': full_dataset.description or 'No description',
                'created': full_dataset.created,
                'modified': full_dataset.modified,
                'location': full_dataset.location,
                'table_count': len(tables)
            }
            dataset_info.append(info)

            print(f"📁 Dataset: {dataset.dataset_id}")
            print(f"   Description: {info['description']}")
            print(f"   Created: {info['created']}")
            print(f"   Modified: {info['modified']}")
            print(f"   Location: {info['location']}")
            print(f"   Tables: {info['table_count']}")

            # Show first few tables if any
            if tables:
                print("   Sample tables:")
                for table in tables[:5]:
                    print(f"     - {table.table_id}")
                if len(tables) > 5:
                    print(f"     ... and {len(tables) - 5} more")
            print()

        return dataset_info

    except Exception as e:
        # Any failure (including one on a single dataset) discards the partial
        # results and yields an empty list.
        print(f"❌ Error listing datasets: {e}")
        return []

# Run the function to list datasets.
# `gdelt_datasets` is a list of per-dataset info dicts; it is empty when no
# datasets were found or when the listing failed.
gdelt_datasets = list_gdelt_datasets()


## Cross-Region GDELT Data Copy Function

This function efficiently copies GDELT data from the US region to your local US-CENTRAL1 region using a smart multi-step approach:

### 🎯 **Purpose**
- Copies GDELT data for a specific date (September 11, 2025) from the public GDELT dataset
- Handles cross-region data transfer from US region to US-CENTRAL1 region
- Optimizes for cost and speed with intelligent caching

### 🔄 **Process Flow**
1. **Destination Check**: Checks whether the target table already exists; if it does, the copy is skipped and only a row count of the existing table is reported
2. **Dataset Setup**: Creates required datasets in both US and US-CENTRAL1 regions
3. **Temporary Table Check**: Checks if temp table exists in US region (reuses if available)
4. **Data Query**: Queries GDELT data and saves to temporary table in US region
5. **Cross-Region Copy**: Copies data from US region to US-CENTRAL1 region
6. **Cleanup**: Removes temporary table and verifies final data

### ⚡ **Optimizations**
- **Smart Caching**: Skips expensive operations if data already exists
- **Cost Efficient**: Reuses temporary tables when possible
- **Error Resilient**: Handles various BigQuery errors gracefully
- **Progress Tracking**: Detailed logging throughout the process

### 📊 **Output**
- Creates table: `{GCP_PROJECT_ID}.{BIGQUERY_DATASET}.<table>` (for example `graph-demo-471710.gdelt.gkg_partitioned`) in US-CENTRAL1 region
- Shows row counts and verification details
- Provides troubleshooting tips if errors occur


In [69]:
# Copy GDELT data for specific partition (September 11, 2025) - Cross-region approach
from google.cloud import bigquery
from datetime import datetime

def copy_gdelt_partition_cross_region(table_name=None, partition_date="2025-09-11"):
    """
    Copy one daily partition of a GDELT table (US region) into this project's
    dataset (US-CENTRAL1 region), staging it in a temporary table in a US dataset.

    The source and destination names are derived from the notebook configuration
    (``GDELT_PROJECT_ID``, ``GDELT_DATASET``, ``GCP_PROJECT_ID``,
    ``BIGQUERY_DATASET``) instead of relying on ``GDELT_TABLE`` /
    ``BIGQUERY_TABLE`` globals, so the cell works on a freshly started kernel.

    Args:
        table_name: Table to copy; used for both the GDELT source table and the
            destination table. Defaults to the first entry of ``BIGQUERY_TABLES``.
        partition_date: Partition to copy, as ``YYYY-MM-DD``.

    Returns:
        bool: ``True`` when the copy completed or the destination table already
        existed; ``False`` when an error was caught (it is printed, not raised).

    Raises:
        ValueError: if ``partition_date`` is not a valid ``YYYY-MM-DD`` date.

    Notes:
        - An existing destination table short-circuits the copy whatever date
          it holds, and an existing ``temp_<table_name>`` table is reused as-is.
          Drop them before copying a different ``partition_date``.
        - The regions are fixed to ``US`` (source/staging) and ``US-CENTRAL1``
          (destination); ``PROJECT_REGION`` / ``GDELT_REGION`` are not consulted.
    """
    # Validate first so a malformed date fails before any BigQuery call is made;
    # re-formatting also normalizes the value that is interpolated into the SQL.
    partition_day = datetime.strptime(partition_date, "%Y-%m-%d")
    partition_literal = partition_day.strftime("%Y-%m-%d")
    target_date_label = f"{partition_day:%B} {partition_day.day}, {partition_day.year}"

    if table_name is None:
        table_name = BIGQUERY_TABLES[0]

    gdelt_table = f"{GDELT_PROJECT_ID}.{GDELT_DATASET}.{table_name}"
    dest_table_id = f"{GCP_PROJECT_ID}.{BIGQUERY_DATASET}.{table_name}"
    us_dataset_name = f"{BIGQUERY_DATASET}_us"
    temp_table_name = f"temp_{table_name}"

    print("🔄 Starting cross-region GDELT data copy...")
    print(f" Target date: {target_date_label}")
    print(f" Source: {gdelt_table} (US region)")
    print(f" Destination: {dest_table_id} (US-CENTRAL1 region)")
    print("-" * 70)

    try:
        # Create BigQuery client
        local_client = bigquery.Client(project=GCP_PROJECT_ID)
        print("✅ BigQuery client created")

        # Step 0: Check if destination table already exists
        print("🔍 Checking if destination table already exists...")
        try:
            existing_dest_table = local_client.get_table(dest_table_id)
            print(f"✅ Destination table already exists: {existing_dest_table.full_table_id}")
            print(f"   Rows: {existing_dest_table.num_rows:,}")
            print("⏭️  Skipping data copy process, destination table already has data")

            # Optional: report how many rows the existing table holds
            print("🔍 Verifying existing data...")
            try:
                # Try a simple count first
                simple_query = f"SELECT COUNT(*) as row_count FROM `{dest_table_id}`"
                result = local_client.query(simple_query, location="US-CENTRAL1").result()
                for row in result:
                    print("📊 Existing data summary:")
                    print(f"   Total rows: {row.row_count:,}")
                    print("✅ Data verification completed")
            except Exception as verify_error:
                print(f"⚠️  Could not verify existing data: {verify_error}")

            return True

        except Exception as e:
            if "notFound" in str(e) or "404" in str(e):
                print("📝 Destination table doesn't exist, proceeding with data copy...")
            else:
                # Unknown lookup failure: the copy is attempted anyway.
                print(f"⚠️  Error checking destination table: {e}")
                print("📝 Proceeding with data copy...")

        # Step 1: Create dataset if it doesn't exist
        print(f" Checking if dataset '{BIGQUERY_DATASET}' exists...")
        dataset_ref = bigquery.DatasetReference(GCP_PROJECT_ID, BIGQUERY_DATASET)

        try:
            local_client.get_dataset(dataset_ref)
            print(f"✅ Dataset '{BIGQUERY_DATASET}' already exists")
        except Exception:
            # Any lookup failure is treated as "missing"; create_dataset will
            # surface the real problem if it was something else.
            print(f"📝 Dataset '{BIGQUERY_DATASET}' doesn't exist, creating it...")

            # Create dataset with proper location
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = "US-CENTRAL1"  # Specify the region
            dataset.description = "GDELT data for graph analysis"

            local_client.create_dataset(dataset, timeout=30)
            print(f"✅ Dataset '{BIGQUERY_DATASET}' created successfully in us-central1")

        # Step 2: Create dataset in US region for temporary table
        print("📝 Creating dataset in US region for temporary table...")
        us_dataset_ref = bigquery.DatasetReference(GCP_PROJECT_ID, us_dataset_name)

        try:
            local_client.get_dataset(us_dataset_ref)
            print(f"✅ Dataset '{us_dataset_name}' already exists in US region")
        except Exception as e:
            if "notFound" in str(e) or "404" in str(e):
                print(f"📝 Creating dataset '{us_dataset_name}' in US region...")
                us_dataset = bigquery.Dataset(us_dataset_ref)
                us_dataset.location = "US"
                us_dataset.description = "GDELT data for graph analysis (US region - temporary)"
                try:
                    local_client.create_dataset(us_dataset, timeout=30)
                    print(f"✅ Dataset '{us_dataset_name}' created in US region")
                except Exception as create_error:
                    if "Already Exists" in str(create_error) or "409" in str(create_error):
                        print(f"✅ Dataset '{us_dataset_name}' already exists in US region (created by another process)")
                    else:
                        raise
            else:
                print(f"⚠️  Unexpected error checking dataset in US region: {e}")
                raise

        # Step 3: Check if temporary table already exists, if not query GDELT data
        temp_table_ref = us_dataset_ref.table(temp_table_name)

        print("🔍 Checking if temporary table already exists...")
        try:
            existing_temp_table = local_client.get_table(temp_table_ref)
            print(f"✅ Temporary table already exists: {existing_temp_table.full_table_id}")
            print(f"   Rows: {existing_temp_table.num_rows:,}")
            print("⏭️  Skipping data query, using existing temporary table")
        except Exception as e:
            if "notFound" in str(e) or "404" in str(e):
                print("📊 Temporary table doesn't exist, querying GDELT data and saving to temporary table...")

                # Configure the query job to save to temporary table in US region
                job_config = bigquery.QueryJobConfig()
                job_config.destination = temp_table_ref
                job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
                job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED

                # Query the GDELT table (partition_literal was validated above)
                query = f"""
                SELECT *
                FROM `{gdelt_table}`
                WHERE _PARTITIONTIME = TIMESTAMP('{partition_literal}')
                """

                print("📊 Executing query...")
                print(f"🔍 Query: {query}")
                print(f"🎯 Destination: {GCP_PROJECT_ID}.{us_dataset_name}.{temp_table_name}")

                # Run the query in the US region, where the GDELT table lives
                query_job = local_client.query(
                    query,
                    job_config=job_config,
                    location="US"  # Query in US region where GDELT table exists
                )

                print(f"⏳ Query job started: {query_job.job_id}")
                print("⏳ Waiting for query to complete...")
                query_job.result()  # Wait for job to complete
                print("✅ Data copied to temporary table in US region")
            else:
                print(f"⚠️  Unexpected error checking temporary table: {e}")
                raise

        # Step 4: Copy data from US region temp table to US-CENTRAL1 region
        print("🔄 Copying data from US region to US-CENTRAL1 region...")

        # Configure the copy job
        copy_job_config = bigquery.CopyJobConfig()
        copy_job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
        copy_job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED

        # Destination table (in US-CENTRAL1 region)
        dest_table_ref = dataset_ref.table(table_name)

        # Copy the data - need to specify source location
        copy_job = local_client.copy_table(
            temp_table_ref,
            dest_table_ref,
            job_config=copy_job_config,
            location="US"  # Source is in US region
        )

        print(f"⏳ Copy job started: {copy_job.job_id}")
        print("⏳ Waiting for copy to complete...")
        copy_job.result()  # Wait for job to complete
        print("✅ Data copied to US-CENTRAL1 region successfully")

        # Step 5: Clean up temporary table (best effort)
        print("🧹 Cleaning up temporary table...")
        try:
            local_client.delete_table(temp_table_ref)
            print("✅ Temporary table deleted")
        except Exception as e:
            print(f"⚠️  Could not delete temporary table: {e}")

        # Step 6: Verify the data (failures here are reported but do not fail the copy)
        print("🔍 Verifying imported data...")

        # First, check what columns are available in the table
        print("🔍 Checking table schema...")
        try:
            table = local_client.get_table(dest_table_id)
            print(f"✅ Table found: {table.full_table_id}")
            print(f"   Rows: {table.num_rows:,}")
            print(f"   Columns: {len(table.schema)}")

            # Informational: schema columns whose name mentions partition/time
            partition_columns = [field.name for field in table.schema if 'partition' in field.name.lower() or 'time' in field.name.lower()]
            print(f"   Partition-related columns: {partition_columns}")

            # Try a simple count query first
            simple_verification_query = f"""
            SELECT COUNT(*) as row_count
            FROM `{dest_table_id}`
            """

            verification_result = local_client.query(simple_verification_query, location="US-CENTRAL1").result()
            for row in verification_result:
                print("📊 Imported data summary:")
                print(f"   Total rows: {row.row_count:,}")

            # _PARTITIONTIME is a pseudo-column and is never listed in table.schema,
            # so detect ingestion-time partitioning from the partitioning config
            # instead: time-partitioned with no explicit partitioning field.
            time_partitioning = table.time_partitioning
            has_partitiontime = time_partitioning is not None and time_partitioning.field is None

            if has_partitiontime:
                print("🔍 Running detailed verification with _PARTITIONTIME...")
                detailed_verification_query = f"""
                SELECT 
                    COUNT(*) as row_count,
                    MIN(_PARTITIONTIME) as min_partition_time,
                    MAX(_PARTITIONTIME) as max_partition_time
                FROM `{dest_table_id}`
                WHERE _PARTITIONTIME = TIMESTAMP('{partition_literal}')
                """

                detailed_result = local_client.query(detailed_verification_query, location="US-CENTRAL1").result()
                for row in detailed_result:
                    print(f"   Rows for {partition_literal}: {row.row_count:,}")
                    print(f"   Min partition time: {row.min_partition_time}")
                    print(f"   Max partition time: {row.max_partition_time}")
            else:
                print("⚠️  _PARTITIONTIME column not found, skipping detailed verification")

        except Exception as e:
            print(f"❌ Error during verification: {e}")
            print("💡 Table may have been created but verification failed")

        print("🎉 Cross-region data copy completed successfully!")
        return True

    except Exception as e:
        print(f"❌ Error during data copy: {e}")
        print("💡 Troubleshooting tips:")
        print("   - Check if the GDELT_TABLE exists and is accessible")
        print("   - Verify your project has BigQuery API enabled")
        print("   - Ensure you have the necessary permissions")
        return False

# Run the cross-region copy process.
# `copy_success` is True when the copy finished or the destination table already
# existed, and False when the function caught an error.
copy_success = copy_gdelt_partition_cross_region()

🔄 Starting cross-region GDELT data copy...
 Target date: September 11, 2025
 Source: gdelt-bq.gdeltv2.gkg_partitioned (US region)
 Destination: graph-demo-471710.gdelt.gkg_partitioned (US-CENTRAL1 region)
----------------------------------------------------------------------
✅ BigQuery client created
🔍 Checking if destination table already exists...
📝 Destination table doesn't exist, proceeding with data copy...
 Checking if dataset 'gdelt' exists...
✅ Dataset 'gdelt' already exists
📝 Creating dataset in US region for temporary table...
✅ Dataset 'gdelt_us' already exists in US region
🔍 Checking if temporary table already exists...
📊 Temporary table doesn't exist, querying GDELT data and saving to temporary table...
📊 Executing query...
🔍 Query: 
                SELECT *
                FROM `gdelt-bq.gdeltv2.gkg_partitioned`
                WHERE _PARTITIONTIME = TIMESTAMP('2025-09-11')
                
🎯 Destination: graph-demo-471710.gdelt_us.temp_gkg_partitioned
⏳ Query job started