# Timeseries Workshop Notebook

## Prerequisites Installation

Ensure Python 3.8+ is installed, then install the required Python packages.

In [1]:
! pip install pymongo matplotlib pandas



---
## 1. Comparing Timeseries with Regular Collection
**Description**: We create a regular collection and a timeseries collection with identical schema, ingest identical data, then compare storage sizes.

In [None]:
MONGO_URI = "mongodb://localhost:27017"
CLUSTER = "sql-demo"

In [6]:
from pymongo import MongoClient
import time
import os

client = MongoClient(MONGO_URI)
db = client['timeseries_workshop']
# Drop existing collections if present (drop_collection does not raise when the collection is missing)
db.drop_collection('regular_collection')
db.drop_collection('ts_collection')

# Create regular collection
regular = db['regular_collection']
# Create timeseries collection with timeField="timestamp" and metaField="metadata"
ts = db.create_collection(
    'ts_collection',
    timeseries={
        'timeField': 'timestamp',
        'metaField': 'metadata',
        'granularity': 'seconds'
    }
)


The code above connects to the MongoDB deployment at `MONGO_URI` (a local instance by default), drops any existing collections named `regular_collection` and `ts_collection`, then references a standard collection (created on first insert) and creates a timeseries collection with `timestamp` as the time field and `metadata` as the metadata field. Timeseries collections (MongoDB 5.0+) group measurements that share a `metadata` value into time-ordered buckets, which optimizes storage and queries on time-ordered data.

In [None]:
import random
from datetime import datetime, timedelta

# Function to generate a single sample document with well-distributed and statistically relevant values
def generate_sample_doc(base_time):
    # Distribute deviceId and sensorId
    device_ids = [f'PLC-Rig{str(i).zfill(3)}' for i in range(1, 11)]
    sensor_ids = [f'TempSensor-Bit{str(i).zfill(2)}' for i in range(1, 6)]
    locations = [
        {'site': 'Drilling Rig A', 'well': 'Well-XYZ', 'section': 'Drill Bit', 'latitude': 29.73656, 'longitude': -95.36980},
        {'site': 'Drilling Rig B', 'well': 'Well-ABC', 'section': 'Pump', 'latitude': 30.12345, 'longitude': -94.56789},
        {'site': 'Drilling Rig C', 'well': 'Well-DEF', 'section': 'Pipe', 'latitude': 28.98765, 'longitude': -96.54321}
    ]
    asset_metadata = [
        {'oilField': 'Permian Basin', 'rigType': 'Land-Based', 'operator': 'OperatorCorp'},
        {'oilField': 'Eagle Ford', 'rigType': 'Offshore', 'operator': 'DrillMasters'},
        {'oilField': 'Bakken', 'rigType': 'Land-Based', 'operator': 'PetroWorks'}
    ]
    # Simulate temperature with normal distribution, some outliers
    temp = round(random.gauss(150, 20), 2)
    if random.random() < 0.01:
        temp += random.choice([-40, 40])  # 1% outliers
    # Status distribution
    status = random.choices(['OK', 'WARN', 'FAIL'], weights=[0.85, 0.10, 0.05])[0]
    thresholds = {
        'lowWarning': 100.0,
        'highWarning': 150.0,
        'maxLimit': 175.0
    }
    return {
        'timestamp': base_time,
        'metadata': {
            'deviceId': random.choice(device_ids),
            'sensorId': random.choice(sensor_ids),
            'location': random.choice(locations),
            'assetMetadata': random.choice(asset_metadata)
        },
        'data': {
            'temperature': {
                'value': temp,
                'unit': 'C'
            },
            'status': status,
            'thresholds': thresholds
        }
    }

# Generate a list of 100,000 documents spaced by one second
docs = []
start_time = datetime.utcnow()
for i in range(100_000):
    docs.append(generate_sample_doc(start_time + timedelta(seconds=i)))

print(f"Generated {len(docs)} documents.")

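This cell defines a helper function `generate_sample_doc` that returns a document matching the provided sample schema. We generate 100,000 documents spaced one second apart starting from the current UTC time. Temperatures are drawn from a normal distribution with a mean of 150°C and a standard deviation of 20°C, and about 1% of readings are shifted by ±40°C to act as outliers, so values are not bounded to a fixed range.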
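To sanity-check the generator's temperature distribution without touching the database, the sampling logic can be reproduced on its own. The sketch below is a standalone copy of the temperature lines from `generate_sample_doc`; the function name `sample_temperature` and the fixed seed are assumptions added for illustration.

```python
import random
import statistics

def sample_temperature(rng):
    """Standalone copy of the temperature logic in generate_sample_doc (illustrative)."""
    temp = round(rng.gauss(150, 20), 2)
    if rng.random() < 0.01:
        temp += rng.choice([-40, 40])  # roughly 1% of readings become outliers
    return temp

rng = random.Random(42)  # fixed seed (an assumption) so the check is repeatable
temps = [sample_temperature(rng) for _ in range(100_000)]

mean_temp = statistics.fmean(temps)
stdev_temp = statistics.stdev(temps)
# Share of readings above the 'highWarning' threshold (150.0) used in the documents
share_above_high_warning = sum(t > 150.0 for t in temps) / len(temps)

print(f"mean={mean_temp:.1f}, stdev={stdev_temp:.1f}")
print(f"min={min(temps):.1f}, max={max(temps):.1f}")
print(f"share above highWarning: {share_above_high_warning:.1%}")
```

Because the mean sits exactly on the `highWarning` threshold, about half of the generated readings exceed it, which is worth keeping in mind when filtering on thresholds later.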

In [None]:
# Regular inserts for regular collection
start = time.time()
regular.insert_many(docs)
elapsed_regular = time.time() - start

# Timeseries inserts (same docs)
start = time.time()
ts.insert_many(docs)
elapsed_ts = time.time() - start

print(f"InsertMany Regular: {elapsed_regular:.2f}s")
print(f"InsertMany Timeseries: {elapsed_ts:.2f}s")

We measure the time taken to insert 100,000 documents into a regular collection and a timeseries collection using `insert_many`. This highlights the throughput benefit, if any, of using timeseries collections.
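Elapsed seconds are easier to compare once converted to documents per second. The helpers below are a minimal sketch: `docs_per_second` and `timed` are hypothetical names, and the list comprehension is only a stand-in for the real `insert_many` call. `time.perf_counter` is used because it is a monotonic, high-resolution clock intended for measuring intervals.

```python
import time
from contextlib import contextmanager

def docs_per_second(n_docs, elapsed_seconds):
    """Convert an elapsed time into a throughput figure."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return n_docs / elapsed_seconds

@contextmanager
def timed(label, timings):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start

timings = {}
with timed('stand_in_workload', timings):
    # Stand-in for regular.insert_many(docs); any workload can go here
    squares = [i * i for i in range(100_000)]

print(f"{docs_per_second(100_000, 2.0):,.0f} docs/sec for 100,000 docs in 2.0s")
```

In the notebook, the same pattern would wrap each `insert_many` call, after which `docs_per_second(len(docs), timings[label])` gives a rate that can be compared across runs.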

In [None]:
stats_reg = db.command("collStats", "regular_collection")
stats_ts = db.command("collStats", "ts_collection")
size_reg = stats_reg['storageSize']
size_ts = stats_ts['storageSize']
print(f"Regular Collection Storage Size: {size_reg/1024/1024:.2f} MB")
print(f"Timeseries Collection Storage Size: {size_ts/1024/1024:.2f} MB")

The `collStats` command returns storage statistics for each collection. We extract `storageSize` to compare the disk usage of the regular versus timeseries collection. Typically, timeseries collections use bucket compression and fewer indexes, reducing disk footprint.
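The two `storageSize` values can be reduced to a ratio and a percentage saved. The following sketch assumes dictionaries shaped like the `collStats` results above; the helper name `storage_savings` and the byte counts are made up for illustration and are not measurements.

```python
def storage_savings(stats_regular, stats_timeseries, key='storageSize'):
    """Compare two collStats-style dicts and report ratio and percent saved."""
    regular_bytes = stats_regular[key]
    ts_bytes = stats_timeseries[key]
    if regular_bytes <= 0 or ts_bytes <= 0:
        raise ValueError("storage sizes must be positive")
    return {
        'ratio': regular_bytes / ts_bytes,
        'saved_percent': (1 - ts_bytes / regular_bytes) * 100,
    }

# Made-up sizes purely to exercise the helper
example_regular = {'storageSize': 40 * 1024 * 1024}
example_timeseries = {'storageSize': 10 * 1024 * 1024}

summary = storage_savings(example_regular, example_timeseries)
print(f"ratio={summary['ratio']:.1f}x, saved={summary['saved_percent']:.0f}%")
```

With real data, pass `stats_reg` and `stats_ts` from the cell above instead of the example dictionaries.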

In [None]:
import matplotlib.pyplot as plt

labels = ['Regular', 'Timeseries']
sizes = [size_reg/1024/1024, size_ts/1024/1024]
plt.bar(labels, sizes)
plt.ylabel('Storage Size (MB)')
plt.title('Regular vs Timeseries Storage Size')
plt.show()

We use a simple bar chart to visualize the storage size comparison. This helps quickly identify the storage efficiency offered by timeseries collections.

---
## 2. Ingestion & Throughput Benchmarks
**Actions**: Benchmark three insertion methods (`insert_one`, `insert_many`, and bulk operations) for timeseries collections.

In [None]:
# Code Cell: Prepare fresh timeseries collection for benchmark
db.drop_collection('ts_benchmark')
ts_bench = db.create_collection(
    'ts_benchmark',
    timeseries={
        'timeField': 'timestamp',
        'metaField': 'metadata',
        'granularity': 'seconds'
    }
)
# Generate 10,000 new documents
docs_bench = [generate_sample_doc(start_time + timedelta(seconds=i)) for i in range(10_000)]

To isolate benchmarks from prior data, we drop and recreate a `ts_benchmark` collection. We generate 10,000 documents for throughput testing.

In [None]:
start = time.time()
for doc in docs_bench:
    ts_bench.insert_one(doc)
elapsed_insert_one = time.time() - start
print(f"InsertOne 10k documents: {elapsed_insert_one:.2f}s")

The above loop performs 10,000 `insert_one` operations, measuring the time taken. This method tends to be slower due to round-trips to the server for each document.

In [None]:
# Reset collection to empty
db.drop_collection('ts_benchmark')
ts_bench = db.create_collection('ts_benchmark', timeseries={'timeField': 'timestamp', 'metaField': 'metadata', 'granularity': 'seconds'})
start = time.time()
ts_bench.insert_many(docs_bench)
elapsed_insert_many = time.time() - start
print(f"InsertMany 10k documents: {elapsed_insert_many:.2f}s")

`insert_many` groups all documents into a single operation, reducing network overhead. We reset the collection and measure the time.
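For much larger datasets, holding every document in a single list may not be practical on the client. One common approach is to feed `insert_many` in fixed-size batches. The generator below is a sketch; the name `chunked` and the batch size are assumptions, and PyMongo already splits oversized batches to fit server limits, so this mainly helps bound client memory.

```python
from itertools import islice

def chunked(items, batch_size):
    """Yield successive lists of at most batch_size items from any iterable."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

batches = list(chunked(range(10), 4))
print(batches)

# With a live connection, the same helper could drive the inserts:
# for batch in chunked(docs_bench, 1_000):
#     ts_bench.insert_many(batch)
```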

In [None]:
from pymongo import InsertOne
# Reset collection
db.drop_collection('ts_benchmark')
ts_bench = db.create_collection('ts_benchmark', timeseries={'timeField': 'timestamp', 'metaField': 'metadata', 'granularity': 'seconds'})
requests = [InsertOne(doc) for doc in docs_bench]
start = time.time()
ts_bench.bulk_write(requests)
elapsed_bulk = time.time() - start
print(f"BulkWrite 10k documents: {elapsed_bulk:.2f}s")

Using `bulk_write` with `InsertOne` operations also batches requests but allows finer control (e.g., ordering). We measure its performance here.

In [None]:
methods = ['InsertOne', 'InsertMany', 'BulkWrite']
times = [elapsed_insert_one, elapsed_insert_many, elapsed_bulk]
plt.figure(figsize=(8,5))
plt.bar(methods, times)
plt.ylabel('Time (s)')
plt.title('Insertion Method Throughput for 10k Docs')
plt.show()

A bar chart shows the relative performance: lower times indicate higher throughput. In general, `bulk_write` and `insert_many` outperform `insert_one` significantly, especially at scale.

---
## 3. Kafka Connector for Timeseries
**Description**: Demonstrate how to configure MongoDB Kafka Connector to stream data into a timeseries collection.

A Kafka Connect sink connector for this use case reads from a Kafka topic `sensor_readings_topic` and writes into a MongoDB timeseries collection `ts_kafka`. The connector's `timeseries.*` properties (such as `timeseries.timefield` and `timeseries.metafield`) ensure the target collection is treated as a timeseries collection. To test: start Kafka, create the topic, and produce JSON messages matching the document schema.
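A sink configuration matching that description might look like the following sketch, built as a Python dictionary and printed as JSON for submission to the Kafka Connect REST API. The connector name `mongo-ts-sink`, the converter choices, and the local connection URI are assumptions; adjust them to your environment.

```python
import json

connector_config = {
    "name": "mongo-ts-sink",  # hypothetical connector name
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "sensor_readings_topic",
        "connection.uri": "mongodb://localhost:27017",  # assumed local deployment
        "database": "timeseries_workshop",
        "collection": "ts_kafka",
        # Assumed converters for schemaless JSON messages
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        # Timeseries settings mirroring the collections created in this notebook
        "timeseries.timefield": "timestamp",
        "timeseries.metafield": "metadata",
        "timeseries.granularity": "seconds",
        # Convert string timestamps in the messages to BSON dates
        "timeseries.timefield.auto.convert": "true",
    },
}

print(json.dumps(connector_config, indent=2))
```

The printed JSON can be saved to a file and posted to the `/connectors` endpoint of a Kafka Connect worker (port 8083 by default).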

---
## 4. Querying Timeseries Data
### 4.1 Basic Queries
**Description**: Query ranges of time and filter by metadata.

In [None]:
# Reuse docs_bench list but insert limited subset

# Ensure ts_kafka exists as timeseries (drop_collection does not raise if it is missing)
db.drop_collection('ts_kafka')
kwargs = {'timeseries': {'timeField': 'timestamp', 'metaField': 'metadata', 'granularity': 'seconds'}}
ts_kafka = db.create_collection('ts_kafka', **kwargs)
ts_kafka.insert_many(docs_bench[:5000])  # Insert 5k docs for querying

# Query: time range filter combined with a metadata filter
t_start = start_time + timedelta(seconds=100)
t_end = start_time + timedelta(seconds=200)
query = {
    'timestamp': {'$gte': t_start, '$lte': t_end},
    'metadata.deviceId': 'PLC-Rig001'  # generated device IDs run from PLC-Rig001 to PLC-Rig010
}
# Cursor.count() was removed in PyMongo 4; count_documents runs the count on the server
count = ts_kafka.count_documents(query)
print(f"Found {count} documents between {t_start} and {t_end}.")

We create a small timeseries collection `ts_kafka`, insert 5,000 documents, and perform a basic date range query filtering by `timestamp` and `metadata.deviceId`. The output `count` shows how many documents match the criteria.

### 4.2 Window Functions (Aggregation)
**Description**: Use `$setWindowFields` to compute moving averages of temperature over a sliding window.

In [None]:
# Code Cell: Moving Average of Temperature over 1-minute windows
pipeline = [
    {'$match': {'metadata.sensorId': 'TempSensor-Bit01'}},
    {'$setWindowFields': {
        'partitionBy': '$metadata.sensorId',
        'sortBy': {'timestamp': 1},
        'output': {
            'avgTemp': {
                '$avg': '$data.temperature.value',
                'window': {
                    'range': [-60, 0],  # Last 60 seconds
                    'unit': 'second'
                }
            }
        }
    }},
    {'$limit': 5}
]
for doc in ts_kafka.aggregate(pipeline):
    print(doc)

The aggregation pipeline uses `$setWindowFields` to partition data by `sensorId`, sort by `timestamp`, and compute a moving average (`avgTemp`) over the last 60 seconds. This gives a real-time rolling metric for temperature.
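To make the window semantics concrete, the same trailing average can be reproduced in plain Python. This is a sketch of what `range: [-60, 0]` with `unit: 'second'` computes, not of how MongoDB implements it; the function name `trailing_average` is hypothetical, and the code assumes readings are sorted by time with distinct timestamps.

```python
from datetime import datetime, timedelta

def trailing_average(readings, window_seconds=60):
    """Average each value with all earlier values no more than window_seconds old.

    readings: list of (timestamp, value) tuples sorted by timestamp ascending.
    Both window bounds are inclusive, so a reading exactly 60 seconds old counts.
    """
    window = timedelta(seconds=window_seconds)
    averages = []
    left = 0
    running_sum = 0.0
    for right, (ts, value) in enumerate(readings):
        running_sum += value
        # Drop readings that fall before the lower bound of the window
        while readings[left][0] < ts - window:
            running_sum -= readings[left][1]
            left += 1
        averages.append((ts, running_sum / (right - left + 1)))
    return averages

t0 = datetime(2024, 1, 1, 12, 0, 0)
sample = [
    (t0, 10.0),
    (t0 + timedelta(seconds=30), 20.0),
    (t0 + timedelta(seconds=60), 30.0),
    (t0 + timedelta(seconds=61), 40.0),
]
moving = [avg for _, avg in trailing_average(sample)]
print(moving)
```

The third reading still includes the first one (exactly 60 seconds earlier), while the fourth no longer does, which is the behavior to expect from the pipeline above.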

In [None]:
import pandas as pd
# Retrieve full timeseries with moving average
results = list(ts_kafka.aggregate([
    {'$match': {'metadata.sensorId': 'TempSensor-Bit01'}},
    {'$setWindowFields': {
        'partitionBy': '$metadata.sensorId',
        'sortBy': {'timestamp': 1},
        'output': {
            'avgTemp': {
                '$avg': '$data.temperature.value',
                'window': {'range': [-60, 0], 'unit': 'second'}
            }
        }
    }}
]))
# json_normalize flattens nested fields into dotted column names such as 'data.temperature.value'
df = pd.json_normalize(results)

plt.figure(figsize=(10,5))
plt.plot(df['timestamp'], df['data.temperature.value'], label='Instant Temp')
plt.plot(df['timestamp'], df['avgTemp'], label='1-min Moving Avg')
plt.xlabel('Timestamp')
plt.ylabel('Temperature (C)')
plt.legend()
plt.title('Temperature and 1-minute Moving Average')
plt.show()

We load the aggregation results into a Pandas DataFrame and plot both the instantaneous temperature and its 1-minute moving average. This visual showcases how window functions enable real-time analytics directly in MongoDB without external processing.

---
## 5. Realtime Analytics & Materialized Views
### 5.1 Materialized View: Initial Creation
**Description**: Create a standard view that summarizes maximum temperature per minute per sensor. A standard view is computed on read; Section 5.2 materializes the same aggregation into a collection.

In [None]:
# Drop if exists (drop_collection does not raise when the view is absent)
db.drop_collection('mv_max_temp_per_min')

# Create aggregation pipeline for view
timeseries_view = [
    {'$match': {'metadata.sensorId': 'TempSensor-Bit01'}},
    {'$group': {
        '_id': {
            'sensorId': '$metadata.sensorId',
            'minute': {
                '$dateTrunc': {
                    'date': '$timestamp',
                    'unit': 'minute'
                }
            }
        },
        'maxTemp': {'$max': '$data.temperature.value'}
    }}
]
# Create view
db.create_collection(
    'mv_max_temp_per_min',
    viewOn='ts_kafka',
    pipeline=timeseries_view
)
print("View 'mv_max_temp_per_min' created.")

A standard view is defined by an aggregation pipeline that MongoDB runs each time the view is queried; no results are stored. Here, we create a view `mv_max_temp_per_min` that groups documents by sensor and minute, computing the maximum temperature.
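The grouping logic is easy to check outside the database. The sketch below mirrors the `$dateTrunc` plus `$max` combination in plain Python; `max_temp_per_minute` is a hypothetical helper and the readings are made-up sample values.

```python
from datetime import datetime

def max_temp_per_minute(readings):
    """Group (sensor_id, timestamp, temperature) tuples by sensor and minute."""
    maxima = {}
    for sensor_id, ts, temp in readings:
        # Same effect as $dateTrunc with unit 'minute'
        minute = ts.replace(second=0, microsecond=0)
        key = (sensor_id, minute)
        maxima[key] = max(temp, maxima.get(key, temp))
    return maxima

# Made-up readings for illustration
readings = [
    ('TempSensor-Bit01', datetime(2024, 1, 1, 12, 0, 5), 148.2),
    ('TempSensor-Bit01', datetime(2024, 1, 1, 12, 0, 40), 163.7),
    ('TempSensor-Bit01', datetime(2024, 1, 1, 12, 1, 10), 151.0),
]

per_minute = max_temp_per_minute(readings)
for (sensor_id, minute), max_temp in sorted(per_minute.items()):
    print(sensor_id, minute.isoformat(), max_temp)
```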

In [None]:
# Code Cell: Query the view
for doc in db['mv_max_temp_per_min'].find().limit(5):
    print(doc)

Querying the view returns aggregated results. In production, you could schedule an aggregation to write output to a new collection for fully materialized data.

### 5.2 Materialized View: Updating
**Description**: Demonstrate how to refresh materialized data by writing aggregated results to a new collection periodically.

In [None]:
# Code Cell: Refresh materialized data
pipeline_write = [
    {'$match': {'metadata.sensorId': 'TempSensor-Bit01'}},
    {'$group': {
        '_id': {
            'sensorId': '$metadata.sensorId',
            'minute': {'$dateTrunc': {'date': '$timestamp', 'unit': 'minute'}}
        },
        'maxTemp': {'$max': '$data.temperature.value'}
    }},
    {'$merge': {
        'into': 'materialized_max_temp_per_min',
        # Match on _id (sensorId + minute); any other 'on' fields would need a unique index in the target
        'on': '_id',
        'whenMatched': 'replace',
        'whenNotMatched': 'insert'
    }}
]
# Execute aggregation to merge into target collection
res = ts_kafka.aggregate(pipeline_write, allowDiskUse=True)
print("Materialized collection 'materialized_max_temp_per_min' updated.")

Using `$merge`, we write aggregated results into `materialized_max_temp_per_min`, replacing existing entries for each sensor-minute combination. In a production environment, this can run from a cron job or a scheduled trigger to keep the materialized data close to real time.
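The effect of `whenMatched: 'replace'` and `whenNotMatched: 'insert'` across repeated refreshes can be simulated with a dictionary. This is only a sketch of the observable behavior; `merge_into` is a hypothetical helper and the temperatures are made up.

```python
from datetime import datetime

def merge_into(target, results):
    """Replace documents whose key already exists, insert the rest."""
    inserted = replaced = 0
    for doc in results:
        key = (doc['_id']['sensorId'], doc['_id']['minute'])
        if key in target:
            replaced += 1
        else:
            inserted += 1
        target[key] = doc
    return inserted, replaced

minute = datetime(2024, 1, 1, 12, 0)
next_minute = datetime(2024, 1, 1, 12, 1)
target = {}

# First refresh: one sensor-minute bucket
first_run = [{'_id': {'sensorId': 'TempSensor-Bit01', 'minute': minute}, 'maxTemp': 161.2}]
# Second refresh: the same bucket with a higher maximum, plus a new bucket
second_run = [
    {'_id': {'sensorId': 'TempSensor-Bit01', 'minute': minute}, 'maxTemp': 173.9},
    {'_id': {'sensorId': 'TempSensor-Bit01', 'minute': next_minute}, 'maxTemp': 158.4},
]

print(merge_into(target, first_run))
print(merge_into(target, second_run))
```

Re-running the refresh is therefore safe: existing buckets are overwritten with the latest maximum rather than duplicated.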

In [None]:
# Code Cell: Verify materialized collection
for doc in db['materialized_max_temp_per_min'].find().limit(5):
    print(doc)

The output confirms that aggregated documents are now stored in a dedicated collection, serving as a true materialized view.

---
## 6. Atlas Stream Processing
**Description**: Illustrate setting up Atlas Stream Processing (ASP) to perform real-time transformations.

In [None]:
# (Requires 'pymongo[srv]' and Atlas API credentials set in environment variables)

from pymongo.mongo_client import MongoClient as AtlasClient
from pymongo import InsertOne

# Initialize Atlas client (set MONGO_URI to your Atlas connection string)
atlas_client = AtlasClient(MONGO_URI)
atlas_db = atlas_client['timeseries_workshop_atlas']

# Define a sample ASP pipeline that triggers on ts_kafka and writes to analytics collection
pipeline_def = {
    'name': 'real_time_temp_processor',
    'match': {'operationType': 'insert'},
    'project': { 'timestamp': 1, 'data.temperature.value': 1, 'metadata.sensorId': 1 },
    'function': "function(event) { return { sensorId: event.fullDocument.metadata.sensorId, timestamp: event.fullDocument.timestamp, temperature: event.fullDocument.data.temperature.value }; }",
    'sink': {'atlasCluster': CLUSTER, 'database': 'analytics', 'collection': 'temp_events'}
}

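# Pseudocode: 'createStreamPipeline' is a placeholder, not an actual database command,
# so this call fails as written. Stream processors are defined against an Atlas Stream
# Processing instance, for example with sp.createStreamProcessor() in mongosh.
atlas_db.command('createStreamPipeline', pipeline_def)
print("Atlas Stream Processing pipeline 'real_time_temp_processor' configured.")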

The code above provides a conceptual example of defining an ASP pipeline that listens to inserts on `ts_kafka`, projects relevant fields, and writes simplified events to a target analytics collection. Before adapting it, set `MONGO_URI` to your Atlas connection string and `CLUSTER` to the name of the target cluster.
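For comparison, a stream processor is normally expressed as an aggregation-style pipeline with a `$source` stage and a `$merge` sink. The sketch below builds such a pipeline as Python data and prints it as JSON that could be adapted for `sp.createStreamProcessor()` in mongosh. The connection name `workshopCluster` and the source collection `sensor_events` are assumptions: change streams are not available on time series collections, so the source is assumed to be a regular collection holding the same documents.

```python
import json

SOURCE_CONNECTION = "workshopCluster"  # hypothetical connection registry entry
SINK_CONNECTION = "workshopCluster"    # hypothetical; may be the same cluster

stream_pipeline = [
    # Read change events from a regular collection (assumed name)
    {"$source": {
        "connectionName": SOURCE_CONNECTION,
        "db": "timeseries_workshop",
        "coll": "sensor_events",
    }},
    # Keep only inserts
    {"$match": {"operationType": "insert"}},
    # Flatten the fields of interest from the change event's fullDocument
    {"$project": {
        "_id": 0,
        "sensorId": "$fullDocument.metadata.sensorId",
        "timestamp": "$fullDocument.timestamp",
        "temperature": "$fullDocument.data.temperature.value",
    }},
    # Write simplified events to the analytics collection
    {"$merge": {"into": {
        "connectionName": SINK_CONNECTION,
        "db": "analytics",
        "coll": "temp_events",
    }}},
]

print(json.dumps(stream_pipeline, indent=2))
```

Treat the stage options as a starting point and verify them against the current Atlas Stream Processing documentation before use.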

---
## 7. Data Tiering & Online Archive
**Description**: Show how to configure data tiering for timeseries, moving older data to an archived tier (e.g., AWS S3) and querying both in-place and archived data.

In [None]:
# Requires Atlas admin access and pymongo[srv]

from pymongo import MongoClient
atlas_client = MongoClient(MONGO_URI)
policy = {
    'collection': 'ts_kafka',
    'database': 'timeseries_workshop',
    'archiveName': 'ts_kafka_archive',
    'expireAfterDays': 30,
    'storage': {'type': 'AWS', 'bucketName': 'my-timeseries-archive-bucket', 'region': 'us-east-1'}
}

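# Pseudocode: 'configureOnlineArchive' is a placeholder, not an actual database command.
# Online Archive is configured through the Atlas UI, the Atlas CLI, or the Atlas Administration API.
atlas_client.admin.command('configureOnlineArchive', policy)
print("Online Archive policy for 'ts_kafka' configured to archive data >30 days old.")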

Using the Atlas UI, CLI, or Administration API, configure an Online Archive for `ts_kafka` so that documents older than 30 days automatically move to cloud object storage (AWS in this example). To query live and archived data together, connect with the connection string that Atlas provides for the cluster combined with its Online Archive.

---
## 8. Visualization: Atlas Charts
**Description**: Provide a conceptual guide for building charts in Atlas Charts to visualize timeseries data (no code cell executed here).

1. **Connect to Data Source**: In Atlas Charts, create a new dataset pointing to the `timeseries_workshop.ts_kafka` collection.
2. **Build a Line Chart**: Select `timestamp` on the X-axis, `data.temperature.value` on the Y-axis. Filter by `metadata.sensorId` if needed.
3. **Add Moving Average**: Use the aggregation feature in Charts to define a 1-minute moving average pipeline (similar to `$setWindowFields`).
4. **Dashboard Layout**: Create a dashboard combining:
   - Current temperature gauge (use a single-value chart with `$max` aggregation on `data.temperature.value`).
   - Time-series line chart with moving average overlay.
   - Heatmap of temperature distribution by hour of day (aggregate by `$hour` of `timestamp`).

Atlas Charts allows drag-and-drop chart building without code, leveraging MongoDB’s aggregation under the hood. Dashboards update in near-real-time for timeseries collections.
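The hour-of-day heatmap in step 4 corresponds to a simple grouping. Charts builds its own queries, so the pipeline below is only an assumption about what a comparable aggregation looks like when run from the notebook; the output field names `avgTemp` and `readings` are illustrative.

```python
hourly_pipeline = [
    # Bucket readings by hour of day (0-23, UTC) extracted from the timestamp
    {'$group': {
        '_id': {'$hour': '$timestamp'},
        'avgTemp': {'$avg': '$data.temperature.value'},
        'readings': {'$sum': 1},
    }},
    # Return the buckets in hour order
    {'$sort': {'_id': 1}},
]

# With a live connection: list(db['ts_kafka'].aggregate(hourly_pipeline))
print(hourly_pipeline)
```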

---
### High-Volume Sample Document Generator
```python
# Code Cell: Generate high-volume documents and write to a JSON Lines file for bulk ingestion
from bson import json_util  # ships with PyMongo; serializes datetime values as Extended JSON

n = 200_000  # Number of documents to generate
base_time = datetime.utcnow()

with open('high_volume_docs.json', 'w') as f:
    for i in range(n):
        doc = generate_sample_doc(base_time + timedelta(seconds=i))
        # json.dumps cannot serialize datetime; json_util.dumps writes it as {"$date": ...}
        f.write(json_util.dumps(doc) + '\n')
print(f"Wrote {n} documents to high_volume_docs.json")
```

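This cell generates 200,000 sample documents and writes them to a JSON file, one document per line. Create `ts_bulk` as a timeseries collection beforehand, since `mongoimport` otherwise creates a regular collection, then use the file with `mongoimport` for high-throughput ingestion: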
```bash
mongoimport --uri "mongodb://localhost:27017/timeseries_workshop" --collection ts_bulk --file high_volume_docs.json --numInsertionWorkers 4
```

---
### Performance Metrics Summary
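- **Storage Efficiency**: Timeseries collections often need noticeably less storage than regular collections thanks to internal bucketing and compression. The ~50% figure is a rough guide; use the `storageSize` comparison from Section 1 for your own data.
- **Insert Throughput**:
  - `insert_one`: ~X docs/sec (hardware dependent; for illustration, 50k docs in 5s works out to 10k docs/sec).
  - `insert_many` / `bulk_write`: ~Y docs/sec (for illustration, 50k docs in 1s works out to 50k docs/sec).
- **Query Latency**: Range queries on `timestamp` benefit from the time-based organization of the buckets; sub-100ms responses for tens of thousands of docs are plausible but depend on hardware and data shape.

Actual metrics vary by system. Always benchmark on representative infrastructure. MongoDB’s internal optimizations for timeseries (bucket compression, fewer indexes) are designed to reduce storage and speed up time-based reads; write performance should be measured rather than assumed.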

---