# MapR Streams

MapR Streams brings integrated publish/subscribe messaging to the MapR Converged Data Platform. MapR Streams is ideal for a variety of use cases, including:

- **Application event pipelines:** Many types of applications generate event or log data that needs to be centrally stored and analyzed to gain insights about user activity or application performance. MapR Streams simplifies these pipelines by transporting events to a central location where they can undergo event-by-event transformation and analysis.
- **Database change capture:** Most modern databases allow users to generate an event each time an entry is added or modified. These events can be produced to MapR Streams to keep systems like search indices and caches synchronized, as well as feed security or notification applications.
- **Internet of Things:** The explosion in the number of smart devices and sensors has created many situations in which billions of data points are created by millions of geographically dispersed sensors. MapR Streams provides a reliable, global transport for these messages, allowing analytics to be done both at the source and at a central location.

## Sample Producer

The following example code produces three messages, followed by a '\0' sentinel message, to a topic named mytopic in the stream /user/mapr/mystream.

In [8]:
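from mapr_streams_python import Producer
# The default stream is the stream path only; topics are addressed as 'stream:topic'.
p = Producer({'streams.producer.default.stream': '/user/mapr/mystream'})
# '\0' is a sentinel that marks the end of the sample data.
some_data_source = ["msg1", "msg2", "msg3", '\0']
for data in some_data_source:
    # Message values are sent as bytes, so encode each string first.
    p.produce('/user/mapr/mystream:mytopic', data.encode('utf-8'))
# Wait until all queued messages have been delivered.
p.flush()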
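
The producer loop above passes two things to produce: a full topic path in stream:topic form and the message value as UTF-8 bytes. A minimal sketch of those two steps in isolation, runnable without a cluster, might look like the following. The helper names full_topic and encode_messages are hypothetical and are not part of mapr_streams_python.

```python
# Hypothetical helpers for illustration; not part of mapr_streams_python.

def full_topic(stream_path, topic):
    """Join a stream path and a topic name into 'stream:topic' form."""
    return '%s:%s' % (stream_path, topic)


def encode_messages(messages, encoding='utf-8'):
    """Encode each text message to bytes, as the producer loop does."""
    return [m.encode(encoding) for m in messages]


topic = full_topic('/user/mapr/mystream', 'mytopic')
payloads = encode_messages(["msg1", "msg2", "msg3", '\0'])
print(topic)
print(payloads)
```

Building the path in one place keeps the stream and topic names consistent between the producer and any consumer that subscribes to the same topic.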

## Sample Consumer

In the following example, the MapR Streams consumer subscribes to the topic mytopic in the stream /user/mapr/mystream and prints the content of each message that it reads. It stops after reading the '\0' sentinel message.

In [9]:
# Consumer
from mapr_streams_python import Consumer, KafkaError
c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/user/mapr/mystream:mytopic'])
running = True
while running:
    msg = c.poll(timeout=1.0)
    if msg is None:
        # No message arrived within the timeout; poll again.
        continue
    if not msg.error():
        msg_value = msg.value().decode('utf-8')
        # The '\0' sentinel marks the end of the sample data.
        if msg_value == '\0':
            running = False
        print('Received message: %s' % msg_value)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # Reaching the end of a partition is expected; any other error stops the loop.
        print(msg.error())
        running = False
c.close()

Received message: msg1
Received message: msg2
Received message: msg3
Received message:  
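
The control flow of the consumer loop (skip empty polls, decode each value, stop at the sentinel) can be exercised without a cluster. The sketch below assumes a stand-in message class, FakeMessage, and a helper, drain; both are hypothetical names for illustration, and real message objects come only from the consumer's poll call.

```python
# Stand-in message type for illustration only; real messages are returned
# by Consumer.poll() and are not constructed by hand.
class FakeMessage:
    def __init__(self, value):
        self._value = value

    def error(self):
        return None  # this sketch models only successful reads

    def value(self):
        return self._value


def drain(messages, sentinel='\0'):
    """Decode messages in order and stop once the sentinel has been read."""
    received = []
    for msg in messages:
        if msg is None:          # models a poll that timed out
            continue
        if msg.error():
            break
        text = msg.value().decode('utf-8')
        received.append(text)
        if text == sentinel:     # compare by value, not identity
            break
    return received


queue = [FakeMessage(b'msg1'), None, FakeMessage(b'msg2'),
         FakeMessage(b'msg3'), FakeMessage(b'\x00'), FakeMessage(b'late')]
result = drain(queue)
print(result)
```

As in the sample output, the sentinel itself is included in what is received, and anything queued after it is left unread.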
