Skip to content

tspannhw/FLiP-PulsarDevPython101

main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Code

Latest commit

 

Git stats

Files

Permalink
Failed to load latest commit information.
Type
Name
Latest commit message
Commit time
 
 
 
 
 
 

FLiP-PulsarDevPython101

Apache Pulsar Development 101 with Python

Setup Python Libraries 3.6++


pip3 install requests
pip3 install paho-mqtt
pip3 install kafka-python
pip3 install websocket-client
pip3 install pulsar-client=='2.9.1[all]'

Example

    # MQTT
    import paho.mqtt.client as mqtt
    client = mqtt.Client("rpi4-iot")
    client.connect("pulsar1", 1883, 180)
    client.publish("persistent://public/default/mqtt-2", payload=json_string, qos=0, retain=True)
    print("sent mqtt: " + json_string) 

    # Apache Kafka
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='pulsar1:9092',retries=3)
    producer.send('rp4-kafka-1', json.dumps(row).encode('utf-8'))
    producer.flush()
    print("sent kafka")
    
    # Web Sockets
    import socket
    import requests
    import websocket, base64, json
    scheme = 'ws' # or wss
    TOPIC = scheme + '://pulsar1:8080/ws/v2/producer/persistent/public/default/energy'
    ws = websocket.create_connection(TOPIC)
    message = str(json.dumps(row) )
    message_bytes = message.encode('ascii')
    base64_bytes = base64.b64encode(message_bytes)
    base64_message = base64_bytes.decode('ascii')
    ws.send(json.dumps({ 'payload' : base64_message, 'properties': { 'device' : 'jetson2gb'},'key': str(uuid2), 'context' : 5 }))
    response =  json.loads(ws.recv())
    if response['result'] == 'ok':
            print ('Message published successfully')
    else:
            print ('Failed to publish message:', response)
    ws.close()
    
    # Native Pulsar Protocol
    import pulsar
    from pulsar.schema import *
    class enviroplus(Record):
        adjtemp = String()
    
    client = pulsar.Client('pulsar://pulsar1:6650')
    producer = client.create_producer(topic='persistent://public/default/rp4enviroplus' ,schema=JsonSchema(enviroplus),properties={"producer-name": 
               "enviroplus-py-sensor","producer-id": "enviroplus-sensor" })
    enviroRec = enviroplus()
    producer.send(enviroRec,partition_key=str(uniqueid))
    

Communication to Apache Pulsar

  • Via Pulsar Native Protocol
  • Via Apache Kafka Protocol: (Kafka-on-Pulsar: KoP)
  • Via MQTT Protocol (MQTT-on-Pulsar: MoP)
  • via Web Sockets
  • Via AMQP Protocol (AMQP-on-Pulsar: AoP) ie. RabbitMQ
  • Via HTTP Post REST

Main Resources

Pulsar Python Examples

AoP, MoP, KoP, RoP

References

Videos

About

Apache Pulsar Development 101 with Python

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published