In [2]:
"""
Apache Beam pipeline for calculating temperature variance and statistics for different sensors.
This module demonstrates windowing, CombineFn usage, and integration with SciPy in Apache Beam.
"""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
import numpy as np
# from scipy import stats # Scipy isn't strictly needed for basic std/var if numpy is available

# Import for Interactive Runner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

class SensorStatisticsCombineFn(beam.CombineFn):
    """
    Calculates statistics (variance, standard deviation) for temperature values.
    """

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element, *args, **kwargs):
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators, *args, **kwargs):
        merged = []
        for acc in accumulators:
            merged.extend(acc)
        return merged

    def extract_output(self, accumulator, *args, **kwargs):
        if not accumulator:
            return {
                'count': 0,
                'mean': float('nan'),
                'std_dev': float('nan'),
                'variance': float('nan')
            }

        values = np.array(accumulator)

        std_dev = np.std(values, ddof=1) # Sample standard deviation
        variance = np.var(values, ddof=1) # Sample variance

        return {
            'count': len(values),
            'mean': np.mean(values),
            'std_dev': std_dev,
            'variance': variance
        }


def run_interactive_pipeline():
    """
    Creates and runs the Apache Beam pipeline with InteractiveRunner.
    """
    options = PipelineOptions()

    # Initialize the pipeline with InteractiveRunner
    with beam.Pipeline(runner=InteractiveRunner(), options=options) as p:
        data = [
            ('sensor_A', 20.0, 1678886400),  # March 15, 2023 00:00:00 GMT
            ('sensor_B', 25.0, 1678886410),
            ('sensor_A', 20.5, 1678886430),
            ('sensor_A', 21.0, 1678886460),
            ('sensor_B', 24.5, 1678886470),
            ('sensor_A', 20.3, 1678886490),
            ('sensor_A', 22.0, 1678886520),  # March 15, 2023 00:02:00 GMT (ends first window for A)
            ('sensor_B', 26.0, 1678886530),
            ('sensor_A', 23.0, 1678886580),  # March 15, 2023 00:03:00 GMT
            ('sensor_A', 20.0, 1678886640),  # Next window for sensor_A
            ('sensor_A', 21.0, 1678886650),
            ('sensor_A', 22.0, 1678886660),
        ]

        timed_data = (p
                      | 'Create Data' >> beam.Create(data)
                      | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue((x[0], x[1]), x[2]))
                      | 'To KV' >> beam.Map(lambda x: (x[0], x[1])))

        windowed_data = (timed_data
                         | 'Window' >> beam.WindowInto(
                    beam.window.FixedWindows(60),
                    trigger=AfterWatermark(),
                    accumulation_mode=AccumulationMode.DISCARDING
                ))

        statistics_results = (windowed_data
                            | 'Group and Calculate Statistics' >> beam.CombinePerKey(
                    SensorStatisticsCombineFn())
                            | 'Add Window Info' >> beam.MapTuple(
                    lambda k, v, window=beam.DoFn.WindowParam:
                    {
                        'sensor': k,
                        'window_start': window.start.to_utc_datetime(),
                        'statistics': v
                    }
                ))

        # Use ib.show() as before. The visualization options are now passed directly
        # or configured via ib.options.
        ib.show(statistics_results, visualize_data=True, n=100)

        # You can also use ib.collect() to get the results into a Python list
        # collected_results = ib.collect(statistics_results)
        # for result in collected_results:
        #     print(f"Collected: Sensor: {result['sensor']}, Window: {result['window_start']}, Stats: {result['statistics']}")

        # Optionally, run the full pipeline explicitly if not relying solely on ib.show/collect
        # p.run().wait_until_finish()


if __name__ == '__main__':
    # Corrected way to set interactive runner display options:
    # Use ib.options.pipeline_graph_display_enabled = True
    # and ib.options.data_display_enabled = True
    # Or for more explicit configuration, you might pass them as pipeline options.

    # Option 1: Set directly on ib.options (this is usually the most convenient)
    ib.options.pipeline_graph_display_enabled = True
    ib.options.data_display_enabled = True

    # Option 2 (Less common for these specific flags, but shows the pattern for PipelineOptions):
    # You could also potentially pass specific runner options via PipelineOptions,
    # but the interactive runner's display toggles are typically set via ib.options directly.
    # For example, to enable interactive_beam's graph display:
    # options = PipelineOptions([
    #    '--runner=InteractiveRunner',
    #    '--interactive_pipeline_graph_display_enabled=True',
    #    '--interactive_data_display_enabled=True'
    # ])
    # and then pass these options to beam.Pipeline(options=options)

    print("Running pipeline with Interactive Runner (best in Jupyter/IPython):")
    run_interactive_pipeline()

Running pipeline with Interactive Runner (best in Jupyter/IPython):


  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  ret = ret.dtype.type(ret / rcount)
  return _methods._var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  data = data.applymap(lambda x: str(x) if isinstance(x, dict) else x)
  rows = data.applymap(lambda x: str(x)).to_dict('split')['data']


  flattened = x.ravel()


  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  ret = ret.dtype.type(ret / rcount)
  return _methods._var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
