## Welcome to ACME eCommerce LLC 

This is your first day on the job. Every one of our employees is their own special flower...

Now make us money

ACME eCommerce is in the business of selling things to people. Your first task is to tell me how many people we had on the site.

## Let's create our first Kafka topic
Open a bash session inside the Kafka container and create a new topic:
```
docker ps | grep kafka
# Copy the Kafka container ID
docker exec -it DOCKER_CONTAINER_ID /bin/bash

# if you only have one Kafka container running, the following will work:
docker exec -it `docker ps | grep kafka | awk '{print $1}'` /bin/bash
# from inside the Docker container
$KAFKA_HOME/bin/kafka-topics.sh --create --topic visit_log \
    --zookeeper zk --partitions 2 --replication-factor 1

```

Now, let's set up some visit data first. Please open the notebook [03 - Producer](/notebooks/03%20-%20Producer.ipynb) in a new tab and run through all the code there. Then return here.

In [1]:
import json
import pykafka
 
docker_ip = '192.168.99.100'
client = pykafka.KafkaClient(hosts="{}:9092".format(docker_ip))

In [None]:
client.topics

In [None]:
topic = client.topics['visit_log'] 

In [None]:
topic.partitions

In [None]:
c = topic.get_simple_consumer()

In [None]:
consumer = topic.get_simple_consumer()

In [None]:
message = consumer.consume(block=False)

In [None]:
message.offset

In [None]:
# WE HAVE OUR FIRST MESSAGE.
message.value

We can loop through all the messages with this code. But be careful if you have lots of messages in your topic!

In [None]:
%%time
count = 0

while True:
    message = consumer.consume(block=False)
    if not message:
        break
    # YOUR CODE HERE
    count += 1
print("We consumed: {} messages".format(count))
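
If the topic is large, one way to stay safe is to cap how many messages the loop will read. The sketch below is a minimal illustration and not part of pykafka: `consume_up_to` and `max_messages` are hypothetical names, and `consume_fn` is assumed to behave like `consumer.consume(block=False)`, returning `None` when nothing is left.

```python
def consume_up_to(consume_fn, max_messages):
    """Call consume_fn until it returns None or max_messages were read.

    consume_fn is assumed to behave like
    `lambda: consumer.consume(block=False)`.
    """
    messages = []
    while len(messages) < max_messages:
        message = consume_fn()
        if message is None:
            break
        messages.append(message)
    return messages


# Stand-in for a consumer so the sketch runs without Kafka.
fake_topic = iter(["m1", "m2", "m3"])
first_two = consume_up_to(lambda: next(fake_topic, None), max_messages=2)
print("We consumed: {} messages".format(len(first_two)))
```

With a real consumer, the call would look something like `consume_up_to(lambda: consumer.consume(block=False), 1000)`.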

So, your first coding task: given this stream, answer your boss's first question. How many unique users have visited the site?

In [None]:
consumer = topic.get_simple_consumer()

In [None]:
%%time

while True:
    message = consumer.consume(block=False)
    if not message:
        break
    # YOUR CODE HERE

In [None]:
%%time
from collections import Counter
counter = Counter()
count = 0

while True:
    message = consumer.consume(block=False)
    if not message:
        break
    # YOUR CODE HERE
    user_id = message.value.split(':')[1]
    counter[user_id] += 1
    count += 1
print("We consumed: {} messages".format(count))

In [None]:
count
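
Note that `count` is the total number of messages, while the boss asked for unique users. Assuming the counter is keyed by user id, the number of distinct keys is the answer. Here is a minimal, self-contained sketch with made-up message values; the `prefix:user_id` layout is only inferred from the `split(':')[1]` call above, and `sample_values` and `sample_counter` are hypothetical names.

```python
from collections import Counter

# Hypothetical message values; the real payload format may differ.
sample_values = ["visit:42", "visit:7", "visit:42", "visit:13"]

sample_counter = Counter()
for value in sample_values:
    user_id = value.split(":")[1]
    sample_counter[user_id] += 1

total_messages = sum(sample_counter.values())
unique_users = len(sample_counter)  # one key per distinct user id
print("{} messages from {} unique users".format(total_messages, unique_users))
```

On the real data, `len(counter)` should give the same kind of figure.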

In [None]:
%matplotlib inline
import pandas as pd
pd.Series(counter.values()).value_counts()

In [None]:
import seaborn as sns
pd.Series(counter.values()).hist()

# Enrichment

A common task when processing streams is enriching the data.

Take, for example, the question: how many users do we have per country?

We need to enrich the current data because it contains no country information.
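
Enrichment here amounts to a lookup join: take a field from the message and use it as a key into a reference table. The following is a minimal sketch, assuming the user id is the second `:`-separated field of the message value; `enrich` and `sample_lookup` are hypothetical names used only for illustration.

```python
def enrich(value, lookup, default="Unknown"):
    """Return a dict with the user id and the country looked up for it."""
    user_id = value.split(":")[1]
    return {"user_id": user_id, "country": lookup.get(user_id, default)}


# Tiny made-up lookup table for illustration.
sample_lookup = {"1": "USA", "2": "UK"}

print(enrich("visit:1", sample_lookup))
print(enrich("visit:99", sample_lookup))  # unknown users get the default
```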

In [None]:
# let's build an in-memory lookup table
import random
countries = ['USA', 'USA', 'USA', 'UK', 'UK', 'Canada', 'Germany']
user_country_lookup = {str(i): random.choice(countries) for i in range(10000)}

In [None]:
consumer = topic.get_simple_consumer()

In [None]:
from collections import Counter
counter = Counter()

In [None]:
count = 0
while True:
    message = consumer.consume(block=False)
    if not message:
        break
    # YOUR CODE HERE
    user_id = message.value.split(':')[1]
    if user_id in user_country_lookup:
        counter[user_country_lookup[user_id]] += 1
    else:
        counter['Unknown'] += 1
    count += 1
print("We consumed: {} messages".format(count))

Now, ACME eCommerce LLC is an agile, outside-the-box, Big Data-driven, industry-disrupting innovation machine. So you're not the only one who wants this data...

All your wonderful work on data enrichment has gotten around. People are clamoring to get their hands on it.

How?

First, let's create a new Kafka topic.

As we did before, open a terminal, start a bash session inside the Kafka container, and create the topic:

```
# if you only have one Kafka container running, the following will work:
docker exec -it `docker ps | grep kafka | awk '{print $1}'` /bin/bash

$KAFKA_HOME/bin/kafka-topics.sh --create --topic enriched_visit_log \
    --zookeeper zk --partitions 2 --replication-factor 1

```

In [None]:
client.topics

In [None]:
enriched_visit_topic = client.topics['enriched_visit_log']
enriched_visit_producer = enriched_visit_topic.get_producer()

In [None]:
consumer = topic.get_simple_consumer()

In [None]:
def process_message(message):
    """
    Takes a raw visit message and 
    returns a dict of attributes about that message.
    """
    message_data = {}
    # WRITE YOUR CODE HERE
    user_id = message.value.split(':')[1]
    message_data['user_id'] = user_id
    
    return message_data

In [None]:
process_message(message)

In [None]:
# now process the topic, saving the results to another topic
while True:
    message = consumer.consume(block=False)
    if not message:
        break
    # YOUR CODE HERE
    enriched_message = process_message(message)
    enriched_visit_producer.produce([json.dumps(enriched_message)])

In [None]:
enriched_visit_consumer = enriched_visit_topic.get_simple_consumer()

In [None]:
message = enriched_visit_consumer.consume(block=False)

In [None]:
message.value
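
Downstream consumers of `enriched_visit_log` receive JSON text, so they need to decode each value before using it. Below is a small sketch of that round trip with the standard `json` module. The sample record is made up, `decode_enriched` is a hypothetical helper, and the `bytes` check is there on the assumption that message values may arrive as `bytes` rather than `str`.

```python
import json


def decode_enriched(value):
    """Turn a JSON-encoded message value back into a dict."""
    if isinstance(value, bytes):
        value = value.decode("utf-8")
    return json.loads(value)


# Made-up record in the shape process_message returns.
encoded = json.dumps({"user_id": "42"})
decoded = decode_enriched(encoded)
print(decoded["user_id"])
```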

# Stateful processing

What we have built so far is all good, but there is a problem... it's got all this nasty state.

### What do you mean by state? 

Examples in SQL.

**Stateless**:
```sql
select ltrim(user_id, 0, 1) 
    from visit_log;
```

```py
# Stream version
for message in visit_consumer:
    user_id = message.value.split(':')[1]
```


**Stateful:**

```sql
select user_id, count(*) 
    from visit_log 
    group by user_id;
```

```py
# Stream version
counter = Counter()
for message in visit_consumer:
    user_id = message.value.split(':')[1]
    counter[user_id] += 1
```


Let's write this code to simulate real life...

```py
import random
import sys
from collections import Counter

# Stream version
counter = Counter()
for message in visit_consumer:
    user_id = message.value.split(':')[1]
    counter[user_id] += 1

    # 1% chance your process is nuked by a gremlin
    if random.random() < 0.01:
        sys.exit(-1)
```


What happened to your counter...

and how can we fix this?
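
One common answer is to keep the state outside the process, so that a restart picks up the counts where the last run left them. The sketch below illustrates the idea with a hypothetical `FakeStore` class standing in for an external store; it only mimics an `incr` operation like the one Redis offers. It deliberately ignores the separate problem of tracking which messages were already processed.

```python
class FakeStore(object):
    """In-memory stand-in for an external store (hypothetical helper)."""

    def __init__(self):
        self.data = {}

    def incr(self, key):
        self.data[key] = self.data.get(key, 0) + 1
        return self.data[key]


def count_visits(values, store):
    """Increment a per-user counter in the store for each message value."""
    for value in values:
        user_id = value.split(":")[1]
        store.incr(user_id)


store = FakeStore()  # lives outside the "process"

# First run handles two messages, then the process dies.
count_visits(["visit:1", "visit:2"], store)
# A fresh run starts with no local variables but the same store.
count_visits(["visit:1"], store)

print(store.data)
```

With a real Redis client, a call such as `redis_client.incr(user_id)` would play the role of `FakeStore.incr`.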

# Redis

In [None]:
import redis
redis_client = redis.Redis(docker_ip, 6379)

In [None]:
%%timeit
for i in range(1000):
    redis_client.set(i, i)

In [None]:
import os
import ipyparallel as ipp

rc = ipp.Client()
ar = rc[:].apply_async(os.getpid)
pid_map = ar.get_dict()

In [None]:
v = rc[:]

In [None]:
v.scatter('keys', range(100000)).get()

In [None]:
%%px
keys[0]

In [None]:
%%time
%%px
import redis
docker_ip = '192.168.99.100'
redis_client = redis.Redis(docker_ip, 6379)
pipe = redis_client.pipeline()
for i in keys:
    pipe.set(i, i)
    if i % 10000 == 0:
        pipe.execute()
        pipe = redis_client.pipeline()

_ = pipe.execute()

In [None]:
%%time
%%px
import redis
docker_ip = '192.168.99.100'
redis_client = redis.Redis(docker_ip, 6379)
for i in keys:
    redis_client.set(i, i)

In [None]:
5000000.0 / 38

In [None]:
%%time
pipe = redis_client.pipeline()
for i in range(10000000):
    pipe.set(i, i)
    if i % 100000 == 0:
        pipe.execute()
        pipe = redis_client.pipeline()

pipe.execute()

In [None]:
consumer = client.topics['output'].get_simple_consumer()


In [6]:
balanced_consumer = client.topics['input'].get_balanced_consumer(
    consumer_group='testgroup7',
    auto_commit_enable=True,
    zookeeper_connect='192.168.99.100:2181'
)

ERROR:pykafka.cluster:Error discovering offset manager.
ERROR:pykafka.cluster:Error discovering offset manager.

In [None]:
balanced_consumer.consume().value


In [None]:
%%time

count = 0

while True:
    message = balanced_consumer.consume(block=False)
    if not message:
        break
    # YOUR CODE HERE
    count += 1
print("We consumed: {} messages".format(count))


In [10]:
balanced_consumer.consume(block=False)

<pykafka.balancedconsumer.BalancedConsumer at 0x10e6a1ea8 (consumer_group=testgroup7)>