# ContentPulse Data Ingestion
Incrementally load JSON event files from a Unity Catalog Volume into a Delta table using Auto Loader (Spark Structured Streaming).


In [2]:
%run ./contentpulse_config


📰 ContentPulse Project Configuration
   Project: ContentPulse_Publishing_Analytics
   User: kunal.gaurav@databricks.com
   Volume: /Volumes/kunal/publishing/publishing_data
   Table: kunal.publishing.content_engagement_events
🗄️  Lakebase Configuration
   Instance: kunal-gaurav-lakebase-instance
   Catalog: pg_contentpulse_kunal-gaurav
   Synced Table: kunal.publishing.content_engagement_synced
✅ Catalog 'kunal' ready
✅ Schema 'kunal.publishing' ready
✅ Volume '/Volumes/kunal/publishing/publishing_data' ready
✅ Table 'kunal.publishing.content_engagement_events' ready with Change Data Feed enabled


In [7]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, DoubleType, IntegerType, TimestampType, BooleanType

# Explicit schema for the ContentPulse engagement events, listed in column order.
# It must match the fields written by the ContentPulse data generator; it is handed
# to the Auto Loader reader below via .schema(). Each field is appended with
# StructType.add's default arguments, exactly as a chained .add(...) call would do.
_CONTENT_EVENT_FIELDS = [
    # event identity and timing
    ("event_id", StringType()),
    ("timestamp", TimestampType()),
    ("event_type", StringType()),
    # who read what
    ("reader_id", StringType()),
    ("article_id", StringType()),
    ("article_title", StringType()),
    ("category", StringType()),
    ("publication", StringType()),
    # device and location
    ("device_type", StringType()),
    ("country", StringType()),
    ("city", StringType()),
    ("latitude", DoubleType()),
    ("longitude", DoubleType()),
    # engagement metrics
    ("time_on_page_seconds", IntegerType()),
    ("scroll_depth_percent", IntegerType()),
    ("num_comments", IntegerType()),
    ("num_shares", IntegerType()),
    # monetisation and subscription
    ("ad_impressions", IntegerType()),
    ("estimated_ad_revenue", DoubleType()),
    ("is_subscriber", BooleanType()),
    ("subscription_tier", StringType()),
]

schema = StructType()
for _field_name, _field_type in _CONTENT_EVENT_FIELDS:
    # StructType.add mutates `schema` in place (it also returns it).
    schema.add(_field_name, _field_type)

# Start the ContentPulse ingestion stream: Auto Loader picks up new JSON files from
# the Volume and appends them to the Delta table FULL_TABLE_NAME.
# Relies on VOLUME_PATH / FULL_TABLE_NAME from ./contentpulse_config and on `schema`
# defined above.

# Directory the data generator writes event files into (matches data generation path).
volume_path = f"{VOLUME_PATH}/CONTENT_EVENTS"
print(f"📁 Source Volume: {volume_path}")

# Streaming checkpoint. It is a sibling of the source directory (not inside it), so it
# is never picked up as input. Keep this path stable between runs so the stream can
# resume where it left off.
checkpoint_path = f"{VOLUME_PATH}/checkpoint"
print(f"💾 Checkpoint: {checkpoint_path}")

# Query names must be unique among the active streams of a session. Re-running this
# cell while an earlier run is still going would therefore fail when .table() starts
# the new query, and two writers would share one checkpoint. Stop the earlier run
# first; the new run picks up from the same checkpoint.
STREAM_QUERY_NAME = "contentpulse_streaming"
for previous_query in spark.streams.active:
    if previous_query.name == STREAM_QUERY_NAME:
        print(f"⏹️  Stopping previous run of '{STREAM_QUERY_NAME}' (ID: {previous_query.id})")
        previous_query.stop()

# Read JSON files incrementally using Auto Loader (cloudFiles) with the explicit schema.
stream_df = (spark.readStream
    .format("cloudFiles")
    .schema(schema)
    .option("cloudFiles.format", "json")
    .load(volume_path))

# Append each micro-batch to the Delta table; .table() starts the query.
query = (stream_df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(processingTime="10 seconds")  # Process every 10 seconds
    .queryName(STREAM_QUERY_NAME)           # Named query (used to find/stop it later)
    .table(FULL_TABLE_NAME)
)

print(f"✅ Streaming started: {query.name}")
print(f"📊 Query ID: {query.id}")
print(f"🎯 Target Table: {FULL_TABLE_NAME}")


📁 Source Volume: /Volumes/kunal/publishing/publishing_data/CONTENT_EVENTS
💾 Checkpoint: /Volumes/kunal/publishing/publishing_data/checkpoint
✅ Streaming started: contentpulse_streaming
📊 Query ID: 23e94ea2-d9e4-4be8-8e32-51b18a0b65c9
🎯 Target Table: kunal.publishing.content_engagement_events


In [9]:
# Report the health and latest micro-batch of the ContentPulse stream.
# The stream is resolved by name in the *current* session instead of through the
# `query` variable from the start cell. That handle depends on cell execution order
# and became unusable once its Spark session closed (INVALID_HANDLE.SESSION_CLOSED).
# NOTE(review): if `spark` itself is disconnected, this lookup will also fail.
STREAM_QUERY_NAME = "contentpulse_streaming"
monitored_query = next(
    (q for q in spark.streams.active if q.name == STREAM_QUERY_NAME), None
)

if monitored_query is None:
    print(f"⚠️  No active streaming query named '{STREAM_QUERY_NAME}' in this session - "
          "run the streaming cell to (re)start it.")
else:
    query_status = monitored_query.status

    print("=" * 70)
    print("📊 STREAMING QUERY STATUS")
    print("=" * 70)
    print(f"Query Name: {monitored_query.name}")
    print(f"Query ID: {monitored_query.id}")
    print(f"Is Active: {monitored_query.isActive}")
    print(f"Message: {query_status['message']}")
    print(f"Data Available: {query_status['isDataAvailable']}")
    print(f"Trigger Active: {query_status['isTriggerActive']}")
    print("=" * 70)

    # The last entry of recentProgress is the most recent completed micro-batch.
    recent_progress = monitored_query.recentProgress
    if recent_progress:
        latest = recent_progress[-1]
        print("\n📈 LATEST PROGRESS:")
        print(f"Batch ID: {latest.get('batchId', 'N/A')}")
        print(f"Input Rows: {latest.get('numInputRows', 0)}")
        print(f"Processing Time: {latest.get('durationMs', {}).get('triggerExecution', 'N/A')} ms")
        print(f"Timestamp: {latest.get('timestamp', 'N/A')}")
    else:
        print("\n⏳ No progress yet - waiting for data...")


	status = StatusCode.INTERNAL
	details = "[INVALID_HANDLE.SESSION_CLOSED] The handle 3b868867-0b3b-4a07-aa20-9db4bcc88f4a is invalid. Session was closed. SQLSTATE: HY000"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_status:13, grpc_message:"[INVALID_HANDLE.SESSION_CLOSED] The handle 3b868867-0b3b-4a07-aa20-9db4bcc88f4a is invalid. Session was closed. SQLSTATE: HY000"}"
>.


SparkConnectGrpcException: (org.apache.spark.SparkSQLException) [INVALID_HANDLE.SESSION_CLOSED] The handle 3b868867-0b3b-4a07-aa20-9db4bcc88f4a is invalid. Session was closed. SQLSTATE: HY000

In [5]:
# Spot-check the target Delta table: show up to 10 rows.
# LIMIT without ORDER BY returns an arbitrary subset of rows.
print("📊 Sample data from Delta table:")
display(spark.sql(f"SELECT * FROM {FULL_TABLE_NAME} LIMIT 10"))


📊 Sample data from Delta table:


Unnamed: 0,event_id,timestamp,event_type,reader_id,article_id,article_title,category,publication,device_type,country,city,latitude,longitude,time_on_page_seconds,scroll_depth_percent,num_comments,num_shares,ad_impressions,estimated_ad_revenue,is_subscriber,subscription_tier
0,evt_df086c127063,2025-12-16 12:59:31.248774,page_view,reader_8894,art_6986,Oscar Predictions: Who Will Win Big?,Entertainment,Architectural Digest,mobile,USA,Los Angeles,34.0522,-118.2437,229,66,0,0,5,0.39,False,free
1,evt_eeebe583685a,2025-12-16 12:59:48.763339,page_view,reader_59963,art_8714,Travel Safety Tips for Solo Travelers,Travel,Architectural Digest,mobile,Brazil,São Paulo,-23.5505,-46.6333,597,47,0,0,3,0.26,True,none
2,evt_6e11e17eda74,2025-12-16 12:59:10.648137,page_view,reader_83240,art_1177,Budget Travel: See the World for Less,Travel,Architectural Digest,mobile,Australia,Sydney,-33.8688,151.2093,324,46,0,0,5,0.24,True,basic
3,evt_8fb4263b2bc6,2025-12-16 12:59:44.996809,comment,reader_18603,art_3867,Startup Success Stories: Lessons Learned,Business,The New Yorker,mobile,Brazil,São Paulo,-23.5505,-46.6333,0,0,1,0,0,0.0,True,premium
4,evt_4a7696c364d3,2025-12-16 12:59:16.196076,share,reader_72298,art_4850,AI Revolution: How ChatGPT is Changing Everything,Technology,Architectural Digest,mobile,UK,London,51.5074,-0.1278,0,0,0,1,0,0.0,True,free
5,evt_bd160935546b,2025-12-16 12:59:28.483958,page_view,reader_17693,art_1073,Healthy Eating: Nutritionist's Top Tips,Food,Architectural Digest,mobile,USA,New York,40.7128,-74.006,552,33,0,0,5,0.46,False,premium
6,evt_a9033b53e5a6,2025-12-16 12:59:38.907894,scroll,reader_32741,art_3328,The Return of Y2K Style: What's Hot Now,Fashion,Architectural Digest,desktop,USA,Los Angeles,34.0522,-118.2437,0,94,0,0,0,0.0,True,free
7,evt_839b2466fd2e,2025-12-16 12:59:50.818020,page_view,reader_25274,art_2978,The Future of Electric Vehicles,Technology,Bon Appétit,mobile,Canada,Toronto,43.6532,-79.3832,135,54,0,0,8,0.15,False,basic
8,evt_1c01b99acebf,2025-12-16 13:00:20.143518,subscribe,reader_50994,art_6171,Sports Technology: The Future of Athletics,Sports,Glamour,desktop,Australia,Sydney,-33.8688,151.2093,0,0,0,0,0,0.0,False,basic
9,evt_9971fb868f9a,2025-12-16 12:59:49.274358,scroll,reader_28584,art_8234,Super Bowl Preview: Who Will Win?,Sports,Architectural Digest,mobile,Canada,Toronto,43.6532,-79.3832,0,56,0,0,0,0.0,False,basic


In [6]:
# Stop the ContentPulse stream (run this when you want to stop).
# Streams are looked up by name in the current session rather than through the
# `query` variable from the start cell. That variable may be undefined (e.g. this
# cell run on a fresh kernel) or tied to a closed session (INVALID_HANDLE.SESSION_CLOSED).
STREAM_QUERY_NAME = "contentpulse_streaming"
queries_to_stop = [q for q in spark.streams.active if q.name == STREAM_QUERY_NAME]

if not queries_to_stop:
    print(f"ℹ️  No active streaming query named '{STREAM_QUERY_NAME}' to stop")

for running_query in queries_to_stop:
    # Best-effort: report a failed stop and still run the verification below.
    try:
        running_query.stop()
        print(f"⏹️  Streaming query '{running_query.name}' stopped successfully")
        print(f"   Still active: {running_query.isActive}")
    except Exception as e:
        print(f"❌ Error stopping query: {e}")

# Verify no streaming queries (of any name) remain active in this session.
active_queries = spark.streams.active
if active_queries:
    print(f"\n⚠️  Still {len(active_queries)} active streaming queries:")
    for q in active_queries:
        print(f"   - {q.name} (ID: {q.id})")
else:
    print("\n✅ No active streaming queries")


⏹️  Streaming query 'contentpulse_streaming' stopped successfully
   Final status: False

✅ No active streaming queries
