Übung: IoT Broker und Filter mit K-native
-----------------------------

![](https://user-images.githubusercontent.com/16281246/116248768-1fe56080-a73a-11eb-9a85-8bdccb82d16c.png)

Quelle: K-native

- - - 

**Broker**
Ein Broker in Knative ist eine zentrale Komponente, die als Verteiler von Ereignissen (Events) fungiert. Er empfängt Ereignisse von verschiedenen Quellen und sorgt dafür, dass diese an die richtigen Abonnenten weitergeleitet werden. Ein Broker besteht aus zwei Hauptteilen:

* Ingress: Hier kommen die Ereignisse an.
* Channel: Ein internes Kommunikationssystem, das die Ereignisse an die Abonnenten weiterleitet.

**Filter (Trigger)**
Ein Filter, auch Trigger genannt, definiert eine Regel, die bestimmt, welche Ereignisse an eine bestimmte Knative-Services oder Endpunkte gesendet werden. Triggers filtern die Ereignisse basierend auf bestimmten Kriterien, wie z.B. dem Ereignistyp oder anderen Attributen, und leiten sie dann an den entsprechenden Empfänger weiter.

**Zusammenspiel von Broker und Trigger**
* Ereignisaufnahme: Der Broker empfängt Ereignisse von verschiedenen Quellen.
* Verteilung: Der Broker verteilt die Ereignisse an die entsprechenden Triggers.
* Filterung: Triggers filtern die Ereignisse gemäß den definierten Regeln.
* Zustellung: Gefilterte Ereignisse werden an die entsprechenden Services oder Endpunkte zugestellt.

Durch diese Architektur ermöglicht Knative eine lose Kopplung von Ereignisquellen und -empfängern, was die Skalierbarkeit und Flexibilität von serverlosen Anwendungen erhöht.

- - -

Zuerst erstellen wir den Kubernetes Namespace



In [None]:
import os
os.environ['NS_IOT']='ms-iot'
! kubectl create namespace ${NS_IOT}
! # kubectl label  namespace ${NS_IOT} istio-injection=enabled

### Dashboard

Jetzt ist ein guter Zeitpunkt um das Kubernetes Dashboard zu starten und dort im Pulldownmenu den Namespace "ms-brkr" auszuwählen.

Wählt nachfolgenden Link an und aktzeptiert das Zertifikat um dann ohne Token, drückt "Überspringen" oder "Skip", ins Dashboard zu wechseln.

In [None]:
! echo "https://"$(cat ~/work/server-ip)":8443"

Anschliessend folgen die Standard Microservices

Wir starten **kn-iot-consumer** und **kn-iot-pipe** mit genau einer Instanz.

In [None]:
%%bash
kn service create kn-iot-consumer --scale 1 --image registry.gitlab.com/ch-mc-b/autoshop-ms/app/iot/iot-consumer:1.0.0 --namespace ${NS_IOT}
kn service create kn-iot-pipe     --scale 1 --image registry.gitlab.com/ch-mc-b/autoshop-ms/app/iot/iot-pipe:1.0.0 --namespace ${NS_IOT}

### Hack

Damit die K-native Service wie normale Kubernetes Service ansprechbar sind, erzeugen wir Standard Services:

In [None]:
%%bash
cat <<EOF | kubectl apply --namespace ${NS_IOT} -f - 
apiVersion: v1
kind: Service
metadata:
  name: iot-alert
spec:
  selector:
    serving.knative.dev/service: kn-iot-alert
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080    
---
apiVersion: v1
kind: Service
metadata:
  name: iot-consumer
spec:
  selector:
    serving.knative.dev/service: kn-iot-consumer
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080     
---      
apiVersion: v1
kind: Service
metadata:
  name: iot-pipe
spec:
  selector:
    serving.knative.dev/service: kn-iot-pipe
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080     
EOF

- - -
### IoT-Services Menu

Mittels des Menu können wir in die Microservices verzweigen und die empfangenen Nachrichten (Events) anschauen:

In [None]:
%%bash
cat <<%EOF% | kubectl --namespace ${NS_IOT} apply -f -
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: reverse-proxy
  name: reverse-proxy
spec:
  containers:
  - image: registry.gitlab.com/ch-mc-b/autoshop-ms/app/iot/reverse-proxy:1.0.0
    name: reverse-proxy
---    
apiVersion: v1
kind: Service
metadata:
  labels:
    app.kubernetes.io/name: reverse-proxy
  name: reverse-proxy
spec:
  ports:
  - port: 8080
    protocol: TCP
    targetPort: 8080
  selector:
    app.kubernetes.io/name:  reverse-proxy
  type: LoadBalancer
%EOF%
echo "http://"$(cat ~/work/server-ip)":"$(kubectl get service --namespace ${NS_IOT} reverse-proxy -o=jsonpath='{ .spec.ports[0].nodePort }')

- - -
### Broker

Für die Kommunkation erstellen wir einen Default Broker und stellen sicher, dass der Brocker (Ingress) mittels Port von aussen erreichbar ist.

In [None]:
! kn broker create default --namespace ${NS_IOT}
! kubectl patch service broker-ingress -n knative-eventing -p '{"spec": {"type": "LoadBalancer"}}'

- - -
### Trigger

Und drei Trigger welche gezielt die Nachrichten mit `type`: `alert`, `consumer` und `pipe` behandeln.

`iot` wird an `consumer` und `pipe` gesendent die anderen nur an die entsprechenden Namensvetter.

In [None]:
%%bash

cat <<EOF | kubectl apply --namespace ${NS_IOT}  -f -
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: iot-consumer-iot
spec:
  broker: default
  filter:
    attributes:
      type: iot 
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: kn-iot-consumer
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: iot-pipe-iot
spec:
  broker: default
  filter:
    attributes:
      type: iot
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: kn-iot-pipe
EOF

In [None]:
%%bash
cat <<EOF | kubectl apply --namespace ${NS_IOT} -f -
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: iot-pipe
spec:
  broker: default
  filter:
    attributes:
      type: iot-pipe 
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: kn-iot-pipe
EOF

In [None]:
%%bash
cat <<EOF | kubectl apply --namespace ${NS_IOT}  -f -
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: iot-consumer
spec:
  broker: default
  filter:
    attributes:
      type: iot-consumer
  subscriber:
    ref:
     apiVersion: serving.knative.dev/v1
     kind: Service
     name: kn-iot-consumer
EOF

### Gesamtübersicht

Die Subscriptions filtern `"Ce-Type:` und leiten die Events weiter an die Microservices **consumer** und/oder **pipe**.

In [None]:
%%bash
echo "Broker URL (intern)"
kubectl get broker default -o jsonpath='{.status.address.url}' --namespace ${NS_IOT}
echo ""
echo "------------------------"
echo "Broker URL (extern)"
echo "http://$(cat ~/work/server-ip):"$(kubectl get svc broker-ingress -n knative-eventing -o jsonpath='{.spec.ports[?(@.port==80)].nodePort}')/${NS_IOT}/default
echo "------------------------"
echo "Triggers"
kn trigger list --namespace ${NS_IOT}
echo "------------------------"
echo "Microservice Menu"
echo "http://$(cat ~/work/server-ip):"$(kubectl get service --namespace ${NS_IOT} reverse-proxy -o=jsonpath='{ .spec.ports[0].nodePort }')

- - -
### Testen

Wir senden zuerst an beide Microservices Daten **consumer** und **pipe**

In [None]:
%%bash
PORT=$(kubectl get svc broker-ingress -n knative-eventing -o jsonpath='{.spec.ports[?(@.port==80)].nodePort}')
curl -s -X POST http://localhost:${PORT}/ms-iot/default \
-H "Host: broker-ingress.knative-eventing.svc.cluster.local" \
-H "Ce-Id: iot-hello" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: iot" \
-H "Ce-Source: curl" \
-H "Content-Type: text/plain" \
-d '0xBC,25.0,30.0'

Dann senden wir einen Event an die Microservice **consumer**

In [None]:
%%bash
PORT=$(kubectl get svc broker-ingress -n knative-eventing -o jsonpath='{.spec.ports[?(@.port==80)].nodePort}')
curl -s -X POST http://localhost:${PORT}/ms-iot/default \
-H "Host: broker-ingress.knative-eventing.svc.cluster.local" \
-H "Ce-Id: iot-hello" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: iot-consumer" \
-H "Ce-Source: curl" \
-H "Content-Type: text/plain" \
-d '0xBC,26.0,30.0'

Dann senden wir einen Event an den Microservice **pipe**

In [None]:
%%bash
curl -s -X POST http://10.152.183.194/ms-iot/default \
-H "Ce-Id: iot-hello" \
-H "Host: broker-ingress.knative-eventing.svc.cluster.local" \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: iot-pipe" \
-H "Ce-Source: curl" \
-H "Content-Type: text/plain" \
-d '0xBC,27.0,30.0'

- - -

### Message Broker vs. K-native Eventing

In einer Microservices-Architektur können Services direkt mit einem Message Broker wie Kafka kommunizieren, indem sie Nachrichten senden und empfangen. Dies ermöglicht eine robuste und skalierbare Kommunikation, erfordert jedoch, dass die Microservices die spezifischen APIs und Konfigurationen des Brokers kennen und verwalten.

Mit K-native Eventing hingegen wird der Message Broker abstrahiert, wodurch die Microservices von den Details des Brokers entkoppelt werden. Knative Eventing bietet ein standardisiertes Eventing-Modell und erleichtert das Routing und Verwalten von Events. Dies ermöglicht eine flexiblere und einfacher zu verwaltende Architektur, da die Microservices sich auf das Verarbeiten von Events konzentrieren können, ohne sich um die Details der Broker-Implementierung kümmern zu müssen.

Hier der gekürzte Code von **consumer** 

- - -

    @app.route('/', methods=['POST'])
    def receive_event():
        if request.headers.get('Content-Type') == 'text/plain':
            event_data = request.get_data(as_text=True)

            if event_data and event_data.startswith("0x"):
                print(f"value = {event_data}")
                sys.stdout.flush()
                received_messages.append(event_data)
                with open('/data/ml-data.csv', 'a', newline='') as csvfile:
                    csvwriter = csv.writer(csvfile)
                    csvwriter.writerow([event_data])

            return jsonify({"message": "Event received"}), 200
            
- - -            
und **pipe**

    @app.route('/', methods=['POST'])
    def receive_event():
        if request.headers.get('Content-Type') == 'text/plain':
            event_data = request.get_data(as_text=True)
            values = event_data.split(',')

            # Temperatur und Luftfeuchtigsensor
            if values[0].strip() == "0xBC":
                humtemp_data = {
                    "humtemp": {
                        "temp": values[1],
                        "hum": values[2]
                    }
                }
                received_messages.append(humtemp_data)
                print(humtemp_data)
                sys.stdout.flush()            
                return jsonify(humtemp_data)

- - -

Aufräumen


In [None]:
! kubectl delete pod --all --namespace ${NS_IOT} --grace-period=0 --force
! kubectl delete namespace ${NS_IOT} --force --grace-period=0