In [12]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, AfterCount
from datetime import datetime
from dotenv import load_dotenv, find_dotenv

# Load environment variables
_ = load_dotenv(find_dotenv())

GCP_PROJECT_ID = os.getenv('GCP_PROJECT_ID')

BUCKET_NAME = "poc-beams-k"
REGION = "us-east1"

# Define pipeline options for Dataflow
options = PipelineOptions(
    runner="DataflowRunner",
    project=GCP_PROJECT_ID,
    job_name="jupyter-dataflow-job",
    temp_location=f"gs://{BUCKET_NAME}/temp/",
    region=REGION,
    streaming=True  # Set to False for batch processing
)

PROJECT_ID = GCP_PROJECT_ID
INPUT_SUBSCRIPTION_ADDRESS = f"projects/{GCP_PROJECT_ID}/subscriptions/poc-apache-beam-sub"
OUTPUT_TOPIC_ADDRESS = f"projects/{GCP_PROJECT_ID}/topics/apache-beam-bounded"

options.view_as(StandardOptions).streaming = True 

p = beam.Pipeline(options=options)

def calculateProfit(elements):
    """Calculates profit by subtracting cost from revenue"""
    try:
        revenue = int(elements[6])
        cost = int(elements[5])
        profit = revenue - cost
        return elements + [profit]
    except (IndexError, ValueError):
        return elements + [0]  # Default profit to 0 in case of issues

def custom_timestamp(elements):
    """Apply a timestamp to the elements"""
    try:
        event_timestamp = int(elements[-2])  # Assuming timestamp is second last column
        return beam.window.TimestampedValue(elements, event_timestamp)
    except (IndexError, ValueError):
        return beam.window.TimestampedValue(elements, 0)

def encode_byte_string(elements):
    """Encode elements as byte strings for Pub/Sub"""
    print(elements)
    return str(elements).encode('utf-8')


pubsub_data = (
    p 
    | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=INPUT_SUBSCRIPTION_ADDRESS)  
    | 'Decode Byte Data' >> beam.Map(lambda data: data.decode('utf-8').strip())  
    | 'Split Row' >> beam.Map(lambda row: row.split(','))  
    | 'Filter By Country' >> beam.Filter(lambda elements: len(elements) > 1 and elements[1] in ["Mumbai", "Bangalore"])  
    | 'Create Profit Column' >> beam.Map(calculateProfit)  
    | 'Apply Custom Timestamp' >> beam.Map(custom_timestamp)  
    | 'Form Key-Value Pair' >> beam.Map(lambda elements: (elements[0], int(elements[-1])) if len(elements) > 8 else (elements[0], 0))  
    | 'Apply Windowing' >> beam.WindowInto(window.FixedWindows(20))  
    | 'Sum Values' >> beam.CombinePerKey(sum)  
    | 'Encode to Byte String' >> beam.Map(encode_byte_string)  
    | 'Write to Pub/Sub' >> beam.io.WriteToPubSub(OUTPUT_TOPIC_ADDRESS)  
)

# Run the pipeline on Dataflow
result = p.run()
result.wait_until_finish()




ERROR:apache_beam.runners.dataflow.dataflow_runner:2025-02-10T01:40:39.642Z: JOB_MESSAGE_ERROR: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 690, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/opt/conda/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 2086, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "/tmp/ipykernel_72/1040416404.py", line 49, in custom_timestamp
    _ENABLE_GOOGLE_CLOUD_PROFILER = 'enable_google_cloud_profiler'
                   ^^^^
NameError: name 'beam' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 311, in _execute
    response = task()
               ^^^^^^
  File "/usr/

AssertionError: Job did not reach to a terminal state after waiting indefinitely. Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2025-02-09_17_37_08-11738697405577880303?project=<ProjectId>