In [None]:
import requests
# Download address of the yellow-taxi trip data file for 2024-01 (Parquet),
# written as host + path so each part is easy to read.
url = (
    "https://d37ci6vzurychx.cloudfront.net"
    "/trip-data/yellow_tripdata_2024-01.parquet"
)


In [5]:
from google.cloud import storage

# 1. Setup (replace with your own project ID)
project_id = "avid-circle-484315-e5"
client = storage.Client(project=project_id)

# 2. Pick a name (MUST BE GLOBALLY UNIQUE)
bucket_name = "ingest-test-jianpeng-123"  # <--- CHANGE THIS

# 3. Create the bucket, or reuse it when it is already there.
#    Looking the bucket up first keeps this cell safe to re-run: calling
#    create_bucket() for a name that already exists is rejected by the service
#    and would otherwise be reported below as a failed creation.
try:
    # Local handle only; no request is sent until it is used.
    bucket = client.bucket(bucket_name)

    # lookup_bucket() returns None instead of raising when the bucket is missing.
    new_bucket = client.lookup_bucket(bucket_name)

    if new_bucket is None:
        # We send the "create" command to Google.
        # location="US" is standard. Use "EU" if you are in Europe.
        new_bucket = client.create_bucket(bucket, location="US")
        print(f"✅ Success! Bucket created: {new_bucket.name}")
    else:
        print(f"✅ Bucket already exists: {new_bucket.name}")
    print(f"Location: {new_bucket.location}")

except Exception as e:
    # Deliberately best-effort: show the error (e.g. missing permissions or a
    # name owned by another project) instead of aborting the notebook run.
    print("❌ Creation failed.")
    print(e)



✅ Success! Bucket created: ingest-test-jianpeng-123
Location: US


In [1]:
import io

import pandas as pd
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound

# --- CONFIGURATION (UPDATE THESE!) ---
PROJECT_ID = "avid-circle-484315-e5"
BUCKET_NAME = "ingest-test-jianpeng-123"  # The one you just created
DATASET_NAME = "hello_world_dataset"      # We will create this inside BQ
TABLE_NAME = "my_first_table"             # Destination table inside the dataset
BLOB_NAME = "hello_world.parquet"         # Object name inside the bucket

# --- STEP 1: GENERATE DATA ---
print("1. Generating 'Hello World' data...")
data = [
    {"id": 1, "message": "Hello World", "status": "Success"},
    {"id": 2, "message": "This is a test", "status": "Pending"},
    {"id": 3, "message": "BigQuery is cool", "status": "Done"},
]
# Build the frame directly from the records. Round-tripping through
# str(data) with quote replacement is not valid JSON in general: it breaks as
# soon as a value contains an apostrophe, None, True or False.
df = pd.DataFrame(data)

# --- STEP 2: UPLOAD TO BUCKET ---
print(f"2. Uploading to Bucket: {BUCKET_NAME}...")
storage_client = storage.Client(project=PROJECT_ID)
bucket = storage_client.bucket(BUCKET_NAME)
blob = bucket.blob(BLOB_NAME)

# to_parquet() without a path returns the serialized bytes, so the file is
# built in memory and uploaded without touching the local disk.
blob.upload_from_string(df.to_parquet(), content_type="application/octet-stream")
print("   ✅ Upload complete.")

# --- STEP 3: LOAD INTO BIGQUERY ---
print("3. Instructing BigQuery to ingest...")
bq_client = bigquery.Client(project=PROJECT_ID)

# Create the dataset if it doesn't exist.
# DatasetReference replaces the deprecated Client.dataset() helper.
dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_NAME)
try:
    bq_client.get_dataset(dataset_ref)
    print(f"   Dataset {DATASET_NAME} already exists.")
except NotFound:
    # Only a missing dataset triggers creation; other errors (auth, network,
    # quota) propagate instead of being mistaken for "dataset not found".
    bq_client.create_dataset(dataset_ref)
    print(f"   Created new dataset: {DATASET_NAME}")

# Configure the load job
table_ref = dataset_ref.table(TABLE_NAME)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # Overwrite table if exists
)

# The Magic Command: Load from URI (Not from script memory!)
uri = f"gs://{BUCKET_NAME}/{BLOB_NAME}"
load_job = bq_client.load_table_from_uri(uri, table_ref, job_config=job_config)

print("   ⏳ Job running...")
load_job.result()  # Waits for the job to complete; raises if the job failed

print(f"   ✅ Job finished! Loaded {load_job.output_rows} rows into {DATASET_NAME}.{TABLE_NAME}")

1. Generating 'Hello World' data...
2. Uploading to Bucket: ingest-test-jianpeng-123...




   ✅ Upload complete.
3. Instructing BigQuery to ingest...




   Created new dataset: hello_world_dataset
   ⏳ Job running...
   ✅ Job finished! Loaded 3 rows into hello_world_dataset.my_first_table


In [15]:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.cloud import storage
import requests

# Config
BUCKET_NAME = "data-engineering-zoomcamp-jianpeng-20260122"
PARQUET_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
PROJECT_ID = "avid-circle-484315-e5"
# (connect, read) timeouts in seconds. Without a timeout, requests waits
# forever on a stalled connection and the cell never returns.
REQUEST_TIMEOUT = (10, 300)


storage_client = storage.Client(project=PROJECT_ID)

# Step 1: try to get the bucket, or create it if it doesn't exist
try:
    bucket = storage_client.get_bucket(BUCKET_NAME)
    print(f"Bucket {BUCKET_NAME} exists.")
except NotFound:
    print(f"Bucket {BUCKET_NAME} not found. Creating it...")
    # Note: location="US" (or "EU", "ASIA") is important for performance/compliance
    bucket = storage_client.create_bucket(BUCKET_NAME, location="US")
    print(f"Bucket {BUCKET_NAME} created successfully.")

print(f"2. Uploading to Bucket {BUCKET_NAME} ...")
# NOTE: despite its name, this object receives the taxi Parquet file. The
# BigQuery load cell reads the same object name, so rename both together.
blob = bucket.blob("hello_world.parquet")

# Upload from the Parquet URL: stream the HTTP body straight into the blob so
# the file is never held in memory or written to local disk in full.
print("Starting stream from url")
with requests.get(PARQUET_URL, stream=True, timeout=REQUEST_TIMEOUT) as response:
    # Fail fast on 4xx/5xx rather than uploading an error page as the object.
    response.raise_for_status()
    # Have urllib3 undo any Content-Encoding (gzip/deflate) while reading.
    response.raw.decode_content = True
    blob.upload_from_file(response.raw, content_type="application/octet-stream")
    print("   ✅ Upload complete.")

Bucket data-engineering-zoomcamp-jianpeng-20260122 exists.
2. Uploading to Bucket data-engineering-zoomcamp-jianpeng-20260122 ...
Starting stream from url
   ✅ Upload complete.


In [17]:
# --- STEP 3: LOAD INTO BIGQUERY ---
# This cell relies on PROJECT_ID, BUCKET_NAME, `bigquery` and `NotFound` from
# the previous cell; run that cell first.
DATASET_NAME = "ny_taxi"
TABLE_NAME = "yellow_taxi_data"

print("3. Instructing BigQuery to ingest...")
bq_client = bigquery.Client(project=PROJECT_ID)

# Create the dataset if it doesn't exist.
# DatasetReference replaces the deprecated Client.dataset() helper.
dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_NAME)
try:
    bq_client.get_dataset(dataset_ref)
    print(f"   Dataset {DATASET_NAME} already exists.")
except NotFound:
    # Only a missing dataset triggers creation; other errors (auth, network,
    # quota) propagate instead of being mistaken for "dataset not found".
    bq_client.create_dataset(dataset_ref)
    print(f"   Created new dataset: {DATASET_NAME}")

# Configure the load job
table_ref = dataset_ref.table(TABLE_NAME)
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # Overwrite table if exists
)

# The Magic Command: Load from URI (Not from script memory!)
# The object name matches the blob written by the upload cell.
uri = f"gs://{BUCKET_NAME}/hello_world.parquet"
load_job = bq_client.load_table_from_uri(uri, table_ref, job_config=job_config)

print("   ⏳ Job running...")
load_job.result()  # Waits for the job to complete; raises if the job failed

# Report the table that was actually loaded (the message used to name
# "my_first_table", left over from the hello-world cell).
print(f"   ✅ Job finished! Loaded {load_job.output_rows} rows into {DATASET_NAME}.{TABLE_NAME}")

3. Instructing BigQuery to ingest...
   Created new dataset: ny_taxi
   ⏳ Job running...
   ✅ Job finished! Loaded 2964624 rows into ny_taxi.my_first_table
