## Prerequisites

- Google Cloud SDK (`gcloud`) installed
- GCP project with billing enabled
- Dataproc API enabled
- Compute Engine API enabled (for Solr VM)
- Appropriate IAM permissions

In [1]:
import os
import sys
import subprocess
import json
import time
from google.cloud import storage
from dotenv import load_dotenv

# Ensure we are in the project root
if os.path.basename(os.getcwd()) == "notebooks":
    os.chdir("..")
print(f"Current working directory: {os.getcwd()}")

# Load environment variables from .env file
load_dotenv()
print("✓ Loaded configuration from .env file")

Current working directory: /Users/r39132/Projects/spark-solr-indexer
✓ Loaded configuration from .env file


## Configuration

Load GCP project details from `.env` file. Edit `.env` in the project root to customize settings.

In [2]:
# Load configuration from environment variables
PROJECT_ID = os.getenv("GCP_PROJECT_ID", "your-project-id")
REGION = os.getenv("GCP_REGION", "us-central1")
ZONE = os.getenv("GCP_ZONE", "us-central1-a")
BUCKET_NAME = os.getenv("GCS_BUCKET_NAME", f"{PROJECT_ID}-spark-solr-data")

# Dataproc Configuration
CLUSTER_NAME = os.getenv("DATAPROC_CLUSTER_NAME", "spark-solr-cluster")
DATAPROC_MASTER_TYPE = os.getenv("DATAPROC_MASTER_TYPE", "n1-standard-4")
DATAPROC_WORKER_TYPE = os.getenv("DATAPROC_WORKER_TYPE", "n1-standard-4")
DATAPROC_WORKER_COUNT = int(os.getenv("DATAPROC_WORKER_COUNT", "2"))

# Solr Configuration (VM-based)
SOLR_VM_NAME = os.getenv("SOLR_VM_NAME", "solr-instance")
SOLR_VM_TYPE = os.getenv("SOLR_VM_TYPE", "n1-standard-2")
SOLR_EXTERNAL_IP = None  # Will be set after VM creation

print(f"Project: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"Zone: {ZONE}")
print(f"Bucket: {BUCKET_NAME}")
print(f"Dataproc Cluster: {CLUSTER_NAME} ({DATAPROC_WORKER_COUNT} workers)")
print(f"Solr VM: {SOLR_VM_NAME}")

Project: family-tree-469815
Region: us-central1
Zone: us-central1-a
Bucket: family-tree-469815-spark-solr-data
Dataproc Cluster: spark-solr-cluster (2 workers)
Solr VM: solr-instance


## 1. Authenticate with GCP

Login to Google Cloud Platform.

In [4]:
!gcloud auth login
!gcloud config set project {PROJECT_ID}

Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=32555940559.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8085%2F&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fappengine.admin+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=RnMD6UmwQgHtpEOtFd2ker4e7924Xj&access_type=offline&code_challenge=N7n-OopMkZ_MHl2B8aBfynei82SmBxg1hBGiGtVHKK8&code_challenge_method=S256


You are now logged in as [r39132@gmail.com].
Your current project is [family-tree-469815].  You can change this setting by running:
  $ gcloud config set project PROJECT_ID
Updated property [core/project].


## 2. Create GCS Bucket

Create a Cloud Storage bucket to store data and scripts.

In [3]:
# Check if bucket exists using gsutil (uses your authenticated credentials)
check_result = subprocess.run(
    f"gsutil ls -b gs://{BUCKET_NAME}/",
    shell=True,
    capture_output=True,
    text=True
)

if check_result.returncode == 0:
    print(f"✓ Using existing bucket: {BUCKET_NAME}")
else:
    # Bucket doesn't exist, create it
    print(f"Creating bucket: {BUCKET_NAME}...")
    create_result = subprocess.run(
        f"gsutil mb -p {PROJECT_ID} -c STANDARD -l {REGION} gs://{BUCKET_NAME}/",
        shell=True,
        capture_output=True,
        text=True
    )
    
    if create_result.returncode == 0:
        print(f"✓ Created bucket: {BUCKET_NAME}")
    else:
        print(f"✗ Failed to create bucket: {create_result.stderr}")
        raise Exception(f"Bucket creation failed: {create_result.stderr}")

# Initialize storage client for Python API access (for blob operations)
# storage_client = storage.Client(project=PROJECT_ID)
# bucket = storage_client.bucket(BUCKET_NAME)

✓ Using existing bucket: family-tree-469815-spark-solr-data


## 3. Generate and Upload Data

Generate dummy data locally and upload to GCS.

In [4]:
# Check if data already exists locally
local_data_exists = False
if os.path.exists("data/dummy_data.json"):
    with open("data/dummy_data.json") as f:
        lines = f.readlines()
    if len(lines) > 0:
        print(f"⏭️  Using existing local data: {len(lines)} records")
        local_data_exists = True

# Generate data if needed
if not local_data_exists:
    print("Generating data...")
    !python3 data_gen/generate_data.py
    with open("data/dummy_data.json") as f:
        lines = f.readlines()
    print(f"✓ Generated {len(lines)} records locally")

# Check if data already exists in GCS using gsutil
check_result = subprocess.run(
    f"gsutil ls gs://{BUCKET_NAME}/data/dummy_data.json",
    shell=True,
    capture_output=True,
    text=True
)

if check_result.returncode == 0:
    print(f"⏭️  Data already exists in GCS: gs://{BUCKET_NAME}/data/dummy_data.json")
else:
    # Upload to GCS using gsutil
    print(f"Uploading data to GCS...")
    upload_result = subprocess.run(
        f"gsutil cp data/dummy_data.json gs://{BUCKET_NAME}/data/",
        shell=True,
        capture_output=True,
        text=True
    )
    
    if upload_result.returncode == 0:
        print(f"✓ Uploaded to gs://{BUCKET_NAME}/data/dummy_data.json")
    else:
        print(f"✗ Upload failed: {upload_result.stderr}")
        raise Exception(f"Upload failed: {upload_result.stderr}")

⏭️  Using existing local data: 1000 records
⏭️  Data already exists in GCS: gs://family-tree-469815-spark-solr-data/data/dummy_data.json
⏭️  Data already exists in GCS: gs://family-tree-469815-spark-solr-data/data/dummy_data.json


## 4. Create Solr VM on GCE

Launch a Compute Engine VM and install Solr Cloud.

In [5]:
# Set Solr IP from existing VM
ip_result = subprocess.run(
    f"gcloud compute instances describe {SOLR_VM_NAME} --zone={ZONE} --format='get(networkInterfaces[0].accessConfigs[0].natIP)'",
    shell=True, capture_output=True, text=True
)
SOLR_EXTERNAL_IP = ip_result.stdout.strip()
print(f"✓ Solr VM IP: {SOLR_EXTERNAL_IP}")
print(f"  Solr URL: http://{SOLR_EXTERNAL_IP}:8983")

✓ Solr VM IP: 34.121.247.203
  Solr URL: http://34.121.247.203:8983


In [6]:
# Check if Solr VM already exists
vm_exists_cmd = f"gcloud compute instances describe {SOLR_VM_NAME} --zone={ZONE} --format='get(name)' 2>/dev/null"
result = subprocess.run(vm_exists_cmd, shell=True, capture_output=True, text=True)

if result.stdout.strip() == SOLR_VM_NAME:
    print(f"⏭️  Solr VM '{SOLR_VM_NAME}' already exists")
    # Get existing IP
    ip_result = subprocess.run(
        f"gcloud compute instances describe {SOLR_VM_NAME} --zone={ZONE} --format='get(networkInterfaces[0].accessConfigs[0].natIP)'",
        shell=True, capture_output=True, text=True
    )
    SOLR_EXTERNAL_IP = ip_result.stdout.strip()
    print(f"  Solr URL: http://{SOLR_EXTERNAL_IP}:8983")
else:
    # Create Solr VM with startup script
    startup_script = """#!/bin/bash
apt-get update
apt-get install -y openjdk-11-jdk wget

# Download and setup Solr
cd /opt
wget https://archive.apache.org/dist/lucene/solr/8.11.3/solr-8.11.3.tgz
tar xzf solr-8.11.3.tgz
cd solr-8.11.3

# Start Solr in cloud mode
bin/solr start -c -m 2g

# Create collection
bin/solr create -c dummy_data -s 1 -rf 1

echo "Solr started on port 8983"
"""

    # Create VM
    create_vm_cmd = f"""
gcloud compute instances create {SOLR_VM_NAME} \\
    --project={PROJECT_ID} \\
    --zone={ZONE} \\
    --machine-type={SOLR_VM_TYPE} \\
    --image-family=debian-11 \\
    --image-project=debian-cloud \\
    --boot-disk-size=50GB \\
    --tags=solr-server \\
    --metadata=startup-script='{startup_script}'
"""

    print("Creating Solr VM...")
    !{create_vm_cmd}

    # Create firewall rule for Solr
    !gcloud compute firewall-rules create allow-solr \
        --project={PROJECT_ID} \
        --allow=tcp:8983 \
        --target-tags=solr-server \
        --description="Allow Solr traffic" \
        2>/dev/null || echo "Firewall rule already exists"

    # Wait for VM to be ready
    time.sleep(60)

    # Get external IP
    result = subprocess.run(
        f"gcloud compute instances describe {SOLR_VM_NAME} --zone={ZONE} --format='get(networkInterfaces[0].accessConfigs[0].natIP)'",
        shell=True, capture_output=True, text=True
    )
    SOLR_EXTERNAL_IP = result.stdout.strip()
    print(f"✓ Solr VM created with IP: {SOLR_EXTERNAL_IP}")
    print(f"  Solr URL: http://{SOLR_EXTERNAL_IP}:8983")

⏭️  Solr VM 'solr-instance' already exists
  Solr URL: http://34.121.247.203:8983
  Solr URL: http://34.121.247.203:8983


## 5. Create Dataproc Cluster

Launch a Dataproc cluster for running Spark jobs.

In [7]:
# Check if Dataproc cluster already exists
cluster_exists_cmd = f"gcloud dataproc clusters describe {CLUSTER_NAME} --region={REGION} --format='get(clusterName)' 2>/dev/null"
result = subprocess.run(cluster_exists_cmd, shell=True, capture_output=True, text=True)

if result.stdout.strip() == CLUSTER_NAME:
    print(f"⏭️  Dataproc cluster '{CLUSTER_NAME}' already exists")
else:
    # Create Dataproc cluster
    create_cluster_cmd = f"""
gcloud dataproc clusters create {CLUSTER_NAME} \\
    --project={PROJECT_ID} \\
    --region={REGION} \\
    --master-machine-type={DATAPROC_MASTER_TYPE} \\
    --worker-machine-type={DATAPROC_WORKER_TYPE} \\
    --num-workers={DATAPROC_WORKER_COUNT} \\
    --image-version=2.1-debian11 \\
    --enable-component-gateway \\
    --optional-components=JUPYTER \\
    --max-idle=3600s
"""

    print("Creating Dataproc cluster (this may take 3-5 minutes)...")
    !{create_cluster_cmd}
    print(f"✓ Dataproc cluster '{CLUSTER_NAME}' created")

⏭️  Dataproc cluster 'spark-solr-cluster' already exists


## 6. Index Data with Dataproc

Submit the Spark job to index data into GCP Solr. This step checks if indexing is already complete and skips if verified.

In [8]:
# Pre-download spark-solr JAR to GCS for faster job execution (avoids Maven dependency resolution)
import os
import urllib.request

# Check if JAR exists using gsutil (avoids Python client permission issues)
check_jar_cmd = f"gsutil -q stat gs://{BUCKET_NAME}/jars/spark-solr-4.0.0-shaded.jar"
jar_exists = subprocess.run(check_jar_cmd, shell=True).returncode == 0

if not jar_exists:
    print("Downloading spark-solr JAR (one-time setup)...")
    
    # Create local jars directory
    os.makedirs("jars", exist_ok=True)
    local_jar_path = "jars/spark-solr-4.0.0-shaded.jar"
    
    # Download JAR if not already local
    if not os.path.exists(local_jar_path):
        jar_url = "https://repo1.maven.org/maven2/com/lucidworks/spark/spark-solr/4.0.0/spark-solr-4.0.0-shaded.jar"
        print(f"  Downloading from Maven Central...")
        urllib.request.urlretrieve(jar_url, local_jar_path)
        print(f"  ✓ Downloaded to {local_jar_path}")
    
    # Upload to GCS using gsutil
    upload_cmd = f"gsutil cp {local_jar_path} gs://{BUCKET_NAME}/jars/"
    result = subprocess.run(upload_cmd, shell=True, capture_output=True, text=True)
    
    if result.returncode == 0:
        print(f"✓ Uploaded JAR to gs://{BUCKET_NAME}/jars/")
    else:
        print(f"✗ Upload failed: {result.stderr}")
else:
    print(f"⏭️  JAR already exists at gs://{BUCKET_NAME}/jars/spark-solr-4.0.0-shaded.jar")

Downloading spark-solr JAR (one-time setup)...
  Downloading from Maven Central...
  ✓ Downloaded to jars/spark-solr-4.0.0-shaded.jar
  ✓ Downloaded to jars/spark-solr-4.0.0-shaded.jar
✓ Uploaded JAR to gs://family-tree-469815-spark-solr-data/jars/
✓ Uploaded JAR to gs://family-tree-469815-spark-solr-data/jars/


In [None]:
# Check if indexing is already complete
import requests

def check_gcp_indexing_complete():
    """Check if data is already indexed in GCP Solr"""
    try:
        # Get local document count
        with open("data/dummy_data.json") as f:
            local_count = sum(1 for _ in f)
        
        # Get Solr document count
        response = requests.get(
            f"http://{SOLR_EXTERNAL_IP}:8983/solr/dummy_data/select?q=*:*&rows=0", 
            timeout=10
        )
        if response.status_code == 200:
            solr_count = response.json()['response']['numFound']
            
            if local_count == solr_count and solr_count > 0:
                # Verify sample document exists
                with open("data/dummy_data.json") as f:
                    first_doc = json.loads(f.readline())
                    doc_id = first_doc['id']
                
                check_response = requests.get(
                    f"http://{SOLR_EXTERNAL_IP}:8983/solr/dummy_data/select?q=id:{doc_id}&rows=1",
                    timeout=10
                )
                if check_response.status_code == 200:
                    match_count = check_response.json()['response']['numFound']
                    if match_count > 0:
                        return True, solr_count
        return False, 0
    except Exception as e:
        print(f"Check failed: {e}")
        return False, 0

already_indexed, doc_count = check_gcp_indexing_complete()

if already_indexed:
    print(f"⏭️  Skipping indexing: {doc_count} documents already indexed and verified in GCP Solr")
else:
    # Get Solr Internal IP for ZooKeeper connection (required by spark-solr)
    ip_result = subprocess.run(
        f"gcloud compute instances describe {SOLR_VM_NAME} --zone={ZONE} --format='get(networkInterfaces[0].networkIP)'",
        shell=True, capture_output=True, text=True
    )
    SOLR_INTERNAL_IP = ip_result.stdout.strip()
    print(f"✓ Solr Internal IP: {SOLR_INTERNAL_IP}")

    # Always upload the latest version of the job to ensure fixes are applied
    # Modify the Spark job to use GCS paths and remote Solr
    gcp_spark_job = f"""
from pyspark.sql import SparkSession
import os

def main():
    spark = SparkSession.builder \\
        .appName("SolrIndexer-GCP") \\
        .getOrCreate()

    # Read JSON data from GCS
    input_file = "gs://{BUCKET_NAME}/data/dummy_data.json"
    print(f"Reading data from {{input_file}}")
    
    df = spark.read.json(input_file)
    
    print("Schema:")
    df.printSchema()
    
    # Solr configuration
    # Use Internal IP for ZooKeeper (accessible within VPC)
    zk_host = "{SOLR_INTERNAL_IP}:9983"
    collection = "dummy_data"
    
    print(f"Indexing to Solr collection '{{collection}}' via ZK '{{zk_host}}'...")
    
    # Write to Solr using ZK (standard method)
    df.write.format("solr") \\
        .option("zkhost", zk_host) \\
        .option("collection", collection) \\
        .option("gen_uniq_key", "true") \\
        .option("commit_within", "1000") \\
        .mode("overwrite") \\
        .save()
        
    print("Indexing complete.")
    spark.stop()

if __name__ == "__main__":
    main()
"""
    # Save and upload job
    os.makedirs("spark_job", exist_ok=True)
    with open("spark_job/index_to_solr_gcp.py", "w") as f:
        f.write(gcp_spark_job)
    
    # Upload using gsutil
    upload_job_cmd = f"gsutil cp spark_job/index_to_solr_gcp.py gs://{BUCKET_NAME}/jobs/"
    subprocess.run(upload_job_cmd, shell=True, check=True)
    print(f"✓ Uploaded job to gs://{BUCKET_NAME}/jobs/index_to_solr_gcp.py")
    
    # Submit job to Dataproc using pre-downloaded JAR (much faster)
    submit_job_cmd = f"""
gcloud dataproc jobs submit pyspark \\
    gs://{BUCKET_NAME}/jobs/index_to_solr_gcp.py \\
    --cluster={CLUSTER_NAME} \\
    --region={REGION} \\
    --jars=gs://{BUCKET_NAME}/jars/spark-solr-4.0.0-shaded.jar
"""

    print("Submitting Spark job to Dataproc...")
    !{submit_job_cmd}
    print("✓ Job completed")

✓ Solr Internal IP: 10.128.0.2


Copying file://spark_job/index_to_solr_gcp.py [Content-Type=text/x-python]...
/ [1 files][   1000 B/   1000 B]                                                
Operation completed over 1 objects/1000.0 B.                                     
/ [1 files][   1000 B/   1000 B]                                                
Operation completed over 1 objects/1000.0 B.                                     


✓ Uploaded job to gs://family-tree-469815-spark-solr-data/jobs/index_to_solr_gcp.py
Submitting Spark job to Dataproc...
Job [9e330051be42431ba95b7060b09f71c8] submitted.
Waiting for job output...
Job [9e330051be42431ba95b7060b09f71c8] submitted.
Waiting for job output...
25/11/25 02:40:07 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
25/11/25 02:40:07 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
25/11/25 02:40:08 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
25/11/25 02:40:08 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
25/11/25 02:40:08 INFO org.sparkproject.jetty.util.log: Logging initialized @4271ms to org.sparkproject.jetty.util.log.Slf4jLog
25/11/25 02:40:08 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_472-b08
25/11/25 02:40:08 INFO org.sparkproject.jetty.server.Server: Started @4388ms
25/1

## 7. Verify Indexing

Query the GCP-hosted Solr instance to verify data was indexed.

In [11]:
import requests

# Query Solr for document count
try:
    response = requests.get(f"http://{SOLR_EXTERNAL_IP}:8983/solr/dummy_data/select?q=*:*&rows=0")
    if response.status_code == 200:
        result = response.json()
        num_docs = result['response']['numFound']
        print(f"✓ Indexed {num_docs} documents in Solr")
    else:
        print("✗ Failed to query Solr")
except Exception as e:
    print(f"✗ Query failed: {e}")

# Show sample documents
print("\nSample documents:")
!curl -s "http://{SOLR_EXTERNAL_IP}:8983/solr/dummy_data/select?q=*:*&rows=3" | python3 -m json.tool

✓ Indexed 0 documents in Solr

Sample documents:
{
    "responseHeader": {
        "zkConnected": true,
        "status": 0,
        "QTime": 1,
        "params": {
            "q": "*:*",
            "rows": "3"
        }
    },
    "response": {
        "numFound": 0,
        "start": 0,
        "numFoundExact": true,
        "docs": []
    }
}
{
    "responseHeader": {
        "zkConnected": true,
        "status": 0,
        "QTime": 1,
        "params": {
            "q": "*:*",
            "rows": "3"
        }
    },
    "response": {
        "numFound": 0,
        "start": 0,
        "numFoundExact": true,
        "docs": []
    }
}


## 8. Cleanup Resources

**Important:** Delete GCP resources to avoid ongoing charges.

In [None]:
# Uncomment to clean up all GCP resources

# # Delete Dataproc cluster
# !gcloud dataproc clusters delete {CLUSTER_NAME} --region={REGION} --quiet
# print("✓ Deleted Dataproc cluster")

# # Delete Solr VM
# !gcloud compute instances delete {SOLR_VM_NAME} --zone={ZONE} --quiet
# print("✓ Deleted Solr VM")

# # Delete firewall rule
# !gcloud compute firewall-rules delete allow-solr --quiet
# print("✓ Deleted firewall rule")

# # Delete GCS bucket
# !gsutil -m rm -r gs://{BUCKET_NAME}
# print("✓ Deleted GCS bucket")

print("To clean up, uncomment the commands above")

## Cost Estimation

- **Dataproc Cluster**: ~$0.50-1.00/hour (2 workers + 1 master)
- **Solr VM**: ~$0.10-0.20/hour (n1-standard-2)
- **Storage**: ~$0.02/GB/month
- **Network Egress**: Variable

**Remember to delete resources when not in use!**