In [1]:
!pip install apache-beam[gcp]

Collecting apache-beam[gcp]
  Using cached apache_beam-2.68.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Using cached crcmod-1.7.tar.gz (89 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Using cached dill-0.3.1.1.tar.gz (151 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastavro<2,>=0.23.6 (from apache-beam[gcp])
  Using cached fastavro-1.12.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (5.8 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam[gcp])
  Using cached fasteners-0.20-py3-none-any.whl.metadata (4.8 kB)
Collecting grpcio!=1.48.0,!=1.59.*,!=1.60.*,!=1.61.*,!=1.62.0,!=1.62.1,<1.66.0,<2,>=1.33.1 (from apache-beam[gcp])
  Using cached grpcio-1.65.5-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.3 kB)
Collecting hdfs<3.0.0,>=2.1.0 (from a

In [7]:
import math
import os
import tempfile

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def parse_weather_data(line):
    """Parse one CSV line into a weather record.

    The expected layout is ``station,date,temperature[,...]``; columns after
    the third are ignored.

    Args:
        line: A single CSV line.

    Returns:
        A dict ``{'station': str, 'date': str, 'temperature': float}``, where
        ``date`` is truncated to at most its first 10 characters (presumably
        the ``YYYY-MM-DD`` prefix of a timestamp). Returns ``None`` when
        ``line`` is not a string, is blank, is the header row (first field is
        ``station``, case-insensitive), has fewer than three fields, or has a
        temperature that is empty, non-numeric, or non-finite (``nan``/``inf``).
    """
    # Non-string elements are reported as unparseable (None) instead of
    # raising, so the pipeline's "filter out None" step can drop them.
    if not isinstance(line, str) or not line.strip():
        return None

    parts = line.split(',')

    # Skip only the header line (first field is literally "station").
    if parts[0].strip().lower() == 'station':
        return None

    if len(parts) < 3:
        return None

    station, date, temp_str = (field.strip() for field in parts[:3])
    if not temp_str:
        return None

    try:
        temp = float(temp_str)
    except ValueError:
        return None

    # float() accepts "nan" and "inf"; such values would poison every
    # downstream average and make max/min results order-dependent.
    if not math.isfinite(temp):
        return None

    return {
        'station': station,
        'date': date[:10],
        'temperature': temp,
    }


def format_station_result(element):
    """Summarise one ``(station, temperatures)`` group as a single report line.

    Args:
        element: ``(station, temps)`` as produced by ``GroupByKey``; ``temps``
            may be any iterable of numbers and must be non-empty (an empty
            group raises ``ZeroDivisionError``).

    Returns:
        ``"Station <station>: Avg Temp = <mean:.2f>°C, Readings = <count>"``.
    """
    station_id, readings = element
    values = [*readings]
    count = len(values)
    mean = sum(values) / count
    return f"Station {station_id}: Avg Temp = {mean:.2f}°C, Readings = {count}"


def format_date_result(element):
    """Summarise one ``(date, temperatures)`` group as a single report line.

    Args:
        element: ``(date, temps)`` as produced by ``GroupByKey``; ``temps``
            may be any iterable of numbers and must be non-empty (an empty
            group raises ``ZeroDivisionError``).

    Returns:
        ``"Date <date>: Avg Temp = <mean:.2f>°C, Readings = <count>"``.
    """
    day, readings = element
    values = [*readings]
    count = len(values)
    mean = sum(values) / count
    return f"Date {day}: Avg Temp = {mean:.2f}°C, Readings = {count}"


class CalculateExtremesCombineFn(beam.CombineFn):
    """Combine temperatures into their max, min, mean and count.

    The accumulator is a fixed-size ``(max, min, total, count)`` tuple rather
    than a list of every reading, so accumulators stay the same size no matter
    how many readings are combined into them.

    Output is a dict with keys ``'max'``, ``'min'``, ``'avg'`` and ``'count'``.
    For empty input every value is zero (``'count'`` is ``0``).
    """

    def create_accumulator(self):
        # Identity element: the first real reading replaces both infinite
        # sentinels, and the zero total/count contribute nothing.
        return (float('-inf'), float('inf'), 0.0, 0)

    def add_input(self, accumulator, element):
        high, low, total, count = accumulator
        return (max(high, element), min(low, element), total + element, count + 1)

    def merge_accumulators(self, accumulators):
        high, low, total, count = self.create_accumulator()
        for acc_high, acc_low, acc_total, acc_count in accumulators:
            high = max(high, acc_high)
            low = min(low, acc_low)
            total += acc_total
            count += acc_count
        return (high, low, total, count)

    def extract_output(self, accumulator):
        high, low, total, count = accumulator
        if count == 0:
            # No readings: report zeros rather than the infinite sentinels.
            return {
                'max': 0.0,
                'min': 0.0,
                'avg': 0.0,
                'count': 0
            }
        return {
            'max': high,
            'min': low,
            'avg': total / count,
            'count': count
        }


def format_extremes(stats):
    """Render the global temperature statistics as one report line.

    Args:
        stats: Mapping with numeric ``'max'``, ``'min'`` and ``'avg'`` entries
            and a ``'count'`` entry.

    Returns:
        ``"Max: ..°C, Min: ..°C, Avg: ..°C, Total Readings: N"`` with the
        temperatures formatted to two decimals.
    """
    highest = stats['max']
    lowest = stats['min']
    mean = stats['avg']
    total = stats['count']
    return f"Max: {highest:.2f}°C, Min: {lowest:.2f}°C, Avg: {mean:.2f}°C, Total Readings: {total}"


def classify_temperature(record):
    """Map a record's temperature to a ``(band_label, 1)`` pair for counting.

    Bands are checked in ascending order with a strict ``<`` upper bound, so
    each band includes its lower edge (e.g. exactly 10 is 'Mild'). A value that
    fails every check (30 and above, or NaN) falls into the 'Hot' band.
    NOTE(review): exactly 30°C is labelled 'Hot (>30°C)' although the label
    reads as strictly greater than 30.

    Args:
        record: Mapping with a numeric ``'temperature'`` entry.

    Returns:
        ``(label, 1)``, suitable for ``CombinePerKey(sum)``.
    """
    temperature = record['temperature']
    bands = (
        (0, 'Very Cold (<0°C)'),
        (10, 'Cold (0-10°C)'),
        (20, 'Mild (10-20°C)'),
        (30, 'Warm (20-30°C)'),
    )
    for upper_bound, label in bands:
        if temperature < upper_bound:
            return (label, 1)
    return ('Hot (>30°C)', 1)


def format_distribution(element):
    """Describe how many readings fell into one temperature band.

    Args:
        element: A ``(category, count)`` pair, e.g. from ``CombinePerKey(sum)``.

    Returns:
        The string ``"<category>: <count> readings"``.
    """
    band_label, reading_count = element
    return f"{band_label}: {reading_count} readings"


def _write_report(output_dir, prefix):
    """Return a transform that writes a single unsharded ``<prefix>.txt`` file."""
    return beam.io.WriteToText(
        os.path.join(output_dir, prefix),
        file_name_suffix='.txt',
        shard_name_template=''
    )


def _build_analyses(pipeline, lines, output_dir):
    """Attach the parse step and the four report branches to ``pipeline``."""
    # Read and parse data
    weather_data = (
        pipeline
        | 'Create Sample Data' >> beam.Create(lines)
        | 'Parse Weather Data' >> beam.Map(parse_weather_data)
        | 'Filter Valid Records' >> beam.Filter(lambda x: x is not None)
    )

    # Analysis 1: Average temperature by station
    (
        weather_data
        | 'Extract Station-Temp' >> beam.Map(lambda x: (x['station'], x['temperature']))
        | 'Group By Station' >> beam.GroupByKey()
        | 'Calculate Station Avg' >> beam.Map(format_station_result)
        | 'Write Station Results' >> _write_report(output_dir, 'station_avg')
    )

    # Analysis 2: Average temperature by date
    (
        weather_data
        | 'Extract Date-Temp' >> beam.Map(lambda x: (x['date'], x['temperature']))
        | 'Group By Date' >> beam.GroupByKey()
        | 'Calculate Date Avg' >> beam.Map(format_date_result)
        | 'Write Date Results' >> _write_report(output_dir, 'date_avg')
    )

    # Analysis 3: Extreme temperatures
    (
        weather_data
        | 'Extract Temperatures' >> beam.Map(lambda x: x['temperature'])
        | 'Calculate Extremes' >> beam.CombineGlobally(CalculateExtremesCombineFn())
        | 'Format Extremes' >> beam.Map(format_extremes)
        | 'Write Extremes' >> _write_report(output_dir, 'extremes')
    )

    # Analysis 4: Temperature distribution
    (
        weather_data
        | 'Classify Temps' >> beam.Map(classify_temperature)
        | 'Sum By Category' >> beam.CombinePerKey(sum)
        | 'Format Distribution' >> beam.Map(format_distribution)
        | 'Write Distribution' >> _write_report(output_dir, 'distribution')
    )


def _print_reports(output_dir):
    """Print each report file under its heading, in analysis order."""
    reports = (
        ('station_avg', 'Analysis 1: Average Temperature by Station'),
        ('date_avg', 'Analysis 2: Average Temperature by Date'),
        ('extremes', 'Analysis 3: Extreme Temperatures'),
        ('distribution', 'Analysis 4: Temperature Distribution'),
    )
    for index, (prefix, heading) in enumerate(reports):
        # Only the first heading is preceded by a blank line.
        leading = '\n' if index == 0 else ''
        print(f"{leading}--- {heading} ---")
        # Beam's WriteToText writes UTF-8; read it back explicitly as UTF-8 so
        # the '°' sign is not garbled where the default encoding differs.
        path = os.path.join(output_dir, f'{prefix}.txt')
        with open(path, 'r', encoding='utf-8') as report:
            print(report.read())


def run_pipeline(lines=None):
    """Execute the weather analysis pipeline and print its four reports.

    The pipeline parses the input CSV lines and runs four independent analyses
    on the local DirectRunner:

    - average temperature per station;
    - average temperature per date;
    - global max/min/mean;
    - a count of readings per temperature band.

    Each report is written to a single text file in a temporary directory.
    The files are then printed, and the directory is deleted afterwards.

    Args:
        lines: Optional iterable of CSV lines in ``station,date,temperature,...``
            layout (a header row whose first field is ``station`` is skipped).
            Defaults to a small built-in sample of ten readings.
    """
    if lines is None:
        # Sample weather data
        lines = [
            'station,date,temperature,humidity,pressure',
            'STATION_001,2025-09-01,15.5,65,1013',
            'STATION_001,2025-09-01,16.2,63,1012',
            'STATION_002,2025-09-01,12.8,70,1015',
            'STATION_001,2025-09-02,14.9,68,1014',
            'STATION_002,2025-09-02,11.5,72,1016',
            'STATION_003,2025-09-01,25.3,45,1010',
            'STATION_003,2025-09-02,26.1,43,1009',
            'STATION_001,2025-09-03,17.8,60,1011',
            'STATION_002,2025-09-03,13.2,68,1014',
            'STATION_003,2025-09-03,24.5,47,1012',
        ]

    options = PipelineOptions([
        '--runner=DirectRunner',
    ])

    # Unlike mkdtemp(), TemporaryDirectory removes the report files on exit.
    with tempfile.TemporaryDirectory() as output_dir:
        # Leaving the Pipeline context runs the pipeline to completion, so all
        # report files exist by the time they are printed below.
        with beam.Pipeline(options=options) as pipeline:
            _build_analyses(pipeline, list(lines), output_dir)
        _print_reports(output_dir)


# Run the analysis when this code is executed directly. IPython kernels also set
# __name__ to '__main__', so executing the notebook cell triggers the run.
if __name__ == '__main__':
    run_pipeline()


--- Analysis 1: Average Temperature by Station ---
Station STATION_001: Avg Temp = 16.10°C, Readings = 4
Station STATION_002: Avg Temp = 12.50°C, Readings = 3
Station STATION_003: Avg Temp = 25.30°C, Readings = 3

--- Analysis 2: Average Temperature by Date ---
Date 2025-09-01: Avg Temp = 17.45°C, Readings = 4
Date 2025-09-02: Avg Temp = 17.50°C, Readings = 3
Date 2025-09-03: Avg Temp = 18.50°C, Readings = 3

--- Analysis 3: Extreme Temperatures ---
Max: 26.10°C, Min: 11.50°C, Avg: 17.78°C, Total Readings: 10

--- Analysis 4: Temperature Distribution ---
Mild (10-20°C): 7 readings
Warm (20-30°C): 3 readings

