In [4]:
import pandas as pd
import requests
from pymongo import MongoClient
import time
from datetime import datetime
import schedule

# MongoDB setup
client = MongoClient("mongodb://localhost:27017/")
db = client["water_management"]
collection = db["sensor_data"]

def fetch_and_store_data():
    # Read CSV file
    df = pd.read_csv("/home/ubuntu/Data_Science/Big_Data/Water-Resource-Management-Platform/data/index_of_sensors.csv")

    # Loop through each row in the CSV
    for index, row in df.iterrows():
        endpoint_url = row[-2]  # Assuming the endpoint URL is in the second last column
        response = requests.get(endpoint_url)
        
        if response.status_code == 200:
            json_data = response.json()

            # Extract and transform data
            for feature in json_data["features"]:
                geometry = feature["geometry"]
                properties = feature["properties"]

                # Handle timestamp data
                timestamp_data = properties["data"]["timestamp"]

                # If timestamp_data is a list, iterate over it
                if isinstance(timestamp_data, list):
                    for timestamp_epoch in timestamp_data:
                        process_and_store_data(geometry, properties, timestamp_epoch)
                else:
                    process_and_store_data(geometry, properties, timestamp_data)
        else:
            print(f"Failed to fetch data from {endpoint_url}, status code: {response.status_code}")

def process_and_store_data(geometry, properties, timestamp_epoch):
    try:
        # Convert epoch to human-readable timestamp
        timestamp_human = datetime.utcfromtimestamp(timestamp_epoch).strftime('%Y-%m-%d %H:%M:%S')

        # Prepare document
        document = {
            "geometry": geometry,
            "properties": properties,
            "timestamp": timestamp_human,
            "value": properties["data"]["value"]
        }

        # Insert document into MongoDB
        collection.update_one(
            {"timestamp": timestamp_human, "value": properties["data"]["value"]},
            {"$set": document},
            upsert=True
        )
    except Exception as e:
        print(f"Error processing data: {e}")

# Schedule the job to run every 45 minutes
schedule.every(45).minutes.do(fetch_and_store_data)

# Run the job immediately at startup
fetch_and_store_data()

while True:
    schedule.run_pending()
    time.sleep(1)


  endpoint_url = row[-2]  # Assuming the endpoint URL is in the second last column


KeyboardInterrupt: 