In [None]:
# Setup and imports
!pip install -q apache-beam[gcp]
!pip install -q google-cloud-aiplatform
!pip install -q google-cloud-bigquery
!pip install -q google-cloud-firestore

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.firestore import WriteToFirestore
import json
import datetime
from google.cloud import aiplatform

In [None]:
# Project configuration. The values below look like placeholders — replace them before running.
# Google Cloud project; interpolated into the Pub/Sub topic, BigQuery table and endpoint path.
PROJECT_ID = "your-project-id"
# NOTE(review): presumably the Cloud Storage bucket for pipeline files — confirm where it is consumed.
BUCKET = "your-bucket"
# Passed to the pipeline options as the region and used in the Vertex AI endpoint path.
REGION = "your-region"
# Last segment of the endpoint resource path built in CalibrationTransform.setup.
# NOTE(review): confirm this is an endpoint ID accepted in a resource path, not only a display name.
ENDPOINT_NAME = "air-quality-model"

In [None]:
# Define the BigQuery table schema
# Column layout of the BigQuery destination table, expressed as (name, type)
# pairs and expanded into the {'fields': [{'name': ..., 'type': ...}]} form.
SCHEMA = {
    'fields': [
        {'name': column, 'type': bq_type}
        for column, bq_type in (
            ('timestamp', 'TIMESTAMP'),
            ('device_id', 'STRING'),
            # Readings as reported by the device.
            ('raw_pm25', 'FLOAT'),
            ('raw_pm10', 'FLOAT'),
            ('raw_o3', 'FLOAT'),
            ('raw_co', 'FLOAT'),
            ('raw_no2', 'FLOAT'),
            # Readings after the calibration step.
            ('calibrated_pm25', 'FLOAT'),
            ('calibrated_pm10', 'FLOAT'),
            ('calibrated_o3', 'FLOAT'),
            ('calibrated_co', 'FLOAT'),
            ('calibrated_no2', 'FLOAT'),
            ('temperature', 'FLOAT'),
            ('humidity', 'FLOAT'),
        )
    ]
}

In [None]:
# Define the pipeline transforms
class PreprocessData(beam.DoFn):
    """Turn one raw JSON sensor message into a flat, typed record.

    A message is dropped (an empty list is returned) and the reason printed
    when it is not a JSON object, lacks a required field, or holds a value
    that cannot be converted.
    """

    # Keys every incoming message must provide.
    REQUIRED_FIELDS = (
        'timestamp', 'device_id', 'pm25', 'pm10',
        'o3', 'co', 'no2', 'temperature', 'humidity'
    )

    @staticmethod
    def _parse_timestamp(value):
        """Parse an ISO-8601 timestamp, also accepting a trailing ``Z`` for UTC.

        ``datetime.fromisoformat`` only understands the ``Z`` suffix from
        Python 3.11 on. The native parser is tried first so every input it
        already accepts yields exactly the same result; only on failure is
        ``Z`` rewritten to ``+00:00`` and parsing retried.

        Raises:
            ValueError: if the text is not a parseable timestamp.
            TypeError: if ``value`` is not a string.
        """
        try:
            return datetime.datetime.fromisoformat(value)
        except ValueError:
            if value.endswith('Z'):
                return datetime.datetime.fromisoformat(value[:-1] + '+00:00')
            raise

    def process(self, element):
        """Preprocess one raw message from an IoT device.

        Args:
            element: JSON text of a single reading, in any form ``json.loads``
                accepts.

        Returns:
            A one-element list holding the record dict (``timestamp`` as a
            ``datetime``, ``device_id`` unchanged, pollutant readings as
            ``raw_*`` floats, ``temperature`` and ``humidity`` as floats), or
            an empty list when the message is rejected.
        """
        try:
            # Parse JSON
            data = json.loads(element)

            # Valid JSON that is not an object (list, number, string) cannot
            # carry the named fields.
            if not isinstance(data, dict):
                print(f"Error processing data: expected a JSON object, got {type(data).__name__}")
                return []

            # Validate that every required field is present
            missing = [field for field in self.REQUIRED_FIELDS if field not in data]
            if missing:
                print(f"Error processing data: missing fields {missing}")
                return []

            # Format data
            processed = {
                'timestamp': self._parse_timestamp(data['timestamp']),
                'device_id': data['device_id'],
                'raw_pm25': float(data['pm25']),
                'raw_pm10': float(data['pm10']),
                'raw_o3': float(data['o3']),
                'raw_co': float(data['co']),
                'raw_no2': float(data['no2']),
                'temperature': float(data['temperature']),
                'humidity': float(data['humidity'])
            }

            return [processed]

        except Exception as e:
            # Best effort: one malformed message must not fail the whole bundle.
            print(f"Error processing data: {str(e)}")
            return []

class CalibrationTransform(beam.DoFn):
    """Add calibrated pollutant readings to a preprocessed record.

    Each record is sent to a Vertex AI endpoint; the returned values are
    stored under ``calibrated_*`` keys next to the raw readings.
    """

    def setup(self):
        """Create the Vertex AI endpoint handle that ``process`` uses.

        The resource path is assembled from the module-level PROJECT_ID,
        REGION and ENDPOINT_NAME constants.
        """
        resource_path = f"projects/{PROJECT_ID}/locations/{REGION}/endpoints/{ENDPOINT_NAME}"
        self.endpoint = aiplatform.Endpoint(endpoint_name=resource_path)

    def process(self, element):
        """Calibrate one record.

        Args:
            element: record dict with ``raw_pm25``, ``raw_pm10``, ``raw_o3``,
                ``raw_co``, ``raw_no2``, ``temperature`` and ``humidity``.

        Returns:
            A one-element list. On success it holds a copy of ``element``
            extended with the five ``calibrated_*`` values; on any error the
            message is printed and the untouched ``element`` is passed on.
        """
        pollutants = ('pm25', 'pm10', 'o3', 'co', 'no2')
        try:
            # Model input: raw readings under their short names, plus the
            # two environmental values.
            features = {name: element[f'raw_{name}'] for name in pollutants}
            features['temperature'] = element['temperature']
            features['humidity'] = element['humidity']

            # One instance in, so only the first prediction is read.
            prediction = self.endpoint.predict(instances=[features]).predictions[0]

            # Work on a copy so the incoming record is never modified.
            enriched = element.copy()
            for name in pollutants:
                enriched[f'calibrated_{name}'] = prediction[f'{name}_calibrated']

            return [enriched]

        except Exception as err:
            print(f"Error in calibration: {str(err)}")
            return [element]

In [None]:
# Define pipeline
def build_pipeline(pipeline_args=None):
    """Build and run the streaming air-quality pipeline.

    Stages: read messages from the ``air-quality-data`` Pub/Sub topic of
    PROJECT_ID, decode them as UTF-8, run PreprocessData and
    CalibrationTransform, then write every record both to the BigQuery table
    ``air_quality.measurements`` and to the Firestore collection
    ``air_quality_measurements``.

    The pipeline is created in a ``with`` block; Beam runs it when the block
    exits, so this call does not return a pipeline object.

    Args:
        pipeline_args: optional list of Beam command-line style flags (for
            example ``['--runner=DataflowRunner']``). ``streaming``,
            ``project``, ``region`` and ``job_name`` are always set here.
            ``temp_location`` defaults to ``gs://{BUCKET}/temp`` unless a
            ``--temp_location`` flag is supplied.
    """
    option_overrides = {
        'streaming': True,
        'project': PROJECT_ID,
        'region': REGION,
        'job_name': 'air-quality-pipeline',
    }

    # BUCKET was configured but never used, leaving the choice of staging
    # bucket to the runner. Use it for temporary files unless the caller
    # already chose a location.
    caller_set_temp_location = any(
        str(arg).startswith('--temp_location') for arg in (pipeline_args or [])
    )
    if not caller_set_temp_location:
        option_overrides['temp_location'] = f'gs://{BUCKET}/temp'

    pipeline_options = PipelineOptions(pipeline_args, **option_overrides)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read from Pub/Sub
        raw_data = (pipeline
                   | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                       topic=f'projects/{PROJECT_ID}/topics/air-quality-data'
                   )
                   | 'Decode' >> beam.Map(lambda x: x.decode('utf-8')))

        # Process data
        processed_data = (raw_data
                         | 'Preprocess' >> beam.ParDo(PreprocessData())
                         | 'Calibrate' >> beam.ParDo(CalibrationTransform()))

        # Write to BigQuery
        (processed_data
         | 'Write to BigQuery' >> WriteToBigQuery(
             table=f'{PROJECT_ID}:air_quality.measurements',
             schema=SCHEMA,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
         ))

        # Write to Firestore
        # NOTE(review): confirm that the installed Beam Python SDK provides
        # apache_beam.io.gcp.firestore.WriteToFirestore and that it accepts
        # these arguments and this element shape.
        # The document id is "<device_id>_<timestamp to the second>", so two
        # readings from one device within the same second share an id.
        (processed_data
         | 'Format for Firestore' >> beam.Map(
             lambda x: {
                 'document_id': f"{x['device_id']}_{x['timestamp'].strftime('%Y%m%d%H%M%S')}",
                 'data': x
             }
         )
         | 'Write to Firestore' >> WriteToFirestore(
             project_id=PROJECT_ID,
             collection='air_quality_measurements'
         ))

In [None]:
# Test pipeline locally
def test_pipeline():
    """Smoke-test both DoFns outside of Beam with one hand-written reading.

    The sample is serialised to JSON, passed through PreprocessData and then
    through CalibrationTransform, printing the result of each step.

    Raises:
        ValueError: if PreprocessData rejects the sample record.
    """
    # Test data
    test_data = {
        'timestamp': '2025-02-24T10:00:00',
        'device_id': 'test-device-001',
        'pm25': 25.0,
        'pm10': 45.0,
        'o3': 0.035,
        'co': 1.2,
        'no2': 0.045,
        'temperature': 25.0,
        'humidity': 65.0
    }

    # Run preprocessor.
    # process() returns a list, which is iterable but not an iterator, so a
    # bare next() on it raises TypeError; iter() makes this work for lists
    # and generators alike.
    preprocessor = PreprocessData()
    processed = next(iter(preprocessor.process(json.dumps(test_data))), None)
    if processed is None:
        raise ValueError("PreprocessData rejected the test record")
    print("Preprocessed data:", processed)

    # Run calibration.
    # setup() is called by hand because no Beam runner is involved here.
    calibrator = CalibrationTransform()
    calibrator.setup()
    calibrated = next(iter(calibrator.process(processed)))
    print("Calibrated data:", calibrated)

In [None]:
# Run the local smoke test defined above.
# NOTE(review): the calibration step calls the configured Vertex AI endpoint, so this
# presumably needs valid credentials and real configuration values — confirm.
test_pipeline()

In [None]:
# Launch the pipeline with the Dataflow runner, passing project, region and
# streaming mode as explicit flags.
if __name__ == '__main__':
    build_pipeline(
        pipeline_args=[
            '--runner=DataflowRunner',
            f'--project={PROJECT_ID}',
            f'--region={REGION}',
            '--streaming',
        ]
    )