In [1]:
import json
import uuid

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

### Configuration Parameters 

> **TODO:** Change the configuration parameters to the appropriate values for your setup.

In [2]:
# Kafka connection settings plus the student name used to namespace the
# client ID and every topic this notebook creates or writes to.
config = {
    'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
    'first_name': 'Bret',
    'last_name': 'Young',
}

# Client ID and topic prefix share the same "<LastName><FirstName>" tag.
config['client_id'] = config['topic_prefix'] = config['last_name'] + config['first_name']

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Bret',
 'last_name': 'Young',
 'client_id': 'YoungBret',
 'topic_prefix': 'YoungBret'}

### Create Topic Utility Function

The `create_kafka_topic` function creates a Kafka topic based on your configuration settings. For instance, if your first name is *John* and your last name is *Doe*, `create_kafka_topic('locations')` will create a topic named `DoeJohn-locations`. If the topic already exists, the function reports that and does not create it again.

In [3]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    """Create the topic ``<topic_prefix>-<topic_name>`` unless it already exists.

    Args:
        topic_name: Suffix appended (after a dash) to ``config['topic_prefix']``.
        config: Dict providing ``bootstrap_servers``, ``client_id`` and
            ``topic_prefix``.
        num_partitions: Partition count used if the topic is created.
        replication_factor: Replication factor used if the topic is created.

    Prints whether the topic was created or already existed. Any other error
    raised by ``create_topics`` propagates to the caller.
    """
    name = '{}-{}'.format(config['topic_prefix'], topic_name)

    admin_client = KafkaAdminClient(
        bootstrap_servers=config['bootstrap_servers'],
        client_id=config['client_id']
    )
    try:
        topic = NewTopic(
            name=name,
            num_partitions=num_partitions,
            replication_factor=replication_factor
        )
        admin_client.create_topics(new_topics=[topic])
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError:
        print('Topic "{}" already exists'.format(name))
    finally:
        # Release the admin client's broker connections; previously a new
        # client was leaked on every call.
        admin_client.close()


create_kafka_topic('locations')

Topic "YoungBret-locations" already exists


### Kafka Producer

The following code creates a `KafkaProducer` object which you can use to send Python objects that are serialized as JSON.

**Note:** This producer serializes Python objects as JSON, so every object you send must be JSON serializable. For example, Python `datetime` values are not JSON serializable and must be converted to a string (e.g. ISO 8601) or a numeric value (e.g. a Unix timestamp) before being sent.

In [4]:
def _to_json_bytes(value):
    """Serialize ``value`` with ``json.dumps`` and encode the text as UTF-8."""
    return json.dumps(value).encode('utf-8')


# Producer whose message values are JSON-encoded before being sent.
producer = KafkaProducer(
    bootstrap_servers=config['bootstrap_servers'],
    value_serializer=_to_json_bytes,
)

### Send Data Function

The `send_data` function sends a Python object to a Kafka topic. It prepends the `topic_prefix` to the topic name, so `send_data('locations', data)` sends a JSON-serialized message to `DoeJohn-locations`. The function also registers callbacks that report whether the message was sent or an error occurred.

In [5]:
def on_send_success(record_metadata):
    """Callback for a delivered message: print its topic, partition and offset.

    Args:
        record_metadata: Object exposing ``topic``, ``partition`` and
            ``offset`` attributes (the metadata a send future resolves to).
    """
    template = 'Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'
    fields = (
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset,
    )
    print(template.format(*fields))
    
def on_send_error(excp):
    """Errback for a failed send: print the exception that caused the failure.

    Args:
        excp: The exception the send future failed with.

    The previous version passed ``exc_info=`` to ``print``. That keyword
    belongs to ``logging``, not ``print``, so the errback itself raised
    TypeError and the original error was never reported.
    """
    print('Message failed to send: {!r}'.format(excp))

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    """Send ``data`` as a message to the topic ``<topic_prefix>-<topic>``.

    Args:
        topic: Topic suffix; ``config['topic_prefix']`` and a dash are prepended.
        data: Payload handed to ``producer.send`` as the message value (the
            notebook's producer JSON-encodes it). Note that ``json.dumps``
            encodes a namedtuple as a JSON array, not an object.
        config: Dict providing ``topic_prefix``.
        producer: Producer used to send the message.
        msg_key: Optional message key. ``bytes``/``bytearray`` are sent as-is;
            any other value is converted with ``str`` and UTF-8 encoded. When
            omitted, a random UUID4 hex string is used.

    Returns:
        The future returned by ``producer.send``, with the success/error
        callbacks attached, so callers can wait on delivery if they need to.
    """
    topic_name = '{}-{}'.format(config['topic_prefix'], topic)

    key = uuid.uuid4().hex if msg_key is None else msg_key
    # Previously `.encode` was called unconditionally, so a bytes (or int)
    # key raised AttributeError.
    if not isinstance(key, (bytes, bytearray)):
        key = str(key).encode('utf-8')

    future = producer.send(topic_name, value=data, key=key)
    future.add_callback(on_send_success).add_errback(on_send_error)
    return future

In [6]:
# A tiny sample payload to confirm messages reach the locations topic.
example_data = {'key1': 'value1', 'key2': 'value2'}

send_data('locations', example_data)

Message sent:
    Topic: "YoungBret-locations"
    Partition: 0
    Offset: 1066


In [7]:
from collections import namedtuple  
import pyarrow.parquet as pq

# Columns selected from the processed locations dataset. The same order is
# used for the DataFrame selection and the ``Location`` record fields.
location_columns = (
    'offset id ride_id uuid course latitude longitude geohash speed accuracy t'
).split()

Location = namedtuple('Location', location_columns)

def read_locations(path='/home/jovyan/dsc650/data/processed/bdd/locations'):
    """Load the processed location records, ordered by ``offset``.

    Args:
        path: Parquet dataset directory to read. Defaults to the location
            dataset path previously hard-coded here.

    Returns:
        A list of ``Location`` namedtuples, one per row, sorted ascending by
        ``offset``. Values come from ``DataFrame.to_records``, so numeric
        fields are numpy scalars rather than plain Python numbers.
    """
    df = pq.ParquetDataset(path).read_pandas().to_pandas()
    df = df[location_columns].sort_values(by=['offset'])
    return [Location(*record) for record in df.to_records(index=False)]


locations = read_locations()
locations[0:2]

[Location(offset=1.0779125295566454, id='85c61911b7fe2ced1000c33c9e932706', ride_id='6760ffa3f41908695d1405b776c3e8d5', uuid='dad7eae44e784b549c8c5a3aa051a8c7', course=158.203125, latitude=40.677641336844, longitude=-73.81793000742218, geohash='dr5x2jpkmtcy', speed=2.119999885559082, accuracy=10.0, t='000.0'),
 Location(offset=1.525060886522843, id='58682c5d48cad9d9e103431d773615bf', ride_id='c9a2b46c9aa515b632eddc45c4868482', uuid='19b9aa10588646b3bf22c9b4865a7995', course=299.619140625, latitude=40.76287002542555, longitude=-73.96194855681718, geohash='dr5ruuwscttz', speed=0.0, accuracy=10.0, t='000.0')]

In [8]:
import numpy as np

def myconverter(date):
    """``default=`` hook for ``json.dumps`` that handles numpy values.

    Args:
        date: An object ``json`` could not serialize natively.

    Returns:
        ``str(date)`` for ``np.datetime64``; the equivalent Python scalar
        (via ``.item()``) for other numpy scalar types such as ``np.bool_``
        or ``np.int64``.

    Raises:
        TypeError: For any other type. This follows the ``json`` contract;
            previously ``None`` was returned implicitly, so unsupported
            values were silently written as ``null``.
    """
    if isinstance(date, np.datetime64):
        return str(date)
    if isinstance(date, np.generic):
        return date.item()
    raise TypeError(
        'Object of type {} is not JSON serializable'.format(type(date).__name__)
    )

In [9]:
# Columns selected from the processed accelerations dataset. The same order
# is used for the DataFrame selection and the ``Acceleration`` record fields.
acceleration_columns = (
    'offset id ride_id uuid timestamp x y z timelapse filename t'
).split()

Acceleration = namedtuple('Acceleration', acceleration_columns)

def read_accelerations(path='/home/jovyan/dsc650/data/processed/bdd/accelerations'):
    """Load the processed acceleration records, ordered by ``offset``.

    ``timestamp`` is rendered as text and ``timelapse`` is cast to ``str``.
    Presumably this keeps the records JSON-serializable for the Kafka
    producer.

    Args:
        path: Parquet dataset directory to read. Defaults to the
            accelerations dataset path previously hard-coded here.

    Returns:
        A list of ``Acceleration`` namedtuples, one per row, sorted ascending
        by ``offset``.
    """
    df = pq.ParquetDataset(path).read_pandas().to_pandas()

    # `%f` yields microseconds (6 digits), e.g. "1970-01-01T00:25:03.882586".
    df['timestamp'] = df['timestamp'].apply(lambda x: x.strftime('%Y-%m-%dT%H:%M:%S.%f'))
    # NOTE(review): values print as 'False' after the cast, so the source
    # column is likely boolean -- confirm.
    df['timelapse'] = df['timelapse'].astype(str)

    df = df[acceleration_columns].sort_values(by=['offset'])
    return [Acceleration(*record) for record in df.to_records(index=False)]


accelerations = read_accelerations()
accelerations[0:2]

[Acceleration(offset=0.8220608865228429, id='58682c5d48cad9d9e103431d773615bf', ride_id='c9a2b46c9aa515b632eddc45c4868482', uuid='19b9aa10588646b3bf22c9b4865a7995', timestamp='1970-01-01T00:25:03.882586', x=-0.994, y=0.045, z=-0.036000000000000004, timelapse='False', filename='e2f795a7-6a7d-4500-b5d7-4569de996811.mov', t='000.0'),
 Acceleration(offset=0.8420608865228429, id='58682c5d48cad9d9e103431d773615bf', ride_id='c9a2b46c9aa515b632eddc45c4868482', uuid='19b9aa10588646b3bf22c9b4865a7995', timestamp='1970-01-01T00:25:03.882586', x=-0.998, y=0.046, z=-0.04, timelapse='False', filename='e2f795a7-6a7d-4500-b5d7-4569de996811.mov', t='000.0')]

In [10]:
# Replay the recorded locations and accelerations at roughly real-time speed:
# once per second, send every queued record whose `t` value is <= the number
# of seconds elapsed so far.
import time
import heapq

REPLAY_SECONDS = 123  # upper bound on the replay duration, in seconds


def _send_due(heap, topic, elapsed):
    """Pop and send every record at the front of ``heap`` that is already due.

    Args:
        heap: List of ``Location``/``Acceleration`` records used as a heap.
            A list sorted by ``offset`` already satisfies the heap invariant.
        topic: Topic suffix passed through to ``send_data``.
        elapsed: Seconds elapsed since the replay started.

    Returns:
        The number of records sent.
    """
    # Stop as soon as the heap is empty or its front record is not yet due.
    # The original looped len(records) times every second and read heap[0]
    # unguarded, raising IndexError once every record had been sent.
    sent = 0
    while heap and float(heap[0].t) <= elapsed:
        send_data(topic, heapq.heappop(heap))
        sent += 1
    return sent


Sec = 0

# Pop from copies so `locations` / `accelerations` stay intact for re-runs.
l = list(locations)
a = list(accelerations)

# NOTE(review): the heap is ordered by `offset` (the first tuple field) but
# the send condition tests `t`. This assumes `t` never decreases as `offset`
# grows -- confirm against the dataset.

# Begin Process
while Sec < REPLAY_SECONDS and (l or a):
    Sec += 1
    time.sleep(1)
    _send_due(l, 'locations', float(Sec))
    _send_due(a, 'accelerations', float(Sec))

# Block until every buffered message has been delivered (or has failed).
producer.flush()
            
            

Message sent:
    Topic: "YoungBret-locations"
    Partition: 0
    Offset: 1067
Message sent:
    Topic: "YoungBret-locations"
    Partition: 0
    Offset: 1068
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51451
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51452
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51453
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51454
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51455
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51456
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51457
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51458
Message sent:
    Topic: "YoungBret-accelerations"
    Partition: 0
    Offset: 51459
Message sent:
    Topic: "YoungBret-accelerations"
    Partition

KeyboardInterrupt: 