In [1]:
pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.2.10-py2.py3-none-any.whl.metadata (10.0 kB)
Downloading kafka_python-2.2.10-py2.py3-none-any.whl (309 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/309.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/309.3 kB[0m [31m2.5 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m309.3/309.3 kB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.2.10


In [2]:
# Mount Google Drive (Colab runtime). SOURCE_FILE and the Results/Week5 output
# paths used later in this notebook all live under /content/drive/MyDrive/.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [3]:
import json
import time
import random
import threading
import queue
import datetime

In [4]:
# Configuration
# JSON file of historical sensor records; read once with json.load.
SOURCE_FILE = "/content/drive/MyDrive/SmartFactory/Dataset/factory_sensor_data.json"
# Number of records data_streamer() emits per tick, and the number of records
# file_data_ingestor() buffers before writing one stream_batch_<n>.json file.
BATCH_SIZE = 10
# Seconds passed to time.sleep() after each streamed batch.
STREAM_INTERVAL = 1
# When True, data_streamer() passes every record through inject_anomalies().
ENABLE_ANOMALY = True
# Selects the ingest thread target: kafka_data_ingestor when True,
# file_data_ingestor when False.
USE_KAFKA = False  # Set to True when integrating Kafka later

In [5]:
# Load historical sensor data
# JSON text is UTF-8, so pin the encoding instead of inheriting the
# platform's locale default.
with open(SOURCE_FILE, 'r', encoding='utf-8') as f:
    sensor_data = json.load(f)

# data_streamer() slices sensor_data into batches, which only works on a list;
# fail here with a clear message rather than inside the streaming thread.
if not isinstance(sensor_data, list):
    raise TypeError(
        f"{SOURCE_FILE} must contain a top-level JSON array, "
        f"got {type(sensor_data).__name__}"
    )

In [6]:
# Queue to simulate streaming pipeline
# data_streamer() puts records on this queue and the ingest thread
# (file_data_ingestor or kafka_data_ingestor) takes them off with a 10 s
# timeout. No maxsize is given, so the queue is unbounded.
data_queue = queue.Queue()
# Per-batch metadata dicts ("batch_file", "records", "timestamp") appended by
# file_data_ingestor() and dumped to stream_manifest.json when it finishes.
stream_manifest = []

In [7]:
def inject_anomalies(record, anomaly_chance=0.05, scale_range=(1.5, 3.0)):
    """Randomly turn a sensor record into a labelled anomaly.

    With probability ``anomaly_chance`` the record's ``"vibration"`` value is
    multiplied by a factor drawn uniformly from ``scale_range``, ``"failure"``
    is set to 1 and ``"note"`` is set to ``"anomaly_injected"``. Otherwise the
    record is left untouched.

    Args:
        record: Dict-like record with a numeric ``"vibration"`` entry. It is
            modified in place; pass a copy to keep the original intact.
        anomaly_chance: Probability of injecting an anomaly. ``0`` never
            injects; ``1`` always injects.
        scale_range: ``(low, high)`` bounds of the vibration multiplier.

    Returns:
        The same ``record`` object that was passed in.

    Raises:
        KeyError: If an anomaly is triggered and ``record`` has no
            ``"vibration"`` key.
    """
    # random.random() is in [0.0, 1.0), so the strict comparison makes a
    # chance of 0 impossible and a chance of 1 certain.
    if random.random() < anomaly_chance:
        low, high = scale_range
        record["vibration"] *= random.uniform(low, high)
        record["failure"] = 1  # force failure label
        record["note"] = "anomaly_injected"
    return record

In [8]:
def data_streamer():
    """Simulate a real-time IoT feed by replaying ``sensor_data`` in batches.

    For each slice of ``BATCH_SIZE`` records, every record is stamped with the
    current local time (ISO 8601) under ``"timestamp"``, optionally passed
    through ``inject_anomalies`` (when ``ENABLE_ANOMALY`` is true) and put on
    ``data_queue``. The function then sleeps ``STREAM_INTERVAL`` seconds
    before the next slice and returns once every record has been queued.

    Each record is shallow-copied before it is stamped. Without the copy, the
    timestamp and any injected anomaly would be written into ``sensor_data``
    itself, so re-running the simulation would scale already-scaled vibration
    values and keep earlier ``failure``/``note`` labels.

    Raises:
        ValueError: If ``BATCH_SIZE`` is 0 (rejected by ``range``).
    """
    for start in range(0, len(sensor_data), BATCH_SIZE):
        for source_record in sensor_data[start:start + BATCH_SIZE]:
            # Only top-level keys are written below, so a shallow copy is
            # enough to keep the loaded dataset pristine.
            record = dict(source_record)
            record["timestamp"] = datetime.datetime.now().isoformat()
            if ENABLE_ANOMALY:
                record = inject_anomalies(record)
            data_queue.put(record)
        time.sleep(STREAM_INTERVAL)

In [9]:
def file_data_ingestor():
    """Drain ``data_queue`` into JSON batch files and write a manifest.

    Records are buffered until ``BATCH_SIZE`` of them are available, then
    written to ``StreamBatch/stream_batch_<n>.json`` under the Week5 results
    folder. One entry per written file is appended to ``stream_manifest``.
    When no record arrives for 10 seconds the stream is considered finished:
    any partially filled buffer is written as a final, shorter batch, and the
    manifest is saved to ``stream_manifest.json``.

    Side effects:
        Creates the output directories if missing, overwrites batch files
        starting from index 0, resets and repopulates the module-level
        ``stream_manifest`` list, and prints progress lines.
    """
    import os  # local import: only needed for creating the output folders

    results_dir = "/content/drive/MyDrive/SmartFactory/Results/Week5"
    batch_dir = f"{results_dir}/StreamBatch"
    # Also creates results_dir, which the manifest is written into.
    os.makedirs(batch_dir, exist_ok=True)

    # Batch files are rewritten from index 0 on every call, so entries left
    # over from a previous run would describe files that no longer match.
    stream_manifest.clear()

    buffer = []
    file_index = 0
    total_records = 0

    def _write_batch():
        """Write the buffered records to the next batch file and record it."""
        nonlocal file_index, total_records
        filename = f"{batch_dir}/stream_batch_{file_index}.json"
        with open(filename, 'w', encoding='utf-8') as f_out:
            json.dump(buffer, f_out, indent=2)
        stream_manifest.append({
            "batch_file": filename,
            "records": len(buffer),
            "timestamp": datetime.datetime.now().isoformat()
        })
        print(f"[SAVED] Batch {file_index} | Records: {len(buffer)}")
        total_records += len(buffer)
        buffer.clear()
        file_index += 1

    while True:
        try:
            record = data_queue.get(timeout=10)
        except queue.Empty:
            # Flush the tail: without this, a dataset whose length is not a
            # multiple of BATCH_SIZE would silently lose its last records.
            if buffer:
                _write_batch()
            print(f"[INFO] Ingestion complete. Total records saved: {total_records}")
            break

        buffer.append(record)
        if len(buffer) >= BATCH_SIZE:
            _write_batch()

    # Save manifest
    with open(f"{results_dir}/stream_manifest.json", 'w', encoding='utf-8') as f_manifest:
        json.dump(stream_manifest, f_manifest, indent=2)
    print("[MANIFEST] Created: stream_manifest.json")

In [10]:
def kafka_data_ingestor(bootstrap_servers='localhost:9092', topic="smartfactory-stream"):
    """Forward records from ``data_queue`` to a Kafka topic.

    Each record is serialised as UTF-8 JSON and sent to ``topic``. Progress is
    printed every 100 messages. When no record arrives for 10 seconds the
    stream is considered finished and the function returns.

    Args:
        bootstrap_servers: Broker address(es) handed to ``KafkaProducer``.
        topic: Name of the topic the records are published to.

    The producer is always flushed and closed on exit, including when
    ``send`` raises, so buffered messages are not dropped and the client's
    resources are released.
    """
    # Kept local so the import only runs when the Kafka path is selected.
    from kafka import KafkaProducer
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    total_sent = 0
    try:
        while True:
            try:
                record = data_queue.get(timeout=10)
            except queue.Empty:
                print(f"[KAFKA] Streaming complete. Total messages sent: {total_sent}")
                break
            producer.send(topic, record)
            total_sent += 1
            if total_sent % 100 == 0:
                print(f"[KAFKA] Total messages sent: {total_sent}")
    finally:
        # Close even if the flush itself fails.
        try:
            producer.flush()
        finally:
            producer.close()

In [11]:
# Build the producer thread, then pick the consumer that matches USE_KAFKA.
stream_thread = threading.Thread(target=data_streamer)
if USE_KAFKA:
    ingest_thread = threading.Thread(target=kafka_data_ingestor)
else:
    ingest_thread = threading.Thread(target=file_data_ingestor)

# Launch the streamer first, then the ingestor.
for worker in (stream_thread, ingest_thread):
    worker.start()

# Block until both sides of the pipeline have finished.
for worker in (stream_thread, ingest_thread):
    worker.join()

print("Extended streaming simulation complete.")

[SAVED] Batch 0 | Records: 10
[SAVED] Batch 1 | Records: 10
[SAVED] Batch 2 | Records: 10
[SAVED] Batch 3 | Records: 10
[SAVED] Batch 4 | Records: 10
[SAVED] Batch 5 | Records: 10
[SAVED] Batch 6 | Records: 10
[SAVED] Batch 7 | Records: 10
[SAVED] Batch 8 | Records: 10
[SAVED] Batch 9 | Records: 10
[SAVED] Batch 10 | Records: 10
[SAVED] Batch 11 | Records: 10
[SAVED] Batch 12 | Records: 10
[SAVED] Batch 13 | Records: 10
[SAVED] Batch 14 | Records: 10
[SAVED] Batch 15 | Records: 10
[SAVED] Batch 16 | Records: 10
[SAVED] Batch 17 | Records: 10
[SAVED] Batch 18 | Records: 10
[SAVED] Batch 19 | Records: 10
[SAVED] Batch 20 | Records: 10
[SAVED] Batch 21 | Records: 10
[SAVED] Batch 22 | Records: 10
[SAVED] Batch 23 | Records: 10
[SAVED] Batch 24 | Records: 10
[SAVED] Batch 25 | Records: 10
[SAVED] Batch 26 | Records: 10
[SAVED] Batch 27 | Records: 10
[SAVED] Batch 28 | Records: 10
[SAVED] Batch 29 | Records: 10
[SAVED] Batch 30 | Records: 10
[SAVED] Batch 31 | Records: 10
[SAVED] Batch 32 |